Deduplicate and cancel unneeded retry requests

This commit is contained in:
Scott Nonnenberg 2024-10-01 08:23:32 +10:00 committed by GitHub
parent d1f130e542
commit b68e731950
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 473 additions and 161 deletions

10
package-lock.json generated
View file

@ -126,7 +126,7 @@
"@indutny/parallel-prettier": "3.0.0",
"@indutny/rezip-electron": "1.3.1",
"@indutny/symbolicate-mac": "2.3.0",
"@signalapp/mock-server": "6.10.0",
"@signalapp/mock-server": "6.11.0",
"@storybook/addon-a11y": "8.1.11",
"@storybook/addon-actions": "8.1.11",
"@storybook/addon-controls": "8.1.11",
@ -7274,12 +7274,13 @@
}
},
"node_modules/@signalapp/mock-server": {
"version": "6.10.0",
"resolved": "https://registry.npmjs.org/@signalapp/mock-server/-/mock-server-6.10.0.tgz",
"integrity": "sha512-StUP0vIKN43T1PDeyIyr7+PBMW/1mxjPOiHFzF7VDKvC53Q2pX84OCd6hsW6m16Tsjrxm5B7gVLIh2e4OYbkdQ==",
"version": "6.11.0",
"resolved": "https://registry.npmjs.org/@signalapp/mock-server/-/mock-server-6.11.0.tgz",
"integrity": "sha512-wIPUtsLcngcum0dkGuJ1YfTuWso7DhH0JZUA5ZS82XlmF2Qz0ZF03AdeeNcQx4XY4mJU1+nIEhTZpB0AFqEM4Q==",
"dev": true,
"license": "AGPL-3.0-only",
"dependencies": {
"@indutny/parallel-prettier": "^3.0.0",
"@signalapp/libsignal-client": "^0.45.0",
"@tus/file-store": "^1.4.0",
"@tus/server": "^1.7.0",
@ -7287,6 +7288,7 @@
"long": "^4.0.0",
"micro": "^9.3.4",
"microrouter": "^3.1.3",
"prettier": "^3.3.3",
"protobufjs": "^7.2.4",
"url-pattern": "^1.0.3",
"uuid": "^8.3.2",

View file

@ -210,7 +210,7 @@
"@indutny/parallel-prettier": "3.0.0",
"@indutny/rezip-electron": "1.3.1",
"@indutny/symbolicate-mac": "2.3.0",
"@signalapp/mock-server": "6.10.0",
"@signalapp/mock-server": "6.11.0",
"@storybook/addon-a11y": "8.1.11",
"@storybook/addon-actions": "8.1.11",
"@storybook/addon-controls": "8.1.11",

View file

@ -1,7 +1,7 @@
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isNumber, throttle, groupBy } from 'lodash';
import { isNumber, groupBy, throttle } from 'lodash';
import { render } from 'react-dom';
import { batch as batchDispatch } from 'react-redux';
import PQueue from 'p-queue';
@ -77,7 +77,6 @@ import { parseIntOrThrow } from './util/parseIntOrThrow';
import { getProfile } from './util/getProfile';
import type {
ConfigurationEvent,
DecryptionErrorEvent,
DeliveryEvent,
EnvelopeQueuedEvent,
EnvelopeUnsealedEvent,
@ -129,9 +128,10 @@ import { InstallScreenStep } from './types/InstallScreen';
import { getEnvironment } from './environment';
import { SignalService as Proto } from './protobuf';
import {
getOnDecryptionError,
onRetryRequest,
onDecryptionError,
onInvalidPlaintextMessage,
onSuccessfulDecrypt,
} from './util/handleRetry';
import { themeChanged } from './shims/themeChanged';
import { createIPCEvents } from './util/createIPCEvents';
@ -618,11 +618,14 @@ export async function startApp(): Promise<void> {
'error',
queuedEventListener(onError, false)
);
messageReceiver.addEventListener(
'successful-decrypt',
queuedEventListener(onSuccessfulDecrypt)
);
messageReceiver.addEventListener(
'decryption-error',
queuedEventListener((event: DecryptionErrorEvent): void => {
drop(onDecryptionErrorQueue.add(() => onDecryptionError(event)));
})
queuedEventListener(getOnDecryptionError(() => onDecryptionErrorQueue))
);
messageReceiver.addEventListener(
'invalid-plaintext',
@ -1308,7 +1311,7 @@ export async function startApp(): Promise<void> {
remotelyExpired = true;
});
async function runStorageService() {
async function runStorageService({ reason }: { reason: string }) {
if (window.storage.get('backupDownloadPath')) {
log.info(
'background: not running storage service while downloading backup'
@ -1316,7 +1319,9 @@ export async function startApp(): Promise<void> {
return;
}
StorageService.enableStorageService();
StorageService.runStorageServiceSyncJob();
StorageService.runStorageServiceSyncJob({
reason: `runStorageService/${reason}`,
});
}
async function start() {
@ -1429,7 +1434,7 @@ export async function startApp(): Promise<void> {
afterStart();
// Run storage service after linking
drop(runStorageService());
drop(runStorageService({ reason: 'background/registration_done' }));
});
cancelInitializationMessage();
@ -1600,7 +1605,7 @@ export async function startApp(): Promise<void> {
log.warn('downloadBackup: no backup download path, skipping');
backupReady.resolve();
server.registerRequestHandler(messageReceiver);
drop(runStorageService());
drop(runStorageService({ reason: 'downloadBackup/noPath' }));
return;
}
@ -1623,7 +1628,7 @@ export async function startApp(): Promise<void> {
log.info('downloadBackup: processing websocket messages, storage service');
backupReady.resolve();
server.registerRequestHandler(messageReceiver);
drop(runStorageService());
drop(runStorageService({ reason: 'downloadBackup/complete' }));
}
window.getSyncRequest = (timeoutMillis?: number) => {
@ -1790,7 +1795,7 @@ export async function startApp(): Promise<void> {
if (connectCount === 1) {
Stickers.downloadQueuedPacks();
if (!newVersion) {
drop(runStorageService());
drop(runStorageService({ reason: 'connect/connectCount=1' }));
}
}
@ -1808,7 +1813,7 @@ export async function startApp(): Promise<void> {
window.getSyncRequest();
void StorageService.reprocessUnknownFields();
void runStorageService();
void runStorageService({ reason: 'connect/bootAfterUpgrade' });
const manager = window.getAccountManager();
await Promise.all([
@ -1904,7 +1909,7 @@ export async function startApp(): Promise<void> {
MessageSender.getRequestConfigurationSyncMessage()
),
singleProtoJobQueue.add(MessageSender.getRequestBlockSyncMessage()),
runStorageService(),
runStorageService({ reason: 'firstRun/initialSync' }),
singleProtoJobQueue.add(
MessageSender.getRequestContactSyncMessage()
),
@ -3175,7 +3180,7 @@ export async function startApp(): Promise<void> {
}
case FETCH_LATEST_ENUM.STORAGE_MANIFEST:
log.info('onFetchLatestSync: fetching latest manifest');
StorageService.runStorageServiceSyncJob();
StorageService.runStorageServiceSyncJob({ reason: 'syncFetchLatest' });
break;
case FETCH_LATEST_ENUM.SUBSCRIPTION_STATUS:
log.info('onFetchLatestSync: fetching latest subscription status');
@ -3233,7 +3238,7 @@ export async function startApp(): Promise<void> {
}
}
await StorageService.runStorageServiceSyncJob();
await StorageService.runStorageServiceSyncJob({ reason: 'onKeysSync' });
}
}

View file

@ -2006,7 +2006,7 @@ export async function createGroupV2(
);
await conversation.queueJob('storageServiceUploadJob', async () => {
await storageServiceUploadJob();
await storageServiceUploadJob({ reason: 'createGroupV2' });
});
const timestamp = Date.now();

View file

@ -5392,7 +5392,7 @@ export class ConversationModel extends window.Backbone
this.set({ needsStorageServiceSync: true });
void this.queueJob('captureChange', async () => {
storageServiceUploadJob();
storageServiceUploadJob({ reason: `captureChange/${logMessage}` });
});
}

View file

@ -1854,15 +1854,20 @@ async function processRemoteRecords(
return 0;
}
async function sync(
ignoreConflicts = false
): Promise<Proto.ManifestRecord | undefined> {
async function sync({
ignoreConflicts = false,
reason,
}: {
ignoreConflicts?: boolean;
reason: string;
}): Promise<Proto.ManifestRecord | undefined> {
if (!window.storage.get('storageKey')) {
const masterKeyBase64 = window.storage.get('masterKey');
if (!masterKeyBase64) {
throw new Error(
'storageService.sync: Cannot start; no storage or master key!'
log.error(
`storageService.sync(${reason}): Cannot start; no storage or master key!`
);
return;
}
const masterKey = Bytes.fromBase64(masterKeyBase64);
@ -1873,7 +1878,7 @@ async function sync(
}
log.info(
`storageService.sync: starting... ignoreConflicts=${ignoreConflicts}`
`storageService.sync: starting... ignoreConflicts=${ignoreConflicts}, reason=${reason}`
);
let manifest: Proto.ManifestRecord | undefined;
@ -1921,7 +1926,7 @@ async function sync(
const hasConflicts = conflictCount !== 0;
if (hasConflicts && !ignoreConflicts) {
await upload(true);
await upload({ fromSync: true, reason: `sync/${reason}` });
}
// We now know that we've successfully completed a storage service fetch
@ -1943,9 +1948,17 @@ async function sync(
return manifest;
}
async function upload(fromSync = false): Promise<void> {
async function upload({
fromSync = false,
reason,
}: {
fromSync?: boolean;
reason: string;
}): Promise<void> {
const logId = `storageService.upload/${reason}`;
if (!window.textsecure.messaging) {
throw new Error('storageService.upload: We are offline!');
throw new Error(`${logId}: We are offline!`);
}
// Rate limit uploads coming from syncing
@ -1955,9 +1968,7 @@ async function upload(fromSync = false): Promise<void> {
const [firstMostRecentWrite] = uploadBucket;
if (isMoreRecentThan(5 * durations.MINUTE, firstMostRecentWrite)) {
throw new Error(
'storageService.uploadManifest: too many writes too soon.'
);
throw new Error(`${logId}: too many writes too soon.`);
}
uploadBucket.shift();
@ -1967,13 +1978,11 @@ async function upload(fromSync = false): Promise<void> {
if (!window.storage.get('storageKey')) {
// requesting new keys runs the sync job which will detect the conflict
// and re-run the upload job once we're merged and up-to-date.
log.info('storageService.upload: no storageKey, requesting new keys');
log.info(`${logId}: no storageKey, requesting new keys`);
backOff.reset();
if (window.ConversationController.areWePrimaryDevice()) {
log.warn(
'storageService.upload: We are primary device; not sending key sync request'
);
log.warn(`${logId}: We are primary device; not sending key sync request`);
return;
}
@ -1981,7 +1990,7 @@ async function upload(fromSync = false): Promise<void> {
await singleProtoJobQueue.add(MessageSender.getRequestKeySyncMessage());
} catch (error) {
log.error(
'storageService.upload: Failed to queue sync message',
`${logId}: Failed to queue sync message`,
Errors.toLogFormat(error)
);
}
@ -1997,15 +2006,16 @@ async function upload(fromSync = false): Promise<void> {
// We are going to upload after this sync so we can ignore any conflicts
// that arise during the sync.
const ignoreConflicts = true;
previousManifest = await sync(ignoreConflicts);
previousManifest = await sync({
ignoreConflicts,
reason: `upload/${reason}`,
});
}
const localManifestVersion = window.storage.get('manifestVersion', 0);
const version = Number(localManifestVersion) + 1;
log.info(
`storageService.upload(${version}): will update to manifest version`
);
log.info(`${logId}/${version}: will update to manifest version`);
try {
const generatedManifest = await generateManifest(
@ -2021,17 +2031,14 @@ async function upload(fromSync = false): Promise<void> {
} catch (err) {
if (err.code === 409) {
await sleep(conflictBackOff.getAndIncrement());
log.info('storageService.upload: pushing sync on the queue');
log.info(`${logId}: pushing sync on the queue`);
// The sync job will check for conflicts and as part of that conflict
// check if an item needs sync and doesn't match with the remote record
// it'll kick off another upload.
setTimeout(runStorageServiceSyncJob);
return;
}
log.error(
`storageService.upload(${version}): error`,
Errors.toLogFormat(err)
);
log.error(`${logId}/${version}: error`, Errors.toLogFormat(err));
}
}
@ -2140,44 +2147,52 @@ export async function reprocessUnknownFields(): Promise<void> {
log.info(
`storageService.reprocessUnknownFields(${version}): uploading`
);
await upload();
await upload({ reason: 'reprocessUnknownFields/hasConflicts' });
}
})
);
}
export const storageServiceUploadJob = debounce(() => {
if (!storageServiceEnabled) {
log.info('storageService.storageServiceUploadJob: called before enabled');
return;
}
export const storageServiceUploadJob = debounce(
({ reason }: { reason: string }) => {
if (!storageServiceEnabled) {
log.info('storageService.storageServiceUploadJob: called before enabled');
return;
}
void storageJobQueue(
async () => {
await upload();
},
`upload v${window.storage.get('manifestVersion')}`
);
}, 500);
export const runStorageServiceSyncJob = debounce(() => {
if (!storageServiceEnabled) {
log.info('storageService.runStorageServiceSyncJob: called before enabled');
return;
}
ourProfileKeyService.blockGetWithPromise(
storageJobQueue(
void storageJobQueue(
async () => {
await sync();
// Notify listeners about sync completion
window.Whisper.events.trigger('storageService:syncComplete');
await upload({ reason: `storageServiceUploadJob/${reason}` });
},
`sync v${window.storage.get('manifestVersion')}`
)
);
}, 500);
`upload v${window.storage.get('manifestVersion')}`
);
},
500
);
export const runStorageServiceSyncJob = debounce(
({ reason }: { reason: string }) => {
if (!storageServiceEnabled) {
log.info(
'storageService.runStorageServiceSyncJob: called before enabled'
);
return;
}
ourProfileKeyService.blockGetWithPromise(
storageJobQueue(
async () => {
await sync({ reason });
// Notify listeners about sync completion
window.Whisper.events.trigger('storageService:syncComplete');
},
`sync v${window.storage.get('manifestVersion')}`
)
);
},
500
);
export const addPendingDelete = (item: ExtendedStorageID): void => {
void storageJobQueue(

View file

@ -361,7 +361,7 @@ export async function resetLink(username: string): Promise<void> {
await window.storage.remove('usernameLinkCorrupted');
me.captureChange('usernameLink');
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'resetLink' });
}
const USERNAME_LINK_ENTROPY_SIZE = 32;

View file

@ -141,7 +141,7 @@ class UsernameIntegrityService {
`storage service sync (local: ${localValue}, remote: ${remoteValue})`
);
runStorageServiceSyncJob();
runStorageServiceSyncJob({ reason: 'checkPhoneNumberSharing' });
}
// Since we already run on storage service job queue - don't await the

View file

@ -2108,7 +2108,7 @@ function createCallLink(
DataWriter.saveCallHistory(callHistory),
]);
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'createCallLink' });
dispatch({
type: HANDLE_CALL_LINK_UPDATE,
@ -2125,7 +2125,7 @@ function deleteCallLink(
): ThunkAction<void, RootStateType, unknown, HandleCallLinkDeleteActionType> {
return async dispatch => {
await DataWriter.beginDeleteCallLink(roomId, { storageNeedsSync: true });
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'deleteCallLink' });
// Wait for storage service sync before finalizing delete
drop(CallLinkDeleteManager.addJob({ roomId }, { delay: 10000 }));
dispatch(handleCallLinkDelete({ roomId }));

View file

@ -250,7 +250,7 @@ async function doInstallStickerPack(
}
if (!fromStorageService && !fromBackup) {
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'doInstallServicePack' });
}
const recentStickers = await getRecentStickers();
@ -295,7 +295,7 @@ async function doUninstallStickerPack(
}
if (!fromStorageService) {
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'doUninstallStickerPack' });
}
const recentStickers = await getRecentStickers();

View file

@ -137,7 +137,9 @@ function allowsRepliesChanged(
storageNeedsSync: true,
});
storageServiceUploadJob();
storageServiceUploadJob({
reason: 'distributionLists/allowsRepliesChanged',
});
log.info(
'storyDistributionLists.allowsRepliesChanged: allowsReplies has changed',
@ -182,7 +184,7 @@ function createDistributionList(
}
if (storyDistribution.storageNeedsSync) {
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'createDistributionList' });
}
dispatch({
@ -241,7 +243,7 @@ function deleteDistributionList(
listId
);
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'deleteDistributionList' });
dispatch({
type: DELETE_LIST,
@ -290,7 +292,9 @@ function hideMyStoriesFrom(
}
);
storageServiceUploadJob();
storageServiceUploadJob({
reason: 'storyDistributionLists/hideMyStoriesFrom',
});
await window.storage.put('hasSetMyStoriesPrivacy', true);
@ -362,7 +366,7 @@ function removeMembersFromDistributionList(
}
);
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'removeMembersFromDistributionList' });
dispatch({
type: MODIFY_LIST,
@ -407,7 +411,7 @@ function setMyStoriesToAllSignalConnections(): ThunkAction<
}
);
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'setMyStoriesToAllSignalConnections' });
}
await window.storage.put('hasSetMyStoriesPrivacy', true);
@ -466,7 +470,7 @@ function updateStoryViewers(
}
);
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'updateStoryViewers' });
if (listId === MY_STORY_ID) {
await window.storage.put('hasSetMyStoriesPrivacy', true);

View file

@ -327,7 +327,7 @@ function markCompletedUsernameOnboarding(): ThunkAction<
await window.storage.put('hasCompletedUsernameOnboarding', true);
const me = window.ConversationController.getOurConversationOrThrow();
me.captureChange('usernameOnboarding');
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'markCompletedUsernameOnboarding' });
};
}
@ -349,7 +349,7 @@ function setUsernameLinkColor(
await window.storage.put('usernameLinkColor', color);
const me = window.ConversationController.getOurConversationOrThrow();
me.captureChange('usernameLinkColor');
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'setUsernameLinkColor' });
};
}

