Fix for unread syncs and ooo reactions
This commit is contained in:
parent
55f0beaa6d
commit
62e04a1bbd
7 changed files with 178 additions and 102 deletions
|
@ -69,12 +69,26 @@
|
|||
if (message.isUnread()) {
|
||||
await message.markRead(readAt, { skipSave: true });
|
||||
|
||||
// onReadMessage may result in messages older than this one being
|
||||
// marked read. We want those messages to have the same expire timer
|
||||
// start time as this one, so we pass the readAt value through.
|
||||
const conversation = message.getConversation();
|
||||
if (conversation) {
|
||||
conversation.onReadMessage(message, readAt);
|
||||
const updateConversation = () => {
|
||||
// onReadMessage may result in messages older than this one being
|
||||
// marked read. We want those messages to have the same expire timer
|
||||
// start time as this one, so we pass the readAt value through.
|
||||
const conversation = message.getConversation();
|
||||
if (conversation) {
|
||||
conversation.onReadMessage(message, readAt);
|
||||
}
|
||||
};
|
||||
|
||||
if (window.startupProcessingQueue) {
|
||||
const conversation = message.getConversation();
|
||||
if (conversation) {
|
||||
window.startupProcessingQueue.add(
|
||||
conversation.get('id'),
|
||||
updateConversation
|
||||
);
|
||||
}
|
||||
} else {
|
||||
updateConversation();
|
||||
}
|
||||
} else {
|
||||
const now = Date.now();
|
||||
|
|
|
@ -9,6 +9,7 @@ import { isWindowDragElement } from './util/isWindowDragElement';
|
|||
import { assert } from './util/assert';
|
||||
|
||||
export async function startApp(): Promise<void> {
|
||||
window.startupProcessingQueue = new window.Signal.Util.StartupQueue();
|
||||
window.attachmentDownloadQueue = [];
|
||||
try {
|
||||
window.log.info('Initializing SQL in renderer');
|
||||
|
@ -2061,13 +2062,18 @@ export async function startApp(): Promise<void> {
|
|||
clearInterval(interval!);
|
||||
interval = null;
|
||||
view.onEmpty();
|
||||
|
||||
window.logAppLoadedEvent();
|
||||
window.log.info(
|
||||
'App loaded - messages:',
|
||||
messageReceiver.getProcessedCount()
|
||||
);
|
||||
if (messageReceiver) {
|
||||
window.log.info(
|
||||
'App loaded - messages:',
|
||||
messageReceiver.getProcessedCount()
|
||||
);
|
||||
}
|
||||
|
||||
window.sqlInitializer.goBackToMainProcess();
|
||||
window.Signal.Util.setBatchingStrategy(false);
|
||||
|
||||
const attachmentDownloadQueue = window.attachmentDownloadQueue || [];
|
||||
const THREE_DAYS_AGO = Date.now() - 3600 * 72 * 1000;
|
||||
const MAX_ATTACHMENT_MSGS_TO_DOWNLOAD = 250;
|
||||
|
@ -2081,7 +2087,12 @@ export async function startApp(): Promise<void> {
|
|||
attachmentsToDownload.length,
|
||||
attachmentDownloadQueue.length
|
||||
);
|
||||
window.attachmentDownloadQueue = undefined;
|
||||
|
||||
if (window.startupProcessingQueue) {
|
||||
window.startupProcessingQueue.flush();
|
||||
window.startupProcessingQueue = undefined;
|
||||
}
|
||||
|
||||
const messagesWithDownloads = await Promise.all(
|
||||
attachmentsToDownload.map(message =>
|
||||
message.queueAttachmentDownloads()
|
||||
|
|
|
@ -3532,21 +3532,6 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
return;
|
||||
}
|
||||
|
||||
if (type === 'outgoing') {
|
||||
const receipts = window.Whisper.DeliveryReceipts.forMessage(
|
||||
conversation,
|
||||
message
|
||||
);
|
||||
receipts.forEach(receipt =>
|
||||
message.set({
|
||||
delivered: (message.get('delivered') || 0) + 1,
|
||||
delivered_to: _.union(message.get('delivered_to') || [], [
|
||||
receipt.get('deliveredTo'),
|
||||
]),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
attributes.active_at = now;
|
||||
conversation.set(attributes);
|
||||
|
||||
|
@ -3608,59 +3593,6 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
}
|
||||
}
|
||||
|
||||
if (type === 'incoming') {
|
||||
const readSync = window.Whisper.ReadSyncs.forMessage(message);
|
||||
if (readSync) {
|
||||
if (
|
||||
message.get('expireTimer') &&
|
||||
!message.get('expirationStartTimestamp')
|
||||
) {
|
||||
message.set(
|
||||
'expirationStartTimestamp',
|
||||
Math.min(readSync.get('read_at'), Date.now())
|
||||
);
|
||||
}
|
||||
}
|
||||
if (readSync || message.isExpirationTimerUpdate()) {
|
||||
message.unset('unread');
|
||||
// This is primarily to allow the conversation to mark all older
|
||||
// messages as read, as is done when we receive a read sync for
|
||||
// a message we already know about.
|
||||
const c = message.getConversation();
|
||||
if (c) {
|
||||
c.onReadMessage(message);
|
||||
}
|
||||
} else {
|
||||
conversation.set({
|
||||
unreadCount: (conversation.get('unreadCount') || 0) + 1,
|
||||
isArchived: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (type === 'outgoing') {
|
||||
const reads = window.Whisper.ReadReceipts.forMessage(
|
||||
conversation,
|
||||
message
|
||||
);
|
||||
if (reads.length) {
|
||||
const readBy = reads.map(receipt => receipt.get('reader'));
|
||||
message.set({
|
||||
read_by: _.union(message.get('read_by'), readBy),
|
||||
});
|
||||
}
|
||||
|
||||
// A sync'd message to ourself is automatically considered read/delivered
|
||||
if (conversation.isMe()) {
|
||||
message.set({
|
||||
read_by: conversation.getRecipients(),
|
||||
delivered_to: conversation.getRecipients(),
|
||||
});
|
||||
}
|
||||
|
||||
message.set({ recipients: conversation.getRecipients() });
|
||||
}
|
||||
|
||||
if (dataMessage.profileKey) {
|
||||
const profileKey = dataMessage.profileKey.toString('base64');
|
||||
if (
|
||||
|
@ -3699,13 +3631,6 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
});
|
||||
await message.eraseContents();
|
||||
}
|
||||
// Check for out-of-order view syncs
|
||||
if (type === 'incoming' && message.isTapToView()) {
|
||||
const viewSync = window.Whisper.ViewSyncs.forMessage(message);
|
||||
if (viewSync) {
|
||||
await message.markViewed({ fromSync: true });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const conversationTimestamp = conversation.get('timestamp');
|
||||
|
@ -3750,26 +3675,13 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
}
|
||||
}
|
||||
|
||||
// Does this message have any pending, previously-received associated reactions?
|
||||
const reactions = window.Whisper.Reactions.forMessage(message);
|
||||
await Promise.all(
|
||||
reactions.map(reaction => message.handleReaction(reaction, false))
|
||||
);
|
||||
|
||||
// Does this message have any pending, previously-received associated
|
||||
// delete for everyone messages?
|
||||
const deletes = window.Whisper.Deletes.forMessage(message);
|
||||
await Promise.all(
|
||||
deletes.map(del =>
|
||||
window.Signal.Util.deleteForEveryone(message, del, false)
|
||||
)
|
||||
);
|
||||
await this.modifyTargetMessage(conversation, isGroupV2);
|
||||
|
||||
window.log.info(
|
||||
'handleDataMessage: Batching save for',
|
||||
message.get('sent_at')
|
||||
);
|
||||
this.saveAndNotify(conversation, confirm);
|
||||
this.saveAndNotify(conversation, isGroupV2, confirm);
|
||||
} catch (error) {
|
||||
const errorForLog = error && error.stack ? error.stack : error;
|
||||
window.log.error(
|
||||
|
@ -3785,6 +3697,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
|
||||
async saveAndNotify(
|
||||
conversation: ConversationModel,
|
||||
isGroupV2: boolean,
|
||||
confirm: () => void
|
||||
): Promise<void> {
|
||||
await window.Signal.Util.saveNewMessageBatcher.add(this.attributes);
|
||||
|
@ -3793,6 +3706,8 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
|
||||
conversation.trigger('newmessage', this);
|
||||
|
||||
await this.modifyTargetMessage(conversation, isGroupV2);
|
||||
|
||||
if (this.get('unread')) {
|
||||
await conversation.notify(this);
|
||||
}
|
||||
|
@ -3806,6 +3721,108 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
confirm();
|
||||
}
|
||||
|
||||
async modifyTargetMessage(
|
||||
conversation: ConversationModel,
|
||||
isGroupV2: boolean
|
||||
): Promise<void> {
|
||||
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
||||
const message = this;
|
||||
const type = message.get('type');
|
||||
|
||||
if (type === 'outgoing') {
|
||||
const receipts = window.Whisper.DeliveryReceipts.forMessage(
|
||||
conversation,
|
||||
message
|
||||
);
|
||||
receipts.forEach(receipt =>
|
||||
message.set({
|
||||
delivered: (message.get('delivered') || 0) + 1,
|
||||
delivered_to: _.union(message.get('delivered_to') || [], [
|
||||
receipt.get('deliveredTo'),
|
||||
]),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
if (!isGroupV2) {
|
||||
if (type === 'incoming') {
|
||||
const readSync = window.Whisper.ReadSyncs.forMessage(message);
|
||||
if (readSync) {
|
||||
if (
|
||||
message.get('expireTimer') &&
|
||||
!message.get('expirationStartTimestamp')
|
||||
) {
|
||||
message.set(
|
||||
'expirationStartTimestamp',
|
||||
Math.min(readSync.get('read_at'), Date.now())
|
||||
);
|
||||
}
|
||||
}
|
||||
if (readSync || message.isExpirationTimerUpdate()) {
|
||||
message.unset('unread');
|
||||
// This is primarily to allow the conversation to mark all older
|
||||
// messages as read, as is done when we receive a read sync for
|
||||
// a message we already know about.
|
||||
const c = message.getConversation();
|
||||
if (c) {
|
||||
c.onReadMessage(message);
|
||||
}
|
||||
} else {
|
||||
conversation.set({
|
||||
unreadCount: (conversation.get('unreadCount') || 0) + 1,
|
||||
isArchived: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (type === 'outgoing') {
|
||||
const reads = window.Whisper.ReadReceipts.forMessage(
|
||||
conversation,
|
||||
message
|
||||
);
|
||||
if (reads.length) {
|
||||
const readBy = reads.map(receipt => receipt.get('reader'));
|
||||
message.set({
|
||||
read_by: _.union(message.get('read_by'), readBy),
|
||||
});
|
||||
}
|
||||
|
||||
// A sync'd message to ourself is automatically considered read/delivered
|
||||
if (conversation.isMe()) {
|
||||
message.set({
|
||||
read_by: conversation.getRecipients(),
|
||||
delivered_to: conversation.getRecipients(),
|
||||
});
|
||||
}
|
||||
|
||||
message.set({ recipients: conversation.getRecipients() });
|
||||
}
|
||||
|
||||
// Check for out-of-order view syncs
|
||||
if (type === 'incoming' && message.isTapToView()) {
|
||||
const viewSync = window.Whisper.ViewSyncs.forMessage(message);
|
||||
if (viewSync) {
|
||||
await message.markViewed({ fromSync: true });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Does this message have any pending, previously-received associated reactions?
|
||||
const reactions = window.Whisper.Reactions.forMessage(message);
|
||||
await Promise.all(
|
||||
reactions.map(reaction => message.handleReaction(reaction, false))
|
||||
);
|
||||
|
||||
// Does this message have any pending, previously-received associated
|
||||
// delete for everyone messages?
|
||||
const deletes = window.Whisper.Deletes.forMessage(message);
|
||||
await Promise.all(
|
||||
deletes.map(del =>
|
||||
window.Signal.Util.deleteForEveryone(message, del, false)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
async handleReaction(
|
||||
reaction: typeof window.WhatIsThis,
|
||||
shouldPersist = true
|
||||
|
|
|
@ -441,6 +441,7 @@ class MessageReceiverInner extends EventTarget {
|
|||
);
|
||||
|
||||
this.cacheAndHandle(envelope, plaintext, request);
|
||||
this.processedCount += 1;
|
||||
} catch (e) {
|
||||
request.respond(500, 'Bad encrypted websocket message');
|
||||
window.log.error(
|
||||
|
@ -787,7 +788,6 @@ class MessageReceiverInner extends EventTarget {
|
|||
removeFromCache(envelope: EnvelopeClass) {
|
||||
const { id } = envelope;
|
||||
this.cacheRemoveBatcher.add(id);
|
||||
this.processedCount += 1;
|
||||
}
|
||||
|
||||
// Same as handleEnvelope, just without the decryption step. Necessary for handling
|
||||
|
|
30
ts/util/StartupQueue.ts
Normal file
30
ts/util/StartupQueue.ts
Normal file
|
@ -0,0 +1,30 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
export class StartupQueue {
|
||||
set: Set<string>;
|
||||
|
||||
items: Array<() => void>;
|
||||
|
||||
constructor() {
|
||||
this.set = new Set();
|
||||
this.items = [];
|
||||
}
|
||||
|
||||
add(id: string, f: () => void): void {
|
||||
if (this.set.has(id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.items.push(f);
|
||||
this.set.add(id);
|
||||
}
|
||||
|
||||
flush(): void {
|
||||
const { items } = this;
|
||||
window.log.info('StartupQueue: Processing', items.length, 'actions');
|
||||
items.forEach(f => f());
|
||||
this.items = [];
|
||||
this.set.clear();
|
||||
}
|
||||
}
|
|
@ -33,10 +33,12 @@ import {
|
|||
sessionStructureToArrayBuffer,
|
||||
} from './sessionTranslation';
|
||||
import * as zkgroup from './zkgroup';
|
||||
import { StartupQueue } from './StartupQueue';
|
||||
|
||||
export {
|
||||
GoogleChrome,
|
||||
Registration,
|
||||
StartupQueue,
|
||||
arrayBufferToObjectURL,
|
||||
combineNames,
|
||||
createBatcher,
|
||||
|
|
2
ts/window.d.ts
vendored
2
ts/window.d.ts
vendored
|
@ -94,6 +94,7 @@ import { StagedLinkPreview } from './components/conversation/StagedLinkPreview';
|
|||
import { MIMEType } from './types/MIME';
|
||||
import { ElectronLocaleType } from './util/mapToSupportLocale';
|
||||
import { SignalProtocolStore } from './LibSignalStore';
|
||||
import { StartupQueue } from './util/StartupQueue';
|
||||
|
||||
export { Long } from 'long';
|
||||
|
||||
|
@ -138,6 +139,7 @@ declare global {
|
|||
WhatIsThis: WhatIsThis;
|
||||
|
||||
attachmentDownloadQueue: Array<MessageModel> | undefined;
|
||||
startupProcessingQueue: StartupQueue | undefined;
|
||||
baseAttachmentsPath: string;
|
||||
baseStickersPath: string;
|
||||
baseTempPath: string;
|
||||
|
|
Loading…
Reference in a new issue