signal-desktop/ts/textsecure/WebsocketResources.ts

501 lines
13 KiB
TypeScript
Raw Normal View History

2023-01-03 19:55:46 +00:00
// Copyright 2020 Signal Messenger, LLC
2020-10-30 20:34:04 +00:00
// SPDX-License-Identifier: AGPL-3.0-only
2021-08-26 18:18:00 +00:00
/* eslint-disable max-classes-per-file */
/*
* WebSocket-Resources
*
* Create a request-response interface over websockets using the
* WebSocket-Resources sub-protocol[1].
*
* const client = new WebSocketResource(socket, {
*   handleRequest(request) {
*     request.respond(200, 'OK');
*   },
* });
*
* const { response, status } = await client.sendRequest({
* verb: 'PUT',
* path: '/v1/messages',
* headers: ['content-type:application/json'],
* body: Buffer.from('{ some: "json" }'),
* });
*
* 1. https://github.com/signalapp/WebSocket-Resources
*
*/
import type { connection as WebSocket, IMessage } from 'websocket';
2022-03-23 20:49:27 +00:00
import Long from 'long';
2021-06-09 22:28:54 +00:00
import type { EventHandler } from './EventTarget';
import EventTarget from './EventTarget';
import * as durations from '../util/durations';
2021-07-02 19:21:24 +00:00
import { dropNull } from '../util/dropNull';
import { isOlderThan } from '../util/timestamp';
2021-07-02 19:21:24 +00:00
import { strictAssert } from '../util/assert';
import * as Errors from '../types/errors';
2021-07-02 19:21:24 +00:00
import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log';
import * as Timers from '../Timers';
// How long shutdown() waits for in-flight requests before forcing the socket
// closed.
const THIRTY_SECONDS = 30 * durations.SECOND;
// Upper bound, in bytes, enforced on every encoded request and response frame
// before it is handed to the socket.
const MAX_MESSAGE_SIZE = 256 * 1024;
2021-07-02 19:21:24 +00:00
/**
 * A request received from the remote peer over the WebSocket-Resources
 * sub-protocol.
 *
 * The constructor validates that the decoded protobuf carries an id, a verb
 * and a path, and copies the fields into read-only properties. Call
 * `respond()` to send the RESPONSE frame that answers this request.
 */
export class IncomingWebSocketRequest {
  // Echoed back in the response so the peer can match it to its request.
  private readonly id: Long;

  public readonly verb: string;

  public readonly path: string;

  public readonly body: Uint8Array | undefined;

  public readonly headers: ReadonlyArray<string>;

  /**
   * @param request - Decoded request message; must have `id`, `verb` and
   *   `path` set.
   * @param sendBytes - Callback that receives the encoded response frame.
   * @throws If `id`, `verb` or `path` is missing (via `strictAssert`).
   */
  constructor(
    request: Proto.IWebSocketRequestMessage,
    private readonly sendBytes: (bytes: Buffer) => void
  ) {
    strictAssert(request.id, 'request without id');
    strictAssert(request.verb, 'request without verb');
    strictAssert(request.path, 'request without path');

    this.id = request.id;
    this.verb = request.verb;
    this.path = request.path;
    this.body = dropNull(request.body);
    this.headers = request.headers ?? [];
  }

  /**
   * Encodes a RESPONSE frame carrying this request's id together with the
   * given status and message, and passes it to the `sendBytes` callback.
   *
   * @param status - Status code to report to the peer.
   * @param message - Status message to report to the peer.
   */
  public respond(status: number, message: string): void {
    const bytes = Proto.WebSocketMessage.encode({
      type: Proto.WebSocketMessage.Type.RESPONSE,
      response: { id: this.id, message, status },
    }).finish();

    this.sendBytes(Buffer.from(bytes));
  }
}
/**
 * Parameters for an outgoing request sent with `WebSocketResource.sendRequest`.
 */
