2020-04-28 21:18:41 +00:00
|
|
|
// Ensures that messages in the database are upgraded to the current schema version.
|
2018-03-27 16:19:40 +00:00
|
|
|
|
2020-04-28 21:18:41 +00:00
|
|
|
/* global window */
|
2018-03-28 14:54:01 +00:00
|
|
|
|
2020-04-28 21:18:41 +00:00
|
|
|
const { isFunction, isNumber } = require('lodash');
|
2018-03-26 20:33:23 +00:00
|
|
|
|
2018-03-21 23:37:39 +00:00
|
|
|
const Message = require('./types/message');
|
|
|
|
|
2018-03-26 20:32:22 +00:00
|
|
|
exports.processNext = async ({
|
2018-03-21 23:37:39 +00:00
|
|
|
BackboneMessage,
|
|
|
|
BackboneMessageCollection,
|
2018-04-03 18:43:17 +00:00
|
|
|
numMessagesPerBatch,
|
2018-03-21 23:37:39 +00:00
|
|
|
upgradeMessageSchema,
|
2018-07-25 22:02:27 +00:00
|
|
|
getMessagesNeedingUpgrade,
|
|
|
|
saveMessage,
|
2018-07-27 02:19:34 +00:00
|
|
|
maxVersion = Message.CURRENT_SCHEMA_VERSION,
|
2018-03-21 23:37:39 +00:00
|
|
|
} = {}) => {
|
|
|
|
if (!isFunction(BackboneMessage)) {
|
2018-04-27 21:25:04 +00:00
|
|
|
throw new TypeError(
|
|
|
|
"'BackboneMessage' (Whisper.Message) constructor is required"
|
|
|
|
);
|
2018-03-21 23:37:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (!isFunction(BackboneMessageCollection)) {
|
2018-04-27 21:25:04 +00:00
|
|
|
throw new TypeError(
|
|
|
|
"'BackboneMessageCollection' (Whisper.MessageCollection)" +
|
|
|
|
' constructor is required'
|
|
|
|
);
|
2018-03-21 23:37:39 +00:00
|
|
|
}
|
|
|
|
|
2018-04-03 18:43:17 +00:00
|
|
|
if (!isNumber(numMessagesPerBatch)) {
|
2018-04-11 19:44:52 +00:00
|
|
|
throw new TypeError("'numMessagesPerBatch' is required");
|
2018-03-21 23:37:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (!isFunction(upgradeMessageSchema)) {
|
2018-04-11 19:44:52 +00:00
|
|
|
throw new TypeError("'upgradeMessageSchema' is required");
|
2018-03-21 23:37:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
const startTime = Date.now();
|
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
const fetchStartTime = Date.now();
|
2018-09-21 01:47:19 +00:00
|
|
|
let messagesRequiringSchemaUpgrade;
|
|
|
|
try {
|
|
|
|
messagesRequiringSchemaUpgrade = await getMessagesNeedingUpgrade(
|
|
|
|
numMessagesPerBatch,
|
|
|
|
{
|
|
|
|
maxVersion,
|
|
|
|
MessageCollection: BackboneMessageCollection,
|
|
|
|
}
|
|
|
|
);
|
|
|
|
} catch (error) {
|
|
|
|
window.log.error(
|
|
|
|
'processNext error:',
|
|
|
|
error && error.stack ? error.stack : error
|
|
|
|
);
|
|
|
|
return {
|
|
|
|
done: true,
|
|
|
|
numProcessed: 0,
|
|
|
|
};
|
|
|
|
}
|
2018-03-27 16:19:55 +00:00
|
|
|
const fetchDuration = Date.now() - fetchStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
const upgradeStartTime = Date.now();
|
2018-04-27 21:25:04 +00:00
|
|
|
const upgradedMessages = await Promise.all(
|
2018-07-27 02:19:34 +00:00
|
|
|
messagesRequiringSchemaUpgrade.map(message =>
|
|
|
|
upgradeMessageSchema(message, { maxVersion })
|
|
|
|
)
|
2018-04-27 21:25:04 +00:00
|
|
|
);
|
2018-03-27 16:19:55 +00:00
|
|
|
const upgradeDuration = Date.now() - upgradeStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
2018-03-27 16:19:55 +00:00
|
|
|
const saveStartTime = Date.now();
|
2018-07-25 22:02:27 +00:00
|
|
|
await Promise.all(
|
|
|
|
upgradedMessages.map(message =>
|
|
|
|
saveMessage(message, { Message: BackboneMessage })
|
|
|
|
)
|
|
|
|
);
|
2018-03-27 16:19:55 +00:00
|
|
|
const saveDuration = Date.now() - saveStartTime;
|
2018-03-21 23:37:39 +00:00
|
|
|
|
|
|
|
const totalDuration = Date.now() - startTime;
|
|
|
|
const numProcessed = messagesRequiringSchemaUpgrade.length;
|
2018-04-03 18:43:17 +00:00
|
|
|
const done = numProcessed < numMessagesPerBatch;
|
2018-03-21 23:37:39 +00:00
|
|
|
return {
|
2018-03-30 21:20:50 +00:00
|
|
|
done,
|
2018-03-21 23:37:39 +00:00
|
|
|
numProcessed,
|
|
|
|
fetchDuration,
|
|
|
|
upgradeDuration,
|
|
|
|
saveDuration,
|
|
|
|
totalDuration,
|
|
|
|
};
|
|
|
|
};
|