// Imports
import { Worker } from "rabbiteer";

// Constants
const jobQueueName = "KubeJobs";
const jobQueueRoutingName = "KubeJobsRouting";
const exchangeName = "KubeJobsExchange";

// Time units in milliseconds; every TTL below is derived from these so the
// value always matches the queue name.
const SECOND_MS = 1000;
const MINUTE_MS = 60 * SECOND_MS;
const HOUR_MS = 60 * MINUTE_MS;

// Queue name -> message TTL in milliseconds.
// NOTE: the hour entries used to be based on 360_000 ms (6 minutes) instead
// of 3_600_000 ms. RabbitMQ refuses to redeclare an existing queue with
// different arguments, so any SET*Hr queues already declared with the old
// TTLs must be deleted before this version is deployed.
const setQueues = {
  SET1Sec: SECOND_MS,
  SET5Sec: 5 * SECOND_MS,
  SET10Sec: 10 * SECOND_MS,
  SET30Sec: 30 * SECOND_MS,
  SET1Min: MINUTE_MS,
  SET5Min: 5 * MINUTE_MS,
  SET10Min: 10 * MINUTE_MS,
  SET15Min: 15 * MINUTE_MS,
  SET30Min: 30 * MINUTE_MS,
  SET1Hr: HOUR_MS,
  SET2Hr: 2 * HOUR_MS,
  SET3Hr: 3 * HOUR_MS,
  SET4Hr: 4 * HOUR_MS,
};

// Class
/**
 * Worker for the "KubeJobs" queue. Besides its own queue it declares a set
 * of TTL queues whose expired messages are dead-lettered back into the job
 * queue through the shared exchange and routing key.
 */
export default class KubeJobsWorker extends Worker {
  /**
   * @param {*} skio - Stored on the instance as `this.skio`; not otherwise
   *   used in this file.
   */
  constructor(skio) {
    super(jobQueueName);
    this.skio = skio;
  }

  /**
   * Declares the exchange, the job queue and its binding, declares the TTL
   * queues, then starts consuming from the job queue.
   *
   * `this.queue`, `this.queueOptions` and `this.consume` are not defined in
   * this file; presumably they are provided by the `Worker` base class.
   *
   * @param {object} ch - Channel exposing `assertExchange`, `assertQueue`,
   *   `bindQueue`, `consume` and `ack`.
   * @returns {Promise<void>}
   */
  async configure(ch) {
    await ch.assertExchange(exchangeName, "direct");
    await ch.assertQueue(this.queue, this.queueOptions);
    await ch.bindQueue(this.queue, exchangeName, jobQueueRoutingName);
    await this.configureSetQueues(ch);
    // Hand each message to this.consume along with a callback that acks it.
    await ch.consume(this.queue, (msg) => this.consume(msg, () => ch.ack(msg)));
  }

  /**
   * Configure set queues that will all filter into this queue: each one is
   * declared with its TTL and dead-letters to the job queue's exchange and
   * routing key.
   *
   * @param {object} ch - Channel exposing `assertQueue`.
   * @returns {Promise<void>}
   */
  async configureSetQueues(ch) {
    await Promise.all(
      Object.entries(setQueues).map(([queueName, messageTtl]) =>
        ch.assertQueue(queueName, {
          messageTtl,
          deadLetterExchange: exchangeName,
          deadLetterRoutingKey: jobQueueRoutingName,
        }),
      ),
    );
  }

  /**
   * Logs the received value to the console.
   *
   * @param {*} string - Value to log.
   */
  onMessage(string) {
    console.log(string);
  }
}