Better handle large numbers of messages in cache on startup

This commit is contained in:
Scott Nonnenberg 2018-09-28 15:51:26 -07:00
parent 102c2717cb
commit 2cfbfe477a
5 changed files with 56 additions and 21 deletions

View file

@ -42,6 +42,7 @@ module.exports = {
getNextExpiringMessage, getNextExpiringMessage,
getMessagesByConversation, getMessagesByConversation,
getUnprocessedCount,
getAllUnprocessed, getAllUnprocessed,
saveUnprocessed, saveUnprocessed,
getUnprocessedById, getUnprocessedById,
@ -962,6 +963,16 @@ async function getUnprocessedById(id) {
return jsonToObject(row.json); return jsonToObject(row.json);
} }
async function getUnprocessedCount() {
const row = await db.get('SELECT count(*) from unprocessed;');
if (!row) {
throw new Error('getMessageCount: Unable to get count of unprocessed');
}
return row['count(*)'];
}
async function getAllUnprocessed() { async function getAllUnprocessed() {
const rows = await db.all( const rows = await db.all(
'SELECT json FROM unprocessed ORDER BY timestamp ASC;' 'SELECT json FROM unprocessed ORDER BY timestamp ASC;'

View file

@ -72,6 +72,7 @@ module.exports = {
getNextExpiringMessage, getNextExpiringMessage,
getMessagesByConversation, getMessagesByConversation,
getUnprocessedCount,
getAllUnprocessed, getAllUnprocessed,
getUnprocessedById, getUnprocessedById,
saveUnprocessed, saveUnprocessed,
@ -458,6 +459,10 @@ async function getNextExpiringMessage({ MessageCollection }) {
return new MessageCollection(messages); return new MessageCollection(messages);
} }
async function getUnprocessedCount() {
return channels.getUnprocessedCount();
}
async function getAllUnprocessed() { async function getAllUnprocessed() {
return channels.getAllUnprocessed(); return channels.getAllUnprocessed();
} }

View file

@ -939,6 +939,9 @@
}, },
// Not yet processed messages - for resiliency // Not yet processed messages - for resiliency
getUnprocessedCount() {
return window.Signal.Data.getUnprocessedCount();
},
getAllUnprocessed() { getAllUnprocessed() {
return window.Signal.Data.getAllUnprocessed(); return window.Signal.Data.getAllUnprocessed();
}, },
@ -959,6 +962,9 @@
removeUnprocessed(id) { removeUnprocessed(id) {
return window.Signal.Data.removeUnprocessed(id, { Unprocessed }); return window.Signal.Data.removeUnprocessed(id, { Unprocessed });
}, },
removeAllUnprocessed() {
return window.Signal.Data.removeAllUnprocessed();
},
async removeAllData() { async removeAllData() {
// First the in-memory caches: // First the in-memory caches:
window.storage.reset(); // items store window.storage.reset(); // items store

View file

@ -441,38 +441,45 @@ MessageReceiver.prototype.extend({
envelope.sourceDevice envelope.sourceDevice
} ${envelope.timestamp.toNumber()}`; } ${envelope.timestamp.toNumber()}`;
}, },
getAllFromCache() { async getAllFromCache() {
window.log.info('getAllFromCache'); window.log.info('getAllFromCache');
return textsecure.storage.unprocessed.getAll().then(items => { const count = await textsecure.storage.unprocessed.getCount();
window.log.info(
'getAllFromCache loaded',
items.length,
'saved envelopes'
);
return Promise.all( if (count > 250) {
_.map(items, item => { await textsecure.storage.unprocessed.removeAll();
const attempts = 1 + (item.attempts || 0); window.log.warn(
if (attempts >= 5) { `There were ${count} messages in cache. Deleted all instead of reprocessing`
);
return [];
}
const items = await textsecure.storage.unprocessed.getAll();
window.log.info('getAllFromCache loaded', items.length, 'saved envelopes');
return Promise.all(
_.map(items, async item => {
const attempts = 1 + (item.attempts || 0);
try {
if (attempts >= 3) {
window.log.warn( window.log.warn(
'getAllFromCache final attempt for envelope', 'getAllFromCache final attempt for envelope',
item.id item.id
); );
return textsecure.storage.unprocessed.remove(item.id); await textsecure.storage.unprocessed.remove(item.id);
} else {
await textsecure.storage.unprocessed.save({ ...item, attempts });
} }
return textsecure.storage.unprocessed.save({ ...item, attempts }); } catch (error) {
})
).then(
() => items,
error => {
window.log.error( window.log.error(
'getAllFromCache error updating items after load:', 'getAllFromCache error updating item after load:',
error && error.stack ? error.stack : error error && error.stack ? error.stack : error
); );
return items;
} }
);
}); return item;
})
);
}, },
async addToCache(envelope, plaintext) { async addToCache(envelope, plaintext) {
const id = this.getEnvelopeId(envelope); const id = this.getEnvelopeId(envelope);

View file

@ -9,6 +9,9 @@
window.textsecure.storage = window.textsecure.storage || {}; window.textsecure.storage = window.textsecure.storage || {};
window.textsecure.storage.unprocessed = { window.textsecure.storage.unprocessed = {
getCount() {
return textsecure.storage.protocol.getUnprocessedCount();
},
getAll() { getAll() {
return textsecure.storage.protocol.getAllUnprocessed(); return textsecure.storage.protocol.getAllUnprocessed();
}, },
@ -24,5 +27,8 @@
remove(id) { remove(id) {
return textsecure.storage.protocol.removeUnprocessed(id); return textsecure.storage.protocol.removeUnprocessed(id);
}, },
removeAll() {
return textsecure.storage.protocol.removeAllUnprocessed();
},
}; };
})(); })();