Make JobQueue an abstract class

This commit is contained in:
Fedor Indutny 2021-07-21 14:10:08 -07:00 committed by GitHub
parent d9e90e9ea8
commit 943bb38af1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 208 additions and 133 deletions

View file

@ -15,7 +15,7 @@ const noopOnCompleteCallbacks = {
reject: noop, reject: noop,
}; };
type JobQueueOptions<T> = { type JobQueueOptions = {
/** /**
* The backing store for jobs. Typically a wrapper around the database. * The backing store for jobs. Typically a wrapper around the database.
*/ */
@ -32,38 +32,13 @@ type JobQueueOptions<T> = {
* the job to fail; a value of 2 will allow the job to fail once; etc. * the job to fail; a value of 2 will allow the job to fail once; etc.
*/ */
maxAttempts: number; maxAttempts: number;
/**
* `parseData` will be called with the raw data from `store`. For example, if the job
* takes a single number, `parseData` should throw if `data` is a number and should
* return the number otherwise.
*
* If it throws, the job will be deleted from the store and the job will not be run.
*
* Will only be called once per job, even if `maxAttempts > 1`.
*/
parseData: (data: unknown) => T;
/**
* Run the job, given data.
*
* If it resolves, the job will be deleted from the store.
*
* If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it
* will be deleted from the store.
*/
run: (job: Readonly<ParsedJob<T>>) => Promise<void>;
}; };
export class JobQueue<T> { export abstract class JobQueue<T> {
private readonly maxAttempts: number; private readonly maxAttempts: number;
private readonly parseData: (data: unknown) => T;
private readonly queueType: string; private readonly queueType: string;
private readonly run: (job: Readonly<ParsedJob<T>>) => Promise<unknown>;
private readonly store: JobQueueStore; private readonly store: JobQueueStore;
private readonly logPrefix: string; private readonly logPrefix: string;
@ -78,7 +53,7 @@ export class JobQueue<T> {
private started = false; private started = false;
constructor(options: Readonly<JobQueueOptions<T>>) { constructor(options: Readonly<JobQueueOptions>) {
assert( assert(
Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1, Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1,
'maxAttempts should be a positive integer' 'maxAttempts should be a positive integer'
@ -93,14 +68,33 @@ export class JobQueue<T> {
); );
this.maxAttempts = options.maxAttempts; this.maxAttempts = options.maxAttempts;
this.parseData = options.parseData;
this.queueType = options.queueType; this.queueType = options.queueType;
this.run = options.run;
this.store = options.store; this.store = options.store;
this.logPrefix = `${this.queueType} job queue:`; this.logPrefix = `${this.queueType} job queue:`;
} }
/**
* `parseData` will be called with the raw data from `store`. For example, if the job
* takes a single number, `parseData` should throw if `data` is a number and should
* return the number otherwise.
*
* If it throws, the job will be deleted from the store and the job will not be run.
*
* Will only be called once per job, even if `maxAttempts > 1`.
*/
protected abstract parseData(data: unknown): T;
/**
* Run the job, given data.
*
* If it resolves, the job will be deleted from the store.
*
* If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it
* will be deleted from the store.
*/
protected abstract run(job: Readonly<ParsedJob<T>>): Promise<void>;
/** /**
* Start streaming jobs from the store. * Start streaming jobs from the store.
*/ */

View file

@ -1,5 +1,6 @@
// Copyright 2021 Signal Messenger, LLC // Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable class-methods-use-this */
import { z } from 'zod'; import { z } from 'zod';
@ -12,18 +13,12 @@ const removeStorageKeyJobDataSchema = z.object({
type RemoveStorageKeyJobData = z.infer<typeof removeStorageKeyJobDataSchema>; type RemoveStorageKeyJobData = z.infer<typeof removeStorageKeyJobDataSchema>;
export const removeStorageKeyJobQueue = new JobQueue<RemoveStorageKeyJobData>({ export class RemoveStorageKeyJobQueue extends JobQueue<RemoveStorageKeyJobData> {
store: jobQueueDatabaseStore, protected parseData(data: unknown): RemoveStorageKeyJobData {
queueType: 'remove storage key',
maxAttempts: 100,
parseData(data: unknown): RemoveStorageKeyJobData {
return removeStorageKeyJobDataSchema.parse(data); return removeStorageKeyJobDataSchema.parse(data);
}, }
async run({ protected async run({
data, data,
}: Readonly<{ data: RemoveStorageKeyJobData }>): Promise<void> { }: Readonly<{ data: RemoveStorageKeyJobData }>): Promise<void> {
await new Promise<void>(resolve => { await new Promise<void>(resolve => {
@ -31,5 +26,13 @@ export const removeStorageKeyJobQueue = new JobQueue<RemoveStorageKeyJobData>({
}); });
await window.storage.remove(data.key); await window.storage.remove(data.key);
}, }
}
export const removeStorageKeyJobQueue = new RemoveStorageKeyJobQueue({
store: jobQueueDatabaseStore,
queueType: 'remove storage key',
maxAttempts: 100,
}); });

View file

@ -1,5 +1,6 @@
// Copyright 2021 Signal Messenger, LLC // Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable class-methods-use-this */
import * as z from 'zod'; import * as z from 'zod';
import * as moment from 'moment'; import * as moment from 'moment';
@ -45,18 +46,14 @@ const reportSpamJobDataSchema = z.object({
export type ReportSpamJobData = z.infer<typeof reportSpamJobDataSchema>; export type ReportSpamJobData = z.infer<typeof reportSpamJobDataSchema>;
export const reportSpamJobQueue = new JobQueue<ReportSpamJobData>({ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
store: jobQueueDatabaseStore, protected parseData(data: unknown): ReportSpamJobData {
queueType: 'report spam',
maxAttempts: 25,
parseData(data: unknown): ReportSpamJobData {
return reportSpamJobDataSchema.parse(data); return reportSpamJobDataSchema.parse(data);
}, }
async run({ data }: Readonly<{ data: ReportSpamJobData }>): Promise<void> { protected async run({
data,
}: Readonly<{ data: ReportSpamJobData }>): Promise<void> {
const { e164, serverGuids } = data; const { e164, serverGuids } = data;
await new Promise<void>(resolve => { await new Promise<void>(resolve => {
@ -115,5 +112,13 @@ export const reportSpamJobQueue = new JobQueue<ReportSpamJobData>({
throw err; throw err;
} }
}, }
}
export const reportSpamJobQueue = new ReportSpamJobQueue({
store: jobQueueDatabaseStore,
queueType: 'report spam',
maxAttempts: 25,
}); });

View file

@ -1,11 +1,12 @@
// Copyright 2021 Signal Messenger, LLC // Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable max-classes-per-file, class-methods-use-this */
import { assert } from 'chai'; import { assert } from 'chai';
import * as sinon from 'sinon'; import * as sinon from 'sinon';
import EventEmitter, { once } from 'events'; import EventEmitter, { once } from 'events';
import { z } from 'zod'; import { z } from 'zod';
import { identity, noop, groupBy } from 'lodash'; import { noop, groupBy } from 'lodash';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
import { JobError } from '../../jobs/JobError'; import { JobError } from '../../jobs/JobError';
import { TestJobQueueStore } from './TestJobQueueStore'; import { TestJobQueueStore } from './TestJobQueueStore';
@ -28,16 +29,20 @@ describe('JobQueue', () => {
const results = new Set<unknown>(); const results = new Set<unknown>();
const store = new TestJobQueueStore(); const store = new TestJobQueueStore();
const addQueue = new JobQueue({ class Queue extends JobQueue<TestJobData> {
parseData(data: unknown): TestJobData {
return testJobSchema.parse(data);
}
async run({ data }: ParsedJob<TestJobData>): Promise<void> {
results.add(data.a + data.b);
}
}
const addQueue = new Queue({
store, store,
queueType: 'test add queue', queueType: 'test add queue',
maxAttempts: 1, maxAttempts: 1,
parseData(data: unknown): TestJobData {
return testJobSchema.parse(data);
},
async run({ data }: ParsedJob<TestJobData>): Promise<void> {
results.add(data.a + data.b);
},
}); });
assert.deepEqual(results, new Set()); assert.deepEqual(results, new Set());
@ -64,25 +69,25 @@ describe('JobQueue', () => {
it('writes jobs to the database correctly', async () => { it('writes jobs to the database correctly', async () => {
const store = new TestJobQueueStore(); const store = new TestJobQueueStore();
const queue1 = new JobQueue({ class TestQueue extends JobQueue<string> {
parseData(data: unknown): string {
return z.string().parse(data);
}
async run(): Promise<void> {
return Promise.resolve();
}
}
const queue1 = new TestQueue({
store, store,
queueType: 'test 1', queueType: 'test 1',
maxAttempts: 1, maxAttempts: 1,
parseData: (data: unknown): string => {
return z.string().parse(data);
},
run: sinon.stub().resolves(),
}); });
const queue2 = new JobQueue({ const queue2 = new TestQueue({
store, store,
queueType: 'test 2', queueType: 'test 2',
maxAttempts: 1, maxAttempts: 1,
parseData: (data: unknown): string => {
return z.string().parse(data);
},
run(): Promise<void> {
return Promise.resolve();
},
}); });
store.pauseStream('test 1'); store.pauseStream('test 1');
@ -138,16 +143,14 @@ describe('JobQueue', () => {
const store = new TestJobQueueStore(); const store = new TestJobQueueStore();
const retryQueue = new JobQueue({ class RetryQueue extends JobQueue<TestJobData> {
store,
queueType: 'test retry queue',
maxAttempts: 5,
parseData(data: unknown): TestJobData { parseData(data: unknown): TestJobData {
if (data !== 'foo' && data !== 'bar') { if (data !== 'foo' && data !== 'bar') {
throw new Error('Invalid data'); throw new Error('Invalid data');
} }
return data; return data;
}, }
async run({ data }: ParsedJob<TestJobData>): Promise<void> { async run({ data }: ParsedJob<TestJobData>): Promise<void> {
switch (data) { switch (data) {
case 'foo': case 'foo':
@ -166,7 +169,13 @@ describe('JobQueue', () => {
default: default:
throw missingCaseError(data); throw missingCaseError(data);
} }
}, }
}
const retryQueue = new RetryQueue({
store,
queueType: 'test retry queue',
maxAttempts: 5,
}); });
retryQueue.streamJobs(); retryQueue.streamJobs();
@ -196,17 +205,23 @@ describe('JobQueue', () => {
}); });
it('makes job.completion reject if parseData throws', async () => { it('makes job.completion reject if parseData throws', async () => {
const queue = new JobQueue({ class TestQueue extends JobQueue<string> {
store: new TestJobQueueStore(), parseData(data: unknown): string {
queueType: 'test queue',
maxAttempts: 999,
parseData: (data: unknown): string => {
if (data === 'valid') { if (data === 'valid') {
return data; return data;
} }
throw new Error('uh oh'); throw new Error('uh oh');
}, }
run: sinon.stub().resolves(),
async run(): Promise<void> {
return Promise.resolve();
}
}
const queue = new TestQueue({
store: new TestJobQueueStore(),
queueType: 'test queue',
maxAttempts: 999,
}); });
queue.streamJobs(); queue.streamJobs();
@ -234,17 +249,23 @@ describe('JobQueue', () => {
it("doesn't run the job if parseData throws", async () => { it("doesn't run the job if parseData throws", async () => {
const run = sinon.stub().resolves(); const run = sinon.stub().resolves();
const queue = new JobQueue({ class TestQueue extends JobQueue<string> {
store: new TestJobQueueStore(), parseData(data: unknown): string {
queueType: 'test queue',
maxAttempts: 999,
parseData: (data: unknown): string => {
if (data === 'valid') { if (data === 'valid') {
return data; return data;
} }
throw new Error('invalid data!'); throw new Error('invalid data!');
}, }
run,
run(job: { data: string }): Promise<void> {
return run(job);
}
}
const queue = new TestQueue({
store: new TestJobQueueStore(),
queueType: 'test queue',
maxAttempts: 999,
}); });
queue.streamJobs(); queue.streamJobs();
@ -262,17 +283,23 @@ describe('JobQueue', () => {
it('keeps jobs in the storage if parseData throws', async () => { it('keeps jobs in the storage if parseData throws', async () => {
const store = new TestJobQueueStore(); const store = new TestJobQueueStore();
const queue = new JobQueue({ class TestQueue extends JobQueue<string> {
store, parseData(data: unknown): string {
queueType: 'test queue',
maxAttempts: 999,
parseData: (data: unknown): string => {
if (data === 'valid') { if (data === 'valid') {
return data; return data;
} }
throw new Error('uh oh'); throw new Error('invalid data!');
}, }
run: sinon.stub().resolves(),
async run(): Promise<void> {
return Promise.resolve();
}
}
const queue = new TestQueue({
store,
queueType: 'test queue',
maxAttempts: 999,
}); });
queue.streamJobs(); queue.streamJobs();
@ -292,12 +319,20 @@ describe('JobQueue', () => {
inserted = true; inserted = true;
}); });
const queue = new JobQueue({ class TestQueue extends JobQueue<undefined> {
parseData(_: unknown): undefined {
return undefined;
}
async run(): Promise<void> {
return Promise.resolve();
}
}
const queue = new TestQueue({
store, store,
queueType: 'test queue', queueType: 'test queue',
maxAttempts: 999, maxAttempts: 999,
parseData: identity,
run: sinon.stub().resolves(),
}); });
queue.streamJobs(); queue.streamJobs();
@ -317,17 +352,21 @@ describe('JobQueue', () => {
events.push('insert'); events.push('insert');
}); });
const queue = new JobQueue({ class TestQueue extends JobQueue<unknown> {
parseData(data: unknown): unknown {
events.push('parsing data');
return data;
}
async run(): Promise<void> {
events.push('running');
}
}
const queue = new TestQueue({
store, store,
queueType: 'test queue', queueType: 'test queue',
maxAttempts: 999, maxAttempts: 999,
parseData: (data: unknown): unknown => {
events.push('parsing data');
return data;
},
async run() {
events.push('running');
},
}); });
queue.streamJobs(); queue.streamJobs();
@ -345,12 +384,20 @@ describe('JobQueue', () => {
events.push('delete'); events.push('delete');
}); });
const queue = new JobQueue({ class TestQueue extends JobQueue<undefined> {
parseData(_: unknown): undefined {
return undefined;
}
async run(): Promise<void> {
return Promise.resolve();
}
}
const queue = new TestQueue({
store, store,
queueType: 'test queue', queueType: 'test queue',
maxAttempts: 999, maxAttempts: 999,
parseData: identity,
run: sinon.stub().resolves(),
}); });
queue.streamJobs(); queue.streamJobs();
@ -378,15 +425,21 @@ describe('JobQueue', () => {
events.push('delete'); events.push('delete');
}); });
const queue = new JobQueue({ class TestQueue extends JobQueue<undefined> {
parseData(_: unknown): undefined {
return undefined;
}
async run(): Promise<void> {
events.push('running');
throw new Error('uh oh');
}
}
const queue = new TestQueue({
store, store,
queueType: 'test queue', queueType: 'test queue',
maxAttempts: 5, maxAttempts: 5,
parseData: identity,
async run() {
events.push('running');
throw new Error('uh oh');
},
}); });
queue.streamJobs(); queue.streamJobs();
@ -453,16 +506,20 @@ describe('JobQueue', () => {
it('starts streaming jobs from the store', async () => { it('starts streaming jobs from the store', async () => {
const eventEmitter = new EventEmitter(); const eventEmitter = new EventEmitter();
const noopQueue = new JobQueue({ class TestQueue extends JobQueue<number> {
parseData(data: unknown): number {
return z.number().parse(data);
}
async run({ data }: Readonly<{ data: number }>): Promise<void> {
eventEmitter.emit('run', data);
}
}
const noopQueue = new TestQueue({
store: fakeStore, store: fakeStore,
queueType: 'test noop queue', queueType: 'test noop queue',
maxAttempts: 99, maxAttempts: 99,
parseData(data: unknown): number {
return z.number().parse(data);
},
async run({ data }: Readonly<{ data: number }>) {
eventEmitter.emit('run', data);
},
}); });
sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub); sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub);
@ -492,12 +549,20 @@ describe('JobQueue', () => {
}); });
it('rejects when called more than once', async () => { it('rejects when called more than once', async () => {
const noopQueue = new JobQueue({ class TestQueue extends JobQueue<unknown> {
parseData(data: unknown): unknown {
return data;
}
async run(): Promise<void> {
return Promise.resolve();
}
}
const noopQueue = new TestQueue({
store: fakeStore, store: fakeStore,
queueType: 'test noop queue', queueType: 'test noop queue',
maxAttempts: 99, maxAttempts: 99,
parseData: identity,
run: sinon.stub().resolves(),
}); });
noopQueue.streamJobs(); noopQueue.streamJobs();
@ -517,12 +582,20 @@ describe('JobQueue', () => {
stream: sinon.stub(), stream: sinon.stub(),
}; };
const noopQueue = new JobQueue({ class TestQueue extends JobQueue<undefined> {
parseData(_: unknown): undefined {
return undefined;
}
async run(): Promise<void> {
return Promise.resolve();
}
}
const noopQueue = new TestQueue({
store: fakeStore, store: fakeStore,
queueType: 'test noop queue', queueType: 'test noop queue',
maxAttempts: 99, maxAttempts: 99,
parseData: identity,
run: sinon.stub().resolves(),
}); });
await assertRejects(() => noopQueue.add(undefined)); await assertRejects(() => noopQueue.add(undefined));