Resumable backup import

This commit is contained in:
Fedor Indutny 2024-08-27 17:00:41 -04:00 committed by GitHub
parent 3d8aaf0a5a
commit 8ef149e3a8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 498 additions and 33 deletions

View file

@ -69,6 +69,7 @@ import { handleStatusCode, translateError } from './Utils';
import * as log from '../logging/log';
import { maybeParseUrl, urlPathFromComponents } from '../util/url';
import { SECOND } from '../util/durations';
import { safeParseNumber } from '../util/numbers';
import type { IWebSocketResource } from './WebsocketResources';
import { Environment, getEnvironment } from '../environment';
@ -1187,6 +1188,8 @@ export type GetBackupStreamOptionsType = Readonly<{
backupDir: string;
backupName: string;
headers: Record<string, string>;
downloadOffset: number;
onProgress: (currentBytes: number, totalBytes: number) => void;
}>;
export const getBackupInfoResponseSchema = z.object({
@ -2825,12 +2828,18 @@ export function initialize({
cdn,
backupDir,
backupName,
downloadOffset,
onProgress,
}: GetBackupStreamOptionsType): Promise<Readable> {
return _getAttachment({
cdnPath: `/backups/${encodeURIComponent(backupDir)}/${encodeURIComponent(backupName)}`,
cdnNumber: cdn,
redactor: _createRedactor(backupDir, backupName),
headers,
options: {
downloadOffset,
onProgress,
},
});
}
@ -3555,6 +3564,7 @@ export function initialize({
disableRetries?: boolean;
timeout?: number;
downloadOffset?: number;
onProgress?: (currentBytes: number, totalBytes: number) => void;
};
}): Promise<Readable> {
const abortController = new AbortController();
@ -3568,6 +3578,8 @@ export function initialize({
registerInflightRequest(cancelRequest);
let totalBytes = 0;
// This is going to the CDN, not the service, so we use _outerAjax
try {
const targetHeaders = { ...headers };
@ -3587,7 +3599,22 @@ export function initialize({
abortSignal: abortController.signal,
});
if (targetHeaders.range != null) {
if (targetHeaders.range == null) {
const contentLength =
streamWithDetails.response.headers.get('content-length');
strictAssert(
contentLength != null,
'Attachment Content-Length is absent'
);
const maybeSize = safeParseNumber(contentLength);
strictAssert(
maybeSize != null,
'Attachment Content-Length is not a number'
);
totalBytes = maybeSize;
} else {
strictAssert(
streamWithDetails.response.status === 206,
`Expected 206 status code for offset ${options?.downloadOffset}`
@ -3596,6 +3623,19 @@ export function initialize({
!streamWithDetails.contentType?.includes('multipart'),
`Expected non-multipart response for ${cdnUrl}${cdnPath}`
);
const range = streamWithDetails.response.headers.get('content-range');
strictAssert(range != null, 'Attachment Content-Range is absent');
const match = PARSE_RANGE_HEADER.exec(range);
strictAssert(match != null, 'Attachment Content-Range is invalid');
const maybeSize = safeParseNumber(match[1]);
strictAssert(
maybeSize != null,
'Attachment Content-Range[1] is not a number'
);
totalBytes = maybeSize;
}
} finally {
if (!streamWithDetails) {
@ -3620,6 +3660,17 @@ export function initialize({
})
.pipe(timeoutStream);
if (options?.onProgress) {
const { onProgress } = options;
let currentBytes = options.downloadOffset ?? 0;
combinedStream.pause();
combinedStream.on('data', chunk => {
currentBytes += chunk.byteLength;
onProgress(currentBytes, totalBytes);
});
}
return combinedStream;
}