Log conversation job wait time and duration

This commit is contained in:
Fedor Indutny 2021-06-14 14:55:14 -07:00 committed by GitHub
parent 2674e2f9f9
commit 61ac79e9ae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 118 additions and 78 deletions

View file

@ -48,7 +48,7 @@
} }
// Do not await, since this can deadlock the queue // Do not await, since this can deadlock the queue
targetConversation.queueJob(async () => { targetConversation.queueJob('Deletes.onDelete', async () => {
window.log.info('Handling DOE for', del.get('targetSentTimestamp')); window.log.info('Handling DOE for', del.get('targetSentTimestamp'));
const messages = await window.Signal.Data.getMessagesBySentAt( const messages = await window.Signal.Data.getMessagesBySentAt(

View file

@ -65,67 +65,70 @@
} }
// awaiting is safe since `onReaction` is never called from inside the queue // awaiting is safe since `onReaction` is never called from inside the queue
return await targetConversation.queueJob(async () => { return await targetConversation.queueJob(
window.log.info( 'Reactions.onReaction',
'Handling reaction for', async () => {
reaction.get('targetTimestamp')
);
const messages = await window.Signal.Data.getMessagesBySentAt(
reaction.get('targetTimestamp'),
{
MessageCollection: Whisper.MessageCollection,
}
);
// Message is fetched inside the conversation queue so we have the
// most recent data
const targetMessage = messages.find(m => {
const contact = m.getContact();
if (!contact) {
return false;
}
const mcid = contact.get('id');
const recid = ConversationController.ensureContactIds({
uuid: reaction.get('targetAuthorUuid'),
});
return mcid === recid;
});
if (!targetMessage) {
window.log.info( window.log.info(
'No message for reaction', 'Handling reaction for',
reaction.get('targetAuthorUuid'),
reaction.get('targetTimestamp') reaction.get('targetTimestamp')
); );
// Since we haven't received the message for which we are removing a const messages = await window.Signal.Data.getMessagesBySentAt(
// reaction, we can just remove those pending reactions reaction.get('targetTimestamp'),
if (reaction.get('remove')) { {
this.remove(reaction); MessageCollection: Whisper.MessageCollection,
const oldReaction = this.where({ }
targetAuthorUuid: reaction.get('targetAuthorUuid'), );
targetTimestamp: reaction.get('targetTimestamp'), // Message is fetched inside the conversation queue so we have the
emoji: reaction.get('emoji'), // most recent data
const targetMessage = messages.find(m => {
const contact = m.getContact();
if (!contact) {
return false;
}
const mcid = contact.get('id');
const recid = ConversationController.ensureContactIds({
uuid: reaction.get('targetAuthorUuid'),
}); });
oldReaction.forEach(r => this.remove(r)); return mcid === recid;
});
if (!targetMessage) {
window.log.info(
'No message for reaction',
reaction.get('targetAuthorUuid'),
reaction.get('targetTimestamp')
);
// Since we haven't received the message for which we are removing a
// reaction, we can just remove those pending reactions
if (reaction.get('remove')) {
this.remove(reaction);
const oldReaction = this.where({
targetAuthorUuid: reaction.get('targetAuthorUuid'),
targetTimestamp: reaction.get('targetTimestamp'),
emoji: reaction.get('emoji'),
});
oldReaction.forEach(r => this.remove(r));
}
return undefined;
} }
return undefined; const message = MessageController.register(
targetMessage.id,
targetMessage
);
const oldReaction = await message.handleReaction(reaction);
this.remove(reaction);
return oldReaction;
} }
);
const message = MessageController.register(
targetMessage.id,
targetMessage
);
const oldReaction = await message.handleReaction(reaction);
this.remove(reaction);
return oldReaction;
});
} catch (error) { } catch (error) {
window.log.error( window.log.error(
'Reactions.onReaction error:', 'Reactions.onReaction error:',

View file

@ -162,7 +162,7 @@ export async function startApp(): Promise<void> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const e164 = c.get('e164')!; const e164 = c.get('e164')!;
c.queueJob(async () => { c.queueJob('sendDeliveryReceipt', async () => {
try { try {
const { const {
wrap, wrap,
@ -850,7 +850,7 @@ export async function startApp(): Promise<void> {
if (conversation) { if (conversation) {
const receivedAt = Date.now(); const receivedAt = Date.now();
const receivedAtCounter = window.Signal.Util.incrementMessageCounter(); const receivedAtCounter = window.Signal.Util.incrementMessageCounter();
conversation.queueJob(() => conversation.queueJob('addDeliveryIssue', () =>
conversation.addDeliveryIssue({ conversation.addDeliveryIssue({
receivedAt, receivedAt,
receivedAtCounter, receivedAtCounter,
@ -2838,7 +2838,9 @@ export async function startApp(): Promise<void> {
); );
} }
sender.queueJob(() => sender.sendProfileKeyUpdate()); sender.queueJob('sendProfileKeyUpdate', () =>
sender.sendProfileKeyUpdate()
);
}); });
}, },
@ -3730,7 +3732,7 @@ export async function startApp(): Promise<void> {
window.log.warn( window.log.warn(
`requestResend/${logId}: No content hint, adding error immediately` `requestResend/${logId}: No content hint, adding error immediately`
); );
conversation.queueJob(async () => { conversation.queueJob('addDeliveryIssue', async () => {
conversation.addDeliveryIssue({ conversation.addDeliveryIssue({
receivedAt: receivedAtDate, receivedAt: receivedAtDate,
receivedAtCounter, receivedAtCounter,
@ -3778,7 +3780,7 @@ export async function startApp(): Promise<void> {
const receivedAt = Date.now(); const receivedAt = Date.now();
const receivedAtCounter = window.Signal.Util.incrementMessageCounter(); const receivedAtCounter = window.Signal.Util.incrementMessageCounter();
conversation.queueJob(async () => { conversation.queueJob('addChatSessionRefreshed', async () => {
conversation.addChatSessionRefreshed({ receivedAt, receivedAtCounter }); conversation.addChatSessionRefreshed({ receivedAt, receivedAtCounter });
}); });
} }

View file

@ -1259,7 +1259,7 @@ export async function modifyGroupV2({
window.log.info(`modifyGroupV2/${idLog}: Queuing attempt ${attempt}`); window.log.info(`modifyGroupV2/${idLog}: Queuing attempt ${attempt}`);
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await conversation.queueJob(async () => { await conversation.queueJob('modifyGroupV2', async () => {
window.log.info(`modifyGroupV2/${idLog}: Running attempt ${attempt}`); window.log.info(`modifyGroupV2/${idLog}: Running attempt ${attempt}`);
const actions = await createGroupChange(); const actions = await createGroupChange();
@ -1670,7 +1670,7 @@ export async function createGroupV2({
} }
); );
await conversation.queueJob(() => { await conversation.queueJob('storageServiceUploadJob', () => {
window.Signal.Services.storageServiceUploadJob(); window.Signal.Services.storageServiceUploadJob();
}); });
@ -2023,7 +2023,7 @@ export async function initiateMigrationToGroupV2(
await maybeFetchNewCredentials(); await maybeFetchNewCredentials();
try { try {
await conversation.queueJob(async () => { await conversation.queueJob('initiateMigrationToGroupV2', async () => {
const ACCESS_ENUM = const ACCESS_ENUM =
window.textsecure.protobuf.AccessControl.AccessRequired; window.textsecure.protobuf.AccessControl.AccessRequired;
@ -2310,7 +2310,7 @@ export async function waitThenRespondToGroupV2Migration(
// Then wait to process all outstanding messages for this conversation // Then wait to process all outstanding messages for this conversation
const { conversation } = options; const { conversation } = options;
await conversation.queueJob(async () => { await conversation.queueJob('waitThenRespondToGroupV2Migration', async () => {
try { try {
// And finally try to migrate the group // And finally try to migrate the group
await respondToGroupV2Migration(options); await respondToGroupV2Migration(options);
@ -2698,7 +2698,7 @@ export async function waitThenMaybeUpdateGroup(
return; return;
} }
await conversation.queueJob(async () => { await conversation.queueJob('waitThenMaybeUpdateGroup', async () => {
try { try {
// And finally try to update the group // And finally try to update the group
await maybeUpdateGroup(options, { viaSync }); await maybeUpdateGroup(options, { viaSync });

View file

@ -89,6 +89,8 @@ const { addStickerPackReference } = window.Signal.Data;
const THREE_HOURS = 3 * 60 * 60 * 1000; const THREE_HOURS = 3 * 60 * 60 * 1000;
const FIVE_MINUTES = 1000 * 60 * 5; const FIVE_MINUTES = 1000 * 60 * 5;
const JOB_REPORTING_THRESHOLD_MS = 25;
const ATTRIBUTES_THAT_DONT_INVALIDATE_PROPS_CACHE = new Set([ const ATTRIBUTES_THAT_DONT_INVALIDATE_PROPS_CACHE = new Set([
'profileLastFetchedAt', 'profileLastFetchedAt',
]); ]);
@ -1051,7 +1053,9 @@ export class ConversationModel extends window.Backbone
this.setRegistered(); this.setRegistered();
// If we couldn't apply universal timer before - try it again. // If we couldn't apply universal timer before - try it again.
this.queueJob(() => this.maybeSetPendingUniversalTimer()); this.queueJob('maybeSetPendingUniversalTimer', () =>
this.maybeSetPendingUniversalTimer()
);
} }
isValid(): boolean { isValid(): boolean {
@ -1168,7 +1172,7 @@ export class ConversationModel extends window.Backbone
return; return;
} }
await this.queueJob(async () => { await this.queueJob('sendTypingMessage', async () => {
const recipientId = isDirectConversation(this.attributes) const recipientId = isDirectConversation(this.attributes)
? this.getSendTarget() ? this.getSendTarget()
: undefined; : undefined;
@ -2154,19 +2158,25 @@ export class ConversationModel extends window.Backbone
setVerifiedDefault(options?: VerificationOptions): Promise<unknown> { setVerifiedDefault(options?: VerificationOptions): Promise<unknown> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { DEFAULT } = this.verifiedEnum!; const { DEFAULT } = this.verifiedEnum!;
return this.queueJob(() => this._setVerified(DEFAULT, options)); return this.queueJob('setVerifiedDefault', () =>
this._setVerified(DEFAULT, options)
);
} }
setVerified(options?: VerificationOptions): Promise<unknown> { setVerified(options?: VerificationOptions): Promise<unknown> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { VERIFIED } = this.verifiedEnum!; const { VERIFIED } = this.verifiedEnum!;
return this.queueJob(() => this._setVerified(VERIFIED, options)); return this.queueJob('setVerified', () =>
this._setVerified(VERIFIED, options)
);
} }
setUnverified(options: VerificationOptions): Promise<unknown> { setUnverified(options: VerificationOptions): Promise<unknown> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { UNVERIFIED } = this.verifiedEnum!; const { UNVERIFIED } = this.verifiedEnum!;
return this.queueJob(() => this._setVerified(UNVERIFIED, options)); return this.queueJob('setUnverified', () =>
this._setVerified(UNVERIFIED, options)
);
} }
async _setVerified( async _setVerified(
@ -2863,7 +2873,7 @@ export class ConversationModel extends window.Backbone
// Lastly, we don't send read syncs for any message marked read due to a read // Lastly, we don't send read syncs for any message marked read due to a read
// sync. That's a notification explosion we don't need. // sync. That's a notification explosion we don't need.
return this.queueJob(() => return this.queueJob('onReadMessage', () =>
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.markRead(message.get('received_at')!, { this.markRead(message.get('received_at')!, {
sendReadReceipts: false, sendReadReceipts: false,
@ -2936,7 +2946,10 @@ export class ConversationModel extends window.Backbone
return null; return null;
} }
queueJob(callback: () => unknown | Promise<unknown>): Promise<WhatIsThis> { queueJob(
name: string,
callback: () => unknown | Promise<unknown>
): Promise<WhatIsThis> {
this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 }); this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 });
const taskWithTimeout = window.textsecure.createTaskWithTimeout( const taskWithTimeout = window.textsecure.createTaskWithTimeout(
@ -2944,7 +2957,27 @@ export class ConversationModel extends window.Backbone
`conversation ${this.idForLogging()}` `conversation ${this.idForLogging()}`
); );
return this.jobQueue.add(taskWithTimeout); const queuedAt = Date.now();
return this.jobQueue.add(async () => {
const startedAt = Date.now();
const waitTime = startedAt - queuedAt;
if (waitTime > JOB_REPORTING_THRESHOLD_MS) {
window.log.info(
`Conversation job ${name} was blocked for ${waitTime}ms`
);
}
try {
return await taskWithTimeout();
} finally {
const duration = Date.now() - startedAt;
if (duration > JOB_REPORTING_THRESHOLD_MS) {
window.log.info(`Conversation job ${name} took ${duration}ms`);
}
}
});
} }
isAdmin(conversationId: string): boolean { isAdmin(conversationId: string): boolean {
@ -3232,7 +3265,7 @@ export class ConversationModel extends window.Backbone
const destination = this.getSendTarget()!; const destination = this.getSendTarget()!;
const recipients = this.getRecipients(); const recipients = this.getRecipients();
return this.queueJob(async () => { return this.queueJob('sendDeleteForEveryone', async () => {
window.log.info( window.log.info(
'Sending deleteForEveryone to conversation', 'Sending deleteForEveryone to conversation',
this.idForLogging(), this.idForLogging(),
@ -3362,7 +3395,7 @@ export class ConversationModel extends window.Backbone
const destination = this.getSendTarget()!; const destination = this.getSendTarget()!;
const recipients = this.getRecipients(); const recipients = this.getRecipients();
return this.queueJob(async () => { return this.queueJob('sendReactionMessage', async () => {
window.log.info( window.log.info(
'Sending reaction to conversation', 'Sending reaction to conversation',
this.idForLogging(), this.idForLogging(),
@ -3559,7 +3592,7 @@ export class ConversationModel extends window.Backbone
const destination = this.getSendTarget()!; const destination = this.getSendTarget()!;
const recipients = this.getRecipients(); const recipients = this.getRecipients();
this.queueJob(async () => { this.queueJob('sendMessage', async () => {
const now = timestamp || Date.now(); const now = timestamp || Date.now();
await this.maybeApplyUniversalTimer(); await this.maybeApplyUniversalTimer();
@ -3756,7 +3789,9 @@ export class ConversationModel extends window.Backbone
return; return;
} }
this.queueJob(() => this.maybeSetPendingUniversalTimer()); this.queueJob('maybeSetPendingUniversalTimer', () =>
this.maybeSetPendingUniversalTimer()
);
const ourConversationId = window.ConversationController.getOurConversationId(); const ourConversationId = window.ConversationController.getOurConversationId();
if (!ourConversationId) { if (!ourConversationId) {
@ -5018,7 +5053,7 @@ export class ConversationModel extends window.Backbone
); );
this.set({ needsStorageServiceSync: true }); this.set({ needsStorageServiceSync: true });
this.queueJob(() => { this.queueJob('captureChange', () => {
Services.storageServiceUploadJob(); Services.storageServiceUploadJob();
}); });
} }

View file

@ -3507,7 +3507,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const conversation = window.ConversationController.get(conversationId)!; const conversation = window.ConversationController.get(conversationId)!;
return conversation.queueJob(async () => { return conversation.queueJob('handleDataMessage', async () => {
window.log.info( window.log.info(
`Starting handleDataMessage for message ${message.idForLogging()} in conversation ${conversation.idForLogging()}` `Starting handleDataMessage for message ${message.idForLogging()} in conversation ${conversation.idForLogging()}`
); );