export type SendRequestOptions = Readonly<{
  /** Request verb, e.g. 'GET' or 'PUT'. */
  verb: string;
  /** Request path, e.g. '/v1/messages'. */
  path: string;
  /** Optional request body bytes. */
  body?: Uint8Array;
  /**
   * Optional timeout passed to the timer that rejects the request. When
   * omitted (or 0) no timeout is armed.
   */
  timeout?: number;
  /** Optional header strings, e.g. 'content-type:application/json'. */
  headers?: ReadonlyArray<string>;
}>;
// Outcome of an outgoing request.
// - status: status from the peer's response; -1 when the response carried no
//   status or when the connection closed before a response arrived.
// - message: message from the peer's response ('' if absent), or
//   'Connection closed' when the connection closed first.
// - response: response body bytes, if any.
// - headers: response headers (empty when none were sent).
export type SendRequestResult = Readonly<{
status: number;
message: string;
response?: Uint8Array;
headers: ReadonlyArray<string>;
}>;
2021-07-02 19:21:24 +00:00
// Construction options for WebSocketResource.
// - handleRequest: invoked for every incoming request; when omitted, incoming
//   requests are answered with 404 'Not found'.
// - keepalive: pass `true` for default keepalive options, or an options object
//   to customize; when omitted no keepalive is set up.
export type WebSocketResourceOptions = {
handleRequest?: (request: IncomingWebSocketRequest) => void;
keepalive?: KeepAliveOptionsType | true;
};
2021-07-09 19:36:10 +00:00
/**
 * 'close' event dispatched by WebSocketResource, carrying the close code and
 * the human-readable reason.
 */
export class CloseEvent extends Event {
  public readonly code: number;

  public readonly reason: string;

  constructor(code: number, reason: string) {
    super('close');
    this.code = code;
    this.reason = reason;
  }
}
/**
 * Request/response channel layered on a single websocket connection using the
 * WebSocket-Resources sub-protocol.
 *
 * - Outgoing requests (`sendRequest`) get an incrementing id; the matching
 *   RESPONSE frame resolves the returned promise.
 * - Incoming REQUEST frames are wrapped in `IncomingWebSocketRequest` and
 *   passed to `options.handleRequest` (404 'Not found' if none was supplied).
 * - A 'close' event (`CloseEvent`) is dispatched exactly once, either when the
 *   socket reports closure or when `close()` gives up waiting for it.
 * - `shutdown()` closes the socket once every tracked request has finished or
 *   after a grace period.
 */
export default class WebSocketResource extends EventTarget {
  // Id for the next outgoing request (unsigned Long, wraps on overflow).
  private outgoingId = Long.fromNumber(1, true);

  // True once a CloseEvent has been (or is about to be) dispatched.
  private closed = false;

  // Outgoing request id (as string) -> callback that settles that request.
  private readonly outgoingMap = new Map<
    string,
    (result: SendRequestResult) => void
  >();

  private readonly boundOnMessage: (message: IMessage) => void;

  // Requests still in flight: incoming ones by object, outgoing ones by id
  // string. Consulted by shutdown() to decide when it is safe to close.
  private activeRequests = new Set<IncomingWebSocketRequest | string>();

  private shuttingDown = false;

  private shutdownTimer?: Timers.Timeout;

  // Public for tests
  public readonly keepalive?: KeepAlive;

  /**
   * @param socket - Connected websocket to run the protocol over.
   * @param options - Optional request handler and keepalive configuration.
   */
  constructor(
    private readonly socket: WebSocket,
    private readonly options: WebSocketResourceOptions = {}
  ) {
    super();

    this.boundOnMessage = this.onMessage.bind(this);

    socket.on('message', this.boundOnMessage);

    if (options.keepalive) {
      const keepalive = new KeepAlive(
        this,
        options.keepalive === true ? {} : options.keepalive
      );
      this.keepalive = keepalive;

      keepalive.reset();
      // Any traffic from the peer counts as proof of life.
      socket.on('message', () => keepalive.reset());
      socket.on('close', () => keepalive.stop());
      socket.on('error', (error: Error) => {
        log.warn(
          'WebSocketResource: WebSocket error',
          Errors.toLogFormat(error)
        );
      });
    }

    socket.on('close', (code, reason) => {
      if (this.closed) {
        // close() already timed out waiting for this event and dispatched its
        // own CloseEvent; do not notify listeners a second time.
        log.info(
          'WebSocketResource: Socket closed after close event was dispatched'
        );
        return;
      }
      this.closed = true;

      log.warn('WebSocketResource: Socket closed');
      this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
    });

    this.addEventListener('close', () => this.onClose());
  }

  public override addEventListener(
    name: 'close',
    handler: (ev: CloseEvent) => void
  ): void;