View file

@ -0,0 +1,173 @@
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import createDebug from 'debug';
import { EnvelopeType, StorageState } from '@signalapp/mock-server';
import type { App } from '../playwright';
import * as durations from '../../util/durations';
import { Bootstrap } from '../bootstrap';
import { sleep } from '../../util/sleep';
export const debug = createDebug('mock:test:retries');
describe('retries', function (this: Mocha.Suite) {
this.timeout(durations.MINUTE);
let bootstrap: Bootstrap;
let app: App;
beforeEach(async () => {
bootstrap = new Bootstrap();
await bootstrap.init();
app = await bootstrap.link();
const { contacts, phone } = bootstrap;
const [first] = contacts;
let state = StorageState.getEmpty();
state = state.addContact(first, {
identityKey: first.publicKey.serialize(),
profileKey: first.profileKey.serialize(),
whitelisted: true,
});
state = state.pin(first);
await phone.setStorageState(state);
});
afterEach(async function (this: Mocha.Context) {
if (!bootstrap) {
return;
}
await bootstrap.maybeSaveLogs(this.currentTest, app);
await app.close();
await bootstrap.teardown();
});
it('sends a retry request on a missing sender key error', async () => {
const { desktop, contacts } = bootstrap;
const [first] = contacts;
debug('send a sender key message without sending skdm first');
const distributionId = await first.sendSenderKey(desktop, {
timestamp: bootstrap.getTimestamp(),
skipSkdmSend: true,
});
const timestamp = bootstrap.getTimestamp();
await first.sendText(desktop, 'hello', {
distributionId,
sealed: true,
timestamp,
});
debug('waiting for the resend request');
const message = await first.waitForDecryptionError();
debug(JSON.stringify(message));
assert.equal(message.envelopeType, EnvelopeType.Plaintext);
assert.equal(message.timestamp, timestamp);
});
it('does not send a retry request if message succeeded later', async () => {
const { desktop, contacts } = bootstrap;
const [first] = contacts;
debug('send a sender key message without sending skdm first');
const firstDistributionId = await first.sendSenderKey(desktop, {
timestamp: bootstrap.getTimestamp(),
skipSkdmSend: true,
});
const content = 'how are you?';
debug('send a failing message');
const timestamp = bootstrap.getTimestamp();
await first.sendText(desktop, content, {
distributionId: firstDistributionId,
sealed: true,
timestamp,
});
debug('send second sender key out');
const secondDistributionId = await first.sendSenderKey(desktop, {
timestamp: bootstrap.getTimestamp(),
});
debug('send same hello message, this time it should work');
await first.sendText(desktop, content, {
distributionId: secondDistributionId,
sealed: true,
timestamp,
});
debug('open conversation');
const window = await app.getWindow();
const leftPane = window.locator('#LeftPane');
await leftPane.locator(`[data-testid="${first.device.aci}"]`).click();
const conversationStack = window.locator('.Inbox__conversation-stack');
debug('verify message receipt');
await conversationStack
.locator(`.module-message--incoming >> "${content}"`)
.waitFor();
debug('verify that no resend request was sent');
const count = await first.getDecryptionErrorQueueSize();
assert.equal(count, 0);
});
it('sends only one retry request if many failures with same timestamp', async () => {
const { desktop, contacts } = bootstrap;
const [first] = contacts;
debug('send a sender key message without sending skdm first');
const firstDistributionId = await first.sendSenderKey(desktop, {
timestamp: bootstrap.getTimestamp(),
skipSkdmSend: true,
});
const content = 'how are you?';
debug('send a failing message');
const timestamp = bootstrap.getTimestamp();
await first.sendText(desktop, content, {
distributionId: firstDistributionId,
sealed: true,
timestamp,
});
debug('send a failing message a second time');
await first.sendText(desktop, content, {
distributionId: firstDistributionId,
sealed: true,
timestamp,
});
debug('send a failing message a third time');
await first.sendText(desktop, content, {
distributionId: firstDistributionId,
sealed: true,
timestamp,
});
debug('waiting for the resend request');
const message = await first.waitForDecryptionError();
debug(JSON.stringify(message));
assert.equal(message.envelopeType, EnvelopeType.Plaintext);
assert.equal(message.timestamp, timestamp);
debug('wait for max jitter delay');
await sleep(500);
debug('verify that no other resend requests were sent');
const count = await first.getDecryptionErrorQueueSize();
assert.equal(count, 0);
});
});

View file

@ -33,7 +33,7 @@ describe('sendSync', function (this: Mocha.Suite) {
await bootstrap.teardown();
});
it('processes a synd sync in a group', async () => {
it('processes a send sync in a group', async () => {
const { contacts, desktop, phone } = bootstrap;
const window = await app.getWindow();

View file

@ -71,6 +71,7 @@ describe('senderKey', function (this: Mocha.Suite) {
await first.sendText(desktop, 'hello', {
timestamp: bootstrap.getTimestamp(),
sealed: true,
group,
distributionId,
});

View file

@ -348,18 +348,18 @@ describe('pnp/merge', function (this: Mocha.Suite) {
'removing both contacts from storage service, adding one combined contact'
);
{
const state = await phone.expectStorageState('consistency check');
await phone.setStorageState(
state.mergeContact(pniContact, {
identityState: Proto.ContactRecord.IdentityState.DEFAULT,
whitelisted: true,
identityKey: pniContact.publicKey.serialize(),
profileKey: pniContact.profileKey.serialize(),
})
);
let state = await phone.expectStorageState('consistency check');
state = state.mergeContact(pniContact, {
identityState: Proto.ContactRecord.IdentityState.DEFAULT,
whitelisted: true,
identityKey: pniContact.publicKey.serialize(),
profileKey: pniContact.profileKey.serialize(),
});
await phone.setStorageState(state);
await phone.sendFetchStorage({
timestamp: bootstrap.getTimestamp(),
});
await app.waitForManifestVersion(state.version);
}
const window = await app.getWindow();
@ -393,7 +393,6 @@ describe('pnp/merge', function (this: Mocha.Suite) {
const newState = await phone.waitForStorageState({
after: state,
});
const { added, removed } = newState.diff(state);
assert.strictEqual(added.length, 2, 'only two records must be added');
assert.strictEqual(removed.length, 1, 'only one record must be removed');

View file

@ -100,46 +100,44 @@ describe('storage service', function (this: Mocha.Suite) {
const { phone } = bootstrap;
debug('duplicating account record');
{
const state = await phone.expectStorageState('consistency check');
const state = await phone.expectStorageState('consistency check');
const oldAccount = state.findRecord(({ type }) => {
return type === IdentifierType.ACCOUNT;
});
if (oldAccount === undefined) {
throw new Error('should have initial account record');
}
const updatedState = await phone.setStorageState(
state.addRecord({
type: IdentifierType.ACCOUNT,
record: oldAccount.record,
})
);
debug('sending fetch storage');
await phone.sendFetchStorage({
timestamp: bootstrap.getTimestamp(),
});
debug('waiting for next storage state');
const nextState = await phone.waitForStorageState({
after: updatedState,
});
assert.isFalse(
nextState.hasRecord(({ type, key }) => {
return type === IdentifierType.ACCOUNT && key.equals(oldAccount.key);
}),
'should not have old account record'
);
assert.isTrue(
nextState.hasRecord(({ type }) => {
return type === IdentifierType.ACCOUNT;
}),
'should have new account record'
);
const oldAccount = state.findRecord(({ type }) => {
return type === IdentifierType.ACCOUNT;
});
if (oldAccount === undefined) {
throw new Error('should have initial account record');
}
const updatedState = await phone.setStorageState(
state.addRecord({
type: IdentifierType.ACCOUNT,
record: oldAccount.record,
})
);
debug('sending fetch storage');
await phone.sendFetchStorage({
timestamp: bootstrap.getTimestamp(),
});
debug('waiting for next storage state');
const nextState = await phone.waitForStorageState({
after: updatedState,
});
assert.isFalse(
nextState.hasRecord(({ type, key }) => {
return type === IdentifierType.ACCOUNT && key.equals(oldAccount.key);
}),
'should not have old account record'
);
assert.isTrue(
nextState.hasRecord(({ type }) => {
return type === IdentifierType.ACCOUNT;
}),
'should have new account record'
);
});
});

View file

@ -130,6 +130,7 @@ import {
SentEvent,
StickerPackEvent,
StoryRecipientUpdateEvent,
SuccessfulDecryptEvent,
TypingEvent,
ViewEvent,
ViewOnceOpenSyncEvent,
@ -574,6 +575,11 @@ export default class MessageReceiver
handler: (ev: DeliveryEvent) => void
): void;
public override addEventListener(
name: 'successful-decrypt',
handler: (ev: SuccessfulDecryptEvent) => void
): void;
public override addEventListener(
name: 'decryption-error',
handler: (ev: DecryptionErrorEvent) => void
@ -2021,17 +2027,38 @@ export default class MessageReceiver
ciphertext: Uint8Array,
serviceIdKind: ServiceIdKind
): Promise<InnerDecryptResultType | undefined> {
const uuid = envelope.sourceServiceId;
const deviceId = envelope.sourceDevice;
const envelopeId = getEnvelopeId(envelope);
try {
return await this.innerDecrypt(
const result = await this.innerDecrypt(
stores,
envelope,
ciphertext,
serviceIdKind
);
} catch (error) {
const uuid = envelope.sourceServiceId;
const deviceId = envelope.sourceDevice;
if (isAciString(uuid) && isNumber(deviceId)) {
const event = new SuccessfulDecryptEvent(
{
senderDevice: deviceId,
senderAci: uuid,
timestamp: envelope.timestamp,
},
() => this.removeFromCache(envelope)
);
drop(
this.addToQueue(
async () => this.dispatchEvent(event),
`decrypted/dispatchEvent/SuccessfulDecryptEvent(${envelopeId})`,
TaskType.Decrypted
)
);
}
return result;
} catch (error) {
const ourAci = this.storage.user.getCheckedAci();
const isFromMe = ourAci === uuid;
@ -2069,8 +2096,6 @@ export default class MessageReceiver
throw error;
}
const envelopeId = getEnvelopeId(envelope);
if (uuid && deviceId) {
const senderAci = uuid;
if (!isAciString(senderAci)) {

View file

@ -140,6 +140,21 @@ export class DeliveryEvent extends ConfirmableEvent {
}
}
export type SuccessfulDecryptEventData = Readonly<{
senderDevice: number;
senderAci: AciString;
timestamp: number;
}>;
export class SuccessfulDecryptEvent extends ConfirmableEvent {
constructor(
public readonly data: SuccessfulDecryptEventData,
confirm: ConfirmCallback
) {
super('successful-decrypt', confirm);
}
}
export type DecryptionErrorEventData = Readonly<{
cipherTextBytes?: Uint8Array;
cipherTextType?: number;

View file

@ -1342,7 +1342,7 @@ export async function clearCallHistoryDataAndSync(
);
const messageIds = await DataWriter.clearCallHistory(latestCall);
await DataWriter.beginDeleteAllCallLinks();
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'clearCallHistoryDataAndSync' });
// Wait for storage sync before finalizing delete
drop(CallLinkDeleteManager.enqueueAllDeletedCallLinks({ delay: 10000 }));
updateDeletedMessages(messageIds);

View file

@ -5,7 +5,8 @@ import {
DecryptionErrorMessage,
PlaintextContent,
} from '@signalapp/libsignal-client';
import { isNumber } from 'lodash';
import { isNumber, random } from 'lodash';
import type PQueue from 'p-queue';
import * as Bytes from '../Bytes';
import { DataReader, DataWriter } from '../sql/Client';
@ -28,6 +29,7 @@ import type {
InvalidPlaintextEvent,
RetryRequestEvent,
RetryRequestEventData,
SuccessfulDecryptEvent,
} from '../textsecure/messageReceiverEvents';
import { SignalService as Proto } from '../protobuf';
@ -37,14 +39,83 @@ import type { StoryDistributionListDataType } from '../state/ducks/storyDistribu
import { drop } from './drop';
import { conversationJobQueue } from '../jobs/conversationJobQueue';
import { incrementMessageCounter } from './incrementMessageCounter';
import { SECOND } from './durations';
import { sleep } from './sleep';
const RETRY_LIMIT = 5;
// Entrypoints
type RetryKeyType = `${AciString}.${number}:${number}`;
const retryRecord = new Map<RetryKeyType, number>();
const DELAY_UNIT = window.SignalCI ? 100 : SECOND;
// Entrypoints
export function onSuccessfulDecrypt(event: SuccessfulDecryptEvent): void {
const key = getRetryKey(event.data);
unregisterError(key);
}
export function getOnDecryptionError(getDecryptionErrorQueue: () => PQueue) {
return (event: DecryptionErrorEvent): void => {
const key = getRetryKey(event.decryptionError);
const logId = `decryption-error(${key})`;
if (isErrorRegistered(key)) {
log.warn(`${logId}: key registered before queueing job; dropping.`);
event.confirm();
return;
}
const needsDelay = !getDecryptionErrorQueue().isPaused;
registerError(key);
drop(
getDecryptionErrorQueue().add(async () => {
if (needsDelay) {
const jitter = random(5) * DELAY_UNIT;
const delay = DELAY_UNIT + jitter;
log.warn(`${logId}: delay needed; sleeping for ${delay}ms`);
await sleep(delay);
}
if (!isErrorRegistered(key)) {
log.warn(`${logId}: key unregistered before job ran; dropping.`);
event.confirm();
return;
}
try {
await handleDecryptionError(event);
} finally {
unregisterError(key);
}
})
);
};
}
export function getRetryKey({
senderAci,
senderDevice,
timestamp,
}: {
senderAci: AciString;
senderDevice: number;
timestamp: number;
}): RetryKeyType {
return `${senderAci}.${senderDevice}:${timestamp}`;
}
const registeredErrors = new Set<RetryKeyType>();
export function registerError(key: RetryKeyType): void {
registeredErrors.add(key);
}
export function isErrorRegistered(key: RetryKeyType): boolean {
return registeredErrors.has(key);
}
export function unregisterError(key: RetryKeyType): void {
registeredErrors.delete(key);
}
export function _getRetryRecord(): Map<string, number> {
return retryRecord;
}
@ -70,7 +141,11 @@ export async function onRetryRequest(event: RetryRequestEvent): Promise<void> {
return;
}
const retryKey: RetryKeyType = `${requesterAci}.${requesterDevice}:${sentAt}`;
const retryKey = getRetryKey({
senderAci: requesterAci,
senderDevice: requesterDevice,
timestamp: sentAt,
});
const retryCount = (retryRecord.get(retryKey) || 0) + 1;
retryRecord.set(retryKey, retryCount);
if (retryCount > RETRY_LIMIT) {
@ -222,21 +297,21 @@ export function onInvalidPlaintextMessage({
maybeShowDecryptionToast(logId, name, senderDevice);
}
export async function onDecryptionError(
export async function handleDecryptionError(
event: DecryptionErrorEvent
): Promise<void> {
const { confirm, decryptionError } = event;
const { senderAci, senderDevice, timestamp } = decryptionError;
const logId = `${senderAci}.${senderDevice} ${timestamp}`;
log.info(`onDecryptionError/${logId}: Starting...`);
log.info(`handleDecryptionError/${logId}: Starting...`);
const retryKey: RetryKeyType = `${senderAci}.${senderDevice}:${timestamp}`;
const retryKey = getRetryKey(decryptionError);
const retryCount = (retryRecord.get(retryKey) || 0) + 1;
retryRecord.set(retryKey, retryCount);
if (retryCount > RETRY_LIMIT) {
log.warn(
`onDecryptionError/${logId}: retryCount is ${retryCount}; returning early.`
`handleDecryptionError/${logId}: retryCount is ${retryCount}; returning early.`
);
confirm();
return;
@ -256,7 +331,7 @@ export async function onDecryptionError(
}
confirm();
log.info(`onDecryptionError/${logId}: ...complete`);
log.info(`handleDecryptionError/${logId}: ...complete`);
}
// Helpers

View file

@ -51,7 +51,7 @@ export async function markOnboardingStoryAsRead(): Promise<boolean> {
await window.storage.put('hasViewedOnboardingStory', true);
storageServiceUploadJob();
storageServiceUploadJob({ reason: 'markOnboardingStoryAsRead' });
return true;
}