/* global Whisper, Signal, setTimeout, clearTimeout */ const { isFunction, isNumber, omit } = require('lodash'); const getGuid = require('uuid/v4'); const { getMessageById, getNextAttachmentDownloadJobs, removeAttachmentDownloadJob, resetAttachmentDownloadPending, saveAttachmentDownloadJob, saveMessage, setAttachmentDownloadJobPending, } = require('./data'); const { stringFromBytes } = require('./crypto'); module.exports = { start, stop, addJob, }; const MAX_ATTACHMENT_JOB_PARALLELISM = 3; const SECOND = 1000; const MINUTE = 60 * SECOND; const HOUR = 60 * MINUTE; const TICK_INTERVAL = MINUTE; const RETRY_BACKOFF = { 1: 30 * SECOND, 2: 30 * MINUTE, 3: 6 * HOUR, }; let enabled = false; let timeout; let getMessageReceiver; let logger; const _activeAttachmentDownloadJobs = {}; const _messageCache = {}; async function start(options = {}) { ({ getMessageReceiver, logger } = options); if (!isFunction(getMessageReceiver)) { throw new Error( 'attachment_downloads/start: getMessageReceiver must be a function' ); } if (!logger) { throw new Error('attachment_downloads/start: logger must be provided!'); } enabled = true; await resetAttachmentDownloadPending(); _tick(); } async function stop() { enabled = false; if (timeout) { clearTimeout(timeout); timeout = null; } } async function addJob(attachment, job = {}) { if (!attachment) { throw new Error('attachments_download/addJob: attachment is required'); } const { messageId, type, index } = job; if (!messageId) { throw new Error('attachments_download/addJob: job.messageId is required'); } if (!type) { throw new Error('attachments_download/addJob: job.type is required'); } if (!isNumber(index)) { throw new Error('attachments_download/addJob: index must be a number'); } const id = getGuid(); const timestamp = Date.now(); const toSave = { ...job, id, attachment, timestamp, pending: 0, attempts: 0, }; await saveAttachmentDownloadJob(toSave); _maybeStartJob(); return { ...attachment, pending: true, downloadJobId: id, }; } async function _tick() { _maybeStartJob(); timeout = setTimeout(_tick, TICK_INTERVAL); } async function _maybeStartJob() { if (!enabled) { return; } const jobCount = getActiveJobCount(); const limit = MAX_ATTACHMENT_JOB_PARALLELISM - jobCount; if (limit <= 0) { return; } const nextJobs = await getNextAttachmentDownloadJobs(limit); if (nextJobs.length <= 0) { return; } // To prevent the race condition caused by two parallel database calls, eached kicked // off because the jobCount wasn't at the max. const secondJobCount = getActiveJobCount(); const needed = MAX_ATTACHMENT_JOB_PARALLELISM - secondJobCount; if (needed <= 0) { return; } const jobs = nextJobs.slice(0, Math.min(needed, nextJobs.length)); for (let i = 0, max = jobs.length; i < max; i += 1) { const job = jobs[i]; _activeAttachmentDownloadJobs[job.id] = _runJob(job); } } async function _runJob(job) { const { id, messageId, attachment, type, index, attempts } = job || {}; let message; try { if (!job || !attachment || !messageId) { throw new Error( `_runJob: Key information required for job was missing. Job id: ${id}` ); } message = await _getMessage(messageId); if (!message) { logger.error('_runJob: Source message not found, deleting job'); await _finishJob(message, id); return; } const pending = true; await setAttachmentDownloadJobPending(id, pending); let downloaded; const messageReceiver = getMessageReceiver(); if (!messageReceiver) { throw new Error('_runJob: messageReceiver not found'); } try { downloaded = await messageReceiver.downloadAttachment(attachment); } catch (error) { // Attachments on the server expire after 30 days, then start returning 404 if (error && error.code === 404) { logger.warn( `_runJob: Got 404 from server, marking attachment ${ attachment.id } from message ${message.idForLogging()} as permanent error` ); await _finishJob(message, id); await _addAttachmentToMessage( message, _markAttachmentAsError(attachment), { type, index } ); return; } throw error; } const upgradedAttachment = await Signal.Migrations.processNewAttachment( downloaded ); await _addAttachmentToMessage(message, upgradedAttachment, { type, index }); await _finishJob(message, id); } catch (error) { const currentAttempt = (attempts || 0) + 1; if (currentAttempt >= 3) { logger.error( `_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${message.idForLogging()} as permament error:`, error && error.stack ? error.stack : error ); await _finishJob(message, id); await _addAttachmentToMessage( message, _markAttachmentAsError(attachment), { type, index } ); return; } logger.error( `_runJob: Failed to download attachment type ${type} for message ${message.idForLogging()}, attempt ${currentAttempt}:`, error && error.stack ? error.stack : error ); const failedJob = { ...job, pending: 0, attempts: currentAttempt, timestamp: Date.now() + RETRY_BACKOFF[currentAttempt], }; await saveAttachmentDownloadJob(failedJob); delete _activeAttachmentDownloadJobs[id]; _maybeStartJob(); } } async function _getMessage(id) { let item = _messageCache[id]; if (item) { const fiveMinutesAgo = Date.now() - 5 * MINUTE; if (item.timestamp >= fiveMinutesAgo) { return item.message; } delete _messageCache[id]; } let message = await getMessageById(id, { Message: Whisper.Message, }); if (!message) { return message; } // Once more, checking for race conditions item = _messageCache[id]; if (item) { const fiveMinutesAgo = Date.now() - 5 * MINUTE; if (item.timestamp >= fiveMinutesAgo) { return item.message; } } const conversation = message.getConversation(); if (conversation && conversation.messageCollection.get(id)) { message = conversation.get(id); } _messageCache[id] = { timestamp: Date.now(), message, }; return message; } async function _finishJob(message, id) { if (message) { await saveMessage(message.attributes, { Message: Whisper.Message, }); const conversation = message.getConversation(); if (conversation) { const fromConversation = conversation.messageCollection.get(message.id); if (fromConversation && message !== fromConversation) { fromConversation.set(message.attributes); fromConversation.trigger('change', fromConversation); } else { message.trigger('change', message); } } } await removeAttachmentDownloadJob(id); delete _activeAttachmentDownloadJobs[id]; _maybeStartJob(); } function getActiveJobCount() { return Object.keys(_activeAttachmentDownloadJobs).length; } function _markAttachmentAsError(attachment) { return { ...omit(attachment, ['key', 'digest', 'id']), error: true, }; } async function _addAttachmentToMessage(message, attachment, { type, index }) { if (!message) { return; } const logPrefix = `${message.idForLogging()} (type: ${type}, index: ${index})`; if (type === 'long-message') { try { const { data } = await Signal.Migrations.loadAttachmentData(attachment); message.set({ body: attachment.isError ? message.get('body') : stringFromBytes(data), bodyPending: false, }); } finally { Signal.Migrations.deleteAttachmentData(attachment.path); } return; } if (type === 'attachment') { const attachments = message.get('attachments'); if (!attachments || attachments.length <= index) { throw new Error( `_addAttachmentToMessage: attachments didn't exist or ${index} was too large` ); } _replaceAttachment(attachments, index, attachment, logPrefix); return; } if (type === 'preview') { const preview = message.get('preview'); if (!preview || preview.length <= index) { throw new Error( `_addAttachmentToMessage: preview didn't exist or ${index} was too large` ); } const item = preview[index]; if (!item) { throw new Error(`_addAttachmentToMessage: preview ${index} was falsey`); } _replaceAttachment(item, 'image', attachment, logPrefix); return; } if (type === 'contact') { const contact = message.get('contact'); if (!contact || contact.length <= index) { throw new Error( `_addAttachmentToMessage: contact didn't exist or ${index} was too large` ); } const item = contact[index]; if (item && item.avatar && item.avatar.avatar) { _replaceAttachment(item.avatar, 'avatar', attachment, logPrefix); } else { logger.warn( `_addAttachmentToMessage: Couldn't update contact with avatar attachment for message ${message.idForLogging()}` ); } return; } if (type === 'quote') { const quote = message.get('quote'); if (!quote) { throw new Error("_addAttachmentToMessage: quote didn't exist"); } const { attachments } = quote; if (!attachments || attachments.length <= index) { throw new Error( `_addAttachmentToMessage: quote attachments didn't exist or ${index} was too large` ); } const item = attachments[index]; if (!item) { throw new Error( `_addAttachmentToMessage: attachment ${index} was falsey` ); } _replaceAttachment(item, 'thumbnail', attachment, logPrefix); return; } if (type === 'group-avatar') { const group = message.get('group'); if (!group) { throw new Error("_addAttachmentToMessage: group didn't exist"); } const existingAvatar = group.avatar; if (existingAvatar && existingAvatar.path) { await Signal.Migrations.deleteAttachmentData(existingAvatar.path); } _replaceAttachment(group, 'avatar', attachment, logPrefix); return; } throw new Error( `_addAttachmentToMessage: Unknown job type ${type} for message ${message.idForLogging()}` ); } function _replaceAttachment(object, key, newAttachment, logPrefix) { const oldAttachment = object[key]; if (oldAttachment && oldAttachment.path) { logger.warn( `_replaceAttachment: ${logPrefix} - old attachment already had path, not replacing` ); } // eslint-disable-next-line no-param-reassign object[key] = newAttachment; }