Use libsignal PassThroughs instead of our own
parent fa25d5aaf8
commit 1b8be6a3d1
5 changed files with 10 additions and 218 deletions
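
Summary: the hand-rolled `DigestingPassThrough` and `ValidatingPassThrough` wrappers under `ts/util/` are deleted, and both names are now imported straight from `@signalapp/libsignal-client/dist/incremental_mac`. Every call site keeps the same constructor arguments and methods, so the libsignal classes appear to be drop-in replacements. A minimal round-trip sketch of the API as used after this commit (a sketch under that drop-in assumption, not code taken from the diff):

```ts
import { randomBytes } from 'node:crypto';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import {
  inferChunkSize,
  DigestingPassThrough,
  ValidatingPassThrough,
} from '@signalapp/libsignal-client/dist/incremental_mac';

async function roundTrip(plaintext: Buffer): Promise<Buffer> {
  const key = randomBytes(32);
  const chunkSize = inferChunkSize(plaintext.byteLength);

  // Produce the incremental-MAC digest while streaming the data through.
  const digester = new DigestingPassThrough(key, chunkSize);
  digester.resume(); // nobody reads the output; drain it so pipeline() returns
  await pipeline(Readable.from(plaintext), digester);
  const digest = digester.getFinalDigest();

  // Stream the same bytes through a validator keyed on that digest.
  const validator = new ValidatingPassThrough(key, chunkSize, digest);
  const received: Array<Buffer> = [];
  validator.on('data', chunk => received.push(chunk));
  await pipeline(Readable.from(plaintext), validator);
  return Buffer.concat(received); // equals plaintext, now MAC-checked
}
```
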
@@ -12,8 +12,9 @@ import z from 'zod';
 import * as rimraf from 'rimraf';
 import LRU from 'lru-cache';
 import {
+  DigestingPassThrough,
   inferChunkSize,
-  DigestingWritable,
+  ValidatingPassThrough,
 } from '@signalapp/libsignal-client/dist/incremental_mac';
 import { RangeFinder, DefaultStorage } from '@indutny/range-finder';
 import {
@@ -49,7 +50,6 @@ import { safeParseInteger } from '../ts/util/numbers';
 import { SECOND } from '../ts/util/durations';
 import { drop } from '../ts/util/drop';
 import { strictAssert } from '../ts/util/assert';
-import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough';
 import { toWebStream } from '../ts/util/toWebStream';
 import {
   isImageTypeSupported,
@@ -115,7 +115,10 @@ async function safeDecryptToSink(
   let entry = digestLRU.get(ctx.path);
   if (!entry) {
     const key = randomBytes(32);
-    const writable = new DigestingWritable(key, chunkSize);
+    const digester = new DigestingPassThrough(key, chunkSize);
+
+    // Important to do this so the pipeline() returns in the decrypt call below
+    digester.resume();
 
     const controller = new AbortController();
 
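
The `resume()` call above is the subtle part: the new digester is a Transform whose readable side has no consumer on this code path (only the final digest is wanted), so without it the stream's buffers would fill, `write()` would start signalling backpressure, and the `pipeline()` inside `decryptAttachmentV2ToSink` would never settle. `resume()` switches the stream into flowing mode so its output is read and discarded. A standalone illustration of the Node behavior being worked around (not code from this commit):

```ts
import { PassThrough } from 'node:stream';

// An unread PassThrough fills its internal buffers and then signals
// backpressure, which would stall any upstream pipeline indefinitely.
const unread = new PassThrough({ highWaterMark: 16 * 1024 });
let accepted = 0;
while (unread.write(Buffer.alloc(1024))) {
  accepted += 1;
}
console.log(`unread stream backs up after ~${accepted} KiB`);

// With resume(), data events fire and output is discarded, so the
// writable side drains and 'finish' can be reached.
const drained = new PassThrough({ highWaterMark: 16 * 1024 });
drained.resume();
drained.write(Buffer.alloc(1024));
drained.end(() => console.log('drained stream finishes: pipeline() can return'));
```
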
@@ -124,7 +127,7 @@ async function safeDecryptToSink(
       // to handle errors on `sink` while generating digest in case whole
       // request get cancelled early.
       once(sink, 'non-error-event', { signal: controller.signal }),
-      decryptAttachmentV2ToSink(options, writable),
+      decryptAttachmentV2ToSink(options, digester),
     ]);
 
     // Stop handling errors on sink
@@ -132,7 +135,7 @@ async function safeDecryptToSink(
 
     entry = {
       key,
-      digest: writable.getFinalDigest(),
+      digest: digester.getFinalDigest(),
     };
     digestLRU.set(ctx.path, entry);
   }
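
For context, the unchanged code around these hunks caches one locally generated MAC key plus the resulting digest per attachment path, so later range requests over the same file can be validated without re-digesting it. Roughly (entry shape inferred from the visible lines; the cache bound is a placeholder):

```ts
import LRU from 'lru-cache';

// Shape inferred from the hunks above, not copied from the repo.
type DigestCacheEntry = Readonly<{
  key: Buffer; // locally generated MAC key (the randomBytes(32) above)
  digest: Buffer; // incremental-MAC digest captured on first decrypt
}>;

const digestLRU = new LRU<string, DigestCacheEntry>({ max: 100 });
```
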
@@ -12,8 +12,10 @@ import { isNumber } from 'lodash';
 import { ensureFile } from 'fs-extra';
 import {
   chunkSizeInBytes,
+  DigestingPassThrough,
   everyNthByte,
   inferChunkSize,
+  ValidatingPassThrough,
 } from '@signalapp/libsignal-client/dist/incremental_mac';
 import type { ChunkSizeChoice } from '@signalapp/libsignal-client/dist/incremental_mac';
 
@@ -40,8 +42,6 @@ import { isNotNil } from './util/isNotNil';
 import { missingCaseError } from './util/missingCaseError';
 import { getEnvironment, Environment } from './environment';
 import { toBase64 } from './Bytes';
-import { DigestingPassThrough } from './util/DigestingPassThrough';
-import { ValidatingPassThrough } from './util/ValidatingPassThrough';
 
 // This file was split from ts/Crypto.ts because it pulls things in from node, and
 // too many things pull in Crypto.ts, so it broke storybook.
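
Besides the two PassThroughs, the import list shows the module's chunk-size helpers. A hedged sketch of how they relate, with signatures assumed from their names and from usage in this diff (`inferChunkSize(byteLength)` appears in the deleted test below; the rest is an assumption):

```ts
import {
  chunkSizeInBytes,
  everyNthByte,
  inferChunkSize,
} from '@signalapp/libsignal-client/dist/incremental_mac';

// inferChunkSize picks a ChunkSizeChoice suited to a payload of a given size.
const inferred = inferChunkSize(10 * 1024 * 1024);
console.log(chunkSizeInBytes(inferred)); // resolve the choice to concrete bytes

// Assumed: everyNthByte(n) pins the MAC interval to a fixed byte count.
const fixed = everyNthByte(64 * 1024);
console.log(chunkSizeInBytes(fixed));
```
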
@@ -1,72 +0,0 @@
-// Copyright 2024 Signal Messenger, LLC
-// SPDX-License-Identifier: AGPL-3.0-only
-
-import { assert } from 'chai';
-import { randomBytes } from 'node:crypto';
-import { Readable } from 'node:stream';
-import { pipeline, finished } from 'node:stream/promises';
-import {
-  inferChunkSize,
-  DigestingWritable,
-} from '@signalapp/libsignal-client/dist/incremental_mac';
-
-import { ValidatingPassThrough } from '../../util/ValidatingPassThrough';
-
-// Use uneven chunk size to trigger buffering
-const CHUNK_SIZE = 13579;
-
-function toChunkedReadable(buffer: Buffer): Readable {
-  const chunked = new Array<Buffer>();
-  for (let i = 0; i < buffer.byteLength; i += CHUNK_SIZE) {
-    chunked.push(buffer.subarray(i, i + CHUNK_SIZE));
-  }
-
-  return Readable.from(chunked);
-}
-
-describe('ValidatingPassThrough', () => {
-  it('should emit whole source stream', async () => {
-    const source = randomBytes(10 * 1024 * 1024);
-    const key = randomBytes(32);
-
-    const chunkSize = inferChunkSize(source.byteLength);
-    const writable = new DigestingWritable(key, chunkSize);
-    await pipeline(Readable.from(source), writable);
-
-    const digest = writable.getFinalDigest();
-    const validator = new ValidatingPassThrough(key, chunkSize, digest);
-
-    const received = new Array<Buffer>();
-    validator.on('data', chunk => received.push(chunk));
-
-    await Promise.all([
-      pipeline(toChunkedReadable(source), validator),
-      finished(validator),
-    ]);
-
-    const actual = Buffer.concat(received);
-    assert.isTrue(actual.equals(source));
-  });
-
-  it('should emit error on digest mismatch', async () => {
-    const source = randomBytes(10 * 1024 * 1024);
-    const key = randomBytes(32);
-
-    const chunkSize = inferChunkSize(source.byteLength);
-    const writable = new DigestingWritable(key, chunkSize);
-    await pipeline(Readable.from(source), writable);
-
-    const digest = writable.getFinalDigest();
-    const wrongKey = randomBytes(32);
-    const validator = new ValidatingPassThrough(wrongKey, chunkSize, digest);
-
-    validator.on('data', () => {
-      throw new Error('Should not be called');
-    });
-
-    await assert.isRejected(
-      pipeline(toChunkedReadable(source), validator),
-      'Corrupted input data'
-    );
-  });
-});
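
This deleted test exercised two properties: byte-for-byte passthrough under uneven chunking (`CHUNK_SIZE = 13579` forces validation boundaries to fall mid-chunk, triggering the buffering path) and rejection on a digest mismatch. Note that `assert.isRejected` comes from the chai-as-promised plugin. An equivalent spot-check against the libsignal `ValidatingPassThrough`, assuming it keeps drop-in semantics (the exact rejection message may differ, so none is asserted here):

```ts
import assert from 'node:assert/strict';
import { randomBytes } from 'node:crypto';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import {
  inferChunkSize,
  DigestingPassThrough,
  ValidatingPassThrough,
} from '@signalapp/libsignal-client/dist/incremental_mac';

async function main(): Promise<void> {
  const source = randomBytes(1024 * 1024);
  const key = randomBytes(32);
  const chunkSize = inferChunkSize(source.byteLength);

  const digester = new DigestingPassThrough(key, chunkSize);
  digester.resume(); // discard output; only the digest is needed
  await pipeline(Readable.from(source), digester);
  const digest = digester.getFinalDigest();

  // Wrong key: the validator must error out instead of emitting data.
  const validator = new ValidatingPassThrough(randomBytes(32), chunkSize, digest);
  validator.resume();
  await assert.rejects(pipeline(Readable.from(source), validator));
  console.log('mismatch rejected as expected');
}

main().catch(err => {
  console.error(err);
  process.exit(1);
});
```
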
@@ -1,52 +0,0 @@
-// Copyright 2024 Signal Messenger, LLC
-// SPDX-License-Identifier: AGPL-3.0-only
-
-import { Transform } from 'stream';
-
-import { DigestingWritable } from '@signalapp/libsignal-client/dist/incremental_mac';
-
-import type { ChunkSizeChoice } from '@signalapp/libsignal-client/dist/incremental_mac';
-
-type CallbackType = (error?: Error | null) => void;
-
-export class DigestingPassThrough extends Transform {
-  private digester: DigestingWritable;
-
-  constructor(key: Buffer, sizeChoice: ChunkSizeChoice) {
-    super();
-    this.digester = new DigestingWritable(key, sizeChoice);
-
-    // We handle errors coming from write/end
-    this.digester.on('error', () => {
-      /* noop */
-    });
-  }
-
-  getFinalDigest(): Buffer {
-    return this.digester.getFinalDigest();
-  }
-
-  public override _transform(
-    data: Buffer,
-    enc: BufferEncoding,
-    callback: CallbackType
-  ): void {
-    this.push(data);
-    this.digester.write(data, enc, err => {
-      if (err) {
-        return callback(err);
-      }
-      callback();
-    });
-  }
-
-  public override _final(callback: CallbackType): void {
-    this.digester.end((err?: Error) => {
-      if (err) {
-        return callback(err);
-      }
-
-      callback();
-    });
-  }
-}
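
The deleted wrapper is a classic tee Transform: forward each chunk downstream immediately, mirror it into a side `DigestingWritable`, and complete the transform callback only when the mirrored write does, so backpressure and errors propagate upstream. The no-op `'error'` listener exists because failures already surface through the write/end callbacks; without it the side stream's `'error'` event would be unhandled and crash the process. The generic shape of that pattern, as a sketch with hypothetical names:

```ts
import { Transform, type TransformCallback, type Writable } from 'node:stream';

// Tee each chunk into a side Writable while passing it through unchanged.
class TeePassThrough extends Transform {
  constructor(private readonly side: Writable) {
    super();
    // Failures reach us via the write/end callbacks below; this listener
    // only prevents an unhandled 'error' event from crashing the process.
    this.side.on('error', () => {});
  }

  public override _transform(
    data: Buffer,
    enc: BufferEncoding,
    callback: TransformCallback
  ): void {
    this.push(data); // forward downstream immediately
    this.side.write(data, enc, err => callback(err)); // wait for the mirror
  }

  public override _final(callback: TransformCallback): void {
    this.side.end((err?: Error) => callback(err));
  }
}
```
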
@@ -1,87 +0,0 @@
-// Copyright 2024 Signal Messenger, LLC
-// SPDX-License-Identifier: AGPL-3.0-only
-
-import { noop } from 'lodash';
-import { Transform } from 'node:stream';
-import {
-  ValidatingWritable,
-  type ChunkSizeChoice,
-} from '@signalapp/libsignal-client/dist/incremental_mac';
-
-export class ValidatingPassThrough extends Transform {
-  private validator: ValidatingWritable;
-  private buffer = new Array<Buffer>();
-
-  constructor(key: Buffer, sizeChoice: ChunkSizeChoice, digest: Buffer) {
-    super();
-    this.validator = new ValidatingWritable(key, sizeChoice, digest);
-
-    // We handle errors coming from write/end
-    this.validator.on('error', noop);
-  }
-
-  public override _transform(
-    data: Buffer,
-    enc: BufferEncoding,
-    callback: (error?: null | Error) => void
-  ): void {
-    const start = this.validator.validatedSize();
-    this.validator.write(data, enc, err => {
-      if (err) {
-        return callback(err);
-      }
-
-      this.buffer.push(data);
-
-      const end = this.validator.validatedSize();
-      const readySize = end - start;
-
-      // Fully buffer
-      if (readySize === 0) {
-        return callback(null);
-      }
-
-      const { buffer } = this;
-      this.buffer = [];
-      let validated = 0;
-      for (const chunk of buffer) {
-        validated += chunk.byteLength;
-
-        // Buffered chunk is fully validated - push it without slicing
-        if (validated <= readySize) {
-          this.push(chunk);
-          continue;
-        }
-
-        // Validation boundary lies within the chunk, split it
-        const notValidated = validated - readySize;
-        this.push(chunk.subarray(0, -notValidated));
-        this.buffer.push(chunk.subarray(-notValidated));
-
-        // Technically this chunk must be the last chunk so we could break,
-        // but for consistency keep looping.
-      }
-      callback(null);
-    });
-  }
-
-  public override _final(callback: (error?: null | Error) => void): void {
-    const start = this.validator.validatedSize();
-    this.validator.end((err?: Error) => {
-      if (err) {
-        return callback(err);
-      }
-
-      const end = this.validator.validatedSize();
-      const readySize = end - start;
-      const buffer = Buffer.concat(this.buffer);
-      this.buffer = [];
-      if (buffer.byteLength !== readySize) {
-        return callback(new Error('Stream not fully processed'));
-      }
-      this.push(buffer);
-
-      callback(null);
-    });
-  }
-}
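
The interesting part of this deleted validator is the buffering: `validatedSize()` only advances once a full MAC chunk has been verified, so `_transform` withholds any bytes past the last verified boundary (the `readySize === 0` early return buffers everything). When the boundary falls inside a buffered chunk, `validated - readySize` is the length of the still-unverified tail, and negative `subarray()` indices split the chunk there. The same arithmetic run in isolation on made-up sizes:

```ts
// Boundary-split logic from the deleted _transform: two buffered chunks of
// 6 and 10 bytes, with validatedSize() having advanced by 9 bytes.
const buffered = [Buffer.alloc(6, 0xaa), Buffer.alloc(10, 0xbb)];
const readySize = 9; // delta reported by the underlying MAC validator

const pushed: Array<Buffer> = []; // what _transform would emit downstream
const retained: Array<Buffer> = []; // still unverified, stays buffered
let validated = 0;
for (const chunk of buffered) {
  validated += chunk.byteLength;
  if (validated <= readySize) {
    pushed.push(chunk); // chunk lies entirely inside the validated region
    continue;
  }
  const notValidated = validated - readySize; // unverified tail of this chunk
  pushed.push(chunk.subarray(0, -notValidated)); // verified prefix (3 bytes)
  retained.push(chunk.subarray(-notValidated)); // tail (7 bytes) held back
}
console.log(pushed.map(b => b.byteLength)); // [ 6, 3 ]
console.log(retained.map(b => b.byteLength)); // [ 7 ]
```
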