// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import { assert } from 'chai';
import * as sinon from 'sinon';
import { noop } from 'lodash';

import type { StoredJob } from '../../jobs/types';
import { JobQueueDatabaseStore } from '../../jobs/JobQueueDatabaseStore';

// Tests for JobQueueDatabaseStore. The database is replaced by an object of
// sinon stubs so each test can observe (and, where needed, delay) the calls
// the store makes to `getJobsInQueue`, `insertJob`, and `deleteJob`.
describe('JobQueueDatabaseStore', () => {
  let fakeDatabase: {
    getJobsInQueue: sinon.SinonStub;
    insertJob: sinon.SinonStub;
    deleteJob: sinon.SinonStub;
  };

  // Fresh stubs for every test; by default the database holds no jobs.
  beforeEach(() => {
    fakeDatabase = {
      getJobsInQueue: sinon.stub().resolves([]),
      insertJob: sinon.stub(),
      deleteJob: sinon.stub(),
    };
  });

  describe('insert', () => {
    it("fails if streaming hasn't started yet", async () => {
      const store = new JobQueueDatabaseStore(fakeDatabase);

      // No `stream()` call has been made for this queue, so the insert is
      // expected to reject with an Error.
      let error: unknown;
      try {
        await store.insert({
          id: 'abc',
          timestamp: 1234,
          queueType: 'test queue',
          data: { hi: 5 },
        });
      } catch (err: unknown) {
        error = err;
      }

      assert.instanceOf(error, Error);
    });

    it('adds jobs to the database', async () => {
      const store = new JobQueueDatabaseStore(fakeDatabase);
      store.stream('test queue');

      await store.insert({
        id: 'abc',
        timestamp: 1234,
        queueType: 'test queue',
        data: { hi: 5 },
      });

      sinon.assert.calledOnce(fakeDatabase.insertJob);
      sinon.assert.calledWithMatch(fakeDatabase.insertJob, {
        id: 'abc',
        timestamp: 1234,
        queueType: 'test queue',
        data: { hi: 5 },
      });
    });

    it('enqueues jobs after putting them in the database', async () => {
      // Records the order in which the database write and the stream yield
      // happen.
      const events: Array<string> = [];

      fakeDatabase.insertJob.callsFake(() => {
        events.push('insert');
      });

      const store = new JobQueueDatabaseStore(fakeDatabase);

      // Consume exactly one job from the stream, then stop.
      const streamPromise = (async () => {
        // We don't actually care about using the variable from the async iterable.
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        for await (const _job of store.stream('test queue')) {
          events.push('yielded job');
          break;
        }
      })();

      await store.insert({
        id: 'abc',
        timestamp: 1234,
        queueType: 'test queue',
        data: { hi: 5 },
      });

      await streamPromise;

      assert.deepEqual(events, ['insert', 'yielded job']);
    });

    it('can skip the database', async () => {
      const store = new JobQueueDatabaseStore(fakeDatabase);

      // Resolves only once the stream has yielded a job, which shows the job
      // is still enqueued even though it is not persisted.
      const streamPromise = (async () => {
        // We don't actually care about using the variable from the async iterable.
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        for await (const _job of store.stream('test queue')) {
          break;
        }
      })();

      await store.insert(
        {
          id: 'abc',
          timestamp: 1234,
          queueType: 'test queue',
          data: { hi: 5 },
        },
        { shouldPersist: false }
      );

      await streamPromise;

      sinon.assert.notCalled(fakeDatabase.insertJob);
    });

    it("doesn't insert jobs until the initial fetch has completed", async () => {
      const events: Array<string> = [];

      // Keep the initial fetch pending until the test resolves it by hand.
      let resolveGetJobsInQueue = noop;
      const getJobsInQueuePromise = new Promise(resolve => {
        resolveGetJobsInQueue = resolve;
      });

      fakeDatabase.getJobsInQueue.callsFake(() => {
        events.push('loaded jobs');
        return getJobsInQueuePromise;
      });
      fakeDatabase.insertJob.callsFake(() => {
        events.push('insert');
      });

      const store = new JobQueueDatabaseStore(fakeDatabase);
      store.stream('test queue');

      const insertPromise = store.insert({
        id: 'abc',
        timestamp: 1234,
        queueType: 'test queue',
        data: { hi: 5 },
      });

      // The fetch is still outstanding, so nothing may have been written yet.
      sinon.assert.notCalled(fakeDatabase.insertJob);

      resolveGetJobsInQueue([]);

      await insertPromise;

      sinon.assert.calledOnce(fakeDatabase.insertJob);
      assert.deepEqual(events, ['loaded jobs', 'insert']);
    });
  });

  describe('delete', () => {
    it('deletes jobs from the database', async () => {
      const store = new JobQueueDatabaseStore(fakeDatabase);

      await store.delete('xyz');

      sinon.assert.calledOnce(fakeDatabase.deleteJob);
      sinon.assert.calledWith(fakeDatabase.deleteJob, 'xyz');
    });
  });

  describe('stream', () => {
    it('yields all values in the database, then all values inserted', async () => {
      // Builds a job for the given queue with a fixed payload.
      const makeJob = (id: string, queueType: string) => ({
        id,
        timestamp: Date.now(),
        queueType,
        data: { hi: 5 },
      });

      // Collects the ids of the first `amount` jobs yielded by `stream`.
      const ids = async (
        stream: AsyncIterable<StoredJob>,
        amount: number
      ): Promise<Array<string>> => {
        const result: Array<string> = [];
        for await (const job of stream) {
          result.push(job.id);
          if (result.length >= amount) {
            break;
          }
        }
        return result;
      };

      // Queue A starts with three stored jobs, queue B with none, and queue C
      // with two.
      fakeDatabase.getJobsInQueue
        .withArgs('queue A')
        .resolves([
          makeJob('A.1', 'queue A'),
          makeJob('A.2', 'queue A'),
          makeJob('A.3', 'queue A'),
        ]);

      fakeDatabase.getJobsInQueue.withArgs('queue B').resolves([]);

      fakeDatabase.getJobsInQueue
        .withArgs('queue C')
        .resolves([makeJob('C.1', 'queue C'), makeJob('C.2', 'queue C')]);

      const store = new JobQueueDatabaseStore(fakeDatabase);

      const streamA = store.stream('queue A');
      const streamB = store.stream('queue B');
      const streamC = store.stream('queue C');

      // Inserts are interleaved across queues on purpose.
      await store.insert(makeJob('A.4', 'queue A'));
      await store.insert(makeJob('C.3', 'queue C'));
      await store.insert(makeJob('B.1', 'queue B'));
      await store.insert(makeJob('A.5', 'queue A'));

      const streamAIds = await ids(streamA, 5);
      const streamBIds = await ids(streamB, 1);
      const streamCIds = await ids(streamC, 3);

      assert.deepEqual(streamAIds, ['A.1', 'A.2', 'A.3', 'A.4', 'A.5']);
      assert.deepEqual(streamBIds, ['B.1']);
      assert.deepEqual(streamCIds, ['C.1', 'C.2', 'C.3']);

      // Three streams were opened, and three fetches were made in total.
      sinon.assert.calledThrice(fakeDatabase.getJobsInQueue);
    });
  });
});