Conversation: savePromises for off-queue post-handleDataMessage work

This commit is contained in:
Scott Nonnenberg 2024-08-28 16:05:08 +10:00 committed by GitHub
parent 059c971ee2
commit 768ab26a20
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 145 additions and 62 deletions

View file

@ -300,6 +300,8 @@ export class ConversationModel extends window.Backbone
private isShuttingDown = false; private isShuttingDown = false;
private savePromises = new Set<Promise<void>>();
override defaults(): Partial<ConversationAttributesType> { override defaults(): Partial<ConversationAttributesType> {
return { return {
unreadCount: 0, unreadCount: 0,
@ -466,6 +468,16 @@ export class ConversationModel extends window.Backbone
} }
} }
addSavePromise(promise: Promise<void>): void {
this.savePromises.add(promise);
}
removeSavePromise(promise: Promise<void>): void {
this.savePromises.delete(promise);
}
getSavePromises(): Array<Promise<void>> {
return Array.from(this.savePromises);
}
toSenderKeyTarget(): SenderKeyTargetType { toSenderKeyTarget(): SenderKeyTargetType {
return { return {
getGroupId: () => this.get('groupId'), getGroupId: () => this.get('groupId'),

View file

@ -157,6 +157,7 @@ import {
copyQuoteContentFromOriginal, copyQuoteContentFromOriginal,
} from '../messages/copyQuote'; } from '../messages/copyQuote';
import { getRoomIdFromCallLink } from '../util/callLinksRingrtc'; import { getRoomIdFromCallLink } from '../util/callLinksRingrtc';
import { explodePromise } from '../util/explodePromise';
/* eslint-disable more/no-then */ /* eslint-disable more/no-then */
@ -2117,7 +2118,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
} }
log.info(`${idLog}: Batching save`); log.info(`${idLog}: Batching save`);
void this.saveAndNotify(conversation, confirm); drop(this.saveAndNotify(conversation, confirm));
} catch (error) { } catch (error) {
const errorForLog = Errors.toLogFormat(error); const errorForLog = Errors.toLogFormat(error);
log.error(`${idLog}: error:`, errorForLog); log.error(`${idLog}: error:`, errorForLog);
@ -2130,40 +2131,51 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
conversation: ConversationModel, conversation: ConversationModel,
confirm: () => void confirm: () => void
): Promise<void> { ): Promise<void> {
await saveNewMessageBatcher.add(this.attributes); const { resolve, promise } = explodePromise<void>();
try {
conversation.addSavePromise(promise);
log.info('Message saved', this.get('sent_at')); await saveNewMessageBatcher.add(this.attributes);
// Once the message is saved to DB, we queue attachment downloads log.info('Message saved', this.get('sent_at'));
await this.handleAttachmentDownloadsForNewMessage(conversation);
// We'd like to check for deletions before scheduling downloads, but if an edit comes // Once the message is saved to DB, we queue attachment downloads
// in, we want to have kicked off attachment downloads for the original message. await this.handleAttachmentDownloadsForNewMessage(conversation);
const isFirstRun = false;
const result = await this.modifyTargetMessage(conversation, isFirstRun); // We'd like to check for deletions before scheduling downloads, but if an edit
if (result === ModifyTargetMessageResult.Deleted) { // comes in, we want to have kicked off attachment downloads for the original
// message.
const isFirstRun = false;
const result = await this.modifyTargetMessage(conversation, isFirstRun);
if (result === ModifyTargetMessageResult.Deleted) {
confirm();
return;
}
conversation.trigger('newmessage', this.attributes);
if (await shouldReplyNotifyUser(this.attributes, conversation)) {
await conversation.notify(this.attributes);
}
// Increment the sent message count if this is an outgoing message
if (this.get('type') === 'outgoing') {
conversation.incrementSentMessageCount();
}
window.Whisper.events.trigger('incrementProgress');
confirm(); confirm();
return;
}
conversation.trigger('newmessage', this.attributes); if (!isStory(this.attributes)) {
drop(
if (await shouldReplyNotifyUser(this.attributes, conversation)) { conversation.queueJob('updateUnread', () =>
await conversation.notify(this.attributes); conversation.updateUnread()
} )
);
// Increment the sent message count if this is an outgoing message }
if (this.get('type') === 'outgoing') { } finally {
conversation.incrementSentMessageCount(); resolve();
} conversation.removeSavePromise(promise);
window.Whisper.events.trigger('incrementProgress');
confirm();
if (!isStory(this.attributes)) {
drop(
conversation.queueJob('updateUnread', () => conversation.updateUnread())
);
} }
} }

View file

