| 
									
										
										
										
											2023-01-03 11:55:46 -08:00
										 |  |  | // Copyright 2021 Signal Messenger, LLC
 | 
					
						
							| 
									
										
										
										
											2021-11-15 23:54:59 +01:00
										 |  |  | // SPDX-License-Identifier: AGPL-3.0-only
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-09-16 17:39:03 -07:00
										 |  |  | import { Transform } from 'node:stream'; | 
					
						
							|  |  |  | import type { Readable } from 'node:stream'; | 
					
						
							| 
									
										
										
										
											2021-11-15 23:54:59 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-09-16 17:39:03 -07:00
										 |  |  | import * as Bytes from '../Bytes.js'; | 
					
						
							|  |  |  | import { clearTimeoutIfNecessary } from './clearTimeoutIfNecessary.js'; | 
					
						
							|  |  |  | import { explodePromise } from './explodePromise.js'; | 
					
						
							| 
									
										
										
										
											2021-11-15 23:54:59 +01:00
										 |  |  | 
 | 
					
						
							/**
							 * Options accepted by `getStreamWithTimeout` and `getTimeoutStream`.
							 */
							export type OptionsType = Readonly<{
							  /** Label interpolated into the timeout error message. */
							  name: string;
							  /** Delay handed to `setTimeout` for the inactivity timer, in milliseconds. */
							  timeout: number;
							  /** Object whose `abort()` is invoked when the inactivity timer fires. */
							  abortController: { abort(): void };
							}>;
					
						
							|  |  |  | 
 | 
					
						
							/**
							 * Error produced when a stream's inactivity timer fires: it is the rejection
							 * reason used by `getStreamWithTimeout` and the value emitted as `'error'` by
							 * the stream returned from `getTimeoutStream`.
							 */
							export class StreamTimeoutError extends Error {}

							// Give instances a descriptive `name` so `String(err)` and the stack header
							// read "StreamTimeoutError: ..." rather than the generic "Error: ...".
							// Defined on the prototype with the same attributes as `Error.prototype.name`
							// (non-enumerable), so instances gain no extra own or enumerable properties.
							Object.defineProperty(StreamTimeoutError.prototype, 'name', {
							  value: 'StreamTimeoutError',
							  writable: true,
							  enumerable: false,
							  configurable: true,
							});
					
						
							|  |  |  | 
 | 
					
						
							/**
							 * Collects every chunk emitted by `stream` and resolves with the result of
							 * `Bytes.concatenate(chunks)` once the stream emits `'end'`.
							 *
							 * An inactivity timer is armed immediately and re-armed on each `'data'`
							 * event. If it fires, `abortController.abort()` is called and the returned
							 * promise rejects with a `StreamTimeoutError`. A stream `'error'` rejects the
							 * promise with that error.
							 *
							 * @param stream - Readable whose chunks are buffered.
							 * @param options - `name` labels the timeout error message, `timeout` is the
							 *   `setTimeout` delay in milliseconds, and `abortController.abort()` is
							 *   invoked when the timer fires.
							 * @returns A promise for the concatenated chunks.
							 */
							export function getStreamWithTimeout(
							  stream: Readable,
							  { name, timeout, abortController }: OptionsType
							): Promise<Uint8Array> {
							  const { promise, resolve, reject } = explodePromise<Uint8Array>();

							  const chunks = new Array<Uint8Array>();

							  let timer: NodeJS.Timeout | undefined;

							  // Becomes true once the promise has been resolved or rejected. Without it,
							  // a chunk arriving after a timeout or stream error would re-arm the timer
							  // and eventually call `abortController.abort()` a second time.
							  let isSettled = false;

							  const clearTimer = () => {
							    clearTimeoutIfNecessary(timer);
							    timer = undefined;
							  };

							  const markSettled = () => {
							    isSettled = true;
							    clearTimer();
							  };

							  const reset = () => {
							    clearTimer();

							    timer = setTimeout(() => {
							      markSettled();
							      abortController.abort();
							      reject(new StreamTimeoutError(`getStreamWithTimeout(${name}) timed out`));
							    }, timeout);
							  };

							  stream.on('data', chunk => {
							    if (isSettled) {
							      // The result is already decided; ignore late chunks instead of
							      // restarting the inactivity timer.
							      return;
							    }

							    reset();

							    chunks.push(chunk);
							  });

							  stream.on('end', () => {
							    markSettled();
							    resolve(Bytes.concatenate(chunks));
							  });

							  stream.on('error', error => {
							    markSettled();
							    reject(error);
							  });

							  // Start the timer right away so a stream that never emits still times out.
							  reset();

							  return promise;
							}
					
						
							| 
									
										
										
										
											2023-10-30 09:24:28 -07:00
										 |  |  | 
 | 
					
						
							/**
							 * Creates a pass-through `Transform` that forwards every chunk unchanged
							 * while running an inactivity timer.
							 *
							 * The timer is armed on creation and re-armed each time a chunk passes
							 * through. If it fires, `abortController.abort()` is called and a
							 * `StreamTimeoutError` is emitted on the returned stream as `'error'`.
							 * The timer is cancelled once the stream emits `'end'` or `'close'`, so a
							 * stream that has completed or been destroyed does not trigger a late
							 * abort or error.
							 *
							 * @param options - `name` labels the timeout error message, `timeout` is the
							 *   `setTimeout` delay in milliseconds, and `abortController.abort()` is
							 *   invoked when the timer fires.
							 * @returns The pass-through stream.
							 */
							export function getTimeoutStream({
							  name,
							  timeout,
							  abortController,
							}: OptionsType): Transform {
							  const timeoutStream = new Transform();

							  let timer: NodeJS.Timeout | undefined;
							  const clearTimer = () => {
							    clearTimeoutIfNecessary(timer);
							    timer = undefined;
							  };

							  const reset = () => {
							    clearTimer();

							    timer = setTimeout(() => {
							      abortController.abort();
							      // NOTE(review): the message names getStreamWithTimeout although it is
							      // raised from getTimeoutStream; left unchanged in case logs or tests
							      // match on it -- confirm before renaming.
							      timeoutStream.emit(
							        'error',
							        new StreamTimeoutError(`getStreamWithTimeout(${name}) timed out`)
							      );
							      clearTimer();
							    }, timeout);
							  };

							  timeoutStream._transform = function transform(chunk, _encoding, done) {
							    try {
							      reset();
							    } catch (error) {
							      return done(error);
							    }

							    this.push(chunk);

							    done();
							  };

							  // Previously the timer was only cleared when it fired, so a stream that
							  // finished normally still called `abort()` and emitted 'error' `timeout`
							  // ms later. Neither listener switches the stream into flowing mode.
							  timeoutStream.once('end', clearTimer);
							  timeoutStream.once('close', clearTimer);

							  // Start the timer right away so a stream that never receives data still
							  // times out.
							  reset();

							  return timeoutStream;
							}