From b68e73195046ea48b5608ea06e8d86050341e655 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Tue, 1 Oct 2024 08:23:32 +1000 Subject: [PATCH] Deduplicate and cancel unneeded retry requests --- package-lock.json | 10 +- package.json | 2 +- ts/background.ts | 37 +++-- ts/groups.ts | 2 +- ts/models/conversations.ts | 2 +- ts/services/storage.ts | 127 +++++++++------- ts/services/username.ts | 2 +- ts/services/usernameIntegrity.ts | 2 +- ts/state/ducks/calling.ts | 4 +- ts/state/ducks/stickers.ts | 4 +- ts/state/ducks/storyDistributionLists.ts | 18 ++- ts/state/ducks/username.ts | 4 +- ts/test-mock/messaging/retries_test.ts | 173 ++++++++++++++++++++++ ts/test-mock/messaging/sendSync_test.ts | 2 +- ts/test-mock/messaging/sender_key_test.ts | 1 + ts/test-mock/pnp/merge_test.ts | 19 ++- ts/test-mock/storage/drop_test.ts | 76 +++++----- ts/textsecure/MessageReceiver.ts | 37 ++++- ts/textsecure/messageReceiverEvents.ts | 15 ++ ts/util/callDisposition.ts | 2 +- ts/util/handleRetry.ts | 93 ++++++++++-- ts/util/markOnboardingStoryAsRead.ts | 2 +- 22 files changed, 473 insertions(+), 161 deletions(-) create mode 100644 ts/test-mock/messaging/retries_test.ts diff --git a/package-lock.json b/package-lock.json index 05c384798460..3a6ecc97b4f0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -126,7 +126,7 @@ "@indutny/parallel-prettier": "3.0.0", "@indutny/rezip-electron": "1.3.1", "@indutny/symbolicate-mac": "2.3.0", - "@signalapp/mock-server": "6.10.0", + "@signalapp/mock-server": "6.11.0", "@storybook/addon-a11y": "8.1.11", "@storybook/addon-actions": "8.1.11", "@storybook/addon-controls": "8.1.11", @@ -7274,12 +7274,13 @@ } }, "node_modules/@signalapp/mock-server": { - "version": "6.10.0", - "resolved": "https://registry.npmjs.org/@signalapp/mock-server/-/mock-server-6.10.0.tgz", - "integrity": "sha512-StUP0vIKN43T1PDeyIyr7+PBMW/1mxjPOiHFzF7VDKvC53Q2pX84OCd6hsW6m16Tsjrxm5B7gVLIh2e4OYbkdQ==", + "version": "6.11.0", + "resolved": "https://registry.npmjs.org/@signalapp/mock-server/-/mock-server-6.11.0.tgz", + "integrity": "sha512-wIPUtsLcngcum0dkGuJ1YfTuWso7DhH0JZUA5ZS82XlmF2Qz0ZF03AdeeNcQx4XY4mJU1+nIEhTZpB0AFqEM4Q==", "dev": true, "license": "AGPL-3.0-only", "dependencies": { + "@indutny/parallel-prettier": "^3.0.0", "@signalapp/libsignal-client": "^0.45.0", "@tus/file-store": "^1.4.0", "@tus/server": "^1.7.0", @@ -7287,6 +7288,7 @@ "long": "^4.0.0", "micro": "^9.3.4", "microrouter": "^3.1.3", + "prettier": "^3.3.3", "protobufjs": "^7.2.4", "url-pattern": "^1.0.3", "uuid": "^8.3.2", diff --git a/package.json b/package.json index 29916050cc5a..7767fcbce962 100644 --- a/package.json +++ b/package.json @@ -210,7 +210,7 @@ "@indutny/parallel-prettier": "3.0.0", "@indutny/rezip-electron": "1.3.1", "@indutny/symbolicate-mac": "2.3.0", - "@signalapp/mock-server": "6.10.0", + "@signalapp/mock-server": "6.11.0", "@storybook/addon-a11y": "8.1.11", "@storybook/addon-actions": "8.1.11", "@storybook/addon-controls": "8.1.11", diff --git a/ts/background.ts b/ts/background.ts index c5a6dd38c3f7..55c627f874d3 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -1,7 +1,7 @@ // Copyright 2020 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import { isNumber, throttle, groupBy } from 'lodash'; +import { isNumber, groupBy, throttle } from 'lodash'; import { render } from 'react-dom'; import { batch as batchDispatch } from 'react-redux'; import PQueue from 'p-queue'; @@ -77,7 +77,6 @@ import { parseIntOrThrow } from './util/parseIntOrThrow'; import { getProfile } from './util/getProfile'; import type { ConfigurationEvent, - DecryptionErrorEvent, DeliveryEvent, EnvelopeQueuedEvent, EnvelopeUnsealedEvent, @@ -129,9 +128,10 @@ import { InstallScreenStep } from './types/InstallScreen'; import { getEnvironment } from './environment'; import { SignalService as Proto } from './protobuf'; import { + getOnDecryptionError, onRetryRequest, - onDecryptionError, onInvalidPlaintextMessage, + onSuccessfulDecrypt, } from './util/handleRetry'; import { themeChanged } from './shims/themeChanged'; import { createIPCEvents } from './util/createIPCEvents'; @@ -618,11 +618,14 @@ export async function startApp(): Promise { 'error', queuedEventListener(onError, false) ); + + messageReceiver.addEventListener( + 'successful-decrypt', + queuedEventListener(onSuccessfulDecrypt) + ); messageReceiver.addEventListener( 'decryption-error', - queuedEventListener((event: DecryptionErrorEvent): void => { - drop(onDecryptionErrorQueue.add(() => onDecryptionError(event))); - }) + queuedEventListener(getOnDecryptionError(() => onDecryptionErrorQueue)) ); messageReceiver.addEventListener( 'invalid-plaintext', @@ -1308,7 +1311,7 @@ export async function startApp(): Promise { remotelyExpired = true; }); - async function runStorageService() { + async function runStorageService({ reason }: { reason: string }) { if (window.storage.get('backupDownloadPath')) { log.info( 'background: not running storage service while downloading backup' @@ -1316,7 +1319,9 @@ export async function startApp(): Promise { return; } StorageService.enableStorageService(); - StorageService.runStorageServiceSyncJob(); + StorageService.runStorageServiceSyncJob({ + reason: `runStorageService/${reason}`, + }); } async function start() { @@ -1429,7 +1434,7 @@ export async function startApp(): Promise { afterStart(); // Run storage service after linking - drop(runStorageService()); + drop(runStorageService({ reason: 'background/registration_done' })); }); cancelInitializationMessage(); @@ -1600,7 +1605,7 @@ export async function startApp(): Promise { log.warn('downloadBackup: no backup download path, skipping'); backupReady.resolve(); server.registerRequestHandler(messageReceiver); - drop(runStorageService()); + drop(runStorageService({ reason: 'downloadBackup/noPath' })); return; } @@ -1623,7 +1628,7 @@ export async function startApp(): Promise { log.info('downloadBackup: processing websocket messages, storage service'); backupReady.resolve(); server.registerRequestHandler(messageReceiver); - drop(runStorageService()); + drop(runStorageService({ reason: 'downloadBackup/complete' })); } window.getSyncRequest = (timeoutMillis?: number) => { @@ -1790,7 +1795,7 @@ export async function startApp(): Promise { if (connectCount === 1) { Stickers.downloadQueuedPacks(); if (!newVersion) { - drop(runStorageService()); + drop(runStorageService({ reason: 'connect/connectCount=1' })); } } @@ -1808,7 +1813,7 @@ export async function startApp(): Promise { window.getSyncRequest(); void StorageService.reprocessUnknownFields(); - void runStorageService(); + void runStorageService({ reason: 'connect/bootAfterUpgrade' }); const manager = window.getAccountManager(); await Promise.all([ @@ -1904,7 +1909,7 @@ export async function startApp(): Promise { MessageSender.getRequestConfigurationSyncMessage() ), singleProtoJobQueue.add(MessageSender.getRequestBlockSyncMessage()), - runStorageService(), + runStorageService({ reason: 'firstRun/initialSync' }), singleProtoJobQueue.add( MessageSender.getRequestContactSyncMessage() ), @@ -3175,7 +3180,7 @@ export async function startApp(): Promise { } case FETCH_LATEST_ENUM.STORAGE_MANIFEST: log.info('onFetchLatestSync: fetching latest manifest'); - StorageService.runStorageServiceSyncJob(); + StorageService.runStorageServiceSyncJob({ reason: 'syncFetchLatest' }); break; case FETCH_LATEST_ENUM.SUBSCRIPTION_STATUS: log.info('onFetchLatestSync: fetching latest subscription status'); @@ -3233,7 +3238,7 @@ export async function startApp(): Promise { } } - await StorageService.runStorageServiceSyncJob(); + await StorageService.runStorageServiceSyncJob({ reason: 'onKeysSync' }); } } diff --git a/ts/groups.ts b/ts/groups.ts index a76b0761a320..be87730f844e 100644 --- a/ts/groups.ts +++ b/ts/groups.ts @@ -2006,7 +2006,7 @@ export async function createGroupV2( ); await conversation.queueJob('storageServiceUploadJob', async () => { - await storageServiceUploadJob(); + await storageServiceUploadJob({ reason: 'createGroupV2' }); }); const timestamp = Date.now(); diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index a11241657fb1..eb2916a95b99 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -5392,7 +5392,7 @@ export class ConversationModel extends window.Backbone this.set({ needsStorageServiceSync: true }); void this.queueJob('captureChange', async () => { - storageServiceUploadJob(); + storageServiceUploadJob({ reason: `captureChange/${logMessage}` }); }); } diff --git a/ts/services/storage.ts b/ts/services/storage.ts index e1d9663dde29..dc8111096280 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -1854,15 +1854,20 @@ async function processRemoteRecords( return 0; } -async function sync( - ignoreConflicts = false -): Promise { +async function sync({ + ignoreConflicts = false, + reason, +}: { + ignoreConflicts?: boolean; + reason: string; +}): Promise { if (!window.storage.get('storageKey')) { const masterKeyBase64 = window.storage.get('masterKey'); if (!masterKeyBase64) { - throw new Error( - 'storageService.sync: Cannot start; no storage or master key!' + log.error( + `storageService.sync(${reason}): Cannot start; no storage or master key!` ); + return; } const masterKey = Bytes.fromBase64(masterKeyBase64); @@ -1873,7 +1878,7 @@ async function sync( } log.info( - `storageService.sync: starting... ignoreConflicts=${ignoreConflicts}` + `storageService.sync: starting... ignoreConflicts=${ignoreConflicts}, reason=${reason}` ); let manifest: Proto.ManifestRecord | undefined; @@ -1921,7 +1926,7 @@ async function sync( const hasConflicts = conflictCount !== 0; if (hasConflicts && !ignoreConflicts) { - await upload(true); + await upload({ fromSync: true, reason: `sync/${reason}` }); } // We now know that we've successfully completed a storage service fetch @@ -1943,9 +1948,17 @@ async function sync( return manifest; } -async function upload(fromSync = false): Promise { +async function upload({ + fromSync = false, + reason, +}: { + fromSync?: boolean; + reason: string; +}): Promise { + const logId = `storageService.upload/${reason}`; + if (!window.textsecure.messaging) { - throw new Error('storageService.upload: We are offline!'); + throw new Error(`${logId}: We are offline!`); } // Rate limit uploads coming from syncing @@ -1955,9 +1968,7 @@ async function upload(fromSync = false): Promise { const [firstMostRecentWrite] = uploadBucket; if (isMoreRecentThan(5 * durations.MINUTE, firstMostRecentWrite)) { - throw new Error( - 'storageService.uploadManifest: too many writes too soon.' - ); + throw new Error(`${logId}: too many writes too soon.`); } uploadBucket.shift(); @@ -1967,13 +1978,11 @@ async function upload(fromSync = false): Promise { if (!window.storage.get('storageKey')) { // requesting new keys runs the sync job which will detect the conflict // and re-run the upload job once we're merged and up-to-date. - log.info('storageService.upload: no storageKey, requesting new keys'); + log.info(`${logId}: no storageKey, requesting new keys`); backOff.reset(); if (window.ConversationController.areWePrimaryDevice()) { - log.warn( - 'storageService.upload: We are primary device; not sending key sync request' - ); + log.warn(`${logId}: We are primary device; not sending key sync request`); return; } @@ -1981,7 +1990,7 @@ async function upload(fromSync = false): Promise { await singleProtoJobQueue.add(MessageSender.getRequestKeySyncMessage()); } catch (error) { log.error( - 'storageService.upload: Failed to queue sync message', + `${logId}: Failed to queue sync message`, Errors.toLogFormat(error) ); } @@ -1997,15 +2006,16 @@ async function upload(fromSync = false): Promise { // We are going to upload after this sync so we can ignore any conflicts // that arise during the sync. const ignoreConflicts = true; - previousManifest = await sync(ignoreConflicts); + previousManifest = await sync({ + ignoreConflicts, + reason: `upload/${reason}`, + }); } const localManifestVersion = window.storage.get('manifestVersion', 0); const version = Number(localManifestVersion) + 1; - log.info( - `storageService.upload(${version}): will update to manifest version` - ); + log.info(`${logId}/${version}: will update to manifest version`); try { const generatedManifest = await generateManifest( @@ -2021,17 +2031,14 @@ async function upload(fromSync = false): Promise { } catch (err) { if (err.code === 409) { await sleep(conflictBackOff.getAndIncrement()); - log.info('storageService.upload: pushing sync on the queue'); + log.info(`${logId}: pushing sync on the queue`); // The sync job will check for conflicts and as part of that conflict // check if an item needs sync and doesn't match with the remote record // it'll kick off another upload. setTimeout(runStorageServiceSyncJob); return; } - log.error( - `storageService.upload(${version}): error`, - Errors.toLogFormat(err) - ); + log.error(`${logId}/${version}: error`, Errors.toLogFormat(err)); } } @@ -2140,44 +2147,52 @@ export async function reprocessUnknownFields(): Promise { log.info( `storageService.reprocessUnknownFields(${version}): uploading` ); - await upload(); + await upload({ reason: 'reprocessUnknownFields/hasConflicts' }); } }) ); } -export const storageServiceUploadJob = debounce(() => { - if (!storageServiceEnabled) { - log.info('storageService.storageServiceUploadJob: called before enabled'); - return; - } +export const storageServiceUploadJob = debounce( + ({ reason }: { reason: string }) => { + if (!storageServiceEnabled) { + log.info('storageService.storageServiceUploadJob: called before enabled'); + return; + } - void storageJobQueue( - async () => { - await upload(); - }, - `upload v${window.storage.get('manifestVersion')}` - ); -}, 500); - -export const runStorageServiceSyncJob = debounce(() => { - if (!storageServiceEnabled) { - log.info('storageService.runStorageServiceSyncJob: called before enabled'); - return; - } - - ourProfileKeyService.blockGetWithPromise( - storageJobQueue( + void storageJobQueue( async () => { - await sync(); - - // Notify listeners about sync completion - window.Whisper.events.trigger('storageService:syncComplete'); + await upload({ reason: `storageServiceUploadJob/${reason}` }); }, - `sync v${window.storage.get('manifestVersion')}` - ) - ); -}, 500); + `upload v${window.storage.get('manifestVersion')}` + ); + }, + 500 +); + +export const runStorageServiceSyncJob = debounce( + ({ reason }: { reason: string }) => { + if (!storageServiceEnabled) { + log.info( + 'storageService.runStorageServiceSyncJob: called before enabled' + ); + return; + } + + ourProfileKeyService.blockGetWithPromise( + storageJobQueue( + async () => { + await sync({ reason }); + + // Notify listeners about sync completion + window.Whisper.events.trigger('storageService:syncComplete'); + }, + `sync v${window.storage.get('manifestVersion')}` + ) + ); + }, + 500 +); export const addPendingDelete = (item: ExtendedStorageID): void => { void storageJobQueue( diff --git a/ts/services/username.ts b/ts/services/username.ts index 2138339f6f44..a510d0deae33 100644 --- a/ts/services/username.ts +++ b/ts/services/username.ts @@ -361,7 +361,7 @@ export async function resetLink(username: string): Promise { await window.storage.remove('usernameLinkCorrupted'); me.captureChange('usernameLink'); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'resetLink' }); } const USERNAME_LINK_ENTROPY_SIZE = 32; diff --git a/ts/services/usernameIntegrity.ts b/ts/services/usernameIntegrity.ts index d75e753fba4e..3eb9907d050e 100644 --- a/ts/services/usernameIntegrity.ts +++ b/ts/services/usernameIntegrity.ts @@ -141,7 +141,7 @@ class UsernameIntegrityService { `storage service sync (local: ${localValue}, remote: ${remoteValue})` ); - runStorageServiceSyncJob(); + runStorageServiceSyncJob({ reason: 'checkPhoneNumberSharing' }); } // Since we already run on storage service job queue - don't await the diff --git a/ts/state/ducks/calling.ts b/ts/state/ducks/calling.ts index 5e668ed6e2cd..167017c79e06 100644 --- a/ts/state/ducks/calling.ts +++ b/ts/state/ducks/calling.ts @@ -2108,7 +2108,7 @@ function createCallLink( DataWriter.saveCallHistory(callHistory), ]); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'createCallLink' }); dispatch({ type: HANDLE_CALL_LINK_UPDATE, @@ -2125,7 +2125,7 @@ function deleteCallLink( ): ThunkAction { return async dispatch => { await DataWriter.beginDeleteCallLink(roomId, { storageNeedsSync: true }); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'deleteCallLink' }); // Wait for storage service sync before finalizing delete drop(CallLinkDeleteManager.addJob({ roomId }, { delay: 10000 })); dispatch(handleCallLinkDelete({ roomId })); diff --git a/ts/state/ducks/stickers.ts b/ts/state/ducks/stickers.ts index 5ea75084db6e..6e8ef2c2c8ef 100644 --- a/ts/state/ducks/stickers.ts +++ b/ts/state/ducks/stickers.ts @@ -250,7 +250,7 @@ async function doInstallStickerPack( } if (!fromStorageService && !fromBackup) { - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'doInstallServicePack' }); } const recentStickers = await getRecentStickers(); @@ -295,7 +295,7 @@ async function doUninstallStickerPack( } if (!fromStorageService) { - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'doUninstallStickerPack' }); } const recentStickers = await getRecentStickers(); diff --git a/ts/state/ducks/storyDistributionLists.ts b/ts/state/ducks/storyDistributionLists.ts index f662fd75803c..4e002675f023 100644 --- a/ts/state/ducks/storyDistributionLists.ts +++ b/ts/state/ducks/storyDistributionLists.ts @@ -137,7 +137,9 @@ function allowsRepliesChanged( storageNeedsSync: true, }); - storageServiceUploadJob(); + storageServiceUploadJob({ + reason: 'distributionLists/allowsRepliesChanged', + }); log.info( 'storyDistributionLists.allowsRepliesChanged: allowsReplies has changed', @@ -182,7 +184,7 @@ function createDistributionList( } if (storyDistribution.storageNeedsSync) { - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'createDistributionList' }); } dispatch({ @@ -241,7 +243,7 @@ function deleteDistributionList( listId ); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'deleteDistributionList' }); dispatch({ type: DELETE_LIST, @@ -290,7 +292,9 @@ function hideMyStoriesFrom( } ); - storageServiceUploadJob(); + storageServiceUploadJob({ + reason: 'storyDistributionLists/hideMyStoriesFrom', + }); await window.storage.put('hasSetMyStoriesPrivacy', true); @@ -362,7 +366,7 @@ function removeMembersFromDistributionList( } ); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'removeMembersFromDistributionList' }); dispatch({ type: MODIFY_LIST, @@ -407,7 +411,7 @@ function setMyStoriesToAllSignalConnections(): ThunkAction< } ); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'setMyStoriesToAllSignalConnections' }); } await window.storage.put('hasSetMyStoriesPrivacy', true); @@ -466,7 +470,7 @@ function updateStoryViewers( } ); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'updateStoryViewers' }); if (listId === MY_STORY_ID) { await window.storage.put('hasSetMyStoriesPrivacy', true); diff --git a/ts/state/ducks/username.ts b/ts/state/ducks/username.ts index ee26682bb47e..c05d6356d56b 100644 --- a/ts/state/ducks/username.ts +++ b/ts/state/ducks/username.ts @@ -327,7 +327,7 @@ function markCompletedUsernameOnboarding(): ThunkAction< await window.storage.put('hasCompletedUsernameOnboarding', true); const me = window.ConversationController.getOurConversationOrThrow(); me.captureChange('usernameOnboarding'); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'markCompletedUsernameOnboarding' }); }; } @@ -349,7 +349,7 @@ function setUsernameLinkColor( await window.storage.put('usernameLinkColor', color); const me = window.ConversationController.getOurConversationOrThrow(); me.captureChange('usernameLinkColor'); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'setUsernameLinkColor' }); }; } diff --git a/ts/test-mock/messaging/retries_test.ts b/ts/test-mock/messaging/retries_test.ts new file mode 100644 index 000000000000..ead4e0f06fa3 --- /dev/null +++ b/ts/test-mock/messaging/retries_test.ts @@ -0,0 +1,173 @@ +// Copyright 2023 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import createDebug from 'debug'; +import { EnvelopeType, StorageState } from '@signalapp/mock-server'; + +import type { App } from '../playwright'; +import * as durations from '../../util/durations'; +import { Bootstrap } from '../bootstrap'; +import { sleep } from '../../util/sleep'; + +export const debug = createDebug('mock:test:retries'); + +describe('retries', function (this: Mocha.Suite) { + this.timeout(durations.MINUTE); + + let bootstrap: Bootstrap; + let app: App; + + beforeEach(async () => { + bootstrap = new Bootstrap(); + await bootstrap.init(); + app = await bootstrap.link(); + + const { contacts, phone } = bootstrap; + const [first] = contacts; + + let state = StorageState.getEmpty(); + + state = state.addContact(first, { + identityKey: first.publicKey.serialize(), + profileKey: first.profileKey.serialize(), + whitelisted: true, + }); + state = state.pin(first); + + await phone.setStorageState(state); + }); + + afterEach(async function (this: Mocha.Context) { + if (!bootstrap) { + return; + } + + await bootstrap.maybeSaveLogs(this.currentTest, app); + await app.close(); + await bootstrap.teardown(); + }); + + it('sends a retry request on a missing sender key error', async () => { + const { desktop, contacts } = bootstrap; + const [first] = contacts; + + debug('send a sender key message without sending skdm first'); + const distributionId = await first.sendSenderKey(desktop, { + timestamp: bootstrap.getTimestamp(), + skipSkdmSend: true, + }); + + const timestamp = bootstrap.getTimestamp(); + await first.sendText(desktop, 'hello', { + distributionId, + sealed: true, + timestamp, + }); + + debug('waiting for the resend request'); + const message = await first.waitForDecryptionError(); + debug(JSON.stringify(message)); + + assert.equal(message.envelopeType, EnvelopeType.Plaintext); + assert.equal(message.timestamp, timestamp); + }); + + it('does not send a retry request if message succeeded later', async () => { + const { desktop, contacts } = bootstrap; + const [first] = contacts; + + debug('send a sender key message without sending skdm first'); + const firstDistributionId = await first.sendSenderKey(desktop, { + timestamp: bootstrap.getTimestamp(), + skipSkdmSend: true, + }); + + const content = 'how are you?'; + + debug('send a failing message'); + const timestamp = bootstrap.getTimestamp(); + await first.sendText(desktop, content, { + distributionId: firstDistributionId, + sealed: true, + timestamp, + }); + + debug('send second sender key out'); + const secondDistributionId = await first.sendSenderKey(desktop, { + timestamp: bootstrap.getTimestamp(), + }); + + debug('send same hello message, this time it should work'); + await first.sendText(desktop, content, { + distributionId: secondDistributionId, + sealed: true, + timestamp, + }); + + debug('open conversation'); + const window = await app.getWindow(); + const leftPane = window.locator('#LeftPane'); + await leftPane.locator(`[data-testid="${first.device.aci}"]`).click(); + + const conversationStack = window.locator('.Inbox__conversation-stack'); + + debug('verify message receipt'); + await conversationStack + .locator(`.module-message--incoming >> "${content}"`) + .waitFor(); + + debug('verify that no resend request was sent'); + const count = await first.getDecryptionErrorQueueSize(); + assert.equal(count, 0); + }); + + it('sends only one retry request if many failures with same timestamp', async () => { + const { desktop, contacts } = bootstrap; + const [first] = contacts; + + debug('send a sender key message without sending skdm first'); + const firstDistributionId = await first.sendSenderKey(desktop, { + timestamp: bootstrap.getTimestamp(), + skipSkdmSend: true, + }); + + const content = 'how are you?'; + + debug('send a failing message'); + const timestamp = bootstrap.getTimestamp(); + await first.sendText(desktop, content, { + distributionId: firstDistributionId, + sealed: true, + timestamp, + }); + + debug('send a failing message a second time'); + await first.sendText(desktop, content, { + distributionId: firstDistributionId, + sealed: true, + timestamp, + }); + + debug('send a failing message a third time'); + await first.sendText(desktop, content, { + distributionId: firstDistributionId, + sealed: true, + timestamp, + }); + + debug('waiting for the resend request'); + const message = await first.waitForDecryptionError(); + debug(JSON.stringify(message)); + + assert.equal(message.envelopeType, EnvelopeType.Plaintext); + assert.equal(message.timestamp, timestamp); + + debug('wait for max jitter delay'); + await sleep(500); + + debug('verify that no other resend requests were sent'); + const count = await first.getDecryptionErrorQueueSize(); + assert.equal(count, 0); + }); +}); diff --git a/ts/test-mock/messaging/sendSync_test.ts b/ts/test-mock/messaging/sendSync_test.ts index eeb7740d7479..328f0608dcc7 100644 --- a/ts/test-mock/messaging/sendSync_test.ts +++ b/ts/test-mock/messaging/sendSync_test.ts @@ -33,7 +33,7 @@ describe('sendSync', function (this: Mocha.Suite) { await bootstrap.teardown(); }); - it('processes a synd sync in a group', async () => { + it('processes a send sync in a group', async () => { const { contacts, desktop, phone } = bootstrap; const window = await app.getWindow(); diff --git a/ts/test-mock/messaging/sender_key_test.ts b/ts/test-mock/messaging/sender_key_test.ts index 8dfb02fe827f..dbe4d3bcaddd 100644 --- a/ts/test-mock/messaging/sender_key_test.ts +++ b/ts/test-mock/messaging/sender_key_test.ts @@ -71,6 +71,7 @@ describe('senderKey', function (this: Mocha.Suite) { await first.sendText(desktop, 'hello', { timestamp: bootstrap.getTimestamp(), + sealed: true, group, distributionId, }); diff --git a/ts/test-mock/pnp/merge_test.ts b/ts/test-mock/pnp/merge_test.ts index e25b1d78bd8b..576a654b14ba 100644 --- a/ts/test-mock/pnp/merge_test.ts +++ b/ts/test-mock/pnp/merge_test.ts @@ -348,18 +348,18 @@ describe('pnp/merge', function (this: Mocha.Suite) { 'removing both contacts from storage service, adding one combined contact' ); { - const state = await phone.expectStorageState('consistency check'); - await phone.setStorageState( - state.mergeContact(pniContact, { - identityState: Proto.ContactRecord.IdentityState.DEFAULT, - whitelisted: true, - identityKey: pniContact.publicKey.serialize(), - profileKey: pniContact.profileKey.serialize(), - }) - ); + let state = await phone.expectStorageState('consistency check'); + state = state.mergeContact(pniContact, { + identityState: Proto.ContactRecord.IdentityState.DEFAULT, + whitelisted: true, + identityKey: pniContact.publicKey.serialize(), + profileKey: pniContact.profileKey.serialize(), + }); + await phone.setStorageState(state); await phone.sendFetchStorage({ timestamp: bootstrap.getTimestamp(), }); + await app.waitForManifestVersion(state.version); } const window = await app.getWindow(); @@ -393,7 +393,6 @@ describe('pnp/merge', function (this: Mocha.Suite) { const newState = await phone.waitForStorageState({ after: state, }); - const { added, removed } = newState.diff(state); assert.strictEqual(added.length, 2, 'only two records must be added'); assert.strictEqual(removed.length, 1, 'only one record must be removed'); diff --git a/ts/test-mock/storage/drop_test.ts b/ts/test-mock/storage/drop_test.ts index ddca28310d43..c5e44e6a8228 100644 --- a/ts/test-mock/storage/drop_test.ts +++ b/ts/test-mock/storage/drop_test.ts @@ -100,46 +100,44 @@ describe('storage service', function (this: Mocha.Suite) { const { phone } = bootstrap; debug('duplicating account record'); - { - const state = await phone.expectStorageState('consistency check'); + const state = await phone.expectStorageState('consistency check'); - const oldAccount = state.findRecord(({ type }) => { - return type === IdentifierType.ACCOUNT; - }); - if (oldAccount === undefined) { - throw new Error('should have initial account record'); - } - - const updatedState = await phone.setStorageState( - state.addRecord({ - type: IdentifierType.ACCOUNT, - record: oldAccount.record, - }) - ); - - debug('sending fetch storage'); - await phone.sendFetchStorage({ - timestamp: bootstrap.getTimestamp(), - }); - - debug('waiting for next storage state'); - const nextState = await phone.waitForStorageState({ - after: updatedState, - }); - - assert.isFalse( - nextState.hasRecord(({ type, key }) => { - return type === IdentifierType.ACCOUNT && key.equals(oldAccount.key); - }), - 'should not have old account record' - ); - - assert.isTrue( - nextState.hasRecord(({ type }) => { - return type === IdentifierType.ACCOUNT; - }), - 'should have new account record' - ); + const oldAccount = state.findRecord(({ type }) => { + return type === IdentifierType.ACCOUNT; + }); + if (oldAccount === undefined) { + throw new Error('should have initial account record'); } + + const updatedState = await phone.setStorageState( + state.addRecord({ + type: IdentifierType.ACCOUNT, + record: oldAccount.record, + }) + ); + + debug('sending fetch storage'); + await phone.sendFetchStorage({ + timestamp: bootstrap.getTimestamp(), + }); + + debug('waiting for next storage state'); + const nextState = await phone.waitForStorageState({ + after: updatedState, + }); + + assert.isFalse( + nextState.hasRecord(({ type, key }) => { + return type === IdentifierType.ACCOUNT && key.equals(oldAccount.key); + }), + 'should not have old account record' + ); + + assert.isTrue( + nextState.hasRecord(({ type }) => { + return type === IdentifierType.ACCOUNT; + }), + 'should have new account record' + ); }); }); diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 3820da5565e7..0c24896beca1 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -130,6 +130,7 @@ import { SentEvent, StickerPackEvent, StoryRecipientUpdateEvent, + SuccessfulDecryptEvent, TypingEvent, ViewEvent, ViewOnceOpenSyncEvent, @@ -574,6 +575,11 @@ export default class MessageReceiver handler: (ev: DeliveryEvent) => void ): void; + public override addEventListener( + name: 'successful-decrypt', + handler: (ev: SuccessfulDecryptEvent) => void + ): void; + public override addEventListener( name: 'decryption-error', handler: (ev: DecryptionErrorEvent) => void @@ -2021,17 +2027,38 @@ export default class MessageReceiver ciphertext: Uint8Array, serviceIdKind: ServiceIdKind ): Promise { + const uuid = envelope.sourceServiceId; + const deviceId = envelope.sourceDevice; + const envelopeId = getEnvelopeId(envelope); + try { - return await this.innerDecrypt( + const result = await this.innerDecrypt( stores, envelope, ciphertext, serviceIdKind ); - } catch (error) { - const uuid = envelope.sourceServiceId; - const deviceId = envelope.sourceDevice; + if (isAciString(uuid) && isNumber(deviceId)) { + const event = new SuccessfulDecryptEvent( + { + senderDevice: deviceId, + senderAci: uuid, + timestamp: envelope.timestamp, + }, + () => this.removeFromCache(envelope) + ); + drop( + this.addToQueue( + async () => this.dispatchEvent(event), + `decrypted/dispatchEvent/SuccessfulDecryptEvent(${envelopeId})`, + TaskType.Decrypted + ) + ); + } + + return result; + } catch (error) { const ourAci = this.storage.user.getCheckedAci(); const isFromMe = ourAci === uuid; @@ -2069,8 +2096,6 @@ export default class MessageReceiver throw error; } - const envelopeId = getEnvelopeId(envelope); - if (uuid && deviceId) { const senderAci = uuid; if (!isAciString(senderAci)) { diff --git a/ts/textsecure/messageReceiverEvents.ts b/ts/textsecure/messageReceiverEvents.ts index fc433162743e..573f7cffa1c7 100644 --- a/ts/textsecure/messageReceiverEvents.ts +++ b/ts/textsecure/messageReceiverEvents.ts @@ -140,6 +140,21 @@ export class DeliveryEvent extends ConfirmableEvent { } } +export type SuccessfulDecryptEventData = Readonly<{ + senderDevice: number; + senderAci: AciString; + timestamp: number; +}>; + +export class SuccessfulDecryptEvent extends ConfirmableEvent { + constructor( + public readonly data: SuccessfulDecryptEventData, + confirm: ConfirmCallback + ) { + super('successful-decrypt', confirm); + } +} + export type DecryptionErrorEventData = Readonly<{ cipherTextBytes?: Uint8Array; cipherTextType?: number; diff --git a/ts/util/callDisposition.ts b/ts/util/callDisposition.ts index 3b6a9ed1893f..c42d4bf506ed 100644 --- a/ts/util/callDisposition.ts +++ b/ts/util/callDisposition.ts @@ -1342,7 +1342,7 @@ export async function clearCallHistoryDataAndSync( ); const messageIds = await DataWriter.clearCallHistory(latestCall); await DataWriter.beginDeleteAllCallLinks(); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'clearCallHistoryDataAndSync' }); // Wait for storage sync before finalizing delete drop(CallLinkDeleteManager.enqueueAllDeletedCallLinks({ delay: 10000 })); updateDeletedMessages(messageIds); diff --git a/ts/util/handleRetry.ts b/ts/util/handleRetry.ts index 6c5eeb90a460..2082227fdf4c 100644 --- a/ts/util/handleRetry.ts +++ b/ts/util/handleRetry.ts @@ -5,7 +5,8 @@ import { DecryptionErrorMessage, PlaintextContent, } from '@signalapp/libsignal-client'; -import { isNumber } from 'lodash'; +import { isNumber, random } from 'lodash'; +import type PQueue from 'p-queue'; import * as Bytes from '../Bytes'; import { DataReader, DataWriter } from '../sql/Client'; @@ -28,6 +29,7 @@ import type { InvalidPlaintextEvent, RetryRequestEvent, RetryRequestEventData, + SuccessfulDecryptEvent, } from '../textsecure/messageReceiverEvents'; import { SignalService as Proto } from '../protobuf'; @@ -37,14 +39,83 @@ import type { StoryDistributionListDataType } from '../state/ducks/storyDistribu import { drop } from './drop'; import { conversationJobQueue } from '../jobs/conversationJobQueue'; import { incrementMessageCounter } from './incrementMessageCounter'; +import { SECOND } from './durations'; +import { sleep } from './sleep'; const RETRY_LIMIT = 5; -// Entrypoints - type RetryKeyType = `${AciString}.${number}:${number}`; const retryRecord = new Map(); +const DELAY_UNIT = window.SignalCI ? 100 : SECOND; + +// Entrypoints + +export function onSuccessfulDecrypt(event: SuccessfulDecryptEvent): void { + const key = getRetryKey(event.data); + unregisterError(key); +} + +export function getOnDecryptionError(getDecryptionErrorQueue: () => PQueue) { + return (event: DecryptionErrorEvent): void => { + const key = getRetryKey(event.decryptionError); + const logId = `decryption-error(${key})`; + if (isErrorRegistered(key)) { + log.warn(`${logId}: key registered before queueing job; dropping.`); + event.confirm(); + return; + } + + const needsDelay = !getDecryptionErrorQueue().isPaused; + + registerError(key); + drop( + getDecryptionErrorQueue().add(async () => { + if (needsDelay) { + const jitter = random(5) * DELAY_UNIT; + const delay = DELAY_UNIT + jitter; + log.warn(`${logId}: delay needed; sleeping for ${delay}ms`); + await sleep(delay); + } + + if (!isErrorRegistered(key)) { + log.warn(`${logId}: key unregistered before job ran; dropping.`); + event.confirm(); + return; + } + try { + await handleDecryptionError(event); + } finally { + unregisterError(key); + } + }) + ); + }; +} + +export function getRetryKey({ + senderAci, + senderDevice, + timestamp, +}: { + senderAci: AciString; + senderDevice: number; + timestamp: number; +}): RetryKeyType { + return `${senderAci}.${senderDevice}:${timestamp}`; +} + +const registeredErrors = new Set(); +export function registerError(key: RetryKeyType): void { + registeredErrors.add(key); +} +export function isErrorRegistered(key: RetryKeyType): boolean { + return registeredErrors.has(key); +} +export function unregisterError(key: RetryKeyType): void { + registeredErrors.delete(key); +} + export function _getRetryRecord(): Map { return retryRecord; } @@ -70,7 +141,11 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise { return; } - const retryKey: RetryKeyType = `${requesterAci}.${requesterDevice}:${sentAt}`; + const retryKey = getRetryKey({ + senderAci: requesterAci, + senderDevice: requesterDevice, + timestamp: sentAt, + }); const retryCount = (retryRecord.get(retryKey) || 0) + 1; retryRecord.set(retryKey, retryCount); if (retryCount > RETRY_LIMIT) { @@ -222,21 +297,21 @@ export function onInvalidPlaintextMessage({ maybeShowDecryptionToast(logId, name, senderDevice); } -export async function onDecryptionError( +export async function handleDecryptionError( event: DecryptionErrorEvent ): Promise { const { confirm, decryptionError } = event; const { senderAci, senderDevice, timestamp } = decryptionError; const logId = `${senderAci}.${senderDevice} ${timestamp}`; - log.info(`onDecryptionError/${logId}: Starting...`); + log.info(`handleDecryptionError/${logId}: Starting...`); - const retryKey: RetryKeyType = `${senderAci}.${senderDevice}:${timestamp}`; + const retryKey = getRetryKey(decryptionError); const retryCount = (retryRecord.get(retryKey) || 0) + 1; retryRecord.set(retryKey, retryCount); if (retryCount > RETRY_LIMIT) { log.warn( - `onDecryptionError/${logId}: retryCount is ${retryCount}; returning early.` + `handleDecryptionError/${logId}: retryCount is ${retryCount}; returning early.` ); confirm(); return; @@ -256,7 +331,7 @@ export async function onDecryptionError( } confirm(); - log.info(`onDecryptionError/${logId}: ...complete`); + log.info(`handleDecryptionError/${logId}: ...complete`); } // Helpers diff --git a/ts/util/markOnboardingStoryAsRead.ts b/ts/util/markOnboardingStoryAsRead.ts index 65fefcefc442..b83d372c9923 100644 --- a/ts/util/markOnboardingStoryAsRead.ts +++ b/ts/util/markOnboardingStoryAsRead.ts @@ -51,7 +51,7 @@ export async function markOnboardingStoryAsRead(): Promise { await window.storage.put('hasViewedOnboardingStory', true); - storageServiceUploadJob(); + storageServiceUploadJob({ reason: 'markOnboardingStoryAsRead' }); return true; }