  public override addEventListener(name: string, handler: EventHandler): void {
    return super.addEventListener(name, handler);
  }

  /**
   * Sends a request to the peer and resolves with its response.
   *
   * The promise resolves with status -1 / 'Connection closed' if the
   * connection closes first, and rejects if `options.timeout` elapses before a
   * response arrives.
   *
   * @param options - Verb, path and optional body, headers and timeout.
   * @returns The peer's response.
   * @throws If the encoded request exceeds MAX_MESSAGE_SIZE or the resource is
   *   shutting down (via `strictAssert`).
   */
  public async sendRequest(
    options: SendRequestOptions
  ): Promise<SendRequestResult> {
    const id = this.outgoingId;
    const idString = id.toString();
    strictAssert(!this.outgoingMap.has(idString), 'Duplicate outgoing request');

    // Note that this automatically wraps
    this.outgoingId = this.outgoingId.add(1);

    const bytes = Proto.WebSocketMessage.encode({
      type: Proto.WebSocketMessage.Type.REQUEST,
      request: {
        verb: options.verb,
        path: options.path,
        body: options.body,
        headers: options.headers ? options.headers.slice() : undefined,
        id,
      },
    }).finish();
    strictAssert(
      bytes.length <= MAX_MESSAGE_SIZE,
      'WebSocket request byte size exceeded'
    );

    strictAssert(!this.shuttingDown, 'Cannot send request, shutting down');

    this.addActive(idString);
    const promise = new Promise<SendRequestResult>((resolve, reject) => {
      const sentAt = Date.now();
      let timedOut = false;

      let timer = options.timeout
        ? Timers.setTimeout(() => {
            timedOut = true;
            this.removeActive(idString);
            reject(new Error(`Request timed out; id: [${idString}]`));
          }, options.timeout)
        : undefined;

      // The map entry is intentionally kept after a timeout so that a late
      // response is recognized (and logged) instead of being treated as a
      // response to an unknown request.
      this.outgoingMap.set(idString, result => {
        if (timer !== undefined) {
          Timers.clearTimeout(timer);
          timer = undefined;
        }
        if (timedOut) {
          log.warn(
            `Response received after timeout; id: [${idString}], path: [${
              options.path
            }], response time: [${Date.now() - sentAt}]`
          );
          // The timeout already removed this request from the active set and
          // rejected the promise; nothing left to settle.
          return;
        }

        this.removeActive(idString);
        resolve(result);
      });
    });

    this.socket.sendBytes(Buffer.from(bytes));

    return promise;
  }

  /** Sends a keepalive request immediately, if keepalive is configured. */
  public forceKeepAlive(): void {
    if (!this.keepalive) {
      return;
    }
    void this.keepalive.send();
  }

  /**
   * Closes the socket and stops keepalive. If the socket has not reported
   * closure within 5 seconds, the 'close' event is dispatched anyway.
   *
   * @param code - Close code passed to the socket and the CloseEvent.
   * @param reason - Optional reason; the CloseEvent uses 'normal' if absent.
   */
  public close(code = 3000, reason?: string): void {
    if (this.closed) {
      log.info(`WebSocketResource.close: Already closed! ${code}/${reason}`);
      return;
    }

    log.info('WebSocketResource.close()');
    if (this.keepalive) {
      this.keepalive.stop();
    }

    this.socket.close(code, reason);

    this.socket.removeListener('message', this.boundOnMessage);

    // On linux the socket can wait a long time to emit its close event if we've
    // lost the internet connection. On the order of minutes. This speeds that
    // process up.
    Timers.setTimeout(() => {
      if (this.closed) {
        return;
      }

      // Mark closed before dispatching so that neither the socket's own late
      // close event nor another pending fallback timer dispatches a second
      // CloseEvent.
      this.closed = true;

      log.warn('WebSocketResource: Dispatching our own socket close event');
      this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
    }, 5000);
  }

  /**
   * Graceful close: closes immediately when no requests are in flight;
   * otherwise stops accepting new requests and closes once the active set
   * drains or after THIRTY_SECONDS, whichever comes first.
   */
  public shutdown(): void {
    if (this.closed) {
      return;
    }

    if (this.activeRequests.size === 0) {
      log.info('WebSocketResource: no active requests, closing');
      this.close(3000, 'Shutdown');
      return;
    }

    this.shuttingDown = true;

    log.info('WebSocketResource: shutting down');
    this.shutdownTimer = Timers.setTimeout(() => {
      if (this.closed) {
        return;
      }

      log.warn('WebSocketResource: Failed to shutdown gracefully');
      this.close(3000, 'Shutdown');
    }, THIRTY_SECONDS);
  }

  /**
   * Socket 'message' handler: decodes the frame and routes REQUEST frames to
   * the request handler and RESPONSE frames to the pending outgoing request.
   *
   * @throws On non-binary frames and on responses whose id is not pending.
   */
  private onMessage({ type, binaryData }: IMessage): void {
    if (type !== 'binary' || !binaryData) {
      throw new Error(`Unsupported websocket message type: ${type}`);
    }

    const message = Proto.WebSocketMessage.decode(binaryData);
    if (
      message.type === Proto.WebSocketMessage.Type.REQUEST &&
      message.request
    ) {
      const handleRequest =
        this.options.handleRequest ||
        (request => request.respond(404, 'Not found'));

      const incomingRequest = new IncomingWebSocketRequest(
        message.request,
        (bytes: Buffer): void => {
          // Responding completes the request, whether or not the send below
          // succeeds.
          this.removeActive(incomingRequest);

          strictAssert(
            bytes.length <= MAX_MESSAGE_SIZE,
            'WebSocket response byte size exceeded'
          );
          this.socket.sendBytes(bytes);
        }
      );

      if (this.shuttingDown) {
        // Refuse new work while draining; this request is never tracked.
        incomingRequest.respond(-1, 'Shutting down');
        return;
      }

      this.addActive(incomingRequest);
      handleRequest(incomingRequest);
    } else if (
      message.type === Proto.WebSocketMessage.Type.RESPONSE &&
      message.response
    ) {
      const { response } = message;
      strictAssert(response.id, 'response without id');

      const responseIdString = response.id.toString();
      const resolve = this.outgoingMap.get(responseIdString);
      this.outgoingMap.delete(responseIdString);

      if (!resolve) {
        throw new Error(`Received response for unknown request ${response.id}`);
      }

      resolve({
        status: response.status ?? -1,
        message: response.message ?? '',
        response: dropNull(response.body),
        headers: response.headers ?? [],
      });
    }
  }

  /**
   * 'close' listener: settles every still-pending outgoing request with a
   * synthetic status -1 / 'Connection closed' result.
   */
  private onClose(): void {
    // Snapshot and clear first so callbacks cannot observe stale entries.
    const outgoing = new Map(this.outgoingMap);
    this.outgoingMap.clear();

    for (const resolve of outgoing.values()) {
      resolve({
        status: -1,
        message: 'Connection closed',
        response: undefined,
        headers: [],
      });
    }
  }

  /** Starts tracking a request as in flight. */
  private addActive(request: IncomingWebSocketRequest | string): void {
    this.activeRequests.add(request);
  }

  /**
   * Stops tracking a request. If that empties the active set while a
   * shutdown is pending, cancels the shutdown timer and closes the socket.
   */
  private removeActive(request: IncomingWebSocketRequest | string): void {
    if (!this.activeRequests.has(request)) {
      log.warn('WebSocketResource: removing unknown request');
      return;
    }

    this.activeRequests.delete(request);
    if (this.activeRequests.size !== 0) {
      return;
    }
    if (!this.shuttingDown) {
      return;
    }

    if (this.shutdownTimer) {
      Timers.clearTimeout(this.shutdownTimer);
      this.shutdownTimer = undefined;
    }

    log.info('WebSocketResource: shutdown complete');
    this.close(3000, 'Shutdown');
  }
}
2021-07-02 19:21:24 +00:00
// Options for the keepalive mechanism.
// - path: path of the keepalive GET request; defaults to '/'.
// - disconnect: when true (the default), the socket is closed with code 3001
//   if a keepalive request is still pending after 10 seconds.
export type KeepAliveOptionsType = {
path?: string;
disconnect?: boolean;
};
// Delay after the last reset() before a keepalive request is sent.
// 30 seconds + 5 seconds for closing the socket above.
const KEEPALIVE_INTERVAL_MS = 30 * durations.SECOND;
// If the last reset() is older than this when a keepalive is due, the socket
// is closed as stale instead of sending the request.
const MAX_KEEPALIVE_INTERVAL_MS = 5 * durations.MINUTE;
2023-05-17 16:43:33 +00:00
// Keepalive responses slower than this many milliseconds are logged as delayed.
const LOG_KEEPALIVE_AFTER_MS = 500;
/**
 * Periodically sends a keepalive GET over a WebSocketResource and, optionally,
 * closes the connection when the peer fails to answer.
 *
 * `reset()` (re)arms the idle timer; when it fires, `send()` issues the
 * request. `stop()` cancels all pending timers.
 */
