Upgrade libsignal-client to 0.44.0 and adopt API changes

This commit is contained in:
Sergey Skrobotov 2024-04-04 14:39:52 -07:00 committed by GitHub
parent 37725647c8
commit e388f13910
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 266 additions and 138 deletions

View file

@ -26,7 +26,6 @@
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/brace-style */
import { Net } from '@signalapp/libsignal-client';
import type { connection as WebSocket, IMessage } from 'websocket';
import Long from 'long';
import pTimeout from 'p-timeout';
@ -35,8 +34,9 @@ import net from 'net';
import { z } from 'zod';
import { clearInterval } from 'timers';
import { random } from 'lodash';
import type { DebugInfo } from '@signalapp/libsignal-client/Native';
import type { ChatServiceDebugInfo } from '@signalapp/libsignal-client/Native';
import type { Net } from '@signalapp/libsignal-client';
import type { EventHandler } from './EventTarget';
import EventTarget from './EventTarget';
@ -50,14 +50,13 @@ import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log';
import * as Timers from '../Timers';
import type { IResource } from './WebSocket';
import { isProduction, isStaging } from '../util/version';
import { isProduction } from '../util/version';
import { ToastType } from '../types/Toast';
import { AbortableProcess } from '../util/AbortableProcess';
const THIRTY_SECONDS = 30 * durations.SECOND;
const HEALTHCHECK_TIMEOUT = durations.SECOND;
const STATS_UPDATE_INTERVAL = durations.MINUTE;
const MAX_MESSAGE_SIZE = 512 * 1024;
@ -83,6 +82,7 @@ export namespace IpVersion {
}
const AggregatedStatsSchema = z.object({
connectionFailures: z.number(),
requestsCompared: z.number(),
ipVersionMismatches: z.number(),
unexpectedReconnects: z.number(),
@ -127,6 +127,7 @@ export namespace AggregatedStats {
export function add(a: AggregatedStats, b: AggregatedStats): AggregatedStats {
return {
requestsCompared: a.requestsCompared + b.requestsCompared,
connectionFailures: a.connectionFailures + b.connectionFailures,
healthcheckFailures: a.healthcheckFailures + b.healthcheckFailures,
ipVersionMismatches: a.ipVersionMismatches + b.ipVersionMismatches,
unexpectedReconnects: a.unexpectedReconnects + b.unexpectedReconnects,
@ -138,6 +139,7 @@ export namespace AggregatedStats {
export function createEmpty(): AggregatedStats {
return {
requestsCompared: 0,
connectionFailures: 0,
ipVersionMismatches: 0,
unexpectedReconnects: 0,
healthcheckFailures: 0,
@ -152,9 +154,10 @@ export namespace AggregatedStats {
return false;
}
return (
stats.healthcheckBadStatus + stats.healthcheckFailures > 20 ||
stats.ipVersionMismatches > 50 ||
stats.unexpectedReconnects > 50
stats.healthcheckBadStatus +
stats.healthcheckFailures +
stats.connectionFailures >
20 || stats.unexpectedReconnects > 50
);
}
@ -261,11 +264,42 @@ export interface IWebSocketResource extends IResource {
}
export class LibsignalWebSocketResource implements IWebSocketResource {
private readonly net: Net.Net;
constructor(
private readonly chatService: Net.ChatService,
private readonly socketIpVersion: IpVersion | undefined
) {}
constructor(version: string) {
this.net = new Net.Net(
isStaging(version) ? Net.Environment.Staging : Net.Environment.Production
public static connect(
libsignalNet: Net.Net,
name: string
): AbortableProcess<LibsignalWebSocketResource> {
const chatService = libsignalNet.newChatService();
const connectAsync = async () => {
try {
const debugInfo = await chatService.connectUnauthenticated();
log.info(`LibsignalWebSocketResource(${name}) connected`, debugInfo);
return new LibsignalWebSocketResource(
chatService,
IpVersion.fromDebugInfoCode(debugInfo.ipType)
);
} catch (error) {
// Handle any errors that occur during connection
log.error(
`LibsignalWebSocketResource(${name}) connection failed`,
Errors.toLogFormat(error)
);
throw error;
}
};
return new AbortableProcess<LibsignalWebSocketResource>(
`LibsignalWebSocketResource.connect(${name})`,
{
abort() {
// if interrupted, trying to disconnect
drop(chatService.disconnect());
},
},
connectAsync()
);
}
@ -273,6 +307,10 @@ export class LibsignalWebSocketResource implements IWebSocketResource {
return undefined;
}
public ipVersion(): IpVersion | undefined {
return this.socketIpVersion;
}
public addEventListener(
_name: 'close',
_handler: (ev: CloseEvent) => void
@ -281,13 +319,11 @@ export class LibsignalWebSocketResource implements IWebSocketResource {
}
public close(_code?: number, _reason?: string): void {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.net.disconnectChatService();
drop(this.chatService.disconnect());
}
public shutdown(): void {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.net.disconnectChatService();
drop(this.chatService.disconnect());
}
public forceKeepAlive(): void {
@ -301,16 +337,15 @@ export class LibsignalWebSocketResource implements IWebSocketResource {
public async sendRequestGetDebugInfo(
options: SendRequestOptions
): Promise<[Response, DebugInfo]> {
const { response, debugInfo } = await this.net.unauthenticatedFetchAndDebug(
{
): Promise<[Response, ChatServiceDebugInfo]> {
const { response, debugInfo } =
await this.chatService.unauthenticatedFetchAndDebug({
verb: options.verb,
path: options.path,
headers: options.headers ? options.headers : [],
body: options.body,
timeoutMillis: options.timeout,
}
);
});
return [
new Response(response.body, {
status: response.status,
@ -323,9 +358,7 @@ export class LibsignalWebSocketResource implements IWebSocketResource {
}
export class WebSocketResourceWithShadowing implements IWebSocketResource {
private main: WebSocketResource;
private shadowing: LibsignalWebSocketResource;
private shadowing: LibsignalWebSocketResource | undefined;
private stats: AggregatedStats;
@ -336,12 +369,10 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource {
private logId: string;
constructor(
socket: WebSocket,
options: WebSocketResourceOptions,
version: string
private readonly main: WebSocketResource,
private readonly shadowingConnection: AbortableProcess<LibsignalWebSocketResource>,
options: WebSocketResourceOptions
) {
this.main = new WebSocketResource(socket, options);
this.shadowing = new LibsignalWebSocketResource(version);
this.stats = AggregatedStats.createEmpty();
this.logId = `WebSocketResourceWithShadowing(${options.name})`;
this.statsTimer = setInterval(
@ -351,6 +382,28 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource {
this.shadowingWithReporting =
options.transportOption === TransportOption.ShadowingHigh;
// the idea is that we want to keep the shadowing connection process
// "in the background", so that the main connection wouldn't need to wait on it.
// then when we're connected, `this.shadowing` socket resource is initialized
// or an error reported in case of connection failure
const initializeAfterConnected = async () => {
try {
this.shadowing = await shadowingConnection.resultPromise;
// checking IP one time per connection
if (this.main.ipVersion() !== this.shadowing.ipVersion()) {
this.stats.ipVersionMismatches += 1;
const mainIpType = this.main.ipVersion();
const shadowIpType = this.shadowing.ipVersion();
log.warn(
`${this.logId}: libsignal websocket IP [${shadowIpType}], Desktop websocket IP [${mainIpType}]`
);
}
} catch (error) {
this.stats.connectionFailures += 1;
}
};
drop(initializeAfterConnected());
this.addEventListener('close', (_ev): void => {
clearInterval(this.statsTimer);
this.updateStats(options.name);
@ -392,12 +445,20 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource {
public close(): void {
this.main.close();
this.shadowing.close();
if (this.shadowing) {
this.shadowing.close();
} else {
this.shadowingConnection.abort();
}
}
public shutdown(): void {
this.main.shutdown();
this.shadowing.shutdown();
if (this.shadowing) {
this.shadowing.shutdown();
} else {
this.shadowingConnection.abort();
}
}
public forceKeepAlive(timeout?: number): void {
@ -421,12 +482,20 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource {
}
private async sendShadowRequest(): Promise<void> {
// it could be that we're still connecting libsignal websocket
// in which case we're skipping the check
if (!this.shadowing) {
log.info(
`${this.logId}: skipping healthcheck - websocket not connected yet`
);
return;
}
try {
const [healthCheckResult, debugInfo] =
await this.shadowing.sendRequestGetDebugInfo({
verb: 'GET',
path: '/v1/keepalive',
timeout: HEALTHCHECK_TIMEOUT,
timeout: KEEPALIVE_TIMEOUT_MS,
});
this.stats.requestsCompared += 1;
if (!isSuccessfulStatusCode(healthCheckResult.status)) {
@ -435,18 +504,7 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource {
`${this.logId}: keepalive via libsignal responded with status [${healthCheckResult.status}]`
);
}
const ipVersion = IpVersion.fromDebugInfoCode(debugInfo.ipType);
if (this.main.ipVersion() !== ipVersion) {
this.stats.ipVersionMismatches += 1;
log.warn(
`${
this.logId
}: keepalive via libsignal using IP [${ipVersion}] while main is using IP [${this.main.ipVersion()}]`
);
}
if (debugInfo.reconnectCount > 1) {
this.stats.unexpectedReconnects = debugInfo.reconnectCount - 1;
}
this.stats.unexpectedReconnects = debugInfo.reconnectCount;
} catch (error) {
this.stats.healthcheckFailures += 1;
log.warn(
@ -823,7 +881,7 @@ const KEEPALIVE_INTERVAL_MS = 30 * durations.SECOND;
// immediate disconnect.
const STALE_THRESHOLD_MS = 5 * durations.MINUTE;
// If we don't receive a response to keepalive request within 10 seconds -
// If we don't receive a response to keepalive request within 30 seconds -
// close the socket.
const KEEPALIVE_TIMEOUT_MS = 30 * durations.SECOND;