From 7abd2280bcb9a7ed8492fed7f67bca8bd9e46909 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Tue, 6 Jun 2023 17:36:38 -0700 Subject: [PATCH] Simplify WebSocket keepalive logic --- ts/test-electron/WebsocketResources_test.ts | 41 +++---- ts/textsecure/SocketManager.ts | 3 + ts/textsecure/WebsocketResources.ts | 125 +++++++++++--------- 3 files changed, 88 insertions(+), 81 deletions(-) diff --git a/ts/test-electron/WebsocketResources_test.ts b/ts/test-electron/WebsocketResources_test.ts index 45184bc9b355..388f9b3ee7fd 100644 --- a/ts/test-electron/WebsocketResources_test.ts +++ b/ts/test-electron/WebsocketResources_test.ts @@ -68,6 +68,7 @@ describe('WebSocket-Resource', () => { // actual test new WebSocketResource(socket as WebSocket, { + name: 'test', handleRequest(request: any) { assert.strictEqual(request.verb, 'PUT'); assert.strictEqual(request.path, '/some/path'); @@ -106,7 +107,9 @@ describe('WebSocket-Resource', () => { }); // actual test - const resource = new WebSocketResource(socket as WebSocket); + const resource = new WebSocketResource(socket as WebSocket, { + name: 'test', + }); const promise = resource.sendRequest({ verb: 'PUT', path: '/some/path', @@ -134,14 +137,18 @@ describe('WebSocket-Resource', () => { sinon.stub(socket, 'close').callsFake(() => done()); - const resource = new WebSocketResource(socket as WebSocket); + const resource = new WebSocketResource(socket as WebSocket, { + name: 'test', + }); resource.close(); }); it('force closes the connection', function test(done) { const socket = new FakeSocket(); - const resource = new WebSocketResource(socket as WebSocket); + const resource = new WebSocketResource(socket as WebSocket, { + name: 'test', + }); resource.close(); resource.addEventListener('close', () => done()); @@ -164,37 +171,21 @@ describe('WebSocket-Resource', () => { }); new WebSocketResource(socket as WebSocket, { + name: 'test', keepalive: { path: '/v1/keepalive' }, }); this.clock.next(); }); - it('uses / as a default path', function test(done) { - const socket = new FakeSocket(); - - sinon.stub(socket, 'sendBytes').callsFake(data => { - const message = Proto.WebSocketMessage.decode(data); - assert.strictEqual(message.type, Proto.WebSocketMessage.Type.REQUEST); - assert.strictEqual(message.request?.verb, 'GET'); - assert.strictEqual(message.request?.path, '/'); - done(); - }); - - new WebSocketResource(socket as WebSocket, { - keepalive: true, - }); - - this.clock.next(); - }); - it('optionally disconnects if no response', function thisNeeded1(done) { const socket = new FakeSocket(); sinon.stub(socket, 'close').callsFake(() => done()); new WebSocketResource(socket as WebSocket, { - keepalive: true, + name: 'test', + keepalive: { path: '/' }, }); // One to trigger send @@ -210,7 +201,8 @@ describe('WebSocket-Resource', () => { sinon.stub(socket, 'close').callsFake(() => done()); new WebSocketResource(socket as WebSocket, { - keepalive: true, + name: 'test', + keepalive: { path: '/' }, }); // Just skip one hour immediately @@ -237,7 +229,8 @@ describe('WebSocket-Resource', () => { }); const resource = new WebSocketResource(socket as WebSocket, { - keepalive: true, + name: 'test', + keepalive: { path: '/' }, }); setTimeout(() => { diff --git a/ts/textsecure/SocketManager.ts b/ts/textsecure/SocketManager.ts index 6b9ab7d05c61..61bf1bd4e8ef 100644 --- a/ts/textsecure/SocketManager.ts +++ b/ts/textsecure/SocketManager.ts @@ -142,6 +142,7 @@ export class SocketManager extends EventListener { path: '/v1/websocket/', query: { login: username, password }, resourceOptions: { + name: 'authenticated', keepalive: { path: '/v1/keepalive' }, handleRequest: (req: IncomingWebSocketRequest): void => { this.queueOrHandleRequest(req); @@ -269,6 +270,7 @@ export class SocketManager extends EventListener { name: 'provisioning', path: '/v1/websocket/provisioning/', resourceOptions: { + name: 'provisioning', handleRequest: (req: IncomingWebSocketRequest): void => { handler.handleRequest(req); }, @@ -483,6 +485,7 @@ export class SocketManager extends EventListener { name: 'unauthenticated', path: '/v1/websocket/', resourceOptions: { + name: 'unauthenticated', keepalive: { path: '/v1/keepalive' }, }, }); diff --git a/ts/textsecure/WebsocketResources.ts b/ts/textsecure/WebsocketResources.ts index c6cc77814a7a..25b3336119e4 100644 --- a/ts/textsecure/WebsocketResources.ts +++ b/ts/textsecure/WebsocketResources.ts @@ -25,6 +25,7 @@ import type { connection as WebSocket, IMessage } from 'websocket'; import Long from 'long'; +import pTimeout from 'p-timeout'; import type { EventHandler } from './EventTarget'; import EventTarget from './EventTarget'; @@ -94,8 +95,9 @@ export type SendRequestResult = Readonly<{ }>; export type WebSocketResourceOptions = { + name: string; handleRequest?: (request: IncomingWebSocketRequest) => void; - keepalive?: KeepAliveOptionsType | true; + keepalive?: KeepAliveOptionsType; }; export class CloseEvent extends Event { @@ -122,15 +124,19 @@ export default class WebSocketResource extends EventTarget { private shutdownTimer?: Timers.Timeout; + private readonly logId: string; + // Public for tests public readonly keepalive?: KeepAlive; constructor( private readonly socket: WebSocket, - private readonly options: WebSocketResourceOptions = {} + private readonly options: WebSocketResourceOptions ) { super(); + this.logId = `WebSocketResource(${options.name})`; + this.boundOnMessage = this.onMessage.bind(this); socket.on('message', this.boundOnMessage); @@ -138,25 +144,22 @@ export default class WebSocketResource extends EventTarget { if (options.keepalive) { const keepalive = new KeepAlive( this, - options.keepalive === true ? {} : options.keepalive + options.name, + options.keepalive ?? {} ); this.keepalive = keepalive; keepalive.reset(); - socket.on('message', () => keepalive.reset()); - socket.on('close', () => keepalive.stop()); + socket.on('close', () => this.keepalive?.stop()); socket.on('error', (error: Error) => { - log.warn( - 'WebSocketResource: WebSocket error', - Errors.toLogFormat(error) - ); + log.warn(`${this.logId}: WebSocket error`, Errors.toLogFormat(error)); }); } socket.on('close', (code, reason) => { this.closed = true; - log.warn('WebSocketResource: Socket closed'); + log.warn(`${this.logId}: Socket closed`); this.dispatchEvent(new CloseEvent(code, reason || 'normal')); }); @@ -218,10 +221,13 @@ export default class WebSocketResource extends EventTarget { } if (timedOut) { log.warn( - `Response received after timeout; id: [${idString}], path: [${ - options.path - }], response time: [${Date.now() - sentAt}]` + `${this.logId}: Response received after timeout; ` + + `id: [${idString}], path: [${options.path}], ` + + `response time: ${Date.now() - sentAt}ms` ); + } else { + // Reset keepalive when an on-time response arrives + this.keepalive?.reset(); } this.removeActive(idString); resolve(result); @@ -242,11 +248,11 @@ export default class WebSocketResource extends EventTarget { public close(code = 3000, reason?: string): void { if (this.closed) { - log.info(`WebSocketResource.close: Already closed! ${code}/${reason}`); + log.info(`${this.logId}.close: Already closed! ${code}/${reason}`); return; } - log.info('WebSocketResource.close()'); + log.info(`${this.logId}.close(${code})`); if (this.keepalive) { this.keepalive.stop(); } @@ -263,9 +269,9 @@ export default class WebSocketResource extends EventTarget { return; } - log.warn('WebSocketResource: Dispatching our own socket close event'); + log.warn(`${this.logId}.close: Dispatching our own socket close event`); this.dispatchEvent(new CloseEvent(code, reason || 'normal')); - }, 5000); + }, 5 * durations.SECOND); } public shutdown(): void { @@ -274,20 +280,20 @@ export default class WebSocketResource extends EventTarget { } if (this.activeRequests.size === 0) { - log.info('WebSocketResource: no active requests, closing'); + log.info(`${this.logId}.shutdown: no active requests, closing`); this.close(3000, 'Shutdown'); return; } this.shuttingDown = true; - log.info('WebSocketResource: shutting down'); + log.info(`${this.logId}.shutdown: shutting down`); this.shutdownTimer = Timers.setTimeout(() => { if (this.closed) { return; } - log.warn('WebSocketResource: Failed to shutdown gracefully'); + log.warn(`${this.logId}.shutdown: Failed to shutdown gracefully`); this.close(3000, 'Shutdown'); }, THIRTY_SECONDS); } @@ -370,7 +376,7 @@ export default class WebSocketResource extends EventTarget { private removeActive(request: IncomingWebSocketRequest | string): void { if (!this.activeRequests.has(request)) { - log.warn('WebSocketResource: removing unknown request'); + log.warn(`${this.logId}.removeActive: removing unknown request`); return; } @@ -387,41 +393,47 @@ export default class WebSocketResource extends EventTarget { this.shutdownTimer = undefined; } - log.info('WebSocketResource: shutdown complete'); + log.info(`${this.logId}.removeActive: shutdown complete`); this.close(3000, 'Shutdown'); } } export type KeepAliveOptionsType = { path?: string; - disconnect?: boolean; }; // 30 seconds + 5 seconds for closing the socket above. const KEEPALIVE_INTERVAL_MS = 30 * durations.SECOND; -const MAX_KEEPALIVE_INTERVAL_MS = 5 * durations.MINUTE; + +// If the machine was in suspended mode for more than 5 minutes - trigger +// immediate disconnect. +const STALE_THRESHOLD_MS = 5 * durations.MINUTE; + +// If we don't receive a response to keepalive request within 10 seconds - +// close the socket. +const KEEPALIVE_TIMEOUT_MS = 10 * durations.SECOND; + const LOG_KEEPALIVE_AFTER_MS = 500; class KeepAlive { private keepAliveTimer: Timers.Timeout | undefined; - private disconnectTimer: Timers.Timeout | undefined; - private path: string; - private disconnect: boolean; - private wsr: WebSocketResource; private lastAliveAt: number = Date.now(); + private logId: string; + constructor( websocketResource: WebSocketResource, + name: string, opts: KeepAliveOptionsType = {} ) { + this.logId = `WebSocketResources.KeepAlive(${name})`; if (websocketResource instanceof WebSocketResource) { - this.path = opts.path !== undefined ? opts.path : '/'; - this.disconnect = opts.disconnect !== undefined ? opts.disconnect : true; + this.path = opts.path ?? '/'; this.wsr = websocketResource; } else { throw new TypeError('KeepAlive expected a WebSocketResource'); @@ -435,8 +447,9 @@ class KeepAlive { public async send(): Promise { this.clearTimers(); - if (isOlderThan(this.lastAliveAt, MAX_KEEPALIVE_INTERVAL_MS)) { - log.info('WebSocketResources: disconnecting due to stale state'); + const isStale = isOlderThan(this.lastAliveAt, STALE_THRESHOLD_MS); + if (isStale) { + log.info(`${this.logId}.send: disconnecting due to stale state`); this.wsr.close( 3001, `Last keepalive request was too far in the past: ${this.lastAliveAt}` @@ -444,36 +457,38 @@ class KeepAlive { return; } - if (this.disconnect) { - // automatically disconnect if server doesn't ack - this.disconnectTimer = Timers.setTimeout(() => { - log.info('WebSocketResources: disconnecting due to no response'); - this.clearTimers(); - - this.wsr.close(3001, 'No response to keepalive request'); - }, 10000); - } else { - this.reset(); - } - - log.info('WebSocketResources: Sending a keepalive message'); + log.info(`${this.logId}.send: Sending a keepalive message`); const sentAt = Date.now(); - const { status } = await this.wsr.sendRequest({ - verb: 'GET', - path: this.path, - }); + try { + const { status } = await pTimeout( + this.wsr.sendRequest({ + verb: 'GET', + path: this.path, + }), + KEEPALIVE_TIMEOUT_MS + ); + + if (status < 200 || status >= 300) { + log.warn(`${this.logId}.send: keepalive response status ${status}`); + this.wsr.close(3001, `keepalive response with ${status} code`); + return; + } + } catch (error) { + this.wsr.close(3001, 'No response to keepalive request'); + return; + } const responseTime = Date.now() - sentAt; if (responseTime > LOG_KEEPALIVE_AFTER_MS) { log.warn( - `Delayed response to keepalive request, response time: [${responseTime}]` + `${this.logId}.send: delayed response to keepalive request, ` + + `response time: ${responseTime}ms` ); } - if (status >= 200 || status < 300) { - this.reset(); - } + // Successful response on time + this.reset(); } public reset(): void { @@ -492,9 +507,5 @@ class KeepAlive { Timers.clearTimeout(this.keepAliveTimer); this.keepAliveTimer = undefined; } - if (this.disconnectTimer) { - Timers.clearTimeout(this.disconnectTimer); - this.disconnectTimer = undefined; - } } }