Run checkForConflicts on a p-queue

This commit is contained in:
Fedor Indutny 2022-09-13 09:16:01 -07:00 committed by GitHub
parent a040330d89
commit 3e6156ced7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -664,7 +664,15 @@ export class ConversationController {
return convoUuid; return convoUuid;
} }
async checkForConflicts(): Promise<void> { checkForConflicts(): Promise<void> {
return this._combineConversationsQueue.add(() =>
this.doCheckForConflicts()
);
}
// Note: `doCombineConversations` is used within this function since both
// run on `_combineConversationsQueue` queue and we don't want deadlocks.
private async doCheckForConflicts(): Promise<void> {
log.info('checkForConflicts: starting...'); log.info('checkForConflicts: starting...');
const byUuid = Object.create(null); const byUuid = Object.create(null);
const byE164 = Object.create(null); const byE164 = Object.create(null);
@ -698,12 +706,12 @@ export class ConversationController {
if (conversation.get('e164')) { if (conversation.get('e164')) {
// Keep new one // Keep new one
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(conversation, existing); await this.doCombineConversations(conversation, existing);
byUuid[uuid] = conversation; byUuid[uuid] = conversation;
} else { } else {
// Keep existing - note that this applies if neither had an e164 // Keep existing - note that this applies if neither had an e164
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(existing, conversation); await this.doCombineConversations(existing, conversation);
} }
} }
} }
@ -719,12 +727,12 @@ export class ConversationController {
if (conversation.get('e164') || conversation.get('pni')) { if (conversation.get('e164') || conversation.get('pni')) {
// Keep new one // Keep new one
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(conversation, existing); await this.doCombineConversations(conversation, existing);
byUuid[pni] = conversation; byUuid[pni] = conversation;
} else { } else {
// Keep existing - note that this applies if neither had an e164 // Keep existing - note that this applies if neither had an e164
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(existing, conversation); await this.doCombineConversations(existing, conversation);
} }
} }
} }
@ -759,12 +767,12 @@ export class ConversationController {
if (conversation.get('uuid')) { if (conversation.get('uuid')) {
// Keep new one // Keep new one
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(conversation, existing); await this.doCombineConversations(conversation, existing);
byE164[e164] = conversation; byE164[e164] = conversation;
} else { } else {
// Keep existing - note that this applies if neither had a UUID // Keep existing - note that this applies if neither had a UUID
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(existing, conversation); await this.doCombineConversations(existing, conversation);
} }
} }
} }
@ -799,11 +807,11 @@ export class ConversationController {
!isGroupV2(existing.attributes) !isGroupV2(existing.attributes)
) { ) {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(conversation, existing); await this.doCombineConversations(conversation, existing);
byGroupV2Id[groupV2Id] = conversation; byGroupV2Id[groupV2Id] = conversation;
} else { } else {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.combineConversations(existing, conversation); await this.doCombineConversations(existing, conversation);
} }
} }
} }
@ -815,10 +823,18 @@ export class ConversationController {
async combineConversations( async combineConversations(
current: ConversationModel, current: ConversationModel,
obsolete: ConversationModel obsolete: ConversationModel
): Promise<void> {
return this._combineConversationsQueue.add(() =>
this.doCombineConversations(current, obsolete)
);
}
private async doCombineConversations(
current: ConversationModel,
obsolete: ConversationModel
): Promise<void> { ): Promise<void> {
const logId = `combineConversations/${obsolete.id}->${current.id}`; const logId = `combineConversations/${obsolete.id}->${current.id}`;
return this._combineConversationsQueue.add(async () => {
const conversationType = current.get('type'); const conversationType = current.get('type');
if (!this.get(obsolete.id)) { if (!this.get(obsolete.id)) {
@ -956,9 +972,7 @@ export class ConversationController {
log.warn(`${logId}: Update messages table`); log.warn(`${logId}: Update messages table`);
await migrateConversationMessages(obsoleteId, currentId); await migrateConversationMessages(obsoleteId, currentId);
log.warn( log.warn(`${logId}: Emit refreshConversation event to close old/open new`);
`${logId}: Emit refreshConversation event to close old/open new`
);
window.Whisper.events.trigger('refreshConversation', { window.Whisper.events.trigger('refreshConversation', {
newId: currentId, newId: currentId,
oldId: obsoleteId, oldId: obsoleteId,
@ -973,7 +987,6 @@ export class ConversationController {
current.captureChange('combineConversations'); current.captureChange('combineConversations');
log.warn(`${logId}: Complete!`); log.warn(`${logId}: Complete!`);
});
} }
/** /**