Properly abort inflight requests on resume or shutdown

This commit is contained in:
trevor-signal 2024-08-02 13:31:27 -04:00 committed by GitHub
parent 517007ad04
commit 81bed5c444
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 63 additions and 43 deletions

View file

@ -735,6 +735,9 @@ export async function startApp(): Promise<void> {
'background/shutdown: shutdown requested' 'background/shutdown: shutdown requested'
); );
const attachmentDownloadStopPromise = AttachmentDownloadManager.stop();
const attachmentBackupStopPromise = AttachmentBackupManager.stop();
server?.cancelInflightRequests('shutdown'); server?.cancelInflightRequests('shutdown');
// Stop background processing // Stop background processing
@ -815,13 +818,12 @@ export async function startApp(): Promise<void> {
]); ]);
log.info( log.info(
'background/shutdown: waiting for all attachment downloads to finish' 'background/shutdown: waiting for all attachment backups & downloads to finish'
); );
// Since we canceled the inflight requests earlier in shutdown, these should // Since we canceled the inflight requests earlier in shutdown, these should
// resolve quickly // resolve quickly
await AttachmentDownloadManager.stop(); await attachmentDownloadStopPromise;
await AttachmentBackupManager.stop(); await attachmentBackupStopPromise;
log.info('background/shutdown: closing the database'); log.info('background/shutdown: closing the database');

View file

@ -208,12 +208,10 @@ export class AttachmentDownloadManager extends JobManager<CoreAttachmentDownload
} }
static async start(): Promise<void> { static async start(): Promise<void> {
log.info('AttachmentDownloadManager/starting');
await AttachmentDownloadManager.instance.start(); await AttachmentDownloadManager.instance.start();
} }
static async stop(): Promise<void> { static async stop(): Promise<void> {
log.info('AttachmentDownloadManager/stopping');
return AttachmentDownloadManager._instance?.stop(); return AttachmentDownloadManager._instance?.stop();
} }

View file

