unauthenticated WebSocket via libsignal: shadowing mode
Co-authored-by: Scott Nonnenberg <scott@signal.org>
This commit is contained in:
parent
d696a2c082
commit
9f40562b19
14 changed files with 636 additions and 119 deletions
|
@ -23,9 +23,19 @@
|
|||
*
|
||||
*/
|
||||
|
||||
/* eslint-disable @typescript-eslint/no-namespace */
|
||||
/* eslint-disable @typescript-eslint/brace-style */
|
||||
|
||||
import { Net } from '@signalapp/libsignal-client';
|
||||
import type { connection as WebSocket, IMessage } from 'websocket';
|
||||
import Long from 'long';
|
||||
import pTimeout from 'p-timeout';
|
||||
import { Response } from 'node-fetch';
|
||||
import net from 'net';
|
||||
import { z } from 'zod';
|
||||
import { clearInterval } from 'timers';
|
||||
import { random } from 'lodash';
|
||||
import type { DebugInfo } from '@signalapp/libsignal-client/Native';
|
||||
|
||||
import type { EventHandler } from './EventTarget';
|
||||
import EventTarget from './EventTarget';
|
||||
|
@ -38,11 +48,121 @@ import * as Errors from '../types/errors';
|
|||
import { SignalService as Proto } from '../protobuf';
|
||||
import * as log from '../logging/log';
|
||||
import * as Timers from '../Timers';
|
||||
import type { IResource } from './WebSocket';
|
||||
import { isProduction, isStaging } from '../util/version';
|
||||
|
||||
import { ToastType } from '../types/Toast';
|
||||
import { drop } from '../util/drop';
|
||||
|
||||
const THIRTY_SECONDS = 30 * durations.SECOND;
|
||||
|
||||
const HEALTHCHECK_TIMEOUT = durations.SECOND;
|
||||
|
||||
const STATS_UPDATE_INTERVAL = durations.MINUTE;
|
||||
|
||||
const MAX_MESSAGE_SIZE = 512 * 1024;
|
||||
|
||||
const AGGREGATED_STATS_KEY = 'websocketStats';
|
||||
|
||||
export enum IpVersion {
|
||||
IPv4 = 'ipv4',
|
||||
IPv6 = 'ipv6',
|
||||
}
|
||||
|
||||
export namespace IpVersion {
|
||||
export function fromDebugInfoCode(ipType: number): IpVersion | undefined {
|
||||
switch (ipType) {
|
||||
case 1:
|
||||
return IpVersion.IPv4;
|
||||
case 2:
|
||||
return IpVersion.IPv6;
|
||||
default:
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const AggregatedStatsSchema = z.object({
|
||||
requestsCompared: z.number(),
|
||||
ipVersionMismatches: z.number(),
|
||||
unexpectedReconnects: z.number(),
|
||||
healthcheckFailures: z.number(),
|
||||
healthcheckBadStatus: z.number(),
|
||||
lastToastTimestamp: z.number(),
|
||||
});
|
||||
|
||||
export type AggregatedStats = z.infer<typeof AggregatedStatsSchema>;
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-redeclare
|
||||
export namespace AggregatedStats {
|
||||
export function loadOrCreateEmpty(name: string): AggregatedStats {
|
||||
const key = localStorageKey(name);
|
||||
try {
|
||||
const json = localStorage.getItem(key);
|
||||
return json != null
|
||||
? AggregatedStatsSchema.parse(JSON.parse(json))
|
||||
: createEmpty();
|
||||
} catch (error) {
|
||||
log.warn(
|
||||
`Could not load [${key}] from local storage. Possibly, attempting to load for the first time`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
return createEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
export function store(stats: AggregatedStats, name: string): void {
|
||||
const key = localStorageKey(name);
|
||||
try {
|
||||
const json = JSON.stringify(stats);
|
||||
localStorage.setItem(key, json);
|
||||
} catch (error) {
|
||||
log.warn(
|
||||
`Failed to store key [${key}] to the local storage`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export function add(a: AggregatedStats, b: AggregatedStats): AggregatedStats {
|
||||
return {
|
||||
requestsCompared: a.requestsCompared + b.requestsCompared,
|
||||
healthcheckFailures: a.healthcheckFailures + b.healthcheckFailures,
|
||||
ipVersionMismatches: a.ipVersionMismatches + b.ipVersionMismatches,
|
||||
unexpectedReconnects: a.unexpectedReconnects + b.unexpectedReconnects,
|
||||
healthcheckBadStatus: a.healthcheckBadStatus + b.healthcheckBadStatus,
|
||||
lastToastTimestamp: Math.max(a.lastToastTimestamp, b.lastToastTimestamp),
|
||||
};
|
||||
}
|
||||
|
||||
export function createEmpty(): AggregatedStats {
|
||||
return {
|
||||
requestsCompared: 0,
|
||||
ipVersionMismatches: 0,
|
||||
unexpectedReconnects: 0,
|
||||
healthcheckFailures: 0,
|
||||
healthcheckBadStatus: 0,
|
||||
lastToastTimestamp: 0,
|
||||
};
|
||||
}
|
||||
|
||||
export function shouldReportError(stats: AggregatedStats): boolean {
|
||||
const timeSinceLastToast = Date.now() - stats.lastToastTimestamp;
|
||||
if (timeSinceLastToast < durations.DAY || stats.requestsCompared < 1000) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
stats.healthcheckBadStatus + stats.healthcheckFailures > 20 ||
|
||||
stats.ipVersionMismatches > 50 ||
|
||||
stats.unexpectedReconnects > 50
|
||||
);
|
||||
}
|
||||
|
||||
export function localStorageKey(name: string): string {
|
||||
return `${AGGREGATED_STATS_KEY}.${name}`;
|
||||
}
|
||||
}
|
||||
|
||||
export class IncomingWebSocketRequest {
|
||||
private readonly id: Long;
|
||||
|
||||
|
@ -84,7 +204,7 @@ export type SendRequestOptions = Readonly<{
|
|||
path: string;
|
||||
body?: Uint8Array;
|
||||
timeout?: number;
|
||||
headers?: ReadonlyArray<string>;
|
||||
headers?: ReadonlyArray<[string, string]>;
|
||||
}>;
|
||||
|
||||
export type SendRequestResult = Readonly<{
|
||||
|
@ -94,10 +214,29 @@ export type SendRequestResult = Readonly<{
|
|||
headers: ReadonlyArray<string>;
|
||||
}>;
|
||||
|
||||
export enum TransportOption {
|
||||
// Only original transport is used
|
||||
Original = 'original',
|
||||
// All requests are going through the original transport,
|
||||
// but for every request that completes sucessfully we're initiating
|
||||
// a healthcheck request via libsignal transport,
|
||||
// collecting comparison statistics, and if we see many inconsistencies,
|
||||
// we're showing a toast asking user to submit a debug log
|
||||
ShadowingHigh = 'shadowingHigh',
|
||||
// Similar to `shadowingHigh`, however, only 10% of requests
|
||||
// will trigger a healthcheck, and toast is never shown.
|
||||
// Statistics data is still added to the debug logs,
|
||||
// so it will be available to us with all the debug log uploads.
|
||||
ShadowingLow = 'shadowingLow',
|
||||
// Only libsignal transport is used
|
||||
Libsignal = 'libsignal',
|
||||
}
|
||||
|
||||
export type WebSocketResourceOptions = {
|
||||
name: string;
|
||||
handleRequest?: (request: IncomingWebSocketRequest) => void;
|
||||
keepalive?: KeepAliveOptionsType;
|
||||
transportOption?: TransportOption;
|
||||
};
|
||||
|
||||
export class CloseEvent extends Event {
|
||||
|
@ -106,7 +245,225 @@ export class CloseEvent extends Event {
|
|||
}
|
||||
}
|
||||
|
||||
export default class WebSocketResource extends EventTarget {
|
||||
// eslint-disable-next-line no-restricted-syntax
|
||||
export interface IWebSocketResource extends IResource {
|
||||
sendRequest(options: SendRequestOptions): Promise<Response>;
|
||||
|
||||
addEventListener(name: 'close', handler: (ev: CloseEvent) => void): void;
|
||||
|
||||
forceKeepAlive(): void;
|
||||
|
||||
shutdown(): void;
|
||||
|
||||
close(): void;
|
||||
|
||||
localPort(): number | undefined;
|
||||
}
|
||||
|
||||
export class LibsignalWebSocketResource implements IWebSocketResource {
|
||||
private readonly net: Net.Net;
|
||||
|
||||
constructor(version: string) {
|
||||
this.net = new Net.Net(
|
||||
isStaging(version) ? Net.Environment.Staging : Net.Environment.Production
|
||||
);
|
||||
}
|
||||
|
||||
public localPort(): number | undefined {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
public addEventListener(
|
||||
_name: 'close',
|
||||
_handler: (ev: CloseEvent) => void
|
||||
): void {
|
||||
// noop
|
||||
}
|
||||
|
||||
public close(_code?: number, _reason?: string): void {
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
this.net.disconnectChatService();
|
||||
}
|
||||
|
||||
public shutdown(): void {
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
this.net.disconnectChatService();
|
||||
}
|
||||
|
||||
public forceKeepAlive(): void {
|
||||
// no-op
|
||||
}
|
||||
|
||||
public async sendRequest(options: SendRequestOptions): Promise<Response> {
|
||||
const [response] = await this.sendRequestGetDebugInfo(options);
|
||||
return response;
|
||||
}
|
||||
|
||||
public async sendRequestGetDebugInfo(
|
||||
options: SendRequestOptions
|
||||
): Promise<[Response, DebugInfo]> {
|
||||
const { response, debugInfo } = await this.net.unauthenticatedFetchAndDebug(
|
||||
{
|
||||
verb: options.verb,
|
||||
path: options.path,
|
||||
headers: options.headers ? options.headers : [],
|
||||
body: options.body,
|
||||
timeoutMillis: options.timeout,
|
||||
}
|
||||
);
|
||||
return [
|
||||
new Response(response.body, {
|
||||
status: response.status,
|
||||
statusText: response.message,
|
||||
headers: [...response.headers],
|
||||
}),
|
||||
debugInfo,
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
export class WebSocketResourceWithShadowing implements IWebSocketResource {
|
||||
private main: WebSocketResource;
|
||||
|
||||
private shadowing: LibsignalWebSocketResource;
|
||||
|
||||
private stats: AggregatedStats;
|
||||
|
||||
private statsTimer: NodeJS.Timer;
|
||||
|
||||
private shadowingWithReporting: boolean;
|
||||
|
||||
private logId: string;
|
||||
|
||||
constructor(
|
||||
socket: WebSocket,
|
||||
options: WebSocketResourceOptions,
|
||||
version: string
|
||||
) {
|
||||
this.main = new WebSocketResource(socket, options);
|
||||
this.shadowing = new LibsignalWebSocketResource(version);
|
||||
this.stats = AggregatedStats.createEmpty();
|
||||
this.logId = `WebSocketResourceWithShadowing(${options.name})`;
|
||||
this.statsTimer = setInterval(
|
||||
() => this.updateStats(options.name),
|
||||
STATS_UPDATE_INTERVAL
|
||||
);
|
||||
this.shadowingWithReporting =
|
||||
options.transportOption === TransportOption.ShadowingHigh;
|
||||
|
||||
this.addEventListener('close', (_ev): void => {
|
||||
clearInterval(this.statsTimer);
|
||||
this.updateStats(options.name);
|
||||
});
|
||||
}
|
||||
|
||||
private updateStats(name: string) {
|
||||
const storedStats = AggregatedStats.loadOrCreateEmpty(name);
|
||||
const updatedStats = AggregatedStats.add(storedStats, this.stats);
|
||||
if (
|
||||
this.shadowingWithReporting &&
|
||||
AggregatedStats.shouldReportError(updatedStats) &&
|
||||
!isProduction(window.getVersion())
|
||||
) {
|
||||
window.reduxActions.toast.showToast({
|
||||
toastType: ToastType.TransportError,
|
||||
});
|
||||
updatedStats.lastToastTimestamp = Date.now();
|
||||
}
|
||||
AggregatedStats.store(updatedStats, name);
|
||||
this.stats = AggregatedStats.createEmpty();
|
||||
}
|
||||
|
||||
public localPort(): number | undefined {
|
||||
return this.main.localPort();
|
||||
}
|
||||
|
||||
public addEventListener(
|
||||
name: 'close',
|
||||
handler: (ev: CloseEvent) => void
|
||||
): void {
|
||||
this.main.addEventListener(name, handler);
|
||||
}
|
||||
|
||||
public close(): void {
|
||||
this.main.close();
|
||||
this.shadowing.close();
|
||||
}
|
||||
|
||||
public shutdown(): void {
|
||||
this.main.shutdown();
|
||||
this.shadowing.shutdown();
|
||||
}
|
||||
|
||||
public forceKeepAlive(): void {
|
||||
this.main.forceKeepAlive();
|
||||
}
|
||||
|
||||
public async sendRequest(options: SendRequestOptions): Promise<Response> {
|
||||
const responsePromise = this.main.sendRequest(options);
|
||||
const response = await responsePromise;
|
||||
|
||||
// if we're received a response from the main channel and the status was successful,
|
||||
// attempting to run a healthcheck on a libsignal transport.
|
||||
if (
|
||||
isSuccessfulStatusCode(response.status) &&
|
||||
this.shouldSendShadowRequest()
|
||||
) {
|
||||
drop(this.sendShadowRequest());
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private async sendShadowRequest(): Promise<void> {
|
||||
try {
|
||||
const [healthCheckResult, debugInfo] =
|
||||
await this.shadowing.sendRequestGetDebugInfo({
|
||||
verb: 'GET',
|
||||
path: '/v1/keepalive',
|
||||
timeout: HEALTHCHECK_TIMEOUT,
|
||||
});
|
||||
this.stats.requestsCompared += 1;
|
||||
if (!isSuccessfulStatusCode(healthCheckResult.status)) {
|
||||
this.stats.healthcheckBadStatus += 1;
|
||||
log.warn(
|
||||
`${this.logId}: keepalive via libsignal responded with status [${healthCheckResult.status}]`
|
||||
);
|
||||
}
|
||||
const ipVersion = IpVersion.fromDebugInfoCode(debugInfo.ipType);
|
||||
if (this.main.ipVersion() !== ipVersion) {
|
||||
this.stats.ipVersionMismatches += 1;
|
||||
log.warn(
|
||||
`${
|
||||
this.logId
|
||||
}: keepalive via libsignal using IP [${ipVersion}] while main is using IP [${this.main.ipVersion()}]`
|
||||
);
|
||||
}
|
||||
if (debugInfo.reconnectCount > 1) {
|
||||
this.stats.unexpectedReconnects = debugInfo.reconnectCount - 1;
|
||||
}
|
||||
} catch (error) {
|
||||
this.stats.healthcheckFailures += 1;
|
||||
log.warn(
|
||||
`${this.logId}: failed to send keepalive via libsignal`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private shouldSendShadowRequest(): boolean {
|
||||
return this.shadowingWithReporting || random(0, 100) < 10;
|
||||
}
|
||||
}
|
||||
|
||||
function isSuccessfulStatusCode(status: number): boolean {
|
||||
return status >= 200 && status < 300;
|
||||
}
|
||||
|
||||
export default class WebSocketResource
|
||||
extends EventTarget
|
||||
implements IWebSocketResource
|
||||
{
|
||||
private outgoingId = Long.fromNumber(1, true);
|
||||
|
||||
private closed = false;
|
||||
|
@ -126,7 +483,9 @@ export default class WebSocketResource extends EventTarget {
|
|||
|
||||
private readonly logId: string;
|
||||
|
||||
public readonly localPort: number | undefined;
|
||||
private readonly localSocketPort: number | undefined;
|
||||
|
||||
private readonly socketIpVersion: IpVersion | undefined;
|
||||
|
||||
// Public for tests
|
||||
public readonly keepalive?: KeepAlive;
|
||||
|
@ -138,7 +497,20 @@ export default class WebSocketResource extends EventTarget {
|
|||
super();
|
||||
|
||||
this.logId = `WebSocketResource(${options.name})`;
|
||||
this.localPort = socket.socket.localPort;
|
||||
this.localSocketPort = socket.socket.localPort;
|
||||
|
||||
if (!socket.socket.localAddress) {
|
||||
this.socketIpVersion = undefined;
|
||||
}
|
||||
if (socket.socket.localAddress == null) {
|
||||
this.socketIpVersion = undefined;
|
||||
} else if (net.isIPv4(socket.socket.localAddress)) {
|
||||
this.socketIpVersion = IpVersion.IPv4;
|
||||
} else if (net.isIPv6(socket.socket.localAddress)) {
|
||||
this.socketIpVersion = IpVersion.IPv6;
|
||||
} else {
|
||||
this.socketIpVersion = undefined;
|
||||
}
|
||||
|
||||
this.boundOnMessage = this.onMessage.bind(this);
|
||||
|
||||
|
@ -169,6 +541,14 @@ export default class WebSocketResource extends EventTarget {
|
|||
this.addEventListener('close', () => this.onClose());
|
||||
}
|
||||
|
||||
public ipVersion(): IpVersion | undefined {
|
||||
return this.socketIpVersion;
|
||||
}
|
||||
|
||||
public localPort(): number | undefined {
|
||||
return this.localSocketPort;
|
||||
}
|
||||
|
||||
public override addEventListener(
|
||||
name: 'close',
|
||||
handler: (ev: CloseEvent) => void
|
||||
|
@ -178,9 +558,7 @@ export default class WebSocketResource extends EventTarget {
|
|||
return super.addEventListener(name, handler);
|
||||
}
|
||||
|
||||
public async sendRequest(
|
||||
options: SendRequestOptions
|
||||
): Promise<SendRequestResult> {
|
||||
public async sendRequest(options: SendRequestOptions): Promise<Response> {
|
||||
const id = this.outgoingId;
|
||||
const idString = id.toString();
|
||||
strictAssert(!this.outgoingMap.has(idString), 'Duplicate outgoing request');
|
||||
|
@ -194,7 +572,13 @@ export default class WebSocketResource extends EventTarget {
|
|||
verb: options.verb,
|
||||
path: options.path,
|
||||
body: options.body,
|
||||
headers: options.headers ? options.headers.slice() : undefined,
|
||||
headers: options.headers
|
||||
? options.headers
|
||||
.map(([key, value]) => {
|
||||
return `${key}:${value}`;
|
||||
})
|
||||
.slice()
|
||||
: undefined,
|
||||
id,
|
||||
},
|
||||
}).finish();
|
||||
|
@ -239,7 +623,8 @@ export default class WebSocketResource extends EventTarget {
|
|||
|
||||
this.socket.sendBytes(Buffer.from(bytes));
|
||||
|
||||
return promise;
|
||||
const requestResult = await promise;
|
||||
return WebSocketResource.intoResponse(requestResult);
|
||||
}
|
||||
|
||||
public forceKeepAlive(): void {
|
||||
|
@ -399,6 +784,27 @@ export default class WebSocketResource extends EventTarget {
|
|||
log.info(`${this.logId}.removeActive: shutdown complete`);
|
||||
this.close(3000, 'Shutdown');
|
||||
}
|
||||
|
||||
private static intoResponse(sendRequestResult: SendRequestResult): Response {
|
||||
const {
|
||||
status,
|
||||
message: statusText,
|
||||
response,
|
||||
headers: flatResponseHeaders,
|
||||
} = sendRequestResult;
|
||||
|
||||
const headers: Array<[string, string]> = flatResponseHeaders.map(header => {
|
||||
const [key, value] = header.split(':', 2);
|
||||
strictAssert(value !== undefined, 'Invalid header!');
|
||||
return [key, value];
|
||||
});
|
||||
|
||||
return new Response(response, {
|
||||
status,
|
||||
statusText,
|
||||
headers,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export type KeepAliveOptionsType = {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue