Check for duplicate and send delivery receipts in per-convo queue
This commit is contained in:
parent
5f58be1a29
commit
3aba4d0d06
2 changed files with 47 additions and 58 deletions
|
@ -17,11 +17,11 @@
|
||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
const eventHandlerQueue = new window.PQueue({ concurrency: 1 });
|
const eventHandlerQueue = new window.PQueue({ concurrency: 1 });
|
||||||
const deliveryReceiptQueue = new window.PQueue({
|
Whisper.deliveryReceiptQueue = new window.PQueue({
|
||||||
concurrency: 1,
|
concurrency: 1,
|
||||||
});
|
});
|
||||||
deliveryReceiptQueue.pause();
|
Whisper.deliveryReceiptQueue.pause();
|
||||||
const deliveryReceiptBatcher = window.Signal.Util.createBatcher({
|
Whisper.deliveryReceiptBatcher = window.Signal.Util.createBatcher({
|
||||||
wait: 500,
|
wait: 500,
|
||||||
maxSize: 500,
|
maxSize: 500,
|
||||||
processBatch: async items => {
|
processBatch: async items => {
|
||||||
|
@ -1481,7 +1481,7 @@
|
||||||
serverTrustRoot: window.getServerTrustRoot(),
|
serverTrustRoot: window.getServerTrustRoot(),
|
||||||
};
|
};
|
||||||
|
|
||||||
deliveryReceiptQueue.pause(); // avoid flood of delivery receipts until we catch up
|
Whisper.deliveryReceiptQueue.pause(); // avoid flood of delivery receipts until we catch up
|
||||||
Whisper.Notifications.disable(); // avoid notification flood until empty
|
Whisper.Notifications.disable(); // avoid notification flood until empty
|
||||||
|
|
||||||
// initialize the socket and start listening for messages
|
// initialize the socket and start listening for messages
|
||||||
|
@ -1669,7 +1669,7 @@
|
||||||
}
|
}
|
||||||
}, 500);
|
}, 500);
|
||||||
|
|
||||||
deliveryReceiptQueue.start();
|
Whisper.deliveryReceiptQueue.start();
|
||||||
Whisper.Notifications.enable();
|
Whisper.Notifications.enable();
|
||||||
}
|
}
|
||||||
function onReconnect() {
|
function onReconnect() {
|
||||||
|
@ -1677,7 +1677,7 @@
|
||||||
// scenarios where we're coming back from sleep, we can get offline/online events
|
// scenarios where we're coming back from sleep, we can get offline/online events
|
||||||
// very fast, and it looks like a network blip. But we need to suppress
|
// very fast, and it looks like a network blip. But we need to suppress
|
||||||
// notifications in these scenarios too. So we listen for 'reconnect' events.
|
// notifications in these scenarios too. So we listen for 'reconnect' events.
|
||||||
deliveryReceiptQueue.pause();
|
Whisper.deliveryReceiptQueue.pause();
|
||||||
Whisper.Notifications.disable();
|
Whisper.Notifications.disable();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2028,21 +2028,6 @@
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const isDuplicate = await isMessageDuplicate({
|
|
||||||
source: data.source,
|
|
||||||
sourceDevice: data.sourceDevice,
|
|
||||||
sent_at: data.timestamp,
|
|
||||||
});
|
|
||||||
if (isDuplicate) {
|
|
||||||
window.log.warn(
|
|
||||||
'Received duplicate message',
|
|
||||||
`${data.source}.${data.sourceDevice} ${data.timestamp}`
|
|
||||||
);
|
|
||||||
confirm();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// We do this after the duplicate check because it might send a delivery receipt
|
|
||||||
const message = await initIncomingMessage(data);
|
const message = await initIncomingMessage(data);
|
||||||
|
|
||||||
const ourNumber = textsecure.storage.user.getNumber();
|
const ourNumber = textsecure.storage.user.getNumber();
|
||||||
|
@ -2219,15 +2204,8 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function isMessageDuplicate(data) {
|
async function initIncomingMessage(data) {
|
||||||
const result = await getExistingMessage(data);
|
return new Whisper.Message({
|
||||||
return Boolean(result);
|
|
||||||
}
|
|
||||||
|
|
||||||
async function initIncomingMessage(data, options = {}) {
|
|
||||||
const { isError } = options;
|
|
||||||
|
|
||||||
const message = new Whisper.Message({
|
|
||||||
source: data.source,
|
source: data.source,
|
||||||
sourceDevice: data.sourceDevice,
|
sourceDevice: data.sourceDevice,
|
||||||
sent_at: data.timestamp,
|
sent_at: data.timestamp,
|
||||||
|
@ -2237,24 +2215,6 @@
|
||||||
type: 'incoming',
|
type: 'incoming',
|
||||||
unread: 1,
|
unread: 1,
|
||||||
});
|
});
|
||||||
|
|
||||||
// If we don't return early here, we can get into infinite error loops. So, no
|
|
||||||
// delivery receipts for sealed sender errors.
|
|
||||||
if (isError || !data.unidentifiedDeliveryReceived) {
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note: We both queue and batch because we want to wait until we are done processing
|
|
||||||
// incoming messages to start sending outgoing delivery receipts. The queue can be
|
|
||||||
// paused easily.
|
|
||||||
deliveryReceiptQueue.add(() => {
|
|
||||||
deliveryReceiptBatcher.add({
|
|
||||||
source: data.source,
|
|
||||||
timestamp: data.timestamp,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
return message;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function unlinkAndDisconnect() {
|
async function unlinkAndDisconnect() {
|
||||||
|
@ -2350,15 +2310,7 @@
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const envelope = ev.proto;
|
const envelope = ev.proto;
|
||||||
const message = await initIncomingMessage(envelope, { isError: true });
|
const message = await initIncomingMessage(envelope);
|
||||||
const isDuplicate = await isMessageDuplicate(message.attributes);
|
|
||||||
if (isDuplicate) {
|
|
||||||
ev.confirm();
|
|
||||||
window.log.warn(
|
|
||||||
`Got duplicate error for message ${message.idForLogging()}`
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const conversationId = message.get('conversationId');
|
const conversationId = message.get('conversationId');
|
||||||
const conversation = await ConversationController.getOrCreateAndWait(
|
const conversation = await ConversationController.getOrCreateAndWait(
|
||||||
|
@ -2368,6 +2320,15 @@
|
||||||
|
|
||||||
// This matches the queueing behavior used in Message.handleDataMessage
|
// This matches the queueing behavior used in Message.handleDataMessage
|
||||||
conversation.queueJob(async () => {
|
conversation.queueJob(async () => {
|
||||||
|
const existingMessage = await getExistingMessage(message.attributes);
|
||||||
|
if (existingMessage) {
|
||||||
|
ev.confirm();
|
||||||
|
window.log.warn(
|
||||||
|
`Got duplicate error for message ${message.idForLogging()}`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const model = new Whisper.Message({
|
const model = new Whisper.Message({
|
||||||
...message.attributes,
|
...message.attributes,
|
||||||
id: window.getGuid(),
|
id: window.getGuid(),
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
} = window.Signal.Stickers;
|
} = window.Signal.Stickers;
|
||||||
const { GoogleChrome } = window.Signal.Util;
|
const { GoogleChrome } = window.Signal.Util;
|
||||||
|
|
||||||
const { addStickerPackReference } = window.Signal.Data;
|
const { addStickerPackReference, getMessageBySender } = window.Signal.Data;
|
||||||
const { bytesFromString } = window.Signal.Crypto;
|
const { bytesFromString } = window.Signal.Crypto;
|
||||||
|
|
||||||
window.AccountCache = Object.create(null);
|
window.AccountCache = Object.create(null);
|
||||||
|
@ -1836,6 +1836,34 @@
|
||||||
window.log.info(
|
window.log.info(
|
||||||
`Starting handleDataMessage for message ${message.idForLogging()} in conversation ${conversation.idForLogging()}`
|
`Starting handleDataMessage for message ${message.idForLogging()} in conversation ${conversation.idForLogging()}`
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// First, check for duplicates. If we find one, stop processing here.
|
||||||
|
const existingMessage = await getMessageBySender(this.attributes, {
|
||||||
|
Message: Whisper.Message,
|
||||||
|
});
|
||||||
|
if (existingMessage) {
|
||||||
|
window.log.warn('Received duplicate message', this.idForLogging());
|
||||||
|
confirm();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send delivery receipts, but only for incoming sealed sender messages
|
||||||
|
if (
|
||||||
|
type === 'incoming' &&
|
||||||
|
this.get('unidentifiedDeliveryReceived') &&
|
||||||
|
!this.hasErrors()
|
||||||
|
) {
|
||||||
|
// Note: We both queue and batch because we want to wait until we are done
|
||||||
|
// processing incoming messages to start sending outgoing delivery receipts.
|
||||||
|
// The queue can be paused easily.
|
||||||
|
Whisper.deliveryReceiptQueue.add(() => {
|
||||||
|
Whisper.deliveryReceiptBatcher.add({
|
||||||
|
source,
|
||||||
|
timestamp: this.get('sent_at'),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
const withQuoteReference = await this.copyFromQuotedMessage(
|
const withQuoteReference = await this.copyFromQuotedMessage(
|
||||||
initialMessage
|
initialMessage
|
||||||
);
|
);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue