Don't block message processing on contact syncs

This commit is contained in:
trevor-signal 2024-09-17 18:52:31 -04:00 committed by GitHub
parent e5d03edea1
commit c11a894ad1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 59 additions and 59 deletions

View file

@ -542,7 +542,6 @@ export async function startApp(): Promise<void> {
log.info('Initializing MessageReceiver'); log.info('Initializing MessageReceiver');
messageReceiver = new MessageReceiver({ messageReceiver = new MessageReceiver({
server,
storage: window.storage, storage: window.storage,
serverTrustRoot: window.getServerTrustRoot(), serverTrustRoot: window.getServerTrustRoot(),
}); });

View file

@ -5,7 +5,10 @@ import PQueue from 'p-queue';
import { DataWriter } from '../sql/Client'; import { DataWriter } from '../sql/Client';
import type { ContactSyncEvent } from '../textsecure/messageReceiverEvents'; import type { ContactSyncEvent } from '../textsecure/messageReceiverEvents';
import type { ContactDetailsWithAvatar } from '../textsecure/ContactsParser'; import {
parseContactsV2,
type ContactDetailsWithAvatar,
} from '../textsecure/ContactsParser';
import { normalizeAci } from '../util/normalizeAci'; import { normalizeAci } from '../util/normalizeAci';
import * as Conversation from '../types/Conversation'; import * as Conversation from '../types/Conversation';
import * as Errors from '../types/errors'; import * as Errors from '../types/errors';
@ -15,6 +18,11 @@ import { validateConversation } from '../util/validateConversation';
import { isDirectConversation, isMe } from '../util/whatTypeOfConversation'; import { isDirectConversation, isMe } from '../util/whatTypeOfConversation';
import * as log from '../logging/log'; import * as log from '../logging/log';
import { dropNull } from '../util/dropNull'; import { dropNull } from '../util/dropNull';
import type { ProcessedAttachment } from '../textsecure/Types';
import { downloadAttachment } from '../textsecure/downloadAttachment';
import { strictAssert } from '../util/assert';
import type { ReencryptedAttachmentV2 } from '../AttachmentCrypto';
import { SECOND } from '../util/durations';
// When true - we are running the very first storage and contact sync after // When true - we are running the very first storage and contact sync after
// linking. // linking.
@ -86,8 +94,34 @@ async function updateConversationFromContactSync(
const queue = new PQueue({ concurrency: 1 }); const queue = new PQueue({ concurrency: 1 });
async function downloadAndParseContactAttachment(
contactAttachment: ProcessedAttachment
) {
strictAssert(window.textsecure.server, 'server must exist');
let downloaded: ReencryptedAttachmentV2 | undefined;
try {
downloaded = await downloadAttachment(
window.textsecure.server,
contactAttachment,
{
disableRetries: true,
timeout: 90 * SECOND,
}
);
return await parseContactsV2({
...contactAttachment,
...downloaded,
});
} finally {
if (downloaded?.path) {
await window.Signal.Migrations.deleteAttachmentData(downloaded.path);
}
}
}
async function doContactSync({ async function doContactSync({
contacts, contactAttachment,
complete: isFullSync, complete: isFullSync,
receivedAtCounter, receivedAtCounter,
sentAt, sentAt,
@ -95,8 +129,11 @@ async function doContactSync({
const logId = const logId =
`doContactSync(sent=${sentAt}, ` + `doContactSync(sent=${sentAt}, ` +
`receivedAt=${receivedAtCounter}, isFullSync=${isFullSync})`; `receivedAt=${receivedAtCounter}, isFullSync=${isFullSync})`;
log.info(`${logId}: got ${contacts.length} contacts`);
log.info(`${logId}: downloading contact attachment`);
const contacts = await downloadAndParseContactAttachment(contactAttachment);
log.info(`${logId}: got ${contacts.length} contacts`);
const updatedConversations = new Set<ConversationModel>(); const updatedConversations = new Set<ConversationModel>();
let promises = new Array<Promise<void>>(); let promises = new Array<Promise<void>>();
@ -195,5 +232,12 @@ export async function onContactSync(ev: ContactSyncEvent): Promise<void> {
log.info( log.info(
`onContactSync(sent=${ev.sentAt}, receivedAt=${ev.receivedAtCounter}): queueing sync` `onContactSync(sent=${ev.sentAt}, receivedAt=${ev.receivedAtCounter}): queueing sync`
); );
await queue.add(() => doContactSync(ev));
const promise = queue.add(() => doContactSync(ev));
// Returning the promise blocks MessageReceiver.appQueue, which we only want to do on
// initial sync
if (isInitialSync) {
return promise;
}
} }

View file

@ -8,7 +8,6 @@ import Long from 'long';
import MessageReceiver from '../textsecure/MessageReceiver'; import MessageReceiver from '../textsecure/MessageReceiver';
import { IncomingWebSocketRequestLegacy } from '../textsecure/WebsocketResources'; import { IncomingWebSocketRequestLegacy } from '../textsecure/WebsocketResources';
import type { WebAPIType } from '../textsecure/WebAPI';
import type { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents'; import type { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents';
import { generateAci } from '../types/ServiceId'; import { generateAci } from '../types/ServiceId';
import type { AciString } from '../types/ServiceId'; import type { AciString } from '../types/ServiceId';
@ -39,7 +38,6 @@ describe('MessageReceiver', () => {
describe('connecting', () => { describe('connecting', () => {
it('generates decryption-error event when it cannot decrypt', async () => { it('generates decryption-error event when it cannot decrypt', async () => {
const messageReceiver = new MessageReceiver({ const messageReceiver = new MessageReceiver({
server: {} as WebAPIType,
storage: window.storage, storage: window.storage,
serverTrustRoot: 'AAAAAAAA', serverTrustRoot: 'AAAAAAAA',
}); });

View file

@ -49,8 +49,7 @@ import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
import { Zone } from '../util/Zone'; import { Zone } from '../util/Zone';
import * as durations from '../util/durations'; import * as durations from '../util/durations';
import { DurationInSeconds, SECOND } from '../util/durations'; import { DurationInSeconds } from '../util/durations';
import type { AttachmentType } from '../types/Attachment';
import { Address } from '../types/Address'; import { Address } from '../types/Address';
import { QualifiedAddress } from '../types/QualifiedAddress'; import { QualifiedAddress } from '../types/QualifiedAddress';
import { normalizeStoryDistributionId } from '../types/StoryDistributionId'; import { normalizeStoryDistributionId } from '../types/StoryDistributionId';
@ -82,11 +81,8 @@ import {
import { processSyncMessage } from './processSyncMessage'; import { processSyncMessage } from './processSyncMessage';
import type { EventHandler } from './EventTarget'; import type { EventHandler } from './EventTarget';
import EventTarget from './EventTarget'; import EventTarget from './EventTarget';
import { downloadAttachment } from './downloadAttachment';
import type { IncomingWebSocketRequest } from './WebsocketResources'; import type { IncomingWebSocketRequest } from './WebsocketResources';
import { ServerRequestType } from './WebsocketResources'; import { ServerRequestType } from './WebsocketResources';
import { parseContactsV2 } from './ContactsParser';
import type { WebAPIType } from './WebAPI';
import type { Storage } from './Storage'; import type { Storage } from './Storage';
import { WarnOnlyError } from './Errors'; import { WarnOnlyError } from './Errors';
import * as Bytes from '../Bytes'; import * as Bytes from '../Bytes';
@ -215,7 +211,6 @@ enum TaskType {
} }
export type MessageReceiverOptions = { export type MessageReceiverOptions = {
server: WebAPIType;
storage: Storage; storage: Storage;
serverTrustRoot: string; serverTrustRoot: string;
}; };
@ -287,8 +282,6 @@ export default class MessageReceiver
{ {
/* eslint-enable @typescript-eslint/brace-style */ /* eslint-enable @typescript-eslint/brace-style */
private server: WebAPIType;
private storage: Storage; private storage: Storage;
private appQueue: PQueue; private appQueue: PQueue;
@ -319,10 +312,9 @@ export default class MessageReceiver
private isAppReadyForProcessing: boolean = false; private isAppReadyForProcessing: boolean = false;
constructor({ server, storage, serverTrustRoot }: MessageReceiverOptions) { constructor({ storage, serverTrustRoot }: MessageReceiverOptions) {
super(); super();
this.server = server;
this.storage = storage; this.storage = storage;
this.count = 0; this.count = 0;
@ -3823,34 +3815,13 @@ export default class MessageReceiver
this.removeFromCache(envelope); this.removeFromCache(envelope);
let attachment: AttachmentType | undefined;
try {
attachment = await this.handleAttachmentV2(blob, {
disableRetries: true,
timeout: 90 * SECOND,
});
const { path } = attachment;
if (!path) {
throw new Error('Failed no path field in returned attachment');
}
const contacts = await parseContactsV2(attachment);
const contactSync = new ContactSyncEvent( const contactSync = new ContactSyncEvent(
contacts, processAttachment(blob),
Boolean(contactSyncProto.complete), Boolean(contactSyncProto.complete),
envelope.receivedAtCounter, envelope.receivedAtCounter,
envelope.timestamp envelope.timestamp
); );
await this.dispatchAndWait(logId, contactSync); await this.dispatchAndWait(logId, contactSync);
log.info('handleContacts: finished');
} finally {
if (attachment?.path) {
await window.Signal.Migrations.deleteAttachmentData(attachment.path);
}
}
} }
private async handleBlocked( private async handleBlocked(
@ -3937,18 +3908,6 @@ export default class MessageReceiver
return this.storage.blocked.isGroupBlocked(groupId); return this.storage.blocked.isGroupBlocked(groupId);
} }
private async handleAttachmentV2(
attachment: Proto.IAttachmentPointer,
options?: { timeout?: number; disableRetries?: boolean }
): Promise<AttachmentType> {
const cleaned = processAttachment(attachment);
const downloaded = await downloadAttachment(this.server, cleaned, options);
return {
...cleaned,
...downloaded,
};
}
private async handleEndSession( private async handleEndSession(
envelope: ProcessedEnvelope, envelope: ProcessedEnvelope,
theirServiceId: ServiceIdString theirServiceId: ServiceIdString

View file

@ -16,8 +16,8 @@ import type {
ProcessedEnvelope, ProcessedEnvelope,
ProcessedDataMessage, ProcessedDataMessage,
ProcessedSent, ProcessedSent,
ProcessedAttachment,
} from './Types.d'; } from './Types.d';
import type { ContactDetailsWithAvatar } from './ContactsParser';
import type { import type {
CallEventDetails, CallEventDetails,
CallLogEventDetails, CallLogEventDetails,
@ -84,7 +84,7 @@ export class ErrorEvent extends Event {
export class ContactSyncEvent extends Event { export class ContactSyncEvent extends Event {
constructor( constructor(
public readonly contacts: ReadonlyArray<ContactDetailsWithAvatar>, public readonly contactAttachment: ProcessedAttachment,
public readonly complete: boolean, public readonly complete: boolean,
public readonly receivedAtCounter: number, public readonly receivedAtCounter: number,
public readonly sentAt: number public readonly sentAt: number