From ba6e11614ea923f2ae1e96935dfada232b432e8f Mon Sep 17 00:00:00 2001 From: Jordan Rose Date: Mon, 23 Sep 2024 16:24:24 -0700 Subject: [PATCH] Hook up LibSignalWebsocketResource.forceKeepAlive --- ts/textsecure/SocketManager.ts | 3 + ts/textsecure/WebsocketResources.ts | 141 +++++++++++++++++++--------- 2 files changed, 100 insertions(+), 44 deletions(-) diff --git a/ts/textsecure/SocketManager.ts b/ts/textsecure/SocketManager.ts index a9e05e492d..13b4185ffa 100644 --- a/ts/textsecure/SocketManager.ts +++ b/ts/textsecure/SocketManager.ts @@ -187,6 +187,7 @@ export class SocketManager extends EventListener { this.queueOrHandleRequest(req); }, receiveStories: !this.hasStoriesDisabled, + keepalive: { path: '/v1/keepalive' }, }) : this.connectResource({ name: AUTHENTICATED_CHANNEL_NAME, @@ -628,6 +629,7 @@ export class SocketManager extends EventListener { process = connectUnauthenticatedLibsignal({ libsignalNet: this.libsignalNet, name: UNAUTHENTICATED_CHANNEL_NAME, + keepalive: { path: '/v1/keepalive' }, }); } else { process = this.connectResource({ @@ -749,6 +751,7 @@ export class SocketManager extends EventListener { const shadowingConnection = connectUnauthenticatedLibsignal({ libsignalNet: this.libsignalNet, name: options.name, + keepalive: options.keepalive ?? {}, }); const shadowWrapper = async () => { // if main connection results in an error, diff --git a/ts/textsecure/WebsocketResources.ts b/ts/textsecure/WebsocketResources.ts index ab7034512b..b7f4e59c7b 100644 --- a/ts/textsecure/WebsocketResources.ts +++ b/ts/textsecure/WebsocketResources.ts @@ -326,7 +326,7 @@ export interface IWebSocketResource extends IResource { shutdown(): void; - close(): void; + close(code?: number, reason?: string): void; localPort(): number | undefined; } @@ -340,9 +340,11 @@ const UNEXPECTED_DISCONNECT_CODE = 3001; export function connectUnauthenticatedLibsignal({ libsignalNet, name, + keepalive, }: { libsignalNet: Net.Net; name: string; + keepalive: KeepAliveOptionsType; }): AbortableProcess { const logId = `LibsignalWebSocketResource(${name})`; const listener: LibsignalWebSocketResourceHolder & ConnectionEventsListener = @@ -360,7 +362,8 @@ export function connectUnauthenticatedLibsignal({ return connectLibsignal( libsignalNet.newUnauthenticatedChatService(listener), listener, - logId + logId, + keepalive ); } @@ -370,12 +373,14 @@ export function connectAuthenticatedLibsignal({ credentials, handler, receiveStories, + keepalive, }: { libsignalNet: Net.Net; name: string; credentials: WebAPICredentials; handler: (request: IncomingWebSocketRequest) => void; receiveStories: boolean; + keepalive: KeepAliveOptionsType; }): AbortableProcess { const logId = `LibsignalWebSocketResource(${name})`; const listener: LibsignalWebSocketResourceHolder & ChatServiceListener = { @@ -424,7 +429,8 @@ export function connectAuthenticatedLibsignal({ listener ), listener, - logId + logId, + keepalive ); } @@ -435,7 +441,8 @@ function logDisconnectedListenerWarn(logId: string, method: string): void { function connectLibsignal( chatService: Net.ChatService, resourceHolder: LibsignalWebSocketResourceHolder, - logId: string + logId: string, + keepalive: KeepAliveOptionsType ): AbortableProcess { const connectAsync = async () => { try { @@ -444,7 +451,8 @@ function connectLibsignal( const resource = new LibsignalWebSocketResource( chatService, IpVersion.fromDebugInfoCode(debugInfo.ipType), - logId + logId, + keepalive ); // eslint-disable-next-line no-param-reassign resourceHolder.resource = resource; @@ -473,12 +481,21 @@ export class LibsignalWebSocketResource { closed = false; + // Unlike WebSocketResource, libsignal will automatically attempt to keep the + // socket alive using websocket pings, so we don't need a timer-based + // keepalive mechanism. But we still send one-off keepalive requests when + // things change (see forceKeepAlive()). + private keepalive: KeepAliveSender; + constructor( private readonly chatService: Net.ChatService, private readonly socketIpVersion: IpVersion | undefined, - private readonly logId: string + private readonly logId: string, + keepalive: KeepAliveOptionsType ) { super(); + + this.keepalive = new KeepAliveSender(this, this.logId, keepalive); } public localPort(): number | undefined { @@ -538,8 +555,8 @@ export class LibsignalWebSocketResource this.dispatchEvent(event); } - public forceKeepAlive(): void { - // no-op + public forceKeepAlive(timeout?: number): void { + drop(this.keepalive.send(timeout)); } public async sendRequest(options: SendRequestOptions): Promise { @@ -654,10 +671,10 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource { this.main.addEventListener(name, handler); } - public close(): void { - this.main.close(); + public close(code = NORMAL_DISCONNECT_CODE, reason?: string): void { + this.main.close(code, reason); if (this.shadowing) { - this.shadowing.close(); + this.shadowing.close(code, reason); this.shadowing = undefined; } else { this.shadowingConnection.abort(); @@ -1088,48 +1105,35 @@ const KEEPALIVE_TIMEOUT_MS = 30 * durations.SECOND; const LOG_KEEPALIVE_AFTER_MS = 500; -class KeepAlive { - private keepAliveTimer: Timers.Timeout | undefined; - +/** + * References an {@link IWebSocketResource} and a request path that should + * return promptly to determine whether the connection is still alive. + * + * The response to the request must have a 2xx status code but is otherwise + * ignored. A failing response or a timeout results in the socket being closed + * with {@link UNEXPECTED_DISCONNECT_CODE}. + * + * Use the subclass {@link KeepAlive} if you want to send the request at regular + * intervals. + */ +class KeepAliveSender { private path: string; - private wsr: WebSocketResource; + protected wsr: IWebSocketResource; - private lastAliveAt: number = Date.now(); - - private logId: string; + protected logId: string; constructor( - websocketResource: WebSocketResource, + websocketResource: IWebSocketResource, name: string, opts: KeepAliveOptionsType = {} ) { this.logId = `WebSocketResources.KeepAlive(${name})`; - if (websocketResource instanceof WebSocketResource) { - this.path = opts.path ?? '/'; - this.wsr = websocketResource; - } else { - throw new TypeError('KeepAlive expected a WebSocketResource'); - } + this.path = opts.path ?? '/'; + this.wsr = websocketResource; } - public stop(): void { - this.clearTimers(); - } - - public async send(timeout = KEEPALIVE_TIMEOUT_MS): Promise { - this.clearTimers(); - - const isStale = isOlderThan(this.lastAliveAt, STALE_THRESHOLD_MS); - if (isStale) { - log.info(`${this.logId}.send: disconnecting due to stale state`); - this.wsr.close( - UNEXPECTED_DISCONNECT_CODE, - `Last keepalive request was too far in the past: ${this.lastAliveAt}` - ); - return; - } - + public async send(timeout = KEEPALIVE_TIMEOUT_MS): Promise { log.info(`${this.logId}.send: Sending a keepalive message`); const sentAt = Date.now(); @@ -1148,14 +1152,14 @@ class KeepAlive { UNEXPECTED_DISCONNECT_CODE, `keepalive response with ${status} code` ); - return; + return false; } } catch (error) { this.wsr.close( UNEXPECTED_DISCONNECT_CODE, 'No response to keepalive request' ); - return; + return false; } const responseTime = Date.now() - sentAt; @@ -1166,8 +1170,57 @@ class KeepAlive { ); } + return true; + } +} + +/** + * Manages a timer that checks if a particular {@link WebSocketResource} is + * still alive. + * + * The resource must specifically be a {@link WebSocketResource}. Other kinds of + * resource are expected to manage their own liveness checks. If you want to + * manually send keepalive requests to such resources, use the base class + * {@link KeepAliveSender}. + */ +class KeepAlive extends KeepAliveSender { + private keepAliveTimer: Timers.Timeout | undefined; + + private lastAliveAt: number = Date.now(); + + constructor( + websocketResource: WebSocketResource, + name: string, + opts: KeepAliveOptionsType = {} + ) { + super(websocketResource, name, opts); + } + + public stop(): void { + this.clearTimers(); + } + + public override async send(timeout = KEEPALIVE_TIMEOUT_MS): Promise { + this.clearTimers(); + + const isStale = isOlderThan(this.lastAliveAt, STALE_THRESHOLD_MS); + if (isStale) { + log.info(`${this.logId}.send: disconnecting due to stale state`); + this.wsr.close( + UNEXPECTED_DISCONNECT_CODE, + `Last keepalive request was too far in the past: ${this.lastAliveAt}` + ); + return false; + } + + const isAlive = await super.send(timeout); + if (!isAlive) { + return false; + } + // Successful response on time this.reset(); + return true; } public reset(): void {