Don't block message processing on contact syncs
Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
parent
034f963fcc
commit
7267cd894e
5 changed files with 59 additions and 59 deletions
|
@ -542,7 +542,6 @@ export async function startApp(): Promise<void> {
|
|||
|
||||
log.info('Initializing MessageReceiver');
|
||||
messageReceiver = new MessageReceiver({
|
||||
server,
|
||||
storage: window.storage,
|
||||
serverTrustRoot: window.getServerTrustRoot(),
|
||||
});
|
||||
|
|
|
@ -5,7 +5,10 @@ import PQueue from 'p-queue';
|
|||
|
||||
import { DataWriter } from '../sql/Client';
|
||||
import type { ContactSyncEvent } from '../textsecure/messageReceiverEvents';
|
||||
import type { ContactDetailsWithAvatar } from '../textsecure/ContactsParser';
|
||||
import {
|
||||
parseContactsV2,
|
||||
type ContactDetailsWithAvatar,
|
||||
} from '../textsecure/ContactsParser';
|
||||
import { normalizeAci } from '../util/normalizeAci';
|
||||
import * as Conversation from '../types/Conversation';
|
||||
import * as Errors from '../types/errors';
|
||||
|
@ -15,6 +18,11 @@ import { validateConversation } from '../util/validateConversation';
|
|||
import { isDirectConversation, isMe } from '../util/whatTypeOfConversation';
|
||||
import * as log from '../logging/log';
|
||||
import { dropNull } from '../util/dropNull';
|
||||
import type { ProcessedAttachment } from '../textsecure/Types';
|
||||
import { downloadAttachment } from '../textsecure/downloadAttachment';
|
||||
import { strictAssert } from '../util/assert';
|
||||
import type { ReencryptedAttachmentV2 } from '../AttachmentCrypto';
|
||||
import { SECOND } from '../util/durations';
|
||||
|
||||
// When true - we are running the very first storage and contact sync after
|
||||
// linking.
|
||||
|
@ -86,8 +94,34 @@ async function updateConversationFromContactSync(
|
|||
|
||||
const queue = new PQueue({ concurrency: 1 });
|
||||
|
||||
async function downloadAndParseContactAttachment(
|
||||
contactAttachment: ProcessedAttachment
|
||||
) {
|
||||
strictAssert(window.textsecure.server, 'server must exist');
|
||||
let downloaded: ReencryptedAttachmentV2 | undefined;
|
||||
try {
|
||||
downloaded = await downloadAttachment(
|
||||
window.textsecure.server,
|
||||
contactAttachment,
|
||||
{
|
||||
disableRetries: true,
|
||||
timeout: 90 * SECOND,
|
||||
}
|
||||
);
|
||||
|
||||
return await parseContactsV2({
|
||||
...contactAttachment,
|
||||
...downloaded,
|
||||
});
|
||||
} finally {
|
||||
if (downloaded?.path) {
|
||||
await window.Signal.Migrations.deleteAttachmentData(downloaded.path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function doContactSync({
|
||||
contacts,
|
||||
contactAttachment,
|
||||
complete: isFullSync,
|
||||
receivedAtCounter,
|
||||
sentAt,
|
||||
|
@ -95,8 +129,11 @@ async function doContactSync({
|
|||
const logId =
|
||||
`doContactSync(sent=${sentAt}, ` +
|
||||
`receivedAt=${receivedAtCounter}, isFullSync=${isFullSync})`;
|
||||
log.info(`${logId}: got ${contacts.length} contacts`);
|
||||
|
||||
log.info(`${logId}: downloading contact attachment`);
|
||||
const contacts = await downloadAndParseContactAttachment(contactAttachment);
|
||||
|
||||
log.info(`${logId}: got ${contacts.length} contacts`);
|
||||
const updatedConversations = new Set<ConversationModel>();
|
||||
|
||||
let promises = new Array<Promise<void>>();
|
||||
|
@ -195,5 +232,12 @@ export async function onContactSync(ev: ContactSyncEvent): Promise<void> {
|
|||
log.info(
|
||||
`onContactSync(sent=${ev.sentAt}, receivedAt=${ev.receivedAtCounter}): queueing sync`
|
||||
);
|
||||
await queue.add(() => doContactSync(ev));
|
||||
|
||||
const promise = queue.add(() => doContactSync(ev));
|
||||
|
||||
// Returning the promise blocks MessageReceiver.appQueue, which we only want to do on
|
||||
// initial sync
|
||||
if (isInitialSync) {
|
||||
return promise;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import Long from 'long';
|
|||
|
||||
import MessageReceiver from '../textsecure/MessageReceiver';
|
||||
import { IncomingWebSocketRequestLegacy } from '../textsecure/WebsocketResources';
|
||||
import type { WebAPIType } from '../textsecure/WebAPI';
|
||||
import type { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents';
|
||||
import { generateAci } from '../types/ServiceId';
|
||||
import type { AciString } from '../types/ServiceId';
|
||||
|
@ -39,7 +38,6 @@ describe('MessageReceiver', () => {
|
|||
describe('connecting', () => {
|
||||
it('generates decryption-error event when it cannot decrypt', async () => {
|
||||
const messageReceiver = new MessageReceiver({
|
||||
server: {} as WebAPIType,
|
||||
storage: window.storage,
|
||||
serverTrustRoot: 'AAAAAAAA',
|
||||
});
|
||||
|
|
|
@ -49,8 +49,7 @@ import { parseIntOrThrow } from '../util/parseIntOrThrow';
|
|||
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
|
||||
import { Zone } from '../util/Zone';
|
||||
import * as durations from '../util/durations';
|
||||
import { DurationInSeconds, SECOND } from '../util/durations';
|
||||
import type { AttachmentType } from '../types/Attachment';
|
||||
import { DurationInSeconds } from '../util/durations';
|
||||
import { Address } from '../types/Address';
|
||||
import { QualifiedAddress } from '../types/QualifiedAddress';
|
||||
import { normalizeStoryDistributionId } from '../types/StoryDistributionId';
|
||||
|
@ -82,11 +81,8 @@ import {
|
|||
import { processSyncMessage } from './processSyncMessage';
|
||||
import type { EventHandler } from './EventTarget';
|
||||
import EventTarget from './EventTarget';
|
||||
import { downloadAttachment } from './downloadAttachment';
|
||||
import type { IncomingWebSocketRequest } from './WebsocketResources';
|
||||
import { ServerRequestType } from './WebsocketResources';
|
||||
import { parseContactsV2 } from './ContactsParser';
|
||||
import type { WebAPIType } from './WebAPI';
|
||||
import type { Storage } from './Storage';
|
||||
import { WarnOnlyError } from './Errors';
|
||||
import * as Bytes from '../Bytes';
|
||||
|
@ -215,7 +211,6 @@ enum TaskType {
|
|||
}
|
||||
|
||||
export type MessageReceiverOptions = {
|
||||
server: WebAPIType;
|
||||
storage: Storage;
|
||||
serverTrustRoot: string;
|
||||
};
|
||||
|
@ -287,8 +282,6 @@ export default class MessageReceiver
|
|||
{
|
||||
/* eslint-enable @typescript-eslint/brace-style */
|
||||
|
||||
private server: WebAPIType;
|
||||
|
||||
private storage: Storage;
|
||||
|
||||
private appQueue: PQueue;
|
||||
|
@ -319,10 +312,9 @@ export default class MessageReceiver
|
|||
|
||||
private isAppReadyForProcessing: boolean = false;
|
||||
|
||||
constructor({ server, storage, serverTrustRoot }: MessageReceiverOptions) {
|
||||
constructor({ storage, serverTrustRoot }: MessageReceiverOptions) {
|
||||
super();
|
||||
|
||||
this.server = server;
|
||||
this.storage = storage;
|
||||
|
||||
this.count = 0;
|
||||
|
@ -3823,34 +3815,13 @@ export default class MessageReceiver
|
|||
|
||||
this.removeFromCache(envelope);
|
||||
|
||||
let attachment: AttachmentType | undefined;
|
||||
try {
|
||||
attachment = await this.handleAttachmentV2(blob, {
|
||||
disableRetries: true,
|
||||
timeout: 90 * SECOND,
|
||||
});
|
||||
|
||||
const { path } = attachment;
|
||||
if (!path) {
|
||||
throw new Error('Failed no path field in returned attachment');
|
||||
}
|
||||
|
||||
const contacts = await parseContactsV2(attachment);
|
||||
|
||||
const contactSync = new ContactSyncEvent(
|
||||
contacts,
|
||||
Boolean(contactSyncProto.complete),
|
||||
envelope.receivedAtCounter,
|
||||
envelope.timestamp
|
||||
);
|
||||
await this.dispatchAndWait(logId, contactSync);
|
||||
|
||||
log.info('handleContacts: finished');
|
||||
} finally {
|
||||
if (attachment?.path) {
|
||||
await window.Signal.Migrations.deleteAttachmentData(attachment.path);
|
||||
}
|
||||
}
|
||||
const contactSync = new ContactSyncEvent(
|
||||
processAttachment(blob),
|
||||
Boolean(contactSyncProto.complete),
|
||||
envelope.receivedAtCounter,
|
||||
envelope.timestamp
|
||||
);
|
||||
await this.dispatchAndWait(logId, contactSync);
|
||||
}
|
||||
|
||||
private async handleBlocked(
|
||||
|
@ -3937,18 +3908,6 @@ export default class MessageReceiver
|
|||
return this.storage.blocked.isGroupBlocked(groupId);
|
||||
}
|
||||
|
||||
private async handleAttachmentV2(
|
||||
attachment: Proto.IAttachmentPointer,
|
||||
options?: { timeout?: number; disableRetries?: boolean }
|
||||
): Promise<AttachmentType> {
|
||||
const cleaned = processAttachment(attachment);
|
||||
const downloaded = await downloadAttachment(this.server, cleaned, options);
|
||||
return {
|
||||
...cleaned,
|
||||
...downloaded,
|
||||
};
|
||||
}
|
||||
|
||||
private async handleEndSession(
|
||||
envelope: ProcessedEnvelope,
|
||||
theirServiceId: ServiceIdString
|
||||
|
|
|
@ -16,8 +16,8 @@ import type {
|
|||
ProcessedEnvelope,
|
||||
ProcessedDataMessage,
|
||||
ProcessedSent,
|
||||
ProcessedAttachment,
|
||||
} from './Types.d';
|
||||
import type { ContactDetailsWithAvatar } from './ContactsParser';
|
||||
import type {
|
||||
CallEventDetails,
|
||||
CallLogEventDetails,
|
||||
|
@ -84,7 +84,7 @@ export class ErrorEvent extends Event {
|
|||
|
||||
export class ContactSyncEvent extends Event {
|
||||
constructor(
|
||||
public readonly contacts: ReadonlyArray<ContactDetailsWithAvatar>,
|
||||
public readonly contactAttachment: ProcessedAttachment,
|
||||
public readonly complete: boolean,
|
||||
public readonly receivedAtCounter: number,
|
||||
public readonly sentAt: number
|
||||
|
|
Loading…
Reference in a new issue