Additional logging, more reliable contact/group sync fetch

This commit is contained in:
Scott Nonnenberg 2023-06-14 13:51:49 -07:00 committed by GitHub
parent 8e1a30d720
commit efc237d106
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 86 additions and 16 deletions

View file

@ -241,6 +241,8 @@ export class SignalProtocolStore extends EventEmitter {
sessionQueues = new Map<SessionIdType, PQueue>();
sessionQueueJobCounter = 0;
private readonly identityQueues = new Map<UUIDStringType, PQueue>();
private currentZone?: Zone;
@ -703,13 +705,31 @@ export class SignalProtocolStore extends EventEmitter {
async enqueueSessionJob<T>(
qualifiedAddress: QualifiedAddress,
name: string,
task: () => Promise<T>,
zone: Zone = GLOBAL_ZONE
): Promise<T> {
this.sessionQueueJobCounter += 1;
const id = this.sessionQueueJobCounter;
const waitStart = Date.now();
return this.withZone(zone, 'enqueueSessionJob', async () => {
const queue = this._getSessionQueue(qualifiedAddress);
return queue.add<T>(task);
const waitTime = Date.now() - waitStart;
log.info(
`enqueueSessionJob(${id}): queuing task ${name}, waited ${waitTime}ms`
);
const queueStart = Date.now();
return queue.add<T>(() => {
const queueTime = Date.now() - queueStart;
log.info(
`enqueueSessionJob(${id}): running task ${name}, waited ${queueTime}ms`
);
return task();
});
});
}
@ -1322,6 +1342,7 @@ export class SignalProtocolStore extends EventEmitter {
await this.enqueueSessionJob(
addr,
`_archiveSession(${addr.toString()})`,
async () => {
const item = entry.hydrated
? entry.item

View file

@ -3279,7 +3279,7 @@ async function getTapToViewMessagesNeedingErase(): Promise<Array<MessageType>> {
return rows.map(row => jsonToObject(row.json));
}
const MAX_UNPROCESSED_ATTEMPTS = 3;
const MAX_UNPROCESSED_ATTEMPTS = 10;
function saveUnprocessedSync(data: UnprocessedType): string {
const db = getInstance();

View file

@ -1542,7 +1542,7 @@ describe('SignalProtocolStore', () => {
id: '1-one',
version: 2,
attempts: 3,
attempts: 10,
envelope: 'first',
receivedAtCounter: 0,
timestamp: NOW + 1,

View file

@ -21,7 +21,9 @@ import WebSocketResource from '../textsecure/WebsocketResources';
describe('WebSocket-Resource', () => {
class FakeSocket extends EventEmitter {
public sendBytes(_: Uint8Array) {}
public socket = {
localPort: 5678,
};
public close() {}
}

View file

@ -47,7 +47,7 @@ import { normalizeUuid } from '../util/normalizeUuid';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
import { Zone } from '../util/Zone';
import { DurationInSeconds } from '../util/durations';
import { DurationInSeconds, SECOND } from '../util/durations';
import { bytesToUuid } from '../Crypto';
import type { DownloadedAttachmentType } from '../types/Attachment';
import { Address } from '../types/Address';
@ -1752,6 +1752,7 @@ export default class MessageReceiver
);
const unsealedPlaintext = await this.storage.protocol.enqueueSessionJob(
address,
`sealedSenderDecryptMessage(${address.toString()})`,
() =>
sealedSenderDecryptMessage(
Buffer.from(ciphertext),
@ -1839,6 +1840,7 @@ export default class MessageReceiver
const plaintext = await this.storage.protocol.enqueueSessionJob(
address,
`signalDecrypt(${address.toString()})`,
async () =>
this.unpad(
await signalDecrypt(
@ -1870,6 +1872,7 @@ export default class MessageReceiver
const plaintext = await this.storage.protocol.enqueueSessionJob(
address,
`signalDecryptPreKey(${address.toString()})`,
async () =>
this.unpad(
await signalDecryptPreKey(
@ -3397,7 +3400,10 @@ export default class MessageReceiver
this.removeFromCache(envelope);
const attachmentPointer = await this.handleAttachment(blob);
const attachmentPointer = await this.handleAttachment(blob, {
disableRetries: true,
timeout: 90 * SECOND,
});
const contactBuffer = new ContactBuffer(attachmentPointer.data);
const contactSync = new ContactSyncEvent(
@ -3430,7 +3436,10 @@ export default class MessageReceiver
// Note: we do not return here because we don't want to block the next message on
// this attachment download and a lot of processing of that attachment.
const attachmentPointer = await this.handleAttachment(blob);
const attachmentPointer = await this.handleAttachment(blob, {
disableRetries: true,
timeout: 90 * SECOND,
});
const groupBuffer = new GroupBuffer(attachmentPointer.data);
let groupDetails = groupBuffer.next();
const promises = [];
@ -3550,10 +3559,11 @@ export default class MessageReceiver
}
private async handleAttachment(
attachment: Proto.IAttachmentPointer
attachment: Proto.IAttachmentPointer,
options?: { timeout?: number; disableRetries?: boolean }
): Promise<DownloadedAttachmentType> {
const cleaned = processAttachment(attachment);
return downloadAttachment(this.server, cleaned);
return downloadAttachment(this.server, cleaned, options);
}
private async handleEndSession(

View file

@ -440,6 +440,7 @@ export default class OutgoingMessage {
return window.textsecure.storage.protocol.enqueueSessionJob<MessageType>(
address,
`doSendMessage(${address.toString()})`,
async () => {
const protocolAddress = ProtocolAddress.new(
theirUuid.toString(),

View file

@ -225,7 +225,9 @@ export class SocketManager extends EventListener {
return;
}
log.info('SocketManager: connected authenticated socket');
log.info(
`SocketManager: connected authenticated socket (localPort: ${authenticated.localPort})`
);
window.logAuthenticatedConnect?.();
this.backOff.reset();
@ -503,7 +505,9 @@ export class SocketManager extends EventListener {
throw error;
}
log.info('SocketManager: connected unauthenticated socket');
log.info(
`SocketManager: connected unauthenticated socket (localPort: ${unauthenticated.localPort})`
);
unauthenticated.addEventListener('close', ({ code, reason }): void => {
if (this.unauthenticated !== process) {

View file

@ -130,6 +130,7 @@ type PromiseAjaxOptionsType = {
certificateAuthority?: string;
contentType?: string;
data?: Uint8Array | string;
disableRetries?: boolean;
disableSessionResumption?: boolean;
headers?: HeaderListType;
host?: string;
@ -451,6 +452,10 @@ async function _outerAjax(
): Promise<unknown> {
options.stack = new Error().stack; // just in case, save stack here.
if (options.disableRetries) {
return _promiseAjax(url, options);
}
return _retryAjax(url, options);
}
@ -868,7 +873,14 @@ export type WebAPIType = {
imageFiles: Array<string>
) => Promise<Array<Uint8Array>>;
getArtAuth: () => Promise<ArtAuthType>;
getAttachment: (cdnKey: string, cdnNumber?: number) => Promise<Uint8Array>;
getAttachment: (
cdnKey: string,
cdnNumber?: number,
options?: {
disableRetries?: boolean;
timeout?: number;
}
) => Promise<Uint8Array>;
getAvatar: (path: string) => Promise<Uint8Array>;
getDevices: () => Promise<GetDevicesResultType>;
getHasSubscription: (subscriberId: Uint8Array) => Promise<boolean>;
@ -2473,7 +2485,14 @@ export function initialize({
return packId;
}
async function getAttachment(cdnKey: string, cdnNumber?: number) {
async function getAttachment(
cdnKey: string,
cdnNumber?: number,
options?: {
disableRetries?: boolean;
timeout?: number;
}
) {
const abortController = new AbortController();
const cdnUrl = isNumber(cdnNumber)
@ -2482,9 +2501,10 @@ export function initialize({
// This is going to the CDN, not the service, so we use _outerAjax
const stream = await _outerAjax(`${cdnUrl}/attachments/${cdnKey}`, {
certificateAuthority,
disableRetries: options?.disableRetries,
proxyUrl,
responseType: 'stream',
timeout: 0,
timeout: options?.timeout || 0,
type: 'GET',
redactUrl: _createRedactor(cdnKey),
version,

View file

@ -126,6 +126,8 @@ export default class WebSocketResource extends EventTarget {
private readonly logId: string;
public readonly localPort: number | undefined;
// Public for tests
public readonly keepalive?: KeepAlive;
@ -136,6 +138,7 @@ export default class WebSocketResource extends EventTarget {
super();
this.logId = `WebSocketResource(${options.name})`;
this.localPort = socket.socket.localPort;
this.boundOnMessage = this.onMessage.bind(this);

View file

@ -15,7 +15,11 @@ import type { WebAPIType } from './WebAPI';
export async function downloadAttachment(
server: WebAPIType,
attachment: ProcessedAttachment
attachment: ProcessedAttachment,
options?: {
disableRetries?: boolean;
timeout?: number;
}
): Promise<DownloadedAttachmentType> {
const cdnId = attachment.cdnId || attachment.cdnKey;
const { cdnNumber } = attachment;
@ -25,7 +29,11 @@ export async function downloadAttachment(
}
strictAssert(cdnId, 'attachment without cdnId');
const encrypted = await server.getAttachment(cdnId, dropNull(cdnNumber));
const encrypted = await server.getAttachment(
cdnId,
dropNull(cdnNumber),
options
);
const { key, digest, size, contentType } = attachment;
if (!digest) {

View file

@ -154,6 +154,7 @@ async function handleServerKeys(
try {
await window.textsecure.storage.protocol.enqueueSessionJob(
address,
`handleServerKeys(${identifier})`,
() =>
processPreKeyBundle(
preKeyBundle,