From ca920686640faa16a6b11042492d7ce60d999332 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Wed, 10 Jan 2024 16:09:54 -0800 Subject: [PATCH] JobQueue: Allow jobs to be added before streaming starts --- ts/jobs/JobQueue.ts | 8 ++-- ts/jobs/JobQueueDatabaseStore.ts | 15 +++--- .../jobs/JobQueueDatabaseStore_test.ts | 47 ++++++++++++++----- ts/test-node/jobs/JobQueue_test.ts | 4 +- 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index 09d1454564..5391c4df2d 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -167,14 +167,14 @@ export abstract class JobQueue { data: Readonly, insert?: (job: ParsedJob) => Promise ): Promise> { + const job = this.createJob(data); + if (!this.started) { - throw new Error( - `${this.logPrefix} has not started streaming. Make sure to call streamJobs().` + log.warn( + `${this.logPrefix} This queue has not started streaming, adding job ${job.id} to database only.` ); } - const job = this.createJob(data); - if (insert) { await insert(job); } diff --git a/ts/jobs/JobQueueDatabaseStore.ts b/ts/jobs/JobQueueDatabaseStore.ts index 0dc8aafdd4..fdce520ee6 100644 --- a/ts/jobs/JobQueueDatabaseStore.ts +++ b/ts/jobs/JobQueueDatabaseStore.ts @@ -35,20 +35,21 @@ export class JobQueueDatabaseStore implements JobQueueStore { ); const initialFetchPromise = this.initialFetchPromises.get(job.queueType); - if (!initialFetchPromise) { - throw new Error( - `JobQueueDatabaseStore tried to add job for queue ${JSON.stringify( - job.queueType - )} but streaming had not yet started` + if (initialFetchPromise) { + await initialFetchPromise; + } else { + log.warn( + `JobQueueDatabaseStore: added job for queue "${job.queueType}" but streaming has not yet started (shouldPersist=${shouldPersist})` ); } - await initialFetchPromise; if (shouldPersist) { await this.db.insertJob(formatJobForInsert(job)); } - this.getQueue(job.queueType).add(job); + if (initialFetchPromise) { + this.getQueue(job.queueType).add(job); + } } async delete(id: string): Promise { diff --git a/ts/test-node/jobs/JobQueueDatabaseStore_test.ts b/ts/test-node/jobs/JobQueueDatabaseStore_test.ts index 07e09c9f92..f79cc1d388 100644 --- a/ts/test-node/jobs/JobQueueDatabaseStore_test.ts +++ b/ts/test-node/jobs/JobQueueDatabaseStore_test.ts @@ -25,22 +25,23 @@ describe('JobQueueDatabaseStore', () => { }); describe('insert', () => { - it("fails if streaming hasn't started yet", async () => { + it("adds jobs to database even if streaming hasn't started yet", async () => { const store = new JobQueueDatabaseStore(fakeDatabase); - let error: unknown; - try { - await store.insert({ - id: 'abc', - timestamp: 1234, - queueType: 'test queue', - data: { hi: 5 }, - }); - } catch (err: unknown) { - error = err; - } + await store.insert({ + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); - assert.instanceOf(error, Error); + sinon.assert.calledOnce(fakeDatabase.insertJob); + sinon.assert.calledWithMatch(fakeDatabase.insertJob, { + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); }); it('adds jobs to the database', async () => { @@ -153,6 +154,26 @@ describe('JobQueueDatabaseStore', () => { sinon.assert.calledOnce(fakeDatabase.insertJob); assert.deepEqual(events, ['loaded jobs', 'insert']); }); + + it("adds jobs if we haven't started streaming at all", async () => { + const events: Array = []; + + fakeDatabase.insertJob.callsFake(() => { + events.push('insert'); + }); + + const store = new JobQueueDatabaseStore(fakeDatabase); + + await store.insert({ + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); + + sinon.assert.calledOnce(fakeDatabase.insertJob); + assert.deepEqual(events, ['insert']); + }); }); describe('delete', () => { diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index 18461c9144..6632ad8217 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -838,7 +838,7 @@ describe('JobQueue', () => { }); describe('add', () => { - it('rejects if the job queue has not started streaming', async () => { + it('adds even if the job queue has not started streaming', async () => { const fakeStore = { insert: sinon.stub().resolves(), delete: sinon.stub().resolves(), @@ -861,7 +861,7 @@ describe('JobQueue', () => { maxAttempts: 99, }); - await assert.isRejected(noopQueue.add(undefined)); + await noopQueue.add(undefined); sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub); });