diff --git a/Gruntfile.js b/Gruntfile.js index 051a3aa7260e..414a64685a33 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -47,6 +47,7 @@ module.exports = function(grunt) { 'libtextsecure/storage.js', 'libtextsecure/storage/user.js', 'libtextsecure/storage/groups.js', + 'libtextsecure/storage/unprocessed.js', 'libtextsecure/protobufs.js', 'libtextsecure/websocket-resources.js', 'libtextsecure/helpers.js', diff --git a/js/background.js b/js/background.js index 37af22fcd19d..8bf752ee7455 100644 --- a/js/background.js +++ b/js/background.js @@ -162,7 +162,7 @@ return; } - ConversationController.create(c).save(); + ConversationController.create(c).save().then(ev.confirm); } function onGroupReceived(ev) { @@ -179,14 +179,24 @@ } else { attributes.left = true; } + var conversation = ConversationController.create(attributes); - conversation.save(); + conversation.save().then(ev.confirm); } function onMessageReceived(ev) { var data = ev.data; - var message = initIncomingMessage(data.source, data.timestamp); - message.handleDataMessage(data.message); + var message = initIncomingMessage(data); + + isMessageDuplicate(message).then(function(isDuplicate) { + if (isDuplicate) { + console.log('Received duplicate message', message.idForLogging()); + ev.confirm(); + return; + } + + message.handleDataMessage(data.message, ev.confirm); + }); } function onSentMessage(ev) { @@ -195,6 +205,7 @@ var message = new Whisper.Message({ source : textsecure.storage.user.getNumber(), + sourceDevice : data.device, sent_at : data.timestamp, received_at : now, conversationId : data.destination, @@ -203,17 +214,53 @@ expirationStartTimestamp: data.expirationStartTimestamp, }); - message.handleDataMessage(data.message); + isMessageDuplicate(message).then(function(isDuplicate) { + if (isDuplicate) { + console.log('Received duplicate message', message.idForLogging()); + ev.confirm(); + return; + } + + message.handleDataMessage(data.message, ev.confirm); + }); } - function initIncomingMessage(source, timestamp) { + function isMessageDuplicate(message) { + return new Promise(function(resolve) { + var fetcher = new Whisper.Message(); + var options = { + index: { + name: 'unique', + value: [ + message.get('source'), + message.get('sourceDevice'), + message.get('sent_at') + ] + } + }; + + fetcher.fetch(options).always(function() { + if (fetcher.get('id')) { + return resolve(true); + } + + return resolve(false); + }); + }).catch(function(error) { + console.log('isMessageDuplicate error:', error && error.stack ? error.stack : error); + return false; + }); + } + + function initIncomingMessage(data) { var now = new Date().getTime(); var message = new Whisper.Message({ - source : source, - sent_at : timestamp, + source : data.source, + sourceDevice : data.sourceDevice, + sent_at : data.timestamp, received_at : now, - conversationId : source, + conversationId : data.source, type : 'incoming', unread : 1 }); @@ -284,11 +331,12 @@ var timestamp = ev.read.timestamp; var sender = ev.read.sender; console.log('read receipt ', sender, timestamp); - Whisper.ReadReceipts.add({ + var receipt = Whisper.ReadReceipts.add({ sender : sender, timestamp : timestamp, read_at : read_at }); + receipt.on('remove', ev.confirm); } function onVerified(ev) { @@ -323,11 +371,11 @@ }; if (state === 'VERIFIED') { - contact.setVerified(options); + contact.setVerified(options).then(ev.confirm); } else if (state === 'DEFAULT') { - contact.setVerifiedDefault(options); + contact.setVerifiedDefault(options).then(ev.confirm); } else { - contact.setUnverified(options); + contact.setUnverified(options).then(ev.confirm); } } @@ -340,9 +388,11 @@ timestamp ); - Whisper.DeliveryReceipts.add({ - timestamp: timestamp, source: pushMessage.source + var receipt = Whisper.DeliveryReceipts.add({ + timestamp: timestamp, + source: pushMessage.source }); + receipt.on('remove', ev.confirm); } window.owsDesktopApp = { diff --git a/js/database.js b/js/database.js index 6be7a3019c23..3f679a4c27b5 100644 --- a/js/database.js +++ b/js/database.js @@ -254,6 +254,26 @@ console.log(event); }; } + }, + { + version: "14.0", + migrate: function(transaction, next) { + console.log('migration 14.0'); + console.log('Adding unprocessed message store'); + var unprocessed = transaction.db.createObjectStore('unprocessed'); + unprocessed.createIndex('received', 'timestamp', { unique: false }); + next(); + } + }, + { + version: "15.0", + migrate: function(transaction, next) { + console.log('migration 15.0'); + console.log('Adding messages index for de-duplication'); + var messages = transaction.objectStore('messages'); + messages.createIndex('unique', ['source', 'sourceDevice', 'sent_at'], { unique: true }); + next(); + } } ]; }()); diff --git a/js/debugLog.js b/js/debugLog.js index e89e49b0bc1d..15743683b119 100644 --- a/js/debugLog.js +++ b/js/debugLog.js @@ -40,7 +40,7 @@ } }); - var MAX_MESSAGES = 1000; + var MAX_MESSAGES = 3000; var PHONE_REGEX = /\+\d{7,12}(\d{3})/g; var log = new DebugLog(); if (window.console) { diff --git a/js/delivery_receipts.js b/js/delivery_receipts.js index b74d8be74249..7988417bca25 100644 --- a/js/delivery_receipts.js +++ b/js/delivery_receipts.js @@ -43,7 +43,6 @@ }); }).then(function(message) { if (message) { - this.remove(receipt); var deliveries = message.get('delivered') || 0; message.save({ delivered: deliveries + 1 @@ -55,7 +54,9 @@ if (conversation) { conversation.trigger('delivered', message); } - }); + + this.remove(receipt); + }.bind(this)); // TODO: consider keeping a list of numbers we've // successfully delivered to? } diff --git a/js/libtextsecure.js b/js/libtextsecure.js index 243afaf3724c..e914aa122f6b 100644 --- a/js/libtextsecure.js +++ b/js/libtextsecure.js @@ -36906,9 +36906,9 @@ Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJ * vim: ts=4:sw=4:expandtab */ -'use strict'; - ;(function() { + 'use strict'; + /********************* *** Group Storage *** *********************/ @@ -37051,6 +37051,35 @@ Internal.SessionLock.queueJobForNumber = function queueJobForNumber(number, runJ }; })(); +/* + * vim: ts=4:sw=4:expandtab + */ + +;(function() { + 'use strict'; + + /***************************************** + *** Not-yet-processed message storage *** + *****************************************/ + window.textsecure = window.textsecure || {}; + window.textsecure.storage = window.textsecure.storage || {}; + + window.textsecure.storage.unprocessed = { + getAll: function() { + return textsecure.storage.protocol.getAllUnprocessed(); + }, + add: function(data) { + return textsecure.storage.protocol.addUnprocessed(data); + }, + update: function(id, updates) { + return textsecure.storage.protocol.updateUnprocessed(id, updates); + }, + remove: function(id) { + return textsecure.storage.protocol.removeUnprocessed(id); + }, + }; +})(); + ;(function() { 'use strict'; window.textsecure = window.textsecure || {}; @@ -38239,7 +38268,10 @@ MessageReceiver.prototype.extend({ handleRequest: this.handleRequest.bind(this), keepalive: { path: '/v1/keepalive', disconnect: true } }); + this.pending = Promise.resolve(); + + this.queueAllCached(); }, close: function() { this.socket.close(3000, 'called close'); @@ -38280,25 +38312,146 @@ MessageReceiver.prototype.extend({ textsecure.crypto.decryptWebsocketMessage(request.body, this.signalingKey).then(function(plaintext) { var envelope = textsecure.protobuf.Envelope.decode(plaintext); // After this point, decoding errors are not the server's - // fault, and we should handle them gracefully and tell the - // user they received an invalid message - request.respond(200, 'OK'); + // fault, and we should handle them gracefully and tell the + // user they received an invalid message - if (!this.isBlocked(envelope.source)) { - this.queueEnvelope(envelope); + if (this.isBlocked(envelope.source)) { + return request.respond(200, 'OK'); } + this.addToCache(envelope, plaintext).then(function() { + request.respond(200, 'OK'); + this.queueEnvelope(envelope); + }.bind(this), function(error) { + console.log( + 'handleRequest error trying to add message to cache:', + error && error.stack ? error.stack : error + ); + }); }.bind(this)).catch(function(e) { request.respond(500, 'Bad encrypted websocket message'); - console.log("Error handling incoming message:", e); + console.log("Error handling incoming message:", e && e.stack ? e.stack : e); var ev = new Event('error'); ev.error = e; this.dispatchEvent(ev); }.bind(this)); }, + queueAllCached: function() { + this.getAllFromCache().then(function(items) { + for (var i = 0, max = items.length; i < max; i += 1) { + this.queueCached(items[i]); + } + }.bind(this)); + }, + queueCached: function(item) { + try { + var envelopePlaintext = this.stringToArrayBuffer(item.envelope); + var envelope = textsecure.protobuf.Envelope.decode(envelopePlaintext); + + var decrypted = item.decrypted; + if (decrypted) { + var payloadPlaintext = this.stringToArrayBuffer(decrypted); + this.queueDecryptedEnvelope(envelope, payloadPlaintext); + } else { + this.queueEnvelope(envelope); + } + } + catch (error) { + console.log('queueCached error handling item', item.id); + } + }, + getEnvelopeId: function(envelope) { + return envelope.source + '.' + envelope.sourceDevice + ' ' + envelope.timestamp.toNumber(); + }, + arrayBufferToString: function(arrayBuffer) { + return new dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary'); + }, + stringToArrayBuffer: function(string) { + return new dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer(); + }, + getAllFromCache: function() { + console.log('getAllFromCache'); + return textsecure.storage.unprocessed.getAll().then(function(items) { + console.log('getAllFromCache loaded', items.length, 'saved envelopes'); + + return Promise.all(_.map(items, function(item) { + var attempts = 1 + (item.attempts || 0); + if (attempts >= 5) { + console.log('getAllFromCache final attempt for envelope', item.id); + return textsecure.storage.unprocessed.remove(item.id); + } else { + return textsecure.storage.unprocessed.update(item.id, {attempts: attempts}); + } + }.bind(this))).then(function() { + return items; + }, function(error) { + console.log( + 'getAllFromCache error updating items after load:', + error && error.stack ? error.stack : error + ); + return items; + }); + }.bind(this)); + }, + addToCache: function(envelope, plaintext) { + var id = this.getEnvelopeId(envelope); + console.log('addToCache', id); + var string = this.arrayBufferToString(plaintext); + var data = { + id: id, + envelope: string, + timestamp: Date.now(), + attempts: 1 + }; + return textsecure.storage.unprocessed.add(data); + }, + updateCache: function(envelope, plaintext) { + var id = this.getEnvelopeId(envelope); + console.log('updateCache', id); + var string = this.arrayBufferToString(plaintext); + var data = { + decrypted: string + }; + return textsecure.storage.unprocessed.update(id, data); + }, + removeFromCache: function(envelope) { + var id = this.getEnvelopeId(envelope); + console.log('removeFromCache', id); + return textsecure.storage.unprocessed.remove(id); + }, + queueDecryptedEnvelope: function(envelope, plaintext) { + console.log('queueing decrypted envelope', this.getEnvelopeId(envelope)); + var handleDecryptedEnvelope = this.handleDecryptedEnvelope.bind(this, envelope, plaintext); + this.pending = this.pending.then(handleDecryptedEnvelope, handleDecryptedEnvelope); + + return this.pending.catch(function(error) { + console.log('queueDecryptedEnvelope error:', error && error.stack ? error.stack : error); + }); + }, queueEnvelope: function(envelope) { + console.log('queueing envelope', this.getEnvelopeId(envelope)); var handleEnvelope = this.handleEnvelope.bind(this, envelope); this.pending = this.pending.then(handleEnvelope, handleEnvelope); + + return this.pending.catch(function(error) { + console.log('queueEnvelope error:', error && error.stack ? error.stack : error); + }); + }, + // Same as handleEnvelope, just without the decryption step. Necessary for handling + // messages which were successfully decrypted, but application logic didn't finish + // processing. + handleDecryptedEnvelope: function(envelope, plaintext) { + // No decryption is required for delivery receipts, so the decrypted field of + // the Unprocessed model will never be set + + if (envelope.content) { + return this.innerHandleContentMessage(envelope, plaintext); + } else if (envelope.legacyMessage) { + return this.innerHandleLegacyMessage(envelope, plaintext); + } else { + this.removeFromCache(envelope); + throw new Error('Received message with no content and no legacyMessage'); + } }, handleEnvelope: function(envelope) { if (envelope.type === textsecure.protobuf.Envelope.Type.RECEIPT) { @@ -38310,6 +38463,7 @@ MessageReceiver.prototype.extend({ } else if (envelope.legacyMessage) { return this.handleLegacyMessage(envelope); } else { + this.removeFromCache(envelope); throw new Error('Received message with no content and no legacyMessage'); } }, @@ -38321,9 +38475,13 @@ MessageReceiver.prototype.extend({ } }, onDeliveryReceipt: function (envelope) { - var ev = new Event('receipt'); - ev.proto = envelope; - this.dispatchEvent(ev); + return new Promise(function(resolve) { + var ev = new Event('receipt'); + ev.confirm = this.removeFromCache.bind(this, envelope); + ev.proto = envelope; + this.dispatchEvent(ev); + return resolve(); + }.bind(this)); }, unpad: function(paddedPlaintext) { paddedPlaintext = new Uint8Array(paddedPlaintext); @@ -38347,17 +38505,27 @@ MessageReceiver.prototype.extend({ var sessionCipher = new libsignal.SessionCipher(textsecure.storage.protocol, address); switch(envelope.type) { case textsecure.protobuf.Envelope.Type.CIPHERTEXT: - console.log('message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber()); + console.log('message from', this.getEnvelopeId(envelope)); promise = sessionCipher.decryptWhisperMessage(ciphertext).then(this.unpad); break; case textsecure.protobuf.Envelope.Type.PREKEY_BUNDLE: - console.log('prekey message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber()); + console.log('prekey message from', this.getEnvelopeId(envelope)); promise = this.decryptPreKeyWhisperMessage(ciphertext, sessionCipher, address); break; default: promise = Promise.reject(new Error("Unknown message type")); } - return promise.catch(function(error) { + return promise.then(function(plaintext) { + return this.updateCache(envelope, plaintext).then(function() { + return plaintext; + }, function(error) { + console.log( + 'decrypt failed to save decrypted message contents to cache:', + error && error.stack ? error.stack : error + ); + return plaintext; + }); + }.bind(this)).catch(function(error) { if (error.message === 'Unknown identity key') { // create an error that the UI will pick up and ask the // user if they want to re-negotiate @@ -38390,7 +38558,7 @@ MessageReceiver.prototype.extend({ throw e; }); }, - handleSentMessage: function(destination, timestamp, message, expirationStartTimestamp) { + handleSentMessage: function(envelope, destination, timestamp, message, expirationStartTimestamp) { var p = Promise.resolve(); if ((message.flags & textsecure.protobuf.DataMessage.Flags.END_SESSION) == textsecure.protobuf.DataMessage.Flags.END_SESSION ) { @@ -38399,9 +38567,11 @@ MessageReceiver.prototype.extend({ return p.then(function() { return this.processDecrypted(message, this.number).then(function(message) { var ev = new Event('sent'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.data = { destination : destination, timestamp : timestamp.toNumber(), + device : envelope.sourceDevice, message : message }; if (expirationStartTimestamp) { @@ -38422,10 +38592,12 @@ MessageReceiver.prototype.extend({ return p.then(function() { return this.processDecrypted(message, envelope.source).then(function(message) { var ev = new Event('message'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.data = { - source : envelope.source, - timestamp : envelope.timestamp.toNumber(), - message : message + source : envelope.source, + sourceDevice : envelope.sourceDevice, + timestamp : envelope.timestamp.toNumber(), + message : message }; this.dispatchEvent(ev); }.bind(this)); @@ -38433,27 +38605,35 @@ MessageReceiver.prototype.extend({ }, handleLegacyMessage: function (envelope) { return this.decrypt(envelope, envelope.legacyMessage).then(function(plaintext) { - var message = textsecure.protobuf.DataMessage.decode(plaintext); - return this.handleDataMessage(envelope, message); + return this.innerHandleLegacyMessage(envelope, plaintext); }.bind(this)); }, + innerHandleLegacyMessage: function (envelope, plaintext) { + var message = textsecure.protobuf.DataMessage.decode(plaintext); + return this.handleDataMessage(envelope, message); + }, handleContentMessage: function (envelope) { return this.decrypt(envelope, envelope.content).then(function(plaintext) { - var content = textsecure.protobuf.Content.decode(plaintext); - if (content.syncMessage) { - return this.handleSyncMessage(envelope, content.syncMessage); - } else if (content.dataMessage) { - return this.handleDataMessage(envelope, content.dataMessage); - } else if (content.nullMessage) { - return this.handleNullMessage(envelope, content.nullMessage); - } else { - throw new Error('Unsupported content message'); - } + this.innerHandleContentMessage(envelope, plaintext); }.bind(this)); }, + innerHandleContentMessage: function(envelope, plaintext) { + var content = textsecure.protobuf.Content.decode(plaintext); + if (content.syncMessage) { + return this.handleSyncMessage(envelope, content.syncMessage); + } else if (content.dataMessage) { + return this.handleDataMessage(envelope, content.dataMessage); + } else if (content.nullMessage) { + return this.handleNullMessage(envelope, content.nullMessage); + } else { + this.removeFromCache(envelope); + throw new Error('Unsupported content message'); + } + }, handleNullMessage: function(envelope, nullMessage) { var encodedNumber = envelope.source + '.' + envelope.sourceDevice; console.log('null message from', encodedNumber, envelope.timestamp.toNumber()); + this.removeFromCache(envelope); }, handleSyncMessage: function(envelope, syncMessage) { if (envelope.source !== this.number) { @@ -38470,34 +38650,37 @@ MessageReceiver.prototype.extend({ 'from', envelope.source + '.' + envelope.sourceDevice ); return this.handleSentMessage( + envelope, sentMessage.destination, sentMessage.timestamp, sentMessage.message, sentMessage.expirationStartTimestamp ); } else if (syncMessage.contacts) { - this.handleContacts(syncMessage.contacts); + this.handleContacts(envelope, syncMessage.contacts); } else if (syncMessage.groups) { - this.handleGroups(syncMessage.groups); + this.handleGroups(envelope, syncMessage.groups); } else if (syncMessage.blocked) { - this.handleBlocked(syncMessage.blocked); + this.handleBlocked(envelope, syncMessage.blocked); } else if (syncMessage.request) { console.log('Got SyncMessage Request'); + this.removeFromCache(envelope); } else if (syncMessage.read && syncMessage.read.length) { console.log('read messages', 'from', envelope.source + '.' + envelope.sourceDevice); - this.handleRead(syncMessage.read, envelope.timestamp); + this.handleRead(envelope, syncMessage.read); } else if (syncMessage.verified) { - this.handleVerified(syncMessage.verified); + this.handleVerified(envelope, syncMessage.verified); } else { throw new Error('Got empty SyncMessage'); } }, - handleVerified: function(verified, options) { + handleVerified: function(envelope, verified, options) { options = options || {}; _.defaults(options, {viaContactSync: false}); var ev = new Event('verified'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.verified = { state: verified.state, destination: verified.destination, @@ -38506,10 +38689,11 @@ MessageReceiver.prototype.extend({ ev.viaContactSync = options.viaContactSync; this.dispatchEvent(ev); }, - handleRead: function(read, timestamp) { + handleRead: function(envelope, read) { for (var i = 0; i < read.length; ++i) { var ev = new Event('read'); - ev.timestamp = timestamp.toNumber(); + ev.confirm = this.removeFromCache.bind(this, envelope); + ev.timestamp = envelope.timestamp.toNumber(); ev.read = { timestamp : read[i].timestamp.toNumber(), sender : read[i].sender @@ -38517,7 +38701,7 @@ MessageReceiver.prototype.extend({ this.dispatchEvent(ev); } }, - handleContacts: function(contacts) { + handleContacts: function(envelope, contacts) { console.log('contact sync'); var eventTarget = this; var attachmentPointer = contacts.blob; @@ -38526,19 +38710,23 @@ MessageReceiver.prototype.extend({ var contactDetails = contactBuffer.next(); while (contactDetails !== undefined) { var ev = new Event('contact'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.contactDetails = contactDetails; eventTarget.dispatchEvent(ev); if (contactDetails.verified) { - this.handleVerified(contactDetails.verified, {viaContactSync: true}); + this.handleVerified(envelope, contactDetails.verified, {viaContactSync: true}); } contactDetails = contactBuffer.next(); } - eventTarget.dispatchEvent(new Event('contactsync')); + + var ev = new Event('contactsync'); + ev.confirm = this.removeFromCache.bind(this, envelope); + eventTarget.dispatchEvent(ev); }.bind(this)); }, - handleGroups: function(groups) { + handleGroups: function(envelope, groups) { console.log('group sync'); var eventTarget = this; var attachmentPointer = groups.blob; @@ -38567,18 +38755,22 @@ MessageReceiver.prototype.extend({ } })(groupDetails).then(function(groupDetails) { var ev = new Event('group'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.groupDetails = groupDetails; eventTarget.dispatchEvent(ev); - }).catch(function(e) { + }.bind(this)).catch(function(e) { console.log('error processing group', e); }); groupDetails = groupBuffer.next(); promises.push(promise); } + Promise.all(promises).then(function() { - eventTarget.dispatchEvent(new Event('groupsync')); - }); - }); + var ev = new Event('groupsync'); + ev.confirm = this.removeFromCache.bind(this, envelope); + eventTarget.dispatchEvent(ev); + }.bind(this)); + }.bind(this)); }, handleBlocked: function(blocked) { textsecure.storage.put('blocked', blocked.numbers); diff --git a/js/models/messages.js b/js/models/messages.js index 7d831a3eff1c..04eb8d074038 100644 --- a/js/models/messages.js +++ b/js/models/messages.js @@ -15,7 +15,10 @@ this.on('change:expireTimer', this.setToExpire); this.setToExpire(); }, - defaults : function() { + idForLogging: function() { + return this.get('source') + '.' + this.get('sourceDevice') + ' ' + this.get('sent_at'); + }, + defaults: function() { return { timestamp: new Date().getTime(), attachments: [] @@ -339,7 +342,7 @@ this.send(promise); } }, - handleDataMessage: function(dataMessage) { + handleDataMessage: function(dataMessage, confirm) { // This function can be called from the background script on an // incoming message or from the frontend after the user accepts an // identity key change. @@ -351,13 +354,13 @@ if (dataMessage.group) { conversationId = dataMessage.group.id; } - console.log('queuing handleDataMessage', source, timestamp); + console.log('queuing handleDataMessage', message.idForLogging()); var conversation = ConversationController.create({id: conversationId}); conversation.queueJob(function() { return new Promise(function(resolve) { conversation.fetch().always(function() { - console.log('starting handleDataMessage', source, timestamp); + console.log('starting handleDataMessage', message.idForLogging()); var now = new Date().getTime(); var attributes = { type: 'private' }; @@ -468,15 +471,18 @@ }); } - console.log('beginning saves in handleDataMessage', source, timestamp); + console.log('beginning saves in handleDataMessage', message.idForLogging()); var handleError = function(error) { error = error && error.stack ? error.stack : error; - console.log('handleDataMessage', source, timestamp, 'error:', error); + console.log('handleDataMessage', message.idForLogging(), 'error:', error); return resolve(); }; message.save().then(function() { + + // throw new Error('Something went wrong!'); + conversation.save().then(function() { try { conversation.trigger('newmessage', message); @@ -501,7 +507,11 @@ conversation.notify(message); } - console.log('done with handleDataMessage', source, timestamp); + console.log('done with handleDataMessage', message.idForLogging()); + + if (confirm) { + confirm(); + } return resolve(); } catch (e) { diff --git a/js/read_receipts.js b/js/read_receipts.js index 50d37ab086de..71ad8b152a21 100644 --- a/js/read_receipts.js +++ b/js/read_receipts.js @@ -27,9 +27,9 @@ message.get('source') === receipt.get('sender')); }); if (message) { - this.remove(receipt); message.markRead(receipt.get('read_at')).then(function() { this.notifyConversation(message); + this.remove(receipt); }.bind(this)); } else { console.log('No message for read receipt'); diff --git a/js/signal_protocol_store.js b/js/signal_protocol_store.js index b54c1bb262f2..1f9325da8fc4 100644 --- a/js/signal_protocol_store.js +++ b/js/signal_protocol_store.js @@ -104,6 +104,13 @@ return this.fetch({range: [number + '.1', number + '.' + ':']}); } }); + var Unprocessed = Model.extend({ storeName : 'unprocessed' }); + var UnprocessedCollection = Backbone.Collection.extend({ + storeName : 'unprocessed', + database : Whisper.Database, + model : Unprocessed, + comparator : 'timestamp' + }); var IdentityRecord = Model.extend({ storeName: 'identityKeys', validAttributes: [ @@ -740,6 +747,8 @@ resolve(textsecure.storage.protocol.removeAllSessions(number)); }); }, + + // Groups getGroup: function(groupId) { if (groupId === null || groupId === undefined) { throw new Error("Tried to get group for undefined/null id"); @@ -773,6 +782,41 @@ }); }, + // Not yet processed messages - for resiliency + getAllUnprocessed: function() { + var collection; + return new Promise(function(resolve, reject) { + collection = new UnprocessedCollection(); + return collection.fetch().then(resolve, reject); + }).then(function() { + // Return a plain array of plain objects + return collection.map('attributes'); + }); + }, + addUnprocessed: function(data) { + return new Promise(function(resolve, reject) { + var unprocessed = new Unprocessed(data); + return unprocessed.save().then(resolve, reject); + }); + }, + updateUnprocessed: function(id, updates) { + return new Promise(function(resolve, reject) { + var unprocessed = new Unprocessed({ + id: id + }); + return unprocessed.fetch().then(function() { + return unprocessed.save(updates).then(resolve, reject); + }, reject); + }.bind(this)); + }, + removeUnprocessed: function(id) { + return new Promise(function(resolve, reject) { + var unprocessed = new Unprocessed({ + id: id + }); + return unprocessed.destroy().then(resolve, reject); + }.bind(this)); + }, }; _.extend(SignalProtocolStore.prototype, Backbone.Events); diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index 633079410efc..e68456a9b42d 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -30,7 +30,10 @@ MessageReceiver.prototype.extend({ handleRequest: this.handleRequest.bind(this), keepalive: { path: '/v1/keepalive', disconnect: true } }); + this.pending = Promise.resolve(); + + this.queueAllCached(); }, close: function() { this.socket.close(3000, 'called close'); @@ -71,25 +74,146 @@ MessageReceiver.prototype.extend({ textsecure.crypto.decryptWebsocketMessage(request.body, this.signalingKey).then(function(plaintext) { var envelope = textsecure.protobuf.Envelope.decode(plaintext); // After this point, decoding errors are not the server's - // fault, and we should handle them gracefully and tell the - // user they received an invalid message - request.respond(200, 'OK'); + // fault, and we should handle them gracefully and tell the + // user they received an invalid message - if (!this.isBlocked(envelope.source)) { - this.queueEnvelope(envelope); + if (this.isBlocked(envelope.source)) { + return request.respond(200, 'OK'); } + this.addToCache(envelope, plaintext).then(function() { + request.respond(200, 'OK'); + this.queueEnvelope(envelope); + }.bind(this), function(error) { + console.log( + 'handleRequest error trying to add message to cache:', + error && error.stack ? error.stack : error + ); + }); }.bind(this)).catch(function(e) { request.respond(500, 'Bad encrypted websocket message'); - console.log("Error handling incoming message:", e); + console.log("Error handling incoming message:", e && e.stack ? e.stack : e); var ev = new Event('error'); ev.error = e; this.dispatchEvent(ev); }.bind(this)); }, + queueAllCached: function() { + this.getAllFromCache().then(function(items) { + for (var i = 0, max = items.length; i < max; i += 1) { + this.queueCached(items[i]); + } + }.bind(this)); + }, + queueCached: function(item) { + try { + var envelopePlaintext = this.stringToArrayBuffer(item.envelope); + var envelope = textsecure.protobuf.Envelope.decode(envelopePlaintext); + + var decrypted = item.decrypted; + if (decrypted) { + var payloadPlaintext = this.stringToArrayBuffer(decrypted); + this.queueDecryptedEnvelope(envelope, payloadPlaintext); + } else { + this.queueEnvelope(envelope); + } + } + catch (error) { + console.log('queueCached error handling item', item.id); + } + }, + getEnvelopeId: function(envelope) { + return envelope.source + '.' + envelope.sourceDevice + ' ' + envelope.timestamp.toNumber(); + }, + arrayBufferToString: function(arrayBuffer) { + return new dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary'); + }, + stringToArrayBuffer: function(string) { + return new dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer(); + }, + getAllFromCache: function() { + console.log('getAllFromCache'); + return textsecure.storage.unprocessed.getAll().then(function(items) { + console.log('getAllFromCache loaded', items.length, 'saved envelopes'); + + return Promise.all(_.map(items, function(item) { + var attempts = 1 + (item.attempts || 0); + if (attempts >= 5) { + console.log('getAllFromCache final attempt for envelope', item.id); + return textsecure.storage.unprocessed.remove(item.id); + } else { + return textsecure.storage.unprocessed.update(item.id, {attempts: attempts}); + } + }.bind(this))).then(function() { + return items; + }, function(error) { + console.log( + 'getAllFromCache error updating items after load:', + error && error.stack ? error.stack : error + ); + return items; + }); + }.bind(this)); + }, + addToCache: function(envelope, plaintext) { + var id = this.getEnvelopeId(envelope); + console.log('addToCache', id); + var string = this.arrayBufferToString(plaintext); + var data = { + id: id, + envelope: string, + timestamp: Date.now(), + attempts: 1 + }; + return textsecure.storage.unprocessed.add(data); + }, + updateCache: function(envelope, plaintext) { + var id = this.getEnvelopeId(envelope); + console.log('updateCache', id); + var string = this.arrayBufferToString(plaintext); + var data = { + decrypted: string + }; + return textsecure.storage.unprocessed.update(id, data); + }, + removeFromCache: function(envelope) { + var id = this.getEnvelopeId(envelope); + console.log('removeFromCache', id); + return textsecure.storage.unprocessed.remove(id); + }, + queueDecryptedEnvelope: function(envelope, plaintext) { + console.log('queueing decrypted envelope', this.getEnvelopeId(envelope)); + var handleDecryptedEnvelope = this.handleDecryptedEnvelope.bind(this, envelope, plaintext); + this.pending = this.pending.then(handleDecryptedEnvelope, handleDecryptedEnvelope); + + return this.pending.catch(function(error) { + console.log('queueDecryptedEnvelope error:', error && error.stack ? error.stack : error); + }); + }, queueEnvelope: function(envelope) { + console.log('queueing envelope', this.getEnvelopeId(envelope)); var handleEnvelope = this.handleEnvelope.bind(this, envelope); this.pending = this.pending.then(handleEnvelope, handleEnvelope); + + return this.pending.catch(function(error) { + console.log('queueEnvelope error:', error && error.stack ? error.stack : error); + }); + }, + // Same as handleEnvelope, just without the decryption step. Necessary for handling + // messages which were successfully decrypted, but application logic didn't finish + // processing. + handleDecryptedEnvelope: function(envelope, plaintext) { + // No decryption is required for delivery receipts, so the decrypted field of + // the Unprocessed model will never be set + + if (envelope.content) { + return this.innerHandleContentMessage(envelope, plaintext); + } else if (envelope.legacyMessage) { + return this.innerHandleLegacyMessage(envelope, plaintext); + } else { + this.removeFromCache(envelope); + throw new Error('Received message with no content and no legacyMessage'); + } }, handleEnvelope: function(envelope) { if (envelope.type === textsecure.protobuf.Envelope.Type.RECEIPT) { @@ -101,6 +225,7 @@ MessageReceiver.prototype.extend({ } else if (envelope.legacyMessage) { return this.handleLegacyMessage(envelope); } else { + this.removeFromCache(envelope); throw new Error('Received message with no content and no legacyMessage'); } }, @@ -112,9 +237,13 @@ MessageReceiver.prototype.extend({ } }, onDeliveryReceipt: function (envelope) { - var ev = new Event('receipt'); - ev.proto = envelope; - this.dispatchEvent(ev); + return new Promise(function(resolve) { + var ev = new Event('receipt'); + ev.confirm = this.removeFromCache.bind(this, envelope); + ev.proto = envelope; + this.dispatchEvent(ev); + return resolve(); + }.bind(this)); }, unpad: function(paddedPlaintext) { paddedPlaintext = new Uint8Array(paddedPlaintext); @@ -138,17 +267,27 @@ MessageReceiver.prototype.extend({ var sessionCipher = new libsignal.SessionCipher(textsecure.storage.protocol, address); switch(envelope.type) { case textsecure.protobuf.Envelope.Type.CIPHERTEXT: - console.log('message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber()); + console.log('message from', this.getEnvelopeId(envelope)); promise = sessionCipher.decryptWhisperMessage(ciphertext).then(this.unpad); break; case textsecure.protobuf.Envelope.Type.PREKEY_BUNDLE: - console.log('prekey message from', envelope.source + '.' + envelope.sourceDevice, envelope.timestamp.toNumber()); + console.log('prekey message from', this.getEnvelopeId(envelope)); promise = this.decryptPreKeyWhisperMessage(ciphertext, sessionCipher, address); break; default: promise = Promise.reject(new Error("Unknown message type")); } - return promise.catch(function(error) { + return promise.then(function(plaintext) { + return this.updateCache(envelope, plaintext).then(function() { + return plaintext; + }, function(error) { + console.log( + 'decrypt failed to save decrypted message contents to cache:', + error && error.stack ? error.stack : error + ); + return plaintext; + }); + }.bind(this)).catch(function(error) { if (error.message === 'Unknown identity key') { // create an error that the UI will pick up and ask the // user if they want to re-negotiate @@ -181,7 +320,7 @@ MessageReceiver.prototype.extend({ throw e; }); }, - handleSentMessage: function(destination, timestamp, message, expirationStartTimestamp) { + handleSentMessage: function(envelope, destination, timestamp, message, expirationStartTimestamp) { var p = Promise.resolve(); if ((message.flags & textsecure.protobuf.DataMessage.Flags.END_SESSION) == textsecure.protobuf.DataMessage.Flags.END_SESSION ) { @@ -190,9 +329,11 @@ MessageReceiver.prototype.extend({ return p.then(function() { return this.processDecrypted(message, this.number).then(function(message) { var ev = new Event('sent'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.data = { destination : destination, timestamp : timestamp.toNumber(), + device : envelope.sourceDevice, message : message }; if (expirationStartTimestamp) { @@ -213,10 +354,12 @@ MessageReceiver.prototype.extend({ return p.then(function() { return this.processDecrypted(message, envelope.source).then(function(message) { var ev = new Event('message'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.data = { - source : envelope.source, - timestamp : envelope.timestamp.toNumber(), - message : message + source : envelope.source, + sourceDevice : envelope.sourceDevice, + timestamp : envelope.timestamp.toNumber(), + message : message }; this.dispatchEvent(ev); }.bind(this)); @@ -224,27 +367,35 @@ MessageReceiver.prototype.extend({ }, handleLegacyMessage: function (envelope) { return this.decrypt(envelope, envelope.legacyMessage).then(function(plaintext) { - var message = textsecure.protobuf.DataMessage.decode(plaintext); - return this.handleDataMessage(envelope, message); + return this.innerHandleLegacyMessage(envelope, plaintext); }.bind(this)); }, + innerHandleLegacyMessage: function (envelope, plaintext) { + var message = textsecure.protobuf.DataMessage.decode(plaintext); + return this.handleDataMessage(envelope, message); + }, handleContentMessage: function (envelope) { return this.decrypt(envelope, envelope.content).then(function(plaintext) { - var content = textsecure.protobuf.Content.decode(plaintext); - if (content.syncMessage) { - return this.handleSyncMessage(envelope, content.syncMessage); - } else if (content.dataMessage) { - return this.handleDataMessage(envelope, content.dataMessage); - } else if (content.nullMessage) { - return this.handleNullMessage(envelope, content.nullMessage); - } else { - throw new Error('Unsupported content message'); - } + this.innerHandleContentMessage(envelope, plaintext); }.bind(this)); }, + innerHandleContentMessage: function(envelope, plaintext) { + var content = textsecure.protobuf.Content.decode(plaintext); + if (content.syncMessage) { + return this.handleSyncMessage(envelope, content.syncMessage); + } else if (content.dataMessage) { + return this.handleDataMessage(envelope, content.dataMessage); + } else if (content.nullMessage) { + return this.handleNullMessage(envelope, content.nullMessage); + } else { + this.removeFromCache(envelope); + throw new Error('Unsupported content message'); + } + }, handleNullMessage: function(envelope, nullMessage) { var encodedNumber = envelope.source + '.' + envelope.sourceDevice; console.log('null message from', encodedNumber, envelope.timestamp.toNumber()); + this.removeFromCache(envelope); }, handleSyncMessage: function(envelope, syncMessage) { if (envelope.source !== this.number) { @@ -261,34 +412,37 @@ MessageReceiver.prototype.extend({ 'from', envelope.source + '.' + envelope.sourceDevice ); return this.handleSentMessage( + envelope, sentMessage.destination, sentMessage.timestamp, sentMessage.message, sentMessage.expirationStartTimestamp ); } else if (syncMessage.contacts) { - this.handleContacts(syncMessage.contacts); + this.handleContacts(envelope, syncMessage.contacts); } else if (syncMessage.groups) { - this.handleGroups(syncMessage.groups); + this.handleGroups(envelope, syncMessage.groups); } else if (syncMessage.blocked) { - this.handleBlocked(syncMessage.blocked); + this.handleBlocked(envelope, syncMessage.blocked); } else if (syncMessage.request) { console.log('Got SyncMessage Request'); + this.removeFromCache(envelope); } else if (syncMessage.read && syncMessage.read.length) { console.log('read messages', 'from', envelope.source + '.' + envelope.sourceDevice); - this.handleRead(syncMessage.read, envelope.timestamp); + this.handleRead(envelope, syncMessage.read); } else if (syncMessage.verified) { - this.handleVerified(syncMessage.verified); + this.handleVerified(envelope, syncMessage.verified); } else { throw new Error('Got empty SyncMessage'); } }, - handleVerified: function(verified, options) { + handleVerified: function(envelope, verified, options) { options = options || {}; _.defaults(options, {viaContactSync: false}); var ev = new Event('verified'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.verified = { state: verified.state, destination: verified.destination, @@ -297,10 +451,11 @@ MessageReceiver.prototype.extend({ ev.viaContactSync = options.viaContactSync; this.dispatchEvent(ev); }, - handleRead: function(read, timestamp) { + handleRead: function(envelope, read) { for (var i = 0; i < read.length; ++i) { var ev = new Event('read'); - ev.timestamp = timestamp.toNumber(); + ev.confirm = this.removeFromCache.bind(this, envelope); + ev.timestamp = envelope.timestamp.toNumber(); ev.read = { timestamp : read[i].timestamp.toNumber(), sender : read[i].sender @@ -308,7 +463,7 @@ MessageReceiver.prototype.extend({ this.dispatchEvent(ev); } }, - handleContacts: function(contacts) { + handleContacts: function(envelope, contacts) { console.log('contact sync'); var eventTarget = this; var attachmentPointer = contacts.blob; @@ -317,19 +472,23 @@ MessageReceiver.prototype.extend({ var contactDetails = contactBuffer.next(); while (contactDetails !== undefined) { var ev = new Event('contact'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.contactDetails = contactDetails; eventTarget.dispatchEvent(ev); if (contactDetails.verified) { - this.handleVerified(contactDetails.verified, {viaContactSync: true}); + this.handleVerified(envelope, contactDetails.verified, {viaContactSync: true}); } contactDetails = contactBuffer.next(); } - eventTarget.dispatchEvent(new Event('contactsync')); + + var ev = new Event('contactsync'); + ev.confirm = this.removeFromCache.bind(this, envelope); + eventTarget.dispatchEvent(ev); }.bind(this)); }, - handleGroups: function(groups) { + handleGroups: function(envelope, groups) { console.log('group sync'); var eventTarget = this; var attachmentPointer = groups.blob; @@ -358,18 +517,22 @@ MessageReceiver.prototype.extend({ } })(groupDetails).then(function(groupDetails) { var ev = new Event('group'); + ev.confirm = this.removeFromCache.bind(this, envelope); ev.groupDetails = groupDetails; eventTarget.dispatchEvent(ev); - }).catch(function(e) { + }.bind(this)).catch(function(e) { console.log('error processing group', e); }); groupDetails = groupBuffer.next(); promises.push(promise); } + Promise.all(promises).then(function() { - eventTarget.dispatchEvent(new Event('groupsync')); - }); - }); + var ev = new Event('groupsync'); + ev.confirm = this.removeFromCache.bind(this, envelope); + eventTarget.dispatchEvent(ev); + }.bind(this)); + }.bind(this)); }, handleBlocked: function(blocked) { textsecure.storage.put('blocked', blocked.numbers); diff --git a/libtextsecure/storage/groups.js b/libtextsecure/storage/groups.js index 115105185120..226b46ed796a 100644 --- a/libtextsecure/storage/groups.js +++ b/libtextsecure/storage/groups.js @@ -2,9 +2,9 @@ * vim: ts=4:sw=4:expandtab */ -'use strict'; - ;(function() { + 'use strict'; + /********************* *** Group Storage *** *********************/ diff --git a/libtextsecure/storage/unprocessed.js b/libtextsecure/storage/unprocessed.js new file mode 100644 index 000000000000..6e696560d465 --- /dev/null +++ b/libtextsecure/storage/unprocessed.js @@ -0,0 +1,28 @@ +/* + * vim: ts=4:sw=4:expandtab + */ + +;(function() { + 'use strict'; + + /***************************************** + *** Not-yet-processed message storage *** + *****************************************/ + window.textsecure = window.textsecure || {}; + window.textsecure.storage = window.textsecure.storage || {}; + + window.textsecure.storage.unprocessed = { + getAll: function() { + return textsecure.storage.protocol.getAllUnprocessed(); + }, + add: function(data) { + return textsecure.storage.protocol.addUnprocessed(data); + }, + update: function(id, updates) { + return textsecure.storage.protocol.updateUnprocessed(id, updates); + }, + remove: function(id) { + return textsecure.storage.protocol.removeUnprocessed(id); + }, + }; +})(); diff --git a/test/storage_test.js b/test/storage_test.js index bef917f0b9c2..e700525e7918 100644 --- a/test/storage_test.js +++ b/test/storage_test.js @@ -874,4 +874,60 @@ describe("SignalProtocolStore", function() { }).then(done,done); }); }); + + describe('Not yet processed messages', function() { + beforeEach(function() { + return store.getAllUnprocessed().then(function(items) { + return Promise.all(_.map(items, function(item) { + return store.removeUnprocessed(item.id); + })); + }).then(function() { + return store.getAllUnprocessed(); + }).then(function(items) { + assert.strictEqual(items.length, 0); + }); + }); + + it('adds two and gets them back', function() { + return Promise.all([ + store.addUnprocessed({id: 2, name: 'second', timestamp: 2}), + store.addUnprocessed({id: 3, name: 'third', timestamp: 3}), + store.addUnprocessed({id: 1, name: 'first', timestamp: 1}) + ]).then(function() { + return store.getAllUnprocessed(); + }).then(function(items) { + assert.strictEqual(items.length, 3); + + // they are in the proper order because the collection comparator is 'timestamp' + assert.strictEqual(items[0].name, 'first'); + assert.strictEqual(items[1].name, 'second'); + assert.strictEqual(items[2].name, 'third'); + }); + }); + + it('updateUnprocessed successfully updates only part of itme', function() { + var id = 1; + return store.addUnprocessed({id: id, name: 'first', timestamp: 1}).then(function() { + return store.updateUnprocessed(id, {name: 'updated'}); + }).then(function() { + return store.getAllUnprocessed(); + }).then(function(items) { + assert.strictEqual(items.length, 1); + assert.strictEqual(items[0].name, 'updated'); + assert.strictEqual(items[0].timestamp, 1); + }); + }); + + it('removeUnprocessed successfully deletes item', function() { + var id = 1; + return store.addUnprocessed({id: id, name: 'first', timestamp: 1}).then(function() { + return store.removeUnprocessed(id); + }).then(function() { + return store.getAllUnprocessed(); + }).then(function(items) { + assert.strictEqual(items.length, 0); + }); + }); + }); + });