diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index b4d6256c96..37dbee052d 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -15,7 +15,7 @@ const noopOnCompleteCallbacks = { reject: noop, }; -type JobQueueOptions = { +type JobQueueOptions = { /** * The backing store for jobs. Typically a wrapper around the database. */ @@ -32,38 +32,13 @@ type JobQueueOptions = { * the job to fail; a value of 2 will allow the job to fail once; etc. */ maxAttempts: number; - - /** - * `parseData` will be called with the raw data from `store`. For example, if the job - * takes a single number, `parseData` should throw if `data` is a number and should - * return the number otherwise. - * - * If it throws, the job will be deleted from the store and the job will not be run. - * - * Will only be called once per job, even if `maxAttempts > 1`. - */ - parseData: (data: unknown) => T; - - /** - * Run the job, given data. - * - * If it resolves, the job will be deleted from the store. - * - * If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it - * will be deleted from the store. - */ - run: (job: Readonly>) => Promise; }; -export class JobQueue { +export abstract class JobQueue { private readonly maxAttempts: number; - private readonly parseData: (data: unknown) => T; - private readonly queueType: string; - private readonly run: (job: Readonly>) => Promise; - private readonly store: JobQueueStore; private readonly logPrefix: string; @@ -78,7 +53,7 @@ export class JobQueue { private started = false; - constructor(options: Readonly>) { + constructor(options: Readonly) { assert( Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1, 'maxAttempts should be a positive integer' @@ -93,14 +68,33 @@ export class JobQueue { ); this.maxAttempts = options.maxAttempts; - this.parseData = options.parseData; this.queueType = options.queueType; - this.run = options.run; this.store = options.store; this.logPrefix = `${this.queueType} job queue:`; } + /** + * `parseData` will be called with the raw data from `store`. For example, if the job + * takes a single number, `parseData` should throw if `data` is a number and should + * return the number otherwise. + * + * If it throws, the job will be deleted from the store and the job will not be run. + * + * Will only be called once per job, even if `maxAttempts > 1`. + */ + protected abstract parseData(data: unknown): T; + + /** + * Run the job, given data. + * + * If it resolves, the job will be deleted from the store. + * + * If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it + * will be deleted from the store. + */ + protected abstract run(job: Readonly>): Promise; + /** * Start streaming jobs from the store. */ diff --git a/ts/jobs/removeStorageKeyJobQueue.ts b/ts/jobs/removeStorageKeyJobQueue.ts index a21a905c8d..71d88d92bb 100644 --- a/ts/jobs/removeStorageKeyJobQueue.ts +++ b/ts/jobs/removeStorageKeyJobQueue.ts @@ -1,5 +1,6 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable class-methods-use-this */ import { z } from 'zod'; @@ -12,18 +13,12 @@ const removeStorageKeyJobDataSchema = z.object({ type RemoveStorageKeyJobData = z.infer; -export const removeStorageKeyJobQueue = new JobQueue({ - store: jobQueueDatabaseStore, - - queueType: 'remove storage key', - - maxAttempts: 100, - - parseData(data: unknown): RemoveStorageKeyJobData { +export class RemoveStorageKeyJobQueue extends JobQueue { + protected parseData(data: unknown): RemoveStorageKeyJobData { return removeStorageKeyJobDataSchema.parse(data); - }, + } - async run({ + protected async run({ data, }: Readonly<{ data: RemoveStorageKeyJobData }>): Promise { await new Promise(resolve => { @@ -31,5 +26,13 @@ export const removeStorageKeyJobQueue = new JobQueue({ }); await window.storage.remove(data.key); - }, + } +} + +export const removeStorageKeyJobQueue = new RemoveStorageKeyJobQueue({ + store: jobQueueDatabaseStore, + + queueType: 'remove storage key', + + maxAttempts: 100, }); diff --git a/ts/jobs/reportSpamJobQueue.ts b/ts/jobs/reportSpamJobQueue.ts index b54f691307..351416ded2 100644 --- a/ts/jobs/reportSpamJobQueue.ts +++ b/ts/jobs/reportSpamJobQueue.ts @@ -1,5 +1,6 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable class-methods-use-this */ import * as z from 'zod'; import * as moment from 'moment'; @@ -45,18 +46,14 @@ const reportSpamJobDataSchema = z.object({ export type ReportSpamJobData = z.infer; -export const reportSpamJobQueue = new JobQueue({ - store: jobQueueDatabaseStore, - - queueType: 'report spam', - - maxAttempts: 25, - - parseData(data: unknown): ReportSpamJobData { +export class ReportSpamJobQueue extends JobQueue { + protected parseData(data: unknown): ReportSpamJobData { return reportSpamJobDataSchema.parse(data); - }, + } - async run({ data }: Readonly<{ data: ReportSpamJobData }>): Promise { + protected async run({ + data, + }: Readonly<{ data: ReportSpamJobData }>): Promise { const { e164, serverGuids } = data; await new Promise(resolve => { @@ -115,5 +112,13 @@ export const reportSpamJobQueue = new JobQueue({ throw err; } - }, + } +} + +export const reportSpamJobQueue = new ReportSpamJobQueue({ + store: jobQueueDatabaseStore, + + queueType: 'report spam', + + maxAttempts: 25, }); diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index a296240ad9..04aa76b69f 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -1,11 +1,12 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable max-classes-per-file, class-methods-use-this */ import { assert } from 'chai'; import * as sinon from 'sinon'; import EventEmitter, { once } from 'events'; import { z } from 'zod'; -import { identity, noop, groupBy } from 'lodash'; +import { noop, groupBy } from 'lodash'; import { v4 as uuid } from 'uuid'; import { JobError } from '../../jobs/JobError'; import { TestJobQueueStore } from './TestJobQueueStore'; @@ -28,16 +29,20 @@ describe('JobQueue', () => { const results = new Set(); const store = new TestJobQueueStore(); - const addQueue = new JobQueue({ + class Queue extends JobQueue { + parseData(data: unknown): TestJobData { + return testJobSchema.parse(data); + } + + async run({ data }: ParsedJob): Promise { + results.add(data.a + data.b); + } + } + + const addQueue = new Queue({ store, queueType: 'test add queue', maxAttempts: 1, - parseData(data: unknown): TestJobData { - return testJobSchema.parse(data); - }, - async run({ data }: ParsedJob): Promise { - results.add(data.a + data.b); - }, }); assert.deepEqual(results, new Set()); @@ -64,25 +69,25 @@ describe('JobQueue', () => { it('writes jobs to the database correctly', async () => { const store = new TestJobQueueStore(); - const queue1 = new JobQueue({ + class TestQueue extends JobQueue { + parseData(data: unknown): string { + return z.string().parse(data); + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const queue1 = new TestQueue({ store, queueType: 'test 1', maxAttempts: 1, - parseData: (data: unknown): string => { - return z.string().parse(data); - }, - run: sinon.stub().resolves(), }); - const queue2 = new JobQueue({ + const queue2 = new TestQueue({ store, queueType: 'test 2', maxAttempts: 1, - parseData: (data: unknown): string => { - return z.string().parse(data); - }, - run(): Promise { - return Promise.resolve(); - }, }); store.pauseStream('test 1'); @@ -138,16 +143,14 @@ describe('JobQueue', () => { const store = new TestJobQueueStore(); - const retryQueue = new JobQueue({ - store, - queueType: 'test retry queue', - maxAttempts: 5, + class RetryQueue extends JobQueue { parseData(data: unknown): TestJobData { if (data !== 'foo' && data !== 'bar') { throw new Error('Invalid data'); } return data; - }, + } + async run({ data }: ParsedJob): Promise { switch (data) { case 'foo': @@ -166,7 +169,13 @@ describe('JobQueue', () => { default: throw missingCaseError(data); } - }, + } + } + + const retryQueue = new RetryQueue({ + store, + queueType: 'test retry queue', + maxAttempts: 5, }); retryQueue.streamJobs(); @@ -196,17 +205,23 @@ describe('JobQueue', () => { }); it('makes job.completion reject if parseData throws', async () => { - const queue = new JobQueue({ - store: new TestJobQueueStore(), - queueType: 'test queue', - maxAttempts: 999, - parseData: (data: unknown): string => { + class TestQueue extends JobQueue { + parseData(data: unknown): string { if (data === 'valid') { return data; } throw new Error('uh oh'); - }, - run: sinon.stub().resolves(), + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const queue = new TestQueue({ + store: new TestJobQueueStore(), + queueType: 'test queue', + maxAttempts: 999, }); queue.streamJobs(); @@ -234,17 +249,23 @@ describe('JobQueue', () => { it("doesn't run the job if parseData throws", async () => { const run = sinon.stub().resolves(); - const queue = new JobQueue({ - store: new TestJobQueueStore(), - queueType: 'test queue', - maxAttempts: 999, - parseData: (data: unknown): string => { + class TestQueue extends JobQueue { + parseData(data: unknown): string { if (data === 'valid') { return data; } throw new Error('invalid data!'); - }, - run, + } + + run(job: { data: string }): Promise { + return run(job); + } + } + + const queue = new TestQueue({ + store: new TestJobQueueStore(), + queueType: 'test queue', + maxAttempts: 999, }); queue.streamJobs(); @@ -262,17 +283,23 @@ describe('JobQueue', () => { it('keeps jobs in the storage if parseData throws', async () => { const store = new TestJobQueueStore(); - const queue = new JobQueue({ - store, - queueType: 'test queue', - maxAttempts: 999, - parseData: (data: unknown): string => { + class TestQueue extends JobQueue { + parseData(data: unknown): string { if (data === 'valid') { return data; } - throw new Error('uh oh'); - }, - run: sinon.stub().resolves(), + throw new Error('invalid data!'); + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const queue = new TestQueue({ + store, + queueType: 'test queue', + maxAttempts: 999, }); queue.streamJobs(); @@ -292,12 +319,20 @@ describe('JobQueue', () => { inserted = true; }); - const queue = new JobQueue({ + class TestQueue extends JobQueue { + parseData(_: unknown): undefined { + return undefined; + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const queue = new TestQueue({ store, queueType: 'test queue', maxAttempts: 999, - parseData: identity, - run: sinon.stub().resolves(), }); queue.streamJobs(); @@ -317,17 +352,21 @@ describe('JobQueue', () => { events.push('insert'); }); - const queue = new JobQueue({ + class TestQueue extends JobQueue { + parseData(data: unknown): unknown { + events.push('parsing data'); + return data; + } + + async run(): Promise { + events.push('running'); + } + } + + const queue = new TestQueue({ store, queueType: 'test queue', maxAttempts: 999, - parseData: (data: unknown): unknown => { - events.push('parsing data'); - return data; - }, - async run() { - events.push('running'); - }, }); queue.streamJobs(); @@ -345,12 +384,20 @@ describe('JobQueue', () => { events.push('delete'); }); - const queue = new JobQueue({ + class TestQueue extends JobQueue { + parseData(_: unknown): undefined { + return undefined; + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const queue = new TestQueue({ store, queueType: 'test queue', maxAttempts: 999, - parseData: identity, - run: sinon.stub().resolves(), }); queue.streamJobs(); @@ -378,15 +425,21 @@ describe('JobQueue', () => { events.push('delete'); }); - const queue = new JobQueue({ + class TestQueue extends JobQueue { + parseData(_: unknown): undefined { + return undefined; + } + + async run(): Promise { + events.push('running'); + throw new Error('uh oh'); + } + } + + const queue = new TestQueue({ store, queueType: 'test queue', maxAttempts: 5, - parseData: identity, - async run() { - events.push('running'); - throw new Error('uh oh'); - }, }); queue.streamJobs(); @@ -453,16 +506,20 @@ describe('JobQueue', () => { it('starts streaming jobs from the store', async () => { const eventEmitter = new EventEmitter(); - const noopQueue = new JobQueue({ + class TestQueue extends JobQueue { + parseData(data: unknown): number { + return z.number().parse(data); + } + + async run({ data }: Readonly<{ data: number }>): Promise { + eventEmitter.emit('run', data); + } + } + + const noopQueue = new TestQueue({ store: fakeStore, queueType: 'test noop queue', maxAttempts: 99, - parseData(data: unknown): number { - return z.number().parse(data); - }, - async run({ data }: Readonly<{ data: number }>) { - eventEmitter.emit('run', data); - }, }); sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub); @@ -492,12 +549,20 @@ describe('JobQueue', () => { }); it('rejects when called more than once', async () => { - const noopQueue = new JobQueue({ + class TestQueue extends JobQueue { + parseData(data: unknown): unknown { + return data; + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const noopQueue = new TestQueue({ store: fakeStore, queueType: 'test noop queue', maxAttempts: 99, - parseData: identity, - run: sinon.stub().resolves(), }); noopQueue.streamJobs(); @@ -517,12 +582,20 @@ describe('JobQueue', () => { stream: sinon.stub(), }; - const noopQueue = new JobQueue({ + class TestQueue extends JobQueue { + parseData(_: unknown): undefined { + return undefined; + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const noopQueue = new TestQueue({ store: fakeStore, queueType: 'test noop queue', maxAttempts: 99, - parseData: identity, - run: sinon.stub().resolves(), }); await assertRejects(() => noopQueue.add(undefined));