Retry outbound reactions for up to a day
This commit is contained in:
parent
4a6b7968c1
commit
8670a4d864
25 changed files with 1444 additions and 473 deletions
|
@ -110,7 +110,7 @@ describe('JobQueueDatabaseStore', () => {
|
|||
queueType: 'test queue',
|
||||
data: { hi: 5 },
|
||||
},
|
||||
{ shouldInsertIntoDatabase: false }
|
||||
{ shouldPersist: false }
|
||||
);
|
||||
|
||||
await streamPromise;
|
||||
|
|
|
@ -228,6 +228,45 @@ describe('JobQueue', () => {
|
|||
assert.lengthOf(queueTypes['test 2'], 2);
|
||||
});
|
||||
|
||||
it('can override the insertion logic, skipping storage persistence', async () => {
|
||||
const store = new TestJobQueueStore();
|
||||
|
||||
class TestQueue extends JobQueue<string> {
|
||||
parseData(data: unknown): string {
|
||||
return z.string().parse(data);
|
||||
}
|
||||
|
||||
async run(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
}
|
||||
|
||||
const queue = new TestQueue({
|
||||
store,
|
||||
queueType: 'test queue',
|
||||
maxAttempts: 1,
|
||||
});
|
||||
|
||||
queue.streamJobs();
|
||||
|
||||
const insert = sinon.stub().resolves();
|
||||
|
||||
await queue.add('foo bar', insert);
|
||||
|
||||
assert.lengthOf(store.storedJobs, 0);
|
||||
|
||||
sinon.assert.calledOnce(insert);
|
||||
sinon.assert.calledWith(
|
||||
insert,
|
||||
sinon.match({
|
||||
id: sinon.match.string,
|
||||
timestamp: sinon.match.number,
|
||||
queueType: 'test queue',
|
||||
data: 'foo bar',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('retries jobs, running them up to maxAttempts times', async () => {
|
||||
type TestJobData = 'foo' | 'bar';
|
||||
|
||||
|
|
|
@ -24,7 +24,10 @@ export class TestJobQueueStore implements JobQueueStore {
|
|||
});
|
||||
}
|
||||
|
||||
async insert(job: Readonly<StoredJob>): Promise<void> {
|
||||
async insert(
|
||||
job: Readonly<StoredJob>,
|
||||
{ shouldPersist = true }: Readonly<{ shouldPersist?: boolean }> = {}
|
||||
): Promise<void> {
|
||||
await fakeDelay();
|
||||
|
||||
this.storedJobs.forEach(storedJob => {
|
||||
|
@ -33,7 +36,9 @@ export class TestJobQueueStore implements JobQueueStore {
|
|||
}
|
||||
});
|
||||
|
||||
this.storedJobs.push(job);
|
||||
if (shouldPersist) {
|
||||
this.storedJobs.push(job);
|
||||
}
|
||||
|
||||
this.getPipe(job.queueType).add(job);
|
||||
|
||||
|
|
44
ts/test-node/jobs/helpers/InMemoryQueues_test.ts
Normal file
44
ts/test-node/jobs/helpers/InMemoryQueues_test.ts
Normal file
|
@ -0,0 +1,44 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { assert } from 'chai';
|
||||
|
||||
import { InMemoryQueues } from '../../../jobs/helpers/InMemoryQueues';
|
||||
|
||||
describe('InMemoryQueues', () => {
|
||||
describe('get', () => {
|
||||
it('returns a new PQueue for each key', () => {
|
||||
const queues = new InMemoryQueues();
|
||||
|
||||
assert.strictEqual(queues.get('a'), queues.get('a'));
|
||||
assert.notStrictEqual(queues.get('a'), queues.get('b'));
|
||||
assert.notStrictEqual(queues.get('b'), queues.get('c'));
|
||||
});
|
||||
|
||||
it('returns a queue that only executes one thing at a time', () => {
|
||||
const queue = new InMemoryQueues().get('foo');
|
||||
|
||||
assert.strictEqual(queue.concurrency, 1);
|
||||
});
|
||||
|
||||
it('cleans up the queues when all tasks have run', async () => {
|
||||
const queues = new InMemoryQueues();
|
||||
|
||||
const originalQueue = queues.get('foo');
|
||||
|
||||
originalQueue.pause();
|
||||
const tasksPromise = originalQueue.addAll([
|
||||
async () => {
|
||||
assert.strictEqual(queues.get('foo'), originalQueue);
|
||||
},
|
||||
async () => {
|
||||
assert.strictEqual(queues.get('foo'), originalQueue);
|
||||
},
|
||||
]);
|
||||
originalQueue.start();
|
||||
await tasksPromise;
|
||||
|
||||
assert.notStrictEqual(queues.get('foo'), originalQueue);
|
||||
});
|
||||
});
|
||||
});
|
Loading…
Add table
Add a link
Reference in a new issue