Drain jobs cleanly on shutdown
This commit is contained in:
parent
a83a85d557
commit
b5849f872a
14 changed files with 301 additions and 30 deletions
|
@ -65,7 +65,10 @@ import { getContact, isIncoming } from './messages/helpers';
|
||||||
import { migrateMessageData } from './messages/migrateMessageData';
|
import { migrateMessageData } from './messages/migrateMessageData';
|
||||||
import { createBatcher } from './util/batcher';
|
import { createBatcher } from './util/batcher';
|
||||||
import { updateConversationsWithUuidLookup } from './updateConversationsWithUuidLookup';
|
import { updateConversationsWithUuidLookup } from './updateConversationsWithUuidLookup';
|
||||||
import { initializeAllJobQueues } from './jobs/initializeAllJobQueues';
|
import {
|
||||||
|
initializeAllJobQueues,
|
||||||
|
shutdownAllJobQueues,
|
||||||
|
} from './jobs/initializeAllJobQueues';
|
||||||
import { removeStorageKeyJobQueue } from './jobs/removeStorageKeyJobQueue';
|
import { removeStorageKeyJobQueue } from './jobs/removeStorageKeyJobQueue';
|
||||||
import { ourProfileKeyService } from './services/ourProfileKey';
|
import { ourProfileKeyService } from './services/ourProfileKey';
|
||||||
import { notificationService } from './services/notifications';
|
import { notificationService } from './services/notifications';
|
||||||
|
@ -168,6 +171,8 @@ import { flushAttachmentDownloadQueue } from './util/attachmentDownloadQueue';
|
||||||
import { StartupQueue } from './util/StartupQueue';
|
import { StartupQueue } from './util/StartupQueue';
|
||||||
import { showConfirmationDialog } from './util/showConfirmationDialog';
|
import { showConfirmationDialog } from './util/showConfirmationDialog';
|
||||||
import { onCallEventSync } from './util/onCallEventSync';
|
import { onCallEventSync } from './util/onCallEventSync';
|
||||||
|
import { sleeper } from './util/sleeper';
|
||||||
|
import { MINUTE } from './util/durations';
|
||||||
|
|
||||||
export function isOverHourIntoPast(timestamp: number): boolean {
|
export function isOverHourIntoPast(timestamp: number): boolean {
|
||||||
const HOUR = 1000 * 60 * 60;
|
const HOUR = 1000 * 60 * 60;
|
||||||
|
@ -730,6 +735,46 @@ export async function startApp(): Promise<void> {
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
sleeper.shutdown();
|
||||||
|
|
||||||
|
const shutdownQueues = async () => {
|
||||||
|
await Promise.allSettled([
|
||||||
|
StartupQueue.shutdown(),
|
||||||
|
shutdownAllJobQueues(),
|
||||||
|
]);
|
||||||
|
|
||||||
|
await Promise.allSettled(
|
||||||
|
window.ConversationController.getAll().map(async convo => {
|
||||||
|
try {
|
||||||
|
await convo.shutdownJobQueue();
|
||||||
|
} catch (err) {
|
||||||
|
log.error(
|
||||||
|
`background/shutdown: error waiting for conversation ${convo.idForLogging} job queue shutdown`,
|
||||||
|
Errors.toLogFormat(err)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
// wait for at most 2 minutes for startup queue and job queues to drain
|
||||||
|
let timeout: NodeJS.Timeout | undefined;
|
||||||
|
await Promise.race([
|
||||||
|
shutdownQueues(),
|
||||||
|
new Promise<void>((resolve, _) => {
|
||||||
|
timeout = setTimeout(() => {
|
||||||
|
log.warn(
|
||||||
|
'background/shutdown - timed out waiting for StartupQueue/JobQueues, continuing with shutdown'
|
||||||
|
);
|
||||||
|
timeout = undefined;
|
||||||
|
resolve();
|
||||||
|
}, 2 * MINUTE);
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
if (timeout) {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
log.info('background/shutdown: waiting for all batchers');
|
log.info('background/shutdown: waiting for all batchers');
|
||||||
|
|
||||||
// A number of still-to-queue database queries might be waiting inside batchers.
|
// A number of still-to-queue database queries might be waiting inside batchers.
|
||||||
|
|
|
@ -54,6 +54,8 @@ export abstract class JobQueue<T> {
|
||||||
|
|
||||||
private readonly logPrefix: string;
|
private readonly logPrefix: string;
|
||||||
|
|
||||||
|
private shuttingDown = false;
|
||||||
|
|
||||||
private readonly onCompleteCallbacks = new Map<
|
private readonly onCompleteCallbacks = new Map<
|
||||||
string,
|
string,
|
||||||
{
|
{
|
||||||
|
@ -66,6 +68,10 @@ export abstract class JobQueue<T> {
|
||||||
|
|
||||||
private started = false;
|
private started = false;
|
||||||
|
|
||||||
|
get isShuttingDown(): boolean {
|
||||||
|
return this.shuttingDown;
|
||||||
|
}
|
||||||
|
|
||||||
constructor(options: Readonly<JobQueueOptions>) {
|
constructor(options: Readonly<JobQueueOptions>) {
|
||||||
assertDev(
|
assertDev(
|
||||||
Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1,
|
Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1,
|
||||||
|
@ -115,6 +121,10 @@ export abstract class JobQueue<T> {
|
||||||
extra?: Readonly<{ attempt?: number; log?: LoggerType }>
|
extra?: Readonly<{ attempt?: number; log?: LoggerType }>
|
||||||
): Promise<void>;
|
): Promise<void>;
|
||||||
|
|
||||||
|
protected getQueues(): ReadonlySet<PQueue> {
|
||||||
|
return new Set([this.defaultInMemoryQueue]);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start streaming jobs from the store.
|
* Start streaming jobs from the store.
|
||||||
*/
|
*/
|
||||||
|
@ -130,6 +140,10 @@ export abstract class JobQueue<T> {
|
||||||
|
|
||||||
const stream = this.store.stream(this.queueType);
|
const stream = this.store.stream(this.queueType);
|
||||||
for await (const storedJob of stream) {
|
for await (const storedJob of stream) {
|
||||||
|
if (this.shuttingDown) {
|
||||||
|
log.info(`${this.logPrefix} is shutting down. Can't accept more work.`);
|
||||||
|
break;
|
||||||
|
}
|
||||||
void this.enqueueStoredJob(storedJob);
|
void this.enqueueStoredJob(storedJob);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,4 +289,14 @@ export abstract class JobQueue<T> {
|
||||||
reject(result.err);
|
reject(result.err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async shutdown(): Promise<void> {
|
||||||
|
const queues = this.getQueues();
|
||||||
|
log.info(
|
||||||
|
`${this.logPrefix} shutdown: stop accepting new work and drain ${queues.size} promise queues`
|
||||||
|
);
|
||||||
|
this.shuttingDown = true;
|
||||||
|
await Promise.all([...queues].map(q => q.onIdle()));
|
||||||
|
log.info(`${this.logPrefix} shutdown: complete`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import * as globalLogger from '../logging/log';
|
||||||
|
|
||||||
import * as durations from '../util/durations';
|
import * as durations from '../util/durations';
|
||||||
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
||||||
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
|
|
||||||
import { InMemoryQueues } from './helpers/InMemoryQueues';
|
import { InMemoryQueues } from './helpers/InMemoryQueues';
|
||||||
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
||||||
import { JobQueue } from './JobQueue';
|
import { JobQueue } from './JobQueue';
|
||||||
|
@ -24,7 +23,6 @@ import { sendReceipts } from './helpers/sendReceipts';
|
||||||
|
|
||||||
import type { LoggerType } from '../types/Logging';
|
import type { LoggerType } from '../types/Logging';
|
||||||
import { ConversationVerificationState } from '../state/ducks/conversationsEnums';
|
import { ConversationVerificationState } from '../state/ducks/conversationsEnums';
|
||||||
import { sleep } from '../util/sleep';
|
|
||||||
import { MINUTE } from '../util/durations';
|
import { MINUTE } from '../util/durations';
|
||||||
import {
|
import {
|
||||||
OutgoingIdentityKeyError,
|
OutgoingIdentityKeyError,
|
||||||
|
@ -38,6 +36,8 @@ import type { Job } from './Job';
|
||||||
import type { ParsedJob } from './types';
|
import type { ParsedJob } from './types';
|
||||||
import type SendMessage from '../textsecure/SendMessage';
|
import type SendMessage from '../textsecure/SendMessage';
|
||||||
import type { UUIDStringType } from '../types/UUID';
|
import type { UUIDStringType } from '../types/UUID';
|
||||||
|
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
|
||||||
|
import { sleeper } from '../util/sleeper';
|
||||||
import { receiptSchema, ReceiptType } from '../types/Receipt';
|
import { receiptSchema, ReceiptType } from '../types/Receipt';
|
||||||
|
|
||||||
// Note: generally, we only want to add to this list. If you do need to change one of
|
// Note: generally, we only want to add to this list. If you do need to change one of
|
||||||
|
@ -188,6 +188,10 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
|
||||||
}
|
}
|
||||||
>();
|
>();
|
||||||
|
|
||||||
|
override getQueues(): ReadonlySet<PQueue> {
|
||||||
|
return this.inMemoryQueues.allQueues;
|
||||||
|
}
|
||||||
|
|
||||||
public override async add(
|
public override async add(
|
||||||
data: Readonly<ConversationQueueJobData>,
|
data: Readonly<ConversationQueueJobData>,
|
||||||
insert?: (job: ParsedJob<ConversationQueueJobData>) => Promise<void>
|
insert?: (job: ParsedJob<ConversationQueueJobData>) => Promise<void>
|
||||||
|
@ -290,13 +294,21 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (window.Signal.challengeHandler?.isRegistered(conversationId)) {
|
if (window.Signal.challengeHandler?.isRegistered(conversationId)) {
|
||||||
|
if (this.isShuttingDown) {
|
||||||
|
throw new Error("Shutting down, can't wait for captcha challenge.");
|
||||||
|
}
|
||||||
log.info(
|
log.info(
|
||||||
'captcha challenge is pending for this conversation; waiting at most 5m...'
|
'captcha challenge is pending for this conversation; waiting at most 5m...'
|
||||||
);
|
);
|
||||||
// eslint-disable-next-line no-await-in-loop
|
// eslint-disable-next-line no-await-in-loop
|
||||||
await Promise.race([
|
await Promise.race([
|
||||||
this.startVerificationWaiter(conversation.id),
|
this.startVerificationWaiter(conversation.id),
|
||||||
sleep(5 * MINUTE),
|
// don't resolve on shutdown, otherwise we end up in an infinite loop
|
||||||
|
sleeper.sleep(
|
||||||
|
5 * MINUTE,
|
||||||
|
`conversationJobQueue: waiting for captcha: ${conversation.idForLogging()}`,
|
||||||
|
{ resolveOnShutdown: false }
|
||||||
|
),
|
||||||
]);
|
]);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -320,13 +332,22 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.isShuttingDown) {
|
||||||
|
throw new Error("Shutting down, can't wait for verification.");
|
||||||
|
}
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
'verification is pending for this conversation; waiting at most 5m...'
|
'verification is pending for this conversation; waiting at most 5m...'
|
||||||
);
|
);
|
||||||
// eslint-disable-next-line no-await-in-loop
|
// eslint-disable-next-line no-await-in-loop
|
||||||
await Promise.race([
|
await Promise.race([
|
||||||
this.startVerificationWaiter(conversation.id),
|
this.startVerificationWaiter(conversation.id),
|
||||||
sleep(5 * MINUTE),
|
// don't resolve on shutdown, otherwise we end up in an infinite loop
|
||||||
|
sleeper.sleep(
|
||||||
|
5 * MINUTE,
|
||||||
|
`conversationJobQueue: verification pending: ${conversation.idForLogging()}`,
|
||||||
|
{ resolveOnShutdown: false }
|
||||||
|
),
|
||||||
]);
|
]);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,4 +20,8 @@ export class InMemoryQueues {
|
||||||
this.queues.set(key, newQueue);
|
this.queues.set(key, newQueue);
|
||||||
return newQueue;
|
return newQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get allQueues(): ReadonlySet<PQueue> {
|
||||||
|
return new Set(this.queues.values());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,9 +3,9 @@
|
||||||
|
|
||||||
import type { LoggerType } from '../../types/Logging';
|
import type { LoggerType } from '../../types/Logging';
|
||||||
import { waitForOnline } from '../../util/waitForOnline';
|
import { waitForOnline } from '../../util/waitForOnline';
|
||||||
import { sleep } from '../../util/sleep';
|
|
||||||
import { exponentialBackoffSleepTime } from '../../util/exponentialBackoff';
|
import { exponentialBackoffSleepTime } from '../../util/exponentialBackoff';
|
||||||
import { isDone as isDeviceLinked } from '../../util/registration';
|
import { isDone as isDeviceLinked } from '../../util/registration';
|
||||||
|
import { sleeper } from '../../util/sleeper';
|
||||||
|
|
||||||
export async function commonShouldJobContinue({
|
export async function commonShouldJobContinue({
|
||||||
attempt,
|
attempt,
|
||||||
|
@ -45,7 +45,10 @@ export async function commonShouldJobContinue({
|
||||||
|
|
||||||
const sleepTime = exponentialBackoffSleepTime(attempt);
|
const sleepTime = exponentialBackoffSleepTime(attempt);
|
||||||
log.info(`sleeping for ${sleepTime}`);
|
log.info(`sleeping for ${sleepTime}`);
|
||||||
await sleep(sleepTime);
|
await sleeper.sleep(
|
||||||
|
sleepTime,
|
||||||
|
`commonShouldJobContinue: attempt ${attempt}, skipWait ${skipWait}`
|
||||||
|
);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
// SPDX-License-Identifier: AGPL-3.0-only
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
import type { LoggerType } from '../../types/Logging';
|
import type { LoggerType } from '../../types/Logging';
|
||||||
import { sleep } from '../../util/sleep';
|
import { sleeper } from '../../util/sleeper';
|
||||||
import { findRetryAfterTimeFromError } from './findRetryAfterTimeFromError';
|
import { findRetryAfterTimeFromError } from './findRetryAfterTimeFromError';
|
||||||
|
|
||||||
export async function sleepForRateLimitRetryAfterTime({
|
export async function sleepForRateLimitRetryAfterTime({
|
||||||
|
@ -24,5 +24,9 @@ export async function sleepForRateLimitRetryAfterTime({
|
||||||
`Got a 413 or 429 response code. Sleeping for ${retryAfter} millisecond(s)`
|
`Got a 413 or 429 response code. Sleeping for ${retryAfter} millisecond(s)`
|
||||||
);
|
);
|
||||||
|
|
||||||
await sleep(retryAfter);
|
await sleeper.sleep(
|
||||||
|
retryAfter,
|
||||||
|
'sleepForRateLimitRetryAfterTime: Got a 413 or 429 response code',
|
||||||
|
{ resolveOnShutdown: false }
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,3 +37,15 @@ export function initializeAllJobQueues({
|
||||||
drop(removeStorageKeyJobQueue.streamJobs());
|
drop(removeStorageKeyJobQueue.streamJobs());
|
||||||
drop(reportSpamJobQueue.streamJobs());
|
drop(reportSpamJobQueue.streamJobs());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function shutdownAllJobQueues(): Promise<void> {
|
||||||
|
await Promise.allSettled([
|
||||||
|
conversationJobQueue.shutdown(),
|
||||||
|
singleProtoJobQueue.shutdown(),
|
||||||
|
readSyncJobQueue.shutdown(),
|
||||||
|
viewSyncJobQueue.shutdown(),
|
||||||
|
viewOnceOpenJobQueue.shutdown(),
|
||||||
|
removeStorageKeyJobQueue.shutdown(),
|
||||||
|
reportSpamJobQueue.shutdown(),
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
|
@ -8,13 +8,13 @@ import { waitForOnline } from '../util/waitForOnline';
|
||||||
import { isDone as isDeviceLinked } from '../util/registration';
|
import { isDone as isDeviceLinked } from '../util/registration';
|
||||||
import type { LoggerType } from '../types/Logging';
|
import type { LoggerType } from '../types/Logging';
|
||||||
import { map } from '../util/iterables';
|
import { map } from '../util/iterables';
|
||||||
import { sleep } from '../util/sleep';
|
|
||||||
|
|
||||||
import { JobQueue } from './JobQueue';
|
import { JobQueue } from './JobQueue';
|
||||||
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
||||||
import { parseIntWithFallback } from '../util/parseIntWithFallback';
|
import { parseIntWithFallback } from '../util/parseIntWithFallback';
|
||||||
import type { WebAPIType } from '../textsecure/WebAPI';
|
import type { WebAPIType } from '../textsecure/WebAPI';
|
||||||
import { HTTPError } from '../textsecure/Errors';
|
import { HTTPError } from '../textsecure/Errors';
|
||||||
|
import { sleeper } from '../util/sleeper';
|
||||||
|
|
||||||
const RETRY_WAIT_TIME = durations.MINUTE;
|
const RETRY_WAIT_TIME = durations.MINUTE;
|
||||||
const RETRYABLE_4XX_FAILURE_STATUSES = new Set([
|
const RETRYABLE_4XX_FAILURE_STATUSES = new Set([
|
||||||
|
@ -94,7 +94,10 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
|
||||||
log.info(
|
log.info(
|
||||||
`reportSpamJobQueue: server responded with ${code} status code. Sleeping before our next attempt`
|
`reportSpamJobQueue: server responded with ${code} status code. Sleeping before our next attempt`
|
||||||
);
|
);
|
||||||
await sleep(RETRY_WAIT_TIME);
|
await sleeper.sleep(
|
||||||
|
RETRY_WAIT_TIME,
|
||||||
|
`reportSpamJobQueue: server responded with ${code} status code`
|
||||||
|
);
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,10 @@ const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME);
|
||||||
export class SingleProtoJobQueue extends JobQueue<SingleProtoJobData> {
|
export class SingleProtoJobQueue extends JobQueue<SingleProtoJobData> {
|
||||||
private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS });
|
private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS });
|
||||||
|
|
||||||
|
protected override getQueues(): ReadonlySet<PQueue> {
|
||||||
|
return new Set([this.parallelQueue]);
|
||||||
|
}
|
||||||
|
|
||||||
protected override getInMemoryQueue(
|
protected override getInMemoryQueue(
|
||||||
_parsedJob: ParsedJob<SingleProtoJobData>
|
_parsedJob: ParsedJob<SingleProtoJobData>
|
||||||
): PQueue {
|
): PQueue {
|
||||||
|
|
|
@ -113,14 +113,15 @@ export class ReadSyncs extends Collection {
|
||||||
// TODO DESKTOP-1509: use MessageUpdater.markRead once this is TS
|
// TODO DESKTOP-1509: use MessageUpdater.markRead once this is TS
|
||||||
message.markRead(readAt, { skipSave: true });
|
message.markRead(readAt, { skipSave: true });
|
||||||
|
|
||||||
const updateConversation = () => {
|
const updateConversation = async () => {
|
||||||
// onReadMessage may result in messages older than this one being
|
// onReadMessage may result in messages older than this one being
|
||||||
// marked read. We want those messages to have the same expire timer
|
// marked read. We want those messages to have the same expire timer
|
||||||
// start time as this one, so we pass the readAt value through.
|
// start time as this one, so we pass the readAt value through.
|
||||||
void message.getConversation()?.onReadMessage(message, readAt);
|
void message.getConversation()?.onReadMessage(message, readAt);
|
||||||
};
|
};
|
||||||
|
|
||||||
if (StartupQueue.isReady()) {
|
// only available during initialization
|
||||||
|
if (StartupQueue.isAvailable()) {
|
||||||
const conversation = message.getConversation();
|
const conversation = message.getConversation();
|
||||||
if (conversation) {
|
if (conversation) {
|
||||||
StartupQueue.add(
|
StartupQueue.add(
|
||||||
|
@ -130,7 +131,9 @@ export class ReadSyncs extends Collection {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
updateConversation();
|
// not awaiting since we don't want to block work happening in the
|
||||||
|
// eventHandlerQueue
|
||||||
|
void updateConversation();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
|
|
|
@ -272,6 +272,8 @@ export class ConversationModel extends window.Backbone
|
||||||
|
|
||||||
private privVerifiedEnum?: typeof window.textsecure.storage.protocol.VerifiedStatus;
|
private privVerifiedEnum?: typeof window.textsecure.storage.protocol.VerifiedStatus;
|
||||||
|
|
||||||
|
private isShuttingDown = false;
|
||||||
|
|
||||||
override defaults(): Partial<ConversationAttributesType> {
|
override defaults(): Partial<ConversationAttributesType> {
|
||||||
return {
|
return {
|
||||||
unreadCount: 0,
|
unreadCount: 0,
|
||||||
|
@ -3580,14 +3582,19 @@ export class ConversationModel extends window.Backbone
|
||||||
return validateConversation(attributes);
|
return validateConversation(attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
queueJob<T>(
|
async queueJob<T>(
|
||||||
name: string,
|
name: string,
|
||||||
callback: (abortSignal: AbortSignal) => Promise<T>
|
callback: (abortSignal: AbortSignal) => Promise<T>
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 });
|
|
||||||
|
|
||||||
const logId = `conversation.queueJob(${this.idForLogging()}, ${name})`;
|
const logId = `conversation.queueJob(${this.idForLogging()}, ${name})`;
|
||||||
|
|
||||||
|
if (this.isShuttingDown) {
|
||||||
|
log.warn(`${logId}: shutting down, can't accept more work`);
|
||||||
|
throw new Error(`${logId}: shutting down, can't accept more work`);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 });
|
||||||
|
|
||||||
const taskWithTimeout = createTaskWithTimeout(callback, logId);
|
const taskWithTimeout = createTaskWithTimeout(callback, logId);
|
||||||
|
|
||||||
const abortController = new AbortController();
|
const abortController = new AbortController();
|
||||||
|
@ -5683,6 +5690,30 @@ export class ConversationModel extends window.Backbone
|
||||||
|
|
||||||
return this.get('storySendMode') ?? StorySendMode.IfActive;
|
return this.get('storySendMode') ?? StorySendMode.IfActive;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async shutdownJobQueue(): Promise<void> {
|
||||||
|
log.info(`conversation ${this.idForLogging()} jobQueue shutdown start`);
|
||||||
|
|
||||||
|
if (!this.jobQueue) {
|
||||||
|
log.info(`conversation ${this.idForLogging()} no jobQueue to shutdown`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the queue takes more than 10 seconds to get to idle, we force it by setting
|
||||||
|
// isShuttingDown = true which will reject incoming requests.
|
||||||
|
const to = setTimeout(() => {
|
||||||
|
log.warn(
|
||||||
|
`conversation ${this.idForLogging()} jobQueue stop accepting new work`
|
||||||
|
);
|
||||||
|
this.isShuttingDown = true;
|
||||||
|
}, 10 * SECOND);
|
||||||
|
|
||||||
|
await this.jobQueue.onIdle();
|
||||||
|
this.isShuttingDown = true;
|
||||||
|
clearTimeout(to);
|
||||||
|
|
||||||
|
log.info(`conversation ${this.idForLogging()} jobQueue shutdown complete`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
window.Whisper.Conversation = ConversationModel;
|
window.Whisper.Conversation = ConversationModel;
|
||||||
|
|
|
@ -152,7 +152,6 @@ describe('pnp/send gv2 invite', function needsName() {
|
||||||
let group: Group;
|
let group: Group;
|
||||||
{
|
{
|
||||||
state = await phone.waitForStorageState({ after: state });
|
state = await phone.waitForStorageState({ after: state });
|
||||||
|
|
||||||
const groups = await phone.getAllGroups(state);
|
const groups = await phone.getAllGroups(state);
|
||||||
assert.strictEqual(groups.length, 1);
|
assert.strictEqual(groups.length, 1);
|
||||||
|
|
||||||
|
|
|
@ -1,20 +1,26 @@
|
||||||
// Copyright 2021 Signal Messenger, LLC
|
// Copyright 2021 Signal Messenger, LLC
|
||||||
// SPDX-License-Identifier: AGPL-3.0-only
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
import PQueue from 'p-queue';
|
||||||
import * as Errors from '../types/errors';
|
import * as Errors from '../types/errors';
|
||||||
import * as log from '../logging/log';
|
import * as log from '../logging/log';
|
||||||
|
|
||||||
type EntryType = Readonly<{
|
type EntryType = Readonly<{
|
||||||
value: number;
|
value: number;
|
||||||
callback(): void;
|
callback(): Promise<void>;
|
||||||
}>;
|
}>;
|
||||||
|
|
||||||
let startupProcessingQueue: StartupQueue | undefined;
|
let startupProcessingQueue: StartupQueue | undefined;
|
||||||
|
|
||||||
export class StartupQueue {
|
export class StartupQueue {
|
||||||
private readonly map = new Map<string, EntryType>();
|
private readonly map = new Map<string, EntryType>();
|
||||||
|
private readonly running: PQueue = new PQueue({
|
||||||
|
// mostly io-bound work that is not very parallelizable
|
||||||
|
// small number should be sufficient
|
||||||
|
concurrency: 5,
|
||||||
|
});
|
||||||
|
|
||||||
public add(id: string, value: number, f: () => void): void {
|
public add(id: string, value: number, f: () => Promise<void>): void {
|
||||||
const existing = this.map.get(id);
|
const existing = this.map.get(id);
|
||||||
if (existing && existing.value >= value) {
|
if (existing && existing.value >= value) {
|
||||||
return;
|
return;
|
||||||
|
@ -30,26 +36,36 @@ export class StartupQueue {
|
||||||
this.map.clear();
|
this.map.clear();
|
||||||
|
|
||||||
for (const { callback } of values) {
|
for (const { callback } of values) {
|
||||||
try {
|
void this.running.add(async () => {
|
||||||
callback();
|
try {
|
||||||
} catch (error) {
|
return callback();
|
||||||
log.error(
|
} catch (error) {
|
||||||
'StartupQueue: Failed to process item due to error',
|
log.error(
|
||||||
Errors.toLogFormat(error)
|
'StartupQueue: Failed to process item due to error',
|
||||||
);
|
Errors.toLogFormat(error)
|
||||||
}
|
);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private shutdown(): Promise<void> {
|
||||||
|
log.info(
|
||||||
|
`StartupQueue: Waiting for ${this.running.pending} tasks to drain`
|
||||||
|
);
|
||||||
|
return this.running.onIdle();
|
||||||
|
}
|
||||||
|
|
||||||
static initialize(): void {
|
static initialize(): void {
|
||||||
startupProcessingQueue = new StartupQueue();
|
startupProcessingQueue = new StartupQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
static isReady(): boolean {
|
static isAvailable(): boolean {
|
||||||
return Boolean(startupProcessingQueue);
|
return Boolean(startupProcessingQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
static add(id: string, value: number, f: () => void): void {
|
static add(id: string, value: number, f: () => Promise<void>): void {
|
||||||
startupProcessingQueue?.add(id, value, f);
|
startupProcessingQueue?.add(id, value, f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,4 +73,8 @@ export class StartupQueue {
|
||||||
startupProcessingQueue?.flush();
|
startupProcessingQueue?.flush();
|
||||||
startupProcessingQueue = undefined;
|
startupProcessingQueue = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static async shutdown(): Promise<void> {
|
||||||
|
await startupProcessingQueue?.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
98
ts/util/sleeper.ts
Normal file
98
ts/util/sleeper.ts
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
// Copyright 2023 Signal Messenger, LLC
|
||||||
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
import * as log from '../logging/log';
|
||||||
|
import * as Errors from '../types/errors';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides a way to delay tasks
|
||||||
|
* but also a way to force sleeping tasks to immediately resolve/reject on shutdown
|
||||||
|
*/
|
||||||
|
export class Sleeper {
|
||||||
|
private shuttingDown = false;
|
||||||
|
private shutdownCallbacks: Set<() => void> = new Set();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* delay by ms, careful when using on a loop if resolving on shutdown (default)
|
||||||
|
*/
|
||||||
|
sleep(
|
||||||
|
ms: number,
|
||||||
|
reason: string,
|
||||||
|
options?: { resolveOnShutdown?: boolean }
|
||||||
|
): Promise<void> {
|
||||||
|
log.info(`Sleeper: sleeping for ${ms}ms. Reason: ${reason}`);
|
||||||
|
const resolveOnShutdown = options?.resolveOnShutdown ?? true;
|
||||||
|
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let timeout: NodeJS.Timeout | undefined;
|
||||||
|
|
||||||
|
const shutdownCallback = () => {
|
||||||
|
if (timeout) {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
log.info(
|
||||||
|
`Sleeper: resolving sleep task on shutdown. Original reason: ${reason}`
|
||||||
|
);
|
||||||
|
if (resolveOnShutdown) {
|
||||||
|
setTimeout(resolve, 0);
|
||||||
|
} else {
|
||||||
|
setTimeout(() => {
|
||||||
|
reject(
|
||||||
|
new Error(
|
||||||
|
`Sleeper: rejecting sleep task during shutdown. Original reason: ${reason}`
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}, 0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if (this.shuttingDown) {
|
||||||
|
log.info(
|
||||||
|
`Sleeper: sleep called when shutdown is in progress, scheduling immediate ${
|
||||||
|
resolveOnShutdown ? 'resolution' : 'rejection'
|
||||||
|
}. Original reason: ${reason}`
|
||||||
|
);
|
||||||
|
shutdownCallback();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout = setTimeout(() => {
|
||||||
|
resolve();
|
||||||
|
this.removeShutdownCallback(shutdownCallback);
|
||||||
|
}, ms);
|
||||||
|
|
||||||
|
this.addShutdownCallback(shutdownCallback);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private addShutdownCallback(callback: () => void) {
|
||||||
|
this.shutdownCallbacks.add(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
private removeShutdownCallback(callback: () => void) {
|
||||||
|
this.shutdownCallbacks.delete(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdown(): void {
|
||||||
|
if (this.shuttingDown) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info(
|
||||||
|
`Sleeper: shutting down, settling ${this.shutdownCallbacks.size} in-progress sleep calls`
|
||||||
|
);
|
||||||
|
this.shuttingDown = true;
|
||||||
|
this.shutdownCallbacks.forEach(cb => {
|
||||||
|
try {
|
||||||
|
cb();
|
||||||
|
} catch (error) {
|
||||||
|
log.error(
|
||||||
|
'Sleeper: Error executing shutdown callback',
|
||||||
|
Errors.toLogFormat(error)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
log.info('Sleeper: sleep tasks settled');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const sleeper = new Sleeper();
|
Loading…
Reference in a new issue