SQL channel: Only serialize data access functions which need it

This commit is contained in:
Scott Nonnenberg 2019-09-03 13:10:32 -07:00 committed by Ken Powers
parent 8fe73fa884
commit 85e01ad037
2 changed files with 66 additions and 5 deletions

View file

@ -1616,6 +1616,7 @@ async function saveConversations(arrayOfConversations) {
throw error;
}
}
saveConversations.needsSerial = true;
async function updateConversation(data) {
// eslint-disable-next-line camelcase
@ -1943,6 +1944,7 @@ async function saveMessages(arrayOfMessages, { forceSave } = {}) {
throw error;
}
}
saveMessages.needsSerial = true;
async function removeMessage(id) {
if (!Array.isArray(id)) {
@ -2144,6 +2146,7 @@ async function getMessageMetricsForConversation(conversationId) {
totalUnread,
};
}
getMessageMetricsForConversation.needsSerial = true;
async function getMessagesBySentAt(sentAt) {
const rows = await db.all(
@ -2304,6 +2307,7 @@ async function saveUnprocesseds(arrayOfUnprocessed, { forceSave } = {}) {
throw error;
}
}
saveUnprocesseds.needsSerial = true;
async function updateUnprocessedAttempts(id, attempts) {
await db.run('UPDATE unprocessed SET attempts = $attempts WHERE id = $id;', {
@ -2745,6 +2749,8 @@ async function deleteStickerPackReference(messageId, packId) {
throw error;
}
}
deleteStickerPackReference.needsSerial = true;
async function deleteStickerPack(packId) {
if (!packId) {
throw new Error(
@ -2783,6 +2789,8 @@ async function deleteStickerPack(packId) {
throw error;
}
}
deleteStickerPack.needsSerial = true;
async function getStickerCount() {
const row = await db.get('SELECT count(*) from stickers;');
@ -2854,6 +2862,7 @@ async function updateEmojiUsage(shortName, timeUsed = Date.now()) {
throw error;
}
}
updateEmojiUsage.needsSerial = true;
async function getRecentEmojis(limit = 32) {
const rows = await db.all(
@ -2893,6 +2902,7 @@ async function removeAll() {
throw error;
}
}
removeAll.needsSerial = true;
// Anything that isn't user-visible data
async function removeAllConfiguration() {
@ -2914,6 +2924,7 @@ async function removeAllConfiguration() {
throw error;
}
}
removeAllConfiguration.needsSerial = true;
async function getMessagesNeedingUpgrade(limit, { maxVersion }) {
const rows = await db.all(

View file

@ -15,7 +15,17 @@ let initialized = false;
const SQL_CHANNEL_KEY = 'sql-channel';
const ERASE_SQL_KEY = 'erase-sql-key';
const queue = new Queue({ concurrency: 1 });
let singleQueue = null;
let multipleQueue = null;
function makeNewSingleQueue() {
singleQueue = new Queue({ concurrency: 1 });
return singleQueue;
}
function makeNewMultipleQueue() {
multipleQueue = new Queue({ concurrency: 10 });
return multipleQueue;
}
function initialize() {
if (initialized) {
@ -32,10 +42,50 @@ function initialize() {
);
}
// Note: we queue here to keep multi-query operations atomic. Without it, any
// multistage data operation (even within a BEGIN/COMMIT) can become interleaved,
// since all requests share one database connection.
const result = await queue.add(() => fn(...args));
let result;
// We queue here to keep multi-query operations atomic. Without it, any multistage
// data operation (even within a BEGIN/COMMIT) can become interleaved, since all
// requests share one database connection.
// A needsSerial method must be run in our single concurrency queue.
if (fn.needsSerial) {
if (singleQueue) {
result = await singleQueue.add(() => fn(...args));
} else if (multipleQueue) {
makeNewSingleQueue();
singleQueue.add(() => multipleQueue.onIdle());
multipleQueue = null;
result = await singleQueue.add(() => fn(...args));
} else {
makeNewSingleQueue();
result = await singleQueue.add(() => fn(...args));
}
} else {
// The request can be parallelized. To keep the same structure as the above block
// we force this section into the 'lonely if' pattern.
// eslint-disable-next-line no-lonely-if
if (multipleQueue) {
result = await multipleQueue.add(() => fn(...args));
} else if (singleQueue) {
makeNewMultipleQueue();
multipleQueue.pause();
const singleQueueRef = singleQueue;
singleQueue = null;
const promise = multipleQueue.add(() => fn(...args));
await singleQueueRef.onIdle();
multipleQueue.start();
result = await promise;
} else {
makeNewMultipleQueue();
result = await multipleQueue.add(() => fn(...args));
}
}
event.sender.send(`${SQL_CHANNEL_KEY}-done`, jobId, null, result);
} catch (error) {
const errorForDisplay = error && error.stack ? error.stack : error;