Move logic into handleDataMessage for proper queuing

This commit is contained in:
Scott Nonnenberg 2020-02-14 13:28:35 -08:00 committed by GitHub
parent afddc852cc
commit 7ca0dfdfbe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 116 additions and 128 deletions

View file

@ -2071,6 +2071,9 @@
return confirm(); return confirm();
} }
// Note: We do very little in this function, since everything in handleDataMessage is
// inside a conversation-specific queue(). Any code here might run before an earlier
// message is processed in handleDataMessage().
async function onMessageReceived(event) { async function onMessageReceived(event) {
const { data, confirm } = event; const { data, confirm } = event;
@ -2090,27 +2093,6 @@
const message = await initIncomingMessage(data); const message = await initIncomingMessage(data);
const ourNumber = textsecure.storage.user.getNumber();
const isGroupUpdate =
data.message.group &&
data.message.group.type !== textsecure.protobuf.GroupContext.Type.DELIVER;
const conversation = ConversationController.get(messageDescriptor.id);
// We drop messages for groups we already know about, which we're not a part of,
// except for group updates
if (
conversation &&
!conversation.isPrivate() &&
!conversation.hasMember(ourNumber) &&
!isGroupUpdate
) {
window.log.warn(
`Received message destined for group ${conversation.idForLogging()}, which we're not a part of. Dropping.`
);
confirm();
return;
}
await ConversationController.getOrCreateAndWait( await ConversationController.getOrCreateAndWait(
messageDescriptor.id, messageDescriptor.id,
messageDescriptor.type messageDescriptor.type
@ -2134,12 +2116,9 @@
} }
// Don't wait for handleDataMessage, as it has its own per-conversation queueing // Don't wait for handleDataMessage, as it has its own per-conversation queueing
message.handleDataMessage(data.message, event.confirm, { message.handleDataMessage(data.message, event.confirm);
initialLoadComplete,
});
} }
// Sent:
async function handleMessageSentProfileUpdate({ async function handleMessageSentProfileUpdate({
data, data,
confirm, confirm,
@ -2196,6 +2175,9 @@
}); });
} }
// Note: We do very little in this function, since everything in handleDataMessage is
// inside a conversation-specific queue(). Any code here might run before an earlier
// message is processed in handleDataMessage().
async function onSentMessage(event) { async function onSentMessage(event) {
const { data, confirm } = event; const { data, confirm } = event;
@ -2214,44 +2196,8 @@
} }
const message = await createSentMessage(data); const message = await createSentMessage(data);
const existing = await getExistingMessage(message.attributes);
const isUpdate = Boolean(data.isRecipientUpdate);
if (isUpdate && existing) { if (data.message.reaction) {
event.confirm();
let sentTo = [];
let unidentifiedDeliveries = [];
if (Array.isArray(data.unidentifiedStatus)) {
sentTo = data.unidentifiedStatus.map(item => item.destination);
const unidentified = _.filter(data.unidentifiedStatus, item =>
Boolean(item.unidentified)
);
unidentifiedDeliveries = unidentified.map(item => item.destination);
}
existing.set({
sent_to: _.union(existing.get('sent_to'), sentTo),
unidentifiedDeliveries: _.union(
existing.get('unidentifiedDeliveries'),
unidentifiedDeliveries
),
});
await window.Signal.Data.saveMessage(existing.attributes, {
Message: Whisper.Message,
});
} else if (isUpdate) {
window.log.warn(
`onSentMessage: Received update transcript, but no existing entry for message ${message.idForLogging()}. Dropping.`
);
event.confirm();
} else if (existing) {
window.log.warn(
`onSentMessage: Received duplicate transcript for message ${message.idForLogging()}, but it was not an update transcript. Dropping.`
);
event.confirm();
} else if (data.message.reaction) {
const { reaction } = data.message; const { reaction } = data.message;
const ourNumber = textsecure.storage.user.getNumber(); const ourNumber = textsecure.storage.user.getNumber();
const reactionModel = Whisper.Reactions.add({ const reactionModel = Whisper.Reactions.add({
@ -2266,8 +2212,11 @@
}); });
// Note: We do not wait for completion here // Note: We do not wait for completion here
Whisper.Reactions.onReaction(reactionModel); Whisper.Reactions.onReaction(reactionModel);
event.confirm(); event.confirm();
} else { return;
}
await ConversationController.getOrCreateAndWait( await ConversationController.getOrCreateAndWait(
messageDescriptor.id, messageDescriptor.id,
messageDescriptor.type messageDescriptor.type
@ -2275,27 +2224,9 @@
// Don't wait for handleDataMessage, as it has its own per-conversation queueing // Don't wait for handleDataMessage, as it has its own per-conversation queueing
message.handleDataMessage(data.message, event.confirm, { message.handleDataMessage(data.message, event.confirm, {
initialLoadComplete, data,
}); });
} }
}
async function getExistingMessage(data) {
try {
const result = await window.Signal.Data.getMessageBySender(data, {
Message: Whisper.Message,
});
if (result) {
return MessageController.register(result.id, result);
}
return null;
} catch (error) {
window.log.error('getExistingMessage error:', Errors.toLogFormat(error));
return false;
}
}
async function initIncomingMessage(data) { async function initIncomingMessage(data) {
return new Whisper.Message({ return new Whisper.Message({
@ -2413,7 +2344,12 @@
// This matches the queueing behavior used in Message.handleDataMessage // This matches the queueing behavior used in Message.handleDataMessage
conversation.queueJob(async () => { conversation.queueJob(async () => {
const existingMessage = await getExistingMessage(message.attributes); const existingMessage = await window.Signal.Data.getMessageBySender(
message.attributes,
{
Message: Whisper.Message,
}
);
if (existingMessage) { if (existingMessage) {
ev.confirm(); ev.confirm();
window.log.warn( window.log.warn(

View file

@ -1406,12 +1406,8 @@
// This is used by sendSyncMessage, then set to null // This is used by sendSyncMessage, then set to null
if (result.dataMessage) { if (result.dataMessage) {
// When we're not sending recipient updates, we won't save the dataMessage
// unless it's the first time we attempt to send the message.
if (!this.get('synced') || window.SEND_RECIPIENT_UPDATES) {
this.set({ dataMessage: result.dataMessage }); this.set({ dataMessage: result.dataMessage });
} }
}
const sentTo = this.get('sent_to') || []; const sentTo = this.get('sent_to') || [];
this.set({ this.set({
@ -1547,11 +1543,6 @@
} }
const isUpdate = Boolean(this.get('synced')); const isUpdate = Boolean(this.get('synced'));
// Until isRecipientUpdate sync messages are widely supported, will not send them
if (isUpdate && !window.SEND_RECIPIENT_UPDATES) {
return Promise.resolve();
}
return wrap( return wrap(
textsecure.messaging.sendSyncMessage( textsecure.messaging.sendSyncMessage(
dataMessage, dataMessage,
@ -1872,7 +1863,9 @@
return message; return message;
}, },
handleDataMessage(initialMessage, confirm) { handleDataMessage(initialMessage, confirm, options = {}) {
const { data } = options;
// This function is called from the background script in a few scenarios: // This function is called from the background script in a few scenarios:
// 1. on an incoming message // 1. on an incoming message
// 2. on a sent message sync'd from another device // 2. on a sent message sync'd from another device
@ -1897,11 +1890,85 @@
const existingMessage = await getMessageBySender(this.attributes, { const existingMessage = await getMessageBySender(this.attributes, {
Message: Whisper.Message, Message: Whisper.Message,
}); });
if (existingMessage) { const isUpdate = Boolean(data && data.isRecipientUpdate);
if (existingMessage && type === 'incoming') {
window.log.warn('Received duplicate message', this.idForLogging()); window.log.warn('Received duplicate message', this.idForLogging());
confirm(); confirm();
return; return;
} }
if (type === 'outgoing') {
if (isUpdate && existingMessage) {
window.log.info(
`handleDataMessage: Updating message ${message.idForLogging()} with received transcript`
);
let sentTo = [];
let unidentifiedDeliveries = [];
if (Array.isArray(data.unidentifiedStatus)) {
sentTo = data.unidentifiedStatus.map(item => item.destination);
const unidentified = _.filter(data.unidentifiedStatus, item =>
Boolean(item.unidentified)
);
unidentifiedDeliveries = unidentified.map(
item => item.destination
);
}
const toUpdate = MessageController.register(
existingMessage.id,
existingMessage
);
toUpdate.set({
sent_to: _.union(toUpdate.get('sent_to'), sentTo),
unidentifiedDeliveries: _.union(
toUpdate.get('unidentifiedDeliveries'),
unidentifiedDeliveries
),
});
await window.Signal.Data.saveMessage(toUpdate.attributes, {
Message: Whisper.Message,
});
confirm();
return;
} else if (isUpdate) {
window.log.warn(
`handleDataMessage: Received update transcript, but no existing entry for message ${message.idForLogging()}. Dropping.`
);
confirm();
return;
} else if (existingMessage) {
window.log.warn(
`handleDataMessage: Received duplicate transcript for message ${message.idForLogging()}, but it was not an update transcript. Dropping.`
);
confirm();
return;
}
}
// We drop incoming messages for groups we already know about, which we're not a
// part of, except for group updates.
const ourNumber = textsecure.storage.user.getNumber();
const isGroupUpdate =
initialMessage.group &&
initialMessage.group.type !==
textsecure.protobuf.GroupContext.Type.DELIVER;
if (
type === 'incoming' &&
!conversation.isPrivate() &&
!conversation.hasMember(ourNumber) &&
!isGroupUpdate
) {
window.log.warn(
`Received message destined for group ${conversation.idForLogging()}, which we're not a part of. Dropping.`
);
confirm();
return;
}
// Send delivery receipts, but only for incoming sealed sender messages // Send delivery receipts, but only for incoming sealed sender messages
if ( if (

View file

@ -1239,7 +1239,6 @@ MessageReceiver.prototype.extend({
// Note that messages may (generally) only perform one action and we ignore remaining // Note that messages may (generally) only perform one action and we ignore remaining
// fields after the first action. // fields after the first action.
if (window.TIMESTAMP_VALIDATION) {
if (!envelope.timestamp || !decrypted.timestamp) { if (!envelope.timestamp || !decrypted.timestamp) {
throw new Error('Missing timestamp on dataMessage or envelope'); throw new Error('Missing timestamp on dataMessage or envelope');
} }
@ -1252,7 +1251,6 @@ MessageReceiver.prototype.extend({
`Timestamp ${decrypted.timestamp} in DataMessage did not match envelope timestamp ${envelope.timestamp}` `Timestamp ${decrypted.timestamp} in DataMessage did not match envelope timestamp ${envelope.timestamp}`
); );
} }
}
if (decrypted.flags == null) { if (decrypted.flags == null) {
decrypted.flags = 0; decrypted.flags = 0;

View file

@ -188,11 +188,7 @@ MessageSender.prototype = {
); );
}, },
getPaddedAttachment(data, shouldPad) { getPaddedAttachment(data) {
if (!window.PAD_ALL_ATTACHMENTS && !shouldPad) {
return data;
}
const size = data.byteLength; const size = data.byteLength;
const paddedSize = this._getAttachmentSizeBucket(size); const paddedSize = this._getAttachmentSizeBucket(size);
const padding = window.Signal.Crypto.getZeroes(paddedSize - size); const padding = window.Signal.Crypto.getZeroes(paddedSize - size);
@ -200,7 +196,7 @@ MessageSender.prototype = {
return window.Signal.Crypto.concatenateBytes(data, padding); return window.Signal.Crypto.concatenateBytes(data, padding);
}, },
async makeAttachmentPointer(attachment, shouldPad = false) { async makeAttachmentPointer(attachment) {
if (typeof attachment !== 'object' || attachment == null) { if (typeof attachment !== 'object' || attachment == null) {
return Promise.resolve(undefined); return Promise.resolve(undefined);
} }
@ -217,7 +213,7 @@ MessageSender.prototype = {
); );
} }
const padded = this.getPaddedAttachment(data, shouldPad); const padded = this.getPaddedAttachment(data);
const key = libsignal.crypto.getRandomBytes(64); const key = libsignal.crypto.getRandomBytes(64);
const iv = libsignal.crypto.getRandomBytes(16); const iv = libsignal.crypto.getRandomBytes(16);
@ -308,14 +304,10 @@ MessageSender.prototype = {
return; return;
} }
const shouldPad = true;
// eslint-disable-next-line no-param-reassign // eslint-disable-next-line no-param-reassign
message.sticker = { message.sticker = {
...sticker, ...sticker,
attachmentPointer: await this.makeAttachmentPointer( attachmentPointer: await this.makeAttachmentPointer(sticker.data),
sticker.data,
shouldPad
),
}; };
} catch (error) { } catch (error) {
if (error instanceof Error && error.name === 'HTTPError') { if (error instanceof Error && error.name === 'HTTPError') {

View file

@ -11,11 +11,6 @@ const { remote } = electron;
const { app } = remote; const { app } = remote;
const { systemPreferences } = remote.require('electron'); const { systemPreferences } = remote.require('electron');
// Waiting for clients to implement changes on receive side
window.TIMESTAMP_VALIDATION = false;
window.PAD_ALL_ATTACHMENTS = false;
window.SEND_RECIPIENT_UPDATES = false;
window.PROTO_ROOT = 'protos'; window.PROTO_ROOT = 'protos';
const config = require('url').parse(window.location.toString(), true).query; const config = require('url').parse(window.location.toString(), true).query;