Handle abort signal in SocketManager

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2024-11-14 17:23:52 -06:00 committed by GitHub
parent 2e82be1f3b
commit fbdb61be52
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 64 additions and 29 deletions

View file

@ -477,18 +477,25 @@ export class BackupsService {
}
let stream: Readable;
if (ephemeralKey == null) {
stream = await this.api.download({
downloadOffset,
onProgress: onDownloadProgress,
abortSignal: controller.signal,
});
} else {
stream = await this.api.downloadEphemeral({
downloadOffset,
onProgress: onDownloadProgress,
abortSignal: controller.signal,
});
try {
if (ephemeralKey == null) {
stream = await this.api.download({
downloadOffset,
onProgress: onDownloadProgress,
abortSignal: controller.signal,
});
} else {
stream = await this.api.downloadEphemeral({
downloadOffset,
onProgress: onDownloadProgress,
abortSignal: controller.signal,
});
}
} catch (error) {
if (controller.signal.aborted) {
return false;
}
throw error;
}
if (controller.signal.aborted) {

View file

@ -15,6 +15,7 @@ import EventListener from 'events';
import { AbortableProcess } from '../util/AbortableProcess';
import { strictAssert } from '../util/assert';
import { explodePromise } from '../util/explodePromise';
import {
BackOff,
EXTENDED_FIBONACCI_TIMEOUTS,
@ -421,7 +422,7 @@ export class SocketManager extends EventListener {
const { path } = URL.parse(url);
strictAssert(path, "Fetch can't have empty path");
const { method = 'GET', body, timeout } = init;
const { method = 'GET', body, timeout, signal } = init;
let bodyBytes: Uint8Array | undefined;
if (body === undefined) {
@ -436,13 +437,26 @@ export class SocketManager extends EventListener {
throw new Error(`Unsupported body type: ${typeof body}`);
}
return resource.sendRequest({
const { promise: abortPromise, reject } = explodePromise<Response>();
const onAbort = () => reject(new Error('Aborted'));
const cleanup = () => signal?.removeEventListener('abort', onAbort);
signal?.addEventListener('abort', onAbort, { once: true });
const responsePromise = resource.sendRequest({
verb: method,
path,
body: bodyBytes,
headers: Array.from(headers.entries()),
timeout,
});
try {
return await Promise.race([responsePromise, abortPromise]);
} finally {
cleanup();
}
}
public registerRequestHandler(handler: IRequestHandler): void {

View file

@ -68,7 +68,7 @@ import type {
import { handleStatusCode, translateError } from './Utils';
import * as log from '../logging/log';
import { maybeParseUrl, urlPathFromComponents } from '../util/url';
import { SECOND } from '../util/durations';
import { HOUR, MINUTE, SECOND } from '../util/durations';
import { safeParseNumber } from '../util/numbers';
import { isStagingServer } from '../util/isStagingServer';
import type { IWebSocketResource } from './WebsocketResources';
@ -165,8 +165,8 @@ function _validateResponse(response: any, schema: any) {
return true;
}
const FIVE_MINUTES = 5 * durations.MINUTE;
const GET_ATTACHMENT_CHUNK_TIMEOUT = 10 * durations.SECOND;
const FIVE_MINUTES = 5 * MINUTE;
const GET_ATTACHMENT_CHUNK_TIMEOUT = 10 * SECOND;
type AgentCacheType = {
[name: string]: {
@ -545,7 +545,12 @@ async function _retryAjax(
try {
return await _promiseAjax(url, options);
} catch (e) {
if (e instanceof HTTPError && e.code === -1 && count < limit) {
if (
e instanceof HTTPError &&
e.code === -1 &&
count < limit &&
!options.abortSignal?.aborted
) {
return new Promise(resolve => {
setTimeout(() => {
resolve(_retryAjax(url, options, limit, count));
@ -2242,26 +2247,31 @@ export function initialize({
}
async function getTransferArchive({
timeout = durations.HOUR,
timeout = HOUR,
abortSignal,
}: GetTransferArchiveOptionsType): Promise<TransferArchiveType> {
const timeoutTime = Date.now() + timeout;
const urlParameters = timeout
? `?timeout=${encodeURIComponent(Math.round(timeout / SECOND))}`
: undefined;
let remainingTime: number;
do {
remainingTime = Math.max(timeoutTime - Date.now(), 0);
const requestTimeoutInSecs = Math.round(
Math.min(remainingTime, 5 * MINUTE) / SECOND
);
const urlParameters = timeout
? `?timeout=${encodeURIComponent(requestTimeoutInSecs)}`
: undefined;
// eslint-disable-next-line no-await-in-loop
const { data, response }: JSONWithDetailsType = await _ajax({
call: 'transferArchive',
httpType: 'GET',
responseType: 'jsonwithdetails',
urlParameters,
timeout: remainingTime,
// Add a bit of leeway to let server respond properly
timeout: requestTimeoutInSecs + 15 * SECOND,
abortSignal,
});
@ -2274,6 +2284,10 @@ export function initialize({
'Invalid transfer archive status code'
);
if (abortSignal?.aborted) {
break;
}
// Timed out, see if we can retry
} while (!timeout || remainingTime != null);
@ -3049,8 +3063,8 @@ export function initialize({
startDayInMs,
endDayInMs,
}: GetBackupCredentialsOptionsType) {
const startDayInSeconds = startDayInMs / durations.SECOND;
const endDayInSeconds = endDayInMs / durations.SECOND;
const startDayInSeconds = startDayInMs / SECOND;
const endDayInSeconds = endDayInMs / SECOND;
const res = await _ajax({
call: 'getBackupCredentials',
httpType: 'GET',
@ -3616,7 +3630,7 @@ export function initialize({
// Upload stickers
const queue = new PQueue({
concurrency: 3,
timeout: durations.MINUTE * 30,
timeout: MINUTE * 30,
throwOnTimeout: true,
});
await Promise.all(
@ -4060,8 +4074,8 @@ export function initialize({
startDayInMs,
endDayInMs,
}: GetGroupCredentialsOptionsType): Promise<GetGroupCredentialsResultType> {
const startDayInSeconds = startDayInMs / durations.SECOND;
const endDayInSeconds = endDayInMs / durations.SECOND;
const startDayInSeconds = startDayInMs / SECOND;
const endDayInSeconds = endDayInMs / SECOND;
const response = (await _ajax({
call: 'getGroupCredentials',
urlParameters: