From 8ef0ec706d777c2000163abbce51a8f5825a2010 Mon Sep 17 00:00:00 2001 From: Jamie Kyle <113370520+jamiebuilds-signal@users.noreply.github.com> Date: Tue, 30 Apr 2024 17:57:57 -0700 Subject: [PATCH] Add utilities for using TUS Protocol Co-authored-by: Scott Nonnenberg Co-authored-by: Fedor Indutny --- ts/test-node/util/uploads/helpers.ts | 135 +++++ ts/test-node/util/uploads/tusProtocol_test.ts | 462 ++++++++++++++++++ ts/textsecure/Errors.ts | 8 + ts/util/uploads/tusProtocol.ts | 357 ++++++++++++++ ts/util/uploads/uploads.ts | 136 ++++++ 5 files changed, 1098 insertions(+) create mode 100644 ts/test-node/util/uploads/helpers.ts create mode 100644 ts/test-node/util/uploads/tusProtocol_test.ts create mode 100644 ts/util/uploads/tusProtocol.ts create mode 100644 ts/util/uploads/uploads.ts diff --git a/ts/test-node/util/uploads/helpers.ts b/ts/test-node/util/uploads/helpers.ts new file mode 100644 index 000000000000..39b82736459a --- /dev/null +++ b/ts/test-node/util/uploads/helpers.ts @@ -0,0 +1,135 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import { EventEmitter, once } from 'events'; +import { Readable } from 'stream'; +import { createServer } from 'http'; +import type { + IncomingMessage, + ServerResponse, + Server, + OutgoingHttpHeaders, +} from 'http'; +import { strictAssert } from '../../../util/assert'; + +export type NextResponse = Readonly<{ + status: number; + headers: OutgoingHttpHeaders; +}>; + +export type LastRequestData = Readonly<{ + method?: string; + url?: string; + headers: OutgoingHttpHeaders; + body: Buffer; +}>; + +export class TestServer extends EventEmitter { + #server: Server; + #nextResponse: NextResponse = { status: 200, headers: {} }; + #lastRequest: { request: IncomingMessage; body: Buffer } | null = null; + + constructor() { + super(); + this.#server = createServer(this.#onRequest); + } + + async listen(): Promise { + await new Promise(resolve => { + this.#server.listen(0, resolve); + }); + } + + closeLastRequest(): void { + this.#lastRequest?.request.destroy(); + } + + async closeServer(): Promise { + if (!this.#server.listening) { + return; + } + this.#server.closeAllConnections(); + await new Promise((resolve, reject) => { + this.#server.close(error => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + } + + get endpoint(): string { + const address = this.#server.address(); + strictAssert( + typeof address === 'object' && address != null, + 'address must be an object' + ); + return `http://localhost:${address.port}/}`; + } + + respondWith(status: number, headers: OutgoingHttpHeaders = {}): void { + this.#nextResponse = { status, headers }; + } + + lastRequest(): LastRequestData | null { + const request = this.#lastRequest; + if (request == null) { + return null; + } + return { + method: request.request.method, + url: request.request.url, + headers: request.request.headers, + body: request.body, + }; + } + + #onRequest = (request: IncomingMessage, response: ServerResponse) => { + this.emit('request'); + const nextResponse = this.#nextResponse; + const lastRequest = { request, body: Buffer.alloc(0) }; + this.#lastRequest = lastRequest; + request.on('data', chunk => { + lastRequest.body = Buffer.concat([lastRequest.body, chunk]); + this.emit('data'); + }); + request.on('end', () => { + response.writeHead(nextResponse.status, nextResponse.headers); + this.#nextResponse = { status: 200, headers: {} }; + response.end(); + }); + request.on('error', error => { + response.destroy(error); + }); + }; +} + +export function body( + server: TestServer, + steps: () => AsyncIterator +): Readable { + const iter = steps(); + let first = true; + return new Readable({ + async read(size: number) { + try { + // To make tests more reliable, we want each `yield` in body() to be + // processed before we yield the next chunk. + if (first) { + first = false; + } else { + await once(server, 'data'); + } + const chunk = await iter.next(size); + if (chunk.done) { + this.push(null); + return; + } + this.push(chunk.value); + } catch (error) { + this.destroy(error); + } + }, + }); +} diff --git a/ts/test-node/util/uploads/tusProtocol_test.ts b/ts/test-node/util/uploads/tusProtocol_test.ts new file mode 100644 index 000000000000..2ef59f4b0e4f --- /dev/null +++ b/ts/test-node/util/uploads/tusProtocol_test.ts @@ -0,0 +1,462 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +// eslint-disable-next-line @typescript-eslint/no-restricted-imports +import { assert, expect } from 'chai'; +import { + _getUploadMetadataHeader, + _tusCreateWithUploadRequest, + _tusGetCurrentOffsetRequest, + _tusResumeUploadRequest, + tusUpload, +} from '../../../util/uploads/tusProtocol'; +import { TestServer, body } from './helpers'; +import { toLogFormat } from '../../../types/errors'; + +describe('tusProtocol', () => { + describe('_getUploadMetadataHeader', () => { + it('creates key value pairs, with base 64 values', () => { + assert.strictEqual(_getUploadMetadataHeader({}), ''); + assert.strictEqual( + _getUploadMetadataHeader({ + one: 'first', + }), + 'one Zmlyc3Q=' + ); + assert.strictEqual( + _getUploadMetadataHeader({ + one: 'first', + two: 'second', + }), + 'one Zmlyc3Q=,two c2Vjb25k' + ); + }); + }); + + describe('_tusCreateWithUploadRequest', () => { + let server: TestServer; + + beforeEach(async () => { + server = new TestServer(); + await server.listen(); + }); + + afterEach(async () => { + await server.closeServer(); + }); + + it('uploads on create', async () => { + server.respondWith(200, {}); + const result = await _tusCreateWithUploadRequest({ + endpoint: server.endpoint, + headers: { + 'custom-header': 'custom-value', + }, + fileName: 'test', + fileSize: 6, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + yield new Uint8Array([4, 5, 6]); + }), + }); + assert.strictEqual(result, true); + assert.strictEqual(server.lastRequest()?.body.byteLength, 6); + assert.strictEqual( + server.lastRequest()?.body.toString('hex'), + '010203040506' + ); + assert.strictEqual(server.lastRequest()?.method, 'POST'); + assert.deepOwnInclude(server.lastRequest()?.headers, { + 'tus-resumable': '1.0.0', + 'upload-length': '6', + 'upload-metadata': 'filename dGVzdA==', + 'content-type': 'application/offset+octet-stream', + 'custom-header': 'custom-value', + }); + }); + + it('gracefully handles server connection closing', async () => { + const result = await _tusCreateWithUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + fileSize: 0, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + await server.closeServer(); + yield new Uint8Array([4, 5, 6]); + }), + }); + assert.strictEqual(result, false); + assert.strictEqual(server.lastRequest()?.body.byteLength, 3); + assert.strictEqual(server.lastRequest()?.body.toString('hex'), '010203'); + }); + + it('gracefully handles being aborted', async () => { + const controller = new AbortController(); + const result = await _tusCreateWithUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + fileSize: 0, + signal: controller.signal, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + controller.abort(); + yield new Uint8Array([4, 5, 6]); + }), + }); + assert.strictEqual(result, false); + assert.strictEqual(server.lastRequest()?.body.byteLength, 3); + assert.strictEqual(server.lastRequest()?.body.toString('hex'), '010203'); + }); + + it('reports progress', async () => { + let progress = 0; + const result = await _tusCreateWithUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + fileSize: 6, + onProgress: bytesUploaded => { + progress = bytesUploaded; + }, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + assert.strictEqual(progress, 3); + yield new Uint8Array([4, 5, 6]); + assert.strictEqual(progress, 6); + }), + }); + assert.strictEqual(result, true); + }); + + it('reports caught errors', async () => { + let caughtError: Error | undefined; + const result = await _tusCreateWithUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + fileSize: 6, + onCaughtError: error => { + caughtError = error; + }, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + throw new Error('test'); + }), + }); + assert.strictEqual(result, false); + assert.strictEqual(caughtError?.message, 'fetch failed'); + }); + }); + + describe('_tusGetCurrentOffsetRequest', () => { + let server: TestServer; + + beforeEach(async () => { + server = new TestServer(); + await server.listen(); + }); + + afterEach(async () => { + await server.closeServer(); + }); + + it('returns the current offset', async () => { + server.respondWith(200, { 'Upload-Offset': '3' }); + const result = await _tusGetCurrentOffsetRequest({ + endpoint: server.endpoint, + headers: { + 'custom-header': 'custom-value', + }, + fileName: 'test', + }); + assert.strictEqual(result, 3); + assert.strictEqual(server.lastRequest()?.method, 'HEAD'); + assert.deepOwnInclude(server.lastRequest()?.headers, { + 'tus-resumable': '1.0.0', + 'custom-header': 'custom-value', + }); + }); + + it('throws on missing offset', async () => { + server.respondWith(200, {}); + await assert.isRejected( + _tusGetCurrentOffsetRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + }), + 'getCurrentState: Missing Upload-Offset header' + ); + }); + + it('throws on invalid offset', async () => { + server.respondWith(200, { 'Upload-Offset': '-1' }); + await assert.isRejected( + _tusGetCurrentOffsetRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + }), + 'getCurrentState: Invalid Upload-Offset (-1)' + ); + }); + }); + + describe('_tusResumeUploadRequest', () => { + let server: TestServer; + + beforeEach(async () => { + server = new TestServer(); + await server.listen(); + }); + + afterEach(async () => { + await server.closeServer(); + }); + + it('uploads on resume', async () => { + server.respondWith(200, {}); + const result = await _tusResumeUploadRequest({ + endpoint: server.endpoint, + headers: { + 'custom-header': 'custom-value', + }, + fileName: 'test', + uploadOffset: 3, + readable: body(server, async function* () { + // we're resuming from offset 3 + yield new Uint8Array([3, 4, 5]); + yield new Uint8Array([6, 7, 8]); + }), + }); + assert.strictEqual(result, true); + assert.strictEqual(server.lastRequest()?.body.byteLength, 6); + assert.strictEqual( + server.lastRequest()?.body.toString('hex'), + '030405060708' + ); + assert.deepOwnInclude(server.lastRequest()?.headers, { + 'tus-resumable': '1.0.0', + 'upload-offset': '3', + 'content-type': 'application/offset+octet-stream', + 'custom-header': 'custom-value', + }); + }); + + it('gracefully handles server connection closing', async () => { + const result = await _tusResumeUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + uploadOffset: 3, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + await server.closeServer(); + yield new Uint8Array([4, 5, 6]); + }), + }); + assert.strictEqual(result, false); + assert.strictEqual(server.lastRequest()?.body.byteLength, 3); + assert.strictEqual(server.lastRequest()?.body.toString('hex'), '010203'); + }); + + it('gracefully handles being aborted', async () => { + const controller = new AbortController(); + const result = await _tusResumeUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + uploadOffset: 3, + signal: controller.signal, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + controller.abort(); + yield new Uint8Array([4, 5, 6]); + }), + }); + assert.strictEqual(result, false); + assert.strictEqual(server.lastRequest()?.body.byteLength, 3); + assert.strictEqual(server.lastRequest()?.body.toString('hex'), '010203'); + }); + + it('reports progress', async () => { + let progress = 0; + const result = await _tusResumeUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + uploadOffset: 3, + onProgress: bytesUploaded => { + progress = bytesUploaded; + }, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + assert.strictEqual(progress, 3); + yield new Uint8Array([4, 5, 6]); + assert.strictEqual(progress, 6); + }), + }); + assert.strictEqual(result, true); + }); + + it('reports caught errors', async () => { + let caughtError: Error | undefined; + const result = await _tusResumeUploadRequest({ + endpoint: server.endpoint, + headers: {}, + fileName: 'test', + uploadOffset: 3, + onCaughtError: error => { + caughtError = error; + }, + readable: body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + throw new Error('test'); + }), + }); + assert.strictEqual(result, false); + assert.strictEqual(caughtError?.message, 'fetch failed'); + }); + }); + + describe('tusUpload', () => { + let server: TestServer; + + function assertSocketCloseError(error: unknown) { + // There isn't an equivalent to this chain in assert() + expect(error, toLogFormat(error)) + .property('cause') + .property('code') + .oneOf(['ECONNRESET', 'UND_ERR_SOCKET']); + } + + beforeEach(async () => { + server = new TestServer(); + await server.listen(); + }); + + afterEach(async () => { + await server.closeServer(); + }); + + it('creates and uploads', async () => { + server.respondWith(200, {}); + await tusUpload({ + endpoint: server.endpoint, + headers: { 'mock-header': 'mock-value' }, + fileName: 'mock-file-name', + filePath: 'mock-file-path', + fileSize: 6, + onCaughtError: assertSocketCloseError, + reader: (filePath, offset) => { + assert.strictEqual(offset, undefined); + assert.strictEqual(filePath, 'mock-file-path'); + return body(server, async function* () { + yield new Uint8Array([1, 2, 3]); + yield new Uint8Array([4, 5, 6]); + }); + }, + }); + assert.strictEqual(server.lastRequest()?.body.byteLength, 6); + assert.deepOwnInclude(server.lastRequest()?.headers, { + 'upload-metadata': 'filename bW9jay1maWxlLW5hbWU=', + 'mock-header': 'mock-value', + }); + }); + + it('resumes when initial request fails', async () => { + let cursor = undefined as number | void; + let callCount = 0; + const file = new Uint8Array([1, 2, 3, 4, 5, 6]); + await tusUpload({ + endpoint: server.endpoint, + headers: { 'mock-header': 'mock-value' }, + fileName: 'mock-file-name', + filePath: 'mock-file-path', + fileSize: file.byteLength, + onCaughtError: assertSocketCloseError, + reader: (_filePath, offset) => { + callCount += 1; + assert.strictEqual(offset, cursor); + if (offset != null) { + // Ensure we're checking the offset on the HEAD request on every + // iteration after the first. + assert.strictEqual(server.lastRequest()?.method, 'HEAD'); + } + return body(server, async function* () { + cursor = cursor ?? 0; + const nextChunk = file.subarray(cursor, (cursor += 2)); + if (offset === undefined) { + // Stage 1: Create and upload + yield nextChunk; + server.closeLastRequest(); + assert.deepOwnInclude(server.lastRequest(), { + method: 'POST', + body: nextChunk, + }); + } else if (offset === 2) { + // Stage 2: Resume + yield nextChunk; + server.closeLastRequest(); + assert.deepOwnInclude(server.lastRequest(), { + method: 'PATCH', + body: nextChunk, + }); + } else if (offset === 4) { + // Stage 3: Keep looping + yield nextChunk; + // Closing even though this is the last one so we have to check + // HEAD one last time. + server.closeLastRequest(); + assert.deepOwnInclude(server.lastRequest(), { + method: 'PATCH', + body: nextChunk, + }); + } else { + assert.fail('Unexpected offset'); + } + server.respondWith(200, { 'Upload-Offset': cursor }); + }); + }, + }); + // Last request should have checked length and seen it was done. + assert.strictEqual(server.lastRequest()?.method, 'HEAD'); + assert.strictEqual(callCount, 3); + }); + + it('should resume from wherever the server says it got to', async () => { + let nextExpectedOffset = undefined as number | void; + let callCount = 0; + const file = new Uint8Array([1, 2, 3, 4, 5, 6]); + await tusUpload({ + endpoint: server.endpoint, + headers: { 'mock-header': 'mock-value' }, + fileName: 'mock-file-name', + filePath: 'mock-file-path', + fileSize: file.byteLength, + onCaughtError: assertSocketCloseError, + reader: (_filePath, offset) => { + callCount += 1; + assert.strictEqual(offset, nextExpectedOffset); + return body(server, async function* () { + if (offset === undefined) { + yield file.subarray(0, 3); + yield file.subarray(3, 6); + nextExpectedOffset = 3; + server.closeLastRequest(); + // For this test lets pretend this as far as we were able to save + server.respondWith(200, { 'Upload-Offset': 3 }); + } else if (offset === 3) { + yield file.subarray(3, 6); + } else { + assert.fail('Unexpected offset'); + } + }); + }, + }); + assert.strictEqual(callCount, 2); + }); + }); +}); diff --git a/ts/textsecure/Errors.ts b/ts/textsecure/Errors.ts index 7c44652ff991..88bd7a1bf149 100644 --- a/ts/textsecure/Errors.ts +++ b/ts/textsecure/Errors.ts @@ -25,6 +25,14 @@ export class HTTPError extends Error { public readonly response: unknown; + static fromResponse(response: Response): HTTPError { + return new HTTPError(response.statusText, { + code: response.status, + headers: Object.fromEntries(response.headers), + response, + }); + } + constructor( message: string, options: { diff --git a/ts/util/uploads/tusProtocol.ts b/ts/util/uploads/tusProtocol.ts new file mode 100644 index 000000000000..119acfe086cd --- /dev/null +++ b/ts/util/uploads/tusProtocol.ts @@ -0,0 +1,357 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import { type Readable } from 'node:stream'; + +import { HTTPError } from '../../textsecure/Errors'; +import * as log from '../../logging/log'; +import * as Errors from '../../types/errors'; +import { sleep } from '../sleep'; +import { FIBONACCI_TIMEOUTS, BackOff } from '../BackOff'; + +const DEFAULT_MAX_RETRIES = 3; + +function toLogId(input: string) { + return Buffer.from(input).toString('base64').slice(0, 3); +} + +/** + * This file is a standalone implementation of the TUS protocol. + * Signal specific logic is in uploads.ts + */ + +export type TusFileReader = (filePath: string, offset?: number) => Readable; + +/** + * @private + * https://tus.io/protocols/resumable-upload#upload-metadata + */ +export function _getUploadMetadataHeader( + params: Record +): string { + return Object.entries(params) + .map(([key, value]) => { + return `${key} ${Buffer.from(value).toString('base64')}`; + }) + .join(','); +} + +function addProgressHandler( + readable: Readable, + onProgress: (progress: number) => void +): void { + let bytesUploaded = 0; + // Explicitly stop the flow, otherwise we might emit 'data' before `fetch()` + // starts reading the stream. + readable.pause(); + readable.on('data', (chunk: Buffer) => { + bytesUploaded += chunk.byteLength; + onProgress(bytesUploaded); + }); +} + +/** + * @private + * Generic TUS POST implementation with creation-with-upload. + * @returns {boolean} `true` if the upload completed, `false` if interrupted. + * @throws {ResponseError} If the server responded with an error. + * @see https://tus.io/protocols/resumable-upload#creation-with-upload + */ +export async function _tusCreateWithUploadRequest({ + endpoint, + headers, + fileName, + fileSize, + readable, + onProgress, + onCaughtError, + signal, +}: { + endpoint: string; + headers: Record; + fileName: string; + fileSize: number; + readable: Readable; + onProgress?: (bytesUploaded: number) => void; + onCaughtError?: (error: Error) => void; + signal?: AbortSignal; +}): Promise { + const logId = `tusProtocol: CreateWithUpload(${toLogId(fileName)})`; + if (onProgress != null) { + addProgressHandler(readable, onProgress); + } + + let response: Response; + try { + log.info(`${logId} init`); + response = await fetch(endpoint, { + method: 'POST', + signal, + // @ts-expect-error: `duplex` is missing from TypeScript's `RequestInit`. + duplex: 'half', + headers: { + ...headers, + 'Tus-Resumable': '1.0.0', + 'Upload-Length': String(fileSize), + 'Upload-Metadata': _getUploadMetadataHeader({ + filename: fileName, + }), + 'Content-Type': 'application/offset+octet-stream', + }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + body: readable as any, + }); + } catch (error) { + log.error(`${logId} closed without response`, Errors.toLogFormat(error)); + onCaughtError?.(error); + return false; + } + if (!response.ok) { + log.error(`${logId} error (${response.status})`); + throw HTTPError.fromResponse(response); + } + log.info(`${logId} success (${response.status})`); + return true; +} + +function isPositiveInteger(value: number): value is number { + return Number.isInteger(value) && value >= 0; +} + +/** + * @private + * Generic TUS HEAD implementation. + * @returns {number} The current offset of the upload. + * @throws {ResponseError} If the server responded with an error. + * @throws {Error} If the server responded with an invalid Upload-Offset header. + * @see https://tus.io/protocols/resumable-upload#head + */ +export async function _tusGetCurrentOffsetRequest({ + endpoint, + headers, + fileName, + signal, +}: { + endpoint: string; + headers: Record; + fileName: string; + signal?: AbortSignal; +}): Promise { + const logId = `tusProtocol: GetCurrentOffsetRequest(${toLogId(fileName)})`; + log.info(`${logId} init`); + + const response = await fetch(`${endpoint}/${fileName}`, { + method: 'HEAD', + signal, + headers: { + ...headers, + 'Tus-Resumable': '1.0.0', + }, + }); + if (!response.ok) { + log.error(`${logId} error (${response.status})`); + throw HTTPError.fromResponse(response); + } + + log.info(`${logId} success (${response.status})`); + const header = response.headers.get('Upload-Offset'); + if (header == null) { + throw new Error('getCurrentState: Missing Upload-Offset header'); + } + + const result = Number(header); + if (!isPositiveInteger(result)) { + throw new Error(`getCurrentState: Invalid Upload-Offset (${header})`); + } + + log.info(`${logId} current offset (${result})`); + return result; +} + +/** + * @private + * Generic TUS PATCH implementation. + * @returns {boolean} `true` if the upload completed, `false` if interrupted. + * @throws {ResponseError} If the server responded with an error. + * @see https://tus.io/protocols/resumable-upload#patch + */ +export async function _tusResumeUploadRequest({ + endpoint, + headers, + fileName, + uploadOffset, + readable, + onProgress, + onCaughtError, + signal, +}: { + endpoint: string; + headers: Record; + fileName: string; + uploadOffset: number; + readable: Readable; + onProgress?: (bytesUploaded: number) => void; + onCaughtError?: (error: Error) => void; + signal?: AbortSignal; +}): Promise { + const logId = `tusProtocol: ResumeUploadRequest(${toLogId(fileName)})`; + if (onProgress != null) { + addProgressHandler(readable, onProgress); + } + + let response: Response; + try { + log.info(`${logId} init`); + response = await fetch(`${endpoint}/${fileName}`, { + method: 'PATCH', + signal, + // @ts-expect-error: `duplex` is missing from TypeScript's `RequestInit`. + duplex: 'half', + headers: { + ...headers, + 'Tus-Resumable': '1.0.0', + 'Upload-Offset': String(uploadOffset), + 'Content-Type': 'application/offset+octet-stream', + }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + body: readable as any, + }); + } catch (error) { + log.error(`${logId} closed without response`, Errors.toLogFormat(error)); + onCaughtError?.(error); + return false; + } + if (!response.ok) { + log.error(`${logId} error (${response.status})`); + throw HTTPError.fromResponse(response); + } + log.info(`${logId} success (${response.status})`); + return true; +} + +/** + * Attempts to upload a file using the TUS protocol with Signal headers, and + * resumes the upload if it was interrupted. + * @throws {ResponseError} If the server responded with an error. + */ +export async function tusUpload({ + endpoint, + headers, + fileName, + filePath, + fileSize, + reader, + onProgress, + onCaughtError, + maxRetries = DEFAULT_MAX_RETRIES, + signal, +}: { + endpoint: string; + headers: Record; + fileName: string; + filePath: string; + fileSize: number; + reader: TusFileReader; + onProgress?: (bytesUploaded: number) => void; + onCaughtError?: (error: Error) => void; + maxRetries?: number; + signal?: AbortSignal; +}): Promise { + const readable = reader(filePath); + const done = await _tusCreateWithUploadRequest({ + endpoint, + headers, + fileName, + fileSize, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + readable: readable as any, + onProgress, + onCaughtError, + signal, + }); + if (!done) { + await tusResumeUpload({ + endpoint, + headers, + fileName, + filePath, + fileSize, + reader, + onProgress, + onCaughtError, + maxRetries, + signal, + }); + } +} + +const BACKOFF_JITTER_MS = 100; + +/** + * Attempts to resume an upload using the TUS protocol. + * @throws {ResponseError} If the server responded with an error. + * @param params + */ +export async function tusResumeUpload({ + endpoint, + headers, + fileName, + filePath, + fileSize, + reader, + onProgress, + onCaughtError, + maxRetries = DEFAULT_MAX_RETRIES, + signal, +}: { + endpoint: string; + headers: Record; + fileName: string; + filePath: string; + fileSize: number; + reader: TusFileReader; + onProgress?: (bytesUploaded: number) => void; + onCaughtError?: (error: Error) => void; + maxRetries?: number; + signal?: AbortSignal; +}): Promise { + const backoff = new BackOff(FIBONACCI_TIMEOUTS, { + jitter: BACKOFF_JITTER_MS, + }); + + let retryAttempts = 0; + while (retryAttempts < maxRetries) { + // eslint-disable-next-line no-await-in-loop + await sleep(backoff.getAndIncrement()); + retryAttempts += 1; + + // eslint-disable-next-line no-await-in-loop + const uploadOffset = await _tusGetCurrentOffsetRequest({ + endpoint, + headers, + fileName, + signal, + }); + + if (uploadOffset === fileSize) { + break; + } + + const readable = reader(filePath, uploadOffset); + + // eslint-disable-next-line no-await-in-loop + const done = await _tusResumeUploadRequest({ + endpoint, + headers, + fileName, + uploadOffset, + readable, + onProgress, + onCaughtError, + signal, + }); + + if (done) { + break; + } + } +} diff --git a/ts/util/uploads/uploads.ts b/ts/util/uploads/uploads.ts new file mode 100644 index 000000000000..e9c280563db1 --- /dev/null +++ b/ts/util/uploads/uploads.ts @@ -0,0 +1,136 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import { createReadStream, createWriteStream } from 'node:fs'; +import { Writable } from 'node:stream'; +import type { TusFileReader } from './tusProtocol'; +import { tusResumeUpload, tusUpload } from './tusProtocol'; +import { HTTPError } from '../../textsecure/Errors'; + +const defaultFileReader: TusFileReader = (filePath, offset) => { + return createReadStream(filePath, { start: offset }); +}; + +/** + * @public + * Uploads a file to the attachments bucket. + * @throws {ResponseError} If the server responded with an error. + */ +export async function uploadAttachment({ + host, + fileName, + filePath, + fileSize, + checksum, + headers = {}, + signal, +}: { + host: string; + fileName: string; + filePath: string; + fileSize: number; + checksum: string; + headers?: Record; + signal?: AbortSignal; +}): Promise { + return tusUpload({ + endpoint: `${host}/upload/attachments`, + headers: { + ...headers, + 'X-Signal-Checksum-Sha256': checksum, + }, + fileName, + filePath, + fileSize, + reader: defaultFileReader, + signal, + }); +} + +/** + * @public + * Resumes an upload to the attachments bucket. + * @throws {ResponseError} If the server responded with an error. + */ +export async function resumeUploadAttachment({ + host, + fileName, + filePath, + fileSize, + headers = {}, + signal, +}: { + host: string; + fileName: string; + filePath: string; + fileSize: number; + headers?: Record; + signal?: AbortSignal; +}): Promise { + return tusResumeUpload({ + endpoint: `${host}/upload/attachments`, + headers, + fileName, + filePath, + fileSize, + reader: defaultFileReader, + signal, + }); +} + +/** + * Downloads a file with Signal headers. + * @throws {ResponseError} If the server responded with an error. + * @throws {Error} If the response has no body. + */ +export async function _doDownload({ + endpoint, + headers = {}, + filePath, + signal, +}: { + endpoint: string; + filePath: string; + headers?: Record; + signal?: AbortSignal; +}): Promise { + const response = await fetch(endpoint, { + method: 'GET', + signal, + redirect: 'error', + headers, + }); + if (!response.ok) { + throw HTTPError.fromResponse(response); + } + if (!response.body) { + throw new Error('Response has no body'); + } + const writable = createWriteStream(filePath); + await response.body.pipeTo(Writable.toWeb(writable)); +} + +/** + * @public + * Downloads a file from the attachments bucket. + * @throws {ResponseError} If the server responded with an error. + */ +export async function downloadAttachment({ + host, + fileName, + filePath, + headers, + signal, +}: { + host: string; + fileName: string; + filePath: string; + headers?: Record; + signal?: AbortSignal; +}): Promise { + return _doDownload({ + endpoint: `${host}/attachments/${fileName}`, + headers, + filePath, + signal, + }); +}