Add utilities for using TUS Protocol
Co-authored-by: Scott Nonnenberg <scott@signal.org> Co-authored-by: Fedor Indutny <indutny@signal.org>
This commit is contained in:
parent
794eeb2323
commit
8ef0ec706d
5 changed files with 1098 additions and 0 deletions
357
ts/util/uploads/tusProtocol.ts
Normal file
357
ts/util/uploads/tusProtocol.ts
Normal file
|
@ -0,0 +1,357 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import { type Readable } from 'node:stream';
|
||||
|
||||
import { HTTPError } from '../../textsecure/Errors';
|
||||
import * as log from '../../logging/log';
|
||||
import * as Errors from '../../types/errors';
|
||||
import { sleep } from '../sleep';
|
||||
import { FIBONACCI_TIMEOUTS, BackOff } from '../BackOff';
|
||||
|
||||
const DEFAULT_MAX_RETRIES = 3;
|
||||
|
||||
function toLogId(input: string) {
|
||||
return Buffer.from(input).toString('base64').slice(0, 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* This file is a standalone implementation of the TUS protocol.
|
||||
* Signal specific logic is in uploads.ts
|
||||
*/
|
||||
|
||||
export type TusFileReader = (filePath: string, offset?: number) => Readable;
|
||||
|
||||
/**
|
||||
* @private
|
||||
* https://tus.io/protocols/resumable-upload#upload-metadata
|
||||
*/
|
||||
export function _getUploadMetadataHeader(
|
||||
params: Record<string, string>
|
||||
): string {
|
||||
return Object.entries(params)
|
||||
.map(([key, value]) => {
|
||||
return `${key} ${Buffer.from(value).toString('base64')}`;
|
||||
})
|
||||
.join(',');
|
||||
}
|
||||
|
||||
function addProgressHandler(
|
||||
readable: Readable,
|
||||
onProgress: (progress: number) => void
|
||||
): void {
|
||||
let bytesUploaded = 0;
|
||||
// Explicitly stop the flow, otherwise we might emit 'data' before `fetch()`
|
||||
// starts reading the stream.
|
||||
readable.pause();
|
||||
readable.on('data', (chunk: Buffer) => {
|
||||
bytesUploaded += chunk.byteLength;
|
||||
onProgress(bytesUploaded);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* Generic TUS POST implementation with creation-with-upload.
|
||||
* @returns {boolean} `true` if the upload completed, `false` if interrupted.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
* @see https://tus.io/protocols/resumable-upload#creation-with-upload
|
||||
*/
|
||||
export async function _tusCreateWithUploadRequest({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
fileSize,
|
||||
readable,
|
||||
onProgress,
|
||||
onCaughtError,
|
||||
signal,
|
||||
}: {
|
||||
endpoint: string;
|
||||
headers: Record<string, string>;
|
||||
fileName: string;
|
||||
fileSize: number;
|
||||
readable: Readable;
|
||||
onProgress?: (bytesUploaded: number) => void;
|
||||
onCaughtError?: (error: Error) => void;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<boolean> {
|
||||
const logId = `tusProtocol: CreateWithUpload(${toLogId(fileName)})`;
|
||||
if (onProgress != null) {
|
||||
addProgressHandler(readable, onProgress);
|
||||
}
|
||||
|
||||
let response: Response;
|
||||
try {
|
||||
log.info(`${logId} init`);
|
||||
response = await fetch(endpoint, {
|
||||
method: 'POST',
|
||||
signal,
|
||||
// @ts-expect-error: `duplex` is missing from TypeScript's `RequestInit`.
|
||||
duplex: 'half',
|
||||
headers: {
|
||||
...headers,
|
||||
'Tus-Resumable': '1.0.0',
|
||||
'Upload-Length': String(fileSize),
|
||||
'Upload-Metadata': _getUploadMetadataHeader({
|
||||
filename: fileName,
|
||||
}),
|
||||
'Content-Type': 'application/offset+octet-stream',
|
||||
},
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
body: readable as any,
|
||||
});
|
||||
} catch (error) {
|
||||
log.error(`${logId} closed without response`, Errors.toLogFormat(error));
|
||||
onCaughtError?.(error);
|
||||
return false;
|
||||
}
|
||||
if (!response.ok) {
|
||||
log.error(`${logId} error (${response.status})`);
|
||||
throw HTTPError.fromResponse(response);
|
||||
}
|
||||
log.info(`${logId} success (${response.status})`);
|
||||
return true;
|
||||
}
|
||||
|
||||
function isPositiveInteger(value: number): value is number {
|
||||
return Number.isInteger(value) && value >= 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* Generic TUS HEAD implementation.
|
||||
* @returns {number} The current offset of the upload.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
* @throws {Error} If the server responded with an invalid Upload-Offset header.
|
||||
* @see https://tus.io/protocols/resumable-upload#head
|
||||
*/
|
||||
export async function _tusGetCurrentOffsetRequest({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
signal,
|
||||
}: {
|
||||
endpoint: string;
|
||||
headers: Record<string, string>;
|
||||
fileName: string;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<number> {
|
||||
const logId = `tusProtocol: GetCurrentOffsetRequest(${toLogId(fileName)})`;
|
||||
log.info(`${logId} init`);
|
||||
|
||||
const response = await fetch(`${endpoint}/${fileName}`, {
|
||||
method: 'HEAD',
|
||||
signal,
|
||||
headers: {
|
||||
...headers,
|
||||
'Tus-Resumable': '1.0.0',
|
||||
},
|
||||
});
|
||||
if (!response.ok) {
|
||||
log.error(`${logId} error (${response.status})`);
|
||||
throw HTTPError.fromResponse(response);
|
||||
}
|
||||
|
||||
log.info(`${logId} success (${response.status})`);
|
||||
const header = response.headers.get('Upload-Offset');
|
||||
if (header == null) {
|
||||
throw new Error('getCurrentState: Missing Upload-Offset header');
|
||||
}
|
||||
|
||||
const result = Number(header);
|
||||
if (!isPositiveInteger(result)) {
|
||||
throw new Error(`getCurrentState: Invalid Upload-Offset (${header})`);
|
||||
}
|
||||
|
||||
log.info(`${logId} current offset (${result})`);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* Generic TUS PATCH implementation.
|
||||
* @returns {boolean} `true` if the upload completed, `false` if interrupted.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
* @see https://tus.io/protocols/resumable-upload#patch
|
||||
*/
|
||||
export async function _tusResumeUploadRequest({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
uploadOffset,
|
||||
readable,
|
||||
onProgress,
|
||||
onCaughtError,
|
||||
signal,
|
||||
}: {
|
||||
endpoint: string;
|
||||
headers: Record<string, string>;
|
||||
fileName: string;
|
||||
uploadOffset: number;
|
||||
readable: Readable;
|
||||
onProgress?: (bytesUploaded: number) => void;
|
||||
onCaughtError?: (error: Error) => void;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<boolean> {
|
||||
const logId = `tusProtocol: ResumeUploadRequest(${toLogId(fileName)})`;
|
||||
if (onProgress != null) {
|
||||
addProgressHandler(readable, onProgress);
|
||||
}
|
||||
|
||||
let response: Response;
|
||||
try {
|
||||
log.info(`${logId} init`);
|
||||
response = await fetch(`${endpoint}/${fileName}`, {
|
||||
method: 'PATCH',
|
||||
signal,
|
||||
// @ts-expect-error: `duplex` is missing from TypeScript's `RequestInit`.
|
||||
duplex: 'half',
|
||||
headers: {
|
||||
...headers,
|
||||
'Tus-Resumable': '1.0.0',
|
||||
'Upload-Offset': String(uploadOffset),
|
||||
'Content-Type': 'application/offset+octet-stream',
|
||||
},
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
body: readable as any,
|
||||
});
|
||||
} catch (error) {
|
||||
log.error(`${logId} closed without response`, Errors.toLogFormat(error));
|
||||
onCaughtError?.(error);
|
||||
return false;
|
||||
}
|
||||
if (!response.ok) {
|
||||
log.error(`${logId} error (${response.status})`);
|
||||
throw HTTPError.fromResponse(response);
|
||||
}
|
||||
log.info(`${logId} success (${response.status})`);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to upload a file using the TUS protocol with Signal headers, and
|
||||
* resumes the upload if it was interrupted.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
*/
|
||||
export async function tusUpload({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
filePath,
|
||||
fileSize,
|
||||
reader,
|
||||
onProgress,
|
||||
onCaughtError,
|
||||
maxRetries = DEFAULT_MAX_RETRIES,
|
||||
signal,
|
||||
}: {
|
||||
endpoint: string;
|
||||
headers: Record<string, string>;
|
||||
fileName: string;
|
||||
filePath: string;
|
||||
fileSize: number;
|
||||
reader: TusFileReader;
|
||||
onProgress?: (bytesUploaded: number) => void;
|
||||
onCaughtError?: (error: Error) => void;
|
||||
maxRetries?: number;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<void> {
|
||||
const readable = reader(filePath);
|
||||
const done = await _tusCreateWithUploadRequest({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
fileSize,
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
readable: readable as any,
|
||||
onProgress,
|
||||
onCaughtError,
|
||||
signal,
|
||||
});
|
||||
if (!done) {
|
||||
await tusResumeUpload({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
filePath,
|
||||
fileSize,
|
||||
reader,
|
||||
onProgress,
|
||||
onCaughtError,
|
||||
maxRetries,
|
||||
signal,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const BACKOFF_JITTER_MS = 100;
|
||||
|
||||
/**
|
||||
* Attempts to resume an upload using the TUS protocol.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
* @param params
|
||||
*/
|
||||
export async function tusResumeUpload({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
filePath,
|
||||
fileSize,
|
||||
reader,
|
||||
onProgress,
|
||||
onCaughtError,
|
||||
maxRetries = DEFAULT_MAX_RETRIES,
|
||||
signal,
|
||||
}: {
|
||||
endpoint: string;
|
||||
headers: Record<string, string>;
|
||||
fileName: string;
|
||||
filePath: string;
|
||||
fileSize: number;
|
||||
reader: TusFileReader;
|
||||
onProgress?: (bytesUploaded: number) => void;
|
||||
onCaughtError?: (error: Error) => void;
|
||||
maxRetries?: number;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<void> {
|
||||
const backoff = new BackOff(FIBONACCI_TIMEOUTS, {
|
||||
jitter: BACKOFF_JITTER_MS,
|
||||
});
|
||||
|
||||
let retryAttempts = 0;
|
||||
while (retryAttempts < maxRetries) {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await sleep(backoff.getAndIncrement());
|
||||
retryAttempts += 1;
|
||||
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const uploadOffset = await _tusGetCurrentOffsetRequest({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
signal,
|
||||
});
|
||||
|
||||
if (uploadOffset === fileSize) {
|
||||
break;
|
||||
}
|
||||
|
||||
const readable = reader(filePath, uploadOffset);
|
||||
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
const done = await _tusResumeUploadRequest({
|
||||
endpoint,
|
||||
headers,
|
||||
fileName,
|
||||
uploadOffset,
|
||||
readable,
|
||||
onProgress,
|
||||
onCaughtError,
|
||||
signal,
|
||||
});
|
||||
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
136
ts/util/uploads/uploads.ts
Normal file
136
ts/util/uploads/uploads.ts
Normal file
|
@ -0,0 +1,136 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import { createReadStream, createWriteStream } from 'node:fs';
|
||||
import { Writable } from 'node:stream';
|
||||
import type { TusFileReader } from './tusProtocol';
|
||||
import { tusResumeUpload, tusUpload } from './tusProtocol';
|
||||
import { HTTPError } from '../../textsecure/Errors';
|
||||
|
||||
const defaultFileReader: TusFileReader = (filePath, offset) => {
|
||||
return createReadStream(filePath, { start: offset });
|
||||
};
|
||||
|
||||
/**
|
||||
* @public
|
||||
* Uploads a file to the attachments bucket.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
*/
|
||||
export async function uploadAttachment({
|
||||
host,
|
||||
fileName,
|
||||
filePath,
|
||||
fileSize,
|
||||
checksum,
|
||||
headers = {},
|
||||
signal,
|
||||
}: {
|
||||
host: string;
|
||||
fileName: string;
|
||||
filePath: string;
|
||||
fileSize: number;
|
||||
checksum: string;
|
||||
headers?: Record<string, string>;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<void> {
|
||||
return tusUpload({
|
||||
endpoint: `${host}/upload/attachments`,
|
||||
headers: {
|
||||
...headers,
|
||||
'X-Signal-Checksum-Sha256': checksum,
|
||||
},
|
||||
fileName,
|
||||
filePath,
|
||||
fileSize,
|
||||
reader: defaultFileReader,
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
* Resumes an upload to the attachments bucket.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
*/
|
||||
export async function resumeUploadAttachment({
|
||||
host,
|
||||
fileName,
|
||||
filePath,
|
||||
fileSize,
|
||||
headers = {},
|
||||
signal,
|
||||
}: {
|
||||
host: string;
|
||||
fileName: string;
|
||||
filePath: string;
|
||||
fileSize: number;
|
||||
headers?: Record<string, string>;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<void> {
|
||||
return tusResumeUpload({
|
||||
endpoint: `${host}/upload/attachments`,
|
||||
headers,
|
||||
fileName,
|
||||
filePath,
|
||||
fileSize,
|
||||
reader: defaultFileReader,
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Downloads a file with Signal headers.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
* @throws {Error} If the response has no body.
|
||||
*/
|
||||
export async function _doDownload({
|
||||
endpoint,
|
||||
headers = {},
|
||||
filePath,
|
||||
signal,
|
||||
}: {
|
||||
endpoint: string;
|
||||
filePath: string;
|
||||
headers?: Record<string, string>;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<void> {
|
||||
const response = await fetch(endpoint, {
|
||||
method: 'GET',
|
||||
signal,
|
||||
redirect: 'error',
|
||||
headers,
|
||||
});
|
||||
if (!response.ok) {
|
||||
throw HTTPError.fromResponse(response);
|
||||
}
|
||||
if (!response.body) {
|
||||
throw new Error('Response has no body');
|
||||
}
|
||||
const writable = createWriteStream(filePath);
|
||||
await response.body.pipeTo(Writable.toWeb(writable));
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
* Downloads a file from the attachments bucket.
|
||||
* @throws {ResponseError} If the server responded with an error.
|
||||
*/
|
||||
export async function downloadAttachment({
|
||||
host,
|
||||
fileName,
|
||||
filePath,
|
||||
headers,
|
||||
signal,
|
||||
}: {
|
||||
host: string;
|
||||
fileName: string;
|
||||
filePath: string;
|
||||
headers?: Record<string, string>;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<void> {
|
||||
return _doDownload({
|
||||
endpoint: `${host}/attachments/${fileName}`,
|
||||
headers,
|
||||
filePath,
|
||||
signal,
|
||||
});
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue