More protobufjs use
This commit is contained in:
parent
1fa0e6c8c1
commit
299fe2af36
20 changed files with 646 additions and 447 deletions
|
@ -1,9 +1,6 @@
|
|||
// Copyright 2020 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
/* eslint-disable no-param-reassign */
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
/* eslint-disable @typescript-eslint/ban-types */
|
||||
/* eslint-disable max-classes-per-file */
|
||||
/*
|
||||
* WebSocket-Resources
|
||||
|
@ -29,228 +26,152 @@
|
|||
|
||||
import { connection as WebSocket, IMessage } from 'websocket';
|
||||
|
||||
import { ByteBufferClass } from '../window.d';
|
||||
import { typedArrayToArrayBuffer as toArrayBuffer } from '../Crypto';
|
||||
|
||||
import EventTarget from './EventTarget';
|
||||
|
||||
import { dropNull } from '../util/dropNull';
|
||||
import { isOlderThan } from '../util/timestamp';
|
||||
import { strictAssert } from '../util/assert';
|
||||
import { normalizeNumber } from '../util/normalizeNumber';
|
||||
import { SignalService as Proto } from '../protobuf';
|
||||
|
||||
class Request {
|
||||
verb: string;
|
||||
|
||||
path: string;
|
||||
|
||||
headers: Array<string>;
|
||||
|
||||
body: ByteBufferClass | null;
|
||||
|
||||
success: Function;
|
||||
|
||||
error: Function;
|
||||
|
||||
id: number;
|
||||
|
||||
response?: any;
|
||||
|
||||
constructor(options: any) {
|
||||
this.verb = options.verb || options.type;
|
||||
this.path = options.path || options.url;
|
||||
this.headers = options.headers;
|
||||
this.body = options.body || options.data;
|
||||
this.success = options.success;
|
||||
this.error = options.error;
|
||||
this.id = options.id;
|
||||
|
||||
if (this.id === undefined) {
|
||||
const bits = new Uint32Array(2);
|
||||
window.crypto.getRandomValues(bits);
|
||||
this.id = window.dcodeIO.Long.fromBits(bits[0], bits[1], true);
|
||||
}
|
||||
|
||||
if (this.body === undefined) {
|
||||
this.body = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
type Callback = (
|
||||
message: string,
|
||||
status: number,
|
||||
request: OutgoingWebSocketRequest
|
||||
) => void;
|
||||
|
||||
export class IncomingWebSocketRequest {
|
||||
verb: string;
|
||||
private readonly id: Long | number;
|
||||
|
||||
path: string;
|
||||
public readonly verb: string;
|
||||
|
||||
body: ByteBufferClass | null;
|
||||
public readonly path: string;
|
||||
|
||||
headers: Array<string>;
|
||||
public readonly body: Uint8Array | undefined;
|
||||
|
||||
respond: (status: number, message: string) => void;
|
||||
public readonly headers: ReadonlyArray<string>;
|
||||
|
||||
constructor(options: unknown) {
|
||||
const request = new Request(options);
|
||||
const { socket } = options as { socket: WebSocket };
|
||||
constructor(
|
||||
request: Proto.IWebSocketRequestMessage,
|
||||
private readonly socket: WebSocket
|
||||
) {
|
||||
strictAssert(request.id, 'request without id');
|
||||
strictAssert(request.verb, 'request without verb');
|
||||
strictAssert(request.path, 'request without path');
|
||||
|
||||
this.id = request.id;
|
||||
this.verb = request.verb;
|
||||
this.path = request.path;
|
||||
this.body = request.body;
|
||||
this.headers = request.headers;
|
||||
this.body = dropNull(request.body);
|
||||
this.headers = request.headers || [];
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
this.respond = (status, message) => {
|
||||
const ab = new window.textsecure.protobuf.WebSocketMessage({
|
||||
type: window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE,
|
||||
response: { id: request.id, message, status },
|
||||
})
|
||||
.encode()
|
||||
.toArrayBuffer();
|
||||
socket.sendBytes(Buffer.from(ab));
|
||||
};
|
||||
public respond(status: number, message: string): void {
|
||||
const bytes = Proto.WebSocketMessage.encode({
|
||||
type: Proto.WebSocketMessage.Type.RESPONSE,
|
||||
response: { id: this.id, message, status },
|
||||
}).finish();
|
||||
|
||||
this.socket.sendBytes(Buffer.from(bytes));
|
||||
}
|
||||
}
|
||||
|
||||
const outgoing: {
|
||||
[id: number]: Request;
|
||||
} = {};
|
||||
class OutgoingWebSocketRequest {
|
||||
constructor(options: any, socket: WebSocket) {
|
||||
const request = new Request(options);
|
||||
outgoing[request.id] = request;
|
||||
const ab = new window.textsecure.protobuf.WebSocketMessage({
|
||||
type: window.textsecure.protobuf.WebSocketMessage.Type.REQUEST,
|
||||
export type OutgoingWebSocketRequestOptions = Readonly<{
|
||||
verb: string;
|
||||
path: string;
|
||||
body?: Uint8Array;
|
||||
headers?: ReadonlyArray<string>;
|
||||
error?: Callback;
|
||||
success?: Callback;
|
||||
}>;
|
||||
|
||||
export class OutgoingWebSocketRequest {
|
||||
public readonly error: Callback | undefined;
|
||||
|
||||
public readonly success: Callback | undefined;
|
||||
|
||||
public response: Proto.IWebSocketResponseMessage | undefined;
|
||||
|
||||
constructor(
|
||||
id: number,
|
||||
options: OutgoingWebSocketRequestOptions,
|
||||
socket: WebSocket
|
||||
) {
|
||||
this.error = options.error;
|
||||
this.success = options.success;
|
||||
|
||||
const bytes = Proto.WebSocketMessage.encode({
|
||||
type: Proto.WebSocketMessage.Type.REQUEST,
|
||||
request: {
|
||||
verb: request.verb,
|
||||
path: request.path,
|
||||
body: request.body,
|
||||
headers: request.headers,
|
||||
id: request.id,
|
||||
verb: options.verb,
|
||||
path: options.path,
|
||||
body: options.body,
|
||||
headers: options.headers ? options.headers.slice() : undefined,
|
||||
id,
|
||||
},
|
||||
})
|
||||
.encode()
|
||||
.toArrayBuffer();
|
||||
socket.sendBytes(Buffer.from(ab));
|
||||
}).finish();
|
||||
socket.sendBytes(Buffer.from(bytes));
|
||||
}
|
||||
}
|
||||
|
||||
export type WebSocketResourceOptions = {
|
||||
handleRequest?: (request: IncomingWebSocketRequest) => void;
|
||||
keepalive?: KeepAliveOptionsType | true;
|
||||
};
|
||||
|
||||
export default class WebSocketResource extends EventTarget {
|
||||
closed?: boolean;
|
||||
private outgoingId = 1;
|
||||
|
||||
close: (code?: number, reason?: string) => void;
|
||||
private closed?: boolean;
|
||||
|
||||
sendRequest: (options: any) => OutgoingWebSocketRequest;
|
||||
private readonly outgoingMap = new Map<number, OutgoingWebSocketRequest>();
|
||||
|
||||
keepalive?: KeepAlive;
|
||||
private readonly boundOnMessage: (message: IMessage) => void;
|
||||
|
||||
constructor(socket: WebSocket, opts: any = {}) {
|
||||
// Public for tests
|
||||
public readonly keepalive?: KeepAlive;
|
||||
|
||||
constructor(
|
||||
private readonly socket: WebSocket,
|
||||
private readonly options: WebSocketResourceOptions = {}
|
||||
) {
|
||||
super();
|
||||
|
||||
let { handleRequest } = opts;
|
||||
if (typeof handleRequest !== 'function') {
|
||||
handleRequest = (request: IncomingWebSocketRequest) => {
|
||||
request.respond(404, 'Not found');
|
||||
};
|
||||
}
|
||||
this.sendRequest = options => new OutgoingWebSocketRequest(options, socket);
|
||||
this.boundOnMessage = this.onMessage.bind(this);
|
||||
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
const onMessage = ({ type, binaryData }: IMessage): void => {
|
||||
if (type !== 'binary' || !binaryData) {
|
||||
throw new Error(`Unsupported websocket message type: ${type}`);
|
||||
}
|
||||
socket.on('message', this.boundOnMessage);
|
||||
|
||||
const message = window.textsecure.protobuf.WebSocketMessage.decode(
|
||||
toArrayBuffer(binaryData)
|
||||
if (options.keepalive) {
|
||||
const keepalive = new KeepAlive(
|
||||
this,
|
||||
options.keepalive === true ? {} : options.keepalive
|
||||
);
|
||||
if (
|
||||
message.type ===
|
||||
window.textsecure.protobuf.WebSocketMessage.Type.REQUEST &&
|
||||
message.request
|
||||
) {
|
||||
handleRequest(
|
||||
new IncomingWebSocketRequest({
|
||||
verb: message.request.verb,
|
||||
path: message.request.path,
|
||||
body: message.request.body,
|
||||
headers: message.request.headers,
|
||||
id: message.request.id,
|
||||
socket,
|
||||
})
|
||||
);
|
||||
} else if (
|
||||
message.type ===
|
||||
window.textsecure.protobuf.WebSocketMessage.Type.RESPONSE &&
|
||||
message.response
|
||||
) {
|
||||
const { response } = message;
|
||||
const request = outgoing[response.id];
|
||||
if (request) {
|
||||
request.response = response;
|
||||
let callback = request.error;
|
||||
if (
|
||||
response.status &&
|
||||
response.status >= 200 &&
|
||||
response.status < 300
|
||||
) {
|
||||
callback = request.success;
|
||||
}
|
||||
this.keepalive = keepalive;
|
||||
|
||||
if (typeof callback === 'function') {
|
||||
callback(response.message, response.status, request);
|
||||
}
|
||||
} else {
|
||||
throw new Error(
|
||||
`Received response for unknown request ${message.response.id}`
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
socket.on('message', onMessage);
|
||||
|
||||
if (opts.keepalive) {
|
||||
this.keepalive = new KeepAlive(this, {
|
||||
path: opts.keepalive.path,
|
||||
disconnect: opts.keepalive.disconnect,
|
||||
});
|
||||
const resetKeepAliveTimer = this.keepalive.reset.bind(this.keepalive);
|
||||
|
||||
this.keepalive.reset();
|
||||
|
||||
socket.on('message', resetKeepAliveTimer);
|
||||
socket.on('close', this.keepalive.stop.bind(this.keepalive));
|
||||
keepalive.reset();
|
||||
socket.on('message', () => keepalive.reset());
|
||||
socket.on('close', () => keepalive.stop());
|
||||
}
|
||||
|
||||
socket.on('close', () => {
|
||||
this.closed = true;
|
||||
});
|
||||
}
|
||||
|
||||
this.close = (code = 3000, reason) => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
public sendRequest(
|
||||
options: OutgoingWebSocketRequestOptions
|
||||
): OutgoingWebSocketRequest {
|
||||
const id = this.outgoingId;
|
||||
strictAssert(!this.outgoingMap.has(id), 'Duplicate outgoing request');
|
||||
|
||||
window.log.info('WebSocketResource.close()');
|
||||
if (this.keepalive) {
|
||||
this.keepalive.stop();
|
||||
}
|
||||
// eslint-disable-next-line no-bitwise
|
||||
this.outgoingId = Math.max(1, (this.outgoingId + 1) & 0x7fffffff);
|
||||
|
||||
socket.close(code, reason);
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-ignore
|
||||
socket.removeListener('message', onMessage);
|
||||
const outgoing = new OutgoingWebSocketRequest(id, options, this.socket);
|
||||
this.outgoingMap.set(id, outgoing);
|
||||
|
||||
// On linux the socket can wait a long time to emit its close event if we've
|
||||
// lost the internet connection. On the order of minutes. This speeds that
|
||||
// process up.
|
||||
setTimeout(() => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.closed = true;
|
||||
|
||||
window.log.warn('Dispatching our own socket close event');
|
||||
const ev = new Event('close');
|
||||
ev.code = code;
|
||||
ev.reason = reason;
|
||||
this.dispatchEvent(ev);
|
||||
}, 5000);
|
||||
};
|
||||
return outgoing;
|
||||
}
|
||||
|
||||
public forceKeepAlive(): void {
|
||||
|
@ -259,9 +180,83 @@ export default class WebSocketResource extends EventTarget {
|
|||
}
|
||||
this.keepalive.send();
|
||||
}
|
||||
|
||||
public close(code = 3000, reason?: string): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
window.log.info('WebSocketResource.close()');
|
||||
if (this.keepalive) {
|
||||
this.keepalive.stop();
|
||||
}
|
||||
|
||||
this.socket.close(code, reason);
|
||||
|
||||
this.socket.removeListener('message', this.boundOnMessage);
|
||||
|
||||
// On linux the socket can wait a long time to emit its close event if we've
|
||||
// lost the internet connection. On the order of minutes. This speeds that
|
||||
// process up.
|
||||
setTimeout(() => {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
window.log.warn('Dispatching our own socket close event');
|
||||
const ev = new Event('close');
|
||||
ev.code = code;
|
||||
ev.reason = reason;
|
||||
this.dispatchEvent(ev);
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
private onMessage({ type, binaryData }: IMessage): void {
|
||||
if (type !== 'binary' || !binaryData) {
|
||||
throw new Error(`Unsupported websocket message type: ${type}`);
|
||||
}
|
||||
|
||||
const message = Proto.WebSocketMessage.decode(binaryData);
|
||||
if (
|
||||
message.type === Proto.WebSocketMessage.Type.REQUEST &&
|
||||
message.request
|
||||
) {
|
||||
const handleRequest =
|
||||
this.options.handleRequest ||
|
||||
(request => request.respond(404, 'Not found'));
|
||||
handleRequest(new IncomingWebSocketRequest(message.request, this.socket));
|
||||
} else if (
|
||||
message.type === Proto.WebSocketMessage.Type.RESPONSE &&
|
||||
message.response
|
||||
) {
|
||||
const { response } = message;
|
||||
strictAssert(response.id, 'response without id');
|
||||
|
||||
const responseId = normalizeNumber(response.id);
|
||||
const request = this.outgoingMap.get(responseId);
|
||||
this.outgoingMap.delete(responseId);
|
||||
|
||||
if (!request) {
|
||||
throw new Error(`Received response for unknown request ${responseId}`);
|
||||
}
|
||||
|
||||
request.response = dropNull(response);
|
||||
|
||||
let callback = request.error;
|
||||
|
||||
const status = response.status ?? -1;
|
||||
if (status >= 200 && status < 300) {
|
||||
callback = request.success;
|
||||
}
|
||||
|
||||
if (typeof callback === 'function') {
|
||||
callback(response.message ?? '', status, request);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type KeepAliveOptionsType = {
|
||||
export type KeepAliveOptionsType = {
|
||||
path?: string;
|
||||
disconnect?: boolean;
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue