Rewrote k8s handling
This commit is contained in:
parent
dd97d013cb
commit
bb6a6396dc
7 changed files with 100 additions and 95 deletions
BIN
bin/executor
BIN
bin/executor
Binary file not shown.
|
@ -1,8 +1,11 @@
|
||||||
import { v4 } from "uuid";
|
import { v4 } from "uuid";
|
||||||
import applyJob from "./k8s/kubernetes.js";
|
import applyJobInternally from "./k8s/k8s-internal.js";
|
||||||
|
import applyJob from "./k8s/k8s.js";
|
||||||
import buildJob from "./job-builder.js";
|
import buildJob from "./job-builder.js";
|
||||||
|
|
||||||
const maxJobs = process.env.MAX_JOBS ? parseInt(process.env.MAX_JOBS) : 3;
|
const maxJobs = process.env.MAX_JOBS ? parseInt(process.env.MAX_JOBS) : 3;
|
||||||
|
const internalDeploy = process.env.INTERNAL_DEPLOY === "true";
|
||||||
|
const launchJob = internalDeploy ? applyJobInternally : applyJob;
|
||||||
|
|
||||||
class JobManager {
|
class JobManager {
|
||||||
constructor() {
|
constructor() {
|
||||||
|
@ -36,18 +39,11 @@ class JobManager {
|
||||||
newJob(jobRequest, id) {
|
newJob(jobRequest, id) {
|
||||||
if (!jobRequest) throw Error("Request Must Be Object!");
|
if (!jobRequest) throw Error("Request Must Be Object!");
|
||||||
if (!this.clients[id]) this.clients[id] = { jobs: [] };
|
if (!this.clients[id]) this.clients[id] = { jobs: [] };
|
||||||
const client = this.clients[id];
|
|
||||||
if (
|
|
||||||
client.jobs.filter((j) => j.exitcode === undefined).length >=
|
|
||||||
this.clientMaxJobs
|
|
||||||
)
|
|
||||||
throw Error("Client's Active Jobs Exceeded!");
|
|
||||||
|
|
||||||
const job = buildJob(jobRequest, id);
|
const job = buildJob(jobRequest, id);
|
||||||
job.id = v4();
|
job.id = v4();
|
||||||
job.log = [];
|
job.log = [];
|
||||||
this.clients[id].jobs.push(job);
|
this.clients[id].jobs.push(job);
|
||||||
applyJob(job);
|
launchJob(job);
|
||||||
return { ...job };
|
return { ...job };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
59
lib/jobs/k8s/k8s-common.js
Normal file
59
lib/jobs/k8s/k8s-common.js
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
import fs from "node:fs";
|
||||||
|
import path from "node:path";
|
||||||
|
const {
|
||||||
|
QUALITEER_EXECUTOR_URL,
|
||||||
|
QUALITEER_EXECUTOR_USE_SCRIPT,
|
||||||
|
QUALITEER_EXECUTOR_BIN,
|
||||||
|
QUALITEER_EXECUTOR_BIN_URL,
|
||||||
|
} = process.env;
|
||||||
|
|
||||||
|
const executorUrl = QUALITEER_EXECUTOR_URL;
|
||||||
|
const executorAsScript = QUALITEER_EXECUTOR_USE_SCRIPT === "true";
|
||||||
|
const executorBin = QUALITEER_EXECUTOR_BIN ?? `qltr-executor`;
|
||||||
|
const executorBinFetchUrl = QUALITEER_EXECUTOR_BIN_URL;
|
||||||
|
|
||||||
|
const jobsDir = "jobs/";
|
||||||
|
const jobsPath = path.resolve(jobsDir);
|
||||||
|
const defaultsFile = path.resolve("./lib/jobs/k8s/k8s-job.json");
|
||||||
|
const defaults = JSON.parse(fs.readFileSync(defaultsFile));
|
||||||
|
|
||||||
|
function wrapCommand(jobId, command) {
|
||||||
|
const bin = executorAsScript
|
||||||
|
? `node ${executorBin}`
|
||||||
|
: `chmod +x ${executorBin} && ./${executorBin}`;
|
||||||
|
const executorPayload = JSON.stringify({ jobId, command, url: executorUrl });
|
||||||
|
const payload = Buffer.from(executorPayload, "utf8").toString("base64");
|
||||||
|
const curlCmd = `if ! [ -f qltr-executor ]; then curl -o qltr-executor ${executorBinFetchUrl}; fi || true && ${bin} ${payload}`;
|
||||||
|
return curlCmd;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function jobBuilder(jobRequest) {
|
||||||
|
const { resources, name, image, command, id: jobId } = jobRequest;
|
||||||
|
// Safety Checks
|
||||||
|
if (!jobId) throw Error("'jobId' required!");
|
||||||
|
if (!name) throw Error("'name' required!");
|
||||||
|
if (!command) throw Error("'command' required!");
|
||||||
|
if (!image) throw Error("'image' required!");
|
||||||
|
if (!Array.isArray(command)) throw Error("'command' must be an array!");
|
||||||
|
|
||||||
|
// Apply configuration
|
||||||
|
const job = { ...defaults };
|
||||||
|
job.metadata.name = `qltr-${name}-${jobId}`;
|
||||||
|
const container = job.spec.template.spec.containers[0];
|
||||||
|
container.name = job.metadata.name;
|
||||||
|
container.command = wrapCommand(jobId, command);
|
||||||
|
container.image = JSON.stringify(image);
|
||||||
|
// Apply resources
|
||||||
|
job.resources = { ...job.resources, ...resources };
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const createFile = (job) => {
|
||||||
|
const { name } = job.metadata;
|
||||||
|
if (!fs.existsSync(jobsPath)) fs.mkdirSync(jobsPath);
|
||||||
|
const filePath = path.resolve(jobsDir, `${name}.json`);
|
||||||
|
fs.writeFileSync(filePath, JSON.stringify(job));
|
||||||
|
return filePath;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const deleteFile = (filePath) => fs.unlinkSync(filePath);
|
20
lib/jobs/k8s/k8s-internal.js
Normal file
20
lib/jobs/k8s/k8s-internal.js
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
import cp from "node:child_process";
|
||||||
|
import fs from "node:fs";
|
||||||
|
import path from "node:path";
|
||||||
|
import { jobBuilder, createFile, deleteFile } from "./k8s-common.js";
|
||||||
|
|
||||||
|
// Constants
|
||||||
|
const internalEngine = path.resolve("./lib/jobs/k8s/k8s-internal-engine.js");
|
||||||
|
|
||||||
|
// Functions
|
||||||
|
const applyFileInternally = (filePath) => {
|
||||||
|
const job = fs.readFileSync(filePath, { encoding: "utf8" });
|
||||||
|
cp.fork(internalEngine, [job]);
|
||||||
|
};
|
||||||
|
|
||||||
|
export default async function createJobInternally(jobRequest) {
|
||||||
|
const job = jobBuilder(jobRequest);
|
||||||
|
const filePath = createFile(job);
|
||||||
|
applyFileInternally(filePath);
|
||||||
|
deleteFile(filePath);
|
||||||
|
}
|
16
lib/jobs/k8s/k8s.js
Normal file
16
lib/jobs/k8s/k8s.js
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
import cp from "node:child_process";
|
||||||
|
import { jobBuilder, createFile, deleteFile } from "./k8s-common.js";
|
||||||
|
|
||||||
|
const applyFile = async (filePath) => {
|
||||||
|
const command = `${kubCmd} ${filePath}`;
|
||||||
|
return new Promise((res, rej) =>
|
||||||
|
cp.exec(command, (err, stdout, stderr) => (err && rej(err)) || res(stdout))
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
export default async function createJob(jobRequest) {
|
||||||
|
const job = jobBuilder(jobRequest);
|
||||||
|
const filePath = createFile(job);
|
||||||
|
applyFile(filePath);
|
||||||
|
deleteFile(filePath);
|
||||||
|
}
|
|
@ -1,86 +0,0 @@
|
||||||
import cp from "child_process";
|
|
||||||
import fs from "fs";
|
|
||||||
import path from "path";
|
|
||||||
|
|
||||||
const internalDeploy = process.env.INTERNAL_DEPLOY === "true";
|
|
||||||
const executorUrl = process.env.EXECUTOR_URL;
|
|
||||||
const executorScriptOnly = process.env.EXECUTOR_SCRIPT_ONLY === "true";
|
|
||||||
const executorBin =
|
|
||||||
process.env.EXECUTOR_BIN ?? `qltr-executor${executorScriptOnly ? ".js" : ""}`;
|
|
||||||
|
|
||||||
const qualiteerUrl =
|
|
||||||
process.env.QUALITEER_URL ?? "file:///home/runner/Qualiteer/bin/executor";
|
|
||||||
|
|
||||||
const kubCmd = "kubectl apply -f";
|
|
||||||
const jobsDir = "jobs/";
|
|
||||||
const defaults = JSON.parse(
|
|
||||||
fs.readFileSync(path.resolve("./lib/jobs/k8s/k8s-job.json"))
|
|
||||||
);
|
|
||||||
|
|
||||||
const wrapCommand = (jobId, command) => {
|
|
||||||
const bin = executorScriptOnly
|
|
||||||
? `node ${executorBin}`
|
|
||||||
: `chmod +x ${executorBin} && ./${executorBin}`;
|
|
||||||
const cmd = command.map((arg) => JSON.stringify(arg));
|
|
||||||
const payload = Buffer.from(
|
|
||||||
JSON.stringify({ jobId, command, url: qualiteerUrl }),
|
|
||||||
"utf8"
|
|
||||||
).toString("base64");
|
|
||||||
const curlCmd = `curl -o qltr-executor ${executorUrl} || true && ${bin} ${payload}`;
|
|
||||||
return curlCmd;
|
|
||||||
};
|
|
||||||
|
|
||||||
const createFile = (job) => {
|
|
||||||
const { name } = job.metadata;
|
|
||||||
const jobsPath = path.resolve(jobsDir);
|
|
||||||
if (!fs.existsSync(jobsPath)) fs.mkdirSync(jobsPath);
|
|
||||||
const filePath = path.resolve(jobsDir, `${name}.json`);
|
|
||||||
fs.writeFileSync(filePath, JSON.stringify(job));
|
|
||||||
return filePath;
|
|
||||||
};
|
|
||||||
|
|
||||||
const applyFileInternally = (filePath) => {
|
|
||||||
const job = fs.readFileSync(filePath, { encoding: "utf8" });
|
|
||||||
cp.fork(path.resolve("./lib/jobs/k8s/k8s-bypass.js"), [job]);
|
|
||||||
};
|
|
||||||
|
|
||||||
const applyFile = async (filePath) => {
|
|
||||||
const command = `${kubCmd} ${filePath}`;
|
|
||||||
return new Promise((res, rej) =>
|
|
||||||
cp.exec(command, (err, stdout, stderr) => (err && rej(err)) || res(stdout))
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
const deleteFile = (filePath) => fs.unlinkSync(filePath);
|
|
||||||
|
|
||||||
const jobBuilder = (jobRequest) => {
|
|
||||||
const { resources, name, image, command, id: jobId } = jobRequest;
|
|
||||||
|
|
||||||
// Safety Checks
|
|
||||||
if (!jobId) throw Error("'jobId' required!");
|
|
||||||
if (!name) throw Error("'name' required!");
|
|
||||||
if (!command) throw Error("'command' required!");
|
|
||||||
if (!image) throw Error("'image' required!");
|
|
||||||
|
|
||||||
if (!Array.isArray(command)) throw Error("'command' must be an array!");
|
|
||||||
|
|
||||||
// Apply configuration
|
|
||||||
const job = { ...defaults };
|
|
||||||
job.metadata.name = `qltr-${name}-${jobId}`;
|
|
||||||
const container = job.spec.template.spec.containers[0];
|
|
||||||
container.name = job.metadata.name;
|
|
||||||
container.command = wrapCommand(jobId, command);
|
|
||||||
container.image = JSON.stringify(image);
|
|
||||||
|
|
||||||
// Apply resources
|
|
||||||
job.resources = { ...job.resources, ...resources };
|
|
||||||
return job;
|
|
||||||
};
|
|
||||||
|
|
||||||
export default async function createJob(jobRequest) {
|
|
||||||
const job = jobBuilder(jobRequest);
|
|
||||||
const filePath = createFile(job);
|
|
||||||
if (!internalDeploy) await applyFile(filePath);
|
|
||||||
else await applyFileInternally(filePath);
|
|
||||||
deleteFile(filePath);
|
|
||||||
}
|
|
Loading…
Add table
Add a link
Reference in a new issue