Resumable attachment downloads

This commit is contained in:
Fedor Indutny 2024-08-19 13:05:35 -07:00 committed by GitHub
parent 2c92591b59
commit 38f532cdda
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 401 additions and 89 deletions

View file

@ -184,7 +184,8 @@ type PromiseAjaxOptionsType = {
| 'jsonwithdetails'
| 'bytes'
| 'byteswithdetails'
| 'stream';
| 'stream'
| 'streamwithdetails';
serverUrl?: string;
stack?: string;
timeout?: number;
@ -214,6 +215,11 @@ type BytesWithDetailsType = {
contentType: string | null;
response: Response;
};
type StreamWithDetailsType = {
stream: Readable;
contentType: string | null;
response: Response;
};
export const multiRecipient200ResponseSchema = z.object({
uuids404: z.array(serviceIdSchema).optional(),
@ -386,7 +392,10 @@ async function _promiseAjax(
options.responseType === 'byteswithdetails'
) {
result = await response.buffer();
} else if (options.responseType === 'stream') {
} else if (
options.responseType === 'stream' ||
options.responseType === 'streamwithdetails'
) {
result = response.body;
} else {
result = await response.textConverted();
@ -437,6 +446,24 @@ async function _promiseAjax(
return result;
}
if (options.responseType === 'streamwithdetails') {
log.info(logId, response.status, 'Streaming with details');
response.body.on('error', e => {
log.info(logId, 'Errored while streaming:', e.message);
});
response.body.on('end', () => {
log.info(logId, response.status, 'Streaming ended');
});
const fullResult: StreamWithDetailsType = {
stream: result as Readable,
contentType: getContentType(response),
response,
};
return fullResult;
}
log.info(logId, response.status, 'Success');
if (options.responseType === 'byteswithdetails') {
@ -506,6 +533,10 @@ function _outerAjax(
providedUrl: string | null,
options: PromiseAjaxOptionsType & { responseType?: 'stream' }
): Promise<Readable>;
function _outerAjax(
providedUrl: string | null,
options: PromiseAjaxOptionsType & { responseType: 'streamwithdetails' }
): Promise<StreamWithDetailsType>;
function _outerAjax(
providedUrl: string | null,
options: PromiseAjaxOptionsType
@ -1215,6 +1246,7 @@ export type WebAPIType = {
options?: {
disableRetries?: boolean;
timeout?: number;
downloadOffset?: number;
};
}) => Promise<Readable>;
getAttachment: (args: {
@ -1223,6 +1255,7 @@ export type WebAPIType = {
options?: {
disableRetries?: boolean;
timeout?: number;
downloadOffset?: number;
};
}) => Promise<Readable>;
getAttachmentUploadForm: () => Promise<AttachmentUploadFormResponseType>;
@ -1789,6 +1822,9 @@ export function initialize({
function _ajax(
param: AjaxOptionsType & { responseType: 'stream' }
): Promise<Readable>;
function _ajax(
param: AjaxOptionsType & { responseType: 'streamwithdetails' }
): Promise<StreamWithDetailsType>;
function _ajax(
param: AjaxOptionsType & { responseType: 'json' }
): Promise<unknown>;
@ -3442,6 +3478,7 @@ export function initialize({
options?: {
disableRetries?: boolean;
timeout?: number;
downloadOffset?: number;
};
}) {
return _getAttachment({
@ -3468,6 +3505,7 @@ export function initialize({
options?: {
disableRetries?: boolean;
timeout?: number;
downloadOffset?: number;
};
}) {
return _getAttachment({
@ -3482,7 +3520,7 @@ export function initialize({
async function _getAttachment({
cdnPath,
cdnNumber,
headers,
headers = {},
redactor,
options,
}: {
@ -3493,12 +3531,13 @@ export function initialize({
options?: {
disableRetries?: boolean;
timeout?: number;
downloadOffset?: number;
};
}): Promise<Readable> {
const abortController = new AbortController();
const cdnUrl = cdnUrlObject[cdnNumber] ?? cdnUrlObject['0'];
let downloadStream: Readable | undefined;
let streamWithDetails: StreamWithDetailsType | undefined;
const cancelRequest = () => {
abortController.abort();
@ -3508,23 +3547,38 @@ export function initialize({
// This is going to the CDN, not the service, so we use _outerAjax
try {
downloadStream = await _outerAjax(`${cdnUrl}${cdnPath}`, {
headers,
const targetHeaders = { ...headers };
if (options?.downloadOffset) {
targetHeaders.range = `bytes=${options.downloadOffset}-`;
}
streamWithDetails = await _outerAjax(`${cdnUrl}${cdnPath}`, {
headers: targetHeaders,
certificateAuthority,
disableRetries: options?.disableRetries,
proxyUrl,
responseType: 'stream',
responseType: 'streamwithdetails',
timeout: options?.timeout ?? DEFAULT_TIMEOUT,
type: 'GET',
redactUrl: redactor,
version,
abortSignal: abortController.signal,
});
if (targetHeaders.range != null) {
strictAssert(
streamWithDetails.response.status === 206,
`Expected 206 status code for offset ${options?.downloadOffset}`
);
strictAssert(
!streamWithDetails.contentType?.includes('multipart'),
`Expected non-multipart response for ${cdnUrl}${cdnPath}`
);
}
} finally {
if (!downloadStream) {
if (!streamWithDetails) {
unregisterInFlightRequest(cancelRequest);
} else {
downloadStream.on('close', () => {
streamWithDetails.stream.on('close', () => {
unregisterInFlightRequest(cancelRequest);
});
}
@ -3536,7 +3590,7 @@ export function initialize({
abortController,
});
const combinedStream = downloadStream
const combinedStream = streamWithDetails.stream
// We do this manually; pipe() doesn't flow errors through the streams for us
.on('error', (error: Error) => {
timeoutStream.emit('error', error);