Revamp Job flow

This commit is contained in:
Elijah Dunemask 2022-10-15 11:47:47 +00:00
parent 945afdfbbe
commit 4a0a4b29a5
86 changed files with 592 additions and 608 deletions

View file

@ -0,0 +1,38 @@
import { URL } from "node:url";
import path from "node:path";
import caxa from "caxa";
import { rollup } from "rollup";
import loadConfigFile from "rollup/loadConfigFile";
import { executorLibraryDir, binName, scriptName } from "./executor-config.js";
// Fix import
const { default: caxaPackage } = caxa;
// Rollup Config
const rollupConfigPath = path.resolve(executorLibraryDir, "rollup.config.js");
// Build functions
async function packageBin() {
console.log("Packaging bundle into binary");
return caxaPackage({
input: "dist/bundles/",
output: `bin/${binName}`,
command: ["{{caxa}}/node_modules/.bin/node", `{{caxa}}/${scriptName}`],
uncompressionMessage: "Unpacking, please wait...",
});
}
async function rollupBundle() {
console.log("Rolling up executor into bundle");
const { options, warnings } = await loadConfigFile(rollupConfigPath);
if (warnings.count !== 0)
console.log(`Rollup has ${warnings.count} warnings`);
warnings.flush();
for (const optionsObj of options) {
const bundle = await rollup(optionsObj);
await Promise.all(optionsObj.output.map(bundle.write));
}
}
await rollupBundle();
await packageBin();
console.log("Done");

View file

@ -0,0 +1,5 @@
export const executorLibraryDir = new URL(".", import.meta.url).pathname;
export const binName = "qltr-executor";
export const configName = "executor.config.mjs";
export const scriptName = "qualiteer-executor.mjs";
export const entrypointName = "executor-entrypoint.js";

View file

@ -1,9 +1,9 @@
import path from "node:path";
import Executor from "../../sockets/clients/Executor.js";
import Executor from "../sockets/clients/Executor.js";
import { normalize } from "./executor-configurator.js";
const { default: executorConfig } = await import(
path.resolve("executor.config.js")
);
import { configName as executorConfigName } from "./executor-config.js";
const executorConfigPath = path.resolve(executorConfigName);
const { default: executorConfig } = await import(executorConfigPath);
// Load config and args
const args = process.argv.slice(2);

View file

@ -1,11 +1,17 @@
import path from "node:path";
import { nodeResolve } from "@rollup/plugin-node-resolve";
import commonjs from "@rollup/plugin-commonjs";
import { terser } from "rollup-plugin-terser";
import {
executorLibraryDir,
entrypointName,
scriptName,
} from "./executor-config.js";
export default {
input: "lib/jobs/executor/executor-entrypoint.js",
input: path.resolve(executorLibraryDir, entrypointName),
output: {
file: "dist/bundles/qualiteer-executor.mjs",
file: `dist/bundles/${scriptName}`,
},
plugins: [nodeResolve(), commonjs(), terser()],
};

View file

@ -1,56 +0,0 @@
<<<<<<< HEAD
CREATE SEQUENCE test_results_id_seq;
CREATE TABLE test_results (
id bigint NOT NULL DEFAULT nextval('test_results_seq') PRIMARY KEY,
test_name varchar(255) DEFAULT NULL,
test_class varchar(255) DEFAULT NULL,
test_method varchar(255) DEFAULT NULL,
test_path varchar(255) DEFAULT NULL,
test_type varchar(32) DEFAULT NULL,
test_timestamp timestamptz NOT NULL DEFAULT now(),
test_retry BOOLEAN DEFAULT FALSE,
origin varchar(255) DEFAULT NULL,
failed BOOLEAN DEFAULT FALSE,
failed_message varchar(2047) DEFAULT NULL,
screenshot_url varchar(255) DEFAULT NULL,
weblog_url varchar(255) DEFAULT NULL,
);
ALTER SEQUENCE test_results_id_seq OWNED BY test_results.id;
=======
> > > > > > > b023d8910c89d80573499890a958c0df649849e1
# Tables
PG Database Tables Mapped Out
<<<<<<< HEAD
## `test_results`
| id | test_name | test_class | test_method | test_path | test_type | test_timestamp | test_retry | origin | failed | failed_message | screenshot_url | weblog_url |
| int | string | string | string | string | string | timestamp | boolean | string | boolean | string | string | string |
| 1 | My Test | My Test Class | My Failing Test Method | My Test Class Path | API | Date.now() | false | Test Suite A | true | Some Failure Messsage | screenshotUrl | weblogUrl |
=======
Table `test_results`
| id | test_name | test_class | test_method | test_path | test_type | test_timestamp | test_retry | origin | failed | failed_message | screenshot_url | weblog_url |
|-----|-----------|------------|-------------|---------------------|---------------|----------------|------------|--------------|---------|------------------------|---------------------|--------------------|
| int | string | string | string | string | string | timestamp | boolean | string | boolean | string | string | string |
| 1 | My Test | My Class | My Method | /path/to/test_class | API/UI/MOBILE | Date.now() | false | Test Suite A | true | I am a test that fails | https://example.com | http://example.com |
> > > > > > > b023d8910c89d80573499890a958c0df649849e1
- id Automatically Generated
- test_name\* Name of test
- test_class\* Name of class
- test_method Name of failed method if failed else null
- test_path Path to test class
- test_type API/UI/Mobile
- test_timestamp UTC Timestamp
- test_retry Should test remedy failed tests
- origin Test Suite test belongs to
- failed Indicates if the test failed or not
- failed_message Failure Message of test or null
- screenshot_url Screenshot of failure
- weblog_url Log from the web console

