Reduce timeout of some long running tasks

This commit is contained in:
Fedor Indutny 2022-10-25 17:03:05 -07:00 committed by GitHub
parent 08f2a966a1
commit 3beccbfd31
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 67 deletions

View file

@ -17,6 +17,7 @@ import { HTTPError } from './textsecure/Errors';
import createTaskWithTimeout, {
suspendTasksWithTimeout,
resumeTasksWithTimeout,
reportLongRunningTasks,
} from './textsecure/TaskWithTimeout';
import type {
MessageAttributesType,
@ -995,6 +996,10 @@ export async function startApp(): Promise<void> {
}
}, FIVE_MINUTES);
setInterval(() => {
reportLongRunningTasks();
}, FIVE_MINUTES);
let mainWindowStats = {
isMaximized: false,
isFullScreen: false,

View file

@ -174,6 +174,10 @@ export type MessageReceiverOptions = {
serverTrustRoot: string;
};
const TASK_WITH_TIMEOUT_OPTIONS = {
timeout: 2 * durations.MINUTE,
};
const LOG_UNEXPECTED_URGENT_VALUES = false;
const MUST_BE_URGENT_TYPES: Array<SendTypesType> = [
'message',
@ -331,9 +335,13 @@ export default class MessageReceiver
if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') {
this.incomingQueue.add(
createTaskWithTimeout(async () => {
this.onEmpty();
}, 'incomingQueue/onEmpty')
createTaskWithTimeout(
async () => {
this.onEmpty();
},
'incomingQueue/onEmpty',
TASK_WITH_TIMEOUT_OPTIONS
)
);
}
return;
@ -407,12 +415,16 @@ export default class MessageReceiver
} catch (e) {
request.respond(500, 'Bad encrypted websocket message');
log.error('Error handling incoming message:', Errors.toLogFormat(e));
await this.dispatchAndWait(new ErrorEvent(e));
await this.dispatchAndWait('websocket request', new ErrorEvent(e));
}
};
this.incomingQueue.add(
createTaskWithTimeout(job, 'incomingQueue/websocket')
createTaskWithTimeout(
job,
'incomingQueue/websocket',
TASK_WITH_TIMEOUT_OPTIONS
)
);
}
@ -421,7 +433,8 @@ export default class MessageReceiver
this.incomingQueue.add(
createTaskWithTimeout(
async () => this.queueAllCached(),
'incomingQueue/queueAllCached'
'incomingQueue/queueAllCached',
TASK_WITH_TIMEOUT_OPTIONS
)
);
@ -457,7 +470,11 @@ export default class MessageReceiver
);
return this.incomingQueue.add(
createTaskWithTimeout(waitForIncomingQueue, 'drain/waitForIncoming')
createTaskWithTimeout(
waitForIncomingQueue,
'drain/waitForIncoming',
TASK_WITH_TIMEOUT_OPTIONS
)
);
}
@ -605,11 +622,12 @@ export default class MessageReceiver
// Private
//
private async dispatchAndWait(event: Event): Promise<void> {
private async dispatchAndWait(id: string, event: Event): Promise<void> {
this.appQueue.add(
createTaskWithTimeout(
async () => Promise.all(this.dispatchEvent(event)),
'dispatchEvent'
`dispatchEvent(${event.type}, ${id})`,
TASK_WITH_TIMEOUT_OPTIONS
)
);
}
@ -658,7 +676,9 @@ export default class MessageReceiver
: this.decryptedQueue;
try {
return await queue.add(createTaskWithTimeout(task, id));
return await queue.add(
createTaskWithTimeout(task, id, TASK_WITH_TIMEOUT_OPTIONS)
);
} finally {
this.updateProgress(this.count);
}
@ -684,7 +704,9 @@ export default class MessageReceiver
);
// We don't await here because we don't want this to gate future message processing
this.appQueue.add(createTaskWithTimeout(emitEmpty, 'emitEmpty'));
this.appQueue.add(
createTaskWithTimeout(emitEmpty, 'emitEmpty', TASK_WITH_TIMEOUT_OPTIONS)
);
};
const waitForEncryptedQueue = async () => {
@ -710,7 +732,11 @@ export default class MessageReceiver
const waitForCacheAddBatcher = async () => {
await this.decryptAndCacheBatcher.onIdle();
this.incomingQueue.add(
createTaskWithTimeout(waitForIncomingQueue, 'onEmpty/waitForIncoming')
createTaskWithTimeout(
waitForIncomingQueue,
'onEmpty/waitForIncoming',
TASK_WITH_TIMEOUT_OPTIONS
)
);
};
@ -810,7 +836,7 @@ export default class MessageReceiver
async () => {
this.queueDecryptedEnvelope(decryptedEnvelope, payloadPlaintext);
},
'queueDecryptedEnvelope',
`queueDecryptedEnvelope(${getEnvelopeId(decryptedEnvelope)})`,
TaskType.Encrypted
);
} else {
@ -850,7 +876,8 @@ export default class MessageReceiver
this.incomingQueue.add(
createTaskWithTimeout(
async () => this.queueAllCached(),
'queueAllCached'
'queueAllCached',
TASK_WITH_TIMEOUT_OPTIONS
)
);
}, RETRY_TIMEOUT);
@ -1074,13 +1101,14 @@ export default class MessageReceiver
const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
const taskWithTimeout = createTaskWithTimeout(
task,
`queueDecryptedEnvelope ${id}`
`queueDecryptedEnvelope ${id}`,
TASK_WITH_TIMEOUT_OPTIONS
);
try {
await this.addToQueue(
taskWithTimeout,
'dispatchEvent',
`handleDecryptedEnvelope(${id})`,
TaskType.Decrypted
);
} catch (error) {
@ -1125,7 +1153,7 @@ export default class MessageReceiver
this.addToQueue(
async () => this.dispatchEvent(new EnvelopeEvent(unsealedEnvelope)),
'dispatchEvent',
`dispatchEvent(EnvelopeEvent(${logId}))`,
TaskType.Decrypted
);
@ -1398,12 +1426,14 @@ export default class MessageReceiver
if (syncMessage?.pniIdentity) {
inProgressMessageType = 'pni identity';
await this.handlePNIIdentity(envelope, syncMessage.pniIdentity);
this.removeFromCache(envelope);
return { plaintext: undefined, envelope };
}
if (syncMessage?.pniChangeNumber) {
inProgressMessageType = 'pni change number';
await this.handlePNIChangeNumber(envelope, syncMessage.pniChangeNumber);
this.removeFromCache(envelope);
return { plaintext: undefined, envelope };
}
@ -1422,6 +1452,7 @@ export default class MessageReceiver
(envelope.sourceUuid && this.isUuidBlocked(envelope.sourceUuid)))
) {
log.info(`${logId}: Dropping non-GV2 message from blocked sender`);
this.removeFromCache(envelope);
return { plaintext: undefined, envelope };
}
@ -1492,6 +1523,7 @@ export default class MessageReceiver
logUnexpectedUrgentValue(envelope, 'deliveryReceipt');
await this.dispatchAndWait(
getEnvelopeId(envelope),
new DeliveryEvent(
{
timestamp: envelope.timestamp,
@ -1822,6 +1854,8 @@ export default class MessageReceiver
throw error;
}
const envelopeId = getEnvelopeId(envelope);
if (uuid && deviceId) {
const { cipherTextBytes, cipherTextType } = envelope;
const event = new DecryptionErrorEvent(
@ -1842,11 +1876,10 @@ export default class MessageReceiver
// Avoid deadlocks by scheduling processing on decrypted queue
this.addToQueue(
async () => this.dispatchEvent(event),
'decrypted/dispatchEvent',
`decrypted/dispatchEvent/DecryptionErrorEvent(${envelopeId})`,
TaskType.Decrypted
);
} else {
const envelopeId = getEnvelopeId(envelope);
this.removeFromCache(envelope);
log.error(
`MessageReceiver.decrypt: Envelope ${envelopeId} missing uuid or deviceId`
@ -1939,7 +1972,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(getEnvelopeId(envelope), ev);
}
private async handleStoryMessage(
@ -2036,7 +2069,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
this.dispatchAndWait(ev);
this.dispatchAndWait(logId, ev);
return;
}
@ -2094,7 +2127,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
this.dispatchAndWait(ev);
this.dispatchAndWait(logId, ev);
});
return;
}
@ -2117,7 +2150,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async handleDataMessage(
@ -2148,7 +2181,7 @@ export default class MessageReceiver
return undefined;
}
await this.checkGroupV1Data(msg);
this.checkGroupV1Data(msg);
if (msg.flags && msg.flags & Proto.DataMessage.Flags.END_SESSION) {
p = this.handleEndSession(envelope, new UUID(destination));
@ -2170,7 +2203,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
await p;
@ -2236,7 +2269,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async maybeUpdateTimestamp(
@ -2305,10 +2338,7 @@ export default class MessageReceiver
content.decryptionErrorMessage &&
Bytes.isNotEmpty(content.decryptionErrorMessage)
) {
await this.handleDecryptionError(
envelope,
content.decryptionErrorMessage
);
this.handleDecryptionError(envelope, content.decryptionErrorMessage);
return;
}
if (content.syncMessage) {
@ -2323,7 +2353,7 @@ export default class MessageReceiver
return;
}
if (content.nullMessage) {
await this.handleNullMessage(envelope);
this.handleNullMessage(envelope);
return;
}
if (content.callingMessage) {
@ -2335,7 +2365,7 @@ export default class MessageReceiver
return;
}
if (content.typingMessage) {
await this.handleTypingMessage(envelope, content.typingMessage);
this.handleTypingMessage(envelope, content.typingMessage);
return;
}
@ -2351,10 +2381,10 @@ export default class MessageReceiver
}
}
private async handleDecryptionError(
private handleDecryptionError(
envelope: UnsealedEnvelope,
decryptionError: Uint8Array
) {
): void {
const logId = getEnvelopeId(envelope);
log.info(`handleDecryptionError: ${logId}`);
@ -2381,7 +2411,7 @@ export default class MessageReceiver
},
() => this.removeFromCache(envelope)
);
await this.dispatchEvent(event);
this.dispatchEvent(event);
}
private async handleSenderKeyDistributionMessage(
@ -2506,6 +2536,8 @@ export default class MessageReceiver
logUnexpectedUrgentValue(envelope, type);
const logId = getEnvelopeId(envelope);
await Promise.all(
receiptMessage.timestamp.map(async rawTimestamp => {
const ev = new EventClass(
@ -2519,15 +2551,15 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
await this.dispatchAndWait(ev);
await this.dispatchAndWait(logId, ev);
})
);
}
private async handleTypingMessage(
private handleTypingMessage(
envelope: UnsealedEnvelope,
typingMessage: Proto.ITypingMessage
): Promise<void> {
): void {
this.removeFromCache(envelope);
logUnexpectedUrgentValue(envelope, 'typing');
@ -2564,7 +2596,7 @@ export default class MessageReceiver
}
}
await this.dispatchEvent(
this.dispatchEvent(
new TypingEvent({
sender: envelope.source,
senderUuid: envelope.sourceUuid,
@ -2640,9 +2672,7 @@ export default class MessageReceiver
return Bytes.toBase64(data.id);
}
private async checkGroupV1Data(
message: Readonly<Proto.IDataMessage>
): Promise<void> {
private checkGroupV1Data(message: Readonly<Proto.IDataMessage>): void {
const { group } = message;
if (!group) {
@ -2744,7 +2774,8 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
const logId = getEnvelopeId(envelope);
return this.dispatchAndWait(logId, ev);
}
if (sentMessage.storyMessage) {
@ -2767,7 +2798,7 @@ export default class MessageReceiver
return;
}
await this.checkGroupV1Data(sentMessage.message);
this.checkGroupV1Data(sentMessage.message);
strictAssert(sentMessage.timestamp, 'sent message without timestamp');
@ -2847,7 +2878,8 @@ export default class MessageReceiver
envelope: ProcessedEnvelope,
configuration: Proto.SyncMessage.IConfiguration
): Promise<void> {
log.info('got configuration sync message');
const logId = getEnvelopeId(envelope);
log.info('got configuration sync message', logId);
logUnexpectedUrgentValue(envelope, 'configurationSync');
@ -2855,14 +2887,15 @@ export default class MessageReceiver
configuration,
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async handleViewOnceOpen(
envelope: ProcessedEnvelope,
sync: Proto.SyncMessage.IViewOnceOpen
): Promise<void> {
log.info('got view once open sync message');
const logId = getEnvelopeId(envelope);
log.info('got view once open sync message', logId);
logUnexpectedUrgentValue(envelope, 'viewOnceSync');
@ -2877,14 +2910,15 @@ export default class MessageReceiver
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async handleMessageRequestResponse(
envelope: ProcessedEnvelope,
sync: Proto.SyncMessage.IMessageRequestResponse
): Promise<void> {
log.info('got message request response sync message');
const logId = getEnvelopeId(envelope);
log.info('got message request response sync message', logId);
logUnexpectedUrgentValue(envelope, 'messageRequestSync');
@ -2921,14 +2955,15 @@ export default class MessageReceiver
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async handleFetchLatest(
envelope: ProcessedEnvelope,
sync: Proto.SyncMessage.IFetchLatest
): Promise<void> {
log.info('got fetch latest sync message');
const logId = getEnvelopeId(envelope);
log.info('got fetch latest sync message', logId);
logUnexpectedUrgentValue(envelope, 'fetchLatestManifestSync');
@ -2937,14 +2972,15 @@ export default class MessageReceiver
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async handleKeys(
envelope: ProcessedEnvelope,
sync: Proto.SyncMessage.IKeys
): Promise<void> {
log.info('got keys sync message');
const logId = getEnvelopeId(envelope);
log.info('got keys sync message', logId);
logUnexpectedUrgentValue(envelope, 'keySync');
@ -2957,7 +2993,7 @@ export default class MessageReceiver
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
// Runs on TaskType.Encrypted queue
@ -3019,7 +3055,8 @@ export default class MessageReceiver
operations: Array<Proto.SyncMessage.IStickerPackOperation>
): Promise<void> {
const ENUM = Proto.SyncMessage.StickerPackOperation.Type;
log.info('got sticker pack operation sync message');
const logId = getEnvelopeId(envelope);
log.info('got sticker pack operation sync message', logId);
logUnexpectedUrgentValue(envelope, 'stickerPackSync');
const stickerPacks = operations.map(operation => ({
@ -3034,14 +3071,15 @@ export default class MessageReceiver
this.removeFromCache.bind(this, envelope)
);
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async handleRead(
envelope: ProcessedEnvelope,
read: Array<Proto.SyncMessage.IRead>
): Promise<void> {
log.info('MessageReceiver.handleRead', getEnvelopeId(envelope));
const logId = getEnvelopeId(envelope);
log.info('MessageReceiver.handleRead', logId);
logUnexpectedUrgentValue(envelope, 'readSync');
@ -3058,7 +3096,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
results.push(this.dispatchAndWait(ev));
results.push(this.dispatchAndWait(logId, ev));
}
await Promise.all(results);
}
@ -3067,7 +3105,8 @@ export default class MessageReceiver
envelope: ProcessedEnvelope,
viewed: ReadonlyArray<Proto.SyncMessage.IViewed>
): Promise<void> {
log.info('MessageReceiver.handleViewed', getEnvelopeId(envelope));
const logId = getEnvelopeId(envelope);
log.info('MessageReceiver.handleViewed', logId);
logUnexpectedUrgentValue(envelope, 'viewSync');
@ -3084,7 +3123,7 @@ export default class MessageReceiver
},
this.removeFromCache.bind(this, envelope)
);
await this.dispatchAndWait(ev);
await this.dispatchAndWait(logId, ev);
})
);
}
@ -3093,7 +3132,8 @@ export default class MessageReceiver
envelope: ProcessedEnvelope,
contacts: Proto.SyncMessage.IContacts
): Promise<void> {
log.info(`MessageReceiver: handleContacts ${getEnvelopeId(envelope)}`);
const logId = getEnvelopeId(envelope);
log.info(`MessageReceiver: handleContacts ${logId}`);
const { blob } = contacts;
if (!blob) {
throw new Error('MessageReceiver.handleContacts: blob field was missing');
@ -3112,7 +3152,7 @@ export default class MessageReceiver
envelope.receivedAtCounter,
envelope.timestamp
);
await this.dispatchAndWait(contactSync);
await this.dispatchAndWait(logId, contactSync);
log.info('handleContacts: finished');
}
@ -3121,8 +3161,9 @@ export default class MessageReceiver
envelope: ProcessedEnvelope,
groups: Proto.SyncMessage.IGroups
): Promise<void> {
const logId = getEnvelopeId(envelope);
log.info('group sync');
log.info(`MessageReceiver: handleGroups ${getEnvelopeId(envelope)}`);
log.info(`MessageReceiver: handleGroups ${logId}`);
const { blob } = groups;
this.removeFromCache(envelope);
@ -3157,7 +3198,7 @@ export default class MessageReceiver
},
envelope.receivedAtCounter
);
const promise = this.dispatchAndWait(ev).catch(e => {
const promise = this.dispatchAndWait(logId, ev).catch(e => {
log.error('error processing group', e);
});
groupDetails = groupBuffer.next();
@ -3167,7 +3208,7 @@ export default class MessageReceiver
await Promise.all(promises);
const ev = new GroupSyncEvent();
return this.dispatchAndWait(ev);
return this.dispatchAndWait(logId, ev);
}
private async handleBlocked(

View file

@ -1,13 +1,15 @@
// Copyright 2020-2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as durations from '../util/durations';
import { MINUTE } from '../util/durations';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
import { explodePromise } from '../util/explodePromise';
import { toLogFormat } from '../types/errors';
import * as log from '../logging/log';
type TaskType = {
id: string;
startedAt: number | undefined;
suspend(): void;
resume(): void;
};
@ -31,12 +33,28 @@ export function resumeTasksWithTimeout(): void {
}
}
export function reportLongRunningTasks(): void {
const now = Date.now();
for (const task of tasks) {
if (task.startedAt === undefined) {
continue;
}
const duration = Math.max(0, now - task.startedAt);
if (duration > MINUTE) {
log.warn(
`TaskWithTimeout: ${task.id} has been running for ${duration}ms`
);
}
}
}
export default function createTaskWithTimeout<T, Args extends Array<unknown>>(
task: (...args: Args) => Promise<T>,
id: string,
options: { timeout?: number } = {}
): (...args: Args) => Promise<T> {
const timeout = options.timeout || 30 * durations.MINUTE;
const timeout = options.timeout || 30 * MINUTE;
const timeoutError = new Error(`${id || ''} task did not complete in time.`);
@ -54,6 +72,7 @@ export default function createTaskWithTimeout<T, Args extends Array<unknown>>(
return;
}
entry.startedAt = Date.now();
timer = setTimeout(() => {
if (complete) {
return;
@ -72,6 +91,8 @@ export default function createTaskWithTimeout<T, Args extends Array<unknown>>(
};
const entry: TaskType = {
id,
startedAt: undefined,
suspend: stopTimer,
resume: startTimer,
};