class KeepAlive {
  // Fires send() after KEEPALIVE_INTERVAL_MS without a reset().
  private keepAliveTimer: Timers.Timeout | undefined;

  // Closes the socket if a keepalive request goes unanswered.
  private disconnectTimer: Timers.Timeout | undefined;

  private path: string;

  private disconnect: boolean;

  private wsr: WebSocketResource;

  // Timestamp of the most recent reset().
  private lastAliveAt: number = Date.now();

  /**
   * @param websocketResource - Resource to send keepalive requests through.
   * @param opts - Request path (default '/') and whether to disconnect on a
   *   missing response (default true).
   * @throws TypeError if `websocketResource` is not a WebSocketResource.
   */
  constructor(
    websocketResource: WebSocketResource,
    opts: KeepAliveOptionsType = {}
  ) {
    if (!(websocketResource instanceof WebSocketResource)) {
      throw new TypeError('KeepAlive expected a WebSocketResource');
    }

    this.path = opts.path ?? '/';
    this.disconnect = opts.disconnect ?? true;
    this.wsr = websocketResource;
  }

  /** Cancels the idle timer and any pending disconnect timer. */
  public stop(): void {
    this.clearTimers();
  }

  /**
   * Sends one keepalive request.
   *
   * If the last reset() is older than MAX_KEEPALIVE_INTERVAL_MS the socket is
   * closed as stale instead. Otherwise, when `disconnect` is enabled, a
   * 10-second timer is armed that closes the socket unless a successful
   * response re-arms the idle timer first; when it is disabled the idle timer
   * is re-armed up front.
   */
  public async send(): Promise<void> {
    this.clearTimers();

    if (isOlderThan(this.lastAliveAt, MAX_KEEPALIVE_INTERVAL_MS)) {
      log.info('WebSocketResources: disconnecting due to stale state');
      this.wsr.close(
        3001,
        `Last keepalive request was too far in the past: ${this.lastAliveAt}`
      );
      return;
    }

    if (this.disconnect) {
      // automatically disconnect if server doesn't ack
      this.disconnectTimer = Timers.setTimeout(() => {
        log.info('WebSocketResources: disconnecting due to no response');
        this.clearTimers();

        this.wsr.close(3001, 'No response to keepalive request');
      }, 10000);
    } else {
      this.reset();
    }

    log.info('WebSocketResources: Sending a keepalive message');
    const sentAt = Date.now();

    const { status } = await this.wsr.sendRequest({
      verb: 'GET',
      path: this.path,
    });

    const responseTime = Date.now() - sentAt;
    if (responseTime > LOG_KEEPALIVE_AFTER_MS) {
      log.warn(
        `Delayed response to keepalive request, response time: [${responseTime}]`
      );
    }

    // Only a 2xx response proves the connection is alive. In particular the
    // synthetic -1 status produced when the connection closes must not re-arm
    // the timers that stop() has just cleared.
    if (status >= 200 && status < 300) {
      this.reset();
    } else {
      log.warn(
        `WebSocketResources: keepalive request failed, status: [${status}]`
      );
    }
  }

  /**
   * Records that the connection is alive now and restarts the idle timer,
   * cancelling any pending disconnect timer.
   */
  public reset(): void {
    this.lastAliveAt = Date.now();

    this.clearTimers();

    this.keepAliveTimer = Timers.setTimeout(
      () => this.send(),
      KEEPALIVE_INTERVAL_MS
    );
  }

  /** Cancels and forgets both timers, if set. */
  private clearTimers(): void {
    if (this.keepAliveTimer) {
      Timers.clearTimeout(this.keepAliveTimer);
      this.keepAliveTimer = undefined;
    }
    if (this.disconnectTimer) {
      Timers.clearTimeout(this.disconnectTimer);
      this.disconnectTimer = undefined;
    }
  }
}