From af387095be91234f060f5c00a18875e931b76c5a Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Mon, 27 Sep 2021 11:22:46 -0700 Subject: [PATCH] API to suspend/resume tasks with timeout --- ts/background.ts | 6 ++ ts/sql/Client.ts | 66 +++++++------- ts/test-both/TaskWithTimeout_test.ts | 109 ++++++++++++++++++++--- ts/textsecure/TaskWithTimeout.ts | 128 +++++++++++++++------------ 4 files changed, 205 insertions(+), 104 deletions(-) diff --git a/ts/background.ts b/ts/background.ts index 51e3f6127ece..4aeddbc2b0ec 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -9,6 +9,10 @@ import { render, unstable_batchedUpdates as batchedUpdates } from 'react-dom'; import MessageReceiver from './textsecure/MessageReceiver'; import { SessionResetsType, ProcessedDataMessage } from './textsecure/Types.d'; import { HTTPError } from './textsecure/Errors'; +import { + suspendTasksWithTimeout, + resumeTasksWithTimeout, +} from './textsecure/TaskWithTimeout'; import { MessageAttributesType, ConversationAttributesType, @@ -1603,11 +1607,13 @@ export async function startApp(): Promise { window.Whisper.events.on('powerMonitorSuspend', () => { log.info('powerMonitor: suspend'); + suspendTasksWithTimeout(); }); window.Whisper.events.on('powerMonitorResume', () => { log.info('powerMonitor: resume'); server?.checkSockets(); + resumeTasksWithTimeout(); }); const reconnectToWebSocketQueue = new LatestQueue(); diff --git a/ts/sql/Client.ts b/ts/sql/Client.ts index a42b1b012f08..a2cb5e72f1d4 100644 --- a/ts/sql/Client.ts +++ b/ts/sql/Client.ts @@ -28,12 +28,12 @@ import * as Bytes from '../Bytes'; import { CURRENT_SCHEMA_VERSION } from '../../js/modules/types/message'; import { createBatcher } from '../util/batcher'; import { assert } from '../util/assert'; -import * as durations from '../util/durations'; import { cleanDataForIpc } from './cleanDataForIpc'; import { ReactionType } from '../types/Reactions'; import { ConversationColorType, CustomColorType } from '../types/Colors'; import type { ProcessGroupCallRingRequestResult } from '../types/Calling'; import type { RemoveAllConfiguration } from '../types/RemoveAllConfiguration'; +import createTaskWithTimeout from '../textsecure/TaskWithTimeout'; import * as log from '../logging/log'; import { @@ -92,8 +92,6 @@ if (ipcRenderer && ipcRenderer.setMaxListeners) { log.warn('sql/Client: ipcRenderer is not available!'); } -const DATABASE_UPDATE_TIMEOUT = 2 * durations.MINUTE; - const MIN_TRACE_DURATION = 10; const SQL_CHANNEL_KEY = 'sql-channel'; @@ -555,25 +553,25 @@ function makeChannel(fnName: string) { const jobId = _makeJob(fnName); - return new Promise((resolve, reject) => { - try { - ipcRenderer.send(SQL_CHANNEL_KEY, jobId, fnName, ...args); + return createTaskWithTimeout( + () => + new Promise((resolve, reject) => { + try { + ipcRenderer.send(SQL_CHANNEL_KEY, jobId, fnName, ...args); - _updateJob(jobId, { - resolve, - reject, - args: _DEBUG ? args : undefined, - }); + _updateJob(jobId, { + resolve, + reject, + args: _DEBUG ? args : undefined, + }); + } catch (error) { + _removeJob(jobId); - setTimeout(() => { - reject(new Error(`SQL channel job ${jobId} (${fnName}) timed out`)); - }, DATABASE_UPDATE_TIMEOUT); - } catch (error) { - _removeJob(jobId); - - reject(error); - } - }); + reject(error); + } + }), + `SQL channel job ${jobId} (${fnName})` + )(); }; } @@ -1555,22 +1553,22 @@ async function removeOtherData() { } async function callChannel(name: string) { - return new Promise((resolve, reject) => { - ipcRenderer.send(name); - ipcRenderer.once(`${name}-done`, (_, error) => { - if (error) { - reject(error); + return createTaskWithTimeout( + () => + new Promise((resolve, reject) => { + ipcRenderer.send(name); + ipcRenderer.once(`${name}-done`, (_, error) => { + if (error) { + reject(error); - return; - } + return; + } - resolve(); - }); - - setTimeout(() => { - reject(new Error(`callChannel call to ${name} timed out`)); - }, DATABASE_UPDATE_TIMEOUT); - }); + resolve(); + }); + }), + `callChannel call to ${name}` + )(); } async function getMessagesNeedingUpgrade( diff --git a/ts/test-both/TaskWithTimeout_test.ts b/ts/test-both/TaskWithTimeout_test.ts index 6908254a2233..6666c2bb8438 100644 --- a/ts/test-both/TaskWithTimeout_test.ts +++ b/ts/test-both/TaskWithTimeout_test.ts @@ -2,14 +2,28 @@ // SPDX-License-Identifier: AGPL-3.0-only import { assert } from 'chai'; +import * as sinon from 'sinon'; import { sleep } from '../util/sleep'; -import createTaskWithTimeout from '../textsecure/TaskWithTimeout'; +import createTaskWithTimeout, { + suspendTasksWithTimeout, + resumeTasksWithTimeout, +} from '../textsecure/TaskWithTimeout'; describe('createTaskWithTimeout', () => { + let sandbox: sinon.SinonSandbox; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + + afterEach(() => { + sandbox.restore(); + }); + it('resolves when promise resolves', async () => { const task = () => Promise.resolve('hi!'); - const taskWithTimeout = createTaskWithTimeout(task, 'test'); + const taskWithTimeout = createTaskWithTimeout(task, 'resolving-task'); const result = await taskWithTimeout(); assert.strictEqual(result, 'hi!'); @@ -18,38 +32,105 @@ describe('createTaskWithTimeout', () => { it('flows error from promise back', async () => { const error = new Error('original'); const task = () => Promise.reject(error); - const taskWithTimeout = createTaskWithTimeout(task, 'test'); + const taskWithTimeout = createTaskWithTimeout(task, 'rejecting-task'); await assert.isRejected(taskWithTimeout(), 'original'); }); it('rejects if promise takes too long (this one logs error to console)', async () => { - const task = async () => { - await sleep(3000); - }; - const taskWithTimeout = createTaskWithTimeout(task, 'test', { - timeout: 10, - }); + const clock = sandbox.useFakeTimers(); - await assert.isRejected(taskWithTimeout()); + const task = async () => { + await sleep(3000000); + }; + const taskWithTimeout = createTaskWithTimeout(task, 'slow-task'); + + const promise = assert.isRejected(taskWithTimeout()); + + await clock.nextAsync(); + await clock.nextAsync(); + + await promise; }); it('rejects if task throws (and does not log about taking too long)', async () => { + const clock = sandbox.useFakeTimers(); + const error = new Error('Task is throwing!'); const task = () => { throw error; }; - const taskWithTimeout = createTaskWithTimeout(task, 'test', { - timeout: 10, - }); + const taskWithTimeout = createTaskWithTimeout(task, 'throwing-task'); + await clock.nextAsync(); await assert.isRejected(taskWithTimeout(), 'Task is throwing!'); }); it('passes arguments to the underlying function', async () => { const task = (arg: string) => Promise.resolve(arg); - const taskWithTimeout = createTaskWithTimeout(task, 'test'); + const taskWithTimeout = createTaskWithTimeout(task, 'arguments-task'); const result = await taskWithTimeout('hi!'); assert.strictEqual(result, 'hi!'); }); + + it('suspends and resumes tasks', async () => { + const clock = sandbox.useFakeTimers(); + + let state = 0; + + const task = async () => { + state = 1; + await sleep(900); + state = 2; + await sleep(900); + state = 3; + }; + const taskWithTimeout = createTaskWithTimeout(task, 'suspend-task', { + timeout: 1000, + }); + + const promise = taskWithTimeout(); + + assert.strictEqual(state, 1); + + suspendTasksWithTimeout(); + await clock.nextAsync(); + assert.strictEqual(state, 2); + + resumeTasksWithTimeout(); + await clock.nextAsync(); + assert.strictEqual(state, 3); + + await promise; + }); + + it('suspends and resumes timing out task', async () => { + const clock = sandbox.useFakeTimers(); + + let state = 0; + + const task = async () => { + state = 1; + await sleep(3000000); + state = 2; + await sleep(3000000); + state = 3; + }; + const taskWithTimeout = createTaskWithTimeout(task, 'suspend-slow-task'); + + const promise = assert.isRejected(taskWithTimeout()); + + assert.strictEqual(state, 1); + + suspendTasksWithTimeout(); + await clock.nextAsync(); + assert.strictEqual(state, 2); + + resumeTasksWithTimeout(); + await clock.nextAsync(); + + assert.strictEqual(state, 2); + + await promise; + }); }); diff --git a/ts/textsecure/TaskWithTimeout.ts b/ts/textsecure/TaskWithTimeout.ts index 86409f3cb43d..23806a409284 100644 --- a/ts/textsecure/TaskWithTimeout.ts +++ b/ts/textsecure/TaskWithTimeout.ts @@ -2,8 +2,31 @@ // SPDX-License-Identifier: AGPL-3.0-only import * as durations from '../util/durations'; +import { explodePromise } from '../util/explodePromise'; +import { toLogFormat } from '../types/errors'; import * as log from '../logging/log'; +type TaskType = { + suspend(): void; + resume(): void; +}; + +const tasks = new Set(); + +export function suspendTasksWithTimeout(): void { + log.info(`TaskWithTimeout: suspending ${tasks.size} tasks`); + for (const task of tasks) { + task.suspend(); + } +} + +export function resumeTasksWithTimeout(): void { + log.info(`TaskWithTimeout: resuming ${tasks.size} tasks`); + for (const task of tasks) { + task.resume(); + } +} + export default function createTaskWithTimeout>( task: (...args: Args) => Promise, id: string, @@ -11,70 +34,63 @@ export default function createTaskWithTimeout>( ): (...args: Args) => Promise { const timeout = options.timeout || 2 * durations.MINUTE; - const errorForStack = new Error('for stack'); + const timeoutError = new Error(`${id || ''} task did not complete in time.`); - return async (...args: Args) => - new Promise((resolve, reject) => { - let complete = false; - let timer: NodeJS.Timeout | null = setTimeout(() => { - if (!complete) { - const message = `${ - id || '' - } task did not complete in time. Calling stack: ${ - errorForStack.stack - }`; + return async (...args: Args) => { + let complete = false; - log.error(message); - reject(new Error(message)); + let timer: NodeJS.Timeout | undefined; - return undefined; + const { promise: timerPromise, reject } = explodePromise(); + + const startTimer = () => { + stopTimer(); + + if (complete) { + return; + } + + timer = setTimeout(() => { + if (complete) { + return; } + complete = true; + tasks.delete(entry); - return null; + log.error(toLogFormat(timeoutError)); + reject(timeoutError); }, timeout); - const clearTimer = () => { - try { - const localTimer = timer; - if (localTimer) { - timer = null; - clearTimeout(localTimer); - } - } catch (error) { - log.error( - id || '', - 'task ran into problem canceling timer. Calling stack:', - errorForStack.stack - ); - } - }; + }; - const success = (result: T) => { - clearTimer(); - complete = true; - resolve(result); - }; - const failure = (error: Error) => { - clearTimer(); - complete = true; - reject(error); - }; - - let promise; - try { - promise = task(...args); - } catch (error) { - clearTimer(); - throw error; + const stopTimer = () => { + if (timer) { + clearTimeout(timer); + timer = undefined; } - if (!promise || !promise.then) { - clearTimer(); - complete = true; - resolve(promise); + }; - return undefined; - } + const entry: TaskType = { + suspend: stopTimer, + resume: startTimer, + }; - // eslint-disable-next-line more/no-then - return promise.then(success, failure); - }); + tasks.add(entry); + startTimer(); + + let result: unknown; + + const run = async (): Promise => { + result = await task(...args); + }; + + try { + await Promise.race([run(), timerPromise]); + + return result as T; + } finally { + complete = true; + tasks.delete(entry); + stopTimer(); + } + }; }