From b5849f872a8f2d07bf1b68024e440e473b2327e5 Mon Sep 17 00:00:00 2001 From: Alvaro <110414366+alvaro-signal@users.noreply.github.com> Date: Fri, 24 Feb 2023 12:03:17 -0700 Subject: [PATCH] Drain jobs cleanly on shutdown --- ts/background.ts | 47 ++++++++- ts/jobs/JobQueue.ts | 24 +++++ ts/jobs/conversationJobQueue.ts | 29 +++++- ts/jobs/helpers/InMemoryQueues.ts | 4 + ts/jobs/helpers/commonShouldJobContinue.ts | 7 +- .../sleepForRateLimitRetryAfterTime.ts | 8 +- ts/jobs/initializeAllJobQueues.ts | 12 +++ ts/jobs/reportSpamJobQueue.ts | 7 +- ts/jobs/singleProtoJobQueue.ts | 4 + ts/messageModifiers/ReadSyncs.ts | 9 +- ts/models/conversations.ts | 37 ++++++- ts/test-mock/pnp/send_gv2_invite_test.ts | 1 - ts/util/StartupQueue.ts | 44 ++++++--- ts/util/sleeper.ts | 98 +++++++++++++++++++ 14 files changed, 301 insertions(+), 30 deletions(-) create mode 100644 ts/util/sleeper.ts diff --git a/ts/background.ts b/ts/background.ts index 2f8500496f1..e602a86fa2b 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -65,7 +65,10 @@ import { getContact, isIncoming } from './messages/helpers'; import { migrateMessageData } from './messages/migrateMessageData'; import { createBatcher } from './util/batcher'; import { updateConversationsWithUuidLookup } from './updateConversationsWithUuidLookup'; -import { initializeAllJobQueues } from './jobs/initializeAllJobQueues'; +import { + initializeAllJobQueues, + shutdownAllJobQueues, +} from './jobs/initializeAllJobQueues'; import { removeStorageKeyJobQueue } from './jobs/removeStorageKeyJobQueue'; import { ourProfileKeyService } from './services/ourProfileKey'; import { notificationService } from './services/notifications'; @@ -168,6 +171,8 @@ import { flushAttachmentDownloadQueue } from './util/attachmentDownloadQueue'; import { StartupQueue } from './util/StartupQueue'; import { showConfirmationDialog } from './util/showConfirmationDialog'; import { onCallEventSync } from './util/onCallEventSync'; +import { sleeper } from './util/sleeper'; +import { MINUTE } from './util/durations'; export function isOverHourIntoPast(timestamp: number): boolean { const HOUR = 1000 * 60 * 60; @@ -730,6 +735,46 @@ export async function startApp(): Promise { ) ); + sleeper.shutdown(); + + const shutdownQueues = async () => { + await Promise.allSettled([ + StartupQueue.shutdown(), + shutdownAllJobQueues(), + ]); + + await Promise.allSettled( + window.ConversationController.getAll().map(async convo => { + try { + await convo.shutdownJobQueue(); + } catch (err) { + log.error( + `background/shutdown: error waiting for conversation ${convo.idForLogging} job queue shutdown`, + Errors.toLogFormat(err) + ); + } + }) + ); + }; + + // wait for at most 2 minutes for startup queue and job queues to drain + let timeout: NodeJS.Timeout | undefined; + await Promise.race([ + shutdownQueues(), + new Promise((resolve, _) => { + timeout = setTimeout(() => { + log.warn( + 'background/shutdown - timed out waiting for StartupQueue/JobQueues, continuing with shutdown' + ); + timeout = undefined; + resolve(); + }, 2 * MINUTE); + }), + ]); + if (timeout) { + clearTimeout(timeout); + } + log.info('background/shutdown: waiting for all batchers'); // A number of still-to-queue database queries might be waiting inside batchers. diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index 03b51993c82..fd5a0ebc3ed 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -54,6 +54,8 @@ export abstract class JobQueue { private readonly logPrefix: string; + private shuttingDown = false; + private readonly onCompleteCallbacks = new Map< string, { @@ -66,6 +68,10 @@ export abstract class JobQueue { private started = false; + get isShuttingDown(): boolean { + return this.shuttingDown; + } + constructor(options: Readonly) { assertDev( Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1, @@ -115,6 +121,10 @@ export abstract class JobQueue { extra?: Readonly<{ attempt?: number; log?: LoggerType }> ): Promise; + protected getQueues(): ReadonlySet { + return new Set([this.defaultInMemoryQueue]); + } + /** * Start streaming jobs from the store. */ @@ -130,6 +140,10 @@ export abstract class JobQueue { const stream = this.store.stream(this.queueType); for await (const storedJob of stream) { + if (this.shuttingDown) { + log.info(`${this.logPrefix} is shutting down. Can't accept more work.`); + break; + } void this.enqueueStoredJob(storedJob); } } @@ -275,4 +289,14 @@ export abstract class JobQueue { reject(result.err); } } + + async shutdown(): Promise { + const queues = this.getQueues(); + log.info( + `${this.logPrefix} shutdown: stop accepting new work and drain ${queues.size} promise queues` + ); + this.shuttingDown = true; + await Promise.all([...queues].map(q => q.onIdle())); + log.info(`${this.logPrefix} shutdown: complete`); + } } diff --git a/ts/jobs/conversationJobQueue.ts b/ts/jobs/conversationJobQueue.ts index 0179928ed39..6d0574a5d40 100644 --- a/ts/jobs/conversationJobQueue.ts +++ b/ts/jobs/conversationJobQueue.ts @@ -7,7 +7,6 @@ import * as globalLogger from '../logging/log'; import * as durations from '../util/durations'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; -import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; import { InMemoryQueues } from './helpers/InMemoryQueues'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; import { JobQueue } from './JobQueue'; @@ -24,7 +23,6 @@ import { sendReceipts } from './helpers/sendReceipts'; import type { LoggerType } from '../types/Logging'; import { ConversationVerificationState } from '../state/ducks/conversationsEnums'; -import { sleep } from '../util/sleep'; import { MINUTE } from '../util/durations'; import { OutgoingIdentityKeyError, @@ -38,6 +36,8 @@ import type { Job } from './Job'; import type { ParsedJob } from './types'; import type SendMessage from '../textsecure/SendMessage'; import type { UUIDStringType } from '../types/UUID'; +import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; +import { sleeper } from '../util/sleeper'; import { receiptSchema, ReceiptType } from '../types/Receipt'; // Note: generally, we only want to add to this list. If you do need to change one of @@ -188,6 +188,10 @@ export class ConversationJobQueue extends JobQueue { } >(); + override getQueues(): ReadonlySet { + return this.inMemoryQueues.allQueues; + } + public override async add( data: Readonly, insert?: (job: ParsedJob) => Promise @@ -290,13 +294,21 @@ export class ConversationJobQueue extends JobQueue { } if (window.Signal.challengeHandler?.isRegistered(conversationId)) { + if (this.isShuttingDown) { + throw new Error("Shutting down, can't wait for captcha challenge."); + } log.info( 'captcha challenge is pending for this conversation; waiting at most 5m...' ); // eslint-disable-next-line no-await-in-loop await Promise.race([ this.startVerificationWaiter(conversation.id), - sleep(5 * MINUTE), + // don't resolve on shutdown, otherwise we end up in an infinite loop + sleeper.sleep( + 5 * MINUTE, + `conversationJobQueue: waiting for captcha: ${conversation.idForLogging()}`, + { resolveOnShutdown: false } + ), ]); continue; } @@ -320,13 +332,22 @@ export class ConversationJobQueue extends JobQueue { return; } + if (this.isShuttingDown) { + throw new Error("Shutting down, can't wait for verification."); + } + log.info( 'verification is pending for this conversation; waiting at most 5m...' ); // eslint-disable-next-line no-await-in-loop await Promise.race([ this.startVerificationWaiter(conversation.id), - sleep(5 * MINUTE), + // don't resolve on shutdown, otherwise we end up in an infinite loop + sleeper.sleep( + 5 * MINUTE, + `conversationJobQueue: verification pending: ${conversation.idForLogging()}`, + { resolveOnShutdown: false } + ), ]); continue; } diff --git a/ts/jobs/helpers/InMemoryQueues.ts b/ts/jobs/helpers/InMemoryQueues.ts index e324bf9f25e..6a9f85c39c0 100644 --- a/ts/jobs/helpers/InMemoryQueues.ts +++ b/ts/jobs/helpers/InMemoryQueues.ts @@ -20,4 +20,8 @@ export class InMemoryQueues { this.queues.set(key, newQueue); return newQueue; } + + get allQueues(): ReadonlySet { + return new Set(this.queues.values()); + } } diff --git a/ts/jobs/helpers/commonShouldJobContinue.ts b/ts/jobs/helpers/commonShouldJobContinue.ts index 87c134b7843..0ce8eac5dcb 100644 --- a/ts/jobs/helpers/commonShouldJobContinue.ts +++ b/ts/jobs/helpers/commonShouldJobContinue.ts @@ -3,9 +3,9 @@ import type { LoggerType } from '../../types/Logging'; import { waitForOnline } from '../../util/waitForOnline'; -import { sleep } from '../../util/sleep'; import { exponentialBackoffSleepTime } from '../../util/exponentialBackoff'; import { isDone as isDeviceLinked } from '../../util/registration'; +import { sleeper } from '../../util/sleeper'; export async function commonShouldJobContinue({ attempt, @@ -45,7 +45,10 @@ export async function commonShouldJobContinue({ const sleepTime = exponentialBackoffSleepTime(attempt); log.info(`sleeping for ${sleepTime}`); - await sleep(sleepTime); + await sleeper.sleep( + sleepTime, + `commonShouldJobContinue: attempt ${attempt}, skipWait ${skipWait}` + ); return true; } diff --git a/ts/jobs/helpers/sleepForRateLimitRetryAfterTime.ts b/ts/jobs/helpers/sleepForRateLimitRetryAfterTime.ts index 37f8ff1f585..c868b53db66 100644 --- a/ts/jobs/helpers/sleepForRateLimitRetryAfterTime.ts +++ b/ts/jobs/helpers/sleepForRateLimitRetryAfterTime.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: AGPL-3.0-only import type { LoggerType } from '../../types/Logging'; -import { sleep } from '../../util/sleep'; +import { sleeper } from '../../util/sleeper'; import { findRetryAfterTimeFromError } from './findRetryAfterTimeFromError'; export async function sleepForRateLimitRetryAfterTime({ @@ -24,5 +24,9 @@ export async function sleepForRateLimitRetryAfterTime({ `Got a 413 or 429 response code. Sleeping for ${retryAfter} millisecond(s)` ); - await sleep(retryAfter); + await sleeper.sleep( + retryAfter, + 'sleepForRateLimitRetryAfterTime: Got a 413 or 429 response code', + { resolveOnShutdown: false } + ); } diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index b3b81ab8b0a..de3cf907e35 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -37,3 +37,15 @@ export function initializeAllJobQueues({ drop(removeStorageKeyJobQueue.streamJobs()); drop(reportSpamJobQueue.streamJobs()); } + +export async function shutdownAllJobQueues(): Promise { + await Promise.allSettled([ + conversationJobQueue.shutdown(), + singleProtoJobQueue.shutdown(), + readSyncJobQueue.shutdown(), + viewSyncJobQueue.shutdown(), + viewOnceOpenJobQueue.shutdown(), + removeStorageKeyJobQueue.shutdown(), + reportSpamJobQueue.shutdown(), + ]); +} diff --git a/ts/jobs/reportSpamJobQueue.ts b/ts/jobs/reportSpamJobQueue.ts index f31851b3764..9ea4e22df9d 100644 --- a/ts/jobs/reportSpamJobQueue.ts +++ b/ts/jobs/reportSpamJobQueue.ts @@ -8,13 +8,13 @@ import { waitForOnline } from '../util/waitForOnline'; import { isDone as isDeviceLinked } from '../util/registration'; import type { LoggerType } from '../types/Logging'; import { map } from '../util/iterables'; -import { sleep } from '../util/sleep'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; import { parseIntWithFallback } from '../util/parseIntWithFallback'; import type { WebAPIType } from '../textsecure/WebAPI'; import { HTTPError } from '../textsecure/Errors'; +import { sleeper } from '../util/sleeper'; const RETRY_WAIT_TIME = durations.MINUTE; const RETRYABLE_4XX_FAILURE_STATUSES = new Set([ @@ -94,7 +94,10 @@ export class ReportSpamJobQueue extends JobQueue { log.info( `reportSpamJobQueue: server responded with ${code} status code. Sleeping before our next attempt` ); - await sleep(RETRY_WAIT_TIME); + await sleeper.sleep( + RETRY_WAIT_TIME, + `reportSpamJobQueue: server responded with ${code} status code` + ); throw err; } diff --git a/ts/jobs/singleProtoJobQueue.ts b/ts/jobs/singleProtoJobQueue.ts index 2f38ca5f932..98c9282ed99 100644 --- a/ts/jobs/singleProtoJobQueue.ts +++ b/ts/jobs/singleProtoJobQueue.ts @@ -31,6 +31,10 @@ const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME); export class SingleProtoJobQueue extends JobQueue { private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS }); + protected override getQueues(): ReadonlySet { + return new Set([this.parallelQueue]); + } + protected override getInMemoryQueue( _parsedJob: ParsedJob ): PQueue { diff --git a/ts/messageModifiers/ReadSyncs.ts b/ts/messageModifiers/ReadSyncs.ts index cd3617ca28d..bebe0ee13f9 100644 --- a/ts/messageModifiers/ReadSyncs.ts +++ b/ts/messageModifiers/ReadSyncs.ts @@ -113,14 +113,15 @@ export class ReadSyncs extends Collection { // TODO DESKTOP-1509: use MessageUpdater.markRead once this is TS message.markRead(readAt, { skipSave: true }); - const updateConversation = () => { + const updateConversation = async () => { // onReadMessage may result in messages older than this one being // marked read. We want those messages to have the same expire timer // start time as this one, so we pass the readAt value through. void message.getConversation()?.onReadMessage(message, readAt); }; - if (StartupQueue.isReady()) { + // only available during initialization + if (StartupQueue.isAvailable()) { const conversation = message.getConversation(); if (conversation) { StartupQueue.add( @@ -130,7 +131,9 @@ export class ReadSyncs extends Collection { ); } } else { - updateConversation(); + // not awaiting since we don't want to block work happening in the + // eventHandlerQueue + void updateConversation(); } } else { const now = Date.now(); diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 4cdd09ed927..aee0eeb46e4 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -272,6 +272,8 @@ export class ConversationModel extends window.Backbone private privVerifiedEnum?: typeof window.textsecure.storage.protocol.VerifiedStatus; + private isShuttingDown = false; + override defaults(): Partial { return { unreadCount: 0, @@ -3580,14 +3582,19 @@ export class ConversationModel extends window.Backbone return validateConversation(attributes); } - queueJob( + async queueJob( name: string, callback: (abortSignal: AbortSignal) => Promise ): Promise { - this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 }); - const logId = `conversation.queueJob(${this.idForLogging()}, ${name})`; + if (this.isShuttingDown) { + log.warn(`${logId}: shutting down, can't accept more work`); + throw new Error(`${logId}: shutting down, can't accept more work`); + } + + this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 }); + const taskWithTimeout = createTaskWithTimeout(callback, logId); const abortController = new AbortController(); @@ -5683,6 +5690,30 @@ export class ConversationModel extends window.Backbone return this.get('storySendMode') ?? StorySendMode.IfActive; } + + async shutdownJobQueue(): Promise { + log.info(`conversation ${this.idForLogging()} jobQueue shutdown start`); + + if (!this.jobQueue) { + log.info(`conversation ${this.idForLogging()} no jobQueue to shutdown`); + return; + } + + // If the queue takes more than 10 seconds to get to idle, we force it by setting + // isShuttingDown = true which will reject incoming requests. + const to = setTimeout(() => { + log.warn( + `conversation ${this.idForLogging()} jobQueue stop accepting new work` + ); + this.isShuttingDown = true; + }, 10 * SECOND); + + await this.jobQueue.onIdle(); + this.isShuttingDown = true; + clearTimeout(to); + + log.info(`conversation ${this.idForLogging()} jobQueue shutdown complete`); + } } window.Whisper.Conversation = ConversationModel; diff --git a/ts/test-mock/pnp/send_gv2_invite_test.ts b/ts/test-mock/pnp/send_gv2_invite_test.ts index ee6f606a189..b87dff93fb2 100644 --- a/ts/test-mock/pnp/send_gv2_invite_test.ts +++ b/ts/test-mock/pnp/send_gv2_invite_test.ts @@ -152,7 +152,6 @@ describe('pnp/send gv2 invite', function needsName() { let group: Group; { state = await phone.waitForStorageState({ after: state }); - const groups = await phone.getAllGroups(state); assert.strictEqual(groups.length, 1); diff --git a/ts/util/StartupQueue.ts b/ts/util/StartupQueue.ts index 79a88984a9a..17514df5768 100644 --- a/ts/util/StartupQueue.ts +++ b/ts/util/StartupQueue.ts @@ -1,20 +1,26 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +import PQueue from 'p-queue'; import * as Errors from '../types/errors'; import * as log from '../logging/log'; type EntryType = Readonly<{ value: number; - callback(): void; + callback(): Promise; }>; let startupProcessingQueue: StartupQueue | undefined; export class StartupQueue { private readonly map = new Map(); + private readonly running: PQueue = new PQueue({ + // mostly io-bound work that is not very parallelizable + // small number should be sufficient + concurrency: 5, + }); - public add(id: string, value: number, f: () => void): void { + public add(id: string, value: number, f: () => Promise): void { const existing = this.map.get(id); if (existing && existing.value >= value) { return; @@ -30,26 +36,36 @@ export class StartupQueue { this.map.clear(); for (const { callback } of values) { - try { - callback(); - } catch (error) { - log.error( - 'StartupQueue: Failed to process item due to error', - Errors.toLogFormat(error) - ); - } + void this.running.add(async () => { + try { + return callback(); + } catch (error) { + log.error( + 'StartupQueue: Failed to process item due to error', + Errors.toLogFormat(error) + ); + throw error; + } + }); } } + private shutdown(): Promise { + log.info( + `StartupQueue: Waiting for ${this.running.pending} tasks to drain` + ); + return this.running.onIdle(); + } + static initialize(): void { startupProcessingQueue = new StartupQueue(); } - static isReady(): boolean { + static isAvailable(): boolean { return Boolean(startupProcessingQueue); } - static add(id: string, value: number, f: () => void): void { + static add(id: string, value: number, f: () => Promise): void { startupProcessingQueue?.add(id, value, f); } @@ -57,4 +73,8 @@ export class StartupQueue { startupProcessingQueue?.flush(); startupProcessingQueue = undefined; } + + static async shutdown(): Promise { + await startupProcessingQueue?.shutdown(); + } } diff --git a/ts/util/sleeper.ts b/ts/util/sleeper.ts new file mode 100644 index 00000000000..a1ee4d9ca8d --- /dev/null +++ b/ts/util/sleeper.ts @@ -0,0 +1,98 @@ +// Copyright 2023 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import * as log from '../logging/log'; +import * as Errors from '../types/errors'; + +/** + * Provides a way to delay tasks + * but also a way to force sleeping tasks to immediately resolve/reject on shutdown + */ +export class Sleeper { + private shuttingDown = false; + private shutdownCallbacks: Set<() => void> = new Set(); + + /** + * delay by ms, careful when using on a loop if resolving on shutdown (default) + */ + sleep( + ms: number, + reason: string, + options?: { resolveOnShutdown?: boolean } + ): Promise { + log.info(`Sleeper: sleeping for ${ms}ms. Reason: ${reason}`); + const resolveOnShutdown = options?.resolveOnShutdown ?? true; + + return new Promise((resolve, reject) => { + let timeout: NodeJS.Timeout | undefined; + + const shutdownCallback = () => { + if (timeout) { + clearTimeout(timeout); + } + log.info( + `Sleeper: resolving sleep task on shutdown. Original reason: ${reason}` + ); + if (resolveOnShutdown) { + setTimeout(resolve, 0); + } else { + setTimeout(() => { + reject( + new Error( + `Sleeper: rejecting sleep task during shutdown. Original reason: ${reason}` + ) + ); + }, 0); + } + }; + + if (this.shuttingDown) { + log.info( + `Sleeper: sleep called when shutdown is in progress, scheduling immediate ${ + resolveOnShutdown ? 'resolution' : 'rejection' + }. Original reason: ${reason}` + ); + shutdownCallback(); + return; + } + + timeout = setTimeout(() => { + resolve(); + this.removeShutdownCallback(shutdownCallback); + }, ms); + + this.addShutdownCallback(shutdownCallback); + }); + } + + private addShutdownCallback(callback: () => void) { + this.shutdownCallbacks.add(callback); + } + + private removeShutdownCallback(callback: () => void) { + this.shutdownCallbacks.delete(callback); + } + + shutdown(): void { + if (this.shuttingDown) { + return; + } + log.info( + `Sleeper: shutting down, settling ${this.shutdownCallbacks.size} in-progress sleep calls` + ); + this.shuttingDown = true; + this.shutdownCallbacks.forEach(cb => { + try { + cb(); + } catch (error) { + log.error( + 'Sleeper: Error executing shutdown callback', + Errors.toLogFormat(error) + ); + } + }); + log.info('Sleeper: sleep tasks settled'); + } +} + +export const sleeper = new Sleeper();