From 0c09f9620fa4efbc80cbfe45afeb4c69146a0369 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Thu, 26 Sep 2019 12:56:31 -0700 Subject: [PATCH] Improve message download performance --- Gruntfile.js | 8 - app/logging.js | 11 ++ app/sql.js | 63 ++++++- app/sql_channel.js | 123 +++++++------ js/background.js | 256 +++++++++++++++++---------- js/conversation_controller.js | 7 +- js/delivery_receipts.js | 11 +- js/expiring_messages.js | 6 +- js/expiring_tap_to_view_messages.js | 6 +- js/logging.js | 23 ++- js/message_controller.js | 6 + js/models/conversations.js | 75 +++----- js/models/messages.js | 102 ++++------- js/modules/attachment_downloads.js | 16 +- js/modules/data.js | 61 ++++--- js/modules/debug.js | 6 +- js/modules/migrate_to_sql.js | 4 +- js/read_receipts.js | 11 +- js/read_syncs.js | 29 ++- js/signal_protocol_store.js | 82 ++++++--- js/util_worker_tasks.js | 44 ----- js/views/conversation_view.js | 2 +- libtextsecure/account_manager.js | 1 + libtextsecure/libsignal-protocol.js | 28 ++- libtextsecure/message_receiver.js | 244 +++++++++---------------- libtextsecure/sendmessage.js | 4 +- libtextsecure/storage/unprocessed.js | 10 +- test/storage_test.js | 2 + ts/util/batcher.ts | 97 ++++++++++ ts/util/index.ts | 4 + ts/util/lint/exceptions.json | 56 +++--- ts/util/waitBatcher.ts | 141 +++++++++++++++ 32 files changed, 906 insertions(+), 633 deletions(-) delete mode 100644 js/util_worker_tasks.js create mode 100644 ts/util/batcher.ts create mode 100644 ts/util/waitBatcher.ts diff --git a/Gruntfile.js b/Gruntfile.js index 03a98f9071..6e01da3a00 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -34,14 +34,6 @@ module.exports = grunt => { src: components, dest: 'js/components.js', }, - util_worker: { - src: [ - 'components/bytebuffer/dist/ByteBufferAB.js', - 'components/long/dist/Long.js', - 'js/util_worker_tasks.js', - ], - dest: 'js/util_worker.js', - }, libtextsecurecomponents: { src: libtextsecurecomponents, dest: 'libtextsecure/components.js', diff --git a/app/logging.js b/app/logging.js index 1f14092d68..ceced27b21 100644 --- a/app/logging.js +++ b/app/logging.js @@ -69,6 +69,17 @@ function initialize() { }); }); + ipc.on('batch-log', (first, batch) => { + batch.forEach(item => { + logger[item.level]( + { + time: new Date(item.timestamp), + }, + item.logText + ); + }); + }); + ipc.on('fetch-log', event => { fetch(logPath).then( data => { diff --git a/app/sql.js b/app/sql.js index e1dacbf6ba..0a6eb80356 100644 --- a/app/sql.js +++ b/app/sql.js @@ -19,10 +19,6 @@ const { pick, } = require('lodash'); -// To get long stack traces -// https://github.com/mapbox/node-sqlite3/wiki/API#sqlite3verbose -sql.verbose(); - module.exports = { initialize, close, @@ -58,6 +54,7 @@ module.exports = { removeAllItems, createOrUpdateSession, + createOrUpdateSessions, getSessionById, getSessionsByNumber, bulkAddSessions, @@ -71,6 +68,7 @@ module.exports = { saveConversations, getConversationById, updateConversation, + updateConversations, removeConversation, getAllConversations, getAllConversationIds, @@ -105,6 +103,7 @@ module.exports = { saveUnprocessed, updateUnprocessedAttempts, updateUnprocessedWithData, + updateUnprocessedsWithData, getUnprocessedById, saveUnprocesseds, removeUnprocessed, @@ -1259,10 +1258,20 @@ async function initialize({ configDir, key, messages }) { promisified = await openAndSetUpSQLCipher(filePath, { key }); // promisified.on('trace', async statement => { - // if (!db || statement.startsWith('--')) { - // console._log(statement); + // if ( + // !db || + // statement.startsWith('--') || + // statement.includes('COMMIT') || + // statement.includes('BEGIN') || + // statement.includes('ROLLBACK') + // ) { // return; // } + + // // Note that this causes problems when attempting to commit transactions - this + // // statement is running, and we get at SQLITE_BUSY error. So we delay. + // await new Promise(resolve => setTimeout(resolve, 1000)); + // const data = await db.get(`EXPLAIN QUERY PLAN ${statement}`); // console._log(`EXPLAIN QUERY PLAN ${statement}\n`, data && data.detail); // }); @@ -1469,6 +1478,19 @@ async function createOrUpdateSession(data) { } ); } +async function createOrUpdateSessions(array) { + await db.run('BEGIN TRANSACTION;'); + + try { + await Promise.all([...map(array, item => createOrUpdateSession(item))]); + await db.run('COMMIT TRANSACTION;'); + } catch (error) { + await db.run('ROLLBACK;'); + throw error; + } +} +createOrUpdateSessions.needsSerial = true; + async function getSessionById(id) { return getById(SESSIONS_TABLE, id); } @@ -1663,6 +1685,18 @@ async function updateConversation(data) { } ); } +async function updateConversations(array) { + await db.run('BEGIN TRANSACTION;'); + + try { + await Promise.all([...map(array, item => updateConversation(item))]); + await db.run('COMMIT TRANSACTION;'); + } catch (error) { + await db.run('ROLLBACK;'); + throw error; + } +} +updateConversations.needsSerial = true; async function removeConversation(id) { if (!Array.isArray(id)) { @@ -2353,6 +2387,23 @@ async function updateUnprocessedWithData(id, data = {}) { } ); } +async function updateUnprocessedsWithData(arrayOfUnprocessed) { + await db.run('BEGIN TRANSACTION;'); + + try { + await Promise.all([ + ...map(arrayOfUnprocessed, ({ id, data }) => + updateUnprocessedWithData(id, data) + ), + ]); + + await db.run('COMMIT TRANSACTION;'); + } catch (error) { + await db.run('ROLLBACK;'); + throw error; + } +} +updateUnprocessedsWithData.needsSerial = true; async function getUnprocessedById(id) { const row = await db.get('SELECT * FROM unprocessed WHERE id = $id;', { diff --git a/app/sql_channel.js b/app/sql_channel.js index 918b4f3f73..8cb5b81212 100644 --- a/app/sql_channel.js +++ b/app/sql_channel.js @@ -27,6 +27,75 @@ function makeNewMultipleQueue() { return multipleQueue; } +function makeSQLJob(fn, callName, jobId, args) { + // console.log(`Job ${jobId} (${callName}) queued`); + return async () => { + // const start = Date.now(); + // console.log(`Job ${jobId} (${callName}) started`); + const result = await fn(...args); + // const end = Date.now(); + // console.log(`Job ${jobId} (${callName}) succeeded in ${end - start}ms`); + return result; + }; +} + +async function handleCall(callName, jobId, args) { + const fn = sql[callName]; + if (!fn) { + throw new Error(`sql channel: ${callName} is not an available function`); + } + + let result; + + // We queue here to keep multi-query operations atomic. Without it, any multistage + // data operation (even within a BEGIN/COMMIT) can become interleaved, since all + // requests share one database connection. + + // A needsSerial method must be run in our single concurrency queue. + if (fn.needsSerial) { + if (singleQueue) { + result = await singleQueue.add(makeSQLJob(fn, callName, jobId, args)); + } else if (multipleQueue) { + makeNewSingleQueue(); + + singleQueue.add(() => multipleQueue.onIdle()); + multipleQueue = null; + + result = await singleQueue.add(makeSQLJob(fn, callName, jobId, args)); + } else { + makeNewSingleQueue(); + result = await singleQueue.add(makeSQLJob(fn, callName, jobId, args)); + } + } else { + // The request can be parallelized. To keep the same structure as the above block + // we force this section into the 'lonely if' pattern. + // eslint-disable-next-line no-lonely-if + if (multipleQueue) { + result = await multipleQueue.add(makeSQLJob(fn, callName, jobId, args)); + } else if (singleQueue) { + makeNewMultipleQueue(); + multipleQueue.pause(); + + const multipleQueueRef = multipleQueue; + const singleQueueRef = singleQueue; + + singleQueue = null; + const promise = multipleQueueRef.add( + makeSQLJob(fn, callName, jobId, args) + ); + await singleQueueRef.onIdle(); + + multipleQueueRef.start(); + result = await promise; + } else { + makeNewMultipleQueue(); + result = await multipleQueue.add(makeSQLJob(fn, callName, jobId, args)); + } + } + + return result; +} + function initialize() { if (initialized) { throw new Error('sqlChannels: already initialized!'); @@ -35,59 +104,7 @@ function initialize() { ipcMain.on(SQL_CHANNEL_KEY, async (event, jobId, callName, ...args) => { try { - const fn = sql[callName]; - if (!fn) { - throw new Error( - `sql channel: ${callName} is not an available function` - ); - } - - let result; - - // We queue here to keep multi-query operations atomic. Without it, any multistage - // data operation (even within a BEGIN/COMMIT) can become interleaved, since all - // requests share one database connection. - - // A needsSerial method must be run in our single concurrency queue. - if (fn.needsSerial) { - if (singleQueue) { - result = await singleQueue.add(() => fn(...args)); - } else if (multipleQueue) { - makeNewSingleQueue(); - - singleQueue.add(() => multipleQueue.onIdle()); - multipleQueue = null; - - result = await singleQueue.add(() => fn(...args)); - } else { - makeNewSingleQueue(); - result = await singleQueue.add(() => fn(...args)); - } - } else { - // The request can be parallelized. To keep the same structure as the above block - // we force this section into the 'lonely if' pattern. - // eslint-disable-next-line no-lonely-if - if (multipleQueue) { - result = await multipleQueue.add(() => fn(...args)); - } else if (singleQueue) { - makeNewMultipleQueue(); - multipleQueue.pause(); - - const multipleQueueRef = multipleQueue; - const singleQueueRef = singleQueue; - - singleQueue = null; - const promise = multipleQueueRef.add(() => fn(...args)); - await singleQueueRef.onIdle(); - - multipleQueueRef.start(); - result = await promise; - } else { - makeNewMultipleQueue(); - result = await multipleQueue.add(() => fn(...args)); - } - } - + const result = await handleCall(callName, jobId, args); event.sender.send(`${SQL_CHANNEL_KEY}-done`, jobId, null, result); } catch (error) { const errorForDisplay = error && error.stack ? error.stack : error; diff --git a/js/background.js b/js/background.js index 3e52cd5987..4dd5cebb60 100644 --- a/js/background.js +++ b/js/background.js @@ -21,6 +21,38 @@ concurrency: 1, }); deliveryReceiptQueue.pause(); + const deliveryReceiptBatcher = window.Signal.Util.createBatcher({ + wait: 500, + maxSize: 500, + processBatch: async items => { + const bySource = _.groupBy(items, item => item.source); + const sources = Object.keys(bySource); + + for (let i = 0, max = sources.length; i < max; i += 1) { + const source = sources[i]; + const timestamps = bySource[source].map(item => item.timestamp); + + try { + const { wrap, sendOptions } = ConversationController.prepareForSend( + source + ); + // eslint-disable-next-line no-await-in-loop + await wrap( + textsecure.messaging.sendDeliveryReceipt( + source, + timestamps, + sendOptions + ) + ); + } catch (error) { + window.log.error( + `Failed to send delivery receipt to ${source} for timestamps ${timestamps}:`, + error && error.stack ? error.stack : error + ); + } + } + }, + }); // Globally disable drag and drop document.body.addEventListener( @@ -78,7 +110,6 @@ }; // Load these images now to ensure that they don't flicker on first use - window.Signal.EmojiLib.preloadImages(); const images = []; function preload(list) { for (let index = 0, max = list.length; index < max; index += 1) { @@ -341,9 +372,20 @@ // Stop processing incoming messages if (messageReceiver) { await messageReceiver.stopProcessing(); + + await window.waitForAllBatchers(); + messageReceiver.unregisterBatchers(); + messageReceiver = null; } + // A number of still-to-queue database queries might be waiting inside batchers. + // We wait for these to empty first, and then shut down the data interface. + await Promise.all([ + window.waitForAllBatchers(), + window.waitForAllWaitBatchers(), + ]); + // Shut down the data interface cleanly await window.Signal.Data.shutdown(); }, @@ -850,7 +892,12 @@ } if (messageReceiver) { - messageReceiver.close(); + await messageReceiver.stopProcessing(); + + await window.waitForAllBatchers(); + messageReceiver.unregisterBatchers(); + + messageReceiver = null; } const USERNAME = storage.get('number_id'); @@ -1022,7 +1069,12 @@ view.applyTheme(); } } - function onEmpty() { + async function onEmpty() { + await Promise.all([ + window.waitForAllBatchers(), + window.waitForAllWaitBatchers(), + ]); + window.log.info('onEmpty: All outstanding database requests complete'); initialLoadComplete = true; window.readyForUpdates(); @@ -1057,6 +1109,8 @@ } } function onConfiguration(ev) { + ev.confirm(); + const { configuration } = ev; const { readReceipts, @@ -1084,11 +1138,11 @@ if (linkPreviews === true || linkPreviews === false) { storage.put('linkPreviews', linkPreviews); } - - ev.confirm(); } function onTyping(ev) { + // Note: this type of message is automatically removed from cache in MessageReceiver + const { typing, sender, senderDevice } = ev; const { groupId, started } = typing || {}; @@ -1118,6 +1172,8 @@ } async function onStickerPack(ev) { + ev.confirm(); + const packs = ev.stickerPacks || []; packs.forEach(pack => { @@ -1149,8 +1205,6 @@ } } }); - - ev.confirm(); } async function onContactReceived(ev) { @@ -1228,9 +1282,7 @@ conversation.set(newAttributes); } - await window.Signal.Data.updateConversation(id, conversation.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(id, conversation.attributes); const { expireTimer } = details; const isValidExpireTimer = typeof expireTimer === 'number'; if (isValidExpireTimer) { @@ -1312,9 +1364,7 @@ conversation.set(newAttributes); } - await window.Signal.Data.updateConversation(id, conversation.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(id, conversation.attributes); const { expireTimer } = details; const isValidExpireTimer = typeof expireTimer === 'number'; if (!isValidExpireTimer) { @@ -1375,20 +1425,31 @@ // eslint-disable-next-line no-bitwise const isProfileUpdate = Boolean(data.message.flags & PROFILE_KEY_UPDATE); if (isProfileUpdate) { - return handleMessageReceivedProfileUpdate({ + await handleMessageReceivedProfileUpdate({ data, confirm, messageDescriptor, }); + return; } - const message = await initIncomingMessage(data); - const isDuplicate = await isMessageDuplicate(message); + const isDuplicate = await isMessageDuplicate({ + source: data.source, + sourceDevice: data.sourceDevice, + sent_at: data.timestamp, + }); if (isDuplicate) { - window.log.warn('Received duplicate message', message.idForLogging()); - return event.confirm(); + window.log.warn( + 'Received duplicate message', + `${data.source}.${data.sourceDevice} ${data.timestamp}` + ); + confirm(); + return; } + // We do this after the duplicate check because it might send a delivery receipt + const message = await initIncomingMessage(data); + const ourNumber = textsecure.storage.user.getNumber(); const isGroupUpdate = data.message.group && @@ -1406,7 +1467,8 @@ window.log.warn( `Received message destined for group ${conversation.idForLogging()}, which we're not a part of. Dropping.` ); - return event.confirm(); + confirm(); + return; } await ConversationController.getOrCreateAndWait( @@ -1414,7 +1476,8 @@ messageDescriptor.type ); - return message.handleDataMessage(data.message, event.confirm, { + // Don't wait for handleDataMessage, as it has its own per-conversation queueing + message.handleDataMessage(data.message, event.confirm, { initialLoadComplete, }); } @@ -1433,9 +1496,7 @@ ); conversation.set({ profileSharing: true }); - await window.Signal.Data.updateConversation(id, conversation.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(id, conversation.attributes); // Then we update our own profileKey if it's different from what we have const ourNumber = textsecure.storage.user.getNumber(); @@ -1496,7 +1557,7 @@ } const message = await createSentMessage(data); - const existing = await getExistingMessage(message); + const existing = await getExistingMessage(message.attributes); const isUpdate = Boolean(data.isRecipientUpdate); if (isUpdate && existing) { @@ -1538,16 +1599,17 @@ messageDescriptor.id, messageDescriptor.type ); - await message.handleDataMessage(data.message, event.confirm, { + + // Don't wait for handleDataMessage, as it has its own per-conversation queueing + message.handleDataMessage(data.message, event.confirm, { initialLoadComplete, }); } } - async function getExistingMessage(message) { + async function getExistingMessage(data) { try { - const { attributes } = message; - const result = await window.Signal.Data.getMessageBySender(attributes, { + const result = await window.Signal.Data.getMessageBySender(data, { Message: Whisper.Message, }); @@ -1562,8 +1624,8 @@ } } - async function isMessageDuplicate(message) { - const result = await getExistingMessage(message); + async function isMessageDuplicate(data) { + const result = await getExistingMessage(data); return Boolean(result); } @@ -1587,26 +1649,14 @@ return message; } - deliveryReceiptQueue.add(async () => { - try { - const { wrap, sendOptions } = ConversationController.prepareForSend( - data.source - ); - await wrap( - textsecure.messaging.sendDeliveryReceipt( - data.source, - data.timestamp, - sendOptions - ) - ); - } catch (error) { - window.log.error( - `Failed to send delivery receipt to ${data.source} for message ${ - data.timestamp - }:`, - error && error.stack ? error.stack : error - ); - } + // Note: We both queue and batch because we want to wait until we are done processing + // incoming messages to start sending outgoing delivery receipts. The queue can be + // paused easily. + deliveryReceiptQueue.add(() => { + deliveryReceiptBatcher.add({ + source: data.source, + timestamp: data.timestamp, + }); }); return message; @@ -1625,6 +1675,10 @@ if (messageReceiver) { await messageReceiver.stopProcessing(); + + await window.waitForAllBatchers(); + messageReceiver.unregisterBatchers(); + messageReceiver = null; } @@ -1701,7 +1755,7 @@ } const envelope = ev.proto; const message = await initIncomingMessage(envelope, { isError: true }); - const isDuplicate = await isMessageDuplicate(message); + const isDuplicate = await isMessageDuplicate(message.attributes); if (isDuplicate) { ev.confirm(); window.log.warn( @@ -1710,48 +1764,62 @@ return; } - const id = await window.Signal.Data.saveMessage(message.attributes, { - Message: Whisper.Message, - }); - message.set({ id }); - await message.saveErrors(error || new Error('Error was null')); - const conversationId = message.get('conversationId'); const conversation = await ConversationController.getOrCreateAndWait( conversationId, 'private' ); - conversation.set({ - active_at: Date.now(), - unreadCount: conversation.get('unreadCount') + 1, - }); - const conversationTimestamp = conversation.get('timestamp'); - const messageTimestamp = message.get('timestamp'); - if (!conversationTimestamp || messageTimestamp > conversationTimestamp) { - conversation.set({ timestamp: message.get('sent_at') }); - } + // This matches the queueing behavior used in Message.handleDataMessage + conversation.queueJob(async () => { + const model = new Whisper.Message({ + ...message.attributes, + id: window.getGuid(), + }); + await model.saveErrors(error || new Error('Error was null'), { + skipSave: true, + }); - conversation.trigger('newmessage', message); - conversation.notify(message); + MessageController.register(model.id, model); + await window.Signal.Data.saveMessage(model.attributes, { + Message: Whisper.Message, + forceSave: true, + }); - if (ev.confirm) { - ev.confirm(); - } + conversation.set({ + active_at: Date.now(), + unreadCount: conversation.get('unreadCount') + 1, + }); - await window.Signal.Data.updateConversation( - conversationId, - conversation.attributes, - { - Conversation: Whisper.Conversation, + const conversationTimestamp = conversation.get('timestamp'); + const messageTimestamp = model.get('timestamp'); + if ( + !conversationTimestamp || + messageTimestamp > conversationTimestamp + ) { + conversation.set({ timestamp: model.get('sent_at') }); } - ); + + conversation.trigger('newmessage', model); + conversation.notify(model); + + if (ev.confirm) { + ev.confirm(); + } + + window.Signal.Data.updateConversation( + conversationId, + conversation.attributes + ); + }); } throw error; } async function onViewSync(ev) { + ev.confirm(); + const { source, timestamp } = ev; window.log.info(`view sync ${source} ${timestamp}`); @@ -1760,10 +1828,7 @@ timestamp, }); - sync.on('remove', ev.confirm); - - // Calling this directly so we can wait for completion - return Whisper.ViewSyncs.onSync(sync); + Whisper.ViewSyncs.onSync(sync); } function onReadReceipt(ev) { @@ -1772,8 +1837,10 @@ const { reader } = ev.read; window.log.info('read receipt', reader, timestamp); + ev.confirm(); + if (!storage.get('read-receipt-setting')) { - return ev.confirm(); + return; } const receipt = Whisper.ReadReceipts.add({ @@ -1782,10 +1849,8 @@ read_at: readAt, }); - receipt.on('remove', ev.confirm); - - // Calling this directly so we can wait for completion - return Whisper.ReadReceipts.onReceipt(receipt); + // Note: We do not wait for completion here + Whisper.ReadReceipts.onReceipt(receipt); } function onReadSync(ev) { @@ -1802,7 +1867,8 @@ receipt.on('remove', ev.confirm); - // Calling this directly so we can wait for completion + // Note: Here we wait, because we want read states to be in the database + // before we move on. return Whisper.ReadSyncs.onReceipt(receipt); } @@ -1811,6 +1877,10 @@ const key = ev.verified.identityKey; let state; + if (ev.confirm) { + ev.confirm(); + } + const c = new Whisper.Conversation({ id: number, }); @@ -1861,10 +1931,6 @@ } else { await contact.setUnverified(options); } - - if (ev.confirm) { - ev.confirm(); - } } function onDeliveryReceipt(ev) { @@ -1875,14 +1941,14 @@ deliveryReceipt.timestamp ); + ev.confirm(); + const receipt = Whisper.DeliveryReceipts.add({ timestamp: deliveryReceipt.timestamp, source: deliveryReceipt.source, }); - ev.confirm(); - - // Calling this directly so we can wait for completion - return Whisper.DeliveryReceipts.onReceipt(receipt); + // Note: We don't wait for completion here + Whisper.DeliveryReceipts.onReceipt(receipt); } })(); diff --git a/js/conversation_controller.js b/js/conversation_controller.js index e0bcb33dba..ace43c0d10 100644 --- a/js/conversation_controller.js +++ b/js/conversation_controller.js @@ -196,12 +196,9 @@ this.model.set({ draft: draft.slice(0, MAX_MESSAGE_BODY_LENGTH), }); - await window.Signal.Data.updateConversation( + window.Signal.Data.updateConversation( conversation.id, - conversation.attributes, - { - Conversation: Whisper.Conversation, - } + conversation.attributes ); } }) diff --git a/js/delivery_receipts.js b/js/delivery_receipts.js index 0e5abe3d0d..b6e6b16119 100644 --- a/js/delivery_receipts.js +++ b/js/delivery_receipts.js @@ -93,14 +93,13 @@ }); if (message.isExpiring() && !expirationStartTimestamp) { - // This will save the message for us while starting the timer - await message.setToExpire(); - } else { - await window.Signal.Data.saveMessage(message.attributes, { - Message: Whisper.Message, - }); + await message.setToExpire(false, { skipSave: true }); } + await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, + }); + // notify frontend listeners const conversation = ConversationController.get( message.get('conversationId') diff --git a/js/expiring_messages.js b/js/expiring_messages.js index 365c03fc57..be1db7ff01 100644 --- a/js/expiring_messages.js +++ b/js/expiring_messages.js @@ -88,7 +88,7 @@ clearTimeout(timeout); timeout = setTimeout(destroyExpiredMessages, wait); } - const throttledCheckExpiringMessages = _.throttle( + const debouncedCheckExpiringMessages = _.debounce( checkExpiringMessages, 1000 ); @@ -97,9 +97,9 @@ nextExpiration: null, init(events) { checkExpiringMessages(); - events.on('timetravel', throttledCheckExpiringMessages); + events.on('timetravel', debouncedCheckExpiringMessages); }, - update: throttledCheckExpiringMessages, + update: debouncedCheckExpiringMessages, }; const TimerOption = Backbone.Model.extend({ diff --git a/js/expiring_tap_to_view_messages.js b/js/expiring_tap_to_view_messages.js index 16522b51e0..52e4ee46e0 100644 --- a/js/expiring_tap_to_view_messages.js +++ b/js/expiring_tap_to_view_messages.js @@ -83,7 +83,7 @@ checkTapToViewMessages(); }, wait); } - const throttledCheckTapToViewMessages = _.throttle( + const debouncedCheckTapToViewMessages = _.debounce( checkTapToViewMessages, 1000 ); @@ -92,8 +92,8 @@ nextCheck: null, init(events) { checkTapToViewMessages(); - events.on('timetravel', throttledCheckTapToViewMessages); + events.on('timetravel', debouncedCheckTapToViewMessages); }, - update: throttledCheckTapToViewMessages, + update: debouncedCheckTapToViewMessages, }; })(); diff --git a/js/logging.js b/js/logging.js index 4d749178bc..89dedf776d 100644 --- a/js/logging.js +++ b/js/logging.js @@ -8,6 +8,7 @@ const _ = require('lodash'); const debuglogs = require('./modules/debuglogs'); const Privacy = require('./modules/privacy'); +const { createBatcher } = require('../ts/util/batcher'); const ipc = electron.ipcRenderer; @@ -98,13 +99,31 @@ const publish = debuglogs.upload; // A modern logging interface for the browser +const env = window.getEnvironment(); +const IS_PRODUCTION = env === 'production'; + +const ipcBatcher = createBatcher({ + wait: 500, + size: 20, + processBatch: items => { + ipc.send('batch-log', items); + }, +}); + // The Bunyan API: https://github.com/trentm/node-bunyan#log-method-api function logAtLevel(level, prefix, ...args) { - console._log(prefix, now(), ...args); + if (!IS_PRODUCTION) { + console._log(prefix, now(), ...args); + } const str = cleanArgsForIPC(args); const logText = Privacy.redactAll(str); - ipc.send(`log-${level}`, logText); + + ipcBatcher.add({ + timestamp: Date.now(), + level, + logText, + }); } window.log = { diff --git a/js/message_controller.js b/js/message_controller.js index d871bcc833..f6dfd6155a 100644 --- a/js/message_controller.js +++ b/js/message_controller.js @@ -54,6 +54,11 @@ } } + function getById(id) { + const existing = messageLookup[id]; + return existing && existing.message ? existing.message : null; + } + function _get() { return messageLookup; } @@ -64,6 +69,7 @@ register, unregister, cleanup, + getById, _get, }; })(); diff --git a/js/models/conversations.js b/js/models/conversations.js index e9bc61d42c..ac320b4d6f 100644 --- a/js/models/conversations.js +++ b/js/models/conversations.js @@ -101,14 +101,14 @@ this.messageCollection.on('send-error', this.onMessageError, this); this.throttledBumpTyping = _.throttle(this.bumpTyping, 300); - const debouncedUpdateLastMessage = _.debounce( + this.debouncedUpdateLastMessage = _.debounce( this.updateLastMessage.bind(this), 200 ); this.listenTo( this.messageCollection, 'add remove destroy', - debouncedUpdateLastMessage + this.debouncedUpdateLastMessage ); this.listenTo(this.messageCollection, 'sent', this.updateLastMessage); this.listenTo( @@ -268,7 +268,7 @@ }, async updateAndMerge(message) { - this.updateLastMessage(); + this.debouncedUpdateLastMessage(); const mergeMessage = () => { const existing = this.messageCollection.get(message.id); @@ -284,7 +284,7 @@ }, async onExpired(message) { - this.updateLastMessage(); + this.debouncedUpdateLastMessage(); const removeMessage = () => { const { id } = message; @@ -317,7 +317,7 @@ : `${message.source}.${message.sourceDevice}`; this.clearContactTypingTimer(identifier); - await this.updateLastMessage(); + this.debouncedUpdateLastMessage(); }, addSingleMessage(message) { @@ -411,11 +411,7 @@ if (this.get('verified') !== verified) { this.set({ verified }); - - // we don't await here because we don't need to wait for this to finish - window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); } return; @@ -479,9 +475,7 @@ } this.set({ verified }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); // Three situations result in a verification notice in the conversation: // 1) The message came from an explicit verification in another client (not @@ -1014,9 +1008,7 @@ draft: null, draftTimestamp: null, }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); // We're offline! if (!textsecure.messaging) { @@ -1143,10 +1135,9 @@ conversation.set({ sealedSender: SEALED_SENDER.DISABLED, }); - await window.Signal.Data.updateConversation( + window.Signal.Data.updateConversation( conversation.id, - conversation.attributes, - { Conversation: Whisper.Conversation } + conversation.attributes ); } }) @@ -1175,10 +1166,9 @@ sealedSender: SEALED_SENDER.UNRESTRICTED, }); } - await window.Signal.Data.updateConversation( + window.Signal.Data.updateConversation( conversation.id, - conversation.attributes, - { Conversation: Whisper.Conversation } + conversation.attributes ); } }) @@ -1299,7 +1289,7 @@ this.set(lastMessageUpdate); if (this.hasChanged()) { - await window.Signal.Data.updateConversation(this.id, this.attributes, { + window.Signal.Data.updateConversation(this.id, this.attributes, { Conversation: Whisper.Conversation, }); } @@ -1307,9 +1297,7 @@ async setArchived(isArchived) { this.set({ isArchived }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); }, async updateExpirationTimer( @@ -1346,9 +1334,7 @@ const timestamp = (receivedAt || Date.now()) - 1; this.set({ expireTimer }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); const model = new Whisper.Message({ // Even though this isn't reflected to the user, we want to place the last seen @@ -1516,9 +1502,7 @@ if (this.get('type') === 'group') { const groupNumbers = this.getRecipients(); this.set({ left: true }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); const model = new Whisper.Message({ group_update: { left: 'You' }, @@ -1565,14 +1549,9 @@ _.map(oldUnread, async providedM => { const m = MessageController.register(providedM.id, providedM); - if (!this.messageCollection.get(m.id)) { - window.log.warn( - 'Marked a message as read in the database, but ' + - 'it was not in messageCollection.' - ); - } - + // Note that this will update the message in the database await m.markRead(options.readAt); + const errors = m.get('errors'); return { sender: m.get('source'), @@ -1588,9 +1567,7 @@ const unreadCount = unreadMessages.length - read.length; this.set({ unreadCount }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); // If a message has errors, we don't want to send anything out about it. // read syncs - let's wait for a client that really understands the message @@ -1783,9 +1760,7 @@ } if (c.hasChanged()) { - await window.Signal.Data.updateConversation(id, c.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(id, c.attributes); } }, async setProfileName(encryptedName) { @@ -1860,7 +1835,7 @@ await this.deriveAccessKeyIfNeeded(); - await window.Signal.Data.updateConversation(this.id, this.attributes, { + window.Signal.Data.updateConversation(this.id, this.attributes, { Conversation: Whisper.Conversation, }); } @@ -1883,9 +1858,7 @@ sealedSender: SEALED_SENDER.UNKNOWN, }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); } }, @@ -1944,9 +1917,7 @@ timestamp: null, active_at: null, }); - await window.Signal.Data.updateConversation(this.id, this.attributes, { - Conversation: Whisper.Conversation, - }); + window.Signal.Data.updateConversation(this.id, this.attributes); await window.Signal.Data.removeAllMessagesInConversation(this.id, { MessageCollection: Whisper.MessageCollection, diff --git a/js/models/messages.js b/js/models/messages.js index 79cc7a296e..b01f00c68a 100644 --- a/js/models/messages.js +++ b/js/models/messages.js @@ -1004,7 +1004,9 @@ hasErrors() { return _.size(this.get('errors')) > 0; }, - async saveErrors(providedErrors) { + async saveErrors(providedErrors, options = {}) { + const { skipSave } = options; + let errors = providedErrors; if (!(errors instanceof Array)) { @@ -1030,11 +1032,16 @@ errors = errors.concat(this.get('errors') || []); this.set({ errors }); - await window.Signal.Data.saveMessage(this.attributes, { - Message: Whisper.Message, - }); + + if (!skipSave) { + await window.Signal.Data.saveMessage(this.attributes, { + Message: Whisper.Message, + }); + } }, - async markRead(readAt) { + async markRead(readAt, options = {}) { + const { skipSave } = options; + this.unset('unread'); if (this.get('expireTimer') && !this.get('expirationStartTimestamp')) { @@ -1051,9 +1058,11 @@ }) ); - await window.Signal.Data.saveMessage(this.attributes, { - Message: Whisper.Message, - }); + if (!skipSave) { + await window.Signal.Data.saveMessage(this.attributes, { + Message: Whisper.Message, + }); + } }, isExpiring() { return this.get('expireTimer') && this.get('expirationStartTimestamp'); @@ -1074,7 +1083,9 @@ } return msFromNow; }, - async setToExpire(force = false) { + async setToExpire(force = false, options = {}) { + const { skipSave } = options; + if (this.isExpiring() && (force || !this.get('expires_at'))) { const start = this.get('expirationStartTimestamp'); const delta = this.get('expireTimer') * 1000; @@ -1082,7 +1093,7 @@ this.set({ expires_at: expiresAt }); const id = this.get('id'); - if (id) { + if (id && !skipSave) { await window.Signal.Data.saveMessage(this.attributes, { Message: Whisper.Message, }); @@ -1664,10 +1675,6 @@ sticker, }); - await window.Signal.Data.saveMessage(this.attributes, { - Message: Whisper.Message, - }); - return true; } @@ -1880,6 +1887,7 @@ } message.set({ + id: window.getGuid(), attachments: dataMessage.attachments, body: dataMessage.body, contact: dataMessage.contact, @@ -2024,8 +2032,8 @@ !conversationTimestamp || message.get('sent_at') > conversationTimestamp ) { - conversation.lastMessage = message.getNotificationText(); conversation.set({ + lastMessage: message.getNotificationText(), timestamp: message.get('sent_at'), }); } @@ -2045,12 +2053,6 @@ } } - const id = await window.Signal.Data.saveMessage(message.attributes, { - Message: Whisper.Message, - }); - message.set({ id }); - MessageController.register(message.id, message); - if (message.isTapToView() && type === 'outgoing') { await message.eraseContents(); } @@ -2076,56 +2078,24 @@ } } - if (message.isUnsupportedMessage()) { - await message.eraseContents(); - } else { - // Note that this can save the message again, if jobs were queued. We need to - // call it after we have an id for this message, because the jobs refer back - // to their source message. - await message.queueAttachmentDownloads(); - } - - await window.Signal.Data.updateConversation( + MessageController.register(message.id, message); + window.Signal.Data.updateConversation( conversationId, - conversation.attributes, - { Conversation: Whisper.Conversation } + conversation.attributes ); - conversation.trigger('newmessage', message); - - try { - // We go to the database here because, between the message save above and - // the previous line's trigger() call, we might have marked all messages - // unread in the database. This message might already be read! - const fetched = await window.Signal.Data.getMessageById( - message.get('id'), - { - Message: Whisper.Message, - } - ); - const previousUnread = message.get('unread'); - - // Important to update message with latest read state from database - message.merge(fetched); - - if (previousUnread !== message.get('unread')) { - window.log.warn( - 'Caught race condition on new message read state! ' + - 'Manually starting timers.' - ); - // We call markRead() even though the message is already - // marked read because we need to start expiration - // timers, etc. - message.markRead(); - } - } catch (error) { - window.log.warn( - 'handleDataMessage: Message', - message.idForLogging(), - 'was deleted' - ); + if (message.isUnsupportedMessage()) { + await message.eraseContents(); } + await message.queueAttachmentDownloads(); + await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, + forceSave: true, + }); + + conversation.trigger('newmessage', message); + if (message.get('unread')) { await conversation.notify(message); } diff --git a/js/modules/attachment_downloads.js b/js/modules/attachment_downloads.js index 9ff4724191..6ff1d4aa6b 100644 --- a/js/modules/attachment_downloads.js +++ b/js/modules/attachment_downloads.js @@ -166,9 +166,11 @@ async function _runJob(job) { ); } - const found = await getMessageById(messageId, { - Message: Whisper.Message, - }); + const found = + MessageController.getById(messageId) || + (await getMessageById(messageId, { + Message: Whisper.Message, + })); if (!found) { logger.error('_runJob: Source message not found, deleting job'); await _finishJob(null, id); @@ -434,13 +436,7 @@ async function _addAttachmentToMessage(message, attachment, { type, index }) { hash: await computeHash(loadedAttachment.data), }, }); - await Signal.Data.updateConversation( - conversationId, - conversation.attributes, - { - Conversation: Whisper.Conversation, - } - ); + Signal.Data.updateConversation(conversationId, conversation.attributes); return; } diff --git a/js/modules/data.js b/js/modules/data.js index b145276159..e212555752 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -6,14 +6,17 @@ const { cloneDeep, forEach, get, + groupBy, isFunction, isObject, + last, map, set, } = require('lodash'); const { base64ToArrayBuffer, arrayBufferToBase64 } = require('./crypto'); const MessageType = require('./types/message'); +const { createBatcher } = require('../../ts/util/batcher'); const { ipcRenderer } = electron; @@ -78,6 +81,7 @@ module.exports = { removeAllItems, createOrUpdateSession, + createOrUpdateSessions, getSessionById, getSessionsByNumber, bulkAddSessions, @@ -91,6 +95,7 @@ module.exports = { saveConversations, getConversationById, updateConversation, + updateConversations, removeConversation, _removeConversations, @@ -134,6 +139,7 @@ module.exports = { saveUnprocesseds, updateUnprocessedAttempts, updateUnprocessedWithData, + updateUnprocessedsWithData, removeUnprocessed, removeAllUnprocessed, @@ -205,20 +211,21 @@ function _cleanData(data) { } async function _shutdown() { + const jobKeys = Object.keys(_jobs); + window.log.info( + `data.shutdown: shutdown requested. ${jobKeys.length} jobs outstanding` + ); + if (_shutdownPromise) { - return _shutdownPromise; + await _shutdownPromise; + return; } _shuttingDown = true; - const jobKeys = Object.keys(_jobs); - window.log.info( - `data.shutdown: starting process. ${jobKeys.length} jobs outstanding` - ); - // No outstanding jobs, return immediately - if (jobKeys.length === 0) { - return null; + if (jobKeys.length === 0 || _DEBUG) { + return; } // Outstanding jobs; we need to wait until the last one is done @@ -233,7 +240,7 @@ async function _shutdown() { }; }); - return _shutdownPromise; + await _shutdownPromise; } function _makeJob(fnName) { @@ -268,7 +275,7 @@ function _updateJob(id, data) { _removeJob(id); const end = Date.now(); const delta = end - start; - if (delta > 10) { + if (delta > 10 || _DEBUG) { window.log.info( `SQL channel job ${id} (${fnName}) succeeded in ${end - start}ms` ); @@ -556,6 +563,9 @@ async function removeAllItems() { async function createOrUpdateSession(data) { await channels.createOrUpdateSession(data); } +async function createOrUpdateSessions(items) { + await channels.createOrUpdateSessions(items); +} async function getSessionById(id) { const session = await channels.getSessionById(id); return session; @@ -600,17 +610,25 @@ async function getConversationById(id, { Conversation }) { return new Conversation(data); } -async function updateConversation(id, data, { Conversation }) { - const existing = await getConversationById(id, { Conversation }); - if (!existing) { - throw new Error(`Conversation ${id} does not exist!`); - } +const updateConversationBatcher = createBatcher({ + wait: 500, + maxSize: 20, + processBatch: async items => { + // We only care about the most recent update for each conversation + const byId = groupBy(items, item => item.id); + const ids = Object.keys(byId); + const mostRecent = ids.map(id => last(byId[id])); - const merged = { - ...existing.attributes, - ...data, - }; - await channels.updateConversation(merged); + await updateConversations(mostRecent); + }, +}); + +function updateConversation(id, data) { + updateConversationBatcher.add(data); +} + +async function updateConversations(data) { + await channels.updateConversations(data); } async function removeConversation(id, { Conversation }) { @@ -932,6 +950,9 @@ async function updateUnprocessedAttempts(id, attempts) { async function updateUnprocessedWithData(id, data) { await channels.updateUnprocessedWithData(id, data); } +async function updateUnprocessedsWithData(items) { + await channels.updateUnprocessedsWithData(items); +} async function removeUnprocessed(id) { await channels.removeUnprocessed(id); diff --git a/js/modules/debug.js b/js/modules/debug.js index 5f0e4037cc..070986cad6 100644 --- a/js/modules/debug.js +++ b/js/modules/debug.js @@ -50,11 +50,7 @@ exports.createConversation = async ({ unread: numMessages, }); const conversationId = conversation.get('id'); - await Signal.Data.updateConversation( - conversationId, - conversation.attributes, - { Conversation: Whisper.Conversation } - ); + Signal.Data.updateConversation(conversationId, conversation.attributes); await Promise.all( range(0, numMessages).map(async index => { diff --git a/js/modules/migrate_to_sql.js b/js/modules/migrate_to_sql.js index 4278ad0398..fb69954723 100644 --- a/js/modules/migrate_to_sql.js +++ b/js/modules/migrate_to_sql.js @@ -117,11 +117,11 @@ async function migrateToSQL({ if (item.envelope) { // eslint-disable-next-line no-param-reassign - item.envelope = await arrayBufferToString(item.envelope); + item.envelope = arrayBufferToString(item.envelope); } if (item.decrypted) { // eslint-disable-next-line no-param-reassign - item.decrypted = await arrayBufferToString(item.decrypted); + item.decrypted = arrayBufferToString(item.decrypted); } }) ); diff --git a/js/read_receipts.js b/js/read_receipts.js index 67aad052b2..38ed106e86 100644 --- a/js/read_receipts.js +++ b/js/read_receipts.js @@ -98,14 +98,13 @@ }); if (message.isExpiring() && !expirationStartTimestamp) { - // This will save the message for us while starting the timer - await message.setToExpire(); - } else { - await window.Signal.Data.saveMessage(message.attributes, { - Message: Whisper.Message, - }); + await message.setToExpire(false, { skipSave: true }); } + await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, + }); + // notify frontend listeners const conversation = ConversationController.get( message.get('conversationId') diff --git a/js/read_syncs.js b/js/read_syncs.js index 9327943288..4d3375415f 100644 --- a/js/read_syncs.js +++ b/js/read_syncs.js @@ -41,23 +41,14 @@ const notificationForMessage = found ? Whisper.Notifications.findWhere({ messageId: found.id }) : null; - const removedNotification = Whisper.Notifications.remove( - notificationForMessage - ); - const receiptSender = receipt.get('sender'); - const receiptTimestamp = receipt.get('timestamp'); - const wasMessageFound = Boolean(found); - const wasNotificationFound = Boolean(notificationForMessage); - const wasNotificationRemoved = Boolean(removedNotification); - window.log.info('Receive read sync:', { - receiptSender, - receiptTimestamp, - wasMessageFound, - wasNotificationFound, - wasNotificationRemoved, - }); + Whisper.Notifications.remove(notificationForMessage); if (!found) { + window.log.info( + 'No message for read sync', + receipt.get('sender'), + receipt.get('timestamp') + ); return; } @@ -68,7 +59,7 @@ // timer to the time specified by the read sync if it's earlier than // the previous read time. if (message.isUnread()) { - await message.markRead(readAt); + await message.markRead(readAt, { skipSave: true }); // onReadMessage may result in messages older than this one being // marked read. We want those messages to have the same expire timer @@ -87,7 +78,7 @@ message.set({ expirationStartTimestamp }); const force = true; - await message.setToExpire(force); + await message.setToExpire(force, { skipSave: true }); const conversation = message.getConversation(); if (conversation) { @@ -95,6 +86,10 @@ } } + await window.Signal.Data.saveMessage(message.attributes, { + Message: Whisper.Message, + }); + this.remove(receipt); } catch (error) { window.log.error( diff --git a/js/signal_protocol_store.js b/js/signal_protocol_store.js index 0a14b1a7cf..2916f78f63 100644 --- a/js/signal_protocol_store.js +++ b/js/signal_protocol_store.js @@ -147,9 +147,31 @@ }, }); - function SignalProtocolStore() {} + function SignalProtocolStore() { + this.sessionUpdateBatcher = window.Signal.Util.createBatcher({ + wait: 500, + maxSize: 20, + processBatch: async items => { + // We only care about the most recent update for each session + const byId = _.groupBy(items, item => item.id); + const ids = Object.keys(byId); + const mostRecent = ids.map(id => { + const item = _.last(byId[id]); + + return { + ...item, + record: item.record.serialize(), + }; + }); + + await window.Signal.Data.createOrUpdateSessions(mostRecent); + }, + }); + } + + async function _hydrateCache(object, field, itemsPromise, idField) { + const items = await itemsPromise; - async function _hydrateCache(object, field, items, idField) { const cache = Object.create(null); for (let i = 0, max = items.length; i < max; i += 1) { const item = items[i]; @@ -167,48 +189,53 @@ constructor: SignalProtocolStore, async hydrateCaches() { await Promise.all([ + (async () => { + const item = await window.Signal.Data.getItemById('identityKey'); + this.ourIdentityKey = item ? item.value : undefined; + })(), + (async () => { + const item = await window.Signal.Data.getItemById('registrationId'); + this.ourRegistrationId = item ? item.value : undefined; + })(), _hydrateCache( this, 'identityKeys', - await window.Signal.Data.getAllIdentityKeys(), + window.Signal.Data.getAllIdentityKeys(), 'id' ), _hydrateCache( this, 'sessions', - await window.Signal.Data.getAllSessions(), + (async () => { + const sessions = await window.Signal.Data.getAllSessions(); + + return sessions.map(item => ({ + ...item, + record: libsignal.SessionRecord.deserialize(item.record), + })); + })(), 'id' ), _hydrateCache( this, 'preKeys', - await window.Signal.Data.getAllPreKeys(), + window.Signal.Data.getAllPreKeys(), 'id' ), _hydrateCache( this, 'signedPreKeys', - await window.Signal.Data.getAllSignedPreKeys(), + window.Signal.Data.getAllSignedPreKeys(), 'id' ), ]); }, async getIdentityKeyPair() { - const item = await window.Signal.Data.getItemById('identityKey'); - if (item) { - return item.value; - } - - return undefined; + return this.ourIdentityKey; }, async getLocalRegistrationId() { - const item = await window.Signal.Data.getItemById('registrationId'); - if (item) { - return item.value; - } - - return undefined; + return this.ourRegistrationId; }, // PreKeys @@ -337,7 +364,10 @@ }; this.sessions[encodedNumber] = data; - await window.Signal.Data.createOrUpdateSession(data); + + // Note: Because these are cached in memory, we batch and make these database + // updates out of band. + this.sessionUpdateBatcher.add(data); }, async getDeviceIds(number) { if (number === null || number === undefined) { @@ -845,14 +875,24 @@ forceSave: true, }); }, + addMultipleUnprocessed(array) { + // We need to pass forceSave because the data has an id already, which will cause + // an update instead of an insert. + return window.Signal.Data.saveUnprocesseds(array, { + forceSave: true, + }); + }, updateUnprocessedAttempts(id, attempts) { return window.Signal.Data.updateUnprocessedAttempts(id, attempts); }, updateUnprocessedWithData(id, data) { return window.Signal.Data.updateUnprocessedWithData(id, data); }, - removeUnprocessed(id) { - return window.Signal.Data.removeUnprocessed(id); + updateUnprocessedsWithData(items) { + return window.Signal.Data.updateUnprocessedsWithData(items); + }, + removeUnprocessed(idOrArray) { + return window.Signal.Data.removeUnprocessed(idOrArray); }, removeAllUnprocessed() { return window.Signal.Data.removeAllUnprocessed(); diff --git a/js/util_worker_tasks.js b/js/util_worker_tasks.js deleted file mode 100644 index 4d6c80bda0..0000000000 --- a/js/util_worker_tasks.js +++ /dev/null @@ -1,44 +0,0 @@ -/* global dcodeIO */ -/* eslint-disable strict */ - -'use strict'; - -const functions = { - stringToArrayBufferBase64, - arrayBufferToStringBase64, -}; - -onmessage = async e => { - const [jobId, fnName, ...args] = e.data; - - try { - const fn = functions[fnName]; - if (!fn) { - throw new Error(`Worker: job ${jobId} did not find function ${fnName}`); - } - const result = await fn(...args); - postMessage([jobId, null, result]); - } catch (error) { - const errorForDisplay = prepareErrorForPostMessage(error); - postMessage([jobId, errorForDisplay]); - } -}; - -function prepareErrorForPostMessage(error) { - if (!error) { - return null; - } - - if (error.stack) { - return error.stack; - } - - return error.message; -} - -function stringToArrayBufferBase64(string) { - return dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer(); -} -function arrayBufferToStringBase64(arrayBuffer) { - return dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('base64'); -} diff --git a/js/views/conversation_view.js b/js/views/conversation_view.js index bd8115c77a..7d37049d99 100644 --- a/js/views/conversation_view.js +++ b/js/views/conversation_view.js @@ -944,7 +944,7 @@ }, async saveModel() { - await window.Signal.Data.updateConversation( + window.Signal.Data.updateConversation( this.model.id, this.model.attributes, { diff --git a/libtextsecure/account_manager.js b/libtextsecure/account_manager.js index 47782e3ca3..5ca169b784 100644 --- a/libtextsecure/account_manager.js +++ b/libtextsecure/account_manager.js @@ -493,6 +493,7 @@ const regionCode = libphonenumber.util.getRegionCodeForNumber(number); await textsecure.storage.put('regionCode', regionCode); + await textsecure.storage.protocol.hydrateCaches(); }, async clearSessionsAndPreKeys() { const store = textsecure.storage.protocol; diff --git a/libtextsecure/libsignal-protocol.js b/libtextsecure/libsignal-protocol.js index 7290960dbc..b2415f4f84 100644 --- a/libtextsecure/libsignal-protocol.js +++ b/libtextsecure/libsignal-protocol.js @@ -35766,6 +35766,8 @@ Internal.SessionRecord = function() { return SessionRecord; }(); +libsignal.SessionRecord = Internal.SessionRecord; + function SignalProtocolAddress(name, deviceId) { this.name = name; this.deviceId = deviceId; @@ -35844,18 +35846,15 @@ SessionBuilder.prototype = { }); }.bind(this)).then(function(session) { var address = this.remoteAddress.toString(); - return this.storage.loadSession(address).then(function(serialized) { - var record; - if (serialized !== undefined) { - record = Internal.SessionRecord.deserialize(serialized); - } else { + return this.storage.loadSession(address).then(function(record) { + if (record === undefined) { record = new Internal.SessionRecord(); } record.archiveCurrentState(); record.updateSessionState(session); return Promise.all([ - this.storage.storeSession(address, record.serialize()), + this.storage.storeSession(address, record), this.storage.saveIdentity(this.remoteAddress.toString(), device.identityKey) ]); }.bind(this)); @@ -36039,12 +36038,7 @@ function SessionCipher(storage, remoteAddress) { SessionCipher.prototype = { getRecord: function(encodedNumber) { - return this.storage.loadSession(encodedNumber).then(function(serialized) { - if (serialized === undefined) { - return undefined; - } - return Internal.SessionRecord.deserialize(serialized); - }); + return this.storage.loadSession(encodedNumber); }, // encoding is an optional parameter - wrap() will only translate if one is provided encrypt: function(buffer, encoding) { @@ -36124,7 +36118,7 @@ SessionCipher.prototype = { return this.storage.saveIdentity(this.remoteAddress.toString(), theirIdentityKey); }.bind(this)).then(function() { record.updateSessionState(session); - return this.storage.storeSession(address, record.serialize()).then(function() { + return this.storage.storeSession(address, record).then(function() { return result; }); }.bind(this)); @@ -36211,7 +36205,7 @@ SessionCipher.prototype = { return this.storage.saveIdentity(this.remoteAddress.toString(), result.session.indexInfo.remoteIdentityKey); }.bind(this)).then(function() { record.updateSessionState(result.session); - return this.storage.storeSession(address, record.serialize()).then(function() { + return this.storage.storeSession(address, record).then(function() { return result.plaintext; }); }.bind(this)); @@ -36246,7 +36240,7 @@ SessionCipher.prototype = { preKeyProto.message.toArrayBuffer(), session ).then(function(plaintext) { record.updateSessionState(session); - return this.storage.storeSession(address, record.serialize()).then(function() { + return this.storage.storeSession(address, record).then(function() { if (preKeyId !== undefined && preKeyId !== null) { return this.storage.removePreKey(preKeyId); } @@ -36444,7 +36438,7 @@ SessionCipher.prototype = { } record.archiveCurrentState(); - return this.storage.storeSession(address, record.serialize()); + return this.storage.storeSession(address, record); }.bind(this)); }.bind(this)); }, @@ -36458,7 +36452,7 @@ SessionCipher.prototype = { } record.deleteAllSessions(); - return this.storage.storeSession(address, record.serialize()); + return this.storage.storeSession(address, record); }.bind(this)); }.bind(this)); } diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index 559259c73f..6d6284fae6 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -9,113 +9,11 @@ /* global _: false */ /* global ContactBuffer: false */ /* global GroupBuffer: false */ -/* global Worker: false */ /* eslint-disable more/no-then */ -const WORKER_TIMEOUT = 60 * 1000; // one minute const RETRY_TIMEOUT = 2 * 60 * 1000; -const _utilWorker = new Worker('js/util_worker.js'); -const _jobs = Object.create(null); -const _DEBUG = false; -let _jobCounter = 0; - -function _makeJob(fnName) { - _jobCounter += 1; - const id = _jobCounter; - - if (_DEBUG) { - window.log.info(`Worker job ${id} (${fnName}) started`); - } - _jobs[id] = { - fnName, - start: Date.now(), - }; - - return id; -} - -function _updateJob(id, data) { - const { resolve, reject } = data; - const { fnName, start } = _jobs[id]; - - _jobs[id] = { - ..._jobs[id], - ...data, - resolve: value => { - _removeJob(id); - const end = Date.now(); - window.log.info( - `Worker job ${id} (${fnName}) succeeded in ${end - start}ms` - ); - return resolve(value); - }, - reject: error => { - _removeJob(id); - const end = Date.now(); - window.log.info( - `Worker job ${id} (${fnName}) failed in ${end - start}ms` - ); - return reject(error); - }, - }; -} - -function _removeJob(id) { - if (_DEBUG) { - _jobs[id].complete = true; - } else { - delete _jobs[id]; - } -} - -function _getJob(id) { - return _jobs[id]; -} - -async function callWorker(fnName, ...args) { - const jobId = _makeJob(fnName); - - return new Promise((resolve, reject) => { - _utilWorker.postMessage([jobId, fnName, ...args]); - - _updateJob(jobId, { - resolve, - reject, - args: _DEBUG ? args : null, - }); - - setTimeout( - () => reject(new Error(`Worker job ${jobId} (${fnName}) timed out`)), - WORKER_TIMEOUT - ); - }); -} - -_utilWorker.onmessage = e => { - const [jobId, errorForDisplay, result] = e.data; - - const job = _getJob(jobId); - if (!job) { - throw new Error( - `Received worker reply to job ${jobId}, but did not have it in our registry!` - ); - } - - const { resolve, reject, fnName } = job; - - if (errorForDisplay) { - return reject( - new Error( - `Error received from worker job ${jobId} (${fnName}): ${errorForDisplay}` - ) - ); - } - - return resolve(result); -}; - function MessageReceiver(username, password, signalingKey, options = {}) { this.count = 0; @@ -135,24 +33,39 @@ function MessageReceiver(username, password, signalingKey, options = {}) { this.number = address.getName(); this.deviceId = address.getDeviceId(); - this.pendingQueue = new window.PQueue({ concurrency: 1 }); this.incomingQueue = new window.PQueue({ concurrency: 1 }); + this.pendingQueue = new window.PQueue({ concurrency: 1 }); this.appQueue = new window.PQueue({ concurrency: 1 }); + this.cacheAddBatcher = window.Signal.Util.createBatcher({ + wait: 200, + maxSize: 30, + processBatch: this.cacheAndQueueBatch.bind(this), + }); + this.cacheUpdateBatcher = window.Signal.Util.createBatcher({ + wait: 500, + maxSize: 30, + processBatch: this.cacheUpdateBatch.bind(this), + }); + this.cacheRemoveBatcher = window.Signal.Util.createBatcher({ + wait: 500, + maxSize: 30, + processBatch: this.cacheRemoveBatch.bind(this), + }); + if (options.retryCached) { this.pendingQueue.add(() => this.queueAllCached()); } } MessageReceiver.stringToArrayBuffer = string => - Promise.resolve(dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer()); + dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer(); MessageReceiver.arrayBufferToString = arrayBuffer => - Promise.resolve(dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary')); - + dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary'); MessageReceiver.stringToArrayBufferBase64 = string => - callWorker('stringToArrayBufferBase64', string); + dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer(); MessageReceiver.arrayBufferToStringBase64 = arrayBuffer => - callWorker('arrayBufferToStringBase64', arrayBuffer); + dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('base64'); MessageReceiver.prototype = new textsecure.EventTarget(); MessageReceiver.prototype.extend({ @@ -197,6 +110,12 @@ MessageReceiver.prototype.extend({ this.stoppingProcessing = true; return this.close(); }, + unregisterBatchers() { + window.log.info('MessageReceiver: unregister batchers'); + this.cacheAddBatcher.unregister(); + this.cacheUpdateBatcher.unregister(); + this.cacheRemoveBatcher.unregister(); + }, shutdown() { if (this.socket) { this.socket.onclose = null; @@ -308,20 +227,7 @@ MessageReceiver.prototype.extend({ ? envelope.serverTimestamp.toNumber() : null; - try { - await this.addToCache(envelope, plaintext); - request.respond(200, 'OK'); - this.queueEnvelope(envelope); - - this.clearRetryTimeout(); - this.maybeScheduleRetryTimeout(); - } catch (error) { - request.respond(500, 'Failed to cache message'); - window.log.error( - 'handleRequest error trying to add message to cache:', - error && error.stack ? error.stack : error - ); - } + this.cacheAndQueue(envelope, plaintext, request); } catch (e) { request.respond(500, 'Bad encrypted websocket message'); window.log.error( @@ -377,7 +283,12 @@ MessageReceiver.prototype.extend({ this.count = 0; }; - this.incomingQueue.add(waitForIncomingQueue); + const waitForCacheAddBatcher = async () => { + await this.cacheAddBatcher.onIdle(); + this.incomingQueue.add(waitForIncomingQueue); + }; + + waitForCacheAddBatcher(); }, drain() { const waitForIncomingQueue = () => @@ -408,13 +319,13 @@ MessageReceiver.prototype.extend({ let envelopePlaintext = item.envelope; if (item.version === 2) { - envelopePlaintext = await MessageReceiver.stringToArrayBufferBase64( + envelopePlaintext = MessageReceiver.stringToArrayBufferBase64( envelopePlaintext ); } if (typeof envelopePlaintext === 'string') { - envelopePlaintext = await MessageReceiver.stringToArrayBuffer( + envelopePlaintext = MessageReceiver.stringToArrayBuffer( envelopePlaintext ); } @@ -430,13 +341,13 @@ MessageReceiver.prototype.extend({ let payloadPlaintext = decrypted; if (item.version === 2) { - payloadPlaintext = await MessageReceiver.stringToArrayBufferBase64( + payloadPlaintext = MessageReceiver.stringToArrayBufferBase64( payloadPlaintext ); } if (typeof payloadPlaintext === 'string') { - payloadPlaintext = await MessageReceiver.stringToArrayBuffer( + payloadPlaintext = MessageReceiver.stringToArrayBuffer( payloadPlaintext ); } @@ -530,44 +441,61 @@ MessageReceiver.prototype.extend({ }) ); }, - async addToCache(envelope, plaintext) { + async cacheAndQueueBatch(items) { + const dataArray = items.map(item => item.data); + try { + await textsecure.storage.unprocessed.batchAdd(dataArray); + items.forEach(item => { + item.request.respond(200, 'OK'); + this.queueEnvelope(item.envelope); + }); + + this.clearRetryTimeout(); + this.maybeScheduleRetryTimeout(); + } catch (error) { + items.forEach(item => { + item.request.respond(500, 'Failed to cache message'); + }); + window.log.error( + 'cacheAndQueue error trying to add messages to cache:', + error && error.stack ? error.stack : error + ); + } + }, + cacheAndQueue(envelope, plaintext, request) { const { id } = envelope; const data = { id, version: 2, - envelope: await MessageReceiver.arrayBufferToStringBase64(plaintext), + envelope: MessageReceiver.arrayBufferToStringBase64(plaintext), timestamp: Date.now(), attempts: 1, }; - return textsecure.storage.unprocessed.add(data); + this.cacheAddBatcher.add({ + request, + envelope, + data, + }); }, - async updateCache(envelope, plaintext) { + async cacheUpdateBatch(items) { + await textsecure.storage.unprocessed.addDecryptedDataToList(items); + }, + updateCache(envelope, plaintext) { const { id } = envelope; - const item = await textsecure.storage.unprocessed.get(id); - if (!item) { - window.log.error( - `updateCache: Didn't find item ${id} in cache to update` - ); - return null; - } - - item.source = envelope.source; - item.sourceDevice = envelope.sourceDevice; - item.serverTimestamp = envelope.serverTimestamp; - - if (item.version === 2) { - item.decrypted = await MessageReceiver.arrayBufferToStringBase64( - plaintext - ); - } else { - item.decrypted = await MessageReceiver.arrayBufferToString(plaintext); - } - - return textsecure.storage.unprocessed.addDecryptedData(item.id, item); + const data = { + source: envelope.source, + sourceDevice: envelope.sourceDevice, + serverTimestamp: envelope.serverTimestamp, + decrypted: MessageReceiver.arrayBufferToStringBase64(plaintext), + }; + this.cacheUpdateBatcher.add({ id, data }); + }, + async cacheRemoveBatch(items) { + await textsecure.storage.unprocessed.remove(items); }, removeFromCache(envelope) { const { id } = envelope; - return textsecure.storage.unprocessed.remove(id); + this.cacheRemoveBatcher.add(id); }, queueDecryptedEnvelope(envelope, plaintext) { const id = this.getEnvelopeId(envelope); @@ -811,12 +739,7 @@ MessageReceiver.prototype.extend({ // Note: this is an out of band update; there are cases where the item in the // cache has already been deleted by the time this runs. That's okay. - this.updateCache(envelope, plaintext).catch(error => { - window.log.error( - 'decrypt failed to save decrypted message contents to cache:', - error && error.stack ? error.stack : error - ); - }); + this.updateCache(envelope, plaintext); return plaintext; }) @@ -1497,6 +1420,9 @@ textsecure.MessageReceiver = function MessageReceiverWrapper( messageReceiver ); this.stopProcessing = messageReceiver.stopProcessing.bind(messageReceiver); + this.unregisterBatchers = messageReceiver.unregisterBatchers.bind( + messageReceiver + ); messageReceiver.connect(); }; diff --git a/libtextsecure/sendmessage.js b/libtextsecure/sendmessage.js index 5716b7dede..7b28186839 100644 --- a/libtextsecure/sendmessage.js +++ b/libtextsecure/sendmessage.js @@ -676,7 +676,7 @@ MessageSender.prototype = { ); }, - sendDeliveryReceipt(recipientId, timestamp, options) { + sendDeliveryReceipt(recipientId, timestamps, options) { const myNumber = textsecure.storage.user.getNumber(); const myDevice = textsecure.storage.user.getDeviceId(); if (myNumber === recipientId && (myDevice === 1 || myDevice === '1')) { @@ -685,7 +685,7 @@ MessageSender.prototype = { const receiptMessage = new textsecure.protobuf.ReceiptMessage(); receiptMessage.type = textsecure.protobuf.ReceiptMessage.Type.DELIVERY; - receiptMessage.timestamp = [timestamp]; + receiptMessage.timestamp = timestamps; const contentMessage = new textsecure.protobuf.Content(); contentMessage.receiptMessage = receiptMessage; diff --git a/libtextsecure/storage/unprocessed.js b/libtextsecure/storage/unprocessed.js index ac113362f7..ff51f8e01c 100644 --- a/libtextsecure/storage/unprocessed.js +++ b/libtextsecure/storage/unprocessed.js @@ -21,6 +21,9 @@ add(data) { return textsecure.storage.protocol.addUnprocessed(data); }, + batchAdd(array) { + return textsecure.storage.protocol.addMultipleUnprocessed(array); + }, updateAttempts(id, attempts) { return textsecure.storage.protocol.updateUnprocessedAttempts( id, @@ -30,8 +33,11 @@ addDecryptedData(id, data) { return textsecure.storage.protocol.updateUnprocessedWithData(id, data); }, - remove(id) { - return textsecure.storage.protocol.removeUnprocessed(id); + addDecryptedDataToList(array) { + return textsecure.storage.protocol.updateUnprocessedsWithData(array); + }, + remove(idOrArray) { + return textsecure.storage.protocol.removeUnprocessed(idOrArray); }, removeAll() { return textsecure.storage.protocol.removeAllUnprocessed(); diff --git a/test/storage_test.js b/test/storage_test.js index 890b262776..40df0a6121 100644 --- a/test/storage_test.js +++ b/test/storage_test.js @@ -27,12 +27,14 @@ describe('SignalProtocolStore', () => { describe('getLocalRegistrationId', () => { it('retrieves my registration id', async () => { + await store.hydrateCaches(); const id = await store.getLocalRegistrationId(); assert.strictEqual(id, 1337); }); }); describe('getIdentityKeyPair', () => { it('retrieves my identity key', async () => { + await store.hydrateCaches(); const key = await store.getIdentityKeyPair(); assertEqualArrayBuffers(key.pubKey, identityKey.pubKey); assertEqualArrayBuffers(key.privKey, identityKey.privKey); diff --git a/ts/util/batcher.ts b/ts/util/batcher.ts new file mode 100644 index 0000000000..55c711d9a4 --- /dev/null +++ b/ts/util/batcher.ts @@ -0,0 +1,97 @@ +import PQueue from 'p-queue'; + +// @ts-ignore +window.batchers = []; + +// @ts-ignore +window.waitForAllBatchers = async () => { + // @ts-ignore + await Promise.all(window.batchers.map(item => item.onIdle())); +}; + +type BatcherOptionsType = { + wait: number; + maxSize: number; + processBatch: (items: Array) => Promise; +}; + +type BatcherType = { + add: (item: ItemType) => void; + anyPending: () => boolean; + onIdle: () => Promise; + unregister: () => void; +}; + +async function sleep(ms: number): Promise { + // tslint:disable-next-line:no-string-based-set-timeout + await new Promise(resolve => setTimeout(resolve, ms)); +} + +export function createBatcher( + options: BatcherOptionsType +): BatcherType { + let batcher: BatcherType; + let timeout: any; + let items: Array = []; + const queue = new PQueue({ concurrency: 1 }); + + function _kickBatchOff() { + const itemsRef = items; + items = []; + // tslint:disable-next-line:no-floating-promises + queue.add(async () => { + await options.processBatch(itemsRef); + }); + } + + function add(item: ItemType) { + items.push(item); + + if (timeout) { + clearTimeout(timeout); + timeout = null; + } + + if (items.length >= options.maxSize) { + _kickBatchOff(); + } else { + timeout = setTimeout(() => { + timeout = null; + _kickBatchOff(); + }, options.wait); + } + } + + function anyPending(): boolean { + return queue.size > 0 || queue.pending > 0 || items.length > 0; + } + + async function onIdle() { + while (anyPending()) { + if (queue.size > 0 || queue.pending > 0) { + await queue.onIdle(); + } + + if (items.length > 0) { + await sleep(options.wait * 2); + } + } + } + + function unregister() { + // @ts-ignore + window.batchers = window.batchers.filter((item: any) => item !== batcher); + } + + batcher = { + add, + anyPending, + onIdle, + unregister, + }; + + // @ts-ignore + window.batchers.push(batcher); + + return batcher; +} diff --git a/ts/util/index.ts b/ts/util/index.ts index ec0ca2e21a..210f15a7c3 100644 --- a/ts/util/index.ts +++ b/ts/util/index.ts @@ -1,5 +1,7 @@ import * as GoogleChrome from './GoogleChrome'; import { arrayBufferToObjectURL } from './arrayBufferToObjectURL'; +import { createBatcher } from './batcher'; +import { createWaitBatcher } from './waitBatcher'; import { isFileDangerous } from './isFileDangerous'; import { missingCaseError } from './missingCaseError'; import { migrateColor } from './migrateColor'; @@ -7,6 +9,8 @@ import { makeLookup } from './makeLookup'; export { arrayBufferToObjectURL, + createBatcher, + createWaitBatcher, GoogleChrome, isFileDangerous, makeLookup, diff --git a/ts/util/lint/exceptions.json b/ts/util/lint/exceptions.json index 417d4ae99e..df4d323ef9 100644 --- a/ts/util/lint/exceptions.json +++ b/ts/util/lint/exceptions.json @@ -172,7 +172,7 @@ "rule": "jQuery-load(", "path": "js/conversation_controller.js", "line": " this._initialPromise = load();", - "lineNumber": 219, + "lineNumber": 216, "reasonCategory": "falseMatch", "updated": "2019-07-31T00:19:18.696Z" }, @@ -301,26 +301,10 @@ "rule": "jQuery-load(", "path": "js/signal_protocol_store.js", "line": " await ConversationController.load();", - "lineNumber": 868, + "lineNumber": 908, "reasonCategory": "falseMatch", "updated": "2018-09-15T00:38:04.183Z" }, - { - "rule": "jQuery-wrap(", - "path": "js/util_worker_tasks.js", - "line": " return dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer();", - "lineNumber": 40, - "reasonCategory": "falseMatch", - "updated": "2018-09-19T18:13:29.628Z" - }, - { - "rule": "jQuery-wrap(", - "path": "js/util_worker_tasks.js", - "line": " return dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('base64');", - "lineNumber": 43, - "reasonCategory": "falseMatch", - "updated": "2018-09-19T18:13:29.628Z" - }, { "rule": "DOM-innerHTML", "path": "js/views/app_view.js", @@ -1227,34 +1211,50 @@ { "rule": "jQuery-wrap(", "path": "libtextsecure/message_receiver.js", - "line": " Promise.resolve(dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer());", - "lineNumber": 148, + "line": " dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer();", + "lineNumber": 62, "reasonCategory": "falseMatch", - "updated": "2018-09-19T18:13:29.628Z" + "updated": "2019-09-20T18:36:19.909Z" }, { "rule": "jQuery-wrap(", "path": "libtextsecure/message_receiver.js", - "line": " Promise.resolve(dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary'));", - "lineNumber": 150, + "line": " dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary');", + "lineNumber": 64, "reasonCategory": "falseMatch", - "updated": "2018-09-19T18:13:29.628Z" + "updated": "2019-09-20T18:36:19.909Z" + }, + { + "rule": "jQuery-wrap(", + "path": "libtextsecure/message_receiver.js", + "line": " dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer();", + "lineNumber": 66, + "reasonCategory": "falseMatch", + "updated": "2019-09-20T18:36:19.909Z" + }, + { + "rule": "jQuery-wrap(", + "path": "libtextsecure/message_receiver.js", + "line": " dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('base64');", + "lineNumber": 68, + "reasonCategory": "falseMatch", + "updated": "2019-09-20T18:36:19.909Z" }, { "rule": "jQuery-wrap(", "path": "libtextsecure/message_receiver.js", "line": " const buffer = dcodeIO.ByteBuffer.wrap(ciphertext);", - "lineNumber": 829, + "lineNumber": 752, "reasonCategory": "falseMatch", - "updated": "2018-09-19T18:13:29.628Z" + "updated": "2019-09-20T18:36:19.909Z" }, { "rule": "jQuery-wrap(", "path": "libtextsecure/message_receiver.js", "line": " const buffer = dcodeIO.ByteBuffer.wrap(ciphertext);", - "lineNumber": 854, + "lineNumber": 777, "reasonCategory": "falseMatch", - "updated": "2018-09-19T18:13:29.628Z" + "updated": "2019-09-20T18:36:19.909Z" }, { "rule": "jQuery-wrap(", diff --git a/ts/util/waitBatcher.ts b/ts/util/waitBatcher.ts new file mode 100644 index 0000000000..482e30af2e --- /dev/null +++ b/ts/util/waitBatcher.ts @@ -0,0 +1,141 @@ +import PQueue from 'p-queue'; + +// @ts-ignore +window.waitBatchers = []; + +// @ts-ignore +window.waitForAllWaitBatchers = async () => { + // @ts-ignore + await Promise.all(window.waitBatchers.map(item => item.onIdle())); +}; + +type ItemHolderType = { + resolve: (value?: any) => void; + reject: (error: Error) => void; + item: ItemType; +}; + +type ExplodedPromiseType = { + resolve: (value?: any) => void; + reject: (error: Error) => void; + promise: Promise; +}; + +type BatcherOptionsType = { + wait: number; + maxSize: number; + processBatch: (items: Array) => Promise; +}; + +type BatcherType = { + add: (item: ItemType) => Promise; + anyPending: () => boolean; + onIdle: () => Promise; + unregister: () => void; +}; + +async function sleep(ms: number): Promise { + // tslint:disable-next-line:no-string-based-set-timeout + await new Promise(resolve => setTimeout(resolve, ms)); +} + +export function createWaitBatcher( + options: BatcherOptionsType +): BatcherType { + let waitBatcher: BatcherType; + let timeout: any; + let items: Array> = []; + const queue = new PQueue({ concurrency: 1 }); + + function _kickBatchOff() { + const itemsRef = items; + items = []; + // tslint:disable-next-line:no-floating-promises + queue.add(async () => { + try { + await options.processBatch(itemsRef.map(item => item.item)); + itemsRef.forEach(item => { + item.resolve(); + }); + } catch (error) { + itemsRef.forEach(item => { + item.reject(error); + }); + } + }); + } + + function _makeExplodedPromise(): ExplodedPromiseType { + let resolve; + let reject; + + // tslint:disable-next-line:promise-must-complete + const promise = new Promise((resolveParam, rejectParam) => { + resolve = resolveParam; + reject = rejectParam; + }); + + // @ts-ignore + return { promise, resolve, reject }; + } + + async function add(item: ItemType) { + const { promise, resolve, reject } = _makeExplodedPromise(); + items.push({ + resolve, + reject, + item, + }); + + if (timeout) { + clearTimeout(timeout); + timeout = null; + } + + if (items.length >= options.maxSize) { + _kickBatchOff(); + } else { + timeout = setTimeout(() => { + timeout = null; + _kickBatchOff(); + }, options.wait); + } + + await promise; + } + + function anyPending(): boolean { + return queue.size > 0 || queue.pending > 0 || items.length > 0; + } + + async function onIdle() { + while (anyPending()) { + if (queue.size > 0 || queue.pending > 0) { + await queue.onIdle(); + } + + if (items.length > 0) { + await sleep(options.wait * 2); + } + } + } + + function unregister() { + // @ts-ignore + window.waitBatchers = window.waitBatchers.filter( + (item: any) => item !== waitBatcher + ); + } + + waitBatcher = { + add, + anyPending, + onIdle, + unregister, + }; + + // @ts-ignore + window.waitBatchers.push(waitBatcher); + + return waitBatcher; +}