Faster WebSocket reconnects

This commit is contained in:
Fedor Indutny 2021-06-09 15:28:54 -07:00 committed by GitHub
parent 3cac4a19e1
commit 17e6ec468e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 940 additions and 677 deletions

View file

@ -10,9 +10,10 @@
/* eslint-disable max-classes-per-file */
/* eslint-disable no-restricted-syntax */
import { isNumber, map, omit, noop } from 'lodash';
import { isNumber, map, omit } from 'lodash';
import PQueue from 'p-queue';
import { v4 as getGuid } from 'uuid';
import { connection as WebSocket } from 'websocket';
import { z } from 'zod';
import {
@ -41,6 +42,7 @@ import {
SignedPreKeys,
} from '../LibSignalStores';
import { BatcherType, createBatcher } from '../util/batcher';
import { sleep } from '../util/sleep';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { Zone } from '../util/Zone';
import EventTarget from './EventTarget';
@ -53,6 +55,7 @@ import Crypto from './Crypto';
import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto';
import { ContactBuffer, GroupBuffer } from './ContactsParser';
import { isByteBufferEmpty } from '../util/isByteBufferEmpty';
import { SocketStatus } from '../types/SocketStatus';
import {
AttachmentPointerClass,
@ -68,13 +71,12 @@ import {
} from '../textsecure.d';
import { ByteBufferClass } from '../window.d';
import { WebSocket } from './WebSocket';
import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups';
const GROUPV1_ID_LENGTH = 16;
const GROUPV2_ID_LENGTH = 32;
const RETRY_TIMEOUT = 2 * 60 * 1000;
const RECONNECT_DELAY = 1 * 1000;
const decryptionErrorTypeSchema = z
.object({
@ -169,7 +171,9 @@ enum TaskType {
}
class MessageReceiverInner extends EventTarget {
_onClose?: (ev: any) => Promise<void>;
_onClose?: (code: number, reason: string) => Promise<void>;
_onError?: (error: Error) => Promise<void>;
appQueue: PQueue;
@ -185,7 +189,7 @@ class MessageReceiverInner extends EventTarget {
deviceId?: number;
hasConnected?: boolean;
hasConnected = false;
incomingQueue: PQueue;
@ -209,6 +213,8 @@ class MessageReceiverInner extends EventTarget {
socket?: WebSocket;
socketStatus = SocketStatus.CLOSED;
stoppingProcessing?: boolean;
username: string;
@ -304,7 +310,7 @@ class MessageReceiverInner extends EventTarget {
static arrayBufferToStringBase64 = (arrayBuffer: ArrayBuffer): string =>
window.dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('base64');
connect() {
async connect(): Promise<void> {
if (this.calledClose) {
return;
}
@ -322,17 +328,43 @@ class MessageReceiverInner extends EventTarget {
this.hasConnected = true;
if (this.socket && this.socket.readyState !== WebSocket.CLOSED) {
if (this.socket && this.socket.connected) {
this.socket.close();
this.socket = undefined;
if (this.wsr) {
this.wsr.close();
this.wsr = undefined;
}
}
this.socketStatus = SocketStatus.CONNECTING;
// initialize the socket and start listening for messages
this.socket = this.server.getMessageSocket();
this.socket.onclose = this.onclose.bind(this);
this.socket.onerror = this.onerror.bind(this);
this.socket.onopen = this.onopen.bind(this);
try {
this.socket = await this.server.getMessageSocket();
} catch (error) {
this.socketStatus = SocketStatus.CLOSED;
const event = new Event('error');
event.error = error;
await this.dispatchAndWait(event);
return;
}
this.socketStatus = SocketStatus.OPEN;
window.log.info('websocket open');
window.logMessageReceiverConnect();
if (!this._onClose) {
this._onClose = this.onclose.bind(this);
}
if (!this._onError) {
this._onError = this.onerror.bind(this);
}
this.socket.on('close', this._onClose);
this.socket.on('error', this._onError);
this.wsr = new WebSocketResource(this.socket, {
handleRequest: this.handleRequest.bind(this),
keepalive: {
@ -342,7 +374,6 @@ class MessageReceiverInner extends EventTarget {
});
// Because sometimes the socket doesn't properly emit its close event
this._onClose = this.onclose.bind(this);
if (this._onClose) {
this.wsr.addEventListener('close', this._onClose);
}
@ -362,9 +393,12 @@ class MessageReceiverInner extends EventTarget {
shutdown() {
if (this.socket) {
this.socket.onclose = noop;
this.socket.onerror = noop;
this.socket.onopen = noop;
if (this._onClose) {
this.socket.removeListener('close', this._onClose);
}
if (this._onError) {
this.socket.removeListener('error', this._onError);
}
this.socket = undefined;
}
@ -380,6 +414,7 @@ class MessageReceiverInner extends EventTarget {
async close() {
window.log.info('MessageReceiver.close()');
this.calledClose = true;
this.socketStatus = SocketStatus.CLOSING;
// Our WebSocketResource instance will close the socket and emit a 'close' event
// if the socket doesn't emit one quickly enough.
@ -392,13 +427,8 @@ class MessageReceiverInner extends EventTarget {
return this.drain();
}
onopen() {
window.log.info('websocket open');
window.logMessageReceiverConnect();
}
onerror() {
window.log.error('websocket error');
async onerror(error: Error): Promise<void> {
window.log.error('websocket error', error);
}
async dispatchAndWait(event: Event) {
@ -407,35 +437,41 @@ class MessageReceiverInner extends EventTarget {
return Promise.resolve();
}
async onclose(ev: any) {
async onclose(code: number, reason: string): Promise<void> {
window.log.info(
'websocket closed',
ev.code,
ev.reason || '',
code,
reason || '',
'calledClose:',
this.calledClose
);
this.socketStatus = SocketStatus.CLOSED;
this.shutdown();
if (this.calledClose) {
return Promise.resolve();
return;
}
if (ev.code === 3000) {
return Promise.resolve();
if (code === 3000) {
return;
}
if (ev.code === 3001) {
if (code === 3001) {
this.onEmpty();
}
// possible 403 or network issue. Make an request to confirm
return this.server
.getDevices()
.then(this.connect.bind(this)) // No HTTP error? Reconnect
.catch(async e => {
const event = new Event('error');
event.error = e;
return this.dispatchAndWait(event);
});
await sleep(RECONNECT_DELAY);
// Try to reconnect (if there is an error - we'll get an
// `error` event from `connect()` and hit the retry backoff logic in
// `ts/background.ts`)
await this.connect();
}
checkSocket(): void {
if (this.wsr) {
this.wsr.forceKeepAlive();
}
}
handleRequest(request: IncomingWebSocketRequest) {
@ -1076,14 +1112,8 @@ class MessageReceiverInner extends EventTarget {
throw new Error('Received message with no content and no legacyMessage');
}
getStatus() {
if (this.socket) {
return this.socket.readyState;
}
if (this.hasConnected) {
return WebSocket.CLOSED;
}
return -1;
getStatus(): SocketStatus {
return this.socketStatus;
}
async onDeliveryReceipt(envelope: EnvelopeClass): Promise<void> {
@ -2693,6 +2723,7 @@ export default class MessageReceiver {
this.hasEmptied = inner.hasEmptied.bind(inner);
this.removeEventListener = inner.removeEventListener.bind(inner);
this.stopProcessing = inner.stopProcessing.bind(inner);
this.checkSocket = inner.checkSocket.bind(inner);
this.unregisterBatchers = inner.unregisterBatchers.bind(inner);
inner.connect();
@ -2707,7 +2738,7 @@ export default class MessageReceiver {
attachment: AttachmentPointerClass
) => Promise<DownloadAttachmentType>;
getStatus: () => number;
getStatus: () => SocketStatus;
hasEmptied: () => boolean;
@ -2717,6 +2748,8 @@ export default class MessageReceiver {
unregisterBatchers: () => void;
checkSocket: () => void;
getProcessedCount: () => number;
static stringToArrayBuffer = MessageReceiverInner.stringToArrayBuffer;