Use new CDS implementation in staging
This commit is contained in:
parent
5774fdef9f
commit
0c8c332805
11 changed files with 284 additions and 130 deletions
|
@ -2,6 +2,8 @@
|
|||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { EventEmitter } from 'events';
|
||||
import { noop } from 'lodash';
|
||||
import { Readable } from 'stream';
|
||||
import type { HsmEnclaveClient } from '@signalapp/signal-client';
|
||||
import type { connection as WebSocket } from 'websocket';
|
||||
import Long from 'long';
|
||||
|
@ -10,6 +12,7 @@ import { strictAssert } from '../util/assert';
|
|||
import { dropNull } from '../util/dropNull';
|
||||
import { explodePromise } from '../util/explodePromise';
|
||||
import * as durations from '../util/durations';
|
||||
import * as log from '../logging/log';
|
||||
import type { UUIDStringType } from '../types/UUID';
|
||||
import { UUID_BYTE_SIZE } from '../types/UUID';
|
||||
import * as Bytes from '../Bytes';
|
||||
|
@ -23,13 +26,24 @@ enum State {
|
|||
Closed,
|
||||
}
|
||||
|
||||
export type CDSRequestOptionsType = Readonly<{
|
||||
e164s: ReadonlyArray<string>;
|
||||
acis: ReadonlyArray<UUIDStringType>;
|
||||
accessKeys: ReadonlyArray<string>;
|
||||
auth: CDSAuthType;
|
||||
timeout?: number;
|
||||
}>;
|
||||
export type CDSRequestOptionsType = Readonly<
|
||||
{
|
||||
auth: CDSAuthType;
|
||||
e164s: ReadonlyArray<string>;
|
||||
timeout?: number;
|
||||
} & (
|
||||
| {
|
||||
version: 1;
|
||||
acis?: undefined;
|
||||
accessKeys?: undefined;
|
||||
}
|
||||
| {
|
||||
version: 2;
|
||||
acis: ReadonlyArray<UUIDStringType>;
|
||||
accessKeys: ReadonlyArray<string>;
|
||||
}
|
||||
)
|
||||
>;
|
||||
|
||||
export type CDSAuthType = Readonly<{
|
||||
username: string;
|
||||
|
@ -50,19 +64,23 @@ export type CDSSocketResponseType = Readonly<{
|
|||
retryAfterSecs?: number;
|
||||
}>;
|
||||
|
||||
const MAX_E164_COUNT = 5000;
|
||||
const HANDSHAKE_TIMEOUT = 10 * durations.SECOND;
|
||||
const REQUEST_TIMEOUT = 10 * durations.SECOND;
|
||||
const VERSION = new Uint8Array([0x02]);
|
||||
const USERNAME_LENGTH = 32;
|
||||
const PASSWORD_LENGTH = 31;
|
||||
const E164_BYTE_SIZE = 8;
|
||||
const TRIPLE_BYTE_SIZE = UUID_BYTE_SIZE * 2 + E164_BYTE_SIZE;
|
||||
|
||||
export class CDSSocket extends EventEmitter {
|
||||
private state = State.Handshake;
|
||||
|
||||
private readonly finishedHandshake: Promise<void>;
|
||||
|
||||
private readonly requestQueue = new Array<(buffer: Buffer) => void>();
|
||||
private readonly responseStream = new Readable({
|
||||
read: noop,
|
||||
|
||||
// Don't coalesce separate websocket messages
|
||||
objectMode: true,
|
||||
});
|
||||
|
||||
constructor(
|
||||
private readonly socket: WebSocket,
|
||||
|
@ -93,15 +111,25 @@ export class CDSSocket extends EventEmitter {
|
|||
return;
|
||||
}
|
||||
|
||||
const requestHandler = this.requestQueue.shift();
|
||||
strictAssert(
|
||||
requestHandler !== undefined,
|
||||
'No handler for incoming CDS data'
|
||||
);
|
||||
|
||||
requestHandler(this.enclaveClient.establishedRecv(binaryData));
|
||||
try {
|
||||
this.responseStream.push(
|
||||
this.enclaveClient.establishedRecv(binaryData)
|
||||
);
|
||||
} catch (error) {
|
||||
this.responseStream.destroy(error);
|
||||
}
|
||||
});
|
||||
socket.on('close', (code, reason) => {
|
||||
if (this.state === State.Established) {
|
||||
if (code === 1000) {
|
||||
this.responseStream.push(null);
|
||||
} else {
|
||||
this.responseStream.destroy(
|
||||
new Error(`Socket closed with code ${code} and reason ${reason}`)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
this.state = State.Closed;
|
||||
this.emit('close', code, reason);
|
||||
});
|
||||
|
@ -115,37 +143,32 @@ export class CDSSocket extends EventEmitter {
|
|||
}
|
||||
|
||||
public async request({
|
||||
e164s,
|
||||
acis,
|
||||
accessKeys,
|
||||
auth,
|
||||
version,
|
||||
timeout = REQUEST_TIMEOUT,
|
||||
e164s,
|
||||
acis = [],
|
||||
accessKeys = [],
|
||||
}: CDSRequestOptionsType): Promise<CDSSocketResponseType> {
|
||||
strictAssert(
|
||||
e164s.length < MAX_E164_COUNT,
|
||||
'CDSSocket does not support paging. Use this for one-off requests'
|
||||
);
|
||||
|
||||
log.info('CDSSocket.request(): awaiting handshake');
|
||||
await this.finishedHandshake;
|
||||
strictAssert(
|
||||
this.state === State.Established,
|
||||
'Connection not established'
|
||||
);
|
||||
|
||||
const username = Bytes.fromString(auth.username);
|
||||
const password = Bytes.fromString(auth.password);
|
||||
strictAssert(
|
||||
username.length === USERNAME_LENGTH,
|
||||
'Invalid username length'
|
||||
);
|
||||
strictAssert(
|
||||
password.length === PASSWORD_LENGTH,
|
||||
'Invalid password length'
|
||||
);
|
||||
|
||||
strictAssert(
|
||||
acis.length === accessKeys.length,
|
||||
`Number of ACIs ${acis.length} is different ` +
|
||||
`from number of access keys ${accessKeys.length}`
|
||||
);
|
||||
const aciUakPair = new Array<Uint8Array>();
|
||||
const aciUakPairs = new Array<Uint8Array>();
|
||||
for (let i = 0; i < acis.length; i += 1) {
|
||||
aciUakPair.push(
|
||||
aciUakPairs.push(
|
||||
Bytes.concatenate([
|
||||
uuidToBytes(acis[i]),
|
||||
Bytes.fromBase64(accessKeys[i]),
|
||||
|
@ -154,64 +177,55 @@ export class CDSSocket extends EventEmitter {
|
|||
}
|
||||
|
||||
const request = Proto.CDSClientRequest.encode({
|
||||
username,
|
||||
password,
|
||||
e164: e164s.map(e164 => {
|
||||
// Long.fromString handles numbers with or without a leading '+'
|
||||
return new Uint8Array(Long.fromString(e164).toBytesBE());
|
||||
}),
|
||||
aciUakPair,
|
||||
newE164s: Buffer.concat(
|
||||
e164s.map(e164 => {
|
||||
// Long.fromString handles numbers with or without a leading '+'
|
||||
return new Uint8Array(Long.fromString(e164).toBytesBE());
|
||||
})
|
||||
),
|
||||
aciUakPairs: Buffer.concat(aciUakPairs),
|
||||
}).finish();
|
||||
|
||||
const { promise, resolve, reject } = explodePromise<Buffer>();
|
||||
|
||||
const timer = Timers.setTimeout(() => {
|
||||
reject(new Error('CDS request timed out'));
|
||||
this.responseStream.destroy(new Error('CDS request timed out'));
|
||||
}, timeout);
|
||||
|
||||
log.info(`CDSSocket.request(): sending version=${version} request`);
|
||||
this.socket.sendBytes(
|
||||
this.enclaveClient.establishedSend(Buffer.concat([VERSION, request]))
|
||||
this.enclaveClient.establishedSend(
|
||||
Buffer.concat([Buffer.from([version]), request])
|
||||
)
|
||||
);
|
||||
|
||||
this.requestQueue.push(resolve);
|
||||
strictAssert(
|
||||
this.requestQueue.length === 1,
|
||||
'Concurrent use of CDS shold not happen'
|
||||
);
|
||||
const responseBytes = await promise;
|
||||
Timers.clearTimeout(timer);
|
||||
const resultMap: Map<string, CDSSocketDictionaryEntryType> = new Map();
|
||||
let retryAfterSecs: number | undefined;
|
||||
|
||||
const response = Proto.CDSClientResponse.decode(responseBytes);
|
||||
for await (const message of this.responseStream) {
|
||||
log.info('CDSSocket.request(): processing response message');
|
||||
|
||||
const dictionary: Record<string, CDSSocketDictionaryEntryType> =
|
||||
Object.create(null);
|
||||
const response = Proto.CDSClientResponse.decode(message);
|
||||
const newRetryAfterSecs = dropNull(response.retryAfterSecs);
|
||||
|
||||
for (const tripleBytes of response.e164PniAciTriple ?? []) {
|
||||
strictAssert(
|
||||
tripleBytes.length === UUID_BYTE_SIZE * 2 + E164_BYTE_SIZE,
|
||||
'Invalid size of CDS response triple'
|
||||
);
|
||||
decodeSingleResponse(resultMap, response);
|
||||
|
||||
let offset = 0;
|
||||
const e164Bytes = tripleBytes.slice(offset, offset + E164_BYTE_SIZE);
|
||||
offset += E164_BYTE_SIZE;
|
||||
|
||||
const pniBytes = tripleBytes.slice(offset, offset + UUID_BYTE_SIZE);
|
||||
offset += UUID_BYTE_SIZE;
|
||||
|
||||
const aciBytes = tripleBytes.slice(offset, offset + UUID_BYTE_SIZE);
|
||||
offset += UUID_BYTE_SIZE;
|
||||
|
||||
const e164 = `+${Long.fromBytesBE(Array.from(e164Bytes)).toString()}`;
|
||||
const pni = bytesToUuid(pniBytes);
|
||||
const aci = bytesToUuid(aciBytes);
|
||||
|
||||
dictionary[e164] = { pni, aci };
|
||||
if (newRetryAfterSecs) {
|
||||
retryAfterSecs = Math.max(newRetryAfterSecs, retryAfterSecs ?? 0);
|
||||
}
|
||||
}
|
||||
|
||||
const result: Record<string, CDSSocketDictionaryEntryType> =
|
||||
Object.create(null);
|
||||
|
||||
for (const [key, value] of resultMap) {
|
||||
result[key] = value;
|
||||
}
|
||||
|
||||
log.info('CDSSocket.request(): done');
|
||||
Timers.clearTimeout(timer);
|
||||
|
||||
return {
|
||||
dictionary,
|
||||
retryAfterSecs: dropNull(response.retryAfterSecs),
|
||||
dictionary: result,
|
||||
retryAfterSecs,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -239,3 +253,44 @@ export class CDSSocket extends EventEmitter {
|
|||
return super.emit(type, ...args);
|
||||
}
|
||||
}
|
||||
|
||||
function decodeSingleResponse(
|
||||
resultMap: Map<string, CDSSocketDictionaryEntryType>,
|
||||
response: Proto.CDSClientResponse
|
||||
): void {
|
||||
for (
|
||||
let i = 0;
|
||||
i < response.e164PniAciTriples.length;
|
||||
i += TRIPLE_BYTE_SIZE
|
||||
) {
|
||||
const tripleBytes = response.e164PniAciTriples.slice(
|
||||
i,
|
||||
i + TRIPLE_BYTE_SIZE
|
||||
);
|
||||
strictAssert(
|
||||
tripleBytes.length === TRIPLE_BYTE_SIZE,
|
||||
'Invalid size of CDS response triple'
|
||||
);
|
||||
|
||||
let offset = 0;
|
||||
const e164Bytes = tripleBytes.slice(offset, offset + E164_BYTE_SIZE);
|
||||
offset += E164_BYTE_SIZE;
|
||||
|
||||
const pniBytes = tripleBytes.slice(offset, offset + UUID_BYTE_SIZE);
|
||||
offset += UUID_BYTE_SIZE;
|
||||
|
||||
const aciBytes = tripleBytes.slice(offset, offset + UUID_BYTE_SIZE);
|
||||
offset += UUID_BYTE_SIZE;
|
||||
|
||||
const e164Long = Long.fromBytesBE(Array.from(e164Bytes));
|
||||
if (e164Long.isZero()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const e164 = `+${e164Long.toString()}`;
|
||||
const pni = bytesToUuid(pniBytes);
|
||||
const aci = bytesToUuid(aciBytes);
|
||||
|
||||
resultMap.set(e164, { pni, aci });
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue