AttachmentDownloads: Wait for job completion, validate active job list
This commit is contained in:
parent
2b648b79a4
commit
3d94bf953c
1 changed files with 63 additions and 30 deletions
|
@ -195,9 +195,47 @@ async function _maybeStartJob(): Promise<void> {
|
||||||
const job = jobs[i];
|
const job = jobs[i];
|
||||||
const existing = _activeAttachmentDownloadJobs[job.id];
|
const existing = _activeAttachmentDownloadJobs[job.id];
|
||||||
if (existing) {
|
if (existing) {
|
||||||
logger.warn(`_maybeStartJob: Job ${job.id} is already running`);
|
logger.warn(
|
||||||
|
`attachment_downloads/_maybeStartJob: Job ${job.id} is already running`
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
_activeAttachmentDownloadJobs[job.id] = _runJob(job);
|
logger.info(
|
||||||
|
`attachment_downloads/_maybeStartJob: Starting job ${job.id}`
|
||||||
|
);
|
||||||
|
const promise = _runJob(job);
|
||||||
|
_activeAttachmentDownloadJobs[job.id] = promise;
|
||||||
|
|
||||||
|
const postProcess = async () => {
|
||||||
|
const logId = `attachment_downloads/_maybeStartJob/postProcess/${job.id}`;
|
||||||
|
try {
|
||||||
|
await promise;
|
||||||
|
if (_activeAttachmentDownloadJobs[job.id]) {
|
||||||
|
throw new Error(
|
||||||
|
`${logId}: Active attachments jobs list still has this job!`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (error: unknown) {
|
||||||
|
log.error(
|
||||||
|
`${logId}: Download job threw an error, deleting.`,
|
||||||
|
Errors.toLogFormat(error)
|
||||||
|
);
|
||||||
|
|
||||||
|
delete _activeAttachmentDownloadJobs[job.id];
|
||||||
|
try {
|
||||||
|
await removeAttachmentDownloadJob(job.id);
|
||||||
|
} catch (deleteError) {
|
||||||
|
log.error(
|
||||||
|
`${logId}: Failed to delete attachment job`,
|
||||||
|
Errors.toLogFormat(error)
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
_maybeStartJob();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Note: intentionally not awaiting
|
||||||
|
postProcess();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -305,35 +343,30 @@ async function _runJob(job?: AttachmentDownloadJobType): Promise<void> {
|
||||||
Errors.toLogFormat(error)
|
Errors.toLogFormat(error)
|
||||||
);
|
);
|
||||||
|
|
||||||
try {
|
// Remove `pending` flag from the attachment.
|
||||||
// Remove `pending` flag from the attachment.
|
await _addAttachmentToMessage(
|
||||||
await _addAttachmentToMessage(
|
message,
|
||||||
message,
|
{
|
||||||
{
|
...attachment,
|
||||||
...attachment,
|
downloadJobId: id,
|
||||||
downloadJobId: id,
|
},
|
||||||
},
|
{ type, index }
|
||||||
{ type, index }
|
);
|
||||||
);
|
if (message) {
|
||||||
if (message) {
|
await saveMessage(message.attributes, {
|
||||||
await saveMessage(message.attributes, {
|
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
|
||||||
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
|
});
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
const failedJob = {
|
|
||||||
...job,
|
|
||||||
pending: 0,
|
|
||||||
attempts: currentAttempt,
|
|
||||||
timestamp:
|
|
||||||
Date.now() + (RETRY_BACKOFF[currentAttempt] || RETRY_BACKOFF[3]),
|
|
||||||
};
|
|
||||||
|
|
||||||
await saveAttachmentDownloadJob(failedJob);
|
|
||||||
} finally {
|
|
||||||
delete _activeAttachmentDownloadJobs[id];
|
|
||||||
_maybeStartJob();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const failedJob = {
|
||||||
|
...job,
|
||||||
|
pending: 0,
|
||||||
|
attempts: currentAttempt,
|
||||||
|
timestamp:
|
||||||
|
Date.now() + (RETRY_BACKOFF[currentAttempt] || RETRY_BACKOFF[3]),
|
||||||
|
};
|
||||||
|
|
||||||
|
await saveAttachmentDownloadJob(failedJob);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue