import io from "socket.io-client";
import cp from "child_process";
import modes from "../modes.js";
import events from "../events.js";

export { default as events } from "../events.js";
export { default as modes } from "../modes.js";

// Data Stream Types
const ERR = "e";
const OUT = "o";

/**
 * Runs a job command as a child process once a socket connection is
 * established, and forwards the process output over that socket.
 *
 * Output is line-buffered per stream: only complete lines are reported, and
 * any unterminated remainder is reported when the process closes.
 */
export default class Executor {
  /**
   * @param {object} config - Provides `url(payload)`, `jobId(payload)` and
   *   `command(payload)` resolvers. When a resolver returns null/undefined the
   *   matching `QUALITEER_*` environment variable is used instead.
   * @param {*} payload - Passed unchanged to each resolver.
   */
  constructor(config, payload) {
    this.url = config.url(payload) ?? process.env.QUALITEER_URL;
    this.jobId = config.jobId(payload) ?? process.env.QUALITEER_JOB_ID;
    this.command = config.command(payload) ?? process.env.QUALITEER_COMMAND;
    this.mode = modes.EXEC;

    // Internal Buffer: pending (not yet reported) text for each stream.
    this.buf = {};
    this.buf[ERR] = "";
    this.buf[OUT] = "";

    // Methods that are handed out as event listeners need a fixed `this`.
    this.spawn = this.spawn.bind(this);
    this.report = this.report.bind(this);
    this.onProcClose = this.onProcClose.bind(this);
    this.onClose = this.onClose.bind(this);
  }

  /**
   * Returns the configured command as a fresh `[executable, ...args]` array.
   *
   * An array is copied so `this.command` is never mutated. A string (the form
   * an environment variable takes) is split on whitespace; quoting is not
   * interpreted, so arguments containing spaces need the array form.
   *
   * @returns {string[]} Non-empty list of command parts.
   * @throws {Error} If no usable command is configured.
   */
  resolveCommand() {
    const { command } = this;
    let parts = [];
    if (typeof command === "string") {
      parts = command.trim().split(/\s+/).filter(Boolean);
    } else if (Array.isArray(command)) {
      parts = [...command];
    }
    if (parts.length === 0) {
      throw new Error("No command configured for this job.");
    }
    return parts;
  }

  /**
   * Starts the child process and wires its stdout/stderr/close events.
   * Registered as the socket "connect" handler, so it may run more than once;
   * it therefore must not consume `this.command`.
   */
  spawn() {
    const [cmd, ...cmdArgs] = this.resolveCommand();
    this.proc = cp.spawn(cmd, cmdArgs);

    // Set Encoding: chunks then arrive as strings rather than Buffers.
    this.proc.stdout.setEncoding("utf8");
    this.proc.stderr.setEncoding("utf8");

    // Process Events
    this.proc.stdout.on("data", (chunk) => this.report(chunk, OUT));
    this.proc.stderr.on("data", (chunk) => this.report(chunk, ERR));
    this.proc.on("close", this.onProcClose);
  }

  /** Opens the socket and spawns the command once it connects. */
  runJob() {
    this.socket = io(this.url, {
      query: { mode: this.mode, jobId: this.jobId },
    });
    this.socket.on("connect", this.spawn);
    this.socket.on("disconnect", this.onClose);
  }

  /** Socket "disconnect" handler: kills the child process if one was started. */
  onClose() {
    console.log("Server disconnected, terminating process.");
    if (this.proc) this.proc.kill("SIGKILL");
  }

  /**
   * Child process "close" handler: reports any unterminated output, then
   * announces the exit code and disconnects once it is acknowledged.
   *
   * @param {number|null} code - Exit code passed by the "close" event.
   */
  onProcClose(code) {
    // Without this, output that does not end in a newline is never reported.
    this.flush();
    this.socket.emit(events.JOB_CLS, code, () => this.socket.disconnect());
    console.log(`Process finished with code ${code}`);
  }

  /**
   * Buffers a chunk of output and reports every complete line collected so
   * far. Text after the last newline stays buffered until more data (or the
   * process close) completes it, so a line split across chunks is reported
   * in one piece.
   *
   * @param {string} d - Chunk of process output.
   * @param {string} dType - Stream type, `ERR` or `OUT`.
   */
  report(d, dType) {
    this.buf[dType] += d;

    const lastBreak = this.buf[dType].lastIndexOf("\n");
    if (lastBreak === -1) return;

    // Drop the final newline itself; keep the partial line for later.
    const complete = this.buf[dType].slice(0, lastBreak);
    this.buf[dType] = this.buf[dType].slice(lastBreak + 1);
    this.deliver(complete, dType);
  }

  /** Reports and clears whatever is still buffered on either stream. */
  flush() {
    for (const dType of [OUT, ERR]) {
      const rest = this.buf[dType];
      if (rest === "") continue;
      this.buf[dType] = "";
      this.deliver(rest, dType);
    }
  }

  /**
   * Sends text over the socket and mirrors it on the local console.
   *
   * @param {string} text - Output to report.
   * @param {string} dType - Stream type, `ERR` or `OUT`.
   */
  deliver(text, dType) {
    this.socket.emit(events.JOB_REP, text);
    if (dType === ERR) console.error(`err: ${text}`);
    else console.log(`out: ${text}`);
  }
}