Allow manually retrying attachment downloads
This commit is contained in:
parent
59b45399e4
commit
dfc310805a
16 changed files with 265 additions and 58 deletions
|
@ -7,6 +7,7 @@ import { v4 as getGuid } from 'uuid';
|
|||
import dataInterface from '../sql/Client';
|
||||
import * as durations from '../util/durations';
|
||||
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
|
||||
import { strictAssert } from '../util/assert';
|
||||
import { downloadAttachment } from '../util/downloadAttachment';
|
||||
import * as Bytes from '../Bytes';
|
||||
import type {
|
||||
|
@ -16,11 +17,13 @@ import type {
|
|||
|
||||
import type { MessageModel } from '../models/messages';
|
||||
import type { AttachmentType } from '../types/Attachment';
|
||||
import * as Errors from '../types/errors';
|
||||
import type { LoggerType } from '../types/Logging';
|
||||
import * as log from '../logging/log';
|
||||
|
||||
const {
|
||||
getMessageById,
|
||||
getAttachmentDownloadJobById,
|
||||
getNextAttachmentDownloadJobs,
|
||||
removeAttachmentDownloadJob,
|
||||
resetAttachmentDownloadPending,
|
||||
|
@ -91,6 +94,32 @@ export async function addJob(
|
|||
throw new Error('attachments_download/addJob: index must be a number');
|
||||
}
|
||||
|
||||
if (attachment.downloadJobId) {
|
||||
let existingJob = await getAttachmentDownloadJobById(
|
||||
attachment.downloadJobId
|
||||
);
|
||||
if (existingJob) {
|
||||
// Reset job attempts through user's explicit action
|
||||
existingJob = { ...existingJob, attempts: 0 };
|
||||
|
||||
if (_activeAttachmentDownloadJobs[existingJob.id]) {
|
||||
logger.info(
|
||||
`attachment_downloads/addJob: ${existingJob.id} already running`
|
||||
);
|
||||
} else {
|
||||
logger.info(
|
||||
`attachment_downloads/addJob: restarting existing job ${existingJob.id}`
|
||||
);
|
||||
_activeAttachmentDownloadJobs[existingJob.id] = _runJob(existingJob);
|
||||
}
|
||||
|
||||
return {
|
||||
...attachment,
|
||||
pending: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const id = getGuid();
|
||||
const timestamp = Date.now();
|
||||
const toSave: AttachmentDownloadJobType = {
|
||||
|
@ -175,7 +204,7 @@ async function _maybeStartJob(): Promise<void> {
|
|||
|
||||
async function _runJob(job?: AttachmentDownloadJobType): Promise<void> {
|
||||
if (!job) {
|
||||
log.warn('_runJob: Job was missing!');
|
||||
log.warn('attachment_downloads/_runJob: Job was missing!');
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -189,45 +218,60 @@ async function _runJob(job?: AttachmentDownloadJobType): Promise<void> {
|
|||
);
|
||||
}
|
||||
|
||||
logger.info(`attachment_downloads/_runJob for job id ${id}`);
|
||||
|
||||
const found =
|
||||
window.MessageController.getById(messageId) ||
|
||||
(await getMessageById(messageId));
|
||||
if (!found) {
|
||||
logger.error('_runJob: Source message not found, deleting job');
|
||||
await _finishJob(null, id);
|
||||
return;
|
||||
}
|
||||
message = window.MessageController.register(found.id, found);
|
||||
logger.info(`attachment_downloads/_runJob(${id}): starting`);
|
||||
|
||||
const pending = true;
|
||||
await setAttachmentDownloadJobPending(id, pending);
|
||||
|
||||
message = window.MessageController.getById(messageId);
|
||||
if (!message) {
|
||||
const messageAttributes = await getMessageById(messageId);
|
||||
if (!messageAttributes) {
|
||||
logger.error(
|
||||
`attachment_downloads/_runJob(${id}): ` +
|
||||
'Source message not found, deleting job'
|
||||
);
|
||||
await _finishJob(null, id);
|
||||
return;
|
||||
}
|
||||
|
||||
strictAssert(messageId === messageAttributes.id, 'message id mismatch');
|
||||
message = window.MessageController.register(messageId, messageAttributes);
|
||||
}
|
||||
|
||||
await _addAttachmentToMessage(
|
||||
message,
|
||||
{ ...attachment, pending: true },
|
||||
{ type, index }
|
||||
);
|
||||
|
||||
const downloaded = await downloadAttachment(attachment);
|
||||
|
||||
if (!downloaded) {
|
||||
logger.warn(
|
||||
`_runJob: Got 404 from server for CDN ${
|
||||
`attachment_downloads/_runJob(${id}): Got 404 from server for CDN ${
|
||||
attachment.cdnNumber
|
||||
}, marking attachment ${
|
||||
attachment.cdnId || attachment.cdnKey
|
||||
} from message ${message.idForLogging()} as permanent error`
|
||||
);
|
||||
|
||||
await _finishJob(message, id);
|
||||
await _addAttachmentToMessage(
|
||||
message,
|
||||
_markAttachmentAsError(attachment),
|
||||
_markAttachmentAsPermanentError(attachment),
|
||||
{ type, index }
|
||||
);
|
||||
await _finishJob(message, id);
|
||||
return;
|
||||
}
|
||||
|
||||
const upgradedAttachment =
|
||||
await window.Signal.Migrations.processNewAttachment(downloaded);
|
||||
|
||||
await _addAttachmentToMessage(message, upgradedAttachment, { type, index });
|
||||
await _addAttachmentToMessage(message, omit(upgradedAttachment, 'error'), {
|
||||
type,
|
||||
index,
|
||||
});
|
||||
|
||||
await _finishJob(message, id);
|
||||
} catch (error) {
|
||||
|
@ -236,25 +280,43 @@ async function _runJob(job?: AttachmentDownloadJobType): Promise<void> {
|
|||
|
||||
if (currentAttempt >= 3) {
|
||||
logger.error(
|
||||
`_runJob: ${currentAttempt} failed attempts, marking attachment ${id} from message ${logId} as permanent error:`,
|
||||
error && error.stack ? error.stack : error
|
||||
`attachment_downloads/runJob(${id}): ${currentAttempt} failed ` +
|
||||
`attempts, marking attachment from message ${logId} as ` +
|
||||
'error:',
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
|
||||
await _finishJob(message, id);
|
||||
await _addAttachmentToMessage(
|
||||
message,
|
||||
_markAttachmentAsError(attachment),
|
||||
_markAttachmentAsTransientError(attachment),
|
||||
{ type, index }
|
||||
);
|
||||
await _finishJob(message, id);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
logger.error(
|
||||
`_runJob: Failed to download attachment type ${type} for message ${logId}, attempt ${currentAttempt}:`,
|
||||
error && error.stack ? error.stack : error
|
||||
`attachment_downloads/_runJob(${id}): Failed to download attachment ` +
|
||||
`type ${type} for message ${logId}, attempt ${currentAttempt}:`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
|
||||
// Remove `pending` flag from the attachment.
|
||||
await _addAttachmentToMessage(
|
||||
message,
|
||||
{
|
||||
...attachment,
|
||||
downloadJobId: id,
|
||||
},
|
||||
{ type, index }
|
||||
);
|
||||
if (message) {
|
||||
await saveMessage(message.attributes, {
|
||||
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
|
||||
});
|
||||
}
|
||||
|
||||
const failedJob = {
|
||||
...job,
|
||||
pending: 0,
|
||||
|
@ -289,13 +351,21 @@ function getActiveJobCount(): number {
|
|||
return Object.keys(_activeAttachmentDownloadJobs).length;
|
||||
}
|
||||
|
||||
function _markAttachmentAsError(attachment: AttachmentType): AttachmentType {
|
||||
function _markAttachmentAsPermanentError(
|
||||
attachment: AttachmentType
|
||||
): AttachmentType {
|
||||
return {
|
||||
...omit(attachment, ['key', 'digest', 'id']),
|
||||
error: true,
|
||||
};
|
||||
}
|
||||
|
||||
function _markAttachmentAsTransientError(
|
||||
attachment: AttachmentType
|
||||
): AttachmentType {
|
||||
return { ...attachment, error: true };
|
||||
}
|
||||
|
||||
async function _addAttachmentToMessage(
|
||||
message: MessageModel | null | undefined,
|
||||
attachment: AttachmentType,
|
||||
|
@ -308,13 +378,21 @@ async function _addAttachmentToMessage(
|
|||
const logPrefix = `${message.idForLogging()} (type: ${type}, index: ${index})`;
|
||||
|
||||
if (type === 'long-message') {
|
||||
// Attachment wasn't downloaded yet.
|
||||
if (!attachment.path) {
|
||||
message.set({
|
||||
bodyAttachment: attachment,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const { data } = await window.Signal.Migrations.loadAttachmentData(
|
||||
attachment
|
||||
);
|
||||
message.set({
|
||||
body: attachment.error ? message.get('body') : Bytes.toString(data),
|
||||
bodyPending: false,
|
||||
body: Bytes.toString(data),
|
||||
bodyAttachment: undefined,
|
||||
});
|
||||
} finally {
|
||||
if (attachment.path) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue