Move to websocket for requests to signal server

This commit is contained in:
Fedor Indutny 2021-07-28 14:37:09 -07:00 committed by GitHub
parent 8449f343a6
commit 1c1d0e2da0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 1892 additions and 1336 deletions

View file

@ -14,15 +14,9 @@ const fakeAPI = {
getAvatar: fakeCall,
getDevices: fakeCall,
// getKeysForIdentifier : fakeCall,
getMessageSocket: async () => ({
on() {},
removeListener() {},
close() {},
sendBytes() {},
}),
getMyKeys: fakeCall,
getProfile: fakeCall,
getProvisioningSocket: fakeCall,
getProvisioningResource: fakeCall,
putAttachment: fakeCall,
registerKeys: fakeCall,
requestVerificationSMS: fakeCall,

View file

@ -1,75 +0,0 @@
// Copyright 2017-2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* global textsecure */
describe('createTaskWithTimeout', () => {
it('resolves when promise resolves', () => {
const task = () => Promise.resolve('hi!');
const taskWithTimeout = textsecure.createTaskWithTimeout(task);
return taskWithTimeout().then(result => {
assert.strictEqual(result, 'hi!');
});
});
it('flows error from promise back', () => {
const error = new Error('original');
const task = () => Promise.reject(error);
const taskWithTimeout = textsecure.createTaskWithTimeout(task);
return taskWithTimeout().catch(flowedError => {
assert.strictEqual(error, flowedError);
});
});
it('rejects if promise takes too long (this one logs error to console)', () => {
let complete = false;
const task = () =>
new Promise(resolve => {
setTimeout(() => {
complete = true;
resolve();
}, 3000);
});
const taskWithTimeout = textsecure.createTaskWithTimeout(task, this.name, {
timeout: 10,
});
return taskWithTimeout().then(
() => {
throw new Error('it was not supposed to resolve!');
},
() => {
assert.strictEqual(complete, false);
}
);
});
it('resolves if task returns something falsey', () => {
const task = () => {};
const taskWithTimeout = textsecure.createTaskWithTimeout(task);
return taskWithTimeout();
});
it('resolves if task returns a non-promise', () => {
const task = () => 'hi!';
const taskWithTimeout = textsecure.createTaskWithTimeout(task);
return taskWithTimeout().then(result => {
assert.strictEqual(result, 'hi!');
});
});
it('rejects if task throws (and does not log about taking too long)', () => {
const error = new Error('Task is throwing!');
const task = () => {
throw error;
};
const taskWithTimeout = textsecure.createTaskWithTimeout(task, this.name, {
timeout: 10,
});
return taskWithTimeout().then(
() => {
throw new Error('Overall task should reject!');
},
flowedError => {
assert.strictEqual(flowedError, error);
}
);
});
});

View file

@ -107,7 +107,7 @@ try {
let connectStartTime = 0;
window.logMessageReceiverConnect = () => {
window.logAuthenticatedConnect = () => {
if (connectStartTime === 0) {
connectStartTime = Date.now();
}

View file

@ -135,10 +135,12 @@ export async function startApp(): Promise<void> {
// Initialize WebAPI as early as possible
let server: WebAPIType | undefined;
let messageReceiver: MessageReceiver | undefined;
window.storage.onready(() => {
server = window.WebAPI.connect(
window.textsecure.storage.user.getWebAPICredentials()
);
window.textsecure.server = server;
window.textsecure.storage.user.on('credentialsChange', async () => {
strictAssert(server !== undefined, 'WebAPI not ready');
@ -150,6 +152,122 @@ export async function startApp(): Promise<void> {
initializeAllJobQueues({
server,
});
window.log.info('Initializing MessageReceiver');
messageReceiver = new MessageReceiver({
server,
storage: window.storage,
serverTrustRoot: window.getServerTrustRoot(),
});
// eslint-disable-next-line no-inner-declarations
function queuedEventListener<Args extends Array<unknown>>(
handler: (...args: Args) => Promise<void> | void,
track = true
): (...args: Args) => void {
return (...args: Args): void => {
eventHandlerQueue.add(async () => {
try {
await handler(...args);
} finally {
// message/sent: Message.handleDataMessage has its own queue and will
// trigger this event itself when complete.
// error: Error processing (below) also has its own queue and self-trigger.
if (track) {
window.Whisper.events.trigger('incrementProgress');
}
}
});
};
}
messageReceiver.addEventListener(
'message',
queuedEventListener(onMessageReceived, false)
);
messageReceiver.addEventListener(
'delivery',
queuedEventListener(onDeliveryReceipt)
);
messageReceiver.addEventListener(
'contact',
queuedEventListener(onContactReceived)
);
messageReceiver.addEventListener(
'contactSync',
queuedEventListener(onContactSyncComplete)
);
messageReceiver.addEventListener(
'group',
queuedEventListener(onGroupReceived)
);
messageReceiver.addEventListener(
'groupSync',
queuedEventListener(onGroupSyncComplete)
);
messageReceiver.addEventListener(
'sent',
queuedEventListener(onSentMessage, false)
);
messageReceiver.addEventListener(
'readSync',
queuedEventListener(onReadSync)
);
messageReceiver.addEventListener(
'read',
queuedEventListener(onReadReceipt)
);
messageReceiver.addEventListener(
'view',
queuedEventListener(onViewReceipt)
);
messageReceiver.addEventListener(
'verified',
queuedEventListener(onVerified)
);
messageReceiver.addEventListener(
'error',
queuedEventListener(onError, false)
);
messageReceiver.addEventListener(
'decryption-error',
queuedEventListener(onDecryptionError)
);
messageReceiver.addEventListener(
'retry-request',
queuedEventListener(onRetryRequest)
);
messageReceiver.addEventListener('empty', queuedEventListener(onEmpty));
messageReceiver.addEventListener(
'reconnect',
queuedEventListener(onReconnect)
);
messageReceiver.addEventListener(
'configuration',
queuedEventListener(onConfiguration)
);
messageReceiver.addEventListener('typing', queuedEventListener(onTyping));
messageReceiver.addEventListener(
'sticker-pack',
queuedEventListener(onStickerPack)
);
messageReceiver.addEventListener(
'viewOnceOpenSync',
queuedEventListener(onViewOnceOpenSync)
);
messageReceiver.addEventListener(
'messageRequestResponse',
queuedEventListener(onMessageRequestResponse)
);
messageReceiver.addEventListener(
'profileKeyUpdate',
queuedEventListener(onProfileKeyUpdate)
);
messageReceiver.addEventListener(
'fetchLatest',
queuedEventListener(onFetchLatestSync)
);
messageReceiver.addEventListener('keys', queuedEventListener(onKeysSync));
});
ourProfileKeyService.initialize(window.storage);
@ -378,16 +496,11 @@ export async function startApp(): Promise<void> {
window.getAccountManager().refreshPreKeys();
});
let messageReceiver: MessageReceiver | undefined;
let preMessageReceiverStatus: SocketStatus | undefined;
window.getSocketStatus = () => {
if (messageReceiver) {
return messageReceiver.getStatus();
if (server === undefined) {
return SocketStatus.CLOSED;
}
if (preMessageReceiverStatus) {
return preMessageReceiverStatus;
}
return SocketStatus.CLOSED;
return server.getSocketStatus();
};
let accountManager: typeof window.textsecure.AccountManager;
window.getAccountManager = () => {
@ -638,15 +751,15 @@ export async function startApp(): Promise<void> {
// Stop processing incoming messages
if (messageReceiver) {
strictAssert(
server !== undefined,
'WebAPI should be initialized together with MessageReceiver'
);
server.unregisterRequestHandler(messageReceiver);
await messageReceiver.stopProcessing();
await window.waitForAllBatchers();
}
if (messageReceiver) {
messageReceiver.unregisterBatchers();
messageReceiver = undefined;
}
// A number of still-to-queue database queries might be waiting inside batchers.
// We wait for these to empty first, and then shut down the data interface.
await Promise.all([
@ -1658,27 +1771,21 @@ export async function startApp(): Promise<void> {
window.Whisper.events.on('powerMonitorResume', () => {
window.log.info('powerMonitor: resume');
if (!messageReceiver) {
return;
}
messageReceiver.checkSocket();
server?.checkSockets();
});
const reconnectToWebSocketQueue = new LatestQueue();
const enqueueReconnectToWebSocket = () => {
reconnectToWebSocketQueue.add(async () => {
if (!messageReceiver) {
window.log.info(
'reconnectToWebSocket: No messageReceiver. Early return.'
);
if (!server) {
window.log.info('reconnectToWebSocket: No server. Early return.');
return;
}
window.log.info('reconnectToWebSocket starting...');
await disconnect();
connect();
await server.onOffline();
await server.onOnline();
window.log.info('reconnectToWebSocket complete.');
});
};
@ -1688,6 +1795,8 @@ export async function startApp(): Promise<void> {
window._.debounce(enqueueReconnectToWebSocket, 1000, { maxWait: 5000 })
);
window.Whisper.events.on('unlinkAndDisconnect', unlinkAndDisconnect);
function runStorageService() {
window.Signal.Services.enableStorageService();
@ -2011,8 +2120,13 @@ export async function startApp(): Promise<void> {
disconnectTimer = undefined;
AttachmentDownloads.stop();
if (messageReceiver) {
await messageReceiver.close();
if (server !== undefined) {
strictAssert(
messageReceiver !== undefined,
'WebAPI should be initialized together with MessageReceiver'
);
await server.onOffline();
await messageReceiver.drain();
}
}
@ -2056,19 +2170,6 @@ export async function startApp(): Promise<void> {
return;
}
preMessageReceiverStatus = SocketStatus.CONNECTING;
if (messageReceiver) {
await messageReceiver.stopProcessing();
await window.waitForAllBatchers();
}
if (messageReceiver) {
messageReceiver.unregisterBatchers();
messageReceiver = undefined;
}
window.textsecure.messaging = new window.textsecure.MessageSender(server);
if (connectCount === 0) {
@ -2115,132 +2216,20 @@ export async function startApp(): Promise<void> {
window.Whisper.deliveryReceiptQueue.pause();
window.Whisper.Notifications.disable();
// initialize the socket and start listening for messages
window.log.info('Initializing socket and listening for messages');
const messageReceiverOptions = {
serverTrustRoot: window.getServerTrustRoot(),
};
messageReceiver = new window.textsecure.MessageReceiver(
server,
messageReceiverOptions
);
window.textsecure.messageReceiver = messageReceiver;
window.Signal.Services.initializeGroupCredentialFetcher();
preMessageReceiverStatus = undefined;
strictAssert(server !== undefined, 'WebAPI not initialized');
strictAssert(
messageReceiver !== undefined,
'MessageReceiver not initialized'
);
messageReceiver.reset();
server.registerRequestHandler(messageReceiver);
// eslint-disable-next-line no-inner-declarations
function queuedEventListener<Args extends Array<unknown>>(
handler: (...args: Args) => Promise<void> | void,
track = true
): (...args: Args) => void {
return (...args: Args): void => {
eventHandlerQueue.add(async () => {
try {
await handler(...args);
} finally {
// message/sent: Message.handleDataMessage has its own queue and will
// trigger this event itself when complete.
// error: Error processing (below) also has its own queue and self-trigger.
if (track) {
window.Whisper.events.trigger('incrementProgress');
}
}
});
};
}
messageReceiver.addEventListener(
'message',
queuedEventListener(onMessageReceived, false)
);
messageReceiver.addEventListener(
'delivery',
queuedEventListener(onDeliveryReceipt)
);
messageReceiver.addEventListener(
'contact',
queuedEventListener(onContactReceived)
);
messageReceiver.addEventListener(
'contactSync',
queuedEventListener(onContactSyncComplete)
);
messageReceiver.addEventListener(
'group',
queuedEventListener(onGroupReceived)
);
messageReceiver.addEventListener(
'groupSync',
queuedEventListener(onGroupSyncComplete)
);
messageReceiver.addEventListener(
'sent',
queuedEventListener(onSentMessage, false)
);
messageReceiver.addEventListener(
'readSync',
queuedEventListener(onReadSync)
);
messageReceiver.addEventListener(
'read',
queuedEventListener(onReadReceipt)
);
messageReceiver.addEventListener(
'view',
queuedEventListener(onViewReceipt)
);
messageReceiver.addEventListener(
'verified',
queuedEventListener(onVerified)
);
messageReceiver.addEventListener(
'error',
queuedEventListener(onError, false)
);
messageReceiver.addEventListener(
'decryption-error',
queuedEventListener(onDecryptionError)
);
messageReceiver.addEventListener(
'retry-request',
queuedEventListener(onRetryRequest)
);
messageReceiver.addEventListener('empty', queuedEventListener(onEmpty));
messageReceiver.addEventListener(
'reconnect',
queuedEventListener(onReconnect)
);
messageReceiver.addEventListener(
'configuration',
queuedEventListener(onConfiguration)
);
messageReceiver.addEventListener('typing', queuedEventListener(onTyping));
messageReceiver.addEventListener(
'sticker-pack',
queuedEventListener(onStickerPack)
);
messageReceiver.addEventListener(
'viewOnceOpenSync',
queuedEventListener(onViewOnceOpenSync)
);
messageReceiver.addEventListener(
'messageRequestResponse',
queuedEventListener(onMessageRequestResponse)
);
messageReceiver.addEventListener(
'profileKeyUpdate',
queuedEventListener(onProfileKeyUpdate)
);
messageReceiver.addEventListener(
'fetchLatest',
queuedEventListener(onFetchLatestSync)
);
messageReceiver.addEventListener('keys', queuedEventListener(onKeysSync));
// If coming here after `offline` event - connect again.
await server.onOnline();
AttachmentDownloads.start({
getMessageReceiver: () => messageReceiver,
logger: window.log,
});
@ -3511,16 +3500,12 @@ export async function startApp(): Promise<void> {
window.Whisper.events.trigger('unauthorized');
if (messageReceiver) {
strictAssert(server !== undefined, 'WebAPI not initialized');
server.unregisterRequestHandler(messageReceiver);
await messageReceiver.stopProcessing();
await window.waitForAllBatchers();
}
if (messageReceiver) {
messageReceiver.unregisterBatchers();
messageReceiver = undefined;
}
onEmpty();
window.log.warn(

View file

@ -1689,8 +1689,8 @@ export async function createGroupV2({
}
);
await conversation.queueJob('storageServiceUploadJob', () => {
window.Signal.Services.storageServiceUploadJob();
await conversation.queueJob('storageServiceUploadJob', async () => {
await window.Signal.Services.storageServiceUploadJob();
});
const timestamp = Date.now();

View file

@ -14,6 +14,7 @@ import {
import * as Bytes from '../Bytes';
import { longRunningTaskWrapper } from '../util/longRunningTaskWrapper';
import { isGroupV1 } from '../util/whatTypeOfConversation';
import { explodePromise } from '../util/explodePromise';
import type { ConversationAttributesType } from '../model-types.d';
import type { ConversationModel } from '../models/conversations';
@ -175,7 +176,7 @@ export async function joinViaLink(hash: string): Promise<void> {
};
// Explode a promise so we know when this whole join process is complete
const { promise, resolve, reject } = explodePromise();
const { promise, resolve, reject } = explodePromise<void>();
const closeDialog = async () => {
try {
@ -405,26 +406,3 @@ function showErrorDialog(description: string, title: string) {
},
});
}
function explodePromise(): {
promise: Promise<void>;
resolve: () => void;
reject: (error: Error) => void;
} {
let resolve: () => void;
let reject: (error: Error) => void;
const promise = new Promise<void>((innerResolve, innerReject) => {
resolve = innerResolve;
reject = innerReject;
});
return {
promise,
// Typescript thinks that resolve and reject can be undefined here.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
resolve: resolve!,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
reject: reject!,
};
}

View file

@ -1,13 +1,12 @@
// Copyright 2019-2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isFunction, isNumber, omit } from 'lodash';
import { isNumber, omit } from 'lodash';
import { v4 as getGuid } from 'uuid';
import dataInterface from '../sql/Client';
import { downloadAttachment } from '../util/downloadAttachment';
import { stringFromBytes } from '../Crypto';
import MessageReceiver from '../textsecure/MessageReceiver';
import {
AttachmentDownloadJobType,
AttachmentDownloadJobTypeType,
@ -42,7 +41,6 @@ const RETRY_BACKOFF: Record<number, number> = {
let enabled = false;
let timeout: NodeJS.Timeout | null;
let getMessageReceiver: () => MessageReceiver | undefined;
let logger: LoggerType;
const _activeAttachmentDownloadJobs: Record<
string,
@ -50,17 +48,11 @@ const _activeAttachmentDownloadJobs: Record<
> = {};
type StartOptionsType = {
getMessageReceiver: () => MessageReceiver | undefined;
logger: LoggerType;
};
export async function start(options: StartOptionsType): Promise<void> {
({ getMessageReceiver, logger } = options);
if (!isFunction(getMessageReceiver)) {
throw new Error(
'attachment_downloads/start: getMessageReceiver must be a function'
);
}
({ logger } = options);
if (!logger) {
throw new Error('attachment_downloads/start: logger must be provided!');
}
@ -220,11 +212,6 @@ async function _runJob(job?: AttachmentDownloadJobType): Promise<void> {
const pending = true;
await setAttachmentDownloadJobPending(id, pending);
const messageReceiver = getMessageReceiver();
if (!messageReceiver) {
throw new Error('_runJob: messageReceiver not found');
}
const downloaded = await downloadAttachment(attachment);
if (!downloaded) {

View file

@ -6,17 +6,9 @@
import { Collection, Model } from 'backbone';
import { MessageModel } from '../models/messages';
import { isOutgoing } from '../state/selectors/message';
import { ReactionAttributesType } from '../model-types.d';
type ReactionsAttributesType = {
emoji: string;
remove: boolean;
targetAuthorUuid: string;
targetTimestamp: number;
timestamp: number;
fromId: string;
};
export class ReactionModel extends Model<ReactionsAttributesType> {}
export class ReactionModel extends Model<ReactionAttributesType> {}
let singleton: Reactions | undefined;
@ -63,7 +55,7 @@ export class Reactions extends Collection {
async onReaction(
reaction: ReactionModel
): Promise<ReactionModel | undefined> {
): Promise<ReactionAttributesType | undefined> {
try {
// The conversation the target message was in; we have to find it in the database
// to to figure that out.

View file

@ -16,6 +16,7 @@ import { AttachmentType } from '../types/Attachment';
import { CallMode, CallHistoryDetailsType } from '../types/Calling';
import * as Stickers from '../types/Stickers';
import { GroupV2InfoType } from '../textsecure/SendMessage';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout';
import { CallbackResultType } from '../textsecure/Types.d';
import { ConversationType } from '../state/ducks/conversations';
import {
@ -1058,7 +1059,7 @@ export class ConversationModel extends window.Backbone
this.setRegistered();
// If we couldn't apply universal timer before - try it again.
this.queueJob('maybeSetPendingUniversalTimer', () =>
this.queueJob('maybeSetPendingUniversalTimer', async () =>
this.maybeSetPendingUniversalTimer()
);
}
@ -2893,13 +2894,10 @@ export class ConversationModel extends window.Backbone
return null;
}
queueJob(
name: string,
callback: () => unknown | Promise<unknown>
): Promise<WhatIsThis> {
queueJob<T>(name: string, callback: () => Promise<T>): Promise<T> {
this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 });
const taskWithTimeout = window.textsecure.createTaskWithTimeout(
const taskWithTimeout = createTaskWithTimeout(
callback,
`conversation ${this.idForLogging()}`
);
@ -3231,7 +3229,7 @@ export class ConversationModel extends window.Backbone
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const destination = this.getSendTarget()!;
return this.queueJob('sendDeleteForEveryone', async () => {
await this.queueJob('sendDeleteForEveryone', async () => {
window.log.info(
'Sending deleteForEveryone to conversation',
this.idForLogging(),
@ -3782,7 +3780,7 @@ export class ConversationModel extends window.Backbone
return;
}
this.queueJob('maybeSetPendingUniversalTimer', () =>
this.queueJob('maybeSetPendingUniversalTimer', async () =>
this.maybeSetPendingUniversalTimer()
);
@ -4804,7 +4802,7 @@ export class ConversationModel extends window.Backbone
);
this.set({ needsStorageServiceSync: true });
this.queueJob('captureChange', () => {
this.queueJob('captureChange', async () => {
Services.storageServiceUploadJob();
});
}

View file

@ -0,0 +1,47 @@
// Copyright 2017-2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { sleep } from '../util/sleep';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout';
describe('createTaskWithTimeout', () => {
it('resolves when promise resolves', async () => {
const task = () => Promise.resolve('hi!');
const taskWithTimeout = createTaskWithTimeout(task, 'test');
const result = await taskWithTimeout();
assert.strictEqual(result, 'hi!');
});
it('flows error from promise back', async () => {
const error = new Error('original');
const task = () => Promise.reject(error);
const taskWithTimeout = createTaskWithTimeout(task, 'test');
await assert.isRejected(taskWithTimeout(), 'original');
});
it('rejects if promise takes too long (this one logs error to console)', async () => {
const task = async () => {
await sleep(3000);
};
const taskWithTimeout = createTaskWithTimeout(task, 'test', {
timeout: 10,
});
await assert.isRejected(taskWithTimeout());
});
it('rejects if task throws (and does not log about taking too long)', async () => {
const error = new Error('Task is throwing!');
const task = () => {
throw error;
};
const taskWithTimeout = createTaskWithTimeout(task, 'test', {
timeout: 10,
});
await assert.isRejected(taskWithTimeout(), 'Task is throwing!');
});
});

View file

@ -0,0 +1,46 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import { noop } from 'lodash';
import { AbortableProcess } from '../../util/AbortableProcess';
describe('AbortableProcess', () => {
it('resolves the result normally', async () => {
const process = new AbortableProcess(
'process',
{ abort: noop },
Promise.resolve(42)
);
assert.strictEqual(await process.getResult(), 42);
});
it('rejects normally', async () => {
const process = new AbortableProcess(
'process',
{ abort: noop },
Promise.reject(new Error('rejected'))
);
await assert.isRejected(process.getResult(), 'rejected');
});
it('rejects on abort', async () => {
let calledAbort = false;
const process = new AbortableProcess(
'A',
{
abort() {
calledAbort = true;
},
},
new Promise(noop)
);
process.abort();
await assert.isRejected(process.getResult(), 'Process "A" was aborted');
assert.isTrue(calledAbort);
});
});

View file

@ -0,0 +1,26 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable no-restricted-syntax */
import { assert } from 'chai';
import { explodePromise } from '../../util/explodePromise';
describe('explodePromise', () => {
it('resolves the promise', async () => {
const { promise, resolve } = explodePromise<number>();
resolve(42);
assert.strictEqual(await promise, 42);
});
it('rejects the promise', async () => {
const { promise, reject } = explodePromise<number>();
reject(new Error('rejected'));
await assert.isRejected(promise, 'rejected');
});
});

View file

@ -6,12 +6,11 @@
*/
import { assert } from 'chai';
import EventEmitter from 'events';
import { connection as WebSocket } from 'websocket';
import MessageReceiver from '../textsecure/MessageReceiver';
import { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents';
import { IncomingWebSocketRequest } from '../textsecure/WebsocketResources';
import { WebAPIType } from '../textsecure/WebAPI';
import { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents';
import { SignalService as Proto } from '../protobuf';
import * as Crypto from '../Crypto';
@ -19,23 +18,16 @@ import * as Crypto from '../Crypto';
const FIXMEU8 = Uint8Array;
describe('MessageReceiver', () => {
class FakeSocket extends EventEmitter {
public sendBytes(_: Uint8Array) {}
public close() {}
}
const number = '+19999999999';
const uuid = 'aaaaaaaa-bbbb-4ccc-9ddd-eeeeeeeeeeee';
const deviceId = 1;
describe('connecting', () => {
it('generates decryption-error event when it cannot decrypt', done => {
const socket = new FakeSocket();
const messageReceiver = new MessageReceiver({} as WebAPIType, {
const messageReceiver = new MessageReceiver({
server: {} as WebAPIType,
storage: window.storage,
serverTrustRoot: 'AAAAAAAA',
socket: socket as WebSocket,
});
const body = Proto.Envelope.encode({
@ -47,15 +39,18 @@ describe('MessageReceiver', () => {
content: new FIXMEU8(Crypto.getRandomBytes(200)),
}).finish();
const message = Proto.WebSocketMessage.encode({
type: Proto.WebSocketMessage.Type.REQUEST,
request: { id: 1, verb: 'PUT', path: '/api/v1/message', body },
}).finish();
socket.emit('message', {
type: 'binary',
binaryData: message,
});
messageReceiver.handleRequest(
new IncomingWebSocketRequest(
{
id: 1,
verb: 'PUT',
path: '/api/v1/message',
body,
headers: [],
},
(_: Buffer): void => {}
)
);
messageReceiver.addEventListener(
'decryption-error',

View file

@ -85,7 +85,7 @@ describe('WebSocket-Resource', () => {
});
});
it('sends requests and receives responses', done => {
it('sends requests and receives responses', async () => {
// mock socket and request handler
let requestId: number | Long | undefined;
const socket = new FakeSocket();
@ -101,16 +101,10 @@ describe('WebSocket-Resource', () => {
// actual test
const resource = new WebSocketResource(socket as WebSocket);
resource.sendRequest({
const promise = resource.sendRequest({
verb: 'PUT',
path: '/some/path',
body: new Uint8Array([1, 2, 3]),
error: done,
success(message: string, status: number) {
assert.strictEqual(message, 'OK');
assert.strictEqual(status, 200);
done();
},
});
// mock socket response
@ -121,6 +115,10 @@ describe('WebSocket-Resource', () => {
response: { id: requestId, message: 'OK', status: 200 },
}).finish(),
});
const { status, message } = await promise;
assert.strictEqual(message, 'OK');
assert.strictEqual(status, 200);
});
});

9
ts/textsecure.d.ts vendored
View file

@ -4,7 +4,6 @@
import { UnidentifiedSenderMessageContent } from '@signalapp/signal-client';
import Crypto from './textsecure/Crypto';
import MessageReceiver from './textsecure/MessageReceiver';
import MessageSender from './textsecure/SendMessage';
import SyncRequest from './textsecure/SyncRequest';
import EventTarget from './textsecure/EventTarget';
@ -38,20 +37,14 @@ export type UnprocessedType = {
export { StorageServiceCallOptionsType, StorageServiceCredentials };
export type TextSecureType = {
createTaskWithTimeout: (
task: () => Promise<any> | any,
id?: string,
options?: { timeout?: number }
) => () => Promise<any>;
crypto: typeof Crypto;
storage: Storage;
messageReceiver: MessageReceiver;
server: WebAPIType;
messageSender: MessageSender;
messaging: SendMessage;
utils: typeof utils;
EventTarget: typeof EventTarget;
MessageReceiver: typeof MessageReceiver;
AccountManager: WhatIsThis;
MessageSender: typeof MessageSender;
SyncRequest: typeof SyncRequest;

View file

@ -13,9 +13,8 @@ import { WebAPIType } from './WebAPI';
import { KeyPairType, CompatSignedPreKeyType } from './Types.d';
import utils from './Helpers';
import ProvisioningCipher from './ProvisioningCipher';
import WebSocketResource, {
IncomingWebSocketRequest,
} from './WebsocketResources';
import { IncomingWebSocketRequest } from './WebsocketResources';
import createTaskWithTimeout from './TaskWithTimeout';
import * as Bytes from '../Bytes';
import {
deriveAccessKey,
@ -192,112 +191,102 @@ export default class AccountManager extends EventTarget {
SIGNED_KEY_GEN_BATCH_SIZE,
progressCallback
);
const confirmKeys = this.confirmKeys.bind(this);
const registrationDone = this.registrationDone.bind(this);
const registerKeys = this.server.registerKeys.bind(this.server);
const getSocket = this.server.getProvisioningSocket.bind(this.server);
const queueTask = this.queueTask.bind(this);
const provisioningCipher = new ProvisioningCipher();
let gotProvisionEnvelope = false;
const pubKey = await provisioningCipher.getPublicKey();
const socket = await getSocket();
let envelopeCallbacks:
| {
resolve(data: Proto.ProvisionEnvelope): void;
reject(error: Error): void;
}
| undefined;
const envelopePromise = new Promise<Proto.ProvisionEnvelope>(
(resolve, reject) => {
envelopeCallbacks = { resolve, reject };
}
);
const wsr = await this.server.getProvisioningResource({
handleRequest(request: IncomingWebSocketRequest) {
if (
request.path === '/v1/address' &&
request.verb === 'PUT' &&
request.body
) {
const proto = Proto.ProvisioningUuid.decode(request.body);
const { uuid } = proto;
if (!uuid) {
throw new Error('registerSecondDevice: expected a UUID');
}
const url = getProvisioningUrl(uuid, pubKey);
if (window.CI) {
window.CI.setProvisioningURL(url);
}
setProvisioningUrl(url);
request.respond(200, 'OK');
} else if (
request.path === '/v1/message' &&
request.verb === 'PUT' &&
request.body
) {
const envelope = Proto.ProvisionEnvelope.decode(request.body);
request.respond(200, 'OK');
wsr.close();
envelopeCallbacks?.resolve(envelope);
} else {
window.log.error('Unknown websocket message', request.path);
}
},
});
window.log.info('provisioning socket open');
return new Promise((resolve, reject) => {
socket.on('close', (code, reason) => {
window.log.info(
`provisioning socket closed. Code: ${code} Reason: ${reason}`
wsr.addEventListener('close', ({ code, reason }) => {
window.log.info(
`provisioning socket closed. Code: ${code} Reason: ${reason}`
);
// Note: if we have resolved the envelope already - this has no effect
envelopeCallbacks?.reject(new Error('websocket closed'));
});
const envelope = await envelopePromise;
const provisionMessage = await provisioningCipher.decrypt(envelope);
await this.queueTask(async () => {
const deviceName = await confirmNumber(provisionMessage.number);
if (typeof deviceName !== 'string' || deviceName.length === 0) {
throw new Error(
'AccountManager.registerSecondDevice: Invalid device name'
);
if (!gotProvisionEnvelope) {
reject(new Error('websocket closed'));
}
});
}
if (
!provisionMessage.number ||
!provisionMessage.provisioningCode ||
!provisionMessage.identityKeyPair
) {
throw new Error(
'AccountManager.registerSecondDevice: Provision message was missing key data'
);
}
const wsr = new WebSocketResource(socket, {
keepalive: { path: '/v1/keepalive/provisioning' },
handleRequest(request: IncomingWebSocketRequest) {
if (
request.path === '/v1/address' &&
request.verb === 'PUT' &&
request.body
) {
const proto = Proto.ProvisioningUuid.decode(request.body);
const { uuid } = proto;
if (!uuid) {
throw new Error('registerSecondDevice: expected a UUID');
}
const url = getProvisioningUrl(uuid, pubKey);
if (window.CI) {
window.CI.setProvisioningURL(url);
}
setProvisioningUrl(url);
request.respond(200, 'OK');
} else if (
request.path === '/v1/message' &&
request.verb === 'PUT' &&
request.body
) {
const envelope = Proto.ProvisionEnvelope.decode(request.body);
request.respond(200, 'OK');
gotProvisionEnvelope = true;
wsr.close();
resolve(
provisioningCipher
.decrypt(envelope)
.then(async provisionMessage =>
queueTask(async () =>
confirmNumber(provisionMessage.number).then(
async deviceName => {
if (
typeof deviceName !== 'string' ||
deviceName.length === 0
) {
throw new Error(
'AccountManager.registerSecondDevice: Invalid device name'
);
}
if (
!provisionMessage.number ||
!provisionMessage.provisioningCode ||
!provisionMessage.identityKeyPair
) {
throw new Error(
'AccountManager.registerSecondDevice: Provision message was missing key data'
);
}
return createAccount(
provisionMessage.number,
provisionMessage.provisioningCode,
provisionMessage.identityKeyPair,
provisionMessage.profileKey,
deviceName,
provisionMessage.userAgent,
provisionMessage.readReceipts,
{ uuid: provisionMessage.uuid }
)
.then(clearSessionsAndPreKeys)
.then(generateKeys)
.then(async (keys: GeneratedKeysType) =>
registerKeys(keys).then(async () =>
confirmKeys(keys)
)
)
.then(registrationDone);
}
)
)
)
);
} else {
window.log.error('Unknown websocket message', request.path);
}
},
});
await createAccount(
provisionMessage.number,
provisionMessage.provisioningCode,
provisionMessage.identityKeyPair,
provisionMessage.profileKey,
deviceName,
provisionMessage.userAgent,
provisionMessage.readReceipts,
{ uuid: provisionMessage.uuid }
);
await clearSessionsAndPreKeys();
const keys = await generateKeys();
await this.server.registerKeys(keys);
await this.confirmKeys(keys);
await this.registrationDone();
});
}
@ -416,7 +405,7 @@ export default class AccountManager extends EventTarget {
async queueTask(task: () => Promise<any>) {
this.pendingQueue = this.pendingQueue || new PQueue({ concurrency: 1 });
const taskWithTimeout = window.textsecure.createTaskWithTimeout(task);
const taskWithTimeout = createTaskWithTimeout(task, 'AccountManager task');
return this.pendingQueue.add(taskWithTimeout);
}
@ -633,6 +622,11 @@ export default class AccountManager extends EventTarget {
);
await window.textsecure.storage.put('regionCode', regionCode);
await window.textsecure.storage.protocol.hydrateCaches();
// We are finally ready to reconnect
window.textsecure.storage.user.emitCredentialsChanged(
'AccountManager.createAccount'
);
}
async clearSessionsAndPreKeys() {

View file

@ -13,6 +13,40 @@ function appendStack(newError: Error, originalError: Error) {
newError.stack += `\nOriginal stack:\n${originalError.stack}`;
}
export type HTTPErrorHeadersType = {
[name: string]: string | ReadonlyArray<string>;
};
export class HTTPError extends Error {
public readonly name = 'HTTPError';
public readonly code: number;
public readonly responseHeaders: HTTPErrorHeadersType;
public readonly response: unknown;
constructor(
message: string,
options: {
code: number;
headers: HTTPErrorHeadersType;
response?: unknown;
stack?: string;
}
) {
super(`${message}; code: ${options.code}`);
const { code: providedCode, headers, response, stack } = options;
this.code = providedCode > 999 || providedCode < 100 ? -1 : providedCode;
this.responseHeaders = headers;
this.stack += `\nOriginal stack:\n${stack}`;
this.response = response;
}
}
export class ReplayableError extends Error {
name: string;

View file

@ -6,10 +6,9 @@
/* eslint-disable camelcase */
/* eslint-disable no-restricted-syntax */
import { isNumber, map, omit } from 'lodash';
import { isNumber, map } from 'lodash';
import PQueue from 'p-queue';
import { v4 as getGuid } from 'uuid';
import { connection as WebSocket } from 'websocket';
import {
DecryptionErrorMessage,
@ -38,42 +37,37 @@ import {
SignedPreKeys,
} from '../LibSignalStores';
import { verifySignature } from '../Curve';
import { BackOff, FIBONACCI_TIMEOUTS } from '../util/BackOff';
import { strictAssert } from '../util/assert';
import { BatcherType, createBatcher } from '../util/batcher';
import { dropNull } from '../util/dropNull';
import { normalizeUuid } from '../util/normalizeUuid';
import { normalizeNumber } from '../util/normalizeNumber';
import { sleep } from '../util/sleep';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { Zone } from '../util/Zone';
import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto';
import { DownloadedAttachmentType } from '../types/Attachment';
import * as Errors from '../types/errors';
import { SignalService as Proto } from '../protobuf';
import { UnprocessedType } from '../textsecure.d';
import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups';
import createTaskWithTimeout from './TaskWithTimeout';
import { processAttachment, processDataMessage } from './processDataMessage';
import { processSyncMessage } from './processSyncMessage';
import EventTarget, { EventHandler } from './EventTarget';
import { WebAPIType } from './WebAPI';
import WebSocketResource, {
IncomingWebSocketRequest,
CloseEvent,
} from './WebsocketResources';
import { ConnectTimeoutError } from './Errors';
import * as Bytes from '../Bytes';
import Crypto from './Crypto';
import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto';
import { downloadAttachment } from './downloadAttachment';
import { IncomingWebSocketRequest } from './WebsocketResources';
import { ContactBuffer, GroupBuffer } from './ContactsParser';
import { DownloadedAttachmentType } from '../types/Attachment';
import * as Errors from '../types/errors';
import * as MIME from '../types/MIME';
import { SocketStatus } from '../types/SocketStatus';
import { SignalService as Proto } from '../protobuf';
import { UnprocessedType } from '../textsecure.d';
import type { WebAPIType } from './WebAPI';
import type { Storage } from './Storage';
import * as Bytes from '../Bytes';
import {
ProcessedAttachment,
ProcessedDataMessage,
ProcessedSyncMessage,
ProcessedSent,
ProcessedEnvelope,
IRequestHandler,
} from './Types.d';
import {
ReconnectEvent,
@ -103,8 +97,6 @@ import {
GroupSyncEvent,
} from './messageReceiverEvents';
import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups';
// TODO: remove once we move away from ArrayBuffers
const FIXMEU8 = Uint8Array;
@ -150,12 +142,18 @@ enum TaskType {
Decrypted = 'Decrypted',
}
export default class MessageReceiver extends EventTarget {
private _onClose?: (code: number, reason: string) => Promise<void>;
export type MessageReceiverOptions = {
server: WebAPIType;
storage: Storage;
serverTrustRoot: string;
};
private _onWSRClose?: (event: CloseEvent) => void;
export default class MessageReceiver
extends EventTarget
implements IRequestHandler {
private server: WebAPIType;
private _onError?: (error: Error) => Promise<void>;
private storage: Storage;
private appQueue: PQueue;
@ -163,22 +161,14 @@ export default class MessageReceiver extends EventTarget {
private cacheRemoveBatcher: BatcherType<string>;
private calledClose?: boolean;
private count: number;
private processedCount: number;
private deviceId?: number;
private hasConnected = false;
private incomingQueue: PQueue;
private isEmptied?: boolean;
private number_id?: string;
private encryptedQueue: PQueue;
private decryptedQueue: PQueue;
@ -187,38 +177,21 @@ export default class MessageReceiver extends EventTarget {
private serverTrustRoot: Uint8Array;
private socket?: WebSocket;
private socketStatus = SocketStatus.CLOSED;
private stoppingProcessing?: boolean;
private uuid_id?: string;
private wsr?: WebSocketResource;
private readonly reconnectBackOff = new BackOff(FIBONACCI_TIMEOUTS);
constructor(
public readonly server: WebAPIType,
options: {
serverTrustRoot: string;
socket?: WebSocket;
}
) {
constructor({ server, storage, serverTrustRoot }: MessageReceiverOptions) {
super();
this.server = server;
this.storage = storage;
this.count = 0;
this.processedCount = 0;
if (!options.serverTrustRoot) {
if (!serverTrustRoot) {
throw new Error('Server trust root is required!');
}
this.serverTrustRoot = Bytes.fromBase64(options.serverTrustRoot);
this.number_id = window.textsecure.storage.user.getNumber();
this.uuid_id = window.textsecure.storage.user.getUuid();
this.deviceId = window.textsecure.storage.user.getDeviceId();
this.serverTrustRoot = Bytes.fromBase64(serverTrustRoot);
this.incomingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 });
@ -249,174 +222,124 @@ export default class MessageReceiver extends EventTarget {
maxSize: 30,
processBatch: this.cacheRemoveBatch.bind(this),
});
this.connect(options.socket);
}
public async stopProcessing(): Promise<void> {
window.log.info('MessageReceiver: stopProcessing requested');
this.stoppingProcessing = true;
return this.close();
}
public unregisterBatchers(): void {
window.log.info('MessageReceiver: unregister batchers');
this.decryptAndCacheBatcher.unregister();
this.cacheRemoveBatcher.unregister();
}
public getProcessedCount(): number {
return this.processedCount;
}
private async connect(socket?: WebSocket): Promise<void> {
if (this.calledClose) {
public handleRequest(request: IncomingWebSocketRequest): void {
// We do the message decryption here, instead of in the ordered pending queue,
// to avoid exposing the time it took us to process messages through the time-to-ack.
window.log.info('MessageReceiver: got request', request.verb, request.path);
if (request.path !== '/api/v1/message') {
request.respond(200, 'OK');
if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') {
this.incomingQueue.add(() => {
this.onEmpty();
});
}
return;
}
const job = async () => {
const headers = request.headers || [];
if (!request.body) {
throw new Error(
'MessageReceiver.handleRequest: request.body was falsey!'
);
}
const plaintext = request.body;
try {
const decoded = Proto.Envelope.decode(plaintext);
const serverTimestamp = normalizeNumber(decoded.serverTimestamp);
const envelope: ProcessedEnvelope = {
// Make non-private envelope IDs dashless so they don't get redacted
// from logs
id: getGuid().replace(/-/g, ''),
receivedAtCounter: window.Signal.Util.incrementMessageCounter(),
receivedAtDate: Date.now(),
// Calculate the message age (time on server).
messageAgeSec: this.calculateMessageAge(headers, serverTimestamp),
// Proto.Envelope fields
type: decoded.type,
source: decoded.source,
sourceUuid: decoded.sourceUuid
? normalizeUuid(
decoded.sourceUuid,
'MessageReceiver.handleRequest.sourceUuid'
)
: undefined,
sourceDevice: decoded.sourceDevice,
timestamp: normalizeNumber(decoded.timestamp),
legacyMessage: dropNull(decoded.legacyMessage),
content: dropNull(decoded.content),
serverGuid: decoded.serverGuid,
serverTimestamp,
};
// After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
if (envelope.source && this.isBlocked(envelope.source)) {
request.respond(200, 'OK');
return;
}
if (envelope.sourceUuid && this.isUuidBlocked(envelope.sourceUuid)) {
request.respond(200, 'OK');
return;
}
this.decryptAndCache(envelope, plaintext, request);
this.processedCount += 1;
} catch (e) {
request.respond(500, 'Bad encrypted websocket message');
window.log.error(
'Error handling incoming message:',
Errors.toLogFormat(e)
);
await this.dispatchAndWait(new ErrorEvent(e));
}
};
this.incomingQueue.add(job);
}
public reset(): void {
// We always process our cache before processing a new websocket message
this.incomingQueue.add(async () => this.queueAllCached());
this.count = 0;
if (this.hasConnected) {
this.dispatchEvent(new ReconnectEvent());
}
this.isEmptied = false;
this.hasConnected = true;
if (this.socket && this.socket.connected) {
this.socket.close();
this.socket = undefined;
if (this.wsr) {
this.wsr.close();
this.wsr = undefined;
}
}
this.socketStatus = SocketStatus.CONNECTING;
// initialize the socket and start listening for messages
try {
this.socket = socket || (await this.server.getMessageSocket());
} catch (error) {
this.socketStatus = SocketStatus.CLOSED;
if (error instanceof ConnectTimeoutError) {
await this.onclose(-1, 'Connection timed out');
return;
}
await this.dispatchAndWait(new ErrorEvent(error));
return;
}
this.socketStatus = SocketStatus.OPEN;
window.log.info('websocket open');
window.logMessageReceiverConnect();
if (!this._onClose) {
this._onClose = this.onclose.bind(this);
}
if (!this._onWSRClose) {
this._onWSRClose = ({ code, reason }: CloseEvent): void => {
this.onclose(code, reason);
};
}
if (!this._onError) {
this._onError = this.onerror.bind(this);
}
this.socket.on('close', this._onClose);
this.socket.on('error', this._onError);
this.wsr = new WebSocketResource(this.socket, {
handleRequest: this.handleRequest.bind(this),
keepalive: {
path: '/v1/keepalive',
disconnect: true,
},
});
// Because sometimes the socket doesn't properly emit its close event
if (this._onWSRClose) {
this.wsr.addEventListener('close', this._onWSRClose);
}
this.stoppingProcessing = false;
}
public async close(): Promise<void> {
window.log.info('MessageReceiver.close()');
this.calledClose = true;
this.socketStatus = SocketStatus.CLOSING;
// Our WebSocketResource instance will close the socket and emit a 'close' event
// if the socket doesn't emit one quickly enough.
if (this.wsr) {
this.wsr.close(3000, 'called close');
}
this.clearRetryTimeout();
return this.drain();
public stopProcessing(): void {
this.stoppingProcessing = true;
}
public checkSocket(): void {
if (this.wsr) {
this.wsr.forceKeepAlive();
}
public hasEmptied(): boolean {
return Boolean(this.isEmptied);
}
public getStatus(): SocketStatus {
return this.socketStatus;
}
public async drain(): Promise<void> {
const waitForEncryptedQueue = async () =>
this.addToQueue(async () => {
window.log.info('drained');
}, TaskType.Decrypted);
public async downloadAttachment(
attachment: ProcessedAttachment
): Promise<DownloadedAttachmentType> {
const cdnId = attachment.cdnId || attachment.cdnKey;
const { cdnNumber } = attachment;
const waitForIncomingQueue = async () =>
this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted);
if (!cdnId) {
throw new Error('downloadAttachment: Attachment was missing cdnId!');
}
strictAssert(cdnId, 'attachment without cdnId');
const encrypted = await this.server.getAttachment(
cdnId,
dropNull(cdnNumber)
);
const { key, digest, size, contentType } = attachment;
if (!digest) {
throw new Error('Failure: Ask sender to update Signal and resend.');
}
strictAssert(key, 'attachment has no key');
strictAssert(digest, 'attachment has no digest');
const paddedData = await Crypto.decryptAttachment(
encrypted,
typedArrayToArrayBuffer(Bytes.fromBase64(key)),
typedArrayToArrayBuffer(Bytes.fromBase64(digest))
);
if (!isNumber(size)) {
throw new Error(
`downloadAttachment: Size was not provided, actual size was ${paddedData.byteLength}`
);
}
const data = window.Signal.Crypto.getFirstBytes(paddedData, size);
return {
...omit(attachment, 'digest', 'key'),
contentType: contentType
? MIME.fromString(contentType)
: MIME.APPLICATION_OCTET_STREAM,
data,
};
return this.incomingQueue.add(waitForIncomingQueue);
}
//
@ -548,157 +471,10 @@ export default class MessageReceiver extends EventTarget {
// Private
//
private shutdown(): void {
if (this.socket) {
if (this._onClose) {
this.socket.removeListener('close', this._onClose);
}
if (this._onError) {
this.socket.removeListener('error', this._onError);
}
this.socket = undefined;
}
if (this.wsr) {
if (this._onWSRClose) {
this.wsr.removeEventListener('close', this._onWSRClose);
}
this.wsr = undefined;
}
}
private async onerror(error: Error): Promise<void> {
window.log.error('websocket error', error);
}
private async dispatchAndWait(event: Event): Promise<void> {
this.appQueue.add(async () => Promise.all(this.dispatchEvent(event)));
}
private async onclose(code: number, reason: string): Promise<void> {
window.log.info(
'MessageReceiver: websocket closed',
code,
reason || '',
'calledClose:',
this.calledClose
);
this.socketStatus = SocketStatus.CLOSED;
this.shutdown();
if (this.calledClose) {
return;
}
if (code === 3000) {
return;
}
if (code === 3001) {
this.onEmpty();
}
const timeout = this.reconnectBackOff.getAndIncrement();
window.log.info(`MessageReceiver: reconnecting after ${timeout}ms`);
await sleep(timeout);
// Try to reconnect (if there is an HTTP error - we'll get an
// `error` event from `connect()` and hit the secondary retry backoff
// logic in `ts/background.ts`)
await this.connect();
// Successfull reconnect, reset the backoff timeouts
this.reconnectBackOff.reset();
}
private handleRequest(request: IncomingWebSocketRequest): void {
// We do the message decryption here, instead of in the ordered pending queue,
// to avoid exposing the time it took us to process messages through the time-to-ack.
if (request.path !== '/api/v1/message') {
window.log.info('got request', request.verb, request.path);
request.respond(200, 'OK');
if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') {
this.incomingQueue.add(() => {
this.onEmpty();
});
}
return;
}
const job = async () => {
const headers = request.headers || [];
if (!request.body) {
throw new Error(
'MessageReceiver.handleRequest: request.body was falsey!'
);
}
const plaintext = request.body;
try {
const decoded = Proto.Envelope.decode(plaintext);
const serverTimestamp = normalizeNumber(decoded.serverTimestamp);
const envelope: ProcessedEnvelope = {
// Make non-private envelope IDs dashless so they don't get redacted
// from logs
id: getGuid().replace(/-/g, ''),
receivedAtCounter: window.Signal.Util.incrementMessageCounter(),
receivedAtDate: Date.now(),
// Calculate the message age (time on server).
messageAgeSec: this.calculateMessageAge(headers, serverTimestamp),
// Proto.Envelope fields
type: decoded.type,
source: decoded.source,
sourceUuid: decoded.sourceUuid
? normalizeUuid(
decoded.sourceUuid,
'MessageReceiver.handleRequest.sourceUuid'
)
: undefined,
sourceDevice: decoded.sourceDevice,
timestamp: normalizeNumber(decoded.timestamp),
legacyMessage: dropNull(decoded.legacyMessage),
content: dropNull(decoded.content),
serverGuid: decoded.serverGuid,
serverTimestamp,
};
// After this point, decoding errors are not the server's
// fault, and we should handle them gracefully and tell the
// user they received an invalid message
if (envelope.source && this.isBlocked(envelope.source)) {
request.respond(200, 'OK');
return;
}
if (envelope.sourceUuid && this.isUuidBlocked(envelope.sourceUuid)) {
request.respond(200, 'OK');
return;
}
this.decryptAndCache(envelope, plaintext, request);
this.processedCount += 1;
} catch (e) {
request.respond(500, 'Bad encrypted websocket message');
window.log.error(
'Error handling incoming message:',
Errors.toLogFormat(e)
);
await this.dispatchAndWait(new ErrorEvent(e));
}
};
this.incomingQueue.add(job);
}
private calculateMessageAge(
headers: ReadonlyArray<string>,
serverTimestamp?: number
@ -748,10 +524,6 @@ export default class MessageReceiver extends EventTarget {
}
}
public hasEmptied(): boolean {
return Boolean(this.isEmptied);
}
private onEmpty(): void {
const emitEmpty = async () => {
await Promise.all([
@ -795,18 +567,6 @@ export default class MessageReceiver extends EventTarget {
waitForCacheAddBatcher();
}
private async drain(): Promise<void> {
const waitForEncryptedQueue = async () =>
this.addToQueue(async () => {
window.log.info('drained');
}, TaskType.Decrypted);
const waitForIncomingQueue = async () =>
this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted);
return this.incomingQueue.add(waitForIncomingQueue);
}
private updateProgress(count: number): void {
// count by 10s
if (count % 10 !== 0) {
@ -890,7 +650,7 @@ export default class MessageReceiver extends EventTarget {
try {
const { id } = item;
await window.textsecure.storage.protocol.removeUnprocessed(id);
await this.storage.protocol.removeUnprocessed(id);
} catch (deleteError) {
window.log.error(
'queueCached error deleting item',
@ -931,17 +691,17 @@ export default class MessageReceiver extends EventTarget {
private async getAllFromCache(): Promise<Array<UnprocessedType>> {
window.log.info('getAllFromCache');
const count = await window.textsecure.storage.protocol.getUnprocessedCount();
const count = await this.storage.protocol.getUnprocessedCount();
if (count > 1500) {
await window.textsecure.storage.protocol.removeAllUnprocessed();
await this.storage.protocol.removeAllUnprocessed();
window.log.warn(
`There were ${count} messages in cache. Deleted all instead of reprocessing`
);
return [];
}
const items = await window.textsecure.storage.protocol.getAllUnprocessed();
const items = await this.storage.protocol.getAllUnprocessed();
window.log.info('getAllFromCache loaded', items.length, 'saved envelopes');
return Promise.all(
@ -954,9 +714,9 @@ export default class MessageReceiver extends EventTarget {
'getAllFromCache final attempt for envelope',
item.id
);
await window.textsecure.storage.protocol.removeUnprocessed(item.id);
await this.storage.protocol.removeUnprocessed(item.id);
} else {
await window.textsecure.storage.protocol.updateUnprocessedAttempts(
await this.storage.protocol.updateUnprocessedAttempts(
item.id,
attempts
);
@ -986,7 +746,7 @@ export default class MessageReceiver extends EventTarget {
}>
> = [];
const storageProtocol = window.textsecure.storage.protocol;
const storageProtocol = this.storage.protocol;
try {
const zone = new Zone('decryptAndCacheBatch', {
@ -1124,7 +884,7 @@ export default class MessageReceiver extends EventTarget {
}
private async cacheRemoveBatch(items: Array<string>): Promise<void> {
await window.textsecure.storage.protocol.removeUnprocessed(items);
await this.storage.protocol.removeUnprocessed(items);
}
private removeFromCache(envelope: ProcessedEnvelope): void {
@ -1140,7 +900,7 @@ export default class MessageReceiver extends EventTarget {
window.log.info('queueing decrypted envelope', id);
const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext);
const taskWithTimeout = window.textsecure.createTaskWithTimeout(
const taskWithTimeout = createTaskWithTimeout(
task,
`queueDecryptedEnvelope ${id}`
);
@ -1163,7 +923,7 @@ export default class MessageReceiver extends EventTarget {
window.log.info('queueing envelope', id);
const task = this.decryptEnvelope.bind(this, stores, envelope);
const taskWithTimeout = window.textsecure.createTaskWithTimeout(
const taskWithTimeout = createTaskWithTimeout(
task,
`queueEncryptedEnvelope ${id}`
);
@ -1456,10 +1216,10 @@ export default class MessageReceiver extends EventTarget {
envelope: UnsealedEnvelope,
ciphertext: Uint8Array
): Promise<DecryptSealedSenderResult> {
const localE164 = window.textsecure.storage.user.getNumber();
const localUuid = window.textsecure.storage.user.getUuid();
const localE164 = this.storage.user.getNumber();
const localUuid = this.storage.user.getUuid();
const localDeviceId = parseIntOrThrow(
window.textsecure.storage.user.getDeviceId(),
this.storage.user.getDeviceId(),
'MessageReceiver.decryptSealedSender: localDeviceId'
);
@ -1513,7 +1273,7 @@ export default class MessageReceiver extends EventTarget {
const address = `${sealedSenderIdentifier}.${sealedSenderSourceDevice}`;
const plaintext = await window.textsecure.storage.protocol.enqueueSenderKeyJob(
const plaintext = await this.storage.protocol.enqueueSenderKeyJob(
address,
() =>
groupDecrypt(
@ -1539,7 +1299,7 @@ export default class MessageReceiver extends EventTarget {
const sealedSenderIdentifier = envelope.sourceUuid || envelope.source;
const address = `${sealedSenderIdentifier}.${envelope.sourceDevice}`;
const unsealedPlaintext = await window.textsecure.storage.protocol.enqueueSessionJob(
const unsealedPlaintext = await this.storage.protocol.enqueueSessionJob(
address,
() =>
sealedSenderDecryptMessage(
@ -1598,7 +1358,7 @@ export default class MessageReceiver extends EventTarget {
const signalMessage = SignalMessage.deserialize(Buffer.from(ciphertext));
const address = `${identifier}.${sourceDevice}`;
const plaintext = await window.textsecure.storage.protocol.enqueueSessionJob(
const plaintext = await this.storage.protocol.enqueueSessionJob(
address,
async () =>
this.unpad(
@ -1630,7 +1390,7 @@ export default class MessageReceiver extends EventTarget {
);
const address = `${identifier}.${sourceDevice}`;
const plaintext = await window.textsecure.storage.protocol.enqueueSessionJob(
const plaintext = await this.storage.protocol.enqueueSessionJob(
address,
async () =>
this.unpad(
@ -1780,8 +1540,8 @@ export default class MessageReceiver extends EventTarget {
const groupId = this.getProcessedGroupId(message);
const isBlocked = groupId ? this.isGroupBlocked(groupId) : false;
const { source, sourceUuid } = envelope;
const ourE164 = window.textsecure.storage.user.getNumber();
const ourUuid = window.textsecure.storage.user.getUuid();
const ourE164 = this.storage.user.getNumber();
const ourUuid = this.storage.user.getUuid();
const isMe =
(source && ourE164 && source === ourE164) ||
(sourceUuid && ourUuid && sourceUuid === ourUuid);
@ -1869,8 +1629,8 @@ export default class MessageReceiver extends EventTarget {
const groupId = this.getProcessedGroupId(message);
const isBlocked = groupId ? this.isGroupBlocked(groupId) : false;
const { source, sourceUuid } = envelope;
const ourE164 = window.textsecure.storage.user.getNumber();
const ourUuid = window.textsecure.storage.user.getUuid();
const ourE164 = this.storage.user.getNumber();
const ourUuid = this.storage.user.getUuid();
const isMe =
(source && ourE164 && source === ourE164) ||
(sourceUuid && ourUuid && sourceUuid === ourUuid);
@ -2086,7 +1846,7 @@ export default class MessageReceiver extends EventTarget {
const senderKeyStore = new SenderKeys();
const address = `${identifier}.${sourceDevice}`;
await window.textsecure.storage.protocol.enqueueSenderKeyJob(
await this.storage.protocol.enqueueSenderKeyJob(
address,
() =>
processSenderKeyDistributionMessage(
@ -2326,15 +2086,19 @@ export default class MessageReceiver extends EventTarget {
envelope: ProcessedEnvelope,
syncMessage: ProcessedSyncMessage
): Promise<void> {
const fromSelfSource =
envelope.source && envelope.source === this.number_id;
const ourNumber = this.storage.user.getNumber();
const ourUuid = this.storage.user.getUuid();
const fromSelfSource = envelope.source && envelope.source === ourNumber;
const fromSelfSourceUuid =
envelope.sourceUuid && envelope.sourceUuid === this.uuid_id;
envelope.sourceUuid && envelope.sourceUuid === ourUuid;
if (!fromSelfSource && !fromSelfSourceUuid) {
throw new Error('Received sync message from another number');
}
const ourDeviceId = this.storage.user.getDeviceId();
// eslint-disable-next-line eqeqeq
if (envelope.sourceDevice == this.deviceId) {
if (envelope.sourceDevice == ourDeviceId) {
throw new Error('Received sync message from our own device');
}
if (syncMessage.sent) {
@ -2681,14 +2445,14 @@ export default class MessageReceiver extends EventTarget {
): Promise<void> {
window.log.info('Setting these numbers as blocked:', blocked.numbers);
if (blocked.numbers) {
await window.textsecure.storage.put('blocked', blocked.numbers);
await this.storage.put('blocked', blocked.numbers);
}
if (blocked.uuids) {
const uuids = blocked.uuids.map((uuid, index) => {
return normalizeUuid(uuid, `handleBlocked.uuids.${index}`);
});
window.log.info('Setting these uuids as blocked:', uuids);
await window.textsecure.storage.put('blocked-uuids', uuids);
await this.storage.put('blocked-uuids', uuids);
}
const groupIds = map(blocked.groupIds, groupId => Bytes.toBinary(groupId));
@ -2696,33 +2460,33 @@ export default class MessageReceiver extends EventTarget {
'Setting these groups as blocked:',
groupIds.map(groupId => `group(${groupId})`)
);
await window.textsecure.storage.put('blocked-groups', groupIds);
await this.storage.put('blocked-groups', groupIds);
this.removeFromCache(envelope);
}
private isBlocked(number: string): boolean {
return window.textsecure.storage.blocked.isBlocked(number);
return this.storage.blocked.isBlocked(number);
}
private isUuidBlocked(uuid: string): boolean {
return window.textsecure.storage.blocked.isUuidBlocked(uuid);
return this.storage.blocked.isUuidBlocked(uuid);
}
private isGroupBlocked(groupId: string): boolean {
return window.textsecure.storage.blocked.isGroupBlocked(groupId);
return this.storage.blocked.isGroupBlocked(groupId);
}
private async handleAttachment(
attachment: Proto.IAttachmentPointer
): Promise<DownloadedAttachmentType> {
const cleaned = processAttachment(attachment);
return this.downloadAttachment(cleaned);
return downloadAttachment(this.server, cleaned);
}
private async handleEndSession(identifier: string): Promise<void> {
window.log.info(`handleEndSession: closing sessions for ${identifier}`);
await window.textsecure.storage.protocol.archiveAllSessions(identifier);
await this.storage.protocol.archiveAllSessions(identifier);
}
private async processDecrypted(

View file

@ -0,0 +1,627 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable no-restricted-syntax */
import URL from 'url';
import ProxyAgent from 'proxy-agent';
import { RequestInit, Response, Headers } from 'node-fetch';
import { client as WebSocketClient } from 'websocket';
import qs from 'querystring';
import EventListener from 'events';
import { AbortableProcess } from '../util/AbortableProcess';
import { strictAssert } from '../util/assert';
import { explodePromise } from '../util/explodePromise';
import { BackOff, FIBONACCI_TIMEOUTS } from '../util/BackOff';
import { getUserAgent } from '../util/getUserAgent';
import { sleep } from '../util/sleep';
import { SocketStatus } from '../types/SocketStatus';
import * as Errors from '../types/errors';
import * as Bytes from '../Bytes';
import WebSocketResource, {
WebSocketResourceOptions,
IncomingWebSocketRequest,
} from './WebsocketResources';
import { ConnectTimeoutError, HTTPError } from './Errors';
import { handleStatusCode, translateError } from './Utils';
import { WebAPICredentials, IRequestHandler } from './Types.d';
// TODO: remove once we move away from ArrayBuffers
const FIXMEU8 = Uint8Array;
const TEN_SECONDS = 1000 * 10;
const FIVE_MINUTES = 5 * 60 * 1000;
export type SocketManagerOptions = Readonly<{
url: string;
certificateAuthority: string;
version: string;
proxyUrl?: string;
}>;
// This class manages two websocket resource:
//
// - Authenticated WebSocketResource which uses supplied WebAPICredentials and
// automatically reconnects on closed socket (using back off)
// - Unauthenticated WebSocketResource that is created on demand and reconnected
// every 5 minutes.
//
// Incoming requests on authenticated resource are funneled into the registered
// request handlers (`registerRequestHandler`) or queued internally until at
// least one such request handler becomes available.
//
// Incoming requests on unauthenticated resource are not currently supported.
// WebSocketResource is responsible for their immediate termination.
export class SocketManager extends EventListener {
private backOff = new BackOff(FIBONACCI_TIMEOUTS);
private authenticated?: AbortableProcess<WebSocketResource>;
private unauthenticated?: AbortableProcess<WebSocketResource>;
private credentials?: WebAPICredentials;
private readonly proxyAgent?: ProxyAgent;
private status = SocketStatus.CLOSED;
private requestHandlers = new Set<IRequestHandler>();
private incomingRequestQueue = new Array<IncomingWebSocketRequest>();
private isOffline = false;
constructor(private readonly options: SocketManagerOptions) {
super();
if (options.proxyUrl) {
this.proxyAgent = new ProxyAgent(options.proxyUrl);
}
}
public getStatus(): SocketStatus {
return this.status;
}
// Update WebAPICredentials and reconnect authenticated resource if
// credentials changed
public async authenticate(credentials: WebAPICredentials): Promise<void> {
if (this.isOffline) {
throw new HTTPError('SocketManager offline', {
code: 0,
headers: {},
stack: new Error().stack,
});
}
const { username, password } = credentials;
if (!username && !password) {
window.log.warn(
'SocketManager authenticate was called without credentials'
);
return;
}
if (
this.credentials &&
this.credentials.username === username &&
this.credentials.password === password &&
this.authenticated
) {
try {
await this.authenticated.getResult();
} catch (error) {
window.log.warn(
'SocketManager: failed to wait for existing authenticated socket ' +
` due to error: ${Errors.toLogFormat(error)}`
);
}
return;
}
this.credentials = credentials;
window.log.info('SocketManager: connecting authenticated socket');
this.status = SocketStatus.CONNECTING;
const process = this.connectResource({
path: '/v1/websocket/',
query: { login: username, password },
resourceOptions: {
keepalive: { path: '/v1/keepalive' },
handleRequest: (req: IncomingWebSocketRequest): void => {
this.queueOrHandleRequest(req);
},
},
});
// Cancel previous connect attempt or close socket
this.authenticated?.abort();
this.authenticated = process;
const reconnect = async (): Promise<void> => {
const timeout = this.backOff.getAndIncrement();
window.log.info(
'SocketManager: reconnecting authenticated socket ' +
`after ${timeout}ms`
);
await sleep(timeout);
if (this.isOffline) {
window.log.info(
'SocketManager: cancelled reconnect because we are offline'
);
return;
}
if (this.authenticated) {
window.log.info(
'SocketManager: authenticated socket already reconnected'
);
return;
}
strictAssert(this.credentials !== undefined, 'Missing credentials');
try {
await this.authenticate(this.credentials);
} catch (error) {
window.log.info(
'SocketManager: authenticated socket failed to reconect ' +
`due to error ${Errors.toLogFormat(error)}`
);
return reconnect();
}
};
let authenticated: WebSocketResource;
try {
authenticated = await process.getResult();
this.status = SocketStatus.OPEN;
} catch (error) {
strictAssert(this.authenticated === process, 'Someone stole our socket');
this.dropAuthenticated(process);
window.log.warn(
'SocketManager: authenticated socket connection failed with ' +
`error: ${Errors.toLogFormat(error)}`
);
if (error instanceof HTTPError) {
const { code } = error;
if (code === 401 || code === 403) {
this.emit('authError', error);
return;
}
if (code !== 500 && code !== -1) {
// No reconnect attempt should be made
return;
}
}
reconnect();
return;
}
window.log.info('SocketManager: connected authenticated socket');
window.logAuthenticatedConnect();
this.backOff.reset();
authenticated.addEventListener('close', ({ code, reason }): void => {
if (this.authenticated !== process) {
return;
}
window.log.warn(
'SocketManager: authenticated socket closed ' +
`with code=${code} and reason=${reason}`
);
this.dropAuthenticated(process);
if (code === 3000) {
// Intentional disconnect
return;
}
reconnect();
});
}
// Either returns currently connecting/active authenticated
// WebSocketResource or connects a fresh one.
public async getAuthenticatedResource(): Promise<WebSocketResource> {
if (!this.authenticated) {
strictAssert(this.credentials !== undefined, 'Missing credentials');
await this.authenticate(this.credentials);
}
strictAssert(this.authenticated !== undefined, 'Authentication failed');
return this.authenticated.getResult();
}
// Creates new WebSocketResource for AccountManager's provisioning
public async getProvisioningResource(
handler: IRequestHandler
): Promise<WebSocketResource> {
return this.connectResource({
path: '/v1/websocket/provisioning/',
resourceOptions: {
handleRequest: (req: IncomingWebSocketRequest): void => {
handler.handleRequest(req);
},
keepalive: { path: '/v1/keepalive/provisioning' },
},
}).getResult();
}
// Fetch-compatible wrapper around underlying unauthenticated/authenticated
// websocket resources. This wrapper supports only limited number of features
// of node-fetch despite being API compatible.
public async fetch(url: string, init: RequestInit): Promise<Response> {
const headers = new Headers(init.headers);
const isAuthenticated = headers.has('Authorization');
let resource: WebSocketResource;
if (isAuthenticated) {
resource = await this.getAuthenticatedResource();
} else {
resource = await this.getUnauthenticatedResource();
}
const { path } = URL.parse(url);
strictAssert(path, "Fetch can't have empty path");
const { method = 'GET', body, timeout } = init;
let bodyBytes: Uint8Array | undefined;
if (body === undefined) {
bodyBytes = undefined;
} else if (body instanceof Uint8Array) {
bodyBytes = body;
} else if (body instanceof ArrayBuffer) {
bodyBytes = new FIXMEU8(body);
} else if (typeof body === 'string') {
bodyBytes = Bytes.fromString(body);
} else {
throw new Error(`Unsupported body type: ${typeof body}`);
}
const {
status,
message: statusText,
response,
headers: flatResponseHeaders,
} = await resource.sendRequest({
verb: method,
path,
body: bodyBytes,
headers: Array.from(headers.entries()).map(([key, value]) => {
return `${key}:${value}`;
}),
timeout,
});
const responseHeaders: Array<[string, string]> = flatResponseHeaders.map(
header => {
const [key, value] = header.split(':', 2);
strictAssert(value !== undefined, 'Invalid header!');
return [key, value];
}
);
return new Response(response, {
status,
statusText,
headers: responseHeaders,
});
}
public registerRequestHandler(handler: IRequestHandler): void {
this.requestHandlers.add(handler);
const queue = this.incomingRequestQueue;
if (queue.length === 0) {
return;
}
window.log.info(
`SocketManager: processing ${queue.length} queued incoming requests`
);
this.incomingRequestQueue = [];
for (const req of queue) {
this.queueOrHandleRequest(req);
}
}
public unregisterRequestHandler(handler: IRequestHandler): void {
this.requestHandlers.delete(handler);
}
// Force keep-alive checks on WebSocketResources
public async check(): Promise<void> {
if (this.isOffline) {
return;
}
window.log.info('SocketManager.check');
await Promise.all([
SocketManager.checkResource(this.authenticated),
SocketManager.checkResource(this.unauthenticated),
]);
}
// Puts SocketManager into "online" state and reconnects the authenticated
// WebSocketResource (if there are valid credentials)
public async onOnline(): Promise<void> {
window.log.info('SocketManager.onOnline');
this.isOffline = false;
if (this.credentials) {
await this.authenticate(this.credentials);
}
}
// Puts SocketManager into "offline" state and gracefully disconnects both
// unauthenticated and authenticated resources.
public async onOffline(): Promise<void> {
window.log.info('SocketManager.onOffline');
this.isOffline = true;
this.authenticated?.abort();
this.unauthenticated?.abort();
}
//
// Private
//
private async getUnauthenticatedResource(): Promise<WebSocketResource> {
if (this.isOffline) {
throw new HTTPError('SocketManager offline', {
code: 0,
headers: {},
stack: new Error().stack,
});
}
if (this.unauthenticated) {
return this.unauthenticated.getResult();
}
window.log.info('SocketManager: connecting unauthenticated socket');
const process = this.connectResource({
path: '/v1/websocket/',
resourceOptions: {
keepalive: { path: '/v1/keepalive' },
},
});
this.unauthenticated = process;
const unauthenticated = await this.unauthenticated.getResult();
window.log.info('SocketManager: connected unauthenticated socket');
let timer: NodeJS.Timeout | undefined = setTimeout(() => {
window.log.info(
'SocketManager: shutting down unauthenticated socket after timeout'
);
timer = undefined;
unauthenticated.shutdown();
this.dropUnauthenticated(process);
}, FIVE_MINUTES);
unauthenticated.addEventListener('close', ({ code, reason }): void => {
if (timer !== undefined) {
clearTimeout(timer);
timer = undefined;
}
if (this.unauthenticated !== process) {
return;
}
window.log.warn(
'SocketManager: unauthenticated socket closed ' +
`with code=${code} and reason=${reason}`
);
this.dropUnauthenticated(process);
});
return this.unauthenticated.getResult();
}
private connectResource({
path,
resourceOptions,
query = {},
timeout = TEN_SECONDS,
}: {
path: string;
resourceOptions: WebSocketResourceOptions;
query?: Record<string, string>;
timeout?: number;
}): AbortableProcess<WebSocketResource> {
const fixedScheme = this.options.url
.replace('https://', 'wss://')
.replace('http://', 'ws://');
const headers = {
'User-Agent': getUserAgent(this.options.version),
};
const client = new WebSocketClient({
tlsOptions: {
ca: this.options.certificateAuthority,
agent: this.proxyAgent,
},
maxReceivedFrameSize: 0x210000,
});
const queryWithDefaults = {
agent: 'OWD',
version: this.options.version,
...query,
};
client.connect(
`${fixedScheme}${path}?${qs.encode(queryWithDefaults)}`,
undefined,
undefined,
headers
);
const { stack } = new Error();
const { promise, resolve, reject } = explodePromise<WebSocketResource>();
const timer = setTimeout(() => {
reject(new ConnectTimeoutError('Connection timed out'));
client.abort();
}, timeout);
let resource: WebSocketResource | undefined;
client.on('connect', socket => {
clearTimeout(timer);
resource = new WebSocketResource(socket, resourceOptions);
resolve(resource);
});
client.on('httpResponse', async response => {
clearTimeout(timer);
const statusCode = response.statusCode || -1;
await handleStatusCode(statusCode);
const error = new HTTPError(
'connectResource: invalid websocket response',
{
code: statusCode || -1,
headers: {},
stack,
}
);
const translatedError = translateError(error);
strictAssert(
translatedError,
'`httpResponse` event cannot be emitted with 200 status code'
);
reject(translatedError);
});
client.on('connectFailed', e => {
clearTimeout(timer);
reject(
new HTTPError('connectResource: connectFailed', {
code: -1,
headers: {},
response: e.toString(),
stack,
})
);
});
return new AbortableProcess<WebSocketResource>(
`SocketManager.connectResource(${path})`,
{
abort() {
if (resource) {
resource.close(3000, 'aborted');
} else {
clearTimeout(timer);
client.abort();
}
},
},
promise
);
}
private static async checkResource(
process?: AbortableProcess<WebSocketResource>
): Promise<void> {
if (!process) {
return;
}
const resource = await process.getResult();
resource.forceKeepAlive();
}
private dropAuthenticated(
process: AbortableProcess<WebSocketResource>
): void {
strictAssert(
this.authenticated === process,
'Authenticated resource mismatch'
);
this.incomingRequestQueue = [];
this.authenticated = undefined;
this.status = SocketStatus.CLOSED;
}
private dropUnauthenticated(
process: AbortableProcess<WebSocketResource>
): void {
strictAssert(
this.unauthenticated === process,
'Unauthenticated resource mismatch'
);
this.unauthenticated = undefined;
}
private queueOrHandleRequest(req: IncomingWebSocketRequest): void {
if (this.requestHandlers.size === 0) {
this.incomingRequestQueue.push(req);
window.log.info(
'SocketManager: request handler unavailable, ' +
`queued request. Queue size: ${this.incomingRequestQueue.length}`
);
return;
}
for (const handlers of this.requestHandlers) {
try {
handlers.handleRequest(req);
} catch (error) {
window.log.warn(
'SocketManager: got exception while handling incoming request, ' +
`error: ${Errors.toLogFormat(error)}`
);
}
}
}
// EventEmitter types
public on(type: 'authError', callback: (error: HTTPError) => void): this;
public on(
type: string | symbol,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
listener: (...args: Array<any>) => void
): this {
return super.on(type, listener);
}
public emit(type: 'authError', error: HTTPError): boolean;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public emit(type: string | symbol, ...args: Array<any>): boolean {
return super.emit(type, ...args);
}
}

View file

@ -2,6 +2,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
import type { SignalService as Proto } from '../protobuf';
import type { IncomingWebSocketRequest } from './WebsocketResources';
export {
IdentityKeyType,
@ -232,3 +233,7 @@ export interface CallbackResultType {
timestamp?: number;
recipients?: Record<string, Array<number>>;
}
export interface IRequestHandler {
handleRequest(request: IncomingWebSocketRequest): void;
}

46
ts/textsecure/Utils.ts Normal file
View file

@ -0,0 +1,46 @@
// Copyright 2020-2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
export async function handleStatusCode(status: number): Promise<void> {
if (status === 499) {
window.log.error('Got 499 from Signal Server. Build is expired.');
await window.storage.put('remoteBuildExpiration', Date.now());
window.reduxActions.expiration.hydrateExpirationStatus(true);
}
}
export function translateError(error: Error): Error | undefined {
const { code } = error;
if (code === 200) {
// Happens sometimes when we get no response. Might be nice to get 204 instead.
return undefined;
}
let message: string;
switch (code) {
case -1:
message =
'Failed to connect to the server, please check your network connection.';
break;
case 413:
message = 'Rate limit exceeded, please try again later.';
break;
case 403:
message = 'Invalid code, please try again.';
break;
case 417:
message = 'Number already registered.';
break;
case 401:
message =
'Invalid authentication, most likely someone re-registered and invalidated our registration.';
break;
case 404:
message = 'Number is not registered.';
break;
default:
message = 'The server rejected our query, please file a bug report.';
}
// eslint-disable-next-line no-param-reassign
error.message = `${message} (original: ${error.message})`;
return error;
}

View file

@ -11,7 +11,7 @@
import fetch, { Response } from 'node-fetch';
import ProxyAgent from 'proxy-agent';
import { Agent, RequestOptions } from 'https';
import { Agent } from 'https';
import pProps from 'p-props';
import {
compact,
@ -26,13 +26,13 @@ import { pki } from 'node-forge';
import is from '@sindresorhus/is';
import PQueue from 'p-queue';
import { v4 as getGuid } from 'uuid';
import { client as WebSocketClient, connection as WebSocket } from 'websocket';
import { z } from 'zod';
import Long from 'long';
import { assert } from '../util/assert';
import { getUserAgent } from '../util/getUserAgent';
import { toWebSafeBase64 } from '../util/webSafeBase64';
import { SocketStatus } from '../types/SocketStatus';
import { isPackIdValid, redactPackId } from '../types/Stickers';
import * as Bytes from '../Bytes';
import {
@ -55,11 +55,14 @@ import {
StorageServiceCallOptionsType,
StorageServiceCredentials,
} from '../textsecure.d';
import { SocketManager } from './SocketManager';
import WebSocketResource from './WebsocketResources';
import { SignalService as Proto } from '../protobuf';
import { ConnectTimeoutError } from './Errors';
import { HTTPError } from './Errors';
import MessageSender from './SendMessage';
import { WebAPICredentials } from './Types.d';
import { WebAPICredentials, IRequestHandler } from './Types.d';
import { handleStatusCode, translateError } from './Utils';
// TODO: remove once we move away from ArrayBuffers
const FIXMEU8 = Uint8Array;
@ -263,96 +266,6 @@ function _validateResponse(response: any, schema: any) {
return true;
}
export type ConnectSocketOptions = Readonly<{
certificateAuthority: string;
proxyUrl?: string;
version: string;
timeout?: number;
}>;
const TEN_SECONDS = 1000 * 10;
async function _connectSocket(
url: string,
{
certificateAuthority,
proxyUrl,
version,
timeout = TEN_SECONDS,
}: ConnectSocketOptions
): Promise<WebSocket> {
let tlsOptions: RequestOptions = {
ca: certificateAuthority,
};
if (proxyUrl) {
tlsOptions = {
...tlsOptions,
agent: new ProxyAgent(proxyUrl),
};
}
const headers = {
'User-Agent': getUserAgent(version),
};
const client = new WebSocketClient({
tlsOptions,
maxReceivedFrameSize: 0x210000,
});
client.connect(url, undefined, undefined, headers);
const { stack } = new Error();
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(new ConnectTimeoutError('Connection timed out'));
client.abort();
}, timeout);
client.on('connect', socket => {
clearTimeout(timer);
resolve(socket);
});
client.on('httpResponse', async response => {
clearTimeout(timer);
const statusCode = response.statusCode || -1;
await _handleStatusCode(statusCode);
const error = makeHTTPError(
'_connectSocket: invalid websocket response',
statusCode || -1,
{}, // headers
undefined,
stack
);
const translatedError = _translateError(error);
assert(
translatedError,
'`httpResponse` event cannot be emitted with 200 status code'
);
reject(translatedError);
});
client.on('connectFailed', e => {
clearTimeout(timer);
reject(
makeHTTPError(
'_connectSocket: connectFailed',
-1,
{},
e.toString(),
stack
)
);
});
});
}
const FIVE_MINUTES = 1000 * 60 * 5;
type AgentCacheType = {
@ -378,6 +291,7 @@ type HTTPCodeType = 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH';
type RedactUrl = (url: string) => string;
type PromiseAjaxOptionsType = {
socketManager?: SocketManager;
accessKey?: string;
basicAuth?: string;
certificateAuthority?: string;
@ -468,284 +382,218 @@ function getHostname(url: string): string {
return urlObject.hostname;
}
async function _handleStatusCode(
status: number,
unauthenticated = false
): Promise<void> {
if (status === 499) {
window.log.error('Got 499 from Signal Server. Build is expired.');
await window.storage.put('remoteBuildExpiration', Date.now());
window.reduxActions.expiration.hydrateExpirationStatus(true);
}
if (!unauthenticated && status === 401) {
window.log.error('Got 401 from Signal Server. We might be unlinked.');
window.Whisper.events.trigger('mightBeUnlinked');
}
}
function _translateError(error: Error): Error | undefined {
const { code } = error;
if (code === 200) {
// Happens sometimes when we get no response. Might be nice to get 204 instead.
return undefined;
}
let message: string;
switch (code) {
case -1:
message =
'Failed to connect to the server, please check your network connection.';
break;
case 413:
message = 'Rate limit exceeded, please try again later.';
break;
case 403:
message = 'Invalid code, please try again.';
break;
case 417:
message = 'Number already registered.';
break;
case 401:
message =
'Invalid authentication, most likely someone re-registered and invalidated our registration.';
break;
case 404:
message = 'Number is not registered.';
break;
default:
message = 'The server rejected our query, please file a bug report.';
}
error.message = `${message} (original: ${error.message})`;
return error;
}
async function _promiseAjax(
providedUrl: string | null,
options: PromiseAjaxOptionsType
): Promise<any> {
return new Promise((resolve, reject) => {
const url = providedUrl || `${options.host}/${options.path}`;
): Promise<
| string
| ArrayBuffer
| unknown
| JSONWithDetailsType
| ArrayBufferWithDetailsType
> {
const url = providedUrl || `${options.host}/${options.path}`;
const unauthLabel = options.unauthenticated ? ' (unauth)' : '';
const unauthLabel = options.unauthenticated ? ' (unauth)' : '';
if (options.redactUrl) {
window.log.info(`${options.type} ${options.redactUrl(url)}${unauthLabel}`);
} else {
window.log.info(`${options.type} ${url}${unauthLabel}`);
}
const timeout = typeof options.timeout === 'number' ? options.timeout : 10000;
const { proxyUrl, socketManager } = options;
const agentType = options.unauthenticated ? 'unauth' : 'auth';
const cacheKey = `${proxyUrl}-${agentType}`;
const { timestamp } = agents[cacheKey] || { timestamp: null };
if (!timestamp || timestamp + FIVE_MINUTES < Date.now()) {
if (timestamp) {
window.log.info(`Cycling agent for type ${cacheKey}`);
}
agents[cacheKey] = {
agent: proxyUrl
? new ProxyAgent(proxyUrl)
: new Agent({ keepAlive: true }),
timestamp: Date.now(),
};
}
const { agent } = agents[cacheKey];
const fetchOptions = {
method: options.type,
body: options.data,
headers: {
'User-Agent': getUserAgent(options.version),
'X-Signal-Agent': 'OWD',
...options.headers,
} as FetchHeaderListType,
redirect: options.redirect,
agent,
ca: options.certificateAuthority,
timeout,
};
if (fetchOptions.body instanceof ArrayBuffer) {
// node-fetch doesn't support ArrayBuffer, only node Buffer
const contentLength = fetchOptions.body.byteLength;
fetchOptions.body = Buffer.from(fetchOptions.body);
// node-fetch doesn't set content-length like S3 requires
fetchOptions.headers['Content-Length'] = contentLength.toString();
}
const { accessKey, basicAuth, unauthenticated } = options;
if (basicAuth) {
fetchOptions.headers.Authorization = `Basic ${basicAuth}`;
} else if (unauthenticated) {
if (!accessKey) {
throw new Error(
'_promiseAjax: mode is unauthenticated, but accessKey was not provided'
);
}
// Access key is already a Base64 string
fetchOptions.headers['Unidentified-Access-Key'] = accessKey;
} else if (options.user && options.password) {
const user = _getString(options.user);
const password = _getString(options.password);
const auth = _btoa(`${user}:${password}`);
fetchOptions.headers.Authorization = `Basic ${auth}`;
}
if (options.contentType) {
fetchOptions.headers['Content-Type'] = options.contentType;
}
let response: Response;
let result: string | ArrayBuffer | unknown;
try {
response = socketManager
? await socketManager.fetch(url, fetchOptions)
: await fetch(url, fetchOptions);
if (
options.serverUrl &&
getHostname(options.serverUrl) === getHostname(url)
) {
await handleStatusCode(response.status);
if (!unauthenticated && response.status === 401) {
window.log.error('Got 401 from Signal Server. We might be unlinked.');
window.Whisper.events.trigger('mightBeUnlinked');
}
}
if (DEBUG && !isSuccess(response.status)) {
result = await response.text();
} else if (
(options.responseType === 'json' ||
options.responseType === 'jsonwithdetails') &&
/^application\/json(;.*)?$/.test(
response.headers.get('Content-Type') || ''
)
) {
result = await response.json();
} else if (
options.responseType === 'arraybuffer' ||
options.responseType === 'arraybufferwithdetails'
) {
result = await response.arrayBuffer();
} else {
result = await response.textConverted();
}
} catch (e) {
if (options.redactUrl) {
window.log.error(options.type, options.redactUrl(url), 0, 'Error');
} else {
window.log.error(options.type, url, 0, 'Error');
}
const stack = `${e.stack}\nInitial stack:\n${options.stack}`;
throw makeHTTPError('promiseAjax catch', 0, {}, e.toString(), stack);
}
if (!isSuccess(response.status)) {
if (options.redactUrl) {
window.log.info(
`${options.type} ${options.redactUrl(url)}${unauthLabel}`
options.type,
options.redactUrl(url),
response.status,
'Error'
);
} else {
window.log.info(`${options.type} ${url}${unauthLabel}`);
window.log.error(options.type, url, response.status, 'Error');
}
const timeout =
typeof options.timeout === 'number' ? options.timeout : 10000;
throw makeHTTPError(
'promiseAjax: error response',
response.status,
response.headers.raw(),
result,
options.stack
);
}
const { proxyUrl } = options;
const agentType = options.unauthenticated ? 'unauth' : 'auth';
const cacheKey = `${proxyUrl}-${agentType}`;
const { timestamp } = agents[cacheKey] || { timestamp: null };
if (!timestamp || timestamp + FIVE_MINUTES < Date.now()) {
if (timestamp) {
window.log.info(`Cycling agent for type ${cacheKey}`);
}
agents[cacheKey] = {
agent: proxyUrl
? new ProxyAgent(proxyUrl)
: new Agent({ keepAlive: true }),
timestamp: Date.now(),
};
}
const { agent } = agents[cacheKey];
const fetchOptions = {
method: options.type,
body: options.data,
headers: {
'User-Agent': getUserAgent(options.version),
'X-Signal-Agent': 'OWD',
...options.headers,
} as FetchHeaderListType,
redirect: options.redirect,
agent,
ca: options.certificateAuthority,
timeout,
};
if (fetchOptions.body instanceof ArrayBuffer) {
// node-fetch doesn't support ArrayBuffer, only node Buffer
const contentLength = fetchOptions.body.byteLength;
fetchOptions.body = Buffer.from(fetchOptions.body);
// node-fetch doesn't set content-length like S3 requires
fetchOptions.headers['Content-Length'] = contentLength.toString();
}
const { accessKey, basicAuth, unauthenticated } = options;
if (basicAuth) {
fetchOptions.headers.Authorization = `Basic ${basicAuth}`;
} else if (unauthenticated) {
if (!accessKey) {
throw new Error(
'_promiseAjax: mode is unauthenticated, but accessKey was not provided'
if (
options.responseType === 'json' ||
options.responseType === 'jsonwithdetails'
) {
if (options.validateResponse) {
if (!_validateResponse(result, options.validateResponse)) {
if (options.redactUrl) {
window.log.info(
options.type,
options.redactUrl(url),
response.status,
'Error'
);
} else {
window.log.error(options.type, url, response.status, 'Error');
}
throw makeHTTPError(
'promiseAjax: invalid response',
response.status,
response.headers.raw(),
result,
options.stack
);
}
// Access key is already a Base64 string
fetchOptions.headers['Unidentified-Access-Key'] = accessKey;
} else if (options.user && options.password) {
const user = _getString(options.user);
const password = _getString(options.password);
const auth = _btoa(`${user}:${password}`);
fetchOptions.headers.Authorization = `Basic ${auth}`;
}
}
if (options.contentType) {
fetchOptions.headers['Content-Type'] = options.contentType;
}
if (options.redactUrl) {
window.log.info(
options.type,
options.redactUrl(url),
response.status,
'Success'
);
} else {
window.log.info(options.type, url, response.status, 'Success');
}
fetch(url, fetchOptions)
.then(async response => {
if (
options.serverUrl &&
getHostname(options.serverUrl) === getHostname(url)
) {
await _handleStatusCode(response.status, unauthenticated);
}
if (options.responseType === 'arraybufferwithdetails') {
assert(result instanceof ArrayBuffer, 'Expected ArrayBuffer result');
const fullResult: ArrayBufferWithDetailsType = {
data: result,
contentType: getContentType(response),
response,
};
let resultPromise;
if (DEBUG && !isSuccess(response.status)) {
resultPromise = response.text();
} else if (
(options.responseType === 'json' ||
options.responseType === 'jsonwithdetails') &&
/^application\/json(;.*)?$/.test(
response.headers.get('Content-Type') || ''
)
) {
resultPromise = response.json();
} else if (
options.responseType === 'arraybuffer' ||
options.responseType === 'arraybufferwithdetails'
) {
resultPromise = response.buffer();
} else {
resultPromise = response.textConverted();
}
return fullResult;
}
return resultPromise.then(result => {
if (isSuccess(response.status)) {
if (
options.responseType === 'arraybuffer' ||
options.responseType === 'arraybufferwithdetails'
) {
result = result.buffer.slice(
result.byteOffset,
result.byteOffset + result.byteLength
);
}
if (
options.responseType === 'json' ||
options.responseType === 'jsonwithdetails'
) {
if (options.validateResponse) {
if (!_validateResponse(result, options.validateResponse)) {
if (options.redactUrl) {
window.log.info(
options.type,
options.redactUrl(url),
response.status,
'Error'
);
} else {
window.log.error(
options.type,
url,
response.status,
'Error'
);
}
reject(
makeHTTPError(
'promiseAjax: invalid response',
response.status,
response.headers.raw(),
result,
options.stack
)
);
if (options.responseType === 'jsonwithdetails') {
const fullResult: JSONWithDetailsType = {
data: result,
contentType: getContentType(response),
response,
};
return;
}
}
}
return fullResult;
}
if (options.redactUrl) {
window.log.info(
options.type,
options.redactUrl(url),
response.status,
'Success'
);
} else {
window.log.info(options.type, url, response.status, 'Success');
}
if (options.responseType === 'arraybufferwithdetails') {
const fullResult: ArrayBufferWithDetailsType = {
data: result,
contentType: getContentType(response),
response,
};
resolve(fullResult);
return;
}
if (options.responseType === 'jsonwithdetails') {
const fullResult: JSONWithDetailsType = {
data: result,
contentType: getContentType(response),
response,
};
resolve(fullResult);
return;
}
resolve(result);
return;
}
if (options.redactUrl) {
window.log.info(
options.type,
options.redactUrl(url),
response.status,
'Error'
);
} else {
window.log.error(options.type, url, response.status, 'Error');
}
reject(
makeHTTPError(
'promiseAjax: error response',
response.status,
response.headers.raw(),
result,
options.stack
)
);
});
})
.catch(e => {
if (options.redactUrl) {
window.log.error(options.type, options.redactUrl(url), 0, 'Error');
} else {
window.log.error(options.type, url, 0, 'Error');
}
const stack = `${e.stack}\nInitial stack:\n${options.stack}`;
reject(makeHTTPError('promiseAjax catch', 0, {}, e.toString(), stack));
});
});
return result;
}
async function _retryAjax(
@ -793,21 +641,12 @@ function makeHTTPError(
response: any,
stack?: string
) {
const code = providedCode > 999 || providedCode < 100 ? -1 : providedCode;
const e = new Error(`${message}; code: ${code}`);
e.name = 'HTTPError';
e.code = code;
e.responseHeaders = headers;
if (DEBUG && response) {
e.stack += `\nresponse: ${response}`;
}
e.stack += `\nOriginal stack:\n${stack}`;
if (response) {
e.response = response;
}
return e;
return new HTTPError(message, {
code: providedCode,
headers,
response,
stack,
});
}
const URL_CALLS = {
@ -844,6 +683,21 @@ const URL_CALLS = {
challenge: 'v1/challenge',
};
const WEBSOCKET_CALLS = new Set<keyof typeof URL_CALLS>([
// MessageController
'messages',
'reportMessage',
// ProfileController
'profile',
// AttachmentControllerV2
'attachmentId',
// RemoteConfigController
'config',
]);
type InitializeOptionsType = {
url: string;
storageUrl: string;
@ -983,7 +837,6 @@ export type WebAPIType = {
deviceId?: number,
options?: { accessKey?: string }
) => Promise<ServerKeysType>;
getMessageSocket: () => Promise<WebSocket>;
getMyKeys: () => Promise<number>;
getProfile: (
identifier: string,
@ -1000,7 +853,9 @@ export type WebAPIType = {
profileKeyCredentialRequest?: string;
}
) => Promise<any>;
getProvisioningSocket: () => Promise<WebSocket>;
getProvisioningResource: (
handler: IRequestHandler
) => Promise<WebSocketResource>;
getSenderCertificate: (
withUuid?: boolean
) => Promise<{ certificate: string }>;
@ -1086,6 +941,12 @@ export type WebAPIType = {
Array<{ name: string; enabled: boolean; value: string | null }>
>;
authenticate: (credentials: WebAPICredentials) => Promise<void>;
getSocketStatus: () => SocketStatus;
registerRequestHandler: (handler: IRequestHandler) => void;
unregisterRequestHandler: (handler: IRequestHandler) => void;
checkSockets: () => void;
onOnline: () => Promise<void>;
onOffline: () => Promise<void>;
};
export type SignedPreKeyType = {
@ -1200,8 +1061,27 @@ export function initialize({
const PARSE_RANGE_HEADER = /\/(\d+)$/;
const PARSE_GROUP_LOG_RANGE_HEADER = /$versions (\d{1,10})-(\d{1,10})\/(d{1,10})/;
const socketManager = new SocketManager({
url,
certificateAuthority,
version,
proxyUrl,
});
socketManager.on('authError', () => {
window.Whisper.events.trigger('unlinkAndDisconnect');
});
socketManager.authenticate({ username, password });
// Thanks, function hoisting!
return {
getSocketStatus,
checkSockets,
onOnline,
onOffline,
registerRequestHandler,
unregisterRequestHandler,
authenticate,
confirmCode,
createGroup,
@ -1220,11 +1100,10 @@ export function initialize({
getIceServers,
getKeysForIdentifier,
getKeysForIdentifierUnauth,
getMessageSocket,
getMyKeys,
getProfile,
getProfileUnauth,
getProvisioningSocket,
getProvisioningResource,
getSenderCertificate,
getSticker,
getStickerPackManifest,
@ -1261,7 +1140,10 @@ export function initialize({
param.urlParameters = '';
}
const useWebSocket = WEBSOCKET_CALLS.has(param.call);
return _outerAjax(null, {
socketManager: useWebSocket ? socketManager : undefined,
basicAuth: param.basicAuth,
certificateAuthority,
contentType: param.contentType || 'application/json; charset=utf-8',
@ -1282,7 +1164,7 @@ export function initialize({
unauthenticated: param.unauthenticated,
accessKey: param.accessKey,
}).catch((e: Error) => {
const translatedError = _translateError(e);
const translatedError = translateError(e);
if (translatedError) {
throw translatedError;
}
@ -1311,6 +1193,33 @@ export function initialize({
}: WebAPICredentials) {
username = newUsername;
password = newPassword;
await socketManager.authenticate({ username, password });
}
function getSocketStatus(): SocketStatus {
return socketManager.getStatus();
}
function checkSockets(): void {
// Intentionally not awaiting
socketManager.check();
}
async function onOnline(): Promise<void> {
await socketManager.onOnline();
}
async function onOffline(): Promise<void> {
await socketManager.onOffline();
}
function registerRequestHandler(handler: IRequestHandler): void {
socketManager.registerRequestHandler(handler);
}
function unregisterRequestHandler(handler: IRequestHandler): void {
socketManager.unregisterRequestHandler(handler);
}
async function getConfig() {
@ -1589,8 +1498,7 @@ export function initialize({
const urlPrefix = deviceName ? '/' : '/code/';
// We update our saved username and password, since we're creating a new account
username = number;
password = newPassword;
await authenticate({ username: number, password: newPassword });
const response = await _ajax({
call,
@ -1601,7 +1509,10 @@ export function initialize({
});
// From here on out, our username will be our UUID or E164 combined with device
username = `${response.uuid || number}.${response.deviceId || 1}`;
await authenticate({
username: `${response.uuid || number}.${response.deviceId || 1}`,
password,
});
return response;
}
@ -2157,7 +2068,7 @@ export function initialize({
timeout: 0,
type,
version,
});
}) as Promise<ArrayBufferWithDetailsType>;
}
// Groups
@ -2312,7 +2223,7 @@ export function initialize({
timeout: 0,
type: 'GET',
version,
});
}) as Promise<ArrayBuffer>;
}
async function createGroup(
@ -2461,32 +2372,10 @@ export function initialize({
};
}
function getMessageSocket(): Promise<WebSocket> {
window.log.info('opening message socket', url);
const fixedScheme = url
.replace('https://', 'wss://')
.replace('http://', 'ws://');
const login = encodeURIComponent(username);
const pass = encodeURIComponent(password);
const clientVersion = encodeURIComponent(version);
return _connectSocket(
`${fixedScheme}/v1/websocket/?login=${login}&password=${pass}&agent=OWD&version=${clientVersion}`,
{ certificateAuthority, proxyUrl, version }
);
}
function getProvisioningSocket(): Promise<WebSocket> {
window.log.info('opening provisioning socket', url);
const fixedScheme = url
.replace('https://', 'wss://')
.replace('http://', 'ws://');
const clientVersion = encodeURIComponent(version);
return _connectSocket(
`${fixedScheme}/v1/websocket/provisioning/?agent=OWD&version=${clientVersion}`,
{ certificateAuthority, proxyUrl, version }
);
function getProvisioningResource(
handler: IRequestHandler
): Promise<WebSocketResource> {
return socketManager.getProvisioningResource(handler);
}
async function getDirectoryAuth(): Promise<{
@ -2688,7 +2577,7 @@ export function initialize({
const pubKeyBase64 = arrayBufferToBase64(slicedPubKey);
// Do request
const data = JSON.stringify({ clientPublic: pubKeyBase64 });
const result: JSONWithDetailsType = await _outerAjax(null, {
const result: JSONWithDetailsType = (await _outerAjax(null, {
certificateAuthority,
type: 'PUT',
contentType: 'application/json; charset=utf-8',
@ -2700,7 +2589,7 @@ export function initialize({
data,
timeout: 30000,
version,
});
})) as JSONWithDetailsType;
const { data: responseBody, response } = result;
@ -2806,7 +2695,7 @@ export function initialize({
iv: string;
data: string;
mac: string;
} = await _outerAjax(null, {
} = (await _outerAjax(null, {
certificateAuthority,
type: 'PUT',
headers: cookie
@ -2823,7 +2712,7 @@ export function initialize({
timeout: 30000,
data: JSON.stringify(data),
version,
});
})) as any;
// Decode discovery request response
const decodedDiscoveryResponse: {

View file

@ -1,7 +1,7 @@
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable max-classes-per-file */
/* eslint-disable max-classes-per-file, no-restricted-syntax */
/*
* WebSocket-Resources
*
@ -12,12 +12,11 @@
* request.respond(200, 'OK');
* });
*
* client.sendRequest({
* const { response, status } = await client.sendRequest({
* verb: 'PUT',
* path: '/v1/messages',
* body: '{ some: "json" }',
* success: function(message, status, request) {...},
* error: function(message, status, request) {...}
* headers: ['content-type:application/json'],
* body: Buffer.from('{ some: "json" }'),
* });
*
* 1. https://github.com/signalapp/WebSocket-Resources
@ -32,13 +31,10 @@ import { dropNull } from '../util/dropNull';
import { isOlderThan } from '../util/timestamp';
import { strictAssert } from '../util/assert';
import { normalizeNumber } from '../util/normalizeNumber';
import * as Errors from '../types/errors';
import { SignalService as Proto } from '../protobuf';
type Callback = (
message: string,
status: number,
request: OutgoingWebSocketRequest
) => void;
const THIRTY_SECONDS = 30 * 1000;
export class IncomingWebSocketRequest {
private readonly id: Long | number;
@ -53,7 +49,7 @@ export class IncomingWebSocketRequest {
constructor(
request: Proto.IWebSocketRequestMessage,
private readonly socket: WebSocket
private readonly sendBytes: (bytes: Buffer) => void
) {
strictAssert(request.id, 'request without id');
strictAssert(request.verb, 'request without verb');
@ -64,7 +60,6 @@ export class IncomingWebSocketRequest {
this.path = request.path;
this.body = dropNull(request.body);
this.headers = request.headers || [];
this.socket = socket;
}
public respond(status: number, message: string): void {
@ -73,47 +68,24 @@ export class IncomingWebSocketRequest {
response: { id: this.id, message, status },
}).finish();
this.socket.sendBytes(Buffer.from(bytes));
this.sendBytes(Buffer.from(bytes));
}
}
export type OutgoingWebSocketRequestOptions = Readonly<{
export type SendRequestOptions = Readonly<{
verb: string;
path: string;
body?: Uint8Array;
timeout?: number;
headers?: ReadonlyArray<string>;
error?: Callback;
success?: Callback;
}>;
export class OutgoingWebSocketRequest {
public readonly error: Callback | undefined;
public readonly success: Callback | undefined;
public response: Proto.IWebSocketResponseMessage | undefined;
constructor(
id: number,
options: OutgoingWebSocketRequestOptions,
socket: WebSocket
) {
this.error = options.error;
this.success = options.success;
const bytes = Proto.WebSocketMessage.encode({
type: Proto.WebSocketMessage.Type.REQUEST,
request: {
verb: options.verb,
path: options.path,
body: options.body,
headers: options.headers ? options.headers.slice() : undefined,
id,
},
}).finish();
socket.sendBytes(Buffer.from(bytes));
}
}
export type SendRequestResult = Readonly<{
status: number;
message: string;
response?: Uint8Array;
headers: ReadonlyArray<string>;
}>;
export type WebSocketResourceOptions = {
handleRequest?: (request: IncomingWebSocketRequest) => void;
@ -129,12 +101,21 @@ export class CloseEvent extends Event {
export default class WebSocketResource extends EventTarget {
private outgoingId = 1;
private closed?: boolean;
private closed = false;
private readonly outgoingMap = new Map<number, OutgoingWebSocketRequest>();
private readonly outgoingMap = new Map<
number,
(result: SendRequestResult) => void
>();
private readonly boundOnMessage: (message: IMessage) => void;
private activeRequests = new Set<IncomingWebSocketRequest | number>();
private shuttingDown = false;
private shutdownTimer?: NodeJS.Timeout;
// Public for tests
public readonly keepalive?: KeepAlive;
@ -158,11 +139,22 @@ export default class WebSocketResource extends EventTarget {
keepalive.reset();
socket.on('message', () => keepalive.reset());
socket.on('close', () => keepalive.stop());
socket.on('error', (error: Error) => {
window.log.warn(
'WebSocketResource: WebSocket error',
Errors.toLogFormat(error)
);
});
}
socket.on('close', () => {
socket.on('close', (code, reason) => {
this.closed = true;
window.log.warn('WebSocketResource: Socket closed');
this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
});
this.addEventListener('close', () => this.onClose());
}
public addEventListener(
@ -174,19 +166,50 @@ export default class WebSocketResource extends EventTarget {
return super.addEventListener(name, handler);
}
public sendRequest(
options: OutgoingWebSocketRequestOptions
): OutgoingWebSocketRequest {
public async sendRequest(
options: SendRequestOptions
): Promise<SendRequestResult> {
const id = this.outgoingId;
strictAssert(!this.outgoingMap.has(id), 'Duplicate outgoing request');
// eslint-disable-next-line no-bitwise
this.outgoingId = Math.max(1, (this.outgoingId + 1) & 0x7fffffff);
const outgoing = new OutgoingWebSocketRequest(id, options, this.socket);
this.outgoingMap.set(id, outgoing);
const bytes = Proto.WebSocketMessage.encode({
type: Proto.WebSocketMessage.Type.REQUEST,
request: {
verb: options.verb,
path: options.path,
body: options.body,
headers: options.headers ? options.headers.slice() : undefined,
id,
},
}).finish();
return outgoing;
strictAssert(!this.shuttingDown, 'Cannot send request, shutting down');
this.addActive(id);
const promise = new Promise<SendRequestResult>((resolve, reject) => {
let timer = options.timeout
? setTimeout(() => {
this.removeActive(id);
reject(new Error('Request timed out'));
}, options.timeout)
: undefined;
this.outgoingMap.set(id, result => {
if (timer !== undefined) {
clearTimeout(timer);
timer = undefined;
}
this.removeActive(id);
resolve(result);
});
});
this.socket.sendBytes(Buffer.from(bytes));
return promise;
}
public forceKeepAlive(): void {
@ -218,11 +241,37 @@ export default class WebSocketResource extends EventTarget {
return;
}
window.log.warn('Dispatching our own socket close event');
window.log.warn(
'WebSocketResource: Dispatching our own socket close event'
);
this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
}, 5000);
}
public shutdown(): void {
if (this.closed) {
return;
}
if (this.activeRequests.size === 0) {
window.log.info('WebSocketResource: no active requests, closing');
this.close(3000, 'Shutdown');
return;
}
this.shuttingDown = true;
window.log.info('WebSocketResource: shutting down');
this.shutdownTimer = setTimeout(() => {
if (this.closed) {
return;
}
window.log.warn('WebSocketResource: Failed to shutdown gracefully');
this.close(3000, 'Shutdown');
}, THIRTY_SECONDS);
}
private onMessage({ type, binaryData }: IMessage): void {
if (type !== 'binary' || !binaryData) {
throw new Error(`Unsupported websocket message type: ${type}`);
@ -236,7 +285,23 @@ export default class WebSocketResource extends EventTarget {
const handleRequest =
this.options.handleRequest ||
(request => request.respond(404, 'Not found'));
handleRequest(new IncomingWebSocketRequest(message.request, this.socket));
const incomingRequest = new IncomingWebSocketRequest(
message.request,
(bytes: Buffer): void => {
this.removeActive(incomingRequest);
this.socket.sendBytes(bytes);
}
);
if (this.shuttingDown) {
incomingRequest.respond(500, 'Shutting down');
return;
}
this.addActive(incomingRequest);
handleRequest(incomingRequest);
} else if (
message.type === Proto.WebSocketMessage.Type.RESPONSE &&
message.response
@ -245,27 +310,62 @@ export default class WebSocketResource extends EventTarget {
strictAssert(response.id, 'response without id');
const responseId = normalizeNumber(response.id);
const request = this.outgoingMap.get(responseId);
const resolve = this.outgoingMap.get(responseId);
this.outgoingMap.delete(responseId);
if (!request) {
if (!resolve) {
throw new Error(`Received response for unknown request ${responseId}`);
}
request.response = dropNull(response);
let callback = request.error;
const status = response.status ?? -1;
if (status >= 200 && status < 300) {
callback = request.success;
}
if (typeof callback === 'function') {
callback(response.message ?? '', status, request);
}
resolve({
status: response.status ?? -1,
message: response.message ?? '',
response: dropNull(response.body),
headers: response.headers ?? [],
});
}
}
private onClose(): void {
const outgoing = new Map(this.outgoingMap);
this.outgoingMap.clear();
for (const resolve of outgoing.values()) {
resolve({
status: 500,
message: 'Connection closed',
response: undefined,
headers: [],
});
}
}
private addActive(request: IncomingWebSocketRequest | number): void {
this.activeRequests.add(request);
}
private removeActive(request: IncomingWebSocketRequest | number): void {
if (!this.activeRequests.has(request)) {
window.log.warn('WebSocketResource: removing unknown request');
return;
}
this.activeRequests.delete(request);
if (this.activeRequests.size !== 0) {
return;
}
if (!this.shuttingDown) {
return;
}
if (this.shutdownTimer) {
clearTimeout(this.shutdownTimer);
this.shutdownTimer = undefined;
}
window.log.info('WebSocketResource: shutdown complete');
this.close(3000, 'Shutdown');
}
}
export type KeepAliveOptionsType = {
@ -307,7 +407,7 @@ class KeepAlive {
this.clearTimers();
}
public send(): void {
public async send(): Promise<void> {
this.clearTimers();
if (isOlderThan(this.lastAliveAt, MAX_KEEPALIVE_INTERVAL_MS)) {
@ -332,11 +432,14 @@ class KeepAlive {
}
window.log.info('WebSocketResources: Sending a keepalive message');
this.wsr.sendRequest({
const { status } = await this.wsr.sendRequest({
verb: 'GET',
path: this.path,
success: this.reset.bind(this),
});
if (status >= 200 || status < 300) {
this.reset();
}
}
public reset(): void {

View file

@ -0,0 +1,61 @@
// Copyright 2020-2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isNumber, omit } from 'lodash';
import { strictAssert } from '../util/assert';
import { dropNull } from '../util/dropNull';
import { DownloadedAttachmentType } from '../types/Attachment';
import * as MIME from '../types/MIME';
import * as Bytes from '../Bytes';
import { typedArrayToArrayBuffer } from '../Crypto';
import Crypto from './Crypto';
import { ProcessedAttachment } from './Types.d';
import type { WebAPIType } from './WebAPI';
export async function downloadAttachment(
server: WebAPIType,
attachment: ProcessedAttachment
): Promise<DownloadedAttachmentType> {
const cdnId = attachment.cdnId || attachment.cdnKey;
const { cdnNumber } = attachment;
if (!cdnId) {
throw new Error('downloadAttachment: Attachment was missing cdnId!');
}
strictAssert(cdnId, 'attachment without cdnId');
const encrypted = await server.getAttachment(cdnId, dropNull(cdnNumber));
const { key, digest, size, contentType } = attachment;
if (!digest) {
throw new Error('Failure: Ask sender to update Signal and resend.');
}
strictAssert(key, 'attachment has no key');
strictAssert(digest, 'attachment has no digest');
const paddedData = await Crypto.decryptAttachment(
encrypted,
typedArrayToArrayBuffer(Bytes.fromBase64(key)),
typedArrayToArrayBuffer(Bytes.fromBase64(digest))
);
if (!isNumber(size)) {
throw new Error(
`downloadAttachment: Size was not provided, actual size was ${paddedData.byteLength}`
);
}
const data = window.Signal.Crypto.getFirstBytes(paddedData, size);
return {
...omit(attachment, 'digest', 'key'),
contentType: contentType
? MIME.fromString(contentType)
: MIME.APPLICATION_OCTET_STREAM,
data,
};
}

View file

@ -7,7 +7,6 @@ import MessageReceiver from './MessageReceiver';
import utils from './Helpers';
import Crypto from './Crypto';
import { ContactBuffer, GroupBuffer } from './ContactsParser';
import createTaskWithTimeout from './TaskWithTimeout';
import SyncRequest from './SyncRequest';
import MessageSender from './SendMessage';
import StringView from './StringView';
@ -16,7 +15,6 @@ import * as WebAPI from './WebAPI';
import WebSocketResource from './WebsocketResources';
export const textsecure = {
createTaskWithTimeout,
crypto: Crypto,
utils,
storage: new Storage(),

View file

@ -81,12 +81,16 @@ export class User extends EventEmitter {
? this.storage.put('device_name', deviceName)
: Promise.resolve(),
]);
}
window.log.info('storage.user: credentials changed');
public emitCredentialsChanged(reason: string): void {
window.log.info(`storage.user: credentials changed, ${reason}`);
this.emit('credentialsChange');
}
public async removeCredentials(): Promise<void> {
window.log.info('storage.user: removeCredentials');
await Promise.all([
this.storage.remove('number_id'),
this.storage.remove('uuid_id'),

View file

@ -0,0 +1,38 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
/* eslint-disable no-restricted-syntax */
import { explodePromise } from './explodePromise';
export interface IController {
abort(): void;
}
export class AbortableProcess<Result> implements IController {
private abortReject: (error: Error) => void;
public readonly resultPromise: Promise<Result>;
constructor(
private readonly name: string,
private readonly controller: IController,
resultPromise: Promise<Result>
) {
const {
promise: abortPromise,
reject: abortReject,
} = explodePromise<Result>();
this.abortReject = abortReject;
this.resultPromise = Promise.race([abortPromise, resultPromise]);
}
public abort(): void {
this.controller.abort();
this.abortReject(new Error(`Process "${this.name}" was aborted`));
}
public getResult(): Promise<Result> {
return this.resultPromise;
}
}

View file

@ -2,6 +2,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
import { AttachmentType, DownloadedAttachmentType } from '../types/Attachment';
import { downloadAttachment as doDownloadAttachment } from '../textsecure/downloadAttachment';
export async function downloadAttachment(
attachmentData: AttachmentType
@ -20,7 +21,8 @@ export async function downloadAttachment(
let downloaded;
try {
downloaded = await window.textsecure.messageReceiver.downloadAttachment(
downloaded = await doDownloadAttachment(
window.textsecure.server,
migratedAttachment
);
} catch (error) {

25
ts/util/explodePromise.ts Normal file
View file

@ -0,0 +1,25 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
export function explodePromise<T>(): {
promise: Promise<T>;
resolve: (value: T) => void;
reject: (error: Error) => void;
} {
let resolve: (value: T) => void;
let reject: (error: Error) => void;
const promise = new Promise<T>((innerResolve, innerReject) => {
resolve = innerResolve;
reject = innerReject;
});
return {
promise,
// Typescript thinks that resolve and reject can be undefined here.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
resolve: resolve!,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
reject: reject!,
};
}

View file

@ -19,10 +19,12 @@ declare global {
window.waitBatchers = [];
window.flushAllWaitBatchers = async () => {
window.log.info('waitBatcher#flushAllWaitBatchers');
await Promise.all(window.waitBatchers.map(item => item.flushAndWait()));
};
window.waitForAllWaitBatchers = async () => {
window.log.info('waitBatcher#waitForAllWaitBatchers');
await Promise.all(window.waitBatchers.map(item => item.onIdle()));
};

2
ts/window.d.ts vendored
View file

@ -483,7 +483,7 @@ declare global {
getServerTrustRoot: () => WhatIsThis;
readyForUpdates: () => void;
logAppLoadedEvent: (options: { processedCount?: number }) => void;
logMessageReceiverConnect: () => void;
logAuthenticatedConnect: () => void;
// Runtime Flags
isShowingModal?: boolean;