Add cache layer for attachment streaming
Author: Fedor Indutny
Parent: 7c449dcfed
Commit: a1340aa510

3 changed files with 240 additions and 22 deletions
@@ -4,9 +4,17 @@
 import { ipcMain, protocol } from 'electron';
 import { createReadStream } from 'node:fs';
 import { join, normalize } from 'node:path';
-import { Readable, PassThrough } from 'node:stream';
+import { Readable, PassThrough, type Writable } from 'node:stream';
 import { pipeline } from 'node:stream/promises';
+import { randomBytes } from 'node:crypto';
+import { once } from 'node:events';
 import z from 'zod';
 import * as rimraf from 'rimraf';
+import LRU from 'lru-cache';
+import {
+  inferChunkSize,
+  DigestingWritable,
+} from '@signalapp/libsignal-client/dist/incremental_mac';
 import { RangeFinder, DefaultStorage } from '@indutny/range-finder';
 import {
   getAllAttachments,
@@ -32,6 +40,7 @@ import { safeParseInteger } from '../ts/util/numbers';
 import { SECOND } from '../ts/util/durations';
 import { drop } from '../ts/util/drop';
 import { strictAssert } from '../ts/util/assert';
+import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough';
 import { decryptAttachmentV2ToSink } from '../ts/AttachmentCrypto';
 
 let initialized = false;
@@ -45,23 +54,81 @@ const CLEANUP_ORPHANED_ATTACHMENTS_KEY = 'cleanup-orphaned-attachments';
 const INTERACTIVITY_DELAY = 50;
 
 type RangeFinderContextType = Readonly<
-  | {
-      type: 'ciphertext';
-      path: string;
-      keysBase64: string;
-      size: number;
-    }
-  | {
-      type: 'plaintext';
-      path: string;
-    }
+  (
+    | {
+        type: 'ciphertext';
+        keysBase64: string;
+        size: number;
+      }
+    | {
+        type: 'plaintext';
+      }
+  ) & {
+    path: string;
+  }
 >;
 
+type DigestLRUEntryType = Readonly<{
+  key: Buffer;
+  digest: Buffer;
+}>;
+
+const digestLRU = new LRU<string, DigestLRUEntryType>({
+  // The size of each entry is roughly 8kb per digest + 32 bytes per key. We
+  // mostly need this cache for range requests, so keep it low.
+  max: 100,
+});
+
 async function safeDecryptToSink(
-  ...args: Parameters<typeof decryptAttachmentV2ToSink>
+  ctx: RangeFinderContextType,
+  sink: Writable
 ): Promise<void> {
+  strictAssert(ctx.type === 'ciphertext', 'Cannot decrypt plaintext');
+
+  const options = {
+    ciphertextPath: ctx.path,
+    idForLogging: 'attachment_channel',
+    keysBase64: ctx.keysBase64,
+    type: 'local' as const,
+    size: ctx.size,
+  };
+
   try {
-    await decryptAttachmentV2ToSink(...args);
+    const chunkSize = inferChunkSize(ctx.size);
+    let entry = digestLRU.get(ctx.path);
+    if (!entry) {
+      const key = randomBytes(32);
+      const writable = new DigestingWritable(key, chunkSize);
+
+      const controller = new AbortController();
+
+      await Promise.race([
+        // Just use a non-existent event name to wait for an 'error'. We want
+        // to handle errors on `sink` while generating the digest in case the
+        // whole request gets cancelled early.
+        once(sink, 'non-error-event', { signal: controller.signal }),
+        decryptAttachmentV2ToSink(options, writable),
+      ]);
+
+      // Stop handling errors on sink
+      controller.abort();
+
+      entry = {
+        key,
+        digest: writable.getFinalDigest(),
+      };
+      digestLRU.set(ctx.path, entry);
+    }
+
+    const validator = new ValidatingPassThrough(
+      entry.key,
+      chunkSize,
+      entry.digest
+    );
+    await Promise.all([
+      decryptAttachmentV2ToSink(options, validator),
+      pipeline(validator, sink),
+    ]);
   } catch (error) {
     // These errors happen when canceling fetch from `attachment://` urls,
     // ignore them to avoid noise in the logs.
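The `Promise.race` in the cache-miss branch relies on a documented `node:events` behavior: the promise returned by `once()` rejects if the emitter emits 'error' while it is pending. Waiting on an event name that never fires therefore turns an early `sink` error (a cancelled request) into a rejection that interrupts digest generation. A minimal, self-contained sketch of the pattern; the names and timings here are illustrative, not part of the commit:

import { once } from 'node:events';
import { PassThrough } from 'node:stream';
import { setTimeout as sleep } from 'node:timers/promises';

async function demo(): Promise<void> {
  const sink = new PassThrough();
  const controller = new AbortController();

  // Stand-in for the long-running decryptAttachmentV2ToSink() call.
  const work = sleep(1000);

  // Simulate the consumer cancelling the request mid-stream.
  setImmediate(() => sink.emit('error', new Error('request cancelled')));

  try {
    await Promise.race([
      // 'never-fires' is an arbitrary name; once() still rejects as soon
      // as `sink` emits 'error'.
      once(sink, 'never-fires', { signal: controller.signal }),
      work,
    ]);
  } catch (error) {
    console.log('interrupted:', (error as Error).message);
  } finally {
    // Detach the listener once the race settles, as the commit does.
    controller.abort();
  }
}

void demo();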
@@ -86,16 +153,8 @@ const storage = new DefaultStorage<RangeFinderContextType>(
     }
 
     if (ctx.type === 'ciphertext') {
-      const options = {
-        ciphertextPath: ctx.path,
-        idForLogging: 'attachment_channel',
-        keysBase64: ctx.keysBase64,
-        type: 'local' as const,
-        size: ctx.size,
-      };
-
       const plaintext = new PassThrough();
-      drop(safeDecryptToSink(options, plaintext));
+      drop(safeDecryptToSink(ctx, plaintext));
       return plaintext;
     }
 
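End to end, a ciphertext attachment is now decrypted twice: a first pass through `DigestingWritable` produces an incremental-MAC digest under a freshly generated random key, which `digestLRU` caches per ciphertext path; each serving pass then re-decrypts through `ValidatingPassThrough` (added below), which releases bytes downstream only after they authenticate against the cached digest. A condensed sketch of that two-pass flow, using a hypothetical in-memory `data` buffer in place of the decryption stream:

import { randomBytes } from 'node:crypto';
import { Readable } from 'node:stream';
import { pipeline, finished } from 'node:stream/promises';
import {
  inferChunkSize,
  DigestingWritable,
} from '@signalapp/libsignal-client/dist/incremental_mac';
// Import path assumed, mirroring the '../ts/...' imports in the file above.
import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough';

async function digestThenValidate(data: Buffer): Promise<Buffer> {
  const key = randomBytes(32);
  const sizeChoice = inferChunkSize(data.byteLength);

  // Pass 1 (cache miss): stream once to compute the digest.
  const digester = new DigestingWritable(key, sizeChoice);
  await pipeline(Readable.from(data), digester);
  const digest = digester.getFinalDigest();

  // Pass 2 (every request): re-stream, forwarding only authenticated bytes.
  const validator = new ValidatingPassThrough(key, sizeChoice, digest);
  const received: Array<Buffer> = [];
  validator.on('data', chunk => received.push(chunk));
  await Promise.all([
    pipeline(Readable.from(data), validator),
    finished(validator),
  ]);

  return Buffer.concat(received);
}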
ts/test-node/util/ValidatingPassThrough_test.ts (new file, 72 lines)
@@ -0,0 +1,72 @@
+// Copyright 2024 Signal Messenger, LLC
+// SPDX-License-Identifier: AGPL-3.0-only
+
+import { assert } from 'chai';
+import { randomBytes } from 'node:crypto';
+import { Readable } from 'node:stream';
+import { pipeline, finished } from 'node:stream/promises';
+import {
+  inferChunkSize,
+  DigestingWritable,
+} from '@signalapp/libsignal-client/dist/incremental_mac';
+
+import { ValidatingPassThrough } from '../../util/ValidatingPassThrough';
+
+// Use an uneven chunk size to trigger buffering
+const CHUNK_SIZE = 13579;
+
+function toChunkedReadable(buffer: Buffer): Readable {
+  const chunked = new Array<Buffer>();
+  for (let i = 0; i < buffer.byteLength; i += CHUNK_SIZE) {
+    chunked.push(buffer.subarray(i, i + CHUNK_SIZE));
+  }
+
+  return Readable.from(chunked);
+}
+
+describe('ValidatingPassThrough', () => {
+  it('should emit whole source stream', async () => {
+    const source = randomBytes(10 * 1024 * 1024);
+    const key = randomBytes(32);
+
+    const chunkSize = inferChunkSize(source.byteLength);
+    const writable = new DigestingWritable(key, chunkSize);
+    await pipeline(Readable.from(source), writable);
+
+    const digest = writable.getFinalDigest();
+    const validator = new ValidatingPassThrough(key, chunkSize, digest);
+
+    const received = new Array<Buffer>();
+    validator.on('data', chunk => received.push(chunk));
+
+    await Promise.all([
+      pipeline(toChunkedReadable(source), validator),
+      finished(validator),
+    ]);
+
+    const actual = Buffer.concat(received);
+    assert.isTrue(actual.equals(source));
+  });
+
+  it('should emit error on digest mismatch', async () => {
+    const source = randomBytes(10 * 1024 * 1024);
+    const key = randomBytes(32);
+
+    const chunkSize = inferChunkSize(source.byteLength);
+    const writable = new DigestingWritable(key, chunkSize);
+    await pipeline(Readable.from(source), writable);
+
+    const digest = writable.getFinalDigest();
+    const wrongKey = randomBytes(32);
+    const validator = new ValidatingPassThrough(wrongKey, chunkSize, digest);
+
+    validator.on('data', () => {
+      throw new Error('Should not be called');
+    });
+
+    await assert.isRejected(
+      pipeline(toChunkedReadable(source), validator),
+      'Corrupted input data'
+    );
+  });
+});

ts/util/ValidatingPassThrough.ts (new file, 87 lines)
@@ -0,0 +1,87 @@
+// Copyright 2024 Signal Messenger, LLC
+// SPDX-License-Identifier: AGPL-3.0-only
+
+import { noop } from 'lodash';
+import { Transform } from 'node:stream';
+import {
+  ValidatingWritable,
+  type ChunkSizeChoice,
+} from '@signalapp/libsignal-client/dist/incremental_mac';
+
+export class ValidatingPassThrough extends Transform {
+  private validator: ValidatingWritable;
+  private buffer = new Array<Buffer>();
+
+  constructor(key: Buffer, sizeChoice: ChunkSizeChoice, digest: Buffer) {
+    super();
+    this.validator = new ValidatingWritable(key, sizeChoice, digest);
+
+    // We handle errors coming from write/end
+    this.validator.on('error', noop);
+  }
+
+  public override _transform(
+    data: Buffer,
+    enc: BufferEncoding,
+    callback: (error?: null | Error) => void
+  ): void {
+    const start = this.validator.validatedSize();
+    this.validator.write(data, enc, err => {
+      if (err) {
+        return callback(err);
+      }
+
+      this.buffer.push(data);
+
+      const end = this.validator.validatedSize();
+      const readySize = end - start;
+
+      // Nothing newly validated; keep buffering
+      if (readySize === 0) {
+        return callback(null);
+      }
+
+      const { buffer } = this;
+      this.buffer = [];
+      let validated = 0;
+      for (const chunk of buffer) {
+        validated += chunk.byteLength;
+
+        // Buffered chunk is fully validated - push it without slicing
+        if (validated <= readySize) {
+          this.push(chunk);
+          continue;
+        }
+
+        // Validation boundary lies within the chunk, split it
+        const notValidated = validated - readySize;
+        this.push(chunk.subarray(0, -notValidated));
+        this.buffer.push(chunk.subarray(-notValidated));
+
+        // Technically this chunk must be the last one, so we could break,
+        // but keep looping for consistency.
+      }
+      callback(null);
+    });
+  }
+
+  public override _final(callback: (error?: null | Error) => void): void {
+    const start = this.validator.validatedSize();
+    this.validator.end((err?: Error) => {
+      if (err) {
+        return callback(err);
+      }
+
+      const end = this.validator.validatedSize();
+      const readySize = end - start;
+      const buffer = Buffer.concat(this.buffer);
+      this.buffer = [];
+      if (buffer.byteLength !== readySize) {
+        return callback(new Error('Stream not fully processed'));
+      }
+      this.push(buffer);
+
+      callback(null);
+    });
+  }
+}
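The `_transform` callback above only forwards bytes the incremental MAC has already authenticated, re-buffering whatever tail crosses the validation boundary. A dry run of that split arithmetic with made-up numbers (two 3-byte writes against a 4-byte validated prefix; real boundaries come from the `ChunkSizeChoice`):

const buffered = [Buffer.from('abc'), Buffer.from('def')];
const readySize = 4; // validatedSize() advanced by 4 bytes this write

const pushed: Array<Buffer> = [];
const rebuffered: Array<Buffer> = [];
let validated = 0;
for (const chunk of buffered) {
  validated += chunk.byteLength;
  if (validated <= readySize) {
    pushed.push(chunk); // 'abc' lies entirely in the validated prefix
    continue;
  }
  const notValidated = validated - readySize; // 2 bytes of 'def' not yet valid
  pushed.push(chunk.subarray(0, -notValidated)); // forward 'd'
  rebuffered.push(chunk.subarray(-notValidated)); // keep 'ef' buffered
}
// pushed => ['abc', 'd'], rebuffered => ['ef']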