Initial support for job queue

This commit is contained in:
Evan Hahn 2021-04-29 18:02:27 -05:00 committed by GitHub
parent 1238cca538
commit bbd7fd3854
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 1708 additions and 28 deletions

View file

@ -14,6 +14,8 @@ import { isValidReactionEmoji } from './reactions/isValidReactionEmoji';
import { ConversationModel } from './models/conversations';
import { createBatcher } from './util/batcher';
import { updateConversationsWithUuidLookup } from './updateConversationsWithUuidLookup';
import { initializeAllJobQueues } from './jobs/initializeAllJobQueues';
import { removeStorageKeyJobQueue } from './jobs/removeStorageKeyJobQueue';
const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000;
@ -31,6 +33,8 @@ export async function startApp(): Promise<void> {
);
}
initializeAllJobQueues();
let resolveOnAppView: (() => void) | undefined;
const onAppView = new Promise<void>(resolve => {
resolveOnAppView = resolve;
@ -654,6 +658,13 @@ export async function startApp(): Promise<void> {
await window.Signal.Data.clearAllErrorStickerPackAttempts();
}
if (window.isBeforeVersion(lastVersion, 'v5.2.0')) {
const legacySenderCertificateStorageKey = 'senderCertificateWithUuid';
await removeStorageKeyJobQueue.add({
key: legacySenderCertificateStorageKey,
});
}
// This one should always be last - it could restart the app
if (window.isBeforeVersion(lastVersion, 'v1.15.0-beta.5')) {
await window.Signal.Logs.deleteAll();

17
ts/jobs/Job.ts Normal file
View file

@ -0,0 +1,17 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { ParsedJob } from './types';
/**
* A single job instance. Shouldn't be instantiated directly, except by `JobQueue`.
*/
export class Job<T> implements ParsedJob<T> {
constructor(
readonly id: string,
readonly timestamp: number,
readonly queueType: string,
readonly data: T,
readonly completion: Promise<void>
) {}
}

22
ts/jobs/JobError.ts Normal file
View file

@ -0,0 +1,22 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { reallyJsonStringify } from '../util/reallyJsonStringify';
/**
* An error that wraps job errors.
*
* Should not be instantiated directly, except by `JobQueue`.
*/
export class JobError extends Error {
constructor(public readonly lastErrorThrownByJob: unknown) {
super(`Job failed. Last error: ${formatError(lastErrorThrownByJob)}`);
}
}
function formatError(err: unknown): string {
if (err instanceof Error) {
return err.message;
}
return reallyJsonStringify(err);
}

233
ts/jobs/JobQueue.ts Normal file
View file

@ -0,0 +1,233 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { v4 as uuid } from 'uuid';
import { noop } from 'lodash';
import { Job } from './Job';
import { JobError } from './JobError';
import { ParsedJob, StoredJob, JobQueueStore } from './types';
import { assert } from '../util/assert';
import * as log from '../logging/log';
const noopOnCompleteCallbacks = {
resolve: noop,
reject: noop,
};
type JobQueueOptions<T> = {
/**
* The backing store for jobs. Typically a wrapper around the database.
*/
store: JobQueueStore;
/**
* A unique name for this job queue. For example, might be "attachment downloads" or
* "message send".
*/
queueType: string;
/**
* The maximum number of attempts for a job in this queue. A value of 1 will not allow
* the job to fail; a value of 2 will allow the job to fail once; etc.
*/
maxAttempts: number;
/**
* `parseData` will be called with the raw data from `store`. For example, if the job
* takes a single number, `parseData` should throw if `data` is a number and should
* return the number otherwise.
*
* If it throws, the job will be deleted from the store and the job will not be run.
*
* Will only be called once per job, even if `maxAttempts > 1`.
*/
parseData: (data: unknown) => T;
/**
* Run the job, given data.
*
* If it resolves, the job will be deleted from the store.
*
* If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it
* will be deleted from the store.
*/
run: (job: Readonly<ParsedJob<T>>) => Promise<void>;
};
export class JobQueue<T> {
private readonly maxAttempts: number;
private readonly parseData: (data: unknown) => T;
private readonly queueType: string;
private readonly run: (job: Readonly<ParsedJob<T>>) => Promise<unknown>;
private readonly store: JobQueueStore;
private readonly logPrefix: string;
private readonly onCompleteCallbacks = new Map<
string,
{
resolve: () => void;
reject: (err: unknown) => void;
}
>();
private started = false;
constructor(options: Readonly<JobQueueOptions<T>>) {
assert(
Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1,
'maxAttempts should be a positive integer'
);
assert(
options.maxAttempts <= Number.MAX_SAFE_INTEGER,
'maxAttempts is too large'
);
assert(
options.queueType.trim().length,
'queueType should be a non-blank string'
);
this.maxAttempts = options.maxAttempts;
this.parseData = options.parseData;
this.queueType = options.queueType;
this.run = options.run;
this.store = options.store;
this.logPrefix = `${this.queueType} job queue:`;
}
/**
* Start streaming jobs from the store.
*/
async streamJobs(): Promise<void> {
if (this.started) {
throw new Error(
`${this.logPrefix} should not start streaming more than once`
);
}
this.started = true;
log.info(`${this.logPrefix} starting to stream jobs`);
const stream = this.store.stream(this.queueType);
// We want to enqueue the jobs in sequence, not in parallel. `for await ... of` is a
// good way to do that.
// eslint-disable-next-line no-restricted-syntax
for await (const storedJob of stream) {
this.enqueueStoredJob(storedJob);
}
}
/**
* Add a job, which should cause it to be enqueued and run.
*
* If `streamJobs` has not been called yet, it will be called.
*/
async add(data: Readonly<T>): Promise<Job<T>> {
if (!this.started) {
throw new Error(
`${this.logPrefix} has not started streaming. Make sure to call streamJobs().`
);
}
const id = uuid();
const timestamp = Date.now();
const completionPromise = new Promise<void>((resolve, reject) => {
this.onCompleteCallbacks.set(id, { resolve, reject });
});
const completion = (async () => {
try {
await completionPromise;
} catch (err: unknown) {
throw new JobError(err);
} finally {
this.onCompleteCallbacks.delete(id);
}
})();
log.info(`${this.logPrefix} added new job ${id}`);
const job = new Job(id, timestamp, this.queueType, data, completion);
await this.store.insert(job);
return job;
}
private async enqueueStoredJob(storedJob: Readonly<StoredJob>) {
assert(
storedJob.queueType === this.queueType,
'Received a mis-matched queue type'
);
log.info(`${this.logPrefix} enqueuing job ${storedJob.id}`);
// It's okay if we don't have a callback; that likely means the job was created before
// the process was started (e.g., from a previous run).
const { resolve, reject } =
this.onCompleteCallbacks.get(storedJob.id) || noopOnCompleteCallbacks;
let parsedData: T;
try {
parsedData = this.parseData(storedJob.data);
} catch (err) {
log.error(
`${this.logPrefix} failed to parse data for job ${storedJob.id}`
);
reject(
new Error(
'Failed to parse job data. Was unexpected data loaded from the database?'
)
);
return;
}
const parsedJob: ParsedJob<T> = {
...storedJob,
data: parsedData,
};
let result:
| undefined
| { success: true }
| { success: false; err: unknown };
for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) {
log.info(
`${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}`
);
try {
// We want an `await` in the loop, as we don't want a single job running more
// than once at a time. Ideally, the job will succeed on the first attempt.
// eslint-disable-next-line no-await-in-loop
await this.run(parsedJob);
result = { success: true };
log.info(
`${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}`
);
break;
} catch (err: unknown) {
result = { success: false, err };
log.error(
`${this.logPrefix} job ${storedJob.id} failed on attempt ${attempt}`
);
}
}
await this.store.delete(storedJob.id);
assert(
result,
'The job never ran. This indicates a developer error in the job queue'
);
if (result.success) {
resolve();
} else {
reject(result.err);
}
}
}

View file

@ -0,0 +1,107 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { pick, noop } from 'lodash';
import { AsyncQueue } from '../util/AsyncQueue';
import { concat, wrapPromise } from '../util/asyncIterables';
import { JobQueueStore, StoredJob } from './types';
import databaseInterface from '../sql/Client';
import * as log from '../logging/log';
type Database = {
getJobsInQueue(queueType: string): Promise<Array<StoredJob>>;
insertJob(job: Readonly<StoredJob>): Promise<void>;
deleteJob(id: string): Promise<void>;
};
export class JobQueueDatabaseStore implements JobQueueStore {
private activeQueueTypes = new Set<string>();
private queues = new Map<string, AsyncQueue<StoredJob>>();
private initialFetchPromises = new Map<string, Promise<void>>();
constructor(private readonly db: Database) {}
async insert(job: Readonly<StoredJob>): Promise<void> {
log.info(
`JobQueueDatabaseStore adding job ${job.id} to queue ${JSON.stringify(
job.queueType
)}`
);
const initialFetchPromise = this.initialFetchPromises.get(job.queueType);
if (!initialFetchPromise) {
throw new Error(
`JobQueueDatabaseStore tried to add job for queue ${JSON.stringify(
job.queueType
)} but streaming had not yet started`
);
}
await initialFetchPromise;
await this.db.insertJob(
pick(job, ['id', 'timestamp', 'queueType', 'data'])
);
this.getQueue(job.queueType).add(job);
}
async delete(id: string): Promise<void> {
await this.db.deleteJob(id);
}
stream(queueType: string): AsyncIterable<StoredJob> {
if (this.activeQueueTypes.has(queueType)) {
throw new Error(
`Cannot stream queue type ${JSON.stringify(queueType)} more than once`
);
}
this.activeQueueTypes.add(queueType);
return concat([
wrapPromise(this.fetchJobsAtStart(queueType)),
this.getQueue(queueType),
]);
}
private getQueue(queueType: string): AsyncQueue<StoredJob> {
const existingQueue = this.queues.get(queueType);
if (existingQueue) {
return existingQueue;
}
const result = new AsyncQueue<StoredJob>();
this.queues.set(queueType, result);
return result;
}
private async fetchJobsAtStart(queueType: string): Promise<Array<StoredJob>> {
log.info(
`JobQueueDatabaseStore fetching existing jobs for queue ${JSON.stringify(
queueType
)}`
);
// This is initialized to `noop` because TypeScript doesn't know that `Promise` calls
// its callback synchronously, making sure `onFinished` is defined.
let onFinished: () => void = noop;
const initialFetchPromise = new Promise<void>(resolve => {
onFinished = resolve;
});
this.initialFetchPromises.set(queueType, initialFetchPromise);
const result = await this.db.getJobsInQueue(queueType);
log.info(
`JobQueueDatabaseStore finished fetching existing ${
result.length
} jobs for queue ${JSON.stringify(queueType)}`
);
onFinished();
return result;
}
}
export const jobQueueDatabaseStore = new JobQueueDatabaseStore(
databaseInterface
);

View file

@ -0,0 +1,11 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue';
/**
* Start all of the job queues. Should be called when the database is ready.
*/
export function initializeAllJobQueues(): void {
removeStorageKeyJobQueue.streamJobs();
}

View file

@ -0,0 +1,35 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as z from 'zod';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
const removeStorageKeyJobDataSchema = z.object({
key: z.string().min(1),
});
type RemoveStorageKeyJobData = z.infer<typeof removeStorageKeyJobDataSchema>;
export const removeStorageKeyJobQueue = new JobQueue<RemoveStorageKeyJobData>({
store: jobQueueDatabaseStore,
queueType: 'remove storage key',
maxAttempts: 100,
parseData(data: unknown): RemoveStorageKeyJobData {
return removeStorageKeyJobDataSchema.parse(data);
},
async run({
data,
}: Readonly<{ data: RemoveStorageKeyJobData }>): Promise<void> {
await new Promise<void>(resolve => {
window.storage.onready(resolve);
});
await window.storage.remove(data.key);
},
});

37
ts/jobs/types.ts Normal file
View file

@ -0,0 +1,37 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
export type JobQueueStore = {
/**
* Add a job to the database. Doing this should enqueue it in the stream.
*/
insert(job: Readonly<StoredJob>): Promise<void>;
/**
* Remove a job. This should be called when a job finishes successfully or
* if a job has totally failed.
*
* It should NOT be called to cancel a job.
*/
delete(id: string): Promise<void>;
/**
* Stream jobs for a given queue. At startup, this stream may produce a bunch of
* jobs. After that, it should produce one job per `insert`.
*/
stream(queueType: string): AsyncIterable<StoredJob>;
};
export type ParsedJob<T> = {
readonly id: string;
readonly timestamp: number;
readonly queueType: string;
readonly data: T;
};
export type StoredJob = {
readonly id: string;
readonly timestamp: number;
readonly queueType: string;
readonly data?: unknown;
};

View file

@ -60,8 +60,6 @@ export class SenderCertificateService {
this.navigator = navigator;
this.onlineEventTarget = onlineEventTarget;
this.storage = storage;
removeOldKey(storage);
}
async get(
@ -242,12 +240,4 @@ function isExpirationValid(expiration: unknown): expiration is number {
return typeof expiration === 'number' && expiration > Date.now();
}
function removeOldKey(storage: Readonly<Storage>) {
const oldCertKey = 'senderCertificateWithUuid';
const oldUuidCert = storage.get(oldCertKey);
if (oldUuidCert) {
storage.remove(oldCertKey);
}
}
export const senderCertificateService = new SenderCertificateService();

View file

@ -33,6 +33,7 @@ import {
ConversationModelCollectionType,
MessageModelCollectionType,
} from '../model-types.d';
import { StoredJob } from '../jobs/types';
import {
AttachmentDownloadJobType,
@ -225,6 +226,10 @@ const dataInterface: ClientInterface = {
getMessagesWithVisualMediaAttachments,
getMessagesWithFileAttachments,
getJobsInQueue,
insertJob,
deleteJob,
// Test-only
_getAllMessages,
@ -1491,3 +1496,15 @@ async function getMessagesWithFileAttachments(
limit,
});
}
function getJobsInQueue(queueType: string): Promise<Array<StoredJob>> {
return channels.getJobsInQueue(queueType);
}
function insertJob(job: Readonly<StoredJob>): Promise<void> {
return channels.insertJob(job);
}
function deleteJob(id: string): Promise<void> {
return channels.deleteJob(id);
}

View file

@ -13,6 +13,7 @@ import {
} from '../model-types.d';
import { MessageModel } from '../models/messages';
import { ConversationModel } from '../models/conversations';
import { StoredJob } from '../jobs/types';
export type AttachmentDownloadJobType = {
id: string;
@ -289,6 +290,10 @@ export type DataInterface = {
conversationId: string,
options: { limit: number }
) => Promise<Array<MessageType>>;
getJobsInQueue(queueType: string): Promise<Array<StoredJob>>;
insertJob(job: Readonly<StoredJob>): Promise<void>;
deleteJob(id: string): Promise<void>;
};
// The reason for client/server divergence is the need to inject Backbone models and

View file

@ -31,8 +31,10 @@ import {
import { assert } from '../util/assert';
import { isNormalNumber } from '../util/isNormalNumber';
import { combineNames } from '../util/combineNames';
import { isNotNil } from '../util/isNotNil';
import { GroupV2MemberType } from '../model-types.d';
import { StoredJob } from '../jobs/types';
import {
AttachmentDownloadJobType,
@ -214,6 +216,10 @@ const dataInterface: ServerInterface = {
getMessagesWithVisualMediaAttachments,
getMessagesWithFileAttachments,
getJobsInQueue,
insertJob,
deleteJob,
// Server-only
initialize,
@ -1687,6 +1693,27 @@ async function updateToSchemaVersion27(currentVersion: number, db: Database) {
})();
}
function updateToSchemaVersion28(currentVersion: number, db: Database) {
if (currentVersion >= 28) {
return;
}
db.transaction(() => {
db.exec(`
CREATE TABLE jobs(
id TEXT PRIMARY KEY,
queueType TEXT STRING NOT NULL,
timestamp INTEGER NOT NULL,
data STRING TEXT
);
CREATE INDEX jobs_timestamp ON jobs (timestamp);
`);
db.pragma('user_version = 28');
})();
}
const SCHEMA_VERSIONS = [
updateToSchemaVersion1,
updateToSchemaVersion2,
@ -1715,6 +1742,7 @@ const SCHEMA_VERSIONS = [
updateToSchemaVersion25,
updateToSchemaVersion26,
updateToSchemaVersion27,
updateToSchemaVersion28,
];
function updateSchema(db: Database): void {
@ -4241,6 +4269,7 @@ async function removeAll(): Promise<void> {
DELETE FROM stickers;
DELETE FROM sticker_packs;
DELETE FROM sticker_references;
DELETE FROM jobs;
`);
})();
}
@ -4257,6 +4286,7 @@ async function removeAllConfiguration(): Promise<void> {
DELETE FROM sessions;
DELETE FROM signedPreKeys;
DELETE FROM unprocessed;
DELETE FROM jobs;
`);
})();
}
@ -4638,3 +4668,48 @@ async function removeKnownDraftAttachments(
return Object.keys(lookup);
}
async function getJobsInQueue(queueType: string): Promise<Array<StoredJob>> {
const db = getInstance();
return db
.prepare<Query>(
`
SELECT id, timestamp, data
FROM jobs
WHERE queueType = $queueType
ORDER BY timestamp;
`
)
.all({ queueType })
.map(row => ({
id: row.id,
queueType,
timestamp: row.timestamp,
data: isNotNil(row.data) ? JSON.parse(row.data) : undefined,
}));
}
async function insertJob(job: Readonly<StoredJob>): Promise<void> {
const db = getInstance();
db.prepare<Query>(
`
INSERT INTO jobs
(id, queueType, timestamp, data)
VALUES
($id, $queueType, $timestamp, $data);
`
).run({
id: job.id,
queueType: job.queueType,
timestamp: job.timestamp,
data: isNotNil(job.data) ? JSON.stringify(job.data) : null,
});
}
async function deleteJob(id: string): Promise<void> {
const db = getInstance();
db.prepare<Query>('DELETE FROM jobs WHERE id = $id').run({ id });
}

View file

@ -0,0 +1,36 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable no-restricted-syntax */
import { assert } from 'chai';
import { AsyncQueue } from '../util/AsyncQueue';
describe('AsyncQueue', () => {
it('yields values as they are added, even if they were added before consuming', async () => {
const queue = new AsyncQueue<number>();
queue.add(1);
queue.add(2);
const resultPromise = (async () => {
const results = [];
for await (const value of queue) {
results.push(value);
if (value === 4) {
break;
}
}
return results;
})();
queue.add(3);
queue.add(4);
// Ignored, because we should've stopped iterating.
queue.add(5);
assert.deepEqual(await resultPromise, [1, 2, 3, 4]);
});
});

View file

@ -0,0 +1,89 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable no-await-in-loop */
/* eslint-disable no-restricted-syntax */
import { assert } from 'chai';
import {
MaybeAsyncIterable,
concat,
wrapPromise,
} from '../../util/asyncIterables';
describe('async iterable utilities', () => {
describe('concat', () => {
it('returns an empty async iterable if called with an empty list', async () => {
const result = concat([]);
assert.isEmpty(await collect(result));
});
it('concatenates synchronous and asynchronous iterables', async () => {
function* makeSync() {
yield 'sync 1';
yield 'sync 2';
}
async function* makeAsync() {
yield 'async 1';
yield 'async 2';
}
const syncIterable: Iterable<string> = makeSync();
const asyncIterable1: AsyncIterable<string> = makeAsync();
const asyncIterable2: AsyncIterable<string> = makeAsync();
const result = concat([
syncIterable,
asyncIterable1,
['array 1', 'array 2'],
asyncIterable2,
]);
assert.deepEqual(await collect(result), [
'sync 1',
'sync 2',
'async 1',
'async 2',
'array 1',
'array 2',
'async 1',
'async 2',
]);
});
});
describe('wrapPromise', () => {
it('resolves to an array when wrapping a synchronous iterable', async () => {
const iterable = new Set([1, 2, 3]);
const result = wrapPromise(Promise.resolve(iterable));
assert.sameMembers(await collect(result), [1, 2, 3]);
});
it('resolves to an array when wrapping an asynchronous iterable', async () => {
const iterable = (async function* test() {
yield 1;
yield 2;
yield 3;
})();
const result = wrapPromise(Promise.resolve(iterable));
assert.deepEqual(await collect(result), [1, 2, 3]);
});
});
});
/**
* Turns an iterable into a fully-realized array.
*
* If we want this outside of tests, we could make it into a "real" function.
*/
async function collect<T>(iterable: MaybeAsyncIterable<T>): Promise<Array<T>> {
const result: Array<T> = [];
for await (const value of iterable) {
result.push(value);
}
return result;
}

View file

@ -93,24 +93,6 @@ describe('SenderCertificateService', () => {
fakeStorage.get.withArgs('password').returns('abc123');
});
describe('initialize', () => {
it('removes an old storage service key if it was present', () => {
fakeStorage.get
.withArgs('senderCertificateWithUuid')
.returns('some value');
initializeTestService();
sinon.assert.calledWith(fakeStorage.remove, 'senderCertificateWithUuid');
});
it("doesn't remove anything from storage if it wasn't there", () => {
initializeTestService();
sinon.assert.notCalled(fakeStorage.put);
});
});
describe('get', () => {
it('returns valid yes-E164 certificates from storage if they exist', async () => {
const cert = {

View file

@ -0,0 +1,33 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { JobError } from '../../jobs/JobError';
describe('JobError', () => {
it('stores the provided argument as a property', () => {
const fakeError = new Error('uh oh');
const jobError1 = new JobError(fakeError);
assert.strictEqual(jobError1.lastErrorThrownByJob, fakeError);
const jobError2 = new JobError(123);
assert.strictEqual(jobError2.lastErrorThrownByJob, 123);
});
it('if passed an Error, augments its `message`', () => {
const fakeError = new Error('uh oh');
const jobError = new JobError(fakeError);
assert.strictEqual(jobError.message, 'Job failed. Last error: uh oh');
});
it('if passed a non-Error, stringifies it', () => {
const jobError = new JobError({ foo: 'bar' });
assert.strictEqual(
jobError.message,
'Job failed. Last error: {"foo":"bar"}'
);
});
});

View file

@ -0,0 +1,202 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable no-restricted-syntax */
import { assert } from 'chai';
import * as sinon from 'sinon';
import { noop } from 'lodash';
import { StoredJob } from '../../jobs/types';
import { JobQueueDatabaseStore } from '../../jobs/JobQueueDatabaseStore';
describe('JobQueueDatabaseStore', () => {
let fakeDatabase: {
getJobsInQueue: sinon.SinonStub;
insertJob: sinon.SinonStub;
deleteJob: sinon.SinonStub;
};
beforeEach(() => {
fakeDatabase = {
getJobsInQueue: sinon.stub().resolves([]),
insertJob: sinon.stub(),
deleteJob: sinon.stub(),
};
});
describe('insert', () => {
it("fails if streaming hasn't started yet", async () => {
const store = new JobQueueDatabaseStore(fakeDatabase);
let error: unknown;
try {
await store.insert({
id: 'abc',
timestamp: 1234,
queueType: 'test queue',
data: { hi: 5 },
});
} catch (err: unknown) {
error = err;
}
assert.instanceOf(error, Error);
});
it('adds jobs to the database', async () => {
const store = new JobQueueDatabaseStore(fakeDatabase);
store.stream('test queue');
await store.insert({
id: 'abc',
timestamp: 1234,
queueType: 'test queue',
data: { hi: 5 },
});
sinon.assert.calledOnce(fakeDatabase.insertJob);
sinon.assert.calledWithMatch(fakeDatabase.insertJob, {
id: 'abc',
timestamp: 1234,
queueType: 'test queue',
data: { hi: 5 },
});
});
it('enqueues jobs after putting them in the database', async () => {
const events: Array<string> = [];
fakeDatabase.insertJob.callsFake(() => {
events.push('insert');
});
const store = new JobQueueDatabaseStore(fakeDatabase);
const streamPromise = (async () => {
// We don't actually care about using the variable from the async iterable.
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _job of store.stream('test queue')) {
events.push('yielded job');
break;
}
})();
await store.insert({
id: 'abc',
timestamp: 1234,
queueType: 'test queue',
data: { hi: 5 },
});
await streamPromise;
assert.deepEqual(events, ['insert', 'yielded job']);
});
it("doesn't insert jobs until the initial fetch has completed", async () => {
const events: Array<string> = [];
let resolveGetJobsInQueue = noop;
const getJobsInQueuePromise = new Promise(resolve => {
resolveGetJobsInQueue = resolve;
});
fakeDatabase.getJobsInQueue.callsFake(() => {
events.push('loaded jobs');
return getJobsInQueuePromise;
});
fakeDatabase.insertJob.callsFake(() => {
events.push('insert');
});
const store = new JobQueueDatabaseStore(fakeDatabase);
store.stream('test queue');
const insertPromise = store.insert({
id: 'abc',
timestamp: 1234,
queueType: 'test queue',
data: { hi: 5 },
});
sinon.assert.notCalled(fakeDatabase.insertJob);
resolveGetJobsInQueue([]);
await insertPromise;
sinon.assert.calledOnce(fakeDatabase.insertJob);
assert.deepEqual(events, ['loaded jobs', 'insert']);
});
});
describe('delete', () => {
it('deletes jobs from the database', async () => {
const store = new JobQueueDatabaseStore(fakeDatabase);
await store.delete('xyz');
sinon.assert.calledOnce(fakeDatabase.deleteJob);
sinon.assert.calledWith(fakeDatabase.deleteJob, 'xyz');
});
});
describe('stream', () => {
it('yields all values in the database, then all values inserted', async () => {
const makeJob = (id: string, queueType: string) => ({
id,
timestamp: Date.now(),
queueType,
data: { hi: 5 },
});
const ids = async (
stream: AsyncIterable<StoredJob>,
amount: number
): Promise<Array<string>> => {
const result: Array<string> = [];
for await (const job of stream) {
result.push(job.id);
if (result.length >= amount) {
break;
}
}
return result;
};
fakeDatabase.getJobsInQueue
.withArgs('queue A')
.resolves([
makeJob('A.1', 'queue A'),
makeJob('A.2', 'queue A'),
makeJob('A.3', 'queue A'),
]);
fakeDatabase.getJobsInQueue.withArgs('queue B').resolves([]);
fakeDatabase.getJobsInQueue
.withArgs('queue C')
.resolves([makeJob('C.1', 'queue C'), makeJob('C.2', 'queue C')]);
const store = new JobQueueDatabaseStore(fakeDatabase);
const streamA = store.stream('queue A');
const streamB = store.stream('queue B');
const streamC = store.stream('queue C');
await store.insert(makeJob('A.4', 'queue A'));
await store.insert(makeJob('C.3', 'queue C'));
await store.insert(makeJob('B.1', 'queue B'));
await store.insert(makeJob('A.5', 'queue A'));
const streamAIds = await ids(streamA, 5);
const streamBIds = await ids(streamB, 1);
const streamCIds = await ids(streamC, 3);
assert.deepEqual(streamAIds, ['A.1', 'A.2', 'A.3', 'A.4', 'A.5']);
assert.deepEqual(streamBIds, ['B.1']);
assert.deepEqual(streamCIds, ['C.1', 'C.2', 'C.3']);
sinon.assert.calledThrice(fakeDatabase.getJobsInQueue);
});
});
});

View file

@ -0,0 +1,533 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import * as sinon from 'sinon';
import EventEmitter, { once } from 'events';
import * as z from 'zod';
import { identity, noop, groupBy } from 'lodash';
import { v4 as uuid } from 'uuid';
import { JobError } from '../../jobs/JobError';
import { TestJobQueueStore } from './TestJobQueueStore';
import { missingCaseError } from '../../util/missingCaseError';
import { assertRejects } from '../helpers';
import { JobQueue } from '../../jobs/JobQueue';
import { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types';
describe('JobQueue', () => {
describe('end-to-end tests', () => {
it('writes jobs to the database, processes them, and then deletes them', async () => {
const testJobSchema = z.object({
a: z.number(),
b: z.number(),
});
type TestJobData = z.infer<typeof testJobSchema>;
const results = new Set<unknown>();
const store = new TestJobQueueStore();
const addQueue = new JobQueue({
store,
queueType: 'test add queue',
maxAttempts: 1,
parseData(data: unknown): TestJobData {
return testJobSchema.parse(data);
},
async run({ data }: ParsedJob<TestJobData>): Promise<void> {
results.add(data.a + data.b);
},
});
assert.deepEqual(results, new Set());
assert.isEmpty(store.storedJobs);
addQueue.streamJobs();
store.pauseStream('test add queue');
const job1 = await addQueue.add({ a: 1, b: 2 });
const job2 = await addQueue.add({ a: 3, b: 4 });
assert.deepEqual(results, new Set());
assert.lengthOf(store.storedJobs, 2);
store.resumeStream('test add queue');
await job1.completion;
await job2.completion;
assert.deepEqual(results, new Set([3, 7]));
assert.isEmpty(store.storedJobs);
});
it('writes jobs to the database correctly', async () => {
const store = new TestJobQueueStore();
const queue1 = new JobQueue({
store,
queueType: 'test 1',
maxAttempts: 1,
parseData: (data: unknown): string => {
return z.string().parse(data);
},
run: sinon.stub().resolves(),
});
const queue2 = new JobQueue({
store,
queueType: 'test 2',
maxAttempts: 1,
parseData: (data: unknown): string => {
return z.string().parse(data);
},
run(): Promise<void> {
return Promise.resolve();
},
});
store.pauseStream('test 1');
store.pauseStream('test 2');
queue1.streamJobs();
queue2.streamJobs();
await queue1.add('one');
await queue2.add('A');
await queue1.add('two');
await queue2.add('B');
await queue1.add('three');
assert.lengthOf(store.storedJobs, 5);
const ids = store.storedJobs.map(job => job.id);
assert.lengthOf(
store.storedJobs,
new Set(ids).size,
'Expected every job to have a unique ID'
);
const timestamps = store.storedJobs.map(job => job.timestamp);
timestamps.forEach(timestamp => {
assert.approximately(
timestamp,
Date.now(),
3000,
'Expected the timestamp to be ~now'
);
});
const datas = store.storedJobs.map(job => job.data);
assert.sameMembers(
datas,
['three', 'two', 'one', 'A', 'B'],
"Expected every job's data to be stored"
);
const queueTypes = groupBy(store.storedJobs, 'queueType');
assert.hasAllKeys(queueTypes, ['test 1', 'test 2']);
assert.lengthOf(queueTypes['test 1'], 3);
assert.lengthOf(queueTypes['test 2'], 2);
});
it('retries jobs, running them up to maxAttempts times', async () => {
type TestJobData = 'foo' | 'bar';
let fooAttempts = 0;
let barAttempts = 0;
let fooSucceeded = false;
const store = new TestJobQueueStore();
const retryQueue = new JobQueue({
store,
queueType: 'test retry queue',
maxAttempts: 5,
parseData(data: unknown): TestJobData {
if (data !== 'foo' && data !== 'bar') {
throw new Error('Invalid data');
}
return data;
},
async run({ data }: ParsedJob<TestJobData>): Promise<void> {
switch (data) {
case 'foo':
fooAttempts += 1;
if (fooAttempts < 3) {
throw new Error(
'foo job should fail the first and second time'
);
}
fooSucceeded = true;
break;
case 'bar':
barAttempts += 1;
throw new Error('bar job always fails in this test');
break;
default:
throw missingCaseError(data);
}
},
});
retryQueue.streamJobs();
await (await retryQueue.add('foo')).completion;
let booErr: unknown;
try {
await (await retryQueue.add('bar')).completion;
} catch (err: unknown) {
booErr = err;
}
assert.strictEqual(fooAttempts, 3);
assert.isTrue(fooSucceeded);
assert.strictEqual(barAttempts, 5);
// Chai's `assert.instanceOf` doesn't tell TypeScript anything, so we do it here.
if (!(booErr instanceof JobError)) {
assert.fail('Expected error to be a JobError');
return;
}
assert.include(booErr.message, 'bar job always fails in this test');
assert.isEmpty(store.storedJobs);
});
it('makes job.completion reject if parseData throws', async () => {
const queue = new JobQueue({
store: new TestJobQueueStore(),
queueType: 'test queue',
maxAttempts: 999,
parseData: (data: unknown): string => {
if (data === 'valid') {
return data;
}
throw new Error('uh oh');
},
run: sinon.stub().resolves(),
});
queue.streamJobs();
const job = await queue.add('this will fail to parse');
let jobError: unknown;
try {
await job.completion;
} catch (err: unknown) {
jobError = err;
}
// Chai's `assert.instanceOf` doesn't tell TypeScript anything, so we do it here.
if (!(jobError instanceof JobError)) {
assert.fail('Expected error to be a JobError');
return;
}
assert.include(
jobError.message,
'Failed to parse job data. Was unexpected data loaded from the database?'
);
});
it("doesn't run the job if parseData throws", async () => {
const run = sinon.stub().resolves();
const queue = new JobQueue({
store: new TestJobQueueStore(),
queueType: 'test queue',
maxAttempts: 999,
parseData: (data: unknown): string => {
if (data === 'valid') {
return data;
}
throw new Error('invalid data!');
},
run,
});
queue.streamJobs();
(await queue.add('invalid')).completion.catch(noop);
(await queue.add('invalid')).completion.catch(noop);
await queue.add('valid');
(await queue.add('invalid')).completion.catch(noop);
(await queue.add('invalid')).completion.catch(noop);
sinon.assert.calledOnce(run);
sinon.assert.calledWithMatch(run, { data: 'valid' });
});
it('keeps jobs in the storage if parseData throws', async () => {
const store = new TestJobQueueStore();
const queue = new JobQueue({
store,
queueType: 'test queue',
maxAttempts: 999,
parseData: (data: unknown): string => {
if (data === 'valid') {
return data;
}
throw new Error('uh oh');
},
run: sinon.stub().resolves(),
});
queue.streamJobs();
await (await queue.add('invalid 1')).completion.catch(noop);
await (await queue.add('invalid 2')).completion.catch(noop);
const datas = store.storedJobs.map(job => job.data);
assert.sameMembers(datas, ['invalid 1', 'invalid 2']);
});
it('adding the job resolves AFTER inserting the job into the database', async () => {
let inserted = false;
const store = new TestJobQueueStore();
store.events.on('insert', () => {
inserted = true;
});
const queue = new JobQueue({
store,
queueType: 'test queue',
maxAttempts: 999,
parseData: identity,
run: sinon.stub().resolves(),
});
queue.streamJobs();
const addPromise = queue.add(undefined);
assert.isFalse(inserted);
await addPromise;
assert.isTrue(inserted);
});
it('starts the job AFTER inserting the job into the database', async () => {
const events: Array<string> = [];
const store = new TestJobQueueStore();
store.events.on('insert', () => {
events.push('insert');
});
const queue = new JobQueue({
store,
queueType: 'test queue',
maxAttempts: 999,
parseData: (data: unknown): unknown => {
events.push('parsing data');
return data;
},
async run() {
events.push('running');
},
});
queue.streamJobs();
await (await queue.add(123)).completion;
assert.deepEqual(events, ['insert', 'parsing data', 'running']);
});
it('resolves job.completion AFTER deleting the job from the database', async () => {
const events: Array<string> = [];
const store = new TestJobQueueStore();
store.events.on('delete', () => {
events.push('delete');
});
const queue = new JobQueue({
store,
queueType: 'test queue',
maxAttempts: 999,
parseData: identity,
run: sinon.stub().resolves(),
});
queue.streamJobs();
store.pauseStream('test queue');
const job = await queue.add(undefined);
// eslint-disable-next-line more/no-then
const jobCompletionPromise = job.completion.then(() => {
events.push('resolved');
});
assert.lengthOf(store.storedJobs, 1);
store.resumeStream('test queue');
await jobCompletionPromise;
assert.deepEqual(events, ['delete', 'resolved']);
});
it('if the job fails after every attempt, rejects job.completion AFTER deleting the job from the database', async () => {
const events: Array<string> = [];
const store = new TestJobQueueStore();
store.events.on('delete', () => {
events.push('delete');
});
const queue = new JobQueue({
store,
queueType: 'test queue',
maxAttempts: 5,
parseData: identity,
async run() {
events.push('running');
throw new Error('uh oh');
},
});
queue.streamJobs();
store.pauseStream('test queue');
const job = await queue.add(undefined);
const jobCompletionPromise = job.completion.catch(() => {
events.push('rejected');
});
assert.lengthOf(store.storedJobs, 1);
store.resumeStream('test queue');
await jobCompletionPromise;
assert.deepEqual(events, [
'running',
'running',
'running',
'running',
'running',
'delete',
'rejected',
]);
});
});
describe('streamJobs', () => {
const storedJobSchema = z.object({
id: z.string(),
timestamp: z.number(),
queueType: z.string(),
data: z.unknown(),
});
class FakeStream implements AsyncIterable<StoredJob> {
private eventEmitter = new EventEmitter();
async *[Symbol.asyncIterator]() {
while (true) {
// eslint-disable-next-line no-await-in-loop
const [job] = await once(this.eventEmitter, 'drip');
yield storedJobSchema.parse(job);
}
}
drip(job: Readonly<StoredJob>): void {
this.eventEmitter.emit('drip', job);
}
}
let fakeStream: FakeStream;
let fakeStore: JobQueueStore;
beforeEach(() => {
fakeStream = new FakeStream();
fakeStore = {
insert: sinon.stub().resolves(),
delete: sinon.stub().resolves(),
stream: sinon.stub().returns(fakeStream),
};
});
it('starts streaming jobs from the store', async () => {
const eventEmitter = new EventEmitter();
const noopQueue = new JobQueue({
store: fakeStore,
queueType: 'test noop queue',
maxAttempts: 99,
parseData(data: unknown): number {
return z.number().parse(data);
},
async run({ data }: Readonly<{ data: number }>) {
eventEmitter.emit('run', data);
},
});
sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub);
noopQueue.streamJobs();
sinon.assert.calledOnce(fakeStore.stream as sinon.SinonStub);
fakeStream.drip({
id: uuid(),
timestamp: Date.now(),
queueType: 'test noop queue',
data: 123,
});
const [firstRunData] = await once(eventEmitter, 'run');
fakeStream.drip({
id: uuid(),
timestamp: Date.now(),
queueType: 'test noop queue',
data: 456,
});
const [secondRunData] = await once(eventEmitter, 'run');
assert.strictEqual(firstRunData, 123);
assert.strictEqual(secondRunData, 456);
});
it('rejects when called more than once', async () => {
const noopQueue = new JobQueue({
store: fakeStore,
queueType: 'test noop queue',
maxAttempts: 99,
parseData: identity,
run: sinon.stub().resolves(),
});
noopQueue.streamJobs();
await assertRejects(() => noopQueue.streamJobs());
await assertRejects(() => noopQueue.streamJobs());
sinon.assert.calledOnce(fakeStore.stream as sinon.SinonStub);
});
});
describe('add', () => {
it('rejects if the job queue has not started streaming', async () => {
const fakeStore = {
insert: sinon.stub().resolves(),
delete: sinon.stub().resolves(),
stream: sinon.stub(),
};
const noopQueue = new JobQueue({
store: fakeStore,
queueType: 'test noop queue',
maxAttempts: 99,
parseData: identity,
run: sinon.stub().resolves(),
});
await assertRejects(() => noopQueue.add(undefined));
sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub);
});
});
});

View file

@ -0,0 +1,24 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { Job } from '../../jobs/Job';
describe('Job', () => {
it('stores its arguments', () => {
const id = 'abc123';
const timestamp = Date.now();
const queueType = 'test queue';
const data = { foo: 'bar' };
const completion = Promise.resolve();
const job = new Job(id, timestamp, queueType, data, completion);
assert.strictEqual(job.id, id);
assert.strictEqual(job.timestamp, timestamp);
assert.strictEqual(job.queueType, queueType);
assert.strictEqual(job.data, data);
assert.strictEqual(job.completion, completion);
});
});

View file

@ -0,0 +1,131 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable max-classes-per-file */
/* eslint-disable no-restricted-syntax */
/* eslint-disable no-await-in-loop */
import EventEmitter, { once } from 'events';
import { JobQueueStore, StoredJob } from '../../jobs/types';
import { sleep } from '../../util/sleep';
export class TestJobQueueStore implements JobQueueStore {
events = new EventEmitter();
private openStreams = new Set<string>();
private pipes = new Map<string, Pipe>();
storedJobs: Array<StoredJob> = [];
constructor(jobs: ReadonlyArray<StoredJob> = []) {
jobs.forEach(job => {
this.insert(job);
});
}
async insert(job: Readonly<StoredJob>): Promise<void> {
await fakeDelay();
this.storedJobs.forEach(storedJob => {
if (job.id === storedJob.id) {
throw new Error('Cannot store two jobs with the same ID');
}
});
this.storedJobs.push(job);
this.getPipe(job.queueType).add(job);
this.events.emit('insert');
}
async delete(id: string): Promise<void> {
await fakeDelay();
this.storedJobs = this.storedJobs.filter(job => job.id !== id);
this.events.emit('delete');
}
stream(queueType: string): Pipe {
if (this.openStreams.has(queueType)) {
throw new Error('Cannot stream the same queueType more than once');
}
this.openStreams.add(queueType);
return this.getPipe(queueType);
}
pauseStream(queueType: string): void {
return this.getPipe(queueType).pause();
}
resumeStream(queueType: string): void {
return this.getPipe(queueType).resume();
}
private getPipe(queueType: string): Pipe {
const existingPipe = this.pipes.get(queueType);
if (existingPipe) {
return existingPipe;
}
const result = new Pipe();
this.pipes.set(queueType, result);
return result;
}
}
class Pipe implements AsyncIterable<StoredJob> {
private queue: Array<StoredJob> = [];
private eventEmitter = new EventEmitter();
private isLocked = false;
private isPaused = false;
add(value: Readonly<StoredJob>) {
this.queue.push(value);
this.eventEmitter.emit('add');
}
async *[Symbol.asyncIterator]() {
if (this.isLocked) {
throw new Error('Cannot iterate over a pipe more than once');
}
this.isLocked = true;
while (true) {
for (const value of this.queue) {
await this.waitForUnpaused();
yield value;
}
this.queue = [];
// We do this because we want to yield values in series.
await once(this.eventEmitter, 'add');
}
}
pause(): void {
this.isPaused = true;
}
resume(): void {
this.isPaused = false;
this.eventEmitter.emit('resume');
}
private async waitForUnpaused() {
if (this.isPaused) {
await once(this.eventEmitter, 'resume');
}
}
}
function fakeDelay(): Promise<void> {
return sleep(0);
}

48
ts/util/AsyncQueue.ts Normal file
View file

@ -0,0 +1,48 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { once, noop } from 'lodash';
/**
* You can do two things with an async queue:
*
* 1. Put values in.
* 2. Consume values out in the order they were added.
*
* Values are removed from the queue when they're consumed.
*
* There can only be one consumer, though this could be changed.
*
* See the tests to see how this works.
*/
export class AsyncQueue<T> implements AsyncIterable<T> {
private onAdd: () => void = noop;
private queue: Array<T> = [];
private isReading = false;
add(value: Readonly<T>): void {
this.queue.push(value);
this.onAdd();
}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
if (this.isReading) {
throw new Error('Cannot iterate over a queue more than once');
}
this.isReading = true;
while (true) {
yield* this.queue;
this.queue = [];
// We want to iterate over the queue in series.
// eslint-disable-next-line no-await-in-loop
await new Promise<void>(resolve => {
this.onAdd = once(resolve);
});
}
}
}

42
ts/util/asyncIterables.ts Normal file
View file

@ -0,0 +1,42 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable max-classes-per-file */
/* eslint-disable no-await-in-loop */
/* eslint-disable no-restricted-syntax */
export type MaybeAsyncIterable<T> = Iterable<T> | AsyncIterable<T>;
export function concat<T>(
iterables: Iterable<MaybeAsyncIterable<T>>
): AsyncIterable<T> {
return new ConcatAsyncIterable(iterables);
}
class ConcatAsyncIterable<T> implements AsyncIterable<T> {
constructor(private readonly iterables: Iterable<MaybeAsyncIterable<T>>) {}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
for (const iterable of this.iterables) {
for await (const value of iterable) {
yield value;
}
}
}
}
export function wrapPromise<T>(
promise: Promise<MaybeAsyncIterable<T>>
): AsyncIterable<T> {
return new WrapPromiseAsyncIterable(promise);
}
class WrapPromiseAsyncIterable<T> implements AsyncIterable<T> {
constructor(private readonly promise: Promise<MaybeAsyncIterable<T>>) {}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
for await (const value of await this.promise) {
yield value;
}
}
}