Download attachments in separate queue from message processing

Scott Nonnenberg 2019-01-30 12:15:07 -08:00
parent a43a78731a
commit 1d2c3ae23c
34 changed files with 2062 additions and 214 deletions

@@ -0,0 +1,410 @@
/* global Whisper, Signal, setTimeout, clearTimeout */
const { isFunction, isNumber, omit } = require('lodash');
const getGuid = require('uuid/v4');
const {
getMessageById,
getNextAttachmentDownloadJobs,
removeAttachmentDownloadJob,
resetAttachmentDownloadPending,
saveAttachmentDownloadJob,
saveMessage,
setAttachmentDownloadJobPending,
} = require('./data');
module.exports = {
start,
stop,
addJob,
};
const MAX_ATTACHMENT_JOB_PARALLELISM = 3;
const SECOND = 1000;
const MINUTE = 60 * SECOND;
const HOUR = 60 * MINUTE;
const TICK_INTERVAL = MINUTE;
const RETRY_BACKOFF = {
1: 30 * SECOND,
2: 30 * MINUTE,
3: 6 * HOUR,
};
let enabled = false;
let timeout;
let getMessageReceiver;
let logger;
const _activeAttachmentDownloadJobs = {};
const _messageCache = {};
async function start(options = {}) {
({ getMessageReceiver, logger } = options);
if (!isFunction(getMessageReceiver)) {
throw new Error(
'attachment_downloads/start: getMessageReceiver must be a function'
);
}
if (!logger) {
throw new Error('attachment_downloads/start: logger must be provided!');
}
enabled = true;
await resetAttachmentDownloadPending();
_tick();
}
async function stop() {
enabled = false;
if (timeout) {
clearTimeout(timeout);
timeout = null;
}
}
async function addJob(attachment, job = {}) {
if (!attachment) {
throw new Error('attachment_downloads/addJob: attachment is required');
}
const { messageId, type, index } = job;
if (!messageId) {
throw new Error('attachment_downloads/addJob: job.messageId is required');
}
if (!type) {
throw new Error('attachment_downloads/addJob: job.type is required');
}
if (!isNumber(index)) {
throw new Error('attachment_downloads/addJob: job.index must be a number');
}
const id = getGuid();
const timestamp = Date.now();
const toSave = {
...job,
id,
attachment,
timestamp,
pending: 0,
attempts: 0,
};
await saveAttachmentDownloadJob(toSave);
_maybeStartJob();
return {
...attachment,
pending: true,
downloadJobId: id,
};
}
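
For context, a hedged sketch of how a message-processing call site might use the new addJob export. The `message` model, the helper name, and the assumption that the module is reachable as window.Signal.AttachmentDownloads are illustrative, not part of this diff; only addJob's option names and return shape come from the code above:

const AttachmentDownloads = window.Signal.AttachmentDownloads;

// Queue every attachment on an incoming message and store the returned
// placeholders ({ ...attachment, pending: true, downloadJobId }) back on it.
async function queueAttachmentDownloads(message) {
  const attachments = message.get('attachments') || [];
  const queued = await Promise.all(
    attachments.map((attachment, index) =>
      AttachmentDownloads.addJob(attachment, {
        messageId: message.id,
        type: 'attachment',
        index,
      })
    )
  );
  message.set({ attachments: queued });
}
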
async function _tick() {
_maybeStartJob();
timeout = setTimeout(_tick, TICK_INTERVAL);
}
async function _maybeStartJob() {
if (!enabled) {
return;
}
const jobCount = getActiveJobCount();
const limit = MAX_ATTACHMENT_JOB_PARALLELISM - jobCount;
if (limit <= 0) {
return;
}
const nextJobs = await getNextAttachmentDownloadJobs(limit);
if (nextJobs.length <= 0) {
return;
}
// To prevent the race condition caused by two parallel database calls, each kicked
// off because the jobCount wasn't at the max.
const secondJobCount = getActiveJobCount();
const needed = MAX_ATTACHMENT_JOB_PARALLELISM - secondJobCount;
if (needed <= 0) {
return;
}
const jobs = nextJobs.slice(0, Math.min(needed, nextJobs.length));
for (let i = 0, max = jobs.length; i < max; i += 1) {
const job = jobs[i];
_activeAttachmentDownloadJobs[job.id] = _runJob(job);
}
}
async function _runJob(job) {
const { id, messageId, attachment, type, index, attempts } = job || {};
let message;
try {
if (!job || !attachment || !messageId) {
throw new Error(
`_runJob: Key information required for job was missing. Job id: ${id}`
);
}
message = await _getMessage(messageId);
if (!message) {
logger.error('_runJob: Source message not found, deleting job');
await _finishJob(message, id);
return;
}
const pending = true;
await setAttachmentDownloadJobPending(id, pending);
let downloaded;
const messageReceiver = getMessageReceiver();
if (!messageReceiver) {
throw new Error('_runJob: messageReceiver not found');
}
try {
downloaded = await messageReceiver.downloadAttachment(attachment);
} catch (error) {
// Attachments on the server expire after 30 days, then start returning 404
if (error && error.code === 404) {
logger.warn(
`_runJob: Got 404 from server, marking attachment ${
attachment.id
} from message ${message.idForLogging()} as permanent error`
);
await _addAttachmentToMessage(
message,
_markAttachmentAsError(attachment),
{ type, index }
);
await _finishJob(message, id);
return;
}
throw error;
}
const upgradedAttachment = await Signal.Migrations.processNewAttachment(
downloaded
);
await _addAttachmentToMessage(message, upgradedAttachment, { type, index });
await _finishJob(message, id);
} catch (error) {
const currentAttempt = (attempts || 0) + 1;
if (currentAttempt >= 3) {
logger.error(
`_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${message.idForLogging()} as permanent error:`,
error && error.stack ? error.stack : error
);
await _addAttachmentToMessage(
message,
_markAttachmentAsError(attachment),
{ type, index }
);
await _finishJob(message, id);
return;
}
logger.error(
`_runJob: Failed to download attachment type ${type} for message ${message.idForLogging()}, attempt ${currentAttempt}:`,
error && error.stack ? error.stack : error
);
const failedJob = {
...job,
pending: 0,
attempts: currentAttempt,
timestamp: Date.now() + RETRY_BACKOFF[currentAttempt],
};
await saveAttachmentDownloadJob(failedJob);
delete _activeAttachmentDownloadJobs[id];
_maybeStartJob();
}
}
async function _getMessage(id) {
let item = _messageCache[id];
if (item) {
const fiveMinutesAgo = Date.now() - 5 * MINUTE;
if (item.timestamp >= fiveMinutesAgo) {
return item.message;
}
delete _messageCache[id];
}
let message = await getMessageById(id, {
Message: Whisper.Message,
});
if (!message) {
return message;
}
// Once more, checking for race conditions
item = _messageCache[id];
if (item) {
const fiveMinutesAgo = Date.now() - 5 * MINUTE;
if (item.timestamp >= fiveMinutesAgo) {
return item.message;
}
}
const conversation = message.getConversation();
if (conversation && conversation.messageCollection.get(id)) {
message = conversation.messageCollection.get(id);
}
_messageCache[id] = {
timestamp: Date.now(),
message,
};
return message;
}
async function _finishJob(message, id) {
if (message) {
await saveMessage(message.attributes, {
Message: Whisper.Message,
});
const conversation = message.getConversation();
if (conversation) {
const fromConversation = conversation.messageCollection.get(message.id);
if (fromConversation && message !== fromConversation) {
fromConversation.set(message.attributes);
fromConversation.trigger('change');
} else {
message.trigger('change');
}
}
}
await removeAttachmentDownloadJob(id);
delete _activeAttachmentDownloadJobs[id];
_maybeStartJob();
}
function getActiveJobCount() {
return Object.keys(_activeAttachmentDownloadJobs).length;
}
function _markAttachmentAsError(attachment) {
return {
...omit(attachment, ['key', 'digest', 'id']),
error: true,
};
}
async function _addAttachmentToMessage(message, attachment, { type, index }) {
if (!message) {
return;
}
if (type === 'attachment') {
const attachments = message.get('attachments');
if (!attachments || attachments.length <= index) {
throw new Error(
`_addAttachmentToMessage: attachments didn't exist or ${index} was too large`
);
}
_replaceAttachment(attachments, index, attachment);
return;
}
if (type === 'preview') {
const preview = message.get('preview');
if (!preview || preview.length <= index) {
throw new Error(
`_addAttachmentToMessage: preview didn't exist or ${index} was too large`
);
}
const item = preview[index];
if (!item) {
throw new Error(`_addAttachmentToMessage: preview ${index} was falsey`);
}
_replaceAttachment(item, 'image', attachment);
return;
}
if (type === 'contact') {
const contact = message.get('contact');
if (!contact || contact.length <= index) {
throw new Error(
`_addAttachmentToMessage: contact didn't exist or ${index} was too large`
);
}
const item = contact[index];
if (item && item.avatar && item.avatar.avatar) {
_replaceAttachment(item.avatar, 'avatar', attachment);
} else {
logger.warn(
`_addAttachmentToMessage: Couldn't update contact with avatar attachment for message ${message.idForLogging()}`
);
}
return;
}
if (type === 'quote') {
const quote = message.get('quote');
if (!quote) {
throw new Error("_addAttachmentToMessage: quote didn't exist");
}
const { attachments } = quote;
if (!attachments || attachments.length <= index) {
throw new Error(
`_addAttachmentToMessage: quote attachments didn't exist or ${index} was too large`
);
}
const item = attachments[index];
if (!item) {
throw new Error(
`_addAttachmentToMessage: attachment ${index} was falsey`
);
}
_replaceAttachment(item, 'thumbnail', attachment);
return;
}
if (type === 'group-avatar') {
const group = message.get('group');
if (!group) {
throw new Error("_addAttachmentToMessage: group didn't exist");
}
const existingAvatar = group.avatar;
if (existingAvatar && existingAvatar.path) {
await Signal.Migrations.deleteAttachmentData(existingAvatar.path);
}
_replaceAttachment(group, 'avatar', attachment);
return;
}
throw new Error(
`_addAttachmentToMessage: Unknown job type ${type} for message ${message.idForLogging()}`
);
}
function _replaceAttachment(object, key, newAttachment) {
const oldAttachment = object[key];
if (oldAttachment && oldAttachment.path) {
logger.warn(
'_replaceAttachment: Old attachment already had path, not replacing'
);
}
// eslint-disable-next-line no-param-reassign
object[key] = newAttachment;
}
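
The queue is started and stopped by the application shell, which isn't shown in this commit excerpt. A minimal sketch of that wiring, assuming an app-level MessageReceiver instance and a window.log logger (both assumptions; only the option names come from start() above, and the require path is relative to wherever the wiring lives):

const AttachmentDownloads = require('./attachment_downloads');

async function startAttachmentDownloads(messageReceiver) {
  await AttachmentDownloads.start({
    getMessageReceiver: () => messageReceiver,
    logger: window.log,
  });
}

async function shutdownAttachmentDownloads() {
  // Clears the tick timer and stops new jobs; jobs already in flight finish on their own.
  await AttachmentDownloads.stop();
}
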

@@ -131,6 +131,13 @@ module.exports = {
removeUnprocessed,
removeAllUnprocessed,
getNextAttachmentDownloadJobs,
saveAttachmentDownloadJob,
resetAttachmentDownloadPending,
setAttachmentDownloadJobPending,
removeAttachmentDownloadJob,
removeAllAttachmentDownloadJobs,
removeAll,
removeAllConfiguration,
@@ -854,6 +861,27 @@ async function removeAllUnprocessed() {
await channels.removeAllUnprocessed();
}
// Attachment downloads
async function getNextAttachmentDownloadJobs(limit) {
return channels.getNextAttachmentDownloadJobs(limit);
}
async function saveAttachmentDownloadJob(job) {
await channels.saveAttachmentDownloadJob(job);
}
async function setAttachmentDownloadJobPending(id, pending) {
await channels.setAttachmentDownloadJobPending(id, pending);
}
async function resetAttachmentDownloadPending() {
await channels.resetAttachmentDownloadPending();
}
async function removeAttachmentDownloadJob(id) {
await channels.removeAttachmentDownloadJob(id);
}
async function removeAllAttachmentDownloadJobs() {
await channels.removeAllAttachmentDownloadJobs();
}
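
For reference, a sketch of the job record these helpers persist. The field names are taken from addJob() in the new module above; the concrete values are placeholders, and the SQL table and IPC channel behind `channels` are not part of this excerpt:

const exampleJob = {
  id: 'example-uuid',    // generated with getGuid() in addJob
  messageId: 'example-message-id',
  type: 'attachment',    // or 'preview' | 'contact' | 'quote' | 'group-avatar'
  index: 0,
  attachment: {},        // the undownloaded attachment pointer from the message
  pending: 0,            // flipped via setAttachmentDownloadJobPending while a job runs
  attempts: 0,           // incremented on failure; capped at 3 by _runJob
  timestamp: Date.now(), // pushed into the future by RETRY_BACKOFF on retry
};

saveAttachmentDownloadJob(exampleJob) would persist it, and getNextAttachmentDownloadJobs(limit) later hands it back to the queue.
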
// Other
async function removeAll() {

@@ -14,6 +14,7 @@ const { migrateToSQL } = require('./migrate_to_sql');
const Metadata = require('./metadata/SecretSessionCipher');
const RefreshSenderCertificate = require('./refresh_sender_certificate');
const LinkPreviews = require('./link_previews');
const AttachmentDownloads = require('./attachment_downloads');
// Components
const {
@@ -128,6 +129,7 @@ function initializeMigrations({
const loadQuoteData = MessageType.loadQuoteData(loadAttachmentData);
const getAbsoluteAttachmentPath = createAbsolutePathGetter(attachmentsPath);
const deleteOnDisk = Attachments.createDeleter(attachmentsPath);
const writeNewAttachmentData = createWriterForNew(attachmentsPath);
return {
attachmentsPath,
@@ -145,11 +147,22 @@
loadQuoteData,
readAttachmentData,
run,
processNewAttachment: attachment =>
MessageType.processNewAttachment(attachment, {
writeNewAttachmentData,
getAbsoluteAttachmentPath,
makeObjectUrl,
revokeObjectUrl,
getImageDimensions,
makeImageThumbnail,
makeVideoScreenshot,
logger,
}),
upgradeMessageSchema: (message, options = {}) => {
const { maxVersion } = options;
return MessageType.upgradeSchema(message, {
writeNewAttachmentData: createWriterForNew(attachmentsPath),
writeNewAttachmentData,
getRegionCode,
getAbsoluteAttachmentPath,
makeObjectUrl,
@@ -233,6 +246,7 @@ exports.setup = (options = {}) => {
};
return {
AttachmentDownloads,
Backbone,
Components,
Crypto,

@@ -56,6 +56,11 @@ exports.autoOrientJPEG = async attachment => {
return attachment;
}
// If we haven't downloaded the attachment yet, we won't have the data
if (!attachment.data) {
return attachment;
}
const dataBlob = await arrayBufferToBlob(
attachment.data,
attachment.contentType
@@ -234,6 +239,11 @@ exports.captureDimensionsAndScreenshot = async (
return attachment;
}
// If the attachment hasn't been downloaded yet, we won't have a path
if (!attachment.path) {
return attachment;
}
const absolutePath = await getAbsoluteAttachmentPath(attachment.path);
if (GoogleChrome.isImageTypeSupported(contentType)) {

@@ -9,7 +9,7 @@ const { isArrayBuffer, isFunction, isUndefined, omit } = require('lodash');
// Promise Attachment
exports.migrateDataToFileSystem = async (
attachment,
{ writeNewAttachmentData, logger } = {}
{ writeNewAttachmentData } = {}
) => {
if (!isFunction(writeNewAttachmentData)) {
throw new TypeError("'writeNewAttachmentData' must be a function");
@@ -19,7 +19,6 @@ exports.migrateDataToFileSystem = async (
const hasData = !isUndefined(data);
const shouldSkipSchemaUpgrade = !hasData;
if (shouldSkipSchemaUpgrade) {
logger.warn('WARNING: `attachment.data` is `undefined`');
return attachment;
}

@@ -134,8 +134,7 @@ exports._withSchemaVersion = ({ schemaVersion, upgrade }) => {
logger.warn(
'WARNING: Message._withSchemaVersion: Unexpected version:',
`Expected message to have version ${expectedVersion},`,
`but got ${message.schemaVersion}.`,
message
`but got ${message.schemaVersion}.`
);
return message;
}
@@ -203,7 +202,6 @@ exports._mapQuotedAttachments = upgradeAttachment => async (
if (!context || !isObject(context.logger)) {
throw new Error('_mapQuotedAttachments: context must have logger object');
}
const { logger } = context;
const upgradeWithContext = async attachment => {
const { thumbnail } = attachment;
@@ -211,11 +209,6 @@
return attachment;
}
if (!thumbnail.data && !thumbnail.path) {
logger.warn('Quoted attachment did not have thumbnail data; removing it');
return omit(attachment, ['thumbnail']);
}
const upgradedThumbnail = await upgradeAttachment(thumbnail, context);
return Object.assign({}, attachment, {
thumbnail: upgradedThumbnail,
@@ -247,7 +240,6 @@ exports._mapPreviewAttachments = upgradeAttachment => async (
if (!context || !isObject(context.logger)) {
throw new Error('_mapPreviewAttachments: context must have logger object');
}
const { logger } = context;
const upgradeWithContext = async preview => {
const { image } = preview;
@@ -255,11 +247,6 @@
return preview;
}
if (!image.data && !image.path) {
logger.warn('Preview did not have image data; removing it');
return omit(preview, ['image']);
}
const upgradedImage = await upgradeAttachment(image, context);
return Object.assign({}, preview, {
image: upgradedImage,
@@ -413,6 +400,68 @@
return message;
};
// Runs on attachments outside of the schema upgrade process, since attachments are
// downloaded out of band.
exports.processNewAttachment = async (
attachment,
{
writeNewAttachmentData,
getAbsoluteAttachmentPath,
makeObjectUrl,
revokeObjectUrl,
getImageDimensions,
makeImageThumbnail,
makeVideoScreenshot,
logger,
} = {}
) => {
if (!isFunction(writeNewAttachmentData)) {
throw new TypeError('context.writeNewAttachmentData is required');
}
if (!isFunction(getAbsoluteAttachmentPath)) {
throw new TypeError('context.getAbsoluteAttachmentPath is required');
}
if (!isFunction(makeObjectUrl)) {
throw new TypeError('context.makeObjectUrl is required');
}
if (!isFunction(revokeObjectUrl)) {
throw new TypeError('context.revokeObjectUrl is required');
}
if (!isFunction(getImageDimensions)) {
throw new TypeError('context.getImageDimensions is required');
}
if (!isFunction(makeImageThumbnail)) {
throw new TypeError('context.makeImageThumbnail is required');
}
if (!isFunction(makeVideoScreenshot)) {
throw new TypeError('context.makeVideoScreenshot is required');
}
if (!isObject(logger)) {
throw new TypeError('context.logger is required');
}
const rotatedAttachment = await Attachment.autoOrientJPEG(attachment);
const onDiskAttachment = await Attachment.migrateDataToFileSystem(
rotatedAttachment,
{ writeNewAttachmentData }
);
const finalAttachment = await Attachment.captureDimensionsAndScreenshot(
onDiskAttachment,
{
writeNewAttachmentData,
getAbsoluteAttachmentPath,
makeObjectUrl,
revokeObjectUrl,
getImageDimensions,
makeImageThumbnail,
makeVideoScreenshot,
logger,
}
);
return finalAttachment;
};
exports.createAttachmentLoader = loadAttachmentData => {
if (!isFunction(loadAttachmentData)) {
throw new TypeError(
@@ -508,7 +557,10 @@ exports.deleteAllExternalFiles = ({ deleteAttachmentData, deleteOnDisk }) => {
quote.attachments.map(async attachment => {
const { thumbnail } = attachment;
if (thumbnail && thumbnail.path) {
// To prevent spoofing, the thumbnail may have been copied from the image of the
// original quoted message; if so, it has a 'copied' field set to true, and we
// don't want to delete it here.
if (thumbnail && thumbnail.path && !thumbnail.copied) {
await deleteOnDisk(thumbnail.path);
}
})