From 768ab26a209f688d23db0b71882b89927fe13366 Mon Sep 17 00:00:00 2001
From: Scott Nonnenberg
Date: Wed, 28 Aug 2024 16:05:08 +1000
Subject: [PATCH] Conversation: savePromises for off-queue post-handleDataMessage work

---
 ts/models/conversations.ts            | 12 +++++
 ts/models/messages.ts                 | 72 ++++++++++++++++-----------
 ts/test-both/util/waitBatcher_test.ts | 55 +++++++++++++++++---
 ts/util/syncTasks.ts                  | 16 +++++-
 ts/util/waitBatcher.ts                | 52 +++++++++++--------
 5 files changed, 145 insertions(+), 62 deletions(-)

diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts
index 798bb80b571..6d05331ba3b 100644
--- a/ts/models/conversations.ts
+++ b/ts/models/conversations.ts
@@ -300,6 +300,8 @@ export class ConversationModel extends window.Backbone
 
   private isShuttingDown = false;
 
+  private savePromises = new Set<Promise<void>>();
+
   override defaults(): Partial<ConversationAttributesType> {
     return {
       unreadCount: 0,
@@ -466,6 +468,16 @@ export class ConversationModel extends window.Backbone
     }
   }
 
+  addSavePromise(promise: Promise<void>): void {
+    this.savePromises.add(promise);
+  }
+  removeSavePromise(promise: Promise<void>): void {
+    this.savePromises.delete(promise);
+  }
+  getSavePromises(): Array<Promise<void>> {
+    return Array.from(this.savePromises);
+  }
+
   toSenderKeyTarget(): SenderKeyTargetType {
     return {
       getGroupId: () => this.get('groupId'),
diff --git a/ts/models/messages.ts b/ts/models/messages.ts
index 22c57e34385..f006bc10ef7 100644
--- a/ts/models/messages.ts
+++ b/ts/models/messages.ts
@@ -157,6 +157,7 @@ import {
   copyQuoteContentFromOriginal,
 } from '../messages/copyQuote';
 import { getRoomIdFromCallLink } from '../util/callLinksRingrtc';
+import { explodePromise } from '../util/explodePromise';
 
 /* eslint-disable more/no-then */
 
@@ -2117,7 +2118,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
       }
 
       log.info(`${idLog}: Batching save`);
-      void this.saveAndNotify(conversation, confirm);
+      drop(this.saveAndNotify(conversation, confirm));
     } catch (error) {
       const errorForLog = Errors.toLogFormat(error);
       log.error(`${idLog}: error:`, errorForLog);
@@ -2130,40 +2131,51 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
     conversation: ConversationModel,
     confirm: () => void
   ): Promise<void> {
-    await saveNewMessageBatcher.add(this.attributes);
+    const { resolve, promise } = explodePromise<void>();
+    try {
+      conversation.addSavePromise(promise);
 
-    log.info('Message saved', this.get('sent_at'));
+      await saveNewMessageBatcher.add(this.attributes);
 
-    // Once the message is saved to DB, we queue attachment downloads
-    await this.handleAttachmentDownloadsForNewMessage(conversation);
+      log.info('Message saved', this.get('sent_at'));
 
-    // We'd like to check for deletions before scheduling downloads, but if an edit comes
-    // in, we want to have kicked off attachment downloads for the original message.
-    const isFirstRun = false;
-    const result = await this.modifyTargetMessage(conversation, isFirstRun);
-    if (result === ModifyTargetMessageResult.Deleted) {
+      // Once the message is saved to DB, we queue attachment downloads
+      await this.handleAttachmentDownloadsForNewMessage(conversation);
+
+      // We'd like to check for deletions before scheduling downloads, but if an edit
+      // comes in, we want to have kicked off attachment downloads for the original
+      // message.
+      const isFirstRun = false;
+      const result = await this.modifyTargetMessage(conversation, isFirstRun);
+      if (result === ModifyTargetMessageResult.Deleted) {
+        confirm();
+        return;
+      }
+
+      conversation.trigger('newmessage', this.attributes);
+
+      if (await shouldReplyNotifyUser(this.attributes, conversation)) {
+        await conversation.notify(this.attributes);
+      }
+
+      // Increment the sent message count if this is an outgoing message
+      if (this.get('type') === 'outgoing') {
+        conversation.incrementSentMessageCount();
+      }
+
+      window.Whisper.events.trigger('incrementProgress');
       confirm();
-      return;
-    }
 
-    conversation.trigger('newmessage', this.attributes);
-
-    if (await shouldReplyNotifyUser(this.attributes, conversation)) {
-      await conversation.notify(this.attributes);
-    }
-
-    // Increment the sent message count if this is an outgoing message
-    if (this.get('type') === 'outgoing') {
-      conversation.incrementSentMessageCount();
-    }
-
-    window.Whisper.events.trigger('incrementProgress');
-    confirm();
-
-    if (!isStory(this.attributes)) {
-      drop(
-        conversation.queueJob('updateUnread', () => conversation.updateUnread())
-      );
+      if (!isStory(this.attributes)) {
+        drop(
+          conversation.queueJob('updateUnread', () =>
+            conversation.updateUnread()
+          )
+        );
+      }
+    } finally {
+      resolve();
+      conversation.removeSavePromise(promise);
     }
   }
diff --git a/ts/test-both/util/waitBatcher_test.ts b/ts/test-both/util/waitBatcher_test.ts
index 42368527ebe..a6e9b476bc8 100644
--- a/ts/test-both/util/waitBatcher_test.ts
+++ b/ts/test-both/util/waitBatcher_test.ts
@@ -5,11 +5,23 @@ import { assert } from 'chai';
 import * as sinon from 'sinon';
 
 import { createWaitBatcher } from '../../util/waitBatcher';
+import { drop } from '../../util/drop';
+import { sleep } from '../../util/sleep';
 
 describe('waitBatcher', () => {
-  it('should schedule a full batch', async () => {
-    const processBatch = sinon.fake.resolves(undefined);
+  let processBatch: sinon.SinonSpy;
+  let processResults: Array<Array<number>>;
 
+  beforeEach(() => {
+    processResults = [];
+    processBatch = sinon.fake(async (list: Array<number>) => {
+      await sleep(1);
+      processResults.push(list);
+      return undefined;
+    });
+  });
+
+  it('should schedule a full batch', async () => {
     const batcher = createWaitBatcher<number>({
       name: 'test',
       wait: 10,
@@ -20,11 +32,10 @@ describe('waitBatcher', () => {
     await Promise.all([batcher.add(1), batcher.add(2)]);
 
     assert.ok(processBatch.calledOnceWith([1, 2]), 'Full batch on first call');
+    assert.deepEqual(processResults[0], [1, 2]);
   });
 
   it('should schedule a partial batch', async () => {
-    const processBatch = sinon.fake.resolves(undefined);
-
     const batcher = createWaitBatcher<number>({
       name: 'test',
       wait: 10,
@@ -35,11 +46,10 @@ describe('waitBatcher', () => {
     await batcher.add(1);
 
     assert.ok(processBatch.calledOnceWith([1]), 'Partial batch on timeout');
+    assert.deepEqual(processResults[0], [1]);
   });
 
   it('should flush a partial batch', async () => {
-    const processBatch = sinon.fake.resolves(undefined);
-
     const batcher = createWaitBatcher<number>({
       name: 'test',
       wait: 10000,
@@ -53,11 +63,10 @@ describe('waitBatcher', () => {
       processBatch.calledOnceWith([1]),
       'Partial batch on flushAndWait'
     );
+    assert.deepEqual(processResults[0], [1]);
   });
 
   it('should flush a partial batch with new items added', async () => {
-    const processBatch = sinon.fake.resolves(undefined);
-
     const batcher = createWaitBatcher<number>({
       name: 'test',
       wait: 10000,
@@ -74,7 +83,37 @@ describe('waitBatcher', () => {
     ]);
 
     assert(processBatch.firstCall.calledWith([1]), 'First partial batch');
+    assert.deepEqual(processResults[0], [1]);
     assert(processBatch.secondCall.calledWith([2]), 'Second partial batch');
+    assert.deepEqual(processResults[1], [2]);
     assert(!processBatch.thirdCall);
   });
+
+  it('#addNoopAndWait returns as soon as #2 is complete, but more have been added', async () => {
+    const batcher = createWaitBatcher<number>({
+      name: 'test',
+      wait: 1000,
+      maxSize: 1000,
+      processBatch,
+    });
+
+    drop(batcher.add(1));
+    drop(batcher.add(2));
+    const waitPromise = batcher.pushNoopAndWait();
+    drop(batcher.add(3));
+
+    await waitPromise;
+
+    assert(processBatch.firstCall.calledWith([1, 2]), 'First partial batch');
+    assert.deepEqual(processResults[0], [1, 2]);
+
+    // Typescript needs this; doesn't realize that secondCall could be set later
+    const { secondCall } = processBatch;
+    assert(!secondCall);
+
+    await batcher.flushAndWait();
+
+    assert(processBatch.secondCall.calledWith([3]), 'Second partial batch');
+    assert.deepEqual(processResults[1], [3]);
+  });
 });
diff --git a/ts/util/syncTasks.ts b/ts/util/syncTasks.ts
index 0c0eb0df3d4..4a5a003d3b3 100644
--- a/ts/util/syncTasks.ts
+++ b/ts/util/syncTasks.ts
@@ -122,7 +122,13 @@ export async function queueSyncTasks(
       }
       drop(
         conversation.queueJob(innerLogId, async () => {
-          log.info(`${innerLogId}: Starting...`);
+          const promises = conversation.getSavePromises();
+          log.info(
+            `${innerLogId}: Waiting for message saves (${promises.length} items)...`
+          );
+          await Promise.all(promises);
+
+          log.info(`${innerLogId}: Starting delete...`);
           const result = await deleteConversation(
             conversation,
             mostRecentMessages,
@@ -145,7 +151,13 @@ export async function queueSyncTasks(
       }
       drop(
         conversation.queueJob(innerLogId, async () => {
-          log.info(`${innerLogId}: Starting...`);
+          const promises = conversation.getSavePromises();
+          log.info(
+            `${innerLogId}: Waiting for message saves (${promises.length} items)...`
+          );
+          await Promise.all(promises);
+
+          log.info(`${innerLogId}: Starting delete...`);
           const result = await deleteLocalOnlyConversation(
             conversation,
             innerLogId
diff --git a/ts/util/waitBatcher.ts b/ts/util/waitBatcher.ts
index 1539abe0819..e0ae23a45bd 100644
--- a/ts/util/waitBatcher.ts
+++ b/ts/util/waitBatcher.ts
@@ -8,6 +8,8 @@ import * as log from '../logging/log';
 import * as Errors from '../types/errors';
 import { clearTimeoutIfNecessary } from './clearTimeoutIfNecessary';
 import { MINUTE } from './durations';
+import { drop } from './drop';
+import { explodePromise } from './explodePromise';
 
 declare global {
   // We want to extend `window`'s properties, so we need an interface.
@@ -52,12 +54,6 @@ type ItemHolderType<ItemType> = {
   item: ItemType;
 };
 
-type ExplodedPromiseType = {
-  resolve?: (value?: unknown) => void;
-  reject?: (error: Error) => void;
-  promise: Promise<unknown>;
-};
-
 type BatcherOptionsType<ItemType> = {
   name: string;
   wait: number;
@@ -70,7 +66,8 @@ type BatcherType<ItemType> = {
   anyPending: () => boolean;
   onIdle: () => Promise<void>;
   unregister: () => void;
-  flushAndWait: () => void;
+  flushAndWait: () => Promise<void>;
+  pushNoopAndWait: () => Promise<void>;
 };
 
 export function createWaitBatcher<ItemType>(
@@ -86,6 +83,10 @@ export function createWaitBatcher<ItemType>(
   });
 
   async function _kickBatchOff() {
+    if (items.length === 0) {
+      return;
+    }
+
     const itemsRef = items;
     items = [];
     await queue.add(async () => {
@@ -106,20 +107,8 @@ export function createWaitBatcher<ItemType>(
     });
   }
 
-  function _makeExplodedPromise(): ExplodedPromiseType {
-    let resolve;
-    let reject;
-
-    const promise = new Promise((resolveParam, rejectParam) => {
-      resolve = resolveParam;
-      reject = rejectParam;
-    });
-
-    return { promise, resolve, reject };
-  }
-
   async function add(item: ItemType) {
-    const { promise, resolve, reject } = _makeExplodedPromise();
+    const { promise, resolve, reject } = explodePromise<void>();
     items.push({
       resolve,
       reject,
@@ -132,14 +121,14 @@ export function createWaitBatcher<ItemType>(
       // time is bounded by `options.wait` and not extended by further pushes.
       timeout = setTimeout(() => {
         timeout = null;
-        void _kickBatchOff();
+        drop(_kickBatchOff());
       }, options.wait);
     }
 
     if (items.length >= options.maxSize) {
      clearTimeoutIfNecessary(timeout);
       timeout = null;
-      void _kickBatchOff();
+      drop(_kickBatchOff());
     }
 
     await promise;
@@ -169,6 +158,7 @@ export function createWaitBatcher<ItemType>(
     );
   }
 
+  // Meant for a full shutdown of the queue
   async function flushAndWait() {
     log.info(
       `Flushing start ${options.name} for waitBatcher ` +
@@ -190,12 +180,30 @@ export function createWaitBatcher<ItemType>(
     log.info(`Flushing complete ${options.name} for waitBatcher`);
   }
 
+  // Meant to let us know that we've processed jobs up to a point
+  async function pushNoopAndWait() {
+    log.info(
+      `Pushing no-op to ${options.name} for waitBatcher ` +
+        `items.length=${items.length}`
+    );
+
+    clearTimeoutIfNecessary(timeout);
+    timeout = null;
+
+    drop(_kickBatchOff());
+
+    return queue.add(() => {
+      /* noop */
+    });
+  }
+
   waitBatcher = {
     add,
     anyPending,
     onIdle,
     unregister,
     flushAndWait,
+    pushNoopAndWait,
   };
 
   window.waitBatchers.push(waitBatcher);
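
Note (not part of the patch): a minimal TypeScript sketch of the coordination pattern this change introduces. The names below (explodePromiseSketch, ConversationSketch, saveSideSketch, deleteSideSketch) are illustrative stand-ins, not Signal's real helpers; they only assume what the diff itself shows: a per-conversation set of save promises, a writer that registers a promise around its off-queue save work, and a reader that awaits all registered promises before destructive work (as the delete sync tasks do above).

// Sketch only: simplified stand-ins for explodePromise and the conversation model.
type Exploded = { promise: Promise<void>; resolve: () => void };

function explodePromiseSketch(): Exploded {
  let resolve!: () => void;
  const promise = new Promise<void>(r => {
    resolve = r;
  });
  return { promise, resolve };
}

class ConversationSketch {
  private savePromises = new Set<Promise<void>>();

  addSavePromise(p: Promise<void>): void {
    this.savePromises.add(p);
  }
  removeSavePromise(p: Promise<void>): void {
    this.savePromises.delete(p);
  }
  getSavePromises(): Array<Promise<void>> {
    return Array.from(this.savePromises);
  }
}

// Writer side: register a promise before the off-queue save work starts and
// settle it when that work finishes, mirroring saveAndNotify in the diff.
async function saveSideSketch(conversation: ConversationSketch): Promise<void> {
  const { promise, resolve } = explodePromiseSketch();
  conversation.addSavePromise(promise);
  try {
    // ... batched DB save, attachment downloads, notifications ...
  } finally {
    resolve();
    conversation.removeSavePromise(promise);
  }
}

// Reader side: wait for in-flight saves before a destructive job runs, so a
// late save cannot re-create rows the delete is about to remove.
async function deleteSideSketch(conversation: ConversationSketch): Promise<void> {
  await Promise.all(conversation.getSavePromises());
  // ... deleteConversation(...) or deleteLocalOnlyConversation(...) ...
}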