Adopt libsignal-net version with no auto-reconnect
Co-authored-by: Jordan Rose <jrose@signal.org>
This commit is contained in:
parent
00e6071b1d
commit
30a419bb2a
7 changed files with 139 additions and 71 deletions
|
@ -38,7 +38,11 @@ import type { ChatServiceDebugInfo } from '@signalapp/libsignal-client/Native';
|
|||
|
||||
import type { Net } from '@signalapp/libsignal-client';
|
||||
import { Buffer } from 'node:buffer';
|
||||
import type { ChatServerMessageAck } from '@signalapp/libsignal-client/dist/net';
|
||||
import type {
|
||||
ChatServerMessageAck,
|
||||
ChatServiceListener,
|
||||
ConnectionEventsListener,
|
||||
} from '@signalapp/libsignal-client/dist/net';
|
||||
import type { EventHandler } from './EventTarget';
|
||||
import EventTarget from './EventTarget';
|
||||
|
||||
|
@ -88,7 +92,6 @@ const AggregatedStatsSchema = z.object({
|
|||
connectionFailures: z.number(),
|
||||
requestsCompared: z.number(),
|
||||
ipVersionMismatches: z.number(),
|
||||
unexpectedReconnects: z.number(),
|
||||
healthcheckFailures: z.number(),
|
||||
healthcheckBadStatus: z.number(),
|
||||
lastToastTimestamp: z.number(),
|
||||
|
@ -133,7 +136,6 @@ export namespace AggregatedStats {
|
|||
connectionFailures: a.connectionFailures + b.connectionFailures,
|
||||
healthcheckFailures: a.healthcheckFailures + b.healthcheckFailures,
|
||||
ipVersionMismatches: a.ipVersionMismatches + b.ipVersionMismatches,
|
||||
unexpectedReconnects: a.unexpectedReconnects + b.unexpectedReconnects,
|
||||
healthcheckBadStatus: a.healthcheckBadStatus + b.healthcheckBadStatus,
|
||||
lastToastTimestamp: Math.max(a.lastToastTimestamp, b.lastToastTimestamp),
|
||||
};
|
||||
|
@ -144,7 +146,6 @@ export namespace AggregatedStats {
|
|||
requestsCompared: 0,
|
||||
connectionFailures: 0,
|
||||
ipVersionMismatches: 0,
|
||||
unexpectedReconnects: 0,
|
||||
healthcheckFailures: 0,
|
||||
healthcheckBadStatus: 0,
|
||||
lastToastTimestamp: 0,
|
||||
|
@ -156,12 +157,11 @@ export namespace AggregatedStats {
|
|||
if (timeSinceLastToast < durations.DAY || stats.requestsCompared < 1000) {
|
||||
return false;
|
||||
}
|
||||
return (
|
||||
const totalFailuresSinceLastToast =
|
||||
stats.healthcheckBadStatus +
|
||||
stats.healthcheckFailures +
|
||||
stats.connectionFailures >
|
||||
20 || stats.unexpectedReconnects > 50
|
||||
);
|
||||
stats.healthcheckFailures +
|
||||
stats.connectionFailures;
|
||||
return totalFailuresSinceLastToast > 20;
|
||||
}
|
||||
|
||||
export function localStorageKey(name: string): string {
|
||||
|
@ -330,6 +330,10 @@ export interface IWebSocketResource extends IResource {
|
|||
localPort(): number | undefined;
|
||||
}
|
||||
|
||||
type LibsignalWebSocketResourceHolder = {
|
||||
resource: LibsignalWebSocketResource | undefined;
|
||||
};
|
||||
|
||||
export function connectUnauthenticatedLibsignal({
|
||||
libsignalNet,
|
||||
name,
|
||||
|
@ -337,7 +341,24 @@ export function connectUnauthenticatedLibsignal({
|
|||
libsignalNet: Net.Net;
|
||||
name: string;
|
||||
}): AbortableProcess<LibsignalWebSocketResource> {
|
||||
return connectLibsignal(libsignalNet.newUnauthenticatedChatService(), name);
|
||||
const logId = `LibsignalWebSocketResource(${name})`;
|
||||
const listener: LibsignalWebSocketResourceHolder & ConnectionEventsListener =
|
||||
{
|
||||
resource: undefined,
|
||||
onConnectionInterrupted(): void {
|
||||
if (!this.resource) {
|
||||
logDisconnectedListenerWarn(logId, 'onConnectionInterrupted');
|
||||
return;
|
||||
}
|
||||
this.resource.onConnectionInterrupted();
|
||||
this.resource = undefined;
|
||||
},
|
||||
};
|
||||
return connectLibsignal(
|
||||
libsignalNet.newUnauthenticatedChatService(listener),
|
||||
listener,
|
||||
logId
|
||||
);
|
||||
}
|
||||
|
||||
export function connectAuthenticatedLibsignal({
|
||||
|
@ -353,12 +374,15 @@ export function connectAuthenticatedLibsignal({
|
|||
handler: (request: IncomingWebSocketRequest) => void;
|
||||
receiveStories: boolean;
|
||||
}): AbortableProcess<LibsignalWebSocketResource> {
|
||||
const listener = {
|
||||
const logId = `LibsignalWebSocketResource(${name})`;
|
||||
const listener: LibsignalWebSocketResourceHolder & ChatServiceListener = {
|
||||
resource: undefined,
|
||||
onIncomingMessage(
|
||||
envelope: Buffer,
|
||||
timestamp: number,
|
||||
ack: ChatServerMessageAck
|
||||
): void {
|
||||
// Handle incoming messages even if we've disconnected.
|
||||
const request = new IncomingWebSocketRequestLibsignal(
|
||||
ServerRequestType.ApiMessage,
|
||||
envelope,
|
||||
|
@ -368,6 +392,10 @@ export function connectAuthenticatedLibsignal({
|
|||
handler(request);
|
||||
},
|
||||
onQueueEmpty(): void {
|
||||
if (!this.resource) {
|
||||
logDisconnectedListenerWarn(logId, 'onQueueEmpty');
|
||||
return;
|
||||
}
|
||||
const request = new IncomingWebSocketRequestLibsignal(
|
||||
ServerRequestType.ApiEmptyQueue,
|
||||
undefined,
|
||||
|
@ -377,7 +405,12 @@ export function connectAuthenticatedLibsignal({
|
|||
handler(request);
|
||||
},
|
||||
onConnectionInterrupted(): void {
|
||||
log.warn(`LibsignalWebSocketResource(${name}): connection interrupted`);
|
||||
if (!this.resource) {
|
||||
logDisconnectedListenerWarn(logId, 'onConnectionInterrupted');
|
||||
return;
|
||||
}
|
||||
this.resource.onConnectionInterrupted();
|
||||
this.resource = undefined;
|
||||
},
|
||||
};
|
||||
return connectLibsignal(
|
||||
|
@ -387,33 +420,40 @@ export function connectAuthenticatedLibsignal({
|
|||
receiveStories,
|
||||
listener
|
||||
),
|
||||
name
|
||||
listener,
|
||||
logId
|
||||
);
|
||||
}
|
||||
|
||||
function logDisconnectedListenerWarn(logId: string, method: string): void {
|
||||
log.warn(`${logId} received ${method}, but listener already disconnected`);
|
||||
}
|
||||
|
||||
function connectLibsignal(
|
||||
chatService: Net.ChatService,
|
||||
name: string
|
||||
resourceHolder: LibsignalWebSocketResourceHolder,
|
||||
logId: string
|
||||
): AbortableProcess<LibsignalWebSocketResource> {
|
||||
const connectAsync = async () => {
|
||||
try {
|
||||
const debugInfo = await chatService.connect();
|
||||
log.info(`LibsignalWebSocketResource(${name}) connected`, debugInfo);
|
||||
return new LibsignalWebSocketResource(
|
||||
log.info(`${logId} connected`, debugInfo);
|
||||
const resource = new LibsignalWebSocketResource(
|
||||
chatService,
|
||||
IpVersion.fromDebugInfoCode(debugInfo.ipType)
|
||||
IpVersion.fromDebugInfoCode(debugInfo.ipType),
|
||||
logId
|
||||
);
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
resourceHolder.resource = resource;
|
||||
return resource;
|
||||
} catch (error) {
|
||||
// Handle any errors that occur during connection
|
||||
log.error(
|
||||
`LibsignalWebSocketResource(${name}) connection failed`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
log.error(`${logId} connection failed`, Errors.toLogFormat(error));
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
return new AbortableProcess<LibsignalWebSocketResource>(
|
||||
`LibsignalWebSocketResource.connect(${name})`,
|
||||
`${logId}.connect`,
|
||||
{
|
||||
abort() {
|
||||
// if interrupted, trying to disconnect
|
||||
|
@ -428,9 +468,12 @@ export class LibsignalWebSocketResource
|
|||
extends EventTarget
|
||||
implements IWebSocketResource
|
||||
{
|
||||
closed = false;
|
||||
|
||||
constructor(
|
||||
private readonly chatService: Net.ChatService,
|
||||
private readonly socketIpVersion: IpVersion | undefined
|
||||
private readonly socketIpVersion: IpVersion | undefined,
|
||||
private readonly logId: string
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
@ -452,12 +495,39 @@ export class LibsignalWebSocketResource
|
|||
return super.addEventListener(name, handler);
|
||||
}
|
||||
|
||||
public close(_code?: number, _reason?: string): void {
|
||||
public close(code = 3000, reason?: string): void {
|
||||
if (this.closed) {
|
||||
log.info(`${this.logId}.close: Already closed! ${code}/${reason}`);
|
||||
return;
|
||||
}
|
||||
drop(this.chatService.disconnect());
|
||||
|
||||
// On linux the socket can wait a long time to emit its close event if we've
|
||||
// lost the internet connection. On the order of minutes. This speeds that
|
||||
// process up.
|
||||
Timers.setTimeout(
|
||||
() => this.onConnectionInterrupted(),
|
||||
5 * durations.SECOND
|
||||
);
|
||||
}
|
||||
|
||||
public shutdown(): void {
|
||||
drop(this.chatService.disconnect());
|
||||
this.close(3000, 'Shutdown');
|
||||
}
|
||||
|
||||
onConnectionInterrupted(): void {
|
||||
if (this.closed) {
|
||||
log.warn(
|
||||
`${this.logId}.onConnectionInterrupted called after resource is closed`
|
||||
);
|
||||
return;
|
||||
}
|
||||
this.closed = true;
|
||||
log.warn(`${this.logId}: connection closed`);
|
||||
// TODO: DESKTOP-7519. `reason` should be eventually resolved from the
|
||||
// disconnect reason error object coming from libsignal.
|
||||
const reason = undefined;
|
||||
this.dispatchEvent(new CloseEvent(3000, reason || 'normal'));
|
||||
}
|
||||
|
||||
public forceKeepAlive(): void {
|
||||
|
@ -627,12 +697,11 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource {
|
|||
return;
|
||||
}
|
||||
try {
|
||||
const [healthCheckResult, debugInfo] =
|
||||
await this.shadowing.sendRequestGetDebugInfo({
|
||||
verb: 'GET',
|
||||
path: '/v1/keepalive',
|
||||
timeout: KEEPALIVE_TIMEOUT_MS,
|
||||
});
|
||||
const healthCheckResult = await this.shadowing.sendRequest({
|
||||
verb: 'GET',
|
||||
path: '/v1/keepalive',
|
||||
timeout: KEEPALIVE_TIMEOUT_MS,
|
||||
});
|
||||
this.stats.requestsCompared += 1;
|
||||
if (!isSuccessfulStatusCode(healthCheckResult.status)) {
|
||||
this.stats.healthcheckBadStatus += 1;
|
||||
|
@ -640,7 +709,6 @@ export class WebSocketResourceWithShadowing implements IWebSocketResource {
|
|||
`${this.logId}: keepalive via libsignal responded with status [${healthCheckResult.status}]`
|
||||
);
|
||||
}
|
||||
this.stats.unexpectedReconnects = debugInfo.reconnectCount;
|
||||
} catch (error) {
|
||||
this.stats.healthcheckFailures += 1;
|
||||
log.warn(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue