Retry outbound read syncs for up to 24 hours

This commit is contained in:
Evan Hahn 2021-07-23 17:02:36 -05:00 committed by GitHub
parent fc33e9be41
commit 18140c4a9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 366 additions and 19 deletions

View file

@ -93,7 +93,10 @@ export abstract class JobQueue<T> {
* If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it * If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it
* will be deleted from the store. * will be deleted from the store.
*/ */
protected abstract run(job: Readonly<ParsedJob<T>>): Promise<void>; protected abstract run(
job: Readonly<ParsedJob<T>>,
extra?: Readonly<{ attempt: number }>
): Promise<void>;
/** /**
* Start streaming jobs from the store. * Start streaming jobs from the store.
@ -198,7 +201,7 @@ export abstract class JobQueue<T> {
// We want an `await` in the loop, as we don't want a single job running more // We want an `await` in the loop, as we don't want a single job running more
// than once at a time. Ideally, the job will succeed on the first attempt. // than once at a time. Ideally, the job will succeed on the first attempt.
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.run(parsedJob); await this.run(parsedJob, { attempt });
result = { success: true }; result = { success: true };
log.info( log.info(
`${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}`

View file

@ -3,6 +3,7 @@
import type { WebAPIType } from '../textsecure/WebAPI'; import type { WebAPIType } from '../textsecure/WebAPI';
import { readSyncJobQueue } from './readSyncJobQueue';
import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue'; import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue';
import { reportSpamJobQueue } from './reportSpamJobQueue'; import { reportSpamJobQueue } from './reportSpamJobQueue';
@ -16,6 +17,7 @@ export function initializeAllJobQueues({
}): void { }): void {
reportSpamJobQueue.initialize({ server }); reportSpamJobQueue.initialize({ server });
readSyncJobQueue.streamJobs();
removeStorageKeyJobQueue.streamJobs(); removeStorageKeyJobQueue.streamJobs();
reportSpamJobQueue.streamJobs(); reportSpamJobQueue.streamJobs();
} }

118
ts/jobs/readSyncJobQueue.ts Normal file
View file

@ -0,0 +1,118 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable class-methods-use-this */
import * as z from 'zod';
import * as moment from 'moment';
import { getSendOptions } from '../util/getSendOptions';
import { handleMessageSend } from '../util/handleMessageSend';
import { isNotNil } from '../util/isNotNil';
import { sleep } from '../util/sleep';
import {
exponentialBackoffSleepTime,
exponentialBackoffMaxAttempts,
} from '../util/exponentialBackoff';
import * as log from '../logging/log';
import { isDone as isDeviceLinked } from '../util/registration';
import { waitForOnline } from '../util/waitForOnline';
import { parseIntWithFallback } from '../util/parseIntWithFallback';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
const MAX_RETRY_TIME = moment.duration(1, 'day').asMilliseconds();
const readSyncJobDataSchema = z.object({
readSyncs: z.array(
z.object({
messageId: z.string().optional(),
senderE164: z.string().optional(),
senderUuid: z.string().optional(),
timestamp: z.number(),
})
),
});
export type ReadSyncJobData = z.infer<typeof readSyncJobDataSchema>;
export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
protected parseData(data: unknown): ReadSyncJobData {
return readSyncJobDataSchema.parse(data);
}
protected async run(
{ data, timestamp }: Readonly<{ data: ReadSyncJobData; timestamp: number }>,
{ attempt }: Readonly<{ attempt: number }>
): Promise<void> {
const { readSyncs } = data;
if (!readSyncs.length) {
log.info(
"readSyncJobQueue: skipping this job because there's nothing to sync"
);
return;
}
const maxJobAge = timestamp + MAX_RETRY_TIME;
const timeRemaining = maxJobAge - Date.now();
if (timeRemaining <= 0) {
log.info("readSyncJobQueue: giving up because it's been too long");
return;
}
try {
await waitForOnline(window.navigator, window, { timeout: timeRemaining });
} catch (err) {
log.info("readSyncJobQueue: didn't come online in time, giving up");
return;
}
await new Promise<void>(resolve => {
window.storage.onready(resolve);
});
if (!isDeviceLinked()) {
log.info("readSyncJobQueue: skipping this job because we're unlinked");
return;
}
await sleep(exponentialBackoffSleepTime(attempt));
const messageIds = readSyncs.map(item => item.messageId).filter(isNotNil);
const ourConversation = window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
try {
await handleMessageSend(
window.textsecure.messaging.syncReadMessages(readSyncs, sendOptions),
{ messageIds, sendType: 'readSync' }
);
} catch (err: unknown) {
if (!(err instanceof Error)) {
throw err;
}
const code = parseIntWithFallback(err.code, -1);
if (code === 508) {
log.info(
'readSyncJobQueue: server responded with 508. Giving up on this job'
);
return;
}
throw err;
}
}
}
export const readSyncJobQueue = new ReadSyncJobQueue({
store: jobQueueDatabaseStore,
queueType: 'read sync',
maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME),
});

