Avoid race condition when marking messages read

Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2025-07-16 15:14:44 -05:00 committed by GitHub
parent e03ac4394e
commit 72514b73b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 58 additions and 50 deletions

View file

@ -152,7 +152,6 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
const message = window.MessageCache.register(new MessageModel(found));
const readAt = Math.min(readSync.readAt, Date.now());
const newestSentAt = readSync.timestamp;
// If message is unread, we mark it read. Otherwise, we update the expiration
// timer to the time specified by the read sync if it's earlier than
@ -168,9 +167,7 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
// onReadMessage may result in messages older than this one being
// marked read. We want those messages to have the same expire timer
// start time as this one, so we pass the readAt value through.
drop(
conversation.onReadMessage(message.attributes, readAt, newestSentAt)
);
drop(conversation.onReadMessage(message.attributes, readAt));
};
// only available during initialization

View file

@ -3658,8 +3658,7 @@ export class ConversationModel extends window.Backbone
async onReadMessage(
message: MessageAttributesType,
readAt?: number,
newestSentAt?: number
readAt?: number
): Promise<void> {
// We mark as read everything older than this message - to clean up old stuff
// still marked unread in the database. If the user generally doesn't read in
@ -3673,9 +3672,7 @@ export class ConversationModel extends window.Backbone
// Lastly, we don't send read syncs for any message marked read due to a read
// sync. That's a notification explosion we don't need.
return this.queueJob('onReadMessage', () =>
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.markRead(message.received_at!, {
newestSentAt: newestSentAt || message.sent_at,
this.markRead(message, {
sendReadReceipts: false,
readAt,
})
@ -4862,16 +4859,15 @@ export class ConversationModel extends window.Backbone
}
async markRead(
newestUnreadAt: number,
readMessage: { received_at: number; sent_at: number },
options: {
readAt?: number;
sendReadReceipts: boolean;
newestSentAt?: number;
} = {
sendReadReceipts: true,
}
): Promise<void> {
await markConversationRead(this.attributes, newestUnreadAt, options);
await markConversationRead(this.attributes, readMessage, options);
this.throttledUpdateUnread();
window.reduxActions.callHistory.updateCallHistoryUnreadCount();
}

View file

@ -983,18 +983,18 @@ type WritableInterface = {
getUnreadByConversationAndMarkRead: (options: {
conversationId: string;
includeStoryReplies: boolean;
newestUnreadAt: number;
readMessageReceivedAt: number;
now?: number;
readAt?: number;
storyId?: string;
}) => GetUnreadByConversationAndMarkReadResultType;
getUnreadEditedMessagesAndMarkRead: (options: {
conversationId: string;
newestUnreadAt: number;
readMessageReceivedAt: number;
}) => GetUnreadByConversationAndMarkReadResultType;
getUnreadReactionsAndMarkRead: (options: {
conversationId: string;
newestUnreadAt: number;
readMessageReceivedAt: number;
storyId?: string;
}) => Array<ReactionResultType>;
markReactionAsRead: (

View file

@ -3113,14 +3113,14 @@ function getUnreadByConversationAndMarkRead(
{
conversationId,
includeStoryReplies,
newestUnreadAt,
readMessageReceivedAt,
storyId,
readAt,
now = Date.now(),
}: {
conversationId: string;
includeStoryReplies: boolean;
newestUnreadAt: number;
readMessageReceivedAt: number;
storyId?: string;
readAt?: number;
now?: number;
@ -3147,7 +3147,7 @@ function getUnreadByConversationAndMarkRead(
expirationStartTimestamp > ${expirationStartTimestamp}
) AND
expireTimer > 0 AND
received_at <= ${newestUnreadAt};
received_at <= ${readMessageReceivedAt};
`;
db.prepare(updateExpirationQuery).run(updateExpirationParams);
@ -3161,7 +3161,7 @@ function getUnreadByConversationAndMarkRead(
seenStatus = ${SeenStatus.Unseen} AND
isStory = 0 AND
(${_storyIdPredicate(storyId, includeStoryReplies)}) AND
received_at <= ${newestUnreadAt}
received_at <= ${readMessageReceivedAt}
ORDER BY received_at DESC, sent_at DESC;
`;
@ -3185,7 +3185,7 @@ function getUnreadByConversationAndMarkRead(
seenStatus = ${SeenStatus.Unseen} AND
isStory = 0 AND
(${_storyIdPredicate(storyId, includeStoryReplies)}) AND
received_at <= ${newestUnreadAt};
received_at <= ${readMessageReceivedAt};
`;
db.prepare(updateStatusQuery).run(updateStatusParams);
@ -3211,11 +3211,11 @@ function getUnreadReactionsAndMarkRead(
db: WritableDB,
{
conversationId,
newestUnreadAt,
readMessageReceivedAt,
storyId,
}: {
conversationId: string;
newestUnreadAt: number;
readMessageReceivedAt: number;
storyId?: string;
}
): Array<ReactionResultType> {
@ -3230,14 +3230,14 @@ function getUnreadReactionsAndMarkRead(
WHERE
reactions.conversationId IS $conversationId AND
reactions.unread > 0 AND
messages.received_at <= $newestUnreadAt AND
messages.received_at <= $readMessageReceivedAt AND
messages.storyId IS $storyId
ORDER BY messageReceivedAt DESC;
`
)
.all({
conversationId,
newestUnreadAt,
readMessageReceivedAt,
storyId: storyId || null,
});
@ -8445,10 +8445,10 @@ function getUnreadEditedMessagesAndMarkRead(
db: WritableDB,
{
conversationId,
newestUnreadAt,
readMessageReceivedAt,
}: {
conversationId: string;
newestUnreadAt: number;
readMessageReceivedAt: number;
}
): GetUnreadByConversationAndMarkReadResultType {
return db.transaction(() => {
@ -8467,7 +8467,7 @@ function getUnreadEditedMessagesAndMarkRead(
WHERE
edited_messages.readStatus = ${ReadStatus.Unread} AND
edited_messages.conversationId = ${conversationId} AND
received_at <= ${newestUnreadAt}
received_at <= ${readMessageReceivedAt}
ORDER BY messages.received_at DESC, messages.sent_at DESC;
`;

View file

@ -1468,8 +1468,7 @@ function markMessageRead(
throw new Error(`markMessageRead: failed to load message ${messageId}`);
}
await conversation.markRead(message.get('received_at'), {
newestSentAt: message.get('sent_at'),
await conversation.markRead(message.attributes, {
sendReadReceipts: true,
});

View file

@ -143,7 +143,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({
conversationId,
newestUnreadAt: unreadStoryReply.received_at,
readMessageReceivedAt: unreadStoryReply.received_at,
readAt,
includeStoryReplies: false,
});
@ -172,7 +172,7 @@ describe('sql/markRead', () => {
const markedRead2 = await getUnreadByConversationAndMarkRead({
conversationId,
newestUnreadAt: newestUnread.received_at,
readMessageReceivedAt: newestUnread.received_at,
readAt,
includeStoryReplies: true,
});
@ -300,7 +300,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({
conversationId,
newestUnreadAt: message7.received_at,
readMessageReceivedAt: message7.received_at,
readAt,
storyId,
includeStoryReplies: false,
@ -410,7 +410,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({
conversationId,
newestUnreadAt: message4.received_at,
readMessageReceivedAt: message4.received_at,
readAt,
includeStoryReplies: false,
now,
@ -587,7 +587,7 @@ describe('sql/markRead', () => {
assert.lengthOf(await _getAllReactions(), 5);
const markedRead = await getUnreadReactionsAndMarkRead({
conversationId,
newestUnreadAt: reaction4.messageReceivedAt,
readMessageReceivedAt: reaction4.messageReceivedAt,
});
assert.lengthOf(markedRead, 2, 'two reactions marked read');
@ -606,7 +606,7 @@ describe('sql/markRead', () => {
const markedRead2 = await getUnreadReactionsAndMarkRead({
conversationId,
newestUnreadAt: reaction5.messageReceivedAt,
readMessageReceivedAt: reaction5.messageReceivedAt,
});
assert.lengthOf(markedRead2, 1);
@ -742,7 +742,7 @@ describe('sql/markRead', () => {
assert.lengthOf(await _getAllReactions(), 5);
const markedRead = await getUnreadReactionsAndMarkRead({
conversationId,
newestUnreadAt: reaction4.messageReceivedAt,
readMessageReceivedAt: reaction4.messageReceivedAt,
storyId,
});
@ -762,7 +762,7 @@ describe('sql/markRead', () => {
const markedRead2 = await getUnreadReactionsAndMarkRead({
conversationId,
newestUnreadAt: reaction5.messageReceivedAt,
readMessageReceivedAt: reaction5.messageReceivedAt,
storyId,
});
@ -837,7 +837,7 @@ describe('sql/markRead', () => {
const markedRead = await getUnreadByConversationAndMarkRead({
conversationId,
includeStoryReplies: false,
newestUnreadAt: message4.received_at,
readMessageReceivedAt: message4.received_at,
readAt,
});

View file

@ -25,16 +25,17 @@ import {
import { ReceiptType } from '../types/Receipt';
import type { AciString } from '../types/ServiceId';
import { isAciString } from './isAciString';
import type { MessageModel } from '../models/messages';
import { postSaveUpdates } from './cleanup';
const log = createLogger('markConversationRead');
export async function markConversationRead(
conversationAttrs: ConversationAttributesType,
newestUnreadAt: number,
readMessage: { received_at: number; sent_at: number },
options: {
readAt?: number;
sendReadReceipts: boolean;
newestSentAt?: number;
} = {
sendReadReceipts: true,
}
@ -45,26 +46,28 @@ export async function markConversationRead(
await Promise.all([
DataWriter.getUnreadByConversationAndMarkRead({
conversationId,
newestUnreadAt,
readMessageReceivedAt: readMessage.received_at,
readAt: options.readAt,
includeStoryReplies: !isGroup(conversationAttrs),
}),
DataWriter.getUnreadEditedMessagesAndMarkRead({
conversationId,
newestUnreadAt,
readMessageReceivedAt: readMessage.received_at,
}),
DataWriter.getUnreadReactionsAndMarkRead({
conversationId,
newestUnreadAt,
readMessageReceivedAt: readMessage.received_at,
}),
]);
const convoId = getConversationIdForLogging(conversationAttrs);
const logId = `markConversationRead(${convoId})`;
const logId = `(${convoId})`;
log.info(logId, {
newestSentAt: options.newestSentAt,
newestUnreadAt,
markingReadBefore: {
sentAt: readMessage.sent_at,
receivedAt: readMessage.received_at,
},
unreadMessages: unreadMessages.length,
unreadReactions: unreadReactions.length,
});
@ -103,6 +106,7 @@ export async function markConversationRead(
const allUnreadMessages = [...unreadMessages, ...unreadEditedMessages];
const updatedMessages: Array<MessageModel> = [];
const allReadMessagesSync = allUnreadMessages
.map(messageSyncData => {
const message = window.MessageCache.getById(messageSyncData.id);
@ -116,6 +120,7 @@ export async function markConversationRead(
'expirationStartTimestamp'
)
);
updatedMessages.push(message);
}
const {
@ -159,6 +164,18 @@ export async function markConversationRead(
})
.filter(isNotNil);
// We need to save any messages that are in memory, since their read status could have
// been overwritten in the DB by a message save from a stale (unread) in-memory model
if (updatedMessages.length) {
await DataWriter.saveMessages(
updatedMessages.map(msg => msg.attributes),
{
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
}
);
}
// Some messages we're marking read are local notifications with no sender or were just
// unseen and not unread.
// Also, if a message has errors, we don't want to send anything out about it:
@ -183,11 +200,11 @@ export async function markConversationRead(
}> = [...unreadMessagesSyncData, ...unreadReactionSyncData.values()];
if (readSyncs.length && options.sendReadReceipts) {
log.info(`Sending ${readSyncs.length} read syncs`);
log.info(logId, `Sending ${readSyncs.length} read syncs`);
// Because syncReadMessages sends to our other devices, and sendReadReceipts goes
// to a contact, we need accessKeys for both.
if (window.ConversationController.areWePrimaryDevice()) {
log.warn('We are primary device; not sending read syncs');
log.warn(logId, 'We are primary device; not sending read syncs');
} else {
drop(readSyncJobQueue.add({ readSyncs }));
}

View file

@ -231,7 +231,6 @@ export async function modifyTargetMessage(
const markReadAt = message.pendingMarkRead;
// eslint-disable-next-line no-param-reassign
message.pendingMarkRead = undefined;
const newestSentAt = maybeSingleReadSync?.readSync.timestamp;
// This is primarily to allow the conversation to mark all older
// messages as read, as is done when we receive a read sync for
@ -243,7 +242,7 @@ export async function modifyTargetMessage(
drop(
window.ConversationController.get(
message.get('conversationId')
)?.onReadMessage(message.attributes, markReadAt, newestSentAt)
)?.onReadMessage(message.attributes, markReadAt)
);
}