diff --git a/js/deletes.js b/js/deletes.js index dfbfa0870e7..0f821d11933 100644 --- a/js/deletes.js +++ b/js/deletes.js @@ -48,7 +48,7 @@ } // Do not await, since this can deadlock the queue - targetConversation.queueJob(async () => { + targetConversation.queueJob('Deletes.onDelete', async () => { window.log.info('Handling DOE for', del.get('targetSentTimestamp')); const messages = await window.Signal.Data.getMessagesBySentAt( diff --git a/js/reactions.js b/js/reactions.js index 7a71c80ae78..8124ed9b077 100644 --- a/js/reactions.js +++ b/js/reactions.js @@ -65,67 +65,70 @@ } // awaiting is safe since `onReaction` is never called from inside the queue - return await targetConversation.queueJob(async () => { - window.log.info( - 'Handling reaction for', - reaction.get('targetTimestamp') - ); - - const messages = await window.Signal.Data.getMessagesBySentAt( - reaction.get('targetTimestamp'), - { - MessageCollection: Whisper.MessageCollection, - } - ); - // Message is fetched inside the conversation queue so we have the - // most recent data - const targetMessage = messages.find(m => { - const contact = m.getContact(); - - if (!contact) { - return false; - } - - const mcid = contact.get('id'); - const recid = ConversationController.ensureContactIds({ - uuid: reaction.get('targetAuthorUuid'), - }); - return mcid === recid; - }); - - if (!targetMessage) { + return await targetConversation.queueJob( + 'Reactions.onReaction', + async () => { window.log.info( - 'No message for reaction', - reaction.get('targetAuthorUuid'), + 'Handling reaction for', reaction.get('targetTimestamp') ); - // Since we haven't received the message for which we are removing a - // reaction, we can just remove those pending reactions - if (reaction.get('remove')) { - this.remove(reaction); - const oldReaction = this.where({ - targetAuthorUuid: reaction.get('targetAuthorUuid'), - targetTimestamp: reaction.get('targetTimestamp'), - emoji: reaction.get('emoji'), + const messages = await window.Signal.Data.getMessagesBySentAt( + reaction.get('targetTimestamp'), + { + MessageCollection: Whisper.MessageCollection, + } + ); + // Message is fetched inside the conversation queue so we have the + // most recent data + const targetMessage = messages.find(m => { + const contact = m.getContact(); + + if (!contact) { + return false; + } + + const mcid = contact.get('id'); + const recid = ConversationController.ensureContactIds({ + uuid: reaction.get('targetAuthorUuid'), }); - oldReaction.forEach(r => this.remove(r)); + return mcid === recid; + }); + + if (!targetMessage) { + window.log.info( + 'No message for reaction', + reaction.get('targetAuthorUuid'), + reaction.get('targetTimestamp') + ); + + // Since we haven't received the message for which we are removing a + // reaction, we can just remove those pending reactions + if (reaction.get('remove')) { + this.remove(reaction); + const oldReaction = this.where({ + targetAuthorUuid: reaction.get('targetAuthorUuid'), + targetTimestamp: reaction.get('targetTimestamp'), + emoji: reaction.get('emoji'), + }); + oldReaction.forEach(r => this.remove(r)); + } + + return undefined; } - return undefined; + const message = MessageController.register( + targetMessage.id, + targetMessage + ); + + const oldReaction = await message.handleReaction(reaction); + + this.remove(reaction); + + return oldReaction; } - - const message = MessageController.register( - targetMessage.id, - targetMessage - ); - - const oldReaction = await message.handleReaction(reaction); - - this.remove(reaction); - - return oldReaction; - }); + ); } catch (error) { window.log.error( 'Reactions.onReaction error:', diff --git a/ts/background.ts b/ts/background.ts index e5d9e5e3ad7..49551cecf37 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -162,7 +162,7 @@ export async function startApp(): Promise { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const e164 = c.get('e164')!; - c.queueJob(async () => { + c.queueJob('sendDeliveryReceipt', async () => { try { const { wrap, @@ -850,7 +850,7 @@ export async function startApp(): Promise { if (conversation) { const receivedAt = Date.now(); const receivedAtCounter = window.Signal.Util.incrementMessageCounter(); - conversation.queueJob(() => + conversation.queueJob('addDeliveryIssue', () => conversation.addDeliveryIssue({ receivedAt, receivedAtCounter, @@ -2838,7 +2838,9 @@ export async function startApp(): Promise { ); } - sender.queueJob(() => sender.sendProfileKeyUpdate()); + sender.queueJob('sendProfileKeyUpdate', () => + sender.sendProfileKeyUpdate() + ); }); }, @@ -3730,7 +3732,7 @@ export async function startApp(): Promise { window.log.warn( `requestResend/${logId}: No content hint, adding error immediately` ); - conversation.queueJob(async () => { + conversation.queueJob('addDeliveryIssue', async () => { conversation.addDeliveryIssue({ receivedAt: receivedAtDate, receivedAtCounter, @@ -3778,7 +3780,7 @@ export async function startApp(): Promise { const receivedAt = Date.now(); const receivedAtCounter = window.Signal.Util.incrementMessageCounter(); - conversation.queueJob(async () => { + conversation.queueJob('addChatSessionRefreshed', async () => { conversation.addChatSessionRefreshed({ receivedAt, receivedAtCounter }); }); } diff --git a/ts/groups.ts b/ts/groups.ts index 6fc226f8353..1b1bfc16b6a 100644 --- a/ts/groups.ts +++ b/ts/groups.ts @@ -1259,7 +1259,7 @@ export async function modifyGroupV2({ window.log.info(`modifyGroupV2/${idLog}: Queuing attempt ${attempt}`); // eslint-disable-next-line no-await-in-loop - await conversation.queueJob(async () => { + await conversation.queueJob('modifyGroupV2', async () => { window.log.info(`modifyGroupV2/${idLog}: Running attempt ${attempt}`); const actions = await createGroupChange(); @@ -1670,7 +1670,7 @@ export async function createGroupV2({ } ); - await conversation.queueJob(() => { + await conversation.queueJob('storageServiceUploadJob', () => { window.Signal.Services.storageServiceUploadJob(); }); @@ -2023,7 +2023,7 @@ export async function initiateMigrationToGroupV2( await maybeFetchNewCredentials(); try { - await conversation.queueJob(async () => { + await conversation.queueJob('initiateMigrationToGroupV2', async () => { const ACCESS_ENUM = window.textsecure.protobuf.AccessControl.AccessRequired; @@ -2310,7 +2310,7 @@ export async function waitThenRespondToGroupV2Migration( // Then wait to process all outstanding messages for this conversation const { conversation } = options; - await conversation.queueJob(async () => { + await conversation.queueJob('waitThenRespondToGroupV2Migration', async () => { try { // And finally try to migrate the group await respondToGroupV2Migration(options); @@ -2698,7 +2698,7 @@ export async function waitThenMaybeUpdateGroup( return; } - await conversation.queueJob(async () => { + await conversation.queueJob('waitThenMaybeUpdateGroup', async () => { try { // And finally try to update the group await maybeUpdateGroup(options, { viaSync }); diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 522b1ddaacf..370520862e2 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -89,6 +89,8 @@ const { addStickerPackReference } = window.Signal.Data; const THREE_HOURS = 3 * 60 * 60 * 1000; const FIVE_MINUTES = 1000 * 60 * 5; +const JOB_REPORTING_THRESHOLD_MS = 25; + const ATTRIBUTES_THAT_DONT_INVALIDATE_PROPS_CACHE = new Set([ 'profileLastFetchedAt', ]); @@ -1051,7 +1053,9 @@ export class ConversationModel extends window.Backbone this.setRegistered(); // If we couldn't apply universal timer before - try it again. - this.queueJob(() => this.maybeSetPendingUniversalTimer()); + this.queueJob('maybeSetPendingUniversalTimer', () => + this.maybeSetPendingUniversalTimer() + ); } isValid(): boolean { @@ -1168,7 +1172,7 @@ export class ConversationModel extends window.Backbone return; } - await this.queueJob(async () => { + await this.queueJob('sendTypingMessage', async () => { const recipientId = isDirectConversation(this.attributes) ? this.getSendTarget() : undefined; @@ -2154,19 +2158,25 @@ export class ConversationModel extends window.Backbone setVerifiedDefault(options?: VerificationOptions): Promise { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const { DEFAULT } = this.verifiedEnum!; - return this.queueJob(() => this._setVerified(DEFAULT, options)); + return this.queueJob('setVerifiedDefault', () => + this._setVerified(DEFAULT, options) + ); } setVerified(options?: VerificationOptions): Promise { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const { VERIFIED } = this.verifiedEnum!; - return this.queueJob(() => this._setVerified(VERIFIED, options)); + return this.queueJob('setVerified', () => + this._setVerified(VERIFIED, options) + ); } setUnverified(options: VerificationOptions): Promise { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const { UNVERIFIED } = this.verifiedEnum!; - return this.queueJob(() => this._setVerified(UNVERIFIED, options)); + return this.queueJob('setUnverified', () => + this._setVerified(UNVERIFIED, options) + ); } async _setVerified( @@ -2863,7 +2873,7 @@ export class ConversationModel extends window.Backbone // Lastly, we don't send read syncs for any message marked read due to a read // sync. That's a notification explosion we don't need. - return this.queueJob(() => + return this.queueJob('onReadMessage', () => // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.markRead(message.get('received_at')!, { sendReadReceipts: false, @@ -2936,7 +2946,10 @@ export class ConversationModel extends window.Backbone return null; } - queueJob(callback: () => unknown | Promise): Promise { + queueJob( + name: string, + callback: () => unknown | Promise + ): Promise { this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 }); const taskWithTimeout = window.textsecure.createTaskWithTimeout( @@ -2944,7 +2957,27 @@ export class ConversationModel extends window.Backbone `conversation ${this.idForLogging()}` ); - return this.jobQueue.add(taskWithTimeout); + const queuedAt = Date.now(); + return this.jobQueue.add(async () => { + const startedAt = Date.now(); + const waitTime = startedAt - queuedAt; + + if (waitTime > JOB_REPORTING_THRESHOLD_MS) { + window.log.info( + `Conversation job ${name} was blocked for ${waitTime}ms` + ); + } + + try { + return await taskWithTimeout(); + } finally { + const duration = Date.now() - startedAt; + + if (duration > JOB_REPORTING_THRESHOLD_MS) { + window.log.info(`Conversation job ${name} took ${duration}ms`); + } + } + }); } isAdmin(conversationId: string): boolean { @@ -3232,7 +3265,7 @@ export class ConversationModel extends window.Backbone const destination = this.getSendTarget()!; const recipients = this.getRecipients(); - return this.queueJob(async () => { + return this.queueJob('sendDeleteForEveryone', async () => { window.log.info( 'Sending deleteForEveryone to conversation', this.idForLogging(), @@ -3362,7 +3395,7 @@ export class ConversationModel extends window.Backbone const destination = this.getSendTarget()!; const recipients = this.getRecipients(); - return this.queueJob(async () => { + return this.queueJob('sendReactionMessage', async () => { window.log.info( 'Sending reaction to conversation', this.idForLogging(), @@ -3559,7 +3592,7 @@ export class ConversationModel extends window.Backbone const destination = this.getSendTarget()!; const recipients = this.getRecipients(); - this.queueJob(async () => { + this.queueJob('sendMessage', async () => { const now = timestamp || Date.now(); await this.maybeApplyUniversalTimer(); @@ -3756,7 +3789,9 @@ export class ConversationModel extends window.Backbone return; } - this.queueJob(() => this.maybeSetPendingUniversalTimer()); + this.queueJob('maybeSetPendingUniversalTimer', () => + this.maybeSetPendingUniversalTimer() + ); const ourConversationId = window.ConversationController.getOurConversationId(); if (!ourConversationId) { @@ -5018,7 +5053,7 @@ export class ConversationModel extends window.Backbone ); this.set({ needsStorageServiceSync: true }); - this.queueJob(() => { + this.queueJob('captureChange', () => { Services.storageServiceUploadJob(); }); } diff --git a/ts/models/messages.ts b/ts/models/messages.ts index 3f90b3e050e..5357297e65b 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -3507,7 +3507,7 @@ export class MessageModel extends window.Backbone.Model { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const conversation = window.ConversationController.get(conversationId)!; - return conversation.queueJob(async () => { + return conversation.queueJob('handleDataMessage', async () => { window.log.info( `Starting handleDataMessage for message ${message.idForLogging()} in conversation ${conversation.idForLogging()}` );