@ -5,11 +5,23 @@ import { assert } from 'chai';
import * as sinon from 'sinon'; import * as sinon from 'sinon';
import { createWaitBatcher } from '../../util/waitBatcher'; import { createWaitBatcher } from '../../util/waitBatcher';
import { drop } from '../../util/drop';
import { sleep } from '../../util/sleep';
describe('waitBatcher', () => { describe('waitBatcher', () => {
it('should schedule a full batch', async () => { let processBatch: sinon.SinonSpy;
const processBatch = sinon.fake.resolves(undefined); let processResults: Array<Array<number>>;
beforeEach(() => {
processResults = [];
processBatch = sinon.fake(async (list: Array<number>) => {
await sleep(1);
processResults.push(list);
return undefined;
});
});
it('should schedule a full batch', async () => {
const batcher = createWaitBatcher<number>({ const batcher = createWaitBatcher<number>({
name: 'test', name: 'test',
wait: 10, wait: 10,
@ -20,11 +32,10 @@ describe('waitBatcher', () => {
await Promise.all([batcher.add(1), batcher.add(2)]); await Promise.all([batcher.add(1), batcher.add(2)]);
assert.ok(processBatch.calledOnceWith([1, 2]), 'Full batch on first call'); assert.ok(processBatch.calledOnceWith([1, 2]), 'Full batch on first call');
assert.deepEqual(processResults[0], [1, 2]);
}); });
it('should schedule a partial batch', async () => { it('should schedule a partial batch', async () => {
const processBatch = sinon.fake.resolves(undefined);
const batcher = createWaitBatcher<number>({ const batcher = createWaitBatcher<number>({
name: 'test', name: 'test',
wait: 10, wait: 10,
@ -35,11 +46,10 @@ describe('waitBatcher', () => {
await batcher.add(1); await batcher.add(1);
assert.ok(processBatch.calledOnceWith([1]), 'Partial batch on timeout'); assert.ok(processBatch.calledOnceWith([1]), 'Partial batch on timeout');
assert.deepEqual(processResults[0], [1]);
}); });
it('should flush a partial batch', async () => { it('should flush a partial batch', async () => {
const processBatch = sinon.fake.resolves(undefined);
const batcher = createWaitBatcher<number>({ const batcher = createWaitBatcher<number>({
name: 'test', name: 'test',
wait: 10000, wait: 10000,
@ -53,11 +63,10 @@ describe('waitBatcher', () => {
processBatch.calledOnceWith([1]), processBatch.calledOnceWith([1]),
'Partial batch on flushAndWait' 'Partial batch on flushAndWait'
); );
assert.deepEqual(processResults[0], [1]);
}); });
it('should flush a partial batch with new items added', async () => { it('should flush a partial batch with new items added', async () => {
const processBatch = sinon.fake.resolves(undefined);
const batcher = createWaitBatcher<number>({ const batcher = createWaitBatcher<number>({
name: 'test', name: 'test',
wait: 10000, wait: 10000,
@ -74,7 +83,37 @@ describe('waitBatcher', () => {
]); ]);
assert(processBatch.firstCall.calledWith([1]), 'First partial batch'); assert(processBatch.firstCall.calledWith([1]), 'First partial batch');
assert.deepEqual(processResults[0], [1]);
assert(processBatch.secondCall.calledWith([2]), 'Second partial batch'); assert(processBatch.secondCall.calledWith([2]), 'Second partial batch');
assert.deepEqual(processResults[1], [2]);
assert(!processBatch.thirdCall); assert(!processBatch.thirdCall);
}); });
it('#addNoopAndWait returns as soon as #2 is complete, but more have been added', async () => {
const batcher = createWaitBatcher<number>({
name: 'test',
wait: 1000,
maxSize: 1000,
processBatch,
});
drop(batcher.add(1));
drop(batcher.add(2));
const waitPromise = batcher.pushNoopAndWait();
drop(batcher.add(3));
await waitPromise;
assert(processBatch.firstCall.calledWith([1, 2]), 'First partial batch');
assert.deepEqual(processResults[0], [1, 2]);
// Typescript needs this; doesn't realize that secondCall could be set later
const { secondCall } = processBatch;
assert(!secondCall);
await batcher.flushAndWait();
assert(processBatch.secondCall.calledWith([3]), 'Second partial batch');
assert.deepEqual(processResults[1], [3]);
});
}); });

View file

@ -122,7 +122,13 @@ export async function queueSyncTasks(
} }
drop( drop(
conversation.queueJob(innerLogId, async () => { conversation.queueJob(innerLogId, async () => {
log.info(`${innerLogId}: Starting...`); const promises = conversation.getSavePromises();
log.info(
`${innerLogId}: Waiting for message saves (${promises.length} items)...`
);
await Promise.all(promises);
log.info(`${innerLogId}: Starting delete...`);
const result = await deleteConversation( const result = await deleteConversation(
conversation, conversation,
mostRecentMessages, mostRecentMessages,
@ -145,7 +151,13 @@ export async function queueSyncTasks(
} }
drop( drop(
conversation.queueJob(innerLogId, async () => { conversation.queueJob(innerLogId, async () => {
log.info(`${innerLogId}: Starting...`); const promises = conversation.getSavePromises();
log.info(
`${innerLogId}: Waiting for message saves (${promises.length} items)...`
);
await Promise.all(promises);
log.info(`${innerLogId}: Starting delete...`);
const result = await deleteLocalOnlyConversation( const result = await deleteLocalOnlyConversation(
conversation, conversation,
innerLogId innerLogId

View file

@ -8,6 +8,8 @@ import * as log from '../logging/log';
import * as Errors from '../types/errors'; import * as Errors from '../types/errors';
import { clearTimeoutIfNecessary } from './clearTimeoutIfNecessary'; import { clearTimeoutIfNecessary } from './clearTimeoutIfNecessary';
import { MINUTE } from './durations'; import { MINUTE } from './durations';
import { drop } from './drop';
import { explodePromise } from './explodePromise';
declare global { declare global {
// We want to extend `window`'s properties, so we need an interface. // We want to extend `window`'s properties, so we need an interface.
@ -52,12 +54,6 @@ type ItemHolderType<ItemType> = {
item: ItemType; item: ItemType;
}; };
type ExplodedPromiseType = {
resolve?: (value?: unknown) => void;
reject?: (error: Error) => void;
promise: Promise<unknown>;
};
type BatcherOptionsType<ItemType> = { type BatcherOptionsType<ItemType> = {
name: string; name: string;
wait: number; wait: number;
@ -70,7 +66,8 @@ type BatcherType<ItemType> = {
anyPending: () => boolean; anyPending: () => boolean;
onIdle: () => Promise<void>; onIdle: () => Promise<void>;
unregister: () => void; unregister: () => void;
flushAndWait: () => void; flushAndWait: () => Promise<void>;
pushNoopAndWait: () => Promise<void>;
}; };
export function createWaitBatcher<ItemType>( export function createWaitBatcher<ItemType>(
@ -86,6 +83,10 @@ export function createWaitBatcher<ItemType>(
}); });
async function _kickBatchOff() { async function _kickBatchOff() {
if (items.length === 0) {
return;
}
const itemsRef = items; const itemsRef = items;
items = []; items = [];
await queue.add(async () => { await queue.add(async () => {
@ -106,20 +107,8 @@ export function createWaitBatcher<ItemType>(
}); });
} }
function _makeExplodedPromise(): ExplodedPromiseType {
let resolve;
let reject;
const promise = new Promise((resolveParam, rejectParam) => {
resolve = resolveParam;
reject = rejectParam;
});
return { promise, resolve, reject };
}
async function add(item: ItemType) { async function add(item: ItemType) {
const { promise, resolve, reject } = _makeExplodedPromise(); const { promise, resolve, reject } = explodePromise();
items.push({ items.push({
resolve, resolve,
@ -132,14 +121,14 @@ export function createWaitBatcher<ItemType>(
// time is bounded by `options.wait` and not extended by further pushes. // time is bounded by `options.wait` and not extended by further pushes.
timeout = setTimeout(() => { timeout = setTimeout(() => {
timeout = null; timeout = null;
void _kickBatchOff(); drop(_kickBatchOff());
}, options.wait); }, options.wait);
} }
if (items.length >= options.maxSize) { if (items.length >= options.maxSize) {
clearTimeoutIfNecessary(timeout); clearTimeoutIfNecessary(timeout);
timeout = null; timeout = null;
void _kickBatchOff(); drop(_kickBatchOff());
} }
await promise; await promise;
@ -169,6 +158,7 @@ export function createWaitBatcher<ItemType>(
); );
} }
// Meant for a full shutdown of the queue
async function flushAndWait() { async function flushAndWait() {
log.info( log.info(
`Flushing start ${options.name} for waitBatcher ` + `Flushing start ${options.name} for waitBatcher ` +
@ -190,12 +180,30 @@ export function createWaitBatcher<ItemType>(
log.info(`Flushing complete ${options.name} for waitBatcher`); log.info(`Flushing complete ${options.name} for waitBatcher`);
} }
// Meant to let us know that we've processed jobs up to a point
async function pushNoopAndWait() {
log.info(
`Pushing no-op to ${options.name} for waitBatcher ` +
`items.length=${items.length}`
);
clearTimeoutIfNecessary(timeout);
timeout = null;
drop(_kickBatchOff());
return queue.add(() => {
/* noop */
});
}
waitBatcher = { waitBatcher = {
add, add,
anyPending, anyPending,
onIdle, onIdle,
unregister, unregister,
flushAndWait, flushAndWait,
pushNoopAndWait,
}; };
window.waitBatchers.push(waitBatcher); window.waitBatchers.push(waitBatcher);