API to suspend/resume tasks with timeout

This commit is contained in:
Fedor Indutny 2021-09-27 11:22:46 -07:00 committed by GitHub
parent cf4c81b11c
commit af387095be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 205 additions and 104 deletions

View file

@ -9,6 +9,10 @@ import { render, unstable_batchedUpdates as batchedUpdates } from 'react-dom';
import MessageReceiver from './textsecure/MessageReceiver'; import MessageReceiver from './textsecure/MessageReceiver';
import { SessionResetsType, ProcessedDataMessage } from './textsecure/Types.d'; import { SessionResetsType, ProcessedDataMessage } from './textsecure/Types.d';
import { HTTPError } from './textsecure/Errors'; import { HTTPError } from './textsecure/Errors';
import {
suspendTasksWithTimeout,
resumeTasksWithTimeout,
} from './textsecure/TaskWithTimeout';
import { import {
MessageAttributesType, MessageAttributesType,
ConversationAttributesType, ConversationAttributesType,
@ -1603,11 +1607,13 @@ export async function startApp(): Promise<void> {
window.Whisper.events.on('powerMonitorSuspend', () => { window.Whisper.events.on('powerMonitorSuspend', () => {
log.info('powerMonitor: suspend'); log.info('powerMonitor: suspend');
suspendTasksWithTimeout();
}); });
window.Whisper.events.on('powerMonitorResume', () => { window.Whisper.events.on('powerMonitorResume', () => {
log.info('powerMonitor: resume'); log.info('powerMonitor: resume');
server?.checkSockets(); server?.checkSockets();
resumeTasksWithTimeout();
}); });
const reconnectToWebSocketQueue = new LatestQueue(); const reconnectToWebSocketQueue = new LatestQueue();

View file

@ -28,12 +28,12 @@ import * as Bytes from '../Bytes';
import { CURRENT_SCHEMA_VERSION } from '../../js/modules/types/message'; import { CURRENT_SCHEMA_VERSION } from '../../js/modules/types/message';
import { createBatcher } from '../util/batcher'; import { createBatcher } from '../util/batcher';
import { assert } from '../util/assert'; import { assert } from '../util/assert';
import * as durations from '../util/durations';
import { cleanDataForIpc } from './cleanDataForIpc'; import { cleanDataForIpc } from './cleanDataForIpc';
import { ReactionType } from '../types/Reactions'; import { ReactionType } from '../types/Reactions';
import { ConversationColorType, CustomColorType } from '../types/Colors'; import { ConversationColorType, CustomColorType } from '../types/Colors';
import type { ProcessGroupCallRingRequestResult } from '../types/Calling'; import type { ProcessGroupCallRingRequestResult } from '../types/Calling';
import type { RemoveAllConfiguration } from '../types/RemoveAllConfiguration'; import type { RemoveAllConfiguration } from '../types/RemoveAllConfiguration';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout';
import * as log from '../logging/log'; import * as log from '../logging/log';
import { import {
@ -92,8 +92,6 @@ if (ipcRenderer && ipcRenderer.setMaxListeners) {
log.warn('sql/Client: ipcRenderer is not available!'); log.warn('sql/Client: ipcRenderer is not available!');
} }
const DATABASE_UPDATE_TIMEOUT = 2 * durations.MINUTE;
const MIN_TRACE_DURATION = 10; const MIN_TRACE_DURATION = 10;
const SQL_CHANNEL_KEY = 'sql-channel'; const SQL_CHANNEL_KEY = 'sql-channel';
@ -555,25 +553,25 @@ function makeChannel(fnName: string) {
const jobId = _makeJob(fnName); const jobId = _makeJob(fnName);
return new Promise((resolve, reject) => { return createTaskWithTimeout(
try { () =>
ipcRenderer.send(SQL_CHANNEL_KEY, jobId, fnName, ...args); new Promise((resolve, reject) => {
try {
ipcRenderer.send(SQL_CHANNEL_KEY, jobId, fnName, ...args);
_updateJob(jobId, { _updateJob(jobId, {
resolve, resolve,
reject, reject,
args: _DEBUG ? args : undefined, args: _DEBUG ? args : undefined,
}); });
} catch (error) {
_removeJob(jobId);
setTimeout(() => { reject(error);
reject(new Error(`SQL channel job ${jobId} (${fnName}) timed out`)); }
}, DATABASE_UPDATE_TIMEOUT); }),
} catch (error) { `SQL channel job ${jobId} (${fnName})`
_removeJob(jobId); )();
reject(error);
}
});
}; };
} }
@ -1555,22 +1553,22 @@ async function removeOtherData() {
} }
async function callChannel(name: string) { async function callChannel(name: string) {
return new Promise<void>((resolve, reject) => { return createTaskWithTimeout(
ipcRenderer.send(name); () =>
ipcRenderer.once(`${name}-done`, (_, error) => { new Promise<void>((resolve, reject) => {
if (error) { ipcRenderer.send(name);
reject(error); ipcRenderer.once(`${name}-done`, (_, error) => {
if (error) {
reject(error);
return; return;
} }
resolve(); resolve();
}); });
}),
setTimeout(() => { `callChannel call to ${name}`
reject(new Error(`callChannel call to ${name} timed out`)); )();
}, DATABASE_UPDATE_TIMEOUT);
});
} }
async function getMessagesNeedingUpgrade( async function getMessagesNeedingUpgrade(

View file

@ -2,14 +2,28 @@
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai'; import { assert } from 'chai';
import * as sinon from 'sinon';
import { sleep } from '../util/sleep'; import { sleep } from '../util/sleep';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout'; import createTaskWithTimeout, {
suspendTasksWithTimeout,
resumeTasksWithTimeout,
} from '../textsecure/TaskWithTimeout';
describe('createTaskWithTimeout', () => { describe('createTaskWithTimeout', () => {
let sandbox: sinon.SinonSandbox;
beforeEach(() => {
sandbox = sinon.createSandbox();
});
afterEach(() => {
sandbox.restore();
});
it('resolves when promise resolves', async () => { it('resolves when promise resolves', async () => {
const task = () => Promise.resolve('hi!'); const task = () => Promise.resolve('hi!');
const taskWithTimeout = createTaskWithTimeout(task, 'test'); const taskWithTimeout = createTaskWithTimeout(task, 'resolving-task');
const result = await taskWithTimeout(); const result = await taskWithTimeout();
assert.strictEqual(result, 'hi!'); assert.strictEqual(result, 'hi!');
@ -18,38 +32,105 @@ describe('createTaskWithTimeout', () => {
it('flows error from promise back', async () => { it('flows error from promise back', async () => {
const error = new Error('original'); const error = new Error('original');
const task = () => Promise.reject(error); const task = () => Promise.reject(error);
const taskWithTimeout = createTaskWithTimeout(task, 'test'); const taskWithTimeout = createTaskWithTimeout(task, 'rejecting-task');
await assert.isRejected(taskWithTimeout(), 'original'); await assert.isRejected(taskWithTimeout(), 'original');
}); });
it('rejects if promise takes too long (this one logs error to console)', async () => { it('rejects if promise takes too long (this one logs error to console)', async () => {
const task = async () => { const clock = sandbox.useFakeTimers();
await sleep(3000);
};
const taskWithTimeout = createTaskWithTimeout(task, 'test', {
timeout: 10,
});
await assert.isRejected(taskWithTimeout()); const task = async () => {
await sleep(3000000);
};
const taskWithTimeout = createTaskWithTimeout(task, 'slow-task');
const promise = assert.isRejected(taskWithTimeout());
await clock.nextAsync();
await clock.nextAsync();
await promise;
}); });
it('rejects if task throws (and does not log about taking too long)', async () => { it('rejects if task throws (and does not log about taking too long)', async () => {
const clock = sandbox.useFakeTimers();
const error = new Error('Task is throwing!'); const error = new Error('Task is throwing!');
const task = () => { const task = () => {
throw error; throw error;
}; };
const taskWithTimeout = createTaskWithTimeout(task, 'test', { const taskWithTimeout = createTaskWithTimeout(task, 'throwing-task');
timeout: 10, await clock.nextAsync();
});
await assert.isRejected(taskWithTimeout(), 'Task is throwing!'); await assert.isRejected(taskWithTimeout(), 'Task is throwing!');
}); });
it('passes arguments to the underlying function', async () => { it('passes arguments to the underlying function', async () => {
const task = (arg: string) => Promise.resolve(arg); const task = (arg: string) => Promise.resolve(arg);
const taskWithTimeout = createTaskWithTimeout(task, 'test'); const taskWithTimeout = createTaskWithTimeout(task, 'arguments-task');
const result = await taskWithTimeout('hi!'); const result = await taskWithTimeout('hi!');
assert.strictEqual(result, 'hi!'); assert.strictEqual(result, 'hi!');
}); });
it('suspends and resumes tasks', async () => {
const clock = sandbox.useFakeTimers();
let state = 0;
const task = async () => {
state = 1;
await sleep(900);
state = 2;
await sleep(900);
state = 3;
};
const taskWithTimeout = createTaskWithTimeout(task, 'suspend-task', {
timeout: 1000,
});
const promise = taskWithTimeout();
assert.strictEqual(state, 1);
suspendTasksWithTimeout();
await clock.nextAsync();
assert.strictEqual(state, 2);
resumeTasksWithTimeout();
await clock.nextAsync();
assert.strictEqual(state, 3);
await promise;
});
it('suspends and resumes timing out task', async () => {
const clock = sandbox.useFakeTimers();
let state = 0;
const task = async () => {
state = 1;
await sleep(3000000);
state = 2;
await sleep(3000000);
state = 3;
};
const taskWithTimeout = createTaskWithTimeout(task, 'suspend-slow-task');
const promise = assert.isRejected(taskWithTimeout());
assert.strictEqual(state, 1);
suspendTasksWithTimeout();
await clock.nextAsync();
assert.strictEqual(state, 2);
resumeTasksWithTimeout();
await clock.nextAsync();
assert.strictEqual(state, 2);
await promise;
});
}); });

View file

@ -2,8 +2,31 @@
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import * as durations from '../util/durations'; import * as durations from '../util/durations';
import { explodePromise } from '../util/explodePromise';
import { toLogFormat } from '../types/errors';
import * as log from '../logging/log'; import * as log from '../logging/log';
type TaskType = {
suspend(): void;
resume(): void;
};
const tasks = new Set<TaskType>();
export function suspendTasksWithTimeout(): void {
log.info(`TaskWithTimeout: suspending ${tasks.size} tasks`);
for (const task of tasks) {
task.suspend();
}
}
export function resumeTasksWithTimeout(): void {
log.info(`TaskWithTimeout: resuming ${tasks.size} tasks`);
for (const task of tasks) {
task.resume();
}
}
export default function createTaskWithTimeout<T, Args extends Array<unknown>>( export default function createTaskWithTimeout<T, Args extends Array<unknown>>(
task: (...args: Args) => Promise<T>, task: (...args: Args) => Promise<T>,
id: string, id: string,
@ -11,70 +34,63 @@ export default function createTaskWithTimeout<T, Args extends Array<unknown>>(
): (...args: Args) => Promise<T> { ): (...args: Args) => Promise<T> {
const timeout = options.timeout || 2 * durations.MINUTE; const timeout = options.timeout || 2 * durations.MINUTE;
const errorForStack = new Error('for stack'); const timeoutError = new Error(`${id || ''} task did not complete in time.`);
return async (...args: Args) => return async (...args: Args) => {
new Promise((resolve, reject) => { let complete = false;
let complete = false;
let timer: NodeJS.Timeout | null = setTimeout(() => {
if (!complete) {
const message = `${
id || ''
} task did not complete in time. Calling stack: ${
errorForStack.stack
}`;
log.error(message); let timer: NodeJS.Timeout | undefined;
reject(new Error(message));
return undefined; const { promise: timerPromise, reject } = explodePromise<never>();
const startTimer = () => {
stopTimer();
if (complete) {
return;
}
timer = setTimeout(() => {
if (complete) {
return;
} }
complete = true;
tasks.delete(entry);
return null; log.error(toLogFormat(timeoutError));
reject(timeoutError);
}, timeout); }, timeout);
const clearTimer = () => { };
try {
const localTimer = timer;
if (localTimer) {
timer = null;
clearTimeout(localTimer);
}
} catch (error) {
log.error(
id || '',
'task ran into problem canceling timer. Calling stack:',
errorForStack.stack
);
}
};
const success = (result: T) => { const stopTimer = () => {
clearTimer(); if (timer) {
complete = true; clearTimeout(timer);
resolve(result); timer = undefined;
};
const failure = (error: Error) => {
clearTimer();
complete = true;
reject(error);
};
let promise;
try {
promise = task(...args);
} catch (error) {
clearTimer();
throw error;
} }
if (!promise || !promise.then) { };
clearTimer();
complete = true;
resolve(promise);
return undefined; const entry: TaskType = {
} suspend: stopTimer,
resume: startTimer,
};
// eslint-disable-next-line more/no-then tasks.add(entry);
return promise.then(success, failure); startTimer();
});
let result: unknown;
const run = async (): Promise<void> => {
result = await task(...args);
};
try {
await Promise.race([run(), timerPromise]);
return result as T;
} finally {
complete = true;
tasks.delete(entry);
stopTimer();
}
};
} }