JobQueue: Allow jobs to be added before streaming starts

This commit is contained in:
Scott Nonnenberg 2024-01-10 16:09:54 -08:00
parent 2394a25fc1
commit ca92068664
4 changed files with 48 additions and 26 deletions

View file

@ -167,14 +167,14 @@ export abstract class JobQueue<T> {
data: Readonly<T>, data: Readonly<T>,
insert?: (job: ParsedJob<T>) => Promise<void> insert?: (job: ParsedJob<T>) => Promise<void>
): Promise<Job<T>> { ): Promise<Job<T>> {
const job = this.createJob(data);
if (!this.started) { if (!this.started) {
throw new Error( log.warn(
`${this.logPrefix} has not started streaming. Make sure to call streamJobs().` `${this.logPrefix} This queue has not started streaming, adding job ${job.id} to database only.`
); );
} }
const job = this.createJob(data);
if (insert) { if (insert) {
await insert(job); await insert(job);
} }

View file

@ -35,20 +35,21 @@ export class JobQueueDatabaseStore implements JobQueueStore {
); );
const initialFetchPromise = this.initialFetchPromises.get(job.queueType); const initialFetchPromise = this.initialFetchPromises.get(job.queueType);
if (!initialFetchPromise) { if (initialFetchPromise) {
throw new Error( await initialFetchPromise;
`JobQueueDatabaseStore tried to add job for queue ${JSON.stringify( } else {
job.queueType log.warn(
)} but streaming had not yet started` `JobQueueDatabaseStore: added job for queue "${job.queueType}" but streaming has not yet started (shouldPersist=${shouldPersist})`
); );
} }
await initialFetchPromise;
if (shouldPersist) { if (shouldPersist) {
await this.db.insertJob(formatJobForInsert(job)); await this.db.insertJob(formatJobForInsert(job));
} }
this.getQueue(job.queueType).add(job); if (initialFetchPromise) {
this.getQueue(job.queueType).add(job);
}
} }
async delete(id: string): Promise<void> { async delete(id: string): Promise<void> {

View file

@ -25,22 +25,23 @@ describe('JobQueueDatabaseStore', () => {
}); });
describe('insert', () => { describe('insert', () => {
it("fails if streaming hasn't started yet", async () => { it("adds jobs to database even if streaming hasn't started yet", async () => {
const store = new JobQueueDatabaseStore(fakeDatabase); const store = new JobQueueDatabaseStore(fakeDatabase);
let error: unknown; await store.insert({
try { id: 'abc',
await store.insert({ timestamp: 1234,
id: 'abc', queueType: 'test queue',
timestamp: 1234, data: { hi: 5 },
queueType: 'test queue', });
data: { hi: 5 },
});
} catch (err: unknown) {
error = err;
}
assert.instanceOf(error, Error); sinon.assert.calledOnce(fakeDatabase.insertJob);
sinon.assert.calledWithMatch(fakeDatabase.insertJob, {
id: 'abc',
timestamp: 1234,
queueType: 'test queue',
data: { hi: 5 },
});
}); });
it('adds jobs to the database', async () => { it('adds jobs to the database', async () => {
@ -153,6 +154,26 @@ describe('JobQueueDatabaseStore', () => {
sinon.assert.calledOnce(fakeDatabase.insertJob); sinon.assert.calledOnce(fakeDatabase.insertJob);
assert.deepEqual(events, ['loaded jobs', 'insert']); assert.deepEqual(events, ['loaded jobs', 'insert']);
}); });
it("adds jobs if we haven't started streaming at all", async () => {
const events: Array<string> = [];
fakeDatabase.insertJob.callsFake(() => {
events.push('insert');
});
const store = new JobQueueDatabaseStore(fakeDatabase);
await store.insert({
id: 'abc',
timestamp: 1234,
queueType: 'test queue',
data: { hi: 5 },
});
sinon.assert.calledOnce(fakeDatabase.insertJob);
assert.deepEqual(events, ['insert']);
});
}); });
describe('delete', () => { describe('delete', () => {

View file

@ -838,7 +838,7 @@ describe('JobQueue', () => {
}); });
describe('add', () => { describe('add', () => {
it('rejects if the job queue has not started streaming', async () => { it('adds even if the job queue has not started streaming', async () => {
const fakeStore = { const fakeStore = {
insert: sinon.stub().resolves(), insert: sinon.stub().resolves(),
delete: sinon.stub().resolves(), delete: sinon.stub().resolves(),
@ -861,7 +861,7 @@ describe('JobQueue', () => {
maxAttempts: 99, maxAttempts: 99,
}); });
await assert.isRejected(noopQueue.add(undefined)); await noopQueue.add(undefined);
sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub); sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub);
}); });