View file

@ -1,30 +0,0 @@
import pg from "./postgres.js";
import { upsertTest } from "./queries/catalog.js";
import { insertTestResult } from "./queries/results.js";
import { upsertAlertSilence } from "./queries/alerting.js";
import {
seed as catalogSeed,
table as catalogTable,
} from "./seeds/catalog-seed.js";
import {
seed as resultsSeed,
table as resultsTable,
} from "./seeds/results-seed.js";
import {
seed as alertingSeed,
table as alertingTable,
} from "./seeds/alerting-seed.js";
const database = process.env.POSTGRES_DATABASE ?? "qualiteer";
await pg.connect();
const resetAndSeed = async (table, getSeeds, seed) => {
await pg.query(`TRUNCATE ${table} RESTART IDENTITY CASCADE;`);
for (var s of getSeeds()) await seed(s);
};
await resetAndSeed(catalogTable, catalogSeed, upsertTest);
await resetAndSeed(resultsTable, resultsSeed, insertTestResult);
await resetAndSeed(alertingTable, alertingSeed, upsertAlertSilence);
process.exit();

View file

@ -1 +1 @@
export { default } from "./core/Qualiteer.js";
export { default } from "./server/core/Qualiteer.js";

View file

@ -1,50 +0,0 @@
import { URL } from "url";
import loadConfigFile from "rollup/loadConfigFile";
import path from "path";
import { rollup } from "rollup";
import caxa from "caxa";
import { verify, normalize } from "./executor-configurator.js";
const { default: executorConfig } = await import(
path.resolve("executor.config.js")
);
const __dirname = new URL(".", import.meta.url).pathname;
const { default: caxaPackage } = caxa;
function testConfig() {
console.log("Testing config");
verify(normalize(executorConfig([])));
}
async function packageBin() {
console.log("Packaging bundle into binary");
return caxaPackage({
input: "dist/bundles/",
output: "bin/executor",
command: [
"{{caxa}}/node_modules/.bin/node",
"{{caxa}}/qualiteer-executor.mjs",
],
uncompressionMessage: "Unpacking, please wait...",
});
}
async function rollupBundle() {
console.log("Rolling up executor into bundle");
const { options, warnings } = await loadConfigFile(
path.resolve(__dirname, "rollup.config.js")
);
console.log(`Rollup has ${warnings.count} warnings`);
warnings.flush();
for (const optionsObj of options) {
const bundle = await rollup(optionsObj);
await Promise.all(optionsObj.output.map(bundle.write));
}
}
testConfig();
await rollupBundle();
await packageBin();
console.log("Done");

View file

