From 90356d4c0f35077f7f41688fb27e7d123b45e9dd Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Fri, 14 Jan 2022 13:34:52 -0800 Subject: [PATCH] Move a number of sync messages to jobs for retry --- _locales/en/messages.json | 2 +- ts/SignalProtocolStore.ts | 31 +- ts/background.ts | 61 ++-- ts/jobs/helpers/getHttpErrorCode.ts | 4 +- ts/jobs/helpers/handleMultipleSendErrors.ts | 4 +- ts/jobs/initializeAllJobQueues.ts | 4 +- ts/jobs/singleProtoJobQueue.ts | 113 +++++++ ts/models/conversations.ts | 68 ++--- ts/services/storage.ts | 57 ++-- ts/services/writeProfile.ts | 20 +- ts/services/writeUsername.ts | 20 +- ts/shims/textsecure.ts | 34 +-- ts/sql/Server.ts | 1 - .../helpers/handleMultipleSendErrors_test.ts | 13 +- ts/textsecure/OutgoingMessage.ts | 12 +- ts/textsecure/SendMessage.ts | 284 +++++++++--------- ts/textsecure/SyncRequest.ts | 57 ++-- ts/util/handleMessageSend.ts | 66 ++-- ts/util/handleRetry.ts | 23 +- 19 files changed, 501 insertions(+), 373 deletions(-) create mode 100644 ts/jobs/singleProtoJobQueue.ts diff --git a/_locales/en/messages.json b/_locales/en/messages.json index e90d851e5e..50a7c0d4aa 100644 --- a/_locales/en/messages.json +++ b/_locales/en/messages.json @@ -6630,7 +6630,7 @@ "description": "A title of the dialog displayed when starting an application after a recent crash" }, "CrashReportDialog__body": { - "message": "Signal restarted after a crash. You can submit a crash a report to help Signal investigate the issue.", + "message": "Signal restarted after a crash. You can submit a crash report to help Signal investigate the issue.", "description": "The body of the dialog displayed when starting an application after a recent crash" }, "CrashReportDialog__submit": { diff --git a/ts/SignalProtocolStore.ts b/ts/SignalProtocolStore.ts index 3ac3fd6541..c9c507b79e 100644 --- a/ts/SignalProtocolStore.ts +++ b/ts/SignalProtocolStore.ts @@ -1,4 +1,4 @@ -// Copyright 2016-2021 Signal Messenger, LLC +// Copyright 2016-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import PQueue from 'p-queue'; @@ -18,7 +18,6 @@ import { import * as Bytes from './Bytes'; import { constantTimeEqual } from './Crypto'; import { assert, strictAssert } from './util/assert'; -import { handleMessageSend } from './util/handleMessageSend'; import { isNotNil } from './util/isNotNil'; import { Zone } from './util/Zone'; import { isMoreRecentThan } from './util/timestamp'; @@ -44,7 +43,6 @@ import type { UnprocessedType, UnprocessedUpdateType, } from './textsecure/Types.d'; -import { getSendOptions } from './util/getSendOptions'; import type { RemoveAllConfiguration } from './types/RemoveAllConfiguration'; import type { UUIDStringType } from './types/UUID'; import { UUID } from './types/UUID'; @@ -52,6 +50,8 @@ import type { Address } from './types/Address'; import type { QualifiedAddressStringType } from './types/QualifiedAddress'; import { QualifiedAddress } from './types/QualifiedAddress'; import * as log from './logging/log'; +import { singleProtoJobQueue } from './jobs/singleProtoJobQueue'; +import * as Errors from './types/errors'; const TIMESTAMP_THRESHOLD = 5 * 1000; // 5 seconds @@ -1384,29 +1384,22 @@ export class SignalProtocolStore extends EventsMixin { // Archive open session with this device await this.archiveSession(qualifiedAddress); - // Send a null message with newly-created session - const sendOptions = await getSendOptions(conversation.attributes); - const result = await handleMessageSend( - window.textsecure.messaging.sendNullMessage( - { - uuid: uuid.toString(), - }, - sendOptions - ), - { messageIds: [], sendType: 'nullMessage' } + // Enqueue a null message with newly-created session + await singleProtoJobQueue.add( + window.textsecure.messaging.getNullMessage({ + uuid: uuid.toString(), + }) ); - - if (result && result.errors && result.errors.length) { - throw result.errors[0]; - } } catch (error) { // If we failed to do the session reset, then we'll allow another attempt sooner // than one hour from now. delete sessionResets[id]; window.storage.put('sessionResets', sessionResets); - const errorString = error && error.stack ? error.stack : error; - log.error(`lightSessionReset/${id}: Encountered error`, errorString); + log.error( + `lightSessionReset/${id}: Encountered error`, + Errors.toLogFormat(error) + ); } } diff --git a/ts/background.ts b/ts/background.ts index bb44bc64ba..f987a8ac40 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -1,4 +1,4 @@ -// Copyright 2020-2021 Signal Messenger, LLC +// Copyright 2020-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { webFrame } from 'electron'; @@ -89,9 +89,7 @@ import type { WebAPIType } from './textsecure/WebAPI'; import * as KeyChangeListener from './textsecure/KeyChangeListener'; import { RotateSignedPreKeyListener } from './textsecure/RotateSignedPreKeyListener'; import { isDirectConversation, isGroupV2 } from './util/whatTypeOfConversation'; -import { getSendOptions } from './util/getSendOptions'; import { BackOff, FIBONACCI_TIMEOUTS } from './util/BackOff'; -import { handleMessageSend } from './util/handleMessageSend'; import { AppViewType } from './state/ducks/app'; import { UsernameSaveState } from './state/ducks/conversationsEnums'; import type { BadgesStateType } from './state/ducks/badges'; @@ -144,6 +142,7 @@ import { startInteractionMode } from './windows/startInteractionMode'; import { deliveryReceiptsJobQueue } from './jobs/deliveryReceiptsJobQueue'; import { updateOurUsername } from './util/updateOurUsername'; import { ReactionSource } from './reactions/ReactionSource'; +import { singleProtoJobQueue } from './jobs/singleProtoJobQueue'; const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000; @@ -1607,7 +1606,7 @@ export async function startApp(): Promise { unlinkAndDisconnect(RemoveAllConfiguration.Full); }); - function runStorageService() { + async function runStorageService() { window.Signal.Services.enableStorageService(); if (window.ConversationController.areWePrimaryDevice()) { @@ -1617,10 +1616,16 @@ export async function startApp(): Promise { return; } - handleMessageSend(window.textsecure.messaging.sendRequestKeySyncMessage(), { - messageIds: [], - sendType: 'otherSync', - }); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getRequestKeySyncMessage() + ); + } catch (error) { + log.error( + 'runStorageService: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } } let challengeHandler: ChallengeHandler | undefined; @@ -1865,10 +1870,16 @@ export async function startApp(): Promise { return; } - await handleMessageSend( - window.textsecure.messaging.sendRequestKeySyncMessage(), - { messageIds: [], sendType: 'otherSync' } - ); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getRequestKeySyncMessage() + ); + } catch (error) { + log.error( + 'desktop.storage/onChange: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } } ); @@ -2196,12 +2207,6 @@ export async function startApp(): Promise { runStorageService(); }); - const ourConversation = - window.ConversationController.getOurConversationOrThrow(); - const sendOptions = await getSendOptions(ourConversation.attributes, { - syncMessage: true, - }); - const installedStickerPacks = Stickers.getInstalledStickerPacks(); if (installedStickerPacks.length) { const operations = installedStickerPacks.map(pack => ({ @@ -2217,18 +2222,16 @@ export async function startApp(): Promise { return; } - handleMessageSend( - window.textsecure.messaging.sendStickerPackSync( - operations, - sendOptions - ), - { messageIds: [], sendType: 'otherSync' } - ).catch(error => { - log.error( - 'Failed to send installed sticker packs via sync message', - error && error.stack ? error.stack : error + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getStickerPackSync(operations) ); - }); + } catch (error) { + log.error( + 'connect: Failed to queue sticker sync message', + Errors.toLogFormat(error) + ); + } } } diff --git a/ts/jobs/helpers/getHttpErrorCode.ts b/ts/jobs/helpers/getHttpErrorCode.ts index f69245560d..1ff23dfa82 100644 --- a/ts/jobs/helpers/getHttpErrorCode.ts +++ b/ts/jobs/helpers/getHttpErrorCode.ts @@ -1,4 +1,4 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { isRecord } from '../../util/isRecord'; @@ -13,11 +13,13 @@ export function getHttpErrorCode(maybeError: unknown): number { return -1; } + // This might be a textsecure/Errors/HTTPError const maybeTopLevelCode = parseIntWithFallback(maybeError.code, -1); if (maybeTopLevelCode !== -1) { return maybeTopLevelCode; } + // Various errors in textsecure/Errors have a nested httpError property const { httpError } = maybeError; if (!isRecord(httpError)) { return -1; diff --git a/ts/jobs/helpers/handleMultipleSendErrors.ts b/ts/jobs/helpers/handleMultipleSendErrors.ts index 5e9d8473f9..831b9f0651 100644 --- a/ts/jobs/helpers/handleMultipleSendErrors.ts +++ b/ts/jobs/helpers/handleMultipleSendErrors.ts @@ -18,7 +18,7 @@ export async function handleMultipleSendErrors({ errors: ReadonlyArray; isFinalAttempt: boolean; log: Pick; - markFailed: (() => void) | (() => Promise); + markFailed?: (() => void) | (() => Promise); timeRemaining: number; }>): Promise { strictAssert(errors.length, 'Expected at least one error'); @@ -50,7 +50,7 @@ export async function handleMultipleSendErrors({ ); if (isFinalAttempt || serverAskedUsToStop) { - await markFailed(); + await markFailed?.(); } if (serverAskedUsToStop) { diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index cb48d5b76e..7b13c0eec7 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -1,4 +1,4 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import type { WebAPIType } from '../textsecure/WebAPI'; @@ -10,6 +10,7 @@ import { readReceiptsJobQueue } from './readReceiptsJobQueue'; import { readSyncJobQueue } from './readSyncJobQueue'; import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue'; import { reportSpamJobQueue } from './reportSpamJobQueue'; +import { singleProtoJobQueue } from './singleProtoJobQueue'; import { viewSyncJobQueue } from './viewSyncJobQueue'; import { viewedReceiptsJobQueue } from './viewedReceiptsJobQueue'; @@ -30,6 +31,7 @@ export function initializeAllJobQueues({ readSyncJobQueue.streamJobs(); removeStorageKeyJobQueue.streamJobs(); reportSpamJobQueue.streamJobs(); + singleProtoJobQueue.streamJobs(); viewSyncJobQueue.streamJobs(); viewedReceiptsJobQueue.streamJobs(); } diff --git a/ts/jobs/singleProtoJobQueue.ts b/ts/jobs/singleProtoJobQueue.ts new file mode 100644 index 0000000000..fc099a126a --- /dev/null +++ b/ts/jobs/singleProtoJobQueue.ts @@ -0,0 +1,113 @@ +// Copyright 2022 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import PQueue from 'p-queue'; + +import * as Bytes from '../Bytes'; +import type { LoggerType } from '../types/Logging'; +import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; +import type { ParsedJob } from './types'; +import { JobQueue } from './JobQueue'; +import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; +import { DAY } from '../util/durations'; +import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; +import { SignalService as Proto } from '../protobuf'; +import { handleMessageSend } from '../util/handleMessageSend'; +import { getSendOptions } from '../util/getSendOptions'; +import type { SingleProtoJobData } from '../textsecure/SendMessage'; +import { singleProtoJobDataSchema } from '../textsecure/SendMessage'; +import { handleMultipleSendErrors } from './helpers/handleMultipleSendErrors'; +import { SendMessageProtoError } from '../textsecure/Errors'; + +const MAX_RETRY_TIME = DAY; +const MAX_PARALLEL_JOBS = 5; +const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME); + +export class SingleProtoJobQueue extends JobQueue { + private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS }); + + protected override getInMemoryQueue( + _parsedJob: ParsedJob + ): PQueue { + return this.parallelQueue; + } + + protected parseData(data: unknown): SingleProtoJobData { + return singleProtoJobDataSchema.parse(data); + } + + protected async run( + { + data, + timestamp, + }: Readonly<{ data: SingleProtoJobData; timestamp: number }>, + { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> + ): Promise { + const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); + const isFinalAttempt = attempt >= MAX_ATTEMPTS; + + const shouldContinue = await commonShouldJobContinue({ + attempt, + log, + timeRemaining, + }); + if (!shouldContinue) { + return; + } + + const { + contentHint, + identifier, + isSyncMessage, + messageIds = [], + protoBase64, + type, + } = data; + log.info( + `starting ${type} send to ${identifier} with timestamp ${timestamp}` + ); + + const conversation = window.ConversationController.get(identifier); + if (!conversation) { + throw new Error( + `Failed to get conversation for identifier ${identifier}` + ); + } + + const proto = Proto.Content.decode(Bytes.fromBase64(protoBase64)); + const options = await getSendOptions(conversation.attributes, { + syncMessage: isSyncMessage, + }); + + try { + await handleMessageSend( + window.textsecure.messaging.sendIndividualProto({ + contentHint, + identifier, + options, + proto, + timestamp, + }), + { messageIds, sendType: type } + ); + } catch (error: unknown) { + const errors = + error instanceof SendMessageProtoError + ? error.errors || [error] + : [error]; + + await handleMultipleSendErrors({ + errors, + isFinalAttempt, + log, + timeRemaining, + }); + } + } +} + +export const singleProtoJobQueue = new SingleProtoJobQueue({ + maxAttempts: MAX_ATTEMPTS, + queueType: 'single proto', + store: jobQueueDatabaseStore, +}); diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index f61df7d53d..9bbc71fc1d 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -106,6 +106,7 @@ import * as log from '../logging/log'; import * as Errors from '../types/errors'; import { isMessageUnread } from '../util/isMessageUnread'; import type { SenderKeyTargetType } from '../util/sendToGroup'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; /* eslint-disable more/no-then */ window.Whisper = window.Whisper || {}; @@ -2401,12 +2402,6 @@ export class ConversationModel extends window.Backbone // server updates were successful. await this.applyMessageRequestResponse(response); - const ourConversation = - window.ConversationController.getOurConversationOrThrow(); - const sendOptions = await getSendOptions(ourConversation.attributes, { - syncMessage: true, - }); - const groupId = this.getGroupIdBuffer(); if (window.ConversationController.areWePrimaryDevice()) { @@ -2417,20 +2412,19 @@ export class ConversationModel extends window.Backbone } try { - await handleMessageSend( - window.textsecure.messaging.syncMessageRequestResponse( - { - threadE164: this.get('e164'), - threadUuid: this.get('uuid'), - groupId, - type: response, - }, - sendOptions - ), - { messageIds: [], sendType: 'otherSync' } + await singleProtoJobQueue.add( + window.textsecure.messaging.getMessageRequestResponseSync({ + threadE164: this.get('e164'), + threadUuid: this.get('uuid'), + groupId, + type: response, + }) + ); + } catch (error) { + log.error( + 'syncMessageRequestResponse: Failed to queue sync message', + Errors.toLogFormat(error) ); - } catch (result) { - this.processSendResponse(result); } } @@ -2619,17 +2613,6 @@ export class ConversationModel extends window.Backbone return; } - // Because syncVerification sends a (null) message to the target of the verify and - // a sync message to our own devices, we need to send the accessKeys down for both - // contacts. So we merge their sendOptions. - const ourConversation = - window.ConversationController.getOurConversationOrThrow(); - const sendOptions = await getSendOptions(ourConversation.attributes, { - syncMessage: true, - }); - const contactSendOptions = await getSendOptions(this.attributes); - const options = { ...sendOptions, ...contactSendOptions }; - const key = await window.textsecure.storage.protocol.loadIdentityKey( UUID.checkedLookup(identifier) ); @@ -2639,16 +2622,21 @@ export class ConversationModel extends window.Backbone ); } - await handleMessageSend( - window.textsecure.messaging.syncVerification( - e164, - uuid.toString(), - state, - key, - options - ), - { messageIds: [], sendType: 'verificationSync' } - ); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getVerificationSync( + e164, + uuid.toString(), + state, + key + ) + ); + } catch (error) { + log.error( + 'sendVerifySyncMessage: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } } isVerified(): boolean { diff --git a/ts/services/storage.ts b/ts/services/storage.ts index 73103f86b1..c5c1523156 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -1,4 +1,4 @@ -// Copyright 2020-2021 Signal Messenger, LLC +// Copyright 2020-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { debounce, isNumber } from 'lodash'; @@ -27,7 +27,6 @@ import type { ConversationModel } from '../models/conversations'; import { strictAssert } from '../util/assert'; import * as durations from '../util/durations'; import { BackOff } from '../util/BackOff'; -import { handleMessageSend } from '../util/handleMessageSend'; import { storageJobQueue } from '../util/JobQueue'; import { sleep } from '../util/sleep'; import { isMoreRecentThan } from '../util/timestamp'; @@ -39,6 +38,8 @@ import { } from '../util/whatTypeOfConversation'; import { SignalService as Proto } from '../protobuf'; import * as log from '../logging/log'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; +import * as Errors from '../types/errors'; type IManifestRecordIdentifier = Proto.ManifestRecord.IIdentifier; @@ -553,15 +554,21 @@ async function uploadManifest( if (window.ConversationController.areWePrimaryDevice()) { log.warn( - 'uploadManifest: We are primary device; not sending sync manifest' + 'storageService.uploadManifest: We are primary device; not sending sync manifest' ); return; } - await handleMessageSend( - window.textsecure.messaging.sendFetchManifestSyncMessage(), - { messageIds: [], sendType: 'otherSync' } - ); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getFetchManifestSyncMessage() + ); + } catch (error) { + log.error( + 'storageService.uploadManifest: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } } async function stopStorageServiceSync() { @@ -578,7 +585,7 @@ async function stopStorageServiceSync() { await sleep(backOff.getAndIncrement()); log.info('storageService.stopStorageServiceSync: requesting new keys'); - setTimeout(() => { + setTimeout(async () => { if (!window.textsecure.messaging) { throw new Error('storageService.stopStorageServiceSync: We are offline!'); } @@ -589,11 +596,16 @@ async function stopStorageServiceSync() { ); return; } - - handleMessageSend(window.textsecure.messaging.sendRequestKeySyncMessage(), { - messageIds: [], - sendType: 'otherSync', - }); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getRequestKeySyncMessage() + ); + } catch (error) { + log.error( + 'storageService.stopStorageServiceSync: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } }); } @@ -1118,14 +1130,23 @@ async function upload(fromSync = false): Promise { backOff.reset(); if (window.ConversationController.areWePrimaryDevice()) { - log.warn('upload: We are primary device; not sending key sync request'); + log.warn( + 'storageService.upload: We are primary device; not sending key sync request' + ); return; } - await handleMessageSend( - window.textsecure.messaging.sendRequestKeySyncMessage(), - { messageIds: [], sendType: 'otherSync' } - ); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getRequestKeySyncMessage() + ); + } catch (error) { + log.error( + 'storageService.upload: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } + return; } diff --git a/ts/services/writeProfile.ts b/ts/services/writeProfile.ts index ec5abe0b3c..aebe094c32 100644 --- a/ts/services/writeProfile.ts +++ b/ts/services/writeProfile.ts @@ -1,4 +1,4 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import dataInterface from '../sql/Client'; @@ -6,7 +6,9 @@ import type { ConversationType } from '../state/ducks/conversations'; import { computeHash } from '../Crypto'; import { encryptProfileData } from '../util/encryptProfileData'; import { getProfile } from '../util/getProfile'; -import { handleMessageSend } from '../util/handleMessageSend'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; +import * as Errors from '../types/errors'; +import * as log from '../logging/log'; export async function writeProfile( conversation: ConversationType, @@ -85,8 +87,14 @@ export async function writeProfile( dataInterface.updateConversation(model.attributes); model.captureChange('writeProfile'); - await handleMessageSend( - window.textsecure.messaging.sendFetchLocalProfileSyncMessage(), - { messageIds: [], sendType: 'otherSync' } - ); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getFetchLocalProfileSyncMessage() + ); + } catch (error) { + log.error( + 'writeProfile: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } } diff --git a/ts/services/writeUsername.ts b/ts/services/writeUsername.ts index 1de5b48c46..c224a060dc 100644 --- a/ts/services/writeUsername.ts +++ b/ts/services/writeUsername.ts @@ -1,9 +1,11 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; import dataInterface from '../sql/Client'; -import { handleMessageSend } from '../util/handleMessageSend'; import { updateOurUsername } from '../util/updateOurUsername'; +import * as Errors from '../types/errors'; +import * as log from '../logging/log'; export async function writeUsername({ username, @@ -32,8 +34,14 @@ export async function writeUsername({ dataInterface.updateConversation(me.attributes); - await handleMessageSend( - window.textsecure.messaging.sendFetchLocalProfileSyncMessage(), - { messageIds: [], sendType: 'otherSync' } - ); + try { + await singleProtoJobQueue.add( + window.textsecure.messaging.getFetchLocalProfileSyncMessage() + ); + } catch (error) { + log.error( + 'writeUsername: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } } diff --git a/ts/shims/textsecure.ts b/ts/shims/textsecure.ts index f05f14f8fe..65286c0ca0 100644 --- a/ts/shims/textsecure.ts +++ b/ts/shims/textsecure.ts @@ -1,20 +1,16 @@ -// Copyright 2019-2021 Signal Messenger, LLC +// Copyright 2019-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import { handleMessageSend } from '../util/handleMessageSend'; -import { getSendOptions } from '../util/getSendOptions'; import * as log from '../logging/log'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; +import * as Errors from '../types/errors'; export async function sendStickerPackSync( packId: string, packKey: string, installed: boolean ): Promise { - const { ConversationController, textsecure } = window; - const ourConversation = ConversationController.getOurConversationOrThrow(); - const sendOptions = await getSendOptions(ourConversation.attributes, { - syncMessage: true, - }); + const { textsecure } = window; if (!textsecure.messaging) { log.error( @@ -31,22 +27,20 @@ export async function sendStickerPackSync( return; } - handleMessageSend( - textsecure.messaging.sendStickerPackSync( - [ + try { + await singleProtoJobQueue.add( + textsecure.messaging.getStickerPackSync([ { packId, packKey, installed, }, - ], - sendOptions - ), - { messageIds: [], sendType: 'otherSync' } - ).catch(error => { - log.error( - 'shim: Error calling sendStickerPackSync:', - error && error.stack ? error.stack : error + ]) ); - }); + } catch (error) { + log.error( + 'sendStickerPackSync: Failed to queue sync message', + Errors.toLogFormat(error) + ); + } } diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 91a2130235..de24ac6724 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -4096,7 +4096,6 @@ async function removeAll(): Promise { DELETE FROM identityKeys; DELETE FROM items; DELETE FROM jobs; - DELETE FROM jobs; DELETE FROM messages_fts; DELETE FROM messages; DELETE FROM preKeys; diff --git a/ts/test-node/jobs/helpers/handleMultipleSendErrors_test.ts b/ts/test-node/jobs/helpers/handleMultipleSendErrors_test.ts index 24e316da11..38614cd44e 100644 --- a/ts/test-node/jobs/helpers/handleMultipleSendErrors_test.ts +++ b/ts/test-node/jobs/helpers/handleMultipleSendErrors_test.ts @@ -3,7 +3,7 @@ import { assert } from 'chai'; import * as sinon from 'sinon'; -import { noop } from 'lodash'; +import { noop, omit } from 'lodash'; import { HTTPError } from '../../../textsecure/Errors'; import { SECOND } from '../../../util/durations'; @@ -63,6 +63,17 @@ describe('handleMultipleSendErrors', () => { sinon.assert.calledOnceWithExactly(markFailed); }); + it("doesn't require `markFailed`", async () => { + await assert.isRejected( + handleMultipleSendErrors({ + ...omit(defaultOptions, 'markFailed'), + errors: [new Error('Test message')], + isFinalAttempt: true, + }), + 'Test message' + ); + }); + describe('413 handling', () => { it('sleeps for the longest 413 Retry-After time', async () => { let done = false; diff --git a/ts/textsecure/OutgoingMessage.ts b/ts/textsecure/OutgoingMessage.ts index 8f19d6f32f..726f73c322 100644 --- a/ts/textsecure/OutgoingMessage.ts +++ b/ts/textsecure/OutgoingMessage.ts @@ -1,4 +1,4 @@ -// Copyright 2020-2021 Signal Messenger, LLC +// Copyright 2020-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only /* eslint-disable guard-for-in */ @@ -53,12 +53,10 @@ export type SendLogCallbackType = (options: { deviceIds: Array; }) => Promise; -export const serializedCertificateSchema = z - .object({ - expires: z.number().optional(), - serialized: z.instanceof(Uint8Array), - }) - .nonstrict(); +export const serializedCertificateSchema = z.object({ + expires: z.number().optional(), + serialized: z.instanceof(Uint8Array), +}); export type SerializedCertificateType = z.infer< typeof serializedCertificateSchema diff --git a/ts/textsecure/SendMessage.ts b/ts/textsecure/SendMessage.ts index 888895325d..ce8e489a4e 100644 --- a/ts/textsecure/SendMessage.ts +++ b/ts/textsecure/SendMessage.ts @@ -1,4 +1,4 @@ -// Copyright 2020-2021 Signal Messenger, LLC +// Copyright 2020-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only /* eslint-disable no-nested-ternary */ @@ -6,6 +6,7 @@ /* eslint-disable no-bitwise */ /* eslint-disable max-classes-per-file */ +import { z } from 'zod'; import type { Dictionary } from 'lodash'; import Long from 'long'; import PQueue from 'p-queue'; @@ -62,7 +63,7 @@ import type { } from '../linkPreviews/linkPreviewFetch'; import { concat, isEmpty, map } from '../util/iterables'; import type { SendTypesType } from '../util/handleMessageSend'; -import { handleMessageSend, shouldSaveProto } from '../util/handleMessageSend'; +import { shouldSaveProto, sendTypesEnum } from '../util/handleMessageSend'; import { SignalService as Proto } from '../protobuf'; import * as log from '../logging/log'; @@ -139,6 +140,17 @@ export type AttachmentType = { blurHash?: string; }; +export const singleProtoJobDataSchema = z.object({ + contentHint: z.number(), + identifier: z.string(), + isSyncMessage: z.boolean(), + messageIds: z.array(z.string()).optional(), + protoBase64: z.string(), + type: sendTypesEnum, +}); + +export type SingleProtoJobData = z.infer; + function makeAttachmentSendReady( attachment: Attachment.AttachmentType ): AttachmentType | undefined { @@ -956,17 +968,17 @@ export default class MessageSender { } async sendIndividualProto({ + contentHint, identifier, + options, proto, timestamp, - contentHint, - options, }: Readonly<{ + contentHint: number; identifier: string | undefined; + options?: SendOptionsType; proto: Proto.DataMessage | Proto.Content | PlaintextContent; timestamp: number; - contentHint: number; - options?: SendOptionsType; }>): Promise { assert(identifier, "Identifier can't be undefined"); return new Promise((resolve, reject) => { @@ -1087,7 +1099,7 @@ export default class MessageSender { sentMessage.isRecipientUpdate = true; } - // Though this field has 'unidenified' in the name, it should have entries for each + // Though this field has 'unidentified' in the name, it should have entries for each // number we sent to. if (!isEmpty(conversationIdsSentTo)) { sentMessage.unidentifiedStatus = [ @@ -1128,9 +1140,7 @@ export default class MessageSender { }); } - async sendRequestBlockSyncMessage( - options?: Readonly - ): Promise { + getRequestBlockSyncMessage(): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const request = new Proto.SyncMessage.Request(); @@ -1142,18 +1152,18 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'blockSyncRequest', + }; } - async sendRequestConfigurationSyncMessage( - options?: Readonly - ): Promise { + getRequestConfigurationSyncMessage(): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const request = new Proto.SyncMessage.Request(); @@ -1165,18 +1175,18 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'configurationSyncRequest', + }; } - async sendRequestGroupSyncMessage( - options?: Readonly - ): Promise { + getRequestGroupSyncMessage(): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const request = new Proto.SyncMessage.Request(); @@ -1188,18 +1198,18 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'groupSyncRequest', + }; } - async sendRequestContactSyncMessage( - options?: Readonly - ): Promise { + getRequestContactSyncMessage(): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const request = new Proto.SyncMessage.Request(); @@ -1211,18 +1221,18 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'contactSyncRequest', + }; } - async sendFetchManifestSyncMessage( - options?: Readonly - ): Promise { + getFetchManifestSyncMessage(): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const fetchLatest = new Proto.SyncMessage.FetchLatest(); @@ -1235,18 +1245,18 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'fetchLatestManifestSync', + }; } - async sendFetchLocalProfileSyncMessage( - options?: Readonly - ): Promise { + getFetchLocalProfileSyncMessage(): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const fetchLatest = new Proto.SyncMessage.FetchLatest(); @@ -1259,18 +1269,18 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'fetchLocalProfileSync', + }; } - async sendRequestKeySyncMessage( - options?: Readonly - ): Promise { + getRequestKeySyncMessage(): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const request = new Proto.SyncMessage.Request(); @@ -1283,13 +1293,15 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'keySyncRequest', + }; } async syncReadMessages( @@ -1381,30 +1393,29 @@ export default class MessageSender { }); } - async syncMessageRequestResponse( - responseArgs: Readonly<{ + getMessageRequestResponseSync( + options: Readonly<{ threadE164?: string; threadUuid?: string; groupId?: Uint8Array; type: number; - }>, - options?: Readonly - ): Promise { + }> + ): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const syncMessage = this.createSyncMessage(); const response = new Proto.SyncMessage.MessageRequestResponse(); - if (responseArgs.threadE164 !== undefined) { - response.threadE164 = responseArgs.threadE164; + if (options.threadE164 !== undefined) { + response.threadE164 = options.threadE164; } - if (responseArgs.threadUuid !== undefined) { - response.threadUuid = responseArgs.threadUuid; + if (options.threadUuid !== undefined) { + response.threadUuid = options.threadUuid; } - if (responseArgs.groupId) { - response.groupId = responseArgs.groupId; + if (options.groupId) { + response.groupId = options.groupId; } - response.type = responseArgs.type; + response.type = options.type; syncMessage.messageRequestResponse = response; const contentMessage = new Proto.Content(); @@ -1412,23 +1423,24 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ - identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), + return { contentHint: ContentHint.RESENDABLE, - options, - }); + identifier: myUuid.toString(), + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'messageRequestSync', + }; } - async sendStickerPackSync( + getStickerPackSync( operations: ReadonlyArray<{ packId: string; packKey: string; installed: boolean; - }>, - options?: Readonly - ): Promise { + }> + ): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); const ENUM = Proto.SyncMessage.StickerPackOperation.Type; @@ -1451,44 +1463,31 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier: myUuid.toString(), - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'stickerPackSync', + }; } - async syncVerification( + getVerificationSync( destinationE164: string | undefined, destinationUuid: string | undefined, state: number, - identityKey: Readonly, - options?: Readonly - ): Promise { + identityKey: Readonly + ): SingleProtoJobData { const myUuid = window.textsecure.storage.user.getCheckedUuid(); - const now = Date.now(); if (!destinationE164 && !destinationUuid) { throw new Error('syncVerification: Neither e164 nor UUID were provided'); } - // Get padding which we can share between null message and verified sync const padding = this.getRandomPadding(); - // First send a null message to mask the sync message. - await handleMessageSend( - this.sendNullMessage( - { uuid: destinationUuid, e164: destinationE164, padding }, - options - ), - { - messageIds: [], - sendType: 'nullMessage', - } - ); - const verified = new Proto.Verified(); verified.state = state; if (destinationE164) { @@ -1503,18 +1502,20 @@ export default class MessageSender { const syncMessage = this.createSyncMessage(); syncMessage.verified = verified; - const secondMessage = new Proto.Content(); - secondMessage.syncMessage = syncMessage; + const contentMessage = new Proto.Content(); + contentMessage.syncMessage = syncMessage; const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - return this.sendIndividualProto({ - identifier: myUuid.toString(), - proto: secondMessage, - timestamp: now, + return { contentHint: ContentHint.RESENDABLE, - options, - }); + identifier: myUuid.toString(), + isSyncMessage: true, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'verificationSync', + }; } // Sending messages to contacts @@ -1542,7 +1543,7 @@ export default class MessageSender { } : {}), }, - contentHint: ContentHint.IMPLICIT, + contentHint: ContentHint.RESENDABLE, groupId: undefined, options, }); @@ -1650,14 +1651,15 @@ export default class MessageSender { }); } - async sendNullMessage( - { - uuid, - e164, - padding, - }: Readonly<{ uuid?: string; e164?: string; padding?: Uint8Array }>, - options?: Readonly - ): Promise { + getNullMessage({ + uuid, + e164, + padding, + }: Readonly<{ + uuid?: string; + e164?: string; + padding?: Uint8Array; + }>): SingleProtoJobData { const nullMessage = new Proto.NullMessage(); const identifier = uuid || e164; @@ -1672,15 +1674,15 @@ export default class MessageSender { const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - // We want the NullMessage to look like a normal outgoing message - const timestamp = Date.now(); - return this.sendIndividualProto({ + return { + contentHint: ContentHint.RESENDABLE, identifier, - proto: contentMessage, - timestamp, - contentHint: ContentHint.IMPLICIT, - options, - }); + isSyncMessage: false, + protoBase64: Bytes.toBase64( + Proto.Content.encode(contentMessage).finish() + ), + type: 'nullMessage', + }; } async sendExpirationTimerUpdateToIdentifier( diff --git a/ts/textsecure/SyncRequest.ts b/ts/textsecure/SyncRequest.ts index c63d7fb0a2..33b9eb6d45 100644 --- a/ts/textsecure/SyncRequest.ts +++ b/ts/textsecure/SyncRequest.ts @@ -1,4 +1,4 @@ -// Copyright 2020-2021 Signal Messenger, LLC +// Copyright 2020-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only /* eslint-disable more/no-then */ @@ -12,9 +12,9 @@ import MessageReceiver from './MessageReceiver'; import type { ContactSyncEvent, GroupSyncEvent } from './messageReceiverEvents'; import MessageSender from './SendMessage'; import { assert } from '../util/assert'; -import { getSendOptions } from '../util/getSendOptions'; -import { handleMessageSend } from '../util/handleMessageSend'; import * as log from '../logging/log'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; +import * as Errors from '../types/errors'; class SyncRequestInner extends EventTarget { private started = false; @@ -65,47 +65,28 @@ class SyncRequestInner extends EventTarget { const { sender } = this; - const ourConversation = - window.ConversationController.getOurConversationOrThrow(); - const sendOptions = await getSendOptions(ourConversation.attributes, { - syncMessage: true, - }); - if (window.ConversationController.areWePrimaryDevice()) { log.warn('SyncRequest.start: We are primary device; returning early'); return; } - log.info('SyncRequest created. Sending config sync request...'); - handleMessageSend(sender.sendRequestConfigurationSyncMessage(sendOptions), { - messageIds: [], - sendType: 'otherSync', - }); + log.info( + 'SyncRequest created. Sending config, block, contact, and group requests...' + ); + try { + await Promise.all([ + singleProtoJobQueue.add(sender.getRequestConfigurationSyncMessage()), + singleProtoJobQueue.add(sender.getRequestBlockSyncMessage()), + singleProtoJobQueue.add(sender.getRequestContactSyncMessage()), + singleProtoJobQueue.add(sender.getRequestGroupSyncMessage()), + ]); + } catch (error: unknown) { + log.error( + 'SyncRequest: Failed to add request jobs', + Errors.toLogFormat(error) + ); + } - log.info('SyncRequest now sending block sync request...'); - handleMessageSend(sender.sendRequestBlockSyncMessage(sendOptions), { - messageIds: [], - sendType: 'otherSync', - }); - - log.info('SyncRequest now sending contact sync message...'); - handleMessageSend(sender.sendRequestContactSyncMessage(sendOptions), { - messageIds: [], - sendType: 'otherSync', - }) - .then(() => { - log.info('SyncRequest now sending group sync message...'); - return handleMessageSend( - sender.sendRequestGroupSyncMessage(sendOptions), - { messageIds: [], sendType: 'otherSync' } - ); - }) - .catch((error: Error) => { - log.error( - 'SyncRequest error:', - error && error.stack ? error.stack : error - ); - }); this.timeout = setTimeout(this.onTimeout.bind(this), this.timeoutMillis); } diff --git a/ts/util/handleMessageSend.ts b/ts/util/handleMessageSend.ts index 70bf4accec..dd2255e951 100644 --- a/ts/util/handleMessageSend.ts +++ b/ts/util/handleMessageSend.ts @@ -1,6 +1,7 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +import { z } from 'zod'; import { isNumber } from 'lodash'; import type { CallbackResultType } from '../textsecure/Types.d'; import dataInterface from '../sql/Client'; @@ -15,40 +16,47 @@ export const SEALED_SENDER = { UNRESTRICTED: 3, }; -export type SendTypesType = - | 'callingMessage' // excluded from send log - | 'deleteForEveryone' - | 'deliveryReceipt' - | 'expirationTimerUpdate' - | 'groupChange' - | 'legacyGroupChange' - | 'message' - | 'nullMessage' // excluded from send log - | 'otherSync' - | 'profileKeyUpdate' - | 'reaction' - | 'readReceipt' - | 'readSync' - | 'resendFromLog' // excluded from send log - | 'resetSession' - | 'retryRequest' // excluded from send log - | 'senderKeyDistributionMessage' - | 'sentSync' - | 'typing' // excluded from send log - | 'verificationSync' - | 'viewOnceSync' - | 'viewSync' - | 'viewedReceipt'; +export const sendTypesEnum = z.enum([ + 'blockSyncRequest', + 'callingMessage', // excluded from send log + 'configurationSyncRequest', + 'contactSyncRequest', + 'deleteForEveryone', + 'deliveryReceipt', + 'expirationTimerUpdate', + 'fetchLatestManifestSync', + 'fetchLocalProfileSync', + 'groupChange', + 'groupSyncRequest', + 'keySyncRequest', + 'legacyGroupChange', + 'message', + 'messageRequestSync', + 'nullMessage', + 'profileKeyUpdate', + 'reaction', + 'readReceipt', + 'readSync', + 'resendFromLog', // excluded from send log + 'resetSession', + 'retryRequest', // excluded from send log + 'senderKeyDistributionMessage', + 'sentSync', + 'stickerPackSync', + 'typing', // excluded from send log + 'verificationSync', + 'viewOnceSync', + 'viewSync', + 'viewedReceipt', +]); + +export type SendTypesType = z.infer; export function shouldSaveProto(sendType: SendTypesType): boolean { if (sendType === 'callingMessage') { return false; } - if (sendType === 'nullMessage') { - return false; - } - if (sendType === 'resendFromLog') { return false; } diff --git a/ts/util/handleRetry.ts b/ts/util/handleRetry.ts index 0847b5cc1e..4c01c5975b 100644 --- a/ts/util/handleRetry.ts +++ b/ts/util/handleRetry.ts @@ -1,4 +1,4 @@ -// Copyright 2021 Signal Messenger, LLC +// Copyright 2021-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { @@ -19,6 +19,7 @@ import { Address } from '../types/Address'; import { QualifiedAddress } from '../types/QualifiedAddress'; import { ToastDecryptionError } from '../components/ToastDecryptionError'; import { showToast } from './showToast'; +import * as Errors from '../types/errors'; import type { ConversationModel } from '../models/conversations'; import type { @@ -30,6 +31,7 @@ import type { import { SignalService as Proto } from '../protobuf'; import * as log from '../logging/log'; +import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; const RETRY_LIMIT = 5; @@ -311,22 +313,17 @@ async function sendDistributionMessageOrNullMessage( `sendDistributionMessageOrNullMessage/${logId}: Did not send distribution message, sending null message` ); + // Enqueue a null message using the newly-created session try { - const sendOptions = await getSendOptions(conversation.attributes); - const result = await handleMessageSend( - window.textsecure.messaging.sendNullMessage( - { uuid: requesterUuid }, - sendOptions - ), - { messageIds: [], sendType: 'nullMessage' } + await singleProtoJobQueue.add( + window.textsecure.messaging.getNullMessage({ + uuid: requesterUuid, + }) ); - if (result && result.errors && result.errors.length > 0) { - throw result.errors[0]; - } } catch (error) { log.error( - `maybeSendDistributionMessage/${logId}: Failed to send null message`, - error && error.stack ? error.stack : error + 'sendDistributionMessageOrNullMessage: Failed to queue null message', + Errors.toLogFormat(error) ); } }