From f5a3d4bc8ac5b2343cafd07a8d7909773c5ceb0f Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Mon, 16 Aug 2021 17:16:00 -0700 Subject: [PATCH] Fix various read sync inconsistencies --- ts/messageModifiers/ReadSyncs.ts | 8 ++- ts/models/messages.ts | 83 ++++++++++++++++++-------------- ts/util/StartupQueue.ts | 47 +++++++++++------- 3 files changed, 79 insertions(+), 59 deletions(-) diff --git a/ts/messageModifiers/ReadSyncs.ts b/ts/messageModifiers/ReadSyncs.ts index f685c5b0e7..88fe9637b3 100644 --- a/ts/messageModifiers/ReadSyncs.ts +++ b/ts/messageModifiers/ReadSyncs.ts @@ -103,7 +103,7 @@ export class ReadSyncs extends Collection { window.Whisper.Notifications.removeBy({ messageId: found.id }); const message = window.MessageController.register(found.id, found); - const readAt = sync.get('readAt'); + const readAt = Math.min(sync.get('readAt'), Date.now()); // If message is unread, we mark it read. Otherwise, we update the expiration // timer to the time specified by the read sync if it's earlier than @@ -116,10 +116,7 @@ export class ReadSyncs extends Collection { // onReadMessage may result in messages older than this one being // marked read. We want those messages to have the same expire timer // start time as this one, so we pass the readAt value through. - const conversation = message.getConversation(); - if (conversation) { - conversation.onReadMessage(message, readAt); - } + message.getConversation()?.onReadMessage(message, readAt); }; if (window.startupProcessingQueue) { @@ -127,6 +124,7 @@ export class ReadSyncs extends Collection { if (conversation) { window.startupProcessingQueue.add( conversation.get('id'), + message.get('sent_at'), updateConversation ); } diff --git a/ts/models/messages.ts b/ts/models/messages.ts index 1e2912b730..3aec6d2941 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -185,6 +185,8 @@ export class MessageModel extends window.Backbone.Model { isSelected?: boolean; + private pendingMarkRead?: number; + syncPromise?: Promise; initialize(attributes: unknown): void { @@ -3284,54 +3286,63 @@ export class MessageModel extends window.Backbone.Model { const viewSyncs = ViewSyncs.getSingleton().forMessage(message); - if ( - (readSyncs.length !== 0 || viewSyncs.length !== 0) && - message.get('expireTimer') - ) { - const existingExpirationStartTimestamp = message.get( - 'expirationStartTimestamp' - ); - const candidateTimestamps: Array = [ + if (readSyncs.length !== 0 || viewSyncs.length !== 0) { + const markReadAt = Math.min( Date.now(), - ...(existingExpirationStartTimestamp - ? [existingExpirationStartTimestamp] - : []), ...readSyncs.map(sync => sync.get('readAt')), - ...viewSyncs.map(sync => sync.get('viewedAt')), - ]; - message.set( - 'expirationStartTimestamp', - Math.min(...candidateTimestamps) + ...viewSyncs.map(sync => sync.get('viewedAt')) ); - changed = true; - } - let newReadStatus: undefined | ReadStatus.Read | ReadStatus.Viewed; - if (viewSyncs.length) { - newReadStatus = ReadStatus.Viewed; - } else if (readSyncs.length) { - newReadStatus = ReadStatus.Read; - } - - if (newReadStatus !== undefined) { - message.set('readStatus', newReadStatus); - // This is primarily to allow the conversation to mark all older - // messages as read, as is done when we receive a read sync for - // a message we already know about. - const c = message.getConversation(); - if (c) { - c.onReadMessage(message); + if (message.get('expireTimer')) { + const existingExpirationStartTimestamp = message.get( + 'expirationStartTimestamp' + ); + message.set( + 'expirationStartTimestamp', + Math.min(existingExpirationStartTimestamp ?? Date.now(), markReadAt) + ); + changed = true; } - changed = true; - } - if (isFirstRun && !viewSyncs.length && !readSyncs.length) { + let newReadStatus: ReadStatus.Read | ReadStatus.Viewed; + if (viewSyncs.length) { + newReadStatus = ReadStatus.Viewed; + } else { + strictAssert( + readSyncs.length !== 0, + 'Should have either view or read syncs' + ); + newReadStatus = ReadStatus.Read; + } + + message.set('readStatus', newReadStatus); + changed = true; + + this.pendingMarkRead = Math.min( + this.pendingMarkRead ?? Date.now(), + markReadAt + ); + } else if (isFirstRun) { conversation.set({ unreadCount: (conversation.get('unreadCount') || 0) + 1, isArchived: false, }); } + if (!isFirstRun && this.pendingMarkRead) { + const markReadAt = this.pendingMarkRead; + this.pendingMarkRead = undefined; + + // This is primarily to allow the conversation to mark all older + // messages as read, as is done when we receive a read sync for + // a message we already know about. + // + // We run this when `isFirstRun` is false so that it triggers when the + // message and the other ones accompanying it in the batch are fully in + // the database. + message.getConversation()?.onReadMessage(message, markReadAt); + } + // Check for out-of-order view once open syncs if (isTapToView(message.attributes)) { const viewOnceOpenSync = ViewOnceOpenSyncs.getSingleton().forMessage( diff --git a/ts/util/StartupQueue.ts b/ts/util/StartupQueue.ts index 3c61ce3c4c..aec3a775f3 100644 --- a/ts/util/StartupQueue.ts +++ b/ts/util/StartupQueue.ts @@ -1,30 +1,41 @@ // Copyright 2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable no-restricted-syntax */ + +import * as Errors from '../types/errors'; + +type EntryType = Readonly<{ + value: number; + callback(): void; +}>; export class StartupQueue { - set: Set; + private readonly map = new Map(); - items: Array<() => void>; - - constructor() { - this.set = new Set(); - this.items = []; - } - - add(id: string, f: () => void): void { - if (this.set.has(id)) { + public add(id: string, value: number, f: () => void): void { + const existing = this.map.get(id); + if (existing && existing.value >= value) { return; } - this.items.push(f); - this.set.add(id); + this.map.set(id, { value, callback: f }); } - flush(): void { - const { items } = this; - window.log.info('StartupQueue: Processing', items.length, 'actions'); - items.forEach(f => f()); - this.items = []; - this.set.clear(); + public flush(): void { + window.log.info('StartupQueue: Processing', this.map.size, 'actions'); + + const values = Array.from(this.map.values()); + this.map.clear(); + + for (const { callback } of values) { + try { + callback(); + } catch (error) { + window.log.error( + 'StartupQueue: Failed to process item due to error', + Errors.toLogFormat(error) + ); + } + } } }