@ -1,53 +0,0 @@
const baseCommand = "node";
const suiteEntry = "tests/assets/suite/runner.js";
const buildCommon = (jobRequest) => {
const { isTriage, ignore, region, testNames } = jobRequest;
const command = [baseCommand, suiteEntry];
// Apply Common Flags
if (isTriage) command.push(`isTriage=${isTriage}`);
if (ignore && ignore.length > 0) console.log("Would ignore", ignore);
if (region) command.push(`region=${region}`);
// Return new request
return { ...jobRequest, command };
};
const buildManual = (jobReq) => {
const { command, testNames } = jobReq;
if (testNames.length > 1)
throw Error("Currently only 1 test can be selected!");
command.push(`test=${testNames[0]}`);
return { ...jobReq, command };
};
const buildTags = (jobReq) => {
const { command, tags, testNames } = jobReq;
if (testNames && testNames.length > 0) {
return console.log("Would run tags as manual");
}
const arg = Buffer.from(JSON.stringify(tags), "utf8").toString("base64");
command.push(`tags=${arg}`);
return { ...jobReq, command };
};
const buildPipeline = (jobReq, socketId) => {
const { command, pipeline } = jobReq;
const { __test: test } = pipeline;
if (!test) throw Error("__test is required for pipeline jobs!");
pipeline.dashboardSocketId = socketId;
const arg = Buffer.from(JSON.stringify(pipeline), "utf8").toString("base64");
command.push(`pipeline=${arg}`);
command.push(`test=${test}`);
return { ...jobReq, command };
};
export default function jobBuilder(jobRequest, id) {
const jobReq = buildCommon(jobRequest, id);
const { pipeline, testNames, tags } = jobReq;
if (pipeline) return buildPipeline(jobReq, id);
else if (tags) return buildTags(jobReq);
else if (testNames) return buildManual(jobReq); //TODO currently does nothing
else throw Error("At least 1 'pipeline or tags or testNames' is required! ");
}

View file

@ -1,9 +0,0 @@
import Executor from "../sockets/clients/Executor.js";
const args = process.argv.slice(2);
const url = args[0];
const jobId = args[1];
const command = args.slice(2);
const job = { id: jobId, command };
const exec = new Executor(url, job, command);
exec.runJob();

View file