View file

@ -0,0 +1,46 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import * as moment from 'moment';
import {
exponentialBackoffSleepTime,
exponentialBackoffMaxAttempts,
} from '../../util/exponentialBackoff';
describe('exponential backoff utilities', () => {
describe('exponentialBackoffSleepTime', () => {
it('returns slowly growing values', () => {
assert.strictEqual(exponentialBackoffSleepTime(1), 0);
assert.strictEqual(exponentialBackoffSleepTime(2), 190);
assert.strictEqual(exponentialBackoffSleepTime(3), 361);
assert.approximately(exponentialBackoffSleepTime(4), 686, 1);
assert.approximately(exponentialBackoffSleepTime(5), 1303, 1);
});
it('plateaus at a maximum after 15 attempts', () => {
const maximum = moment.duration(15, 'minutes').asMilliseconds();
for (let attempt = 16; attempt < 100; attempt += 1) {
assert.strictEqual(exponentialBackoffSleepTime(attempt), maximum);
}
});
});
describe('exponentialBackoffMaxAttempts', () => {
it('returns 2 attempts for a short period of time', () => {
assert.strictEqual(exponentialBackoffMaxAttempts(1), 2);
assert.strictEqual(exponentialBackoffMaxAttempts(99), 2);
});
it('returns 6 attempts for a 5 seconds', () => {
assert.strictEqual(exponentialBackoffMaxAttempts(5000), 6);
});
it('returns 110 attempts for 1 day', () => {
// This is a test case that is lifted from iOS's codebase.
const oneDay = moment.duration(24, 'hours').asMilliseconds();
assert.strictEqual(exponentialBackoffMaxAttempts(oneDay), 110);
});
});
});

View file

