diff --git a/app/attachment_channel.ts b/app/attachment_channel.ts index 1cf9ca48fec2..535174625e44 100644 --- a/app/attachment_channel.ts +++ b/app/attachment_channel.ts @@ -12,8 +12,9 @@ import z from 'zod'; import * as rimraf from 'rimraf'; import LRU from 'lru-cache'; import { + DigestingPassThrough, inferChunkSize, - DigestingWritable, + ValidatingPassThrough, } from '@signalapp/libsignal-client/dist/incremental_mac'; import { RangeFinder, DefaultStorage } from '@indutny/range-finder'; import { @@ -49,7 +50,6 @@ import { safeParseInteger } from '../ts/util/numbers'; import { SECOND } from '../ts/util/durations'; import { drop } from '../ts/util/drop'; import { strictAssert } from '../ts/util/assert'; -import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough'; import { toWebStream } from '../ts/util/toWebStream'; import { isImageTypeSupported, @@ -115,7 +115,10 @@ async function safeDecryptToSink( let entry = digestLRU.get(ctx.path); if (!entry) { const key = randomBytes(32); - const writable = new DigestingWritable(key, chunkSize); + const digester = new DigestingPassThrough(key, chunkSize); + + // Important to do this so the pipeline() returns in the decrypt call below + digester.resume(); const controller = new AbortController(); @@ -124,7 +127,7 @@ async function safeDecryptToSink( // to handle errors on `sink` while generating digest in case whole // request get cancelled early. once(sink, 'non-error-event', { signal: controller.signal }), - decryptAttachmentV2ToSink(options, writable), + decryptAttachmentV2ToSink(options, digester), ]); // Stop handling errors on sink @@ -132,7 +135,7 @@ async function safeDecryptToSink( entry = { key, - digest: writable.getFinalDigest(), + digest: digester.getFinalDigest(), }; digestLRU.set(ctx.path, entry); } diff --git a/ts/AttachmentCrypto.ts b/ts/AttachmentCrypto.ts index 1979e6ecaa69..4a539da8ed63 100644 --- a/ts/AttachmentCrypto.ts +++ b/ts/AttachmentCrypto.ts @@ -12,8 +12,10 @@ import { isNumber } from 'lodash'; import { ensureFile } from 'fs-extra'; import { chunkSizeInBytes, + DigestingPassThrough, everyNthByte, inferChunkSize, + ValidatingPassThrough, } from '@signalapp/libsignal-client/dist/incremental_mac'; import type { ChunkSizeChoice } from '@signalapp/libsignal-client/dist/incremental_mac'; @@ -40,8 +42,6 @@ import { isNotNil } from './util/isNotNil'; import { missingCaseError } from './util/missingCaseError'; import { getEnvironment, Environment } from './environment'; import { toBase64 } from './Bytes'; -import { DigestingPassThrough } from './util/DigestingPassThrough'; -import { ValidatingPassThrough } from './util/ValidatingPassThrough'; // This file was split from ts/Crypto.ts because it pulls things in from node, and // too many things pull in Crypto.ts, so it broke storybook. diff --git a/ts/test-node/util/ValidatingPassThrough_test.ts b/ts/test-node/util/ValidatingPassThrough_test.ts deleted file mode 100644 index 735984e33923..000000000000 --- a/ts/test-node/util/ValidatingPassThrough_test.ts +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2024 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import { assert } from 'chai'; -import { randomBytes } from 'node:crypto'; -import { Readable } from 'node:stream'; -import { pipeline, finished } from 'node:stream/promises'; -import { - inferChunkSize, - DigestingWritable, -} from '@signalapp/libsignal-client/dist/incremental_mac'; - -import { ValidatingPassThrough } from '../../util/ValidatingPassThrough'; - -// Use uneven chunk size to trigger buffering -const CHUNK_SIZE = 13579; - -function toChunkedReadable(buffer: Buffer): Readable { - const chunked = new Array(); - for (let i = 0; i < buffer.byteLength; i += CHUNK_SIZE) { - chunked.push(buffer.subarray(i, i + CHUNK_SIZE)); - } - - return Readable.from(chunked); -} - -describe('ValidatingPassThrough', () => { - it('should emit whole source stream', async () => { - const source = randomBytes(10 * 1024 * 1024); - const key = randomBytes(32); - - const chunkSize = inferChunkSize(source.byteLength); - const writable = new DigestingWritable(key, chunkSize); - await pipeline(Readable.from(source), writable); - - const digest = writable.getFinalDigest(); - const validator = new ValidatingPassThrough(key, chunkSize, digest); - - const received = new Array(); - validator.on('data', chunk => received.push(chunk)); - - await Promise.all([ - pipeline(toChunkedReadable(source), validator), - finished(validator), - ]); - - const actual = Buffer.concat(received); - assert.isTrue(actual.equals(source)); - }); - - it('should emit error on digest mismatch', async () => { - const source = randomBytes(10 * 1024 * 1024); - const key = randomBytes(32); - - const chunkSize = inferChunkSize(source.byteLength); - const writable = new DigestingWritable(key, chunkSize); - await pipeline(Readable.from(source), writable); - - const digest = writable.getFinalDigest(); - const wrongKey = randomBytes(32); - const validator = new ValidatingPassThrough(wrongKey, chunkSize, digest); - - validator.on('data', () => { - throw new Error('Should not be called'); - }); - - await assert.isRejected( - pipeline(toChunkedReadable(source), validator), - 'Corrupted input data' - ); - }); -}); diff --git a/ts/util/DigestingPassThrough.ts b/ts/util/DigestingPassThrough.ts deleted file mode 100644 index 8372cb21920e..000000000000 --- a/ts/util/DigestingPassThrough.ts +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2024 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import { Transform } from 'stream'; - -import { DigestingWritable } from '@signalapp/libsignal-client/dist/incremental_mac'; - -import type { ChunkSizeChoice } from '@signalapp/libsignal-client/dist/incremental_mac'; - -type CallbackType = (error?: Error | null) => void; - -export class DigestingPassThrough extends Transform { - private digester: DigestingWritable; - - constructor(key: Buffer, sizeChoice: ChunkSizeChoice) { - super(); - this.digester = new DigestingWritable(key, sizeChoice); - - // We handle errors coming from write/end - this.digester.on('error', () => { - /* noop */ - }); - } - - getFinalDigest(): Buffer { - return this.digester.getFinalDigest(); - } - - public override _transform( - data: Buffer, - enc: BufferEncoding, - callback: CallbackType - ): void { - this.push(data); - this.digester.write(data, enc, err => { - if (err) { - return callback(err); - } - callback(); - }); - } - - public override _final(callback: CallbackType): void { - this.digester.end((err?: Error) => { - if (err) { - return callback(err); - } - - callback(); - }); - } -} diff --git a/ts/util/ValidatingPassThrough.ts b/ts/util/ValidatingPassThrough.ts deleted file mode 100644 index 8a264e7f73df..000000000000 --- a/ts/util/ValidatingPassThrough.ts +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2024 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -import { noop } from 'lodash'; -import { Transform } from 'node:stream'; -import { - ValidatingWritable, - type ChunkSizeChoice, -} from '@signalapp/libsignal-client/dist/incremental_mac'; - -export class ValidatingPassThrough extends Transform { - private validator: ValidatingWritable; - private buffer = new Array(); - - constructor(key: Buffer, sizeChoice: ChunkSizeChoice, digest: Buffer) { - super(); - this.validator = new ValidatingWritable(key, sizeChoice, digest); - - // We handle errors coming from write/end - this.validator.on('error', noop); - } - - public override _transform( - data: Buffer, - enc: BufferEncoding, - callback: (error?: null | Error) => void - ): void { - const start = this.validator.validatedSize(); - this.validator.write(data, enc, err => { - if (err) { - return callback(err); - } - - this.buffer.push(data); - - const end = this.validator.validatedSize(); - const readySize = end - start; - - // Fully buffer - if (readySize === 0) { - return callback(null); - } - - const { buffer } = this; - this.buffer = []; - let validated = 0; - for (const chunk of buffer) { - validated += chunk.byteLength; - - // Buffered chunk is fully validated - push it without slicing - if (validated <= readySize) { - this.push(chunk); - continue; - } - - // Validation boundary lies within the chunk, split it - const notValidated = validated - readySize; - this.push(chunk.subarray(0, -notValidated)); - this.buffer.push(chunk.subarray(-notValidated)); - - // Technically this chunk must be the last chunk so we could break, - // but for consistency keep looping. - } - callback(null); - }); - } - - public override _final(callback: (error?: null | Error) => void): void { - const start = this.validator.validatedSize(); - this.validator.end((err?: Error) => { - if (err) { - return callback(err); - } - - const end = this.validator.validatedSize(); - const readySize = end - start; - const buffer = Buffer.concat(this.buffer); - this.buffer = []; - if (buffer.byteLength !== readySize) { - return callback(new Error('Stream not fully processed')); - } - this.push(buffer); - - callback(null); - }); - } -}