diff --git a/bin/executor b/bin/executor index b852dee..2b291d5 100755 Binary files a/bin/executor and b/bin/executor differ diff --git a/lib/jobs/JobManager.js b/lib/jobs/JobManager.js index d90b6dc..97834a5 100644 --- a/lib/jobs/JobManager.js +++ b/lib/jobs/JobManager.js @@ -1,8 +1,11 @@ import { v4 } from "uuid"; -import applyJob from "./k8s/kubernetes.js"; +import applyJobInternally from "./k8s/k8s-internal.js"; +import applyJob from "./k8s/k8s.js"; import buildJob from "./job-builder.js"; const maxJobs = process.env.MAX_JOBS ? parseInt(process.env.MAX_JOBS) : 3; +const internalDeploy = process.env.INTERNAL_DEPLOY === "true"; +const launchJob = internalDeploy ? applyJobInternally : applyJob; class JobManager { constructor() { @@ -36,18 +39,11 @@ class JobManager { newJob(jobRequest, id) { if (!jobRequest) throw Error("Request Must Be Object!"); if (!this.clients[id]) this.clients[id] = { jobs: [] }; - const client = this.clients[id]; - if ( - client.jobs.filter((j) => j.exitcode === undefined).length >= - this.clientMaxJobs - ) - throw Error("Client's Active Jobs Exceeded!"); - const job = buildJob(jobRequest, id); job.id = v4(); job.log = []; this.clients[id].jobs.push(job); - applyJob(job); + launchJob(job); return { ...job }; } diff --git a/lib/jobs/k8s/k8s-common.js b/lib/jobs/k8s/k8s-common.js new file mode 100644 index 0000000..587e76b --- /dev/null +++ b/lib/jobs/k8s/k8s-common.js @@ -0,0 +1,59 @@ +import fs from "node:fs"; +import path from "node:path"; +const { + QUALITEER_EXECUTOR_URL, + QUALITEER_EXECUTOR_USE_SCRIPT, + QUALITEER_EXECUTOR_BIN, + QUALITEER_EXECUTOR_BIN_URL, +} = process.env; + +const executorUrl = QUALITEER_EXECUTOR_URL; +const executorAsScript = QUALITEER_EXECUTOR_USE_SCRIPT === "true"; +const executorBin = QUALITEER_EXECUTOR_BIN ?? `qltr-executor`; +const executorBinFetchUrl = QUALITEER_EXECUTOR_BIN_URL; + +const jobsDir = "jobs/"; +const jobsPath = path.resolve(jobsDir); +const defaultsFile = path.resolve("./lib/jobs/k8s/k8s-job.json"); +const defaults = JSON.parse(fs.readFileSync(defaultsFile)); + +function wrapCommand(jobId, command) { + const bin = executorAsScript + ? `node ${executorBin}` + : `chmod +x ${executorBin} && ./${executorBin}`; + const executorPayload = JSON.stringify({ jobId, command, url: executorUrl }); + const payload = Buffer.from(executorPayload, "utf8").toString("base64"); + const curlCmd = `if ! [ -f qltr-executor ]; then curl -o qltr-executor ${executorBinFetchUrl}; fi || true && ${bin} ${payload}`; + return curlCmd; +} + +export function jobBuilder(jobRequest) { + const { resources, name, image, command, id: jobId } = jobRequest; + // Safety Checks + if (!jobId) throw Error("'jobId' required!"); + if (!name) throw Error("'name' required!"); + if (!command) throw Error("'command' required!"); + if (!image) throw Error("'image' required!"); + if (!Array.isArray(command)) throw Error("'command' must be an array!"); + + // Apply configuration + const job = { ...defaults }; + job.metadata.name = `qltr-${name}-${jobId}`; + const container = job.spec.template.spec.containers[0]; + container.name = job.metadata.name; + container.command = wrapCommand(jobId, command); + container.image = JSON.stringify(image); + // Apply resources + job.resources = { ...job.resources, ...resources }; + return job; +} + +export const createFile = (job) => { + const { name } = job.metadata; + if (!fs.existsSync(jobsPath)) fs.mkdirSync(jobsPath); + const filePath = path.resolve(jobsDir, `${name}.json`); + fs.writeFileSync(filePath, JSON.stringify(job)); + return filePath; +}; + +export const deleteFile = (filePath) => fs.unlinkSync(filePath); diff --git a/lib/jobs/k8s/k8s-bypass.js b/lib/jobs/k8s/k8s-internal-engine.js similarity index 100% rename from lib/jobs/k8s/k8s-bypass.js rename to lib/jobs/k8s/k8s-internal-engine.js diff --git a/lib/jobs/k8s/k8s-internal.js b/lib/jobs/k8s/k8s-internal.js new file mode 100644 index 0000000..cd0936d --- /dev/null +++ b/lib/jobs/k8s/k8s-internal.js @@ -0,0 +1,20 @@ +import cp from "node:child_process"; +import fs from "node:fs"; +import path from "node:path"; +import { jobBuilder, createFile, deleteFile } from "./k8s-common.js"; + +// Constants +const internalEngine = path.resolve("./lib/jobs/k8s/k8s-internal-engine.js"); + +// Functions +const applyFileInternally = (filePath) => { + const job = fs.readFileSync(filePath, { encoding: "utf8" }); + cp.fork(internalEngine, [job]); +}; + +export default async function createJobInternally(jobRequest) { + const job = jobBuilder(jobRequest); + const filePath = createFile(job); + applyFileInternally(filePath); + deleteFile(filePath); +} diff --git a/lib/jobs/k8s/k8s.js b/lib/jobs/k8s/k8s.js new file mode 100644 index 0000000..23aacf5 --- /dev/null +++ b/lib/jobs/k8s/k8s.js @@ -0,0 +1,16 @@ +import cp from "node:child_process"; +import { jobBuilder, createFile, deleteFile } from "./k8s-common.js"; + +const applyFile = async (filePath) => { + const command = `${kubCmd} ${filePath}`; + return new Promise((res, rej) => + cp.exec(command, (err, stdout, stderr) => (err && rej(err)) || res(stdout)) + ); +}; + +export default async function createJob(jobRequest) { + const job = jobBuilder(jobRequest); + const filePath = createFile(job); + applyFile(filePath); + deleteFile(filePath); +} diff --git a/lib/jobs/k8s/kubernetes.js b/lib/jobs/k8s/kubernetes.js deleted file mode 100644 index c2eb66f..0000000 --- a/lib/jobs/k8s/kubernetes.js +++ /dev/null @@ -1,86 +0,0 @@ -import cp from "child_process"; -import fs from "fs"; -import path from "path"; - -const internalDeploy = process.env.INTERNAL_DEPLOY === "true"; -const executorUrl = process.env.EXECUTOR_URL; -const executorScriptOnly = process.env.EXECUTOR_SCRIPT_ONLY === "true"; -const executorBin = - process.env.EXECUTOR_BIN ?? `qltr-executor${executorScriptOnly ? ".js" : ""}`; - -const qualiteerUrl = - process.env.QUALITEER_URL ?? "file:///home/runner/Qualiteer/bin/executor"; - -const kubCmd = "kubectl apply -f"; -const jobsDir = "jobs/"; -const defaults = JSON.parse( - fs.readFileSync(path.resolve("./lib/jobs/k8s/k8s-job.json")) -); - -const wrapCommand = (jobId, command) => { - const bin = executorScriptOnly - ? `node ${executorBin}` - : `chmod +x ${executorBin} && ./${executorBin}`; - const cmd = command.map((arg) => JSON.stringify(arg)); - const payload = Buffer.from( - JSON.stringify({ jobId, command, url: qualiteerUrl }), - "utf8" - ).toString("base64"); - const curlCmd = `curl -o qltr-executor ${executorUrl} || true && ${bin} ${payload}`; - return curlCmd; -}; - -const createFile = (job) => { - const { name } = job.metadata; - const jobsPath = path.resolve(jobsDir); - if (!fs.existsSync(jobsPath)) fs.mkdirSync(jobsPath); - const filePath = path.resolve(jobsDir, `${name}.json`); - fs.writeFileSync(filePath, JSON.stringify(job)); - return filePath; -}; - -const applyFileInternally = (filePath) => { - const job = fs.readFileSync(filePath, { encoding: "utf8" }); - cp.fork(path.resolve("./lib/jobs/k8s/k8s-bypass.js"), [job]); -}; - -const applyFile = async (filePath) => { - const command = `${kubCmd} ${filePath}`; - return new Promise((res, rej) => - cp.exec(command, (err, stdout, stderr) => (err && rej(err)) || res(stdout)) - ); -}; - -const deleteFile = (filePath) => fs.unlinkSync(filePath); - -const jobBuilder = (jobRequest) => { - const { resources, name, image, command, id: jobId } = jobRequest; - - // Safety Checks - if (!jobId) throw Error("'jobId' required!"); - if (!name) throw Error("'name' required!"); - if (!command) throw Error("'command' required!"); - if (!image) throw Error("'image' required!"); - - if (!Array.isArray(command)) throw Error("'command' must be an array!"); - - // Apply configuration - const job = { ...defaults }; - job.metadata.name = `qltr-${name}-${jobId}`; - const container = job.spec.template.spec.containers[0]; - container.name = job.metadata.name; - container.command = wrapCommand(jobId, command); - container.image = JSON.stringify(image); - - // Apply resources - job.resources = { ...job.resources, ...resources }; - return job; -}; - -export default async function createJob(jobRequest) { - const job = jobBuilder(jobRequest); - const filePath = createFile(job); - if (!internalDeploy) await applyFile(filePath); - else await applyFileInternally(filePath); - deleteFile(filePath); -}