@ -7,6 +7,16 @@ import * as sinon from 'sinon';
import { waitForOnline } from '../../util/waitForOnline'; import { waitForOnline } from '../../util/waitForOnline';
describe('waitForOnline', () => { describe('waitForOnline', () => {
let sandbox: sinon.SinonSandbox;
beforeEach(() => {
sandbox = sinon.createSandbox();
});
afterEach(() => {
sandbox.restore();
});
function getFakeWindow(): EventTarget { function getFakeWindow(): EventTarget {
const result = new EventTarget(); const result = new EventTarget();
sinon.stub(result, 'addEventListener'); sinon.stub(result, 'addEventListener');
@ -24,7 +34,7 @@ describe('waitForOnline', () => {
sinon.assert.notCalled(fakeWindow.removeEventListener as sinon.SinonStub); sinon.assert.notCalled(fakeWindow.removeEventListener as sinon.SinonStub);
}); });
it("if you're offline, resolves as soon as you're online", async () => { it("if you're offline, resolves as soon as you're online (and cleans up listeners)", async () => {
const fakeNavigator = { onLine: false }; const fakeNavigator = { onLine: false };
const fakeWindow = getFakeWindow(); const fakeWindow = getFakeWindow();
@ -48,4 +58,88 @@ describe('waitForOnline', () => {
sinon.assert.calledOnce(fakeWindow.addEventListener as sinon.SinonStub); sinon.assert.calledOnce(fakeWindow.addEventListener as sinon.SinonStub);
sinon.assert.calledOnce(fakeWindow.removeEventListener as sinon.SinonStub); sinon.assert.calledOnce(fakeWindow.removeEventListener as sinon.SinonStub);
}); });
it("resolves immediately if you're online when passed a timeout", async () => {
const fakeNavigator = { onLine: true };
const fakeWindow = getFakeWindow();
await waitForOnline(fakeNavigator, fakeWindow, { timeout: 1234 });
sinon.assert.notCalled(fakeWindow.addEventListener as sinon.SinonStub);
sinon.assert.notCalled(fakeWindow.removeEventListener as sinon.SinonStub);
});
it("resolves immediately if you're online even if passed a timeout of 0", async () => {
const fakeNavigator = { onLine: true };
const fakeWindow = getFakeWindow();
await waitForOnline(fakeNavigator, fakeWindow, { timeout: 0 });
sinon.assert.notCalled(fakeWindow.addEventListener as sinon.SinonStub);
sinon.assert.notCalled(fakeWindow.removeEventListener as sinon.SinonStub);
});
it("if you're offline, resolves as soon as you're online if it happens before the timeout", async () => {
const clock = sandbox.useFakeTimers();
const fakeNavigator = { onLine: false };
const fakeWindow = getFakeWindow();
(fakeWindow.addEventListener as sinon.SinonStub)
.withArgs('online')
.callsFake((_eventName: string, callback: () => void) => {
setTimeout(callback, 1000);
});
let done = false;
(async () => {
await waitForOnline(fakeNavigator, fakeWindow, { timeout: 9999 });
done = true;
})();
await clock.tickAsync(600);
assert.isFalse(done);
await clock.tickAsync(500);
assert.isTrue(done);
});
it('rejects if too much time has passed, and cleans up listeners', async () => {
const clock = sandbox.useFakeTimers();
const fakeNavigator = { onLine: false };
const fakeWindow = getFakeWindow();
(fakeWindow.addEventListener as sinon.SinonStub)
.withArgs('online')
.callsFake((_eventName: string, callback: () => void) => {
setTimeout(callback, 9999);
});
const promise = waitForOnline(fakeNavigator, fakeWindow, {
timeout: 100,
});
await clock.tickAsync(500);
await assert.isRejected(promise);
sinon.assert.calledOnce(fakeWindow.removeEventListener as sinon.SinonStub);
});
it('rejects if offline and passed a timeout of 0', async () => {
const fakeNavigator = { onLine: false };
const fakeWindow = getFakeWindow();
(fakeWindow.addEventListener as sinon.SinonStub)
.withArgs('online')
.callsFake((_eventName: string, callback: () => void) => {
setTimeout(callback, 9999);
});
const promise = waitForOnline(fakeNavigator, fakeWindow, { timeout: 0 });
await assert.isRejected(promise);
});
}); });

View file

@ -204,6 +204,42 @@ describe('JobQueue', () => {
assert.isEmpty(store.storedJobs); assert.isEmpty(store.storedJobs);
}); });
it('passes the attempt number to the run function', async () => {
const attempts: Array<number> = [];
const store = new TestJobQueueStore();
class TestQueue extends JobQueue<string> {
parseData(data: unknown): string {
return z.string().parse(data);
}
async run(
_: unknown,
{ attempt }: Readonly<{ attempt: number }>
): Promise<void> {
attempts.push(attempt);
throw new Error('this job always fails');
}
}
const queue = new TestQueue({
store,
queueType: 'test',
maxAttempts: 6,
});
queue.streamJobs();
try {
await (await queue.add('foo')).completion;
} catch (err: unknown) {
// We expect this to fail.
}
assert.deepStrictEqual(attempts, [1, 2, 3, 4, 5, 6]);
});
it('makes job.completion reject if parseData throws', async () => { it('makes job.completion reject if parseData throws', async () => {
class TestQueue extends JobQueue<string> { class TestQueue extends JobQueue<string> {
parseData(data: unknown): string { parseData(data: unknown): string {

View file

@ -0,0 +1,45 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as moment from 'moment';
const BACKOFF_FACTOR = 1.9;
const MAX_BACKOFF = moment.duration(15, 'minutes').asMilliseconds();
/**
* For a given attempt, how long should we sleep (in milliseconds)?
*
* The attempt should be a positive integer, and it is 1-indexed. The first attempt is 1,
* the second is 2, and so on.
*
* This is modified from [iOS's codebase][0].
*
* [0]: https://github.com/signalapp/Signal-iOS/blob/6069741602421744edfb59923d2fb3a66b1b23c1/SignalServiceKit/src/Util/OWSOperation.swift
*/
export function exponentialBackoffSleepTime(attempt: number): number {
const failureCount = attempt - 1;
if (failureCount === 0) {
return 0;
}
return Math.min(MAX_BACKOFF, 100 * BACKOFF_FACTOR ** failureCount);
}
/**
* If I want to retry for X milliseconds, how many attempts is that, roughly? For example,
* 24 hours (86,400,000 milliseconds) is 111 attempts.
*
* `desiredDurationMs` should be at least 1.
*/
export function exponentialBackoffMaxAttempts(
desiredDurationMs: number
): number {
let attempts = 0;
let total = 0;
// There's probably some algebra we could do here instead of this loop, but this is
// fast even for giant numbers, and is typically called just once at startup.
do {
attempts += 1;
total += exponentialBackoffSleepTime(attempts);
} while (total < desiredDurationMs);
return attempts;
}

View file

@ -2,11 +2,9 @@
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import { ConversationAttributesType } from '../model-types.d'; import { ConversationAttributesType } from '../model-types.d';
import { handleMessageSend } from './handleMessageSend';
import { getSendOptions } from './getSendOptions';
import { sendReadReceiptsFor } from './sendReadReceiptsFor'; import { sendReadReceiptsFor } from './sendReadReceiptsFor';
import { hasErrors } from '../state/selectors/message'; import { hasErrors } from '../state/selectors/message';
import { isNotNil } from './isNotNil'; import { readSyncJobQueue } from '../jobs/readSyncJobQueue';
export async function markConversationRead( export async function markConversationRead(
conversationAttrs: ConversationAttributesType, conversationAttrs: ConversationAttributesType,
@ -105,26 +103,17 @@ export async function markConversationRead(
...unreadMessagesSyncData, ...unreadMessagesSyncData,
...Array.from(unreadReactionSyncData.values()), ...Array.from(unreadReactionSyncData.values()),
]; ];
const messageIds = readSyncs.map(item => item.messageId).filter(isNotNil);
if (readSyncs.length && options.sendReadReceipts) { if (readSyncs.length && options.sendReadReceipts) {
window.log.info(`Sending ${readSyncs.length} read syncs`); window.log.info(`Sending ${readSyncs.length} read syncs`);
// Because syncReadMessages sends to our other devices, and sendReadReceipts goes // Because syncReadMessages sends to our other devices, and sendReadReceipts goes
// to a contact, we need accessKeys for both. // to a contact, we need accessKeys for both.
const ourConversation = window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true,
});
if (window.ConversationController.areWePrimaryDevice()) { if (window.ConversationController.areWePrimaryDevice()) {
window.log.warn( window.log.warn(
'markConversationRead: We are primary device; not sending read syncs' 'markConversationRead: We are primary device; not sending read syncs'
); );
} else { } else {
await handleMessageSend( readSyncJobQueue.add({ readSyncs });
window.textsecure.messaging.syncReadMessages(readSyncs, sendOptions),
{ messageIds, sendType: 'readSync' }
);
} }
await sendReadReceiptsFor(conversationAttrs, unreadMessagesSyncData); await sendReadReceiptsFor(conversationAttrs, unreadMessagesSyncData);

View file

@ -3,19 +3,33 @@
export function waitForOnline( export function waitForOnline(
navigator: Readonly<{ onLine: boolean }>, navigator: Readonly<{ onLine: boolean }>,
onlineEventTarget: EventTarget onlineEventTarget: EventTarget,
options: Readonly<{ timeout?: number }> = {}
): Promise<void> { ): Promise<void> {
return new Promise(resolve => { const { timeout } = options;
return new Promise((resolve, reject) => {
if (navigator.onLine) { if (navigator.onLine) {
resolve(); resolve();
return; return;
} }
const listener = () => { const listener = () => {
onlineEventTarget.removeEventListener('online', listener); cleanup();
resolve(); resolve();
}; };
const cleanup = () => {
onlineEventTarget.removeEventListener('online', listener);
};
onlineEventTarget.addEventListener('online', listener); onlineEventTarget.addEventListener('online', listener);
if (timeout !== undefined) {
setTimeout(() => {
cleanup();
reject(new Error('waitForOnline: did not come online in time'));
}, timeout);
}
}); });
} }