import { Manager } from "socket.io-client"; import cp from "child_process"; import modes from "../modes.js"; import events from "../events.js"; export { default as events } from "../events.js"; export { default as modes } from "../modes.js"; // Data Stream Types const ERR = "e"; const OUT = "o"; export default class Executor { constructor(url, job, options = {}) { this.url = url; this.job = job; this.mode = modes.EXEC; // Internal Buffer this.buf = {}; this.buf[ERR] = ""; this.buf[OUT] = ""; // Methods this.spawn = options.spawn ?? this.spawn.bind(this); this.report = options.report ?? this.report.bind(this); this.onProcClose = options.onProcClose ?? this.onProcClose.bind(this); this.onClose = options.onClose ?? this.onClose.bind(this); } spawn() { const cmdArgs = this.job.command; const cmd = cmdArgs.shift(); this.proc = cp.spawn(cmd, cmdArgs); // Set Encoding this.proc.stdout.setEncoding("utf8"); this.proc.stderr.setEncoding("utf8"); // Process Events this.proc.stdout.on("data", (d) => this.report(d.toString(), OUT)); this.proc.stderr.on("data", (d) => this.report(d.toString(), ERR)); this.proc.on("close", this.onProcClose); } runJob() { const mgr = new Manager(this.url, { query: { mode: this.mode, jobId: this.job.id }, }); this.socket = mgr.socket("/"); this.socket.on("connect", this.spawn); this.socket.on("disconnect", this.onClose); } onClose() { console.log("Server disconnected, terminating process."); this.proc.kill("SIGKILL"); } onProcClose(code) { this.socket.emit(events.JOB_CLS, code); console.log(`Process finished with code ${code}`); this.socket.disconnect(); } report(d, dType) { this.buf[dType] += d; if (!this.buf[dType].includes("\n")) return; if (this.buf[dType].endsWith("\n")) this.buf[dType] = this.buf[dType].slice(0, -1); this.socket.emit(events.JOB_REP, this.buf[dType]); if (dType === ERR) console.error(`err: ${this.buf[dType]}`); else console.log(`out: ${this.buf[dType]}`); this.buf[dType] = ""; } }