@ -1,12 +1,19 @@
import { v4 } from "uuid";
import applyJobInternally from "./k8s/k8s-internal.js";
import applyJob from "./k8s/k8s.js";
import buildJob from "./job-builder.js";
import { getTest } from "../database/queries/catalog.js";
import applyJobInternally from "../k8s/k8s-internal.js";
import applyJob from "../k8s/k8s.js";
const maxJobs = process.env.MAX_JOBS ? parseInt(process.env.MAX_JOBS) : 3;
const internalDeploy = process.env.INTERNAL_DEPLOY === "true";
const launchJob = internalDeploy ? applyJobInternally : applyJob;
async function getTests(job) {
if (job.pipeline) return [await getTest(job.pipeline.__test)];
if (!job.testNames) return [];
const tests = await Promise.all(job.testNames.map((name) => getTest(name)));
return tests;
}
class JobManager {
constructor() {
this.clientMaxJobs = maxJobs;
@ -35,16 +42,21 @@ class JobManager {
closeJob(jobId, exitcode) {
const job = this.getJobById(jobId);
if (!job) return;
job.exitcode = exitcode;
}
newJob(jobRequest, id) {
async newJob(jobRequest, id) {
if (!jobRequest) throw Error("Request Must Be Object!");
if (!this.clients[id]) this.clients[id] = { jobs: [] };
const job = buildJob(jobRequest, id);
const job = { ...jobRequest };
job.image = "registry.dunemask.net/garden/dev/reed:latest";
job.id = v4();
job.log = [];
this.clients[id].jobs.push(job);
job.dashboardSocketId = id;
job.tests = await getTests(job);
for (var t of job.tests) if (!t) throw Error("1 or more tests not found!");
launchJob(job);
return { ...job };
}

View file

@ -7,8 +7,8 @@ import { INFO, OK, logInfo } from "../util/logging.js";
// Import Core Modules
import buildRoutes from "../routes/router.js";
import pg from "../database/postgres.js";
import injectSockets from "../sockets/socket-server.js";
import JobManager from "../jobs/JobManager.js";
import injectSockets from "./socket-server.js";
import JobManager from "./JobManager.js";
import buildRabbiteer from "../rabbit/rabbit-workers.js";
// Constants

View file

@ -1,13 +1,13 @@
import evt from "./events.js";
import evt from "../../common/sockets/events.js";
export const initiator = (socket, jobs) => {
export const initiator = async (socket, jobs) => {
const jobStr = socket.handshake.query.job;
const jobReq = JSON.parse(jobStr);
if (!jobReq || !(jobReq instanceof Object))
throw Error("No 'job' was included in the handshake query");
const job = jobs.newJob(jobReq, socket.id);
const job = await jobs.newJob(jobReq, socket.id);
socket.join(job.id);
socket.emit(evt.JOB_CRT, job.id);
};

View file

@ -1,6 +1,6 @@
import { Server as Skio } from "socket.io";
import evt from "./events.js";
import modes from "./modes.js";
import evt from "../../common/sockets/events.js";
import modes from "../../common/sockets/modes.js";
import { initiator, executor, viewer } from "./client-listeners.js";
@ -13,22 +13,28 @@ const socketDrop = (io, room, id) => {
s.disconnect();
};
const socketConnect = (io, socket, jobs) => {
const socketConnect = async (io, socket, jobs) => {
const { mode } = socket.handshake.query;
switch (mode) {
case modes.INIT:
initiator(socket, jobs);
break;
case modes.EXEC:
executor(io, socket, jobs);
break;
case modes.VIEW:
viewer(socket);
break;
default:
socket.send(evt.ERR, "Invalid Mode!");
socket.disconnect();
break;
try {
switch (mode) {
case modes.INIT:
await initiator(socket, jobs);
break;
case modes.EXEC:
executor(io, socket, jobs);
break;
case modes.VIEW:
viewer(socket);
break;
default:
socket.send(evt.ERR, "Invalid Mode!");
socket.disconnect();
break;
}
} catch (err) {
console.log(err);
socket.send(evt.ERR, err);
socket.disconnect();
}
};

View file

@ -1,4 +1,6 @@
// Imports
import path from "node:path";
import { URL } from "node:url";
import { migrate } from "postgres-migrations";
import createPgp from "pg-promise";
import moment from "moment";
@ -29,7 +31,8 @@ const dbConfig = {
ensureDatabaseExists: true,
};
const migrationsDir = "lib/database/migrations";
const databaseDir = new URL(".", import.meta.url).pathname;
const migrationsDir = path.resolve(databaseDir, "migrations/");
const queryMock = (str) => INFO("POSTGRES MOCK", str);

View file

@ -5,6 +5,7 @@ import {
selectWhereAnyQuery,
onConflictUpdate,
} from "../pg-query.js";
import { WARN } from "../../util/logging.js";
import getFilteredTags from "../tags.js";
import getDelay from "../delays.js";
@ -14,6 +15,21 @@ const PG_DISABLED = process.env.POSTGRES_DISABLED;
import { testsMock, mappingsMock } from "../mocks/catalog-mock.js";
// Queries
export const removeDroppedTests = async (testNames) => {
// BUG: After dropping a test, the id jumps ridiculously high
const pgNames = testNames.map((tn) => `'${tn}'`).join(",");
const query = `DELETE FROM catalog as x where x.name not in (${pgNames});`;
return pg.query(query);
};
export const getTest = async (name) => {
const query = selectWhereAnyQuery(table, { name });
const results = await pg.query(query);
if (results.length > 1)
WARN("CATALOG", `More than 1 test found for '${name}'`);
return results[0];
};
export const getTests = async () => {
if (PG_DISABLED) return testsMock();
const query = `SELECT * from ${table}`;
@ -54,6 +70,12 @@ export const getProjects = async () => {
}
};
export const truncateTests = async () => {
if (PG_DISABLED) return console.log(`Would truncate table ${table}`);
const query = `TRUNCATE ${table} RESTART IDENTITY CASCADE;`;
return await pg.query(query);
};
export const upsertTest = async (test) => {
if (PG_DISABLED) return console.log("Would insert test", test);
const {

View file

@ -1,4 +1,5 @@
import fs from "node:fs";
import { URL } from "node:url";
import path from "node:path";
const {
QUALITEER_EXECUTOR_URL,
@ -14,34 +15,32 @@ const executorBinFetchUrl = QUALITEER_EXECUTOR_BIN_URL;
const jobsDir = "jobs/";
const jobsPath = path.resolve(jobsDir);
const defaultsFile = path.resolve("./lib/jobs/k8s/k8s-job.json");
const defaults = JSON.parse(fs.readFileSync(defaultsFile));
const k8sFolder = new URL(".", import.meta.url).pathname;
const defaultsFilePath = path.resolve(k8sFolder, "k8s-job.json");
const defaults = JSON.parse(fs.readFileSync(defaultsFilePath));
function wrapCommand(jobId, command) {
const bin = executorAsScript
? `node ${executorBin}`
: `chmod +x ${executorBin} && ./${executorBin}`;
const executorPayload = JSON.stringify({ jobId, command, url: executorUrl });
function commandBuilder(jobId, jobRequest) {
const executorPayload = JSON.stringify({
jobId,
jobRequest,
url: executorUrl,
});
const payload = Buffer.from(executorPayload, "utf8").toString("base64");
const curlCmd = `if ! [ -f qltr-executor ]; then curl -o qltr-executor ${executorBinFetchUrl}; fi || true && ${bin} ${payload}`;
return curlCmd;
return [`./${executorBin}`, payload];
}
export function jobBuilder(jobRequest) {
const { resources, name, image, command, id: jobId } = jobRequest;
const { resources, name, image, id: jobId } = jobRequest;
// Safety Checks
if (!jobId) throw Error("'jobId' required!");
if (!name) throw Error("'name' required!");
if (!command) throw Error("'command' required!");
if (!image) throw Error("'image' required!");
if (!Array.isArray(command)) throw Error("'command' must be an array!");
// Apply configuration
const job = { ...defaults };
job.metadata.name = `qltr-${name}-${jobId}`;
job.metadata.name = `qltr-${jobId}`;
const container = job.spec.template.spec.containers[0];
container.name = job.metadata.name;
container.command = wrapCommand(jobId, command);
container.command = commandBuilder(jobId, jobRequest);
container.image = JSON.stringify(image);
// Apply resources
job.resources = { ...job.resources, ...resources };

View file

@ -13,9 +13,19 @@
"name": "qltr-job-test-suite-1",
"image": "node:latest",
"imagePullPolicy": "Always",
"command": ["node", "--version"]
"command": ["node", "--version"],
"envFrom": [
{
"configMapRef": {
"name": "qualiteer-job-environment"
}
}
]
}
],
"imagePullSecrets": [
{ "name": "usw-registry-secret", "namespace": "default" }
],
"restartPolicy": "Never"
}
},

View file

@ -1,13 +1,12 @@
import k8s from "@kubernetes/client-node";
import { INFO, ERR } from "../../util/logging.js";
import { INFO, ERR } from "../util/logging.js";
import { jobBuilder, createFile, deleteFile } from "./k8s-common.js";
export default async function createJob(jobRequest) {
//console.log(await jobRequest.tests);
const job = jobBuilder(jobRequest);
job.spec.template.spec.containers[0].image = "node:latest";
job.spec.template.spec.containers[0].command = ["node", "--version"];
// job.spec.template.spec.containers[0].image = "reed";
// job.spec.template.spec.containers[0].command = "python3 -m pytest -v --tb=no -p no:warnings".split(" ");
job.spec.template.spec.containers[0].image =
"registry.dunemask.net/garden/dev/reed:latest";
const kc = new k8s.KubeConfig();
kc.loadFromCluster();
const batchV1Api = kc.makeApiClient(k8s.BatchV1Api);
@ -17,8 +16,4 @@ export default async function createJob(jobRequest) {
.createNamespacedJob("dunestorm-dunemask", job)
.then((res) => INFO("K8S", `Job ${jobName} created!`))
.catch((err) => ERR("K8S", err));
/*const filePath = createFile(job);
applyFile(filePath);
deleteFile(filePath);*/
}

View file

@ -2,7 +2,7 @@
import { Worker } from "rabbiteer";
import { VERB } from "../../util/logging.js";
import { insertTestResult } from "../../database/queries/results.js";
import evt from "../../sockets/events.js";
import evt from "../../../common/sockets/events.js";
// Class
export default class TestResultsWorker extends Worker {
constructor(skio) {

View file

@ -3,6 +3,8 @@ import {
getTests,
getPipelineMappings,
upsertTest,
truncateTests,
removeDroppedTests,
} from "../database/queries/catalog.js";
const router = Router();
@ -23,9 +25,20 @@ router.get("/pipeline-mappings", (req, res) => {
// Post Routes
router.post("/update", (req, res) => {
if (!req.body) return res.status(400).send("Body required!");
if(!Array.isArray(req.body)) return res.status(400).send("Body must be an array!");
const upserts = Promise.all(req.body.map((catalogItem)=>upsertTest(catalogItem)));
upserts.then(()=>res.sendStatus(200)).catch((e)=>res.status(500).send(e));
if (!Array.isArray(req.body))
return res.status(400).send("Body must be an array!");
const wrongImage = req.body.find(({ image }) => image !== req.body[0].image);
if (wrongImage)
return res.status(400).send("Tests cannot have unique images!");
const testNames = req.body.map(({ name }) => name);
// Upsert new tests
const upserts = Promise.all(
req.body.map((catalogItem) => upsertTest(catalogItem))
);
const dropRm = upserts.then(() => removeDroppedTests(testNames));
dropRm.then(() => res.sendStatus(200)).catch((e) => res.status(500).send(e));
});
export default router;