@ -83,19 +83,26 @@ export abstract class JobManager<CoreJobType> {
constructor(readonly params: JobManagerParamsType<CoreJobType>) {} constructor(readonly params: JobManagerParamsType<CoreJobType>) {}
async start(): Promise<void> { async start(): Promise<void> {
log.info(`${this.logPrefix}: starting`);
this.enabled = true; this.enabled = true;
await this.params.markAllJobsInactive(); await this.params.markAllJobsInactive();
this.tick(); this.tick();
} }
async stop(): Promise<void> { async stop(): Promise<void> {
const activeJobs = [...this.activeJobs.values()];
log.info(
`${this.logPrefix}: stopping. There are ` +
`${activeJobs.length} active job(s)`
);
this.enabled = false; this.enabled = false;
clearTimeoutIfNecessary(this.tickTimeout); clearTimeoutIfNecessary(this.tickTimeout);
this.tickTimeout = null; this.tickTimeout = null;
await Promise.all( await Promise.all(
[...this.activeJobs.values()].map( activeJobs.map(({ completionPromise }) => completionPromise.promise)
({ completionPromise }) => completionPromise.promise
)
); );
} }

View file

@ -257,15 +257,10 @@ function getHostname(url: string): string {
return urlObject.hostname; return urlObject.hostname;
} }
type FetchOptionsType = { type FetchOptionsType = Omit<RequestInit, 'headers'> & {
method: string; headers: Record<string, string>;
body?: Uint8Array | Readable | string; // This is patch-packaged
headers: FetchHeaderListType;
redirect?: 'error' | 'follow' | 'manual';
agent?: Agent;
ca?: string; ca?: string;
timeout?: number;
abortSignal?: AbortSignal;
}; };
async function getFetchOptions( async function getFetchOptions(
@ -297,7 +292,7 @@ async function getFetchOptions(
const agentEntry = agents[cacheKey]; const agentEntry = agents[cacheKey];
const agent = agentEntry?.agent ?? null; const agent = agentEntry?.agent ?? null;
const fetchOptions = { const fetchOptions: FetchOptionsType = {
method: options.type, method: options.type,
body: typeof options.data === 'function' ? options.data() : options.data, body: typeof options.data === 'function' ? options.data() : options.data,
headers: { headers: {
@ -309,7 +304,7 @@ async function getFetchOptions(
agent, agent,
ca: options.certificateAuthority, ca: options.certificateAuthority,
timeout, timeout,
abortSignal: options.abortSignal, signal: options.abortSignal,
}; };
if (options.contentType) { if (options.contentType) {
@ -364,7 +359,6 @@ async function _promiseAjax(
response = socketManager response = socketManager
? await socketManager.fetch(url, fetchOptions) ? await socketManager.fetch(url, fetchOptions)
: await fetch(url, fetchOptions); : await fetch(url, fetchOptions);
if ( if (
options.serverUrl && options.serverUrl &&
getHostname(options.serverUrl) === getHostname(url) getHostname(options.serverUrl) === getHostname(url)
@ -414,7 +408,6 @@ async function _promiseAjax(
options.stack options.stack
); );
} }
if ( if (
options.responseType === 'json' || options.responseType === 'json' ||
options.responseType === 'jsonwithdetails' options.responseType === 'jsonwithdetails'
@ -433,6 +426,17 @@ async function _promiseAjax(
} }
} }
if (options.responseType === 'stream') {
log.info(logId, response.status, 'Streaming');
response.body.on('error', e => {
log.info(logId, 'Errored while streaming:', e.message);
});
response.body.on('end', () => {
log.info(logId, response.status, 'Streaming ended');
});
return result;
}
log.info(logId, response.status, 'Success'); log.info(logId, response.status, 'Success');
if (options.responseType === 'byteswithdetails') { if (options.responseType === 'byteswithdetails') {
@ -3482,19 +3486,38 @@ export function initialize({
}): Promise<Readable> { }): Promise<Readable> {
const abortController = new AbortController(); const abortController = new AbortController();
const cdnUrl = cdnUrlObject[cdnNumber] ?? cdnUrlObject['0']; const cdnUrl = cdnUrlObject[cdnNumber] ?? cdnUrlObject['0'];
let downloadStream: Readable | undefined;
const cancelRequest = () => {
abortController.abort();
};
registerInflightRequest(cancelRequest);
// This is going to the CDN, not the service, so we use _outerAjax // This is going to the CDN, not the service, so we use _outerAjax
const downloadStream = await _outerAjax(`${cdnUrl}${cdnPath}`, { try {
headers, downloadStream = await _outerAjax(`${cdnUrl}${cdnPath}`, {
certificateAuthority, headers,
disableRetries: options?.disableRetries, certificateAuthority,
proxyUrl, disableRetries: options?.disableRetries,
responseType: 'stream', proxyUrl,
timeout: options?.timeout || 0, responseType: 'stream',
type: 'GET', timeout: options?.timeout || 0,
redactUrl: redactor, type: 'GET',
version, redactUrl: redactor,
abortSignal: abortController.signal, version,
}); abortSignal: abortController.signal,
});
} finally {
if (!downloadStream) {
unregisterInFlightRequest(cancelRequest);
} else {
downloadStream.on('close', () => {
unregisterInFlightRequest(cancelRequest);
});
}
}
const timeoutStream = getTimeoutStream({ const timeoutStream = getTimeoutStream({
name: `getAttachment(${redactor(cdnPath)})`, name: `getAttachment(${redactor(cdnPath)})`,
@ -3509,16 +3532,6 @@ export function initialize({
}) })
.pipe(timeoutStream); .pipe(timeoutStream);
const cancelRequest = (error: Error) => {
combinedStream.emit('error', error);
abortController.abort();
};
registerInflightRequest(cancelRequest);
combinedStream.on('done', () => {
unregisterInFlightRequest(cancelRequest);
});
return combinedStream; return combinedStream;
} }