Minor Adjustments
This commit is contained in:
parent
1084f5d937
commit
f486d50efa
60 changed files with 1965 additions and 127 deletions
|
@ -34,6 +34,7 @@ export default class Qualiteer {
|
|||
this.routes = buildRoutes(this.pg, this.sockets);
|
||||
this.rabbiteer = buildRabbiteer(this.pg, this.sockets);
|
||||
this.app.use(this.routes);
|
||||
OK("INIT", "Initialized!");
|
||||
}
|
||||
|
||||
async _connect() {
|
||||
|
|
|
@ -3,8 +3,8 @@ import evt from "../../common/sockets/events.js";
|
|||
export const initiator = async (socket, jobs) => {
|
||||
const jobStr = socket.handshake.query.job;
|
||||
const jobReq = JSON.parse(jobStr);
|
||||
|
||||
if (!jobReq || !(jobReq instanceof Object))
|
||||
|
||||
if (!jobReq || !(jobReq instanceof Object))
|
||||
throw Error("No 'job' was included in the handshake query");
|
||||
|
||||
const job = await jobs.newJob(jobReq, socket.id);
|
||||
|
|
|
@ -49,7 +49,6 @@ const applySockets = (server, jobs, options) => {
|
|||
io.on("connection", (socket) => socketConnect(io, socket, jobs));
|
||||
io.of("/").adapter.on("leave-room", (room, id) => socketDrop(io, room, id));
|
||||
return io;
|
||||
cle;
|
||||
};
|
||||
|
||||
export default applySockets;
|
||||
|
|
|
@ -5,6 +5,7 @@ import { migrate } from "postgres-migrations";
|
|||
import createPgp from "pg-promise";
|
||||
import moment from "moment";
|
||||
import { INFO, WARN, OK, VERB } from "../util/logging.js";
|
||||
|
||||
// Environment Variables
|
||||
const {
|
||||
QUALITEER_POSTGRES_DATABASE: database,
|
||||
|
@ -46,7 +47,7 @@ const connect = (pg) => async () => {
|
|||
// Override fake methods
|
||||
const pgInstance = pgp(dbConfig);
|
||||
for (var k in pgInstance) pg[k] = pgInstance[k];
|
||||
VERB("POSTGRES", "Migrated Successfully");
|
||||
VERB("POSTGRES", "Migrated Successfully!");
|
||||
await pg.connect();
|
||||
VERB("POSTGRES", "Postgres connected Successfully!");
|
||||
|
||||
|
|
48
lib/server/rabbit/workers/KubeJobsWorker.js
Normal file
48
lib/server/rabbit/workers/KubeJobsWorker.js
Normal file
|
@ -0,0 +1,48 @@
|
|||
// Imports
|
||||
import { Worker } from "rabbiteer";
|
||||
|
||||
// Constants
|
||||
const jobQueueName = "KubeJobs";
|
||||
const jobQueueRoutingName = "KubeJobsRouting";
|
||||
const exchangeName = "KubeJobsExchange";
|
||||
const setQueues = {
|
||||
SET1Sec: 1000,
|
||||
SET5Sec: 5000,
|
||||
SET10Sec: 10_000,
|
||||
SET30Sec: 30_000,
|
||||
SET1Min: 60_000,
|
||||
SET5Min: 60_000 * 5,
|
||||
SET10Min: 60_000 * 10,
|
||||
SET15Min: 60_000 * 15,
|
||||
SET30Min: 60_000 * 30,
|
||||
SET1Hr: 360_000,
|
||||
SET2Hr: 360_000 * 2,
|
||||
SET3Hr: 360_000 * 3,
|
||||
SET4Hr: 360_000 * 4,
|
||||
};
|
||||
|
||||
// Class
|
||||
export default class KubeJobsWorker extends Worker {
|
||||
constructor(skio) {
|
||||
super(jobQueueName);
|
||||
this.skio = skio;
|
||||
}
|
||||
|
||||
async configure(ch) {
|
||||
await ch.assertExchange(exchangeName, "direct");
|
||||
await ch.assertQueue(this.queue, this.queueOptions);
|
||||
await ch.bindQueue(this.queue, exchangeName, jobQueueRoutingName);
|
||||
await this.configureSetQueues(ch);
|
||||
await ch.consume(this.queue, (msg) => this.consume(msg, () => ch.ack(msg)));
|
||||
}
|
||||
|
||||
// Configure set queues that will all filter into this queue
|
||||
async configureSetQueues(ch) {
|
||||
await Promise.all(Object.keys(setQueues).map((k)=>
|
||||
ch.assertQueue(k, { messageTtl: setQueues[k], deadLetterExchange: exchangeName, deadLetterRoutingKey: jobQueueRoutingName })))
|
||||
}
|
||||
|
||||
onMessage(string) {
|
||||
console.log(string);
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
import KubeJobsWorker from "./KubeJobsWorker.js";
|
||||
import TestResultsWorker from "./TestResultsWorker.js";
|
||||
|
||||
const buildWorkers = (skio) => [new TestResultsWorker(skio)];
|
||||
const buildWorkers = (skio) => [new TestResultsWorker(skio), new KubeJobsWorker(skio)];
|
||||
export default buildWorkers;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue