diff --git a/ts/background.ts b/ts/background.ts index ad40c3ab591..cf043d6773d 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -425,7 +425,7 @@ export async function startApp(): Promise { accountManager = new window.textsecure.AccountManager(server); accountManager.addEventListener('startRegistration', () => { - pauseProcessing(); + pauseProcessing('startRegistration'); // We should already be logged out, but this ensures that the next time we connect // to the auth socket it is from newly-registered credentials drop(server?.logout()); @@ -771,7 +771,7 @@ export async function startApp(): Promise { 'WebAPI should be initialized together with MessageReceiver' ); log.info('background/shutdown: shutting down messageReceiver'); - pauseProcessing(); + pauseProcessing('shutdown'); await window.waitForAllBatchers(); } @@ -1182,14 +1182,14 @@ export async function startApp(): Promise { log.info('Storage fetch'); drop(window.storage.fetch()); - function pauseProcessing() { + function pauseProcessing(reason: string) { strictAssert(server != null, 'WebAPI not initialized'); strictAssert( messageReceiver != null, 'messageReceiver must be initialized' ); - StorageService.disableStorageService(); + StorageService.disableStorageService(reason); server.unregisterRequestHandler(messageReceiver); messageReceiver.stopProcessing(); } @@ -1447,13 +1447,18 @@ export async function startApp(): Promise { remotelyExpired = true; }); - async function runStorageService({ reason }: { reason: string }) { + async function enableStorageService({ andSync }: { andSync?: string } = {}) { + log.info('enableStorageService: waiting for backupReady'); await backupReady.promise; + log.info('enableStorageService: enabling and running'); StorageService.enableStorageService(); - StorageService.runStorageServiceSyncJob({ - reason: `runStorageService/${reason}`, - }); + + if (andSync != null) { + await StorageService.runStorageServiceSyncJob({ + reason: andSync, + }); + } } async function start() { @@ -1795,21 +1800,28 @@ export async function startApp(): Promise { wasBackupImported, }); - // 5. Kickoff storage service sync + // 5. Start processing messages from websocket and clear + // `messageReceiver.#isEmptied`. + log.info(`${logId}: enabling message processing`); + messageReceiver.startProcessingQueue(); + server.registerRequestHandler(messageReceiver); + + // 6. Kickoff storage service sync if (isFirstAuthSocketConnect || !postRegistrationSyncsComplete) { log.info(`${logId}: triggering storage service sync`); storageServiceSyncComplete = waitForEvent( 'storageService:syncComplete' ); - drop(runStorageService({ reason: 'afterFirstAuthSocketConnect' })); + drop( + enableStorageService({ + andSync: 'afterFirstAuthSocketConnect', + }) + ); + } else { + drop(enableStorageService()); } - // 6. Start processing messages from websocket - log.info(`${logId}: enabling message processing`); - messageReceiver.startProcessingQueue(); - server.registerRequestHandler(messageReceiver); - // 7. Wait for critical post-registration syncs before showing inbox if (!postRegistrationSyncsComplete) { const syncsToAwaitBeforeShowingInbox = [contactSyncComplete]; @@ -2077,32 +2089,62 @@ export async function startApp(): Promise { return; } + const waitStart = Date.now(); + if (!messageReceiver.hasEmptied()) { log.info( 'waitForEmptyEventQueue: Waiting for MessageReceiver empty event...' ); const { resolve, reject, promise } = explodePromise(); - const timeout = Timers.setTimeout(() => { - reject(new Error('Empty queue never fired')); - }, FIVE_MINUTES); + const cleanup = () => { + messageReceiver?.removeEventListener('empty', onEmptyOnce); + messageReceiver?.removeEventListener('envelopeQueued', onResetTimer); - const onEmptyOnce = () => { - if (messageReceiver) { - messageReceiver.removeEventListener('empty', onEmptyOnce); - } - Timers.clearTimeout(timeout); - if (resolve) { - resolve(); + if (timeout !== undefined) { + Timers.clearTimeout(timeout); + timeout = undefined; } }; + + // Reject after 1 minutes of inactivity. + const onTimeout = () => { + cleanup(); + reject(new Error('Empty queue never fired')); + }; + let timeout: Timers.Timeout | undefined = Timers.setTimeout( + onTimeout, + durations.MINUTE + ); + + const onEmptyOnce = () => { + cleanup(); + resolve(); + }; messageReceiver.addEventListener('empty', onEmptyOnce); + const onResetTimer = () => { + if (timeout !== undefined) { + Timers.clearTimeout(timeout); + } + timeout = Timers.setTimeout(onTimeout, durations.MINUTE); + }; + messageReceiver.addEventListener('envelopeQueued', onResetTimer); + await promise; } - log.info('waitForEmptyEventQueue: Waiting for event handler queue idle...'); - await eventHandlerQueue.onIdle(); + if (eventHandlerQueue.pending !== 0 || eventHandlerQueue.size !== 0) { + log.info( + 'waitForEmptyEventQueue: Waiting for event handler queue idle...' + ); + await eventHandlerQueue.onIdle(); + } + + const duration = Date.now() - waitStart; + if (duration > SECOND) { + log.info(`waitForEmptyEventQueue: resolving after ${duration}ms`); + } } window.waitForEmptyEventQueue = waitForEmptyEventQueue; @@ -3114,7 +3156,7 @@ export async function startApp(): Promise { log.info('unlinkAndDisconnect: logging out'); strictAssert(server !== undefined, 'WebAPI not initialized'); - pauseProcessing(); + pauseProcessing('unlinkAndDisconnect'); backupReady.reject(new Error('Aborted')); backupReady = explodePromise(); diff --git a/ts/services/storage.ts b/ts/services/storage.ts index daa05895f77..cccc508f943 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -2062,6 +2062,8 @@ async function sync({ strictAssert(manifest.version != null, 'Manifest without version'); const version = manifest.version?.toNumber() ?? 0; + await window.waitForEmptyEventQueue(); + log.info( `storageService.sync: updating to remoteVersion=${version} ` + `sourceDevice=${manifest.sourceDevice ?? '?'} from ` + @@ -2193,10 +2195,20 @@ async function upload({ let storageServiceEnabled = false; export function enableStorageService(): void { + if (storageServiceEnabled) { + return; + } + storageServiceEnabled = true; + log.info('storageService.enableStorageService'); } -export function disableStorageService(): void { +export function disableStorageService(reason: string): void { + if (!storageServiceEnabled) { + return; + } + + log.info(`storageService.disableStorageService: ${reason}`); storageServiceEnabled = false; } @@ -2316,7 +2328,8 @@ export const runStorageServiceSyncJob = debounce( ({ reason }: { reason: string }) => { if (!storageServiceEnabled) { log.info( - 'storageService.runStorageServiceSyncJob: called before enabled' + `storageService.runStorageServiceSyncJob(${reason}): ` + + 'called before enabled' ); return; } diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 1bb1de9a04f..655c55e1caf 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -292,7 +292,6 @@ export default class MessageReceiver #appQueue: PQueue; #decryptAndCacheBatcher: BatcherType; #cacheRemoveBatcher: BatcherType; - #count: number; #processedCount: number; #incomingQueue: PQueue; #isEmptied?: boolean; @@ -308,7 +307,6 @@ export default class MessageReceiver this.#storage = storage; - this.#count = 0; this.#processedCount = 0; if (!serverTrustRoot) { @@ -471,10 +469,12 @@ export default class MessageReceiver ); } + public handleDisconnect(): void { + this.#isEmptied = false; + } + public startProcessingQueue(): void { log.info('MessageReceiver.startProcessingQueue'); - this.#count = 0; - this.#isEmptied = false; this.#stoppingProcessing = false; drop(this.#addCachedMessagesToQueue()); @@ -731,10 +731,6 @@ export default class MessageReceiver id: string, taskType: TaskType ): Promise { - if (taskType === TaskType.Encrypted) { - this.#count += 1; - } - const queue = taskType === TaskType.Encrypted ? this.#encryptedQueue @@ -796,10 +792,6 @@ export default class MessageReceiver }; const waitForIncomingQueue = async () => { - // Note: this.count is used in addToQueue - // Resetting count so everything from the websocket after this starts at zero - this.#count = 0; - drop( this.#addToQueue( waitForEncryptedQueue, diff --git a/ts/textsecure/Provisioner.ts b/ts/textsecure/Provisioner.ts index 4933e2b37fd..57ac3cb7c1a 100644 --- a/ts/textsecure/Provisioner.ts +++ b/ts/textsecure/Provisioner.ts @@ -396,6 +396,9 @@ export class Provisioner { resource.close(); } }, + handleDisconnect() { + // No-op + }, }, timeout ); diff --git a/ts/textsecure/SocketManager.ts b/ts/textsecure/SocketManager.ts index 3792a3a363b..ea0b7c68c85 100644 --- a/ts/textsecure/SocketManager.ts +++ b/ts/textsecure/SocketManager.ts @@ -879,6 +879,17 @@ export class SocketManager extends EventListener { this.#incomingRequestQueue = []; this.#authenticated = undefined; this.#setAuthenticatedStatus({ status: SocketStatus.CLOSED }); + + for (const handlers of this.#requestHandlers) { + try { + handlers.handleDisconnect(); + } catch (error) { + log.warn( + 'SocketManager: got exception while handling disconnect, ' + + `error: ${Errors.toLogFormat(error)}` + ); + } + } } #dropUnauthenticated(process: AbortableProcess): void { diff --git a/ts/textsecure/Types.d.ts b/ts/textsecure/Types.d.ts index 1a6e1efcde1..99002022983 100644 --- a/ts/textsecure/Types.d.ts +++ b/ts/textsecure/Types.d.ts @@ -289,6 +289,7 @@ export type CallbackResultType = { export type IRequestHandler = { handleRequest(request: IncomingWebSocketRequest): void; + handleDisconnect(): void; }; export type PniKeyMaterialType = Readonly<{