Do not confirm messages until we have handled them
This commit is contained in:
parent
29aa188c0f
commit
04f716986c
16 changed files with 990 additions and 960 deletions
|
@ -1,50 +1,52 @@
|
|||
// Copyright 2017 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
/* eslint-disable max-classes-per-file */
|
||||
|
||||
import { Collection, Model } from 'backbone';
|
||||
|
||||
import type { AciString } from '../types/ServiceId';
|
||||
import type { MessageModel } from '../models/messages';
|
||||
import * as Errors from '../types/errors';
|
||||
import * as log from '../logging/log';
|
||||
import { StartupQueue } from '../util/StartupQueue';
|
||||
import { getMessageIdForLogging } from '../util/idForLogging';
|
||||
import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp';
|
||||
import { isIncoming } from '../state/selectors/message';
|
||||
import { isMessageUnread } from '../util/isMessageUnread';
|
||||
import { notificationService } from '../services/notifications';
|
||||
import * as log from '../logging/log';
|
||||
import * as Errors from '../types/errors';
|
||||
import type { AciString } from '../types/ServiceId';
|
||||
import { StartupQueue } from '../util/StartupQueue';
|
||||
import { queueUpdateMessage } from '../util/messageBatcher';
|
||||
import { getMessageSentTimestamp } from '../util/getMessageSentTimestamp';
|
||||
|
||||
export type ReadSyncAttributesType = {
|
||||
senderId: string;
|
||||
envelopeId: string;
|
||||
readAt: number;
|
||||
removeFromMessageReceiverCache: () => unknown;
|
||||
sender?: string;
|
||||
senderAci: AciString;
|
||||
senderId: string;
|
||||
timestamp: number;
|
||||
readAt: number;
|
||||
};
|
||||
|
||||
class ReadSyncModel extends Model<ReadSyncAttributesType> {}
|
||||
const readSyncs = new Map<string, ReadSyncAttributesType>();
|
||||
|
||||
let singleton: ReadSyncs | undefined;
|
||||
function remove(sync: ReadSyncAttributesType): void {
|
||||
readSyncs.delete(sync.envelopeId);
|
||||
sync.removeFromMessageReceiverCache();
|
||||
}
|
||||
|
||||
async function maybeItIsAReactionReadSync(
|
||||
sync: ReadSyncAttributesType
|
||||
): Promise<void> {
|
||||
const logId = `ReadSyncs.onSync(timestamp=${sync.timestamp})`;
|
||||
|
||||
async function maybeItIsAReactionReadSync(sync: ReadSyncModel): Promise<void> {
|
||||
const readReaction = await window.Signal.Data.markReactionAsRead(
|
||||
sync.get('senderAci'),
|
||||
Number(sync.get('timestamp'))
|
||||
sync.senderAci,
|
||||
Number(sync.timestamp)
|
||||
);
|
||||
|
||||
if (!readReaction) {
|
||||
log.info(
|
||||
'Nothing found for read sync',
|
||||
sync.get('senderId'),
|
||||
sync.get('sender'),
|
||||
sync.get('senderAci'),
|
||||
sync.get('timestamp')
|
||||
);
|
||||
log.info(`${logId} not found:`, sync.senderId, sync.sender, sync.senderAci);
|
||||
return;
|
||||
}
|
||||
|
||||
remove(sync);
|
||||
|
||||
notificationService.removeBy({
|
||||
conversationId: readReaction.conversationId,
|
||||
emoji: readReaction.emoji,
|
||||
|
@ -53,109 +55,110 @@ async function maybeItIsAReactionReadSync(sync: ReadSyncModel): Promise<void> {
|
|||
});
|
||||
}
|
||||
|
||||
export class ReadSyncs extends Collection {
|
||||
static getSingleton(): ReadSyncs {
|
||||
if (!singleton) {
|
||||
singleton = new ReadSyncs();
|
||||
}
|
||||
export function forMessage(
|
||||
message: MessageModel
|
||||
): ReadSyncAttributesType | null {
|
||||
const logId = `ReadSyncs.forMessage(${getMessageIdForLogging(
|
||||
message.attributes
|
||||
)})`;
|
||||
|
||||
return singleton;
|
||||
const sender = window.ConversationController.lookupOrCreate({
|
||||
e164: message.get('source'),
|
||||
serviceId: message.get('sourceServiceId'),
|
||||
reason: logId,
|
||||
});
|
||||
const messageTimestamp = getMessageSentTimestamp(message.attributes, {
|
||||
log,
|
||||
});
|
||||
const readSyncValues = Array.from(readSyncs.values());
|
||||
const foundSync = readSyncValues.find(item => {
|
||||
return item.senderId === sender?.id && item.timestamp === messageTimestamp;
|
||||
});
|
||||
if (foundSync) {
|
||||
log.info(
|
||||
`${logId}: Found early read sync for message ${foundSync.timestamp}`
|
||||
);
|
||||
remove(foundSync);
|
||||
return foundSync;
|
||||
}
|
||||
|
||||
forMessage(message: MessageModel): ReadSyncModel | null {
|
||||
const sender = window.ConversationController.lookupOrCreate({
|
||||
e164: message.get('source'),
|
||||
serviceId: message.get('sourceServiceId'),
|
||||
reason: 'ReadSyncs.forMessage',
|
||||
});
|
||||
const messageTimestamp = getMessageSentTimestamp(message.attributes, {
|
||||
log,
|
||||
});
|
||||
const sync = this.find(item => {
|
||||
return (
|
||||
item.get('senderId') === sender?.id &&
|
||||
item.get('timestamp') === messageTimestamp
|
||||
);
|
||||
});
|
||||
if (sync) {
|
||||
log.info(`Found early read sync for message ${sync.get('timestamp')}`);
|
||||
this.remove(sync);
|
||||
return sync;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
|
||||
readSyncs.set(sync.envelopeId, sync);
|
||||
|
||||
async onSync(sync: ReadSyncModel): Promise<void> {
|
||||
try {
|
||||
const messages = await window.Signal.Data.getMessagesBySentAt(
|
||||
sync.get('timestamp')
|
||||
);
|
||||
const logId = `ReadSyncs.onSync(timestamp=${sync.timestamp})`;
|
||||
|
||||
const found = messages.find(item => {
|
||||
const sender = window.ConversationController.lookupOrCreate({
|
||||
e164: item.source,
|
||||
serviceId: item.sourceServiceId,
|
||||
reason: 'ReadSyncs.onSync',
|
||||
});
|
||||
try {
|
||||
const messages = await window.Signal.Data.getMessagesBySentAt(
|
||||
sync.timestamp
|
||||
);
|
||||
|
||||
return isIncoming(item) && sender?.id === sync.get('senderId');
|
||||
const found = messages.find(item => {
|
||||
const sender = window.ConversationController.lookupOrCreate({
|
||||
e164: item.source,
|
||||
serviceId: item.sourceServiceId,
|
||||
reason: logId,
|
||||
});
|
||||
|
||||
if (!found) {
|
||||
await maybeItIsAReactionReadSync(sync);
|
||||
return;
|
||||
}
|
||||
return isIncoming(item) && sender?.id === sync.senderId;
|
||||
});
|
||||
|
||||
notificationService.removeBy({ messageId: found.id });
|
||||
if (!found) {
|
||||
await maybeItIsAReactionReadSync(sync);
|
||||
return;
|
||||
}
|
||||
|
||||
const message = window.MessageController.register(found.id, found);
|
||||
const readAt = Math.min(sync.get('readAt'), Date.now());
|
||||
notificationService.removeBy({ messageId: found.id });
|
||||
|
||||
// If message is unread, we mark it read. Otherwise, we update the expiration
|
||||
// timer to the time specified by the read sync if it's earlier than
|
||||
// the previous read time.
|
||||
if (isMessageUnread(message.attributes)) {
|
||||
// TODO DESKTOP-1509: use MessageUpdater.markRead once this is TS
|
||||
message.markRead(readAt, { skipSave: true });
|
||||
const message = window.MessageController.register(found.id, found);
|
||||
const readAt = Math.min(sync.readAt, Date.now());
|
||||
|
||||
const updateConversation = async () => {
|
||||
// onReadMessage may result in messages older than this one being
|
||||
// marked read. We want those messages to have the same expire timer
|
||||
// start time as this one, so we pass the readAt value through.
|
||||
void message.getConversation()?.onReadMessage(message, readAt);
|
||||
};
|
||||
// If message is unread, we mark it read. Otherwise, we update the expiration
|
||||
// timer to the time specified by the read sync if it's earlier than
|
||||
// the previous read time.
|
||||
if (isMessageUnread(message.attributes)) {
|
||||
// TODO DESKTOP-1509: use MessageUpdater.markRead once this is TS
|
||||
message.markRead(readAt, { skipSave: true });
|
||||
|
||||
// only available during initialization
|
||||
if (StartupQueue.isAvailable()) {
|
||||
const conversation = message.getConversation();
|
||||
if (conversation) {
|
||||
StartupQueue.add(
|
||||
conversation.get('id'),
|
||||
message.get('sent_at'),
|
||||
updateConversation
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// not awaiting since we don't want to block work happening in the
|
||||
// eventHandlerQueue
|
||||
void updateConversation();
|
||||
const updateConversation = async () => {
|
||||
// onReadMessage may result in messages older than this one being
|
||||
// marked read. We want those messages to have the same expire timer
|
||||
// start time as this one, so we pass the readAt value through.
|
||||
void message.getConversation()?.onReadMessage(message, readAt);
|
||||
};
|
||||
|
||||
// only available during initialization
|
||||
if (StartupQueue.isAvailable()) {
|
||||
const conversation = message.getConversation();
|
||||
if (conversation) {
|
||||
StartupQueue.add(
|
||||
conversation.get('id'),
|
||||
message.get('sent_at'),
|
||||
updateConversation
|
||||
);
|
||||
}
|
||||
} else {
|
||||
const now = Date.now();
|
||||
const existingTimestamp = message.get('expirationStartTimestamp');
|
||||
const expirationStartTimestamp = Math.min(
|
||||
now,
|
||||
Math.min(existingTimestamp || now, readAt || now)
|
||||
);
|
||||
message.set({ expirationStartTimestamp });
|
||||
// not awaiting since we don't want to block work happening in the
|
||||
// eventHandlerQueue
|
||||
void updateConversation();
|
||||
}
|
||||
|
||||
queueUpdateMessage(message.attributes);
|
||||
|
||||
this.remove(sync);
|
||||
} catch (error) {
|
||||
log.error('ReadSyncs.onSync error:', Errors.toLogFormat(error));
|
||||
} else {
|
||||
const now = Date.now();
|
||||
const existingTimestamp = message.get('expirationStartTimestamp');
|
||||
const expirationStartTimestamp = Math.min(
|
||||
now,
|
||||
Math.min(existingTimestamp || now, readAt || now)
|
||||
);
|
||||
message.set({ expirationStartTimestamp });
|
||||
}
|
||||
|
||||
queueUpdateMessage(message.attributes);
|
||||
|
||||
remove(sync);
|
||||
} catch (error) {
|
||||
remove(sync);
|
||||
log.error(`${logId} error:`, Errors.toLogFormat(error));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue