Only queue backfilled attachments after backfill response
This commit is contained in:
parent
e938e68c7d
commit
2579dfd9d9
14 changed files with 130 additions and 87 deletions
|
@ -193,11 +193,6 @@ export abstract class JobManager<CoreJobType> {
|
||||||
log.info(`${logId}: already running; resetting attempts`);
|
log.info(`${logId}: already running; resetting attempts`);
|
||||||
runningJob.attempts = 0;
|
runningJob.attempts = 0;
|
||||||
|
|
||||||
await this.params.saveJob({
|
|
||||||
...runningJob,
|
|
||||||
attempts: 0,
|
|
||||||
});
|
|
||||||
|
|
||||||
return { isAlreadyRunning: true };
|
return { isAlreadyRunning: true };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import {
|
||||||
isDownloading,
|
isDownloading,
|
||||||
isDownloaded,
|
isDownloaded,
|
||||||
isDownloadable,
|
isDownloadable,
|
||||||
|
getUndownloadedAttachmentSignature,
|
||||||
} from '../../types/Attachment';
|
} from '../../types/Attachment';
|
||||||
import {
|
import {
|
||||||
type AttachmentDownloadJobTypeType,
|
type AttachmentDownloadJobTypeType,
|
||||||
|
@ -30,7 +31,7 @@ import { missingCaseError } from '../../util/missingCaseError';
|
||||||
import { isStagingServer } from '../../util/isStagingServer';
|
import { isStagingServer } from '../../util/isStagingServer';
|
||||||
import {
|
import {
|
||||||
ensureBodyAttachmentsAreSeparated,
|
ensureBodyAttachmentsAreSeparated,
|
||||||
queueAttachmentDownloadsForMessage,
|
queueAttachmentDownloads,
|
||||||
} from '../../util/queueAttachmentDownloads';
|
} from '../../util/queueAttachmentDownloads';
|
||||||
import { SECOND } from '../../util/durations';
|
import { SECOND } from '../../util/durations';
|
||||||
import { showDownloadFailedToast } from '../../util/showDownloadFailedToast';
|
import { showDownloadFailedToast } from '../../util/showDownloadFailedToast';
|
||||||
|
@ -174,8 +175,7 @@ export class AttachmentBackfill {
|
||||||
// If `true` - show a toast at the end of the process
|
// If `true` - show a toast at the end of the process
|
||||||
let showToast = false;
|
let showToast = false;
|
||||||
|
|
||||||
// If `true` - queue downloads at the end of the process
|
const attachmentSignaturesToDownload = new Set<string>();
|
||||||
let shouldDownload = false;
|
|
||||||
|
|
||||||
// Track number of pending attachments to decide when the request is
|
// Track number of pending attachments to decide when the request is
|
||||||
// fully processed by the phone.
|
// fully processed by the phone.
|
||||||
|
@ -213,7 +213,9 @@ export class AttachmentBackfill {
|
||||||
// other device's backfill request. Update the CDN info without queueing
|
// other device's backfill request. Update the CDN info without queueing
|
||||||
// a download.
|
// a download.
|
||||||
if (isDownloading(updatedSticker.data)) {
|
if (isDownloading(updatedSticker.data)) {
|
||||||
shouldDownload = true;
|
attachmentSignaturesToDownload.add(
|
||||||
|
getUndownloadedAttachmentSignature(updatedSticker.data)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
updatedSticker = {
|
updatedSticker = {
|
||||||
...updatedSticker,
|
...updatedSticker,
|
||||||
|
@ -261,7 +263,9 @@ export class AttachmentBackfill {
|
||||||
} else {
|
} else {
|
||||||
// See sticker handling code above for the reasoning
|
// See sticker handling code above for the reasoning
|
||||||
if (isDownloading(updatedBodyAttachment)) {
|
if (isDownloading(updatedBodyAttachment)) {
|
||||||
shouldDownload = true;
|
attachmentSignaturesToDownload.add(
|
||||||
|
getUndownloadedAttachmentSignature(updatedBodyAttachment)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
updatedBodyAttachment = response.longText.attachment;
|
updatedBodyAttachment = response.longText.attachment;
|
||||||
changeCount += 1;
|
changeCount += 1;
|
||||||
|
@ -298,7 +302,9 @@ export class AttachmentBackfill {
|
||||||
|
|
||||||
// See sticker handling code above for the reasoning
|
// See sticker handling code above for the reasoning
|
||||||
if (isDownloading(existing)) {
|
if (isDownloading(existing)) {
|
||||||
shouldDownload = true;
|
attachmentSignaturesToDownload.add(
|
||||||
|
getUndownloadedAttachmentSignature(existing)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
updatedAttachments[index] = entry.attachment;
|
updatedAttachments[index] = entry.attachment;
|
||||||
}
|
}
|
||||||
|
@ -326,17 +332,18 @@ export class AttachmentBackfill {
|
||||||
editHistory: message.get('editHistory')?.map(edit => ({
|
editHistory: message.get('editHistory')?.map(edit => ({
|
||||||
...edit,
|
...edit,
|
||||||
attachments: updatedAttachments,
|
attachments: updatedAttachments,
|
||||||
bodyAttachment: updatedBodyAttachment,
|
|
||||||
})),
|
})),
|
||||||
});
|
});
|
||||||
|
|
||||||
// It is fine to await below this line
|
// It is fine to await below this line
|
||||||
|
if (attachmentSignaturesToDownload.size) {
|
||||||
if (shouldDownload) {
|
log.info(
|
||||||
log.info(`${logId}: queueing downloads`);
|
`${logId}: queueing ${attachmentSignaturesToDownload.size} download(s)`
|
||||||
await queueAttachmentDownloadsForMessage(message, {
|
);
|
||||||
|
await queueAttachmentDownloads(message, {
|
||||||
source: AttachmentDownloadSource.BACKFILL,
|
source: AttachmentDownloadSource.BACKFILL,
|
||||||
urgency: AttachmentDownloadUrgency.IMMEDIATE,
|
urgency: AttachmentDownloadUrgency.IMMEDIATE,
|
||||||
|
signaturesToQueue: attachmentSignaturesToDownload,
|
||||||
isManualDownload: true,
|
isManualDownload: true,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,7 +203,7 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
|
||||||
message.set({ expirationStartTimestamp });
|
message.set({ expirationStartTimestamp });
|
||||||
}
|
}
|
||||||
|
|
||||||
queueUpdateMessage(message.attributes);
|
drop(queueUpdateMessage(message.attributes));
|
||||||
|
|
||||||
await remove(sync);
|
await remove(sync);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|
|
@ -17,6 +17,7 @@ import { queueUpdateMessage } from '../util/messageBatcher';
|
||||||
import { isAciString } from '../util/isAciString';
|
import { isAciString } from '../util/isAciString';
|
||||||
import { DataReader, DataWriter } from '../sql/Client';
|
import { DataReader, DataWriter } from '../sql/Client';
|
||||||
import { MessageModel } from '../models/messages';
|
import { MessageModel } from '../models/messages';
|
||||||
|
import { drop } from '../util/drop';
|
||||||
|
|
||||||
const log = createLogger('ViewSyncs');
|
const log = createLogger('ViewSyncs');
|
||||||
|
|
||||||
|
@ -136,7 +137,7 @@ export async function onSync(sync: ViewSyncAttributesType): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (didChangeMessage) {
|
if (didChangeMessage) {
|
||||||
queueUpdateMessage(message.attributes);
|
drop(queueUpdateMessage(message.attributes));
|
||||||
}
|
}
|
||||||
|
|
||||||
await remove(sync);
|
await remove(sync);
|
||||||
|
|
|
@ -184,7 +184,7 @@ import { getCallHistorySelector } from '../state/selectors/callHistory';
|
||||||
import { migrateLegacyReadStatus } from '../messages/migrateLegacyReadStatus';
|
import { migrateLegacyReadStatus } from '../messages/migrateLegacyReadStatus';
|
||||||
import { migrateLegacySendAttributes } from '../messages/migrateLegacySendAttributes';
|
import { migrateLegacySendAttributes } from '../messages/migrateLegacySendAttributes';
|
||||||
import { getIsInitialContactSync } from '../services/contactSync';
|
import { getIsInitialContactSync } from '../services/contactSync';
|
||||||
import { queueAttachmentDownloadsForMessage } from '../util/queueAttachmentDownloads';
|
import { queueAttachmentDownloadsAndMaybeSaveMessage } from '../util/queueAttachmentDownloads';
|
||||||
import { cleanupMessages } from '../util/cleanup';
|
import { cleanupMessages } from '../util/cleanup';
|
||||||
import { MessageModel } from './messages';
|
import { MessageModel } from './messages';
|
||||||
import { applyNewAvatar } from '../groups';
|
import { applyNewAvatar } from '../groups';
|
||||||
|
@ -2321,13 +2321,9 @@ export class ConversationModel extends window.Backbone
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
readMessages.map(async m => {
|
readMessages.map(async m => {
|
||||||
const registered = window.MessageCache.register(new MessageModel(m));
|
const registered = window.MessageCache.register(new MessageModel(m));
|
||||||
const shouldSave = await queueAttachmentDownloadsForMessage(
|
await queueAttachmentDownloadsAndMaybeSaveMessage(registered, {
|
||||||
registered,
|
isManualDownload: false,
|
||||||
{ isManualDownload: false }
|
});
|
||||||
);
|
|
||||||
if (shouldSave) {
|
|
||||||
await window.MessageCache.saveMessage(registered.attributes);
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
} while (messages.length > 0);
|
} while (messages.length > 0);
|
||||||
|
|
|
@ -15,6 +15,7 @@ import { eraseMessageContents } from '../util/cleanup';
|
||||||
import { getSource, getSourceServiceId } from '../messages/helpers';
|
import { getSource, getSourceServiceId } from '../messages/helpers';
|
||||||
import { isAciString } from '../util/isAciString';
|
import { isAciString } from '../util/isAciString';
|
||||||
import { viewOnceOpenJobQueue } from '../jobs/viewOnceOpenJobQueue';
|
import { viewOnceOpenJobQueue } from '../jobs/viewOnceOpenJobQueue';
|
||||||
|
import { drop } from '../util/drop';
|
||||||
|
|
||||||
const log = createLogger('MessageUpdater');
|
const log = createLogger('MessageUpdater');
|
||||||
|
|
||||||
|
@ -46,7 +47,7 @@ function markReadOrViewed(
|
||||||
notificationService.removeBy({ messageId });
|
notificationService.removeBy({ messageId });
|
||||||
|
|
||||||
if (!skipSave) {
|
if (!skipSave) {
|
||||||
queueUpdateMessage(nextMessageAttributes);
|
drop(queueUpdateMessage(nextMessageAttributes));
|
||||||
}
|
}
|
||||||
|
|
||||||
return nextMessageAttributes;
|
return nextMessageAttributes;
|
||||||
|
|
|
@ -210,7 +210,7 @@ import { actions as searchActions } from './search';
|
||||||
import type { SearchActionType } from './search';
|
import type { SearchActionType } from './search';
|
||||||
import { getNotificationTextForMessage } from '../../util/getNotificationTextForMessage';
|
import { getNotificationTextForMessage } from '../../util/getNotificationTextForMessage';
|
||||||
import { doubleCheckMissingQuoteReference as doDoubleCheckMissingQuoteReference } from '../../util/doubleCheckMissingQuoteReference';
|
import { doubleCheckMissingQuoteReference as doDoubleCheckMissingQuoteReference } from '../../util/doubleCheckMissingQuoteReference';
|
||||||
import { queueAttachmentDownloadsForMessage } from '../../util/queueAttachmentDownloads';
|
import { queueAttachmentDownloads } from '../../util/queueAttachmentDownloads';
|
||||||
import { markAttachmentAsCorrupted as doMarkAttachmentAsCorrupted } from '../../messageModifiers/AttachmentDownloads';
|
import { markAttachmentAsCorrupted as doMarkAttachmentAsCorrupted } from '../../messageModifiers/AttachmentDownloads';
|
||||||
import {
|
import {
|
||||||
isSent,
|
isSent,
|
||||||
|
@ -2344,7 +2344,7 @@ function kickOffAttachmentDownload(
|
||||||
`kickOffAttachmentDownload: Message ${options.messageId} missing!`
|
`kickOffAttachmentDownload: Message ${options.messageId} missing!`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
const didUpdateValues = await queueAttachmentDownloadsForMessage(message, {
|
const didUpdateValues = await queueAttachmentDownloads(message, {
|
||||||
urgency: AttachmentDownloadUrgency.IMMEDIATE,
|
urgency: AttachmentDownloadUrgency.IMMEDIATE,
|
||||||
isManualDownload: true,
|
isManualDownload: true,
|
||||||
});
|
});
|
||||||
|
|
|
@ -46,7 +46,7 @@ import { useBoundActions } from '../../hooks/useBoundActions';
|
||||||
import { DataReader } from '../../sql/Client';
|
import { DataReader } from '../../sql/Client';
|
||||||
import { deleteDownloadsJobQueue } from '../../jobs/deleteDownloadsJobQueue';
|
import { deleteDownloadsJobQueue } from '../../jobs/deleteDownloadsJobQueue';
|
||||||
import { AttachmentDownloadUrgency } from '../../types/AttachmentDownload';
|
import { AttachmentDownloadUrgency } from '../../types/AttachmentDownload';
|
||||||
import { queueAttachmentDownloads } from '../../util/queueAttachmentDownloads';
|
import { queueAttachmentDownloadsAndMaybeSaveMessage } from '../../util/queueAttachmentDownloads';
|
||||||
import { getMessageIdForLogging } from '../../util/idForLogging';
|
import { getMessageIdForLogging } from '../../util/idForLogging';
|
||||||
import { markViewOnceMessageViewed } from '../../services/MessageUpdater';
|
import { markViewOnceMessageViewed } from '../../services/MessageUpdater';
|
||||||
|
|
||||||
|
@ -289,16 +289,20 @@ function showLightbox(opts: {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isIncremental(attachment)) {
|
if (isIncremental(attachment)) {
|
||||||
// Queue all attachments, but this target attachment should be IMMEDIATE
|
// Queue this target attachment with urgency IMMEDIATE
|
||||||
const wasUpdated = await queueAttachmentDownloads(message, {
|
await queueAttachmentDownloadsAndMaybeSaveMessage(message, {
|
||||||
attachmentSignatureForImmediate:
|
signaturesToQueue: new Set([
|
||||||
getUndownloadedAttachmentSignature(attachment),
|
getUndownloadedAttachmentSignature(attachment),
|
||||||
|
]),
|
||||||
|
isManualDownload: true,
|
||||||
|
urgency: AttachmentDownloadUrgency.IMMEDIATE,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Queue all the remaining with standard urgency.
|
||||||
|
await queueAttachmentDownloadsAndMaybeSaveMessage(message, {
|
||||||
isManualDownload: true,
|
isManualDownload: true,
|
||||||
urgency: AttachmentDownloadUrgency.STANDARD,
|
urgency: AttachmentDownloadUrgency.STANDARD,
|
||||||
});
|
});
|
||||||
if (wasUpdated) {
|
|
||||||
await window.MessageCache.saveMessage(message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const attachments = filterValidAttachments(message.attributes);
|
const attachments = filterValidAttachments(message.attributes);
|
||||||
|
|
|
@ -16,7 +16,7 @@ import type { App } from '../playwright';
|
||||||
import { Bootstrap } from '../bootstrap';
|
import { Bootstrap } from '../bootstrap';
|
||||||
import { sendTextMessage, getTimelineMessageWithText } from '../helpers';
|
import { sendTextMessage, getTimelineMessageWithText } from '../helpers';
|
||||||
|
|
||||||
export const debug = createDebug('mock:test:edit');
|
export const debug = createDebug('mock:test:backfill');
|
||||||
|
|
||||||
const FIXTURES_PATH = join(__dirname, '..', '..', '..', 'fixtures');
|
const FIXTURES_PATH = join(__dirname, '..', '..', '..', 'fixtures');
|
||||||
|
|
||||||
|
@ -288,7 +288,7 @@ describe('attachment backfill', function (this: Mocha.Suite) {
|
||||||
});
|
});
|
||||||
await conversationListItem.getByText('Message Request').click();
|
await conversationListItem.getByText('Message Request').click();
|
||||||
|
|
||||||
debug('dowloading attachment');
|
debug('downloading attachment');
|
||||||
const conversationStack = page.locator('.Inbox__conversation-stack');
|
const conversationStack = page.locator('.Inbox__conversation-stack');
|
||||||
const startDownload = conversationStack.getByRole('button', {
|
const startDownload = conversationStack.getByRole('button', {
|
||||||
name: 'Start Download',
|
name: 'Start Download',
|
||||||
|
|
|
@ -11,7 +11,7 @@ import * as MIME from '../types/MIME';
|
||||||
import { DataWriter } from '../sql/Client';
|
import { DataWriter } from '../sql/Client';
|
||||||
import { isMoreRecentThan } from './timestamp';
|
import { isMoreRecentThan } from './timestamp';
|
||||||
import { isNotNil } from './isNotNil';
|
import { isNotNil } from './isNotNil';
|
||||||
import { queueAttachmentDownloadsForMessage } from './queueAttachmentDownloads';
|
import { queueAttachmentDownloads } from './queueAttachmentDownloads';
|
||||||
import { postSaveUpdates } from './cleanup';
|
import { postSaveUpdates } from './cleanup';
|
||||||
|
|
||||||
const log = createLogger('attachmentDownloadQueue');
|
const log = createLogger('attachmentDownloadQueue');
|
||||||
|
@ -90,7 +90,7 @@ export async function flushAttachmentDownloadQueue(): Promise<void> {
|
||||||
// to display the message properly.
|
// to display the message properly.
|
||||||
hasRequiredAttachmentDownloads(message.attributes)
|
hasRequiredAttachmentDownloads(message.attributes)
|
||||||
) {
|
) {
|
||||||
const shouldSave = await queueAttachmentDownloadsForMessage(message, {
|
const shouldSave = await queueAttachmentDownloads(message, {
|
||||||
isManualDownload: false,
|
isManualDownload: false,
|
||||||
});
|
});
|
||||||
if (shouldSave) {
|
if (shouldSave) {
|
||||||
|
|
|
@ -14,6 +14,7 @@ import {
|
||||||
} from '../messages/helpers';
|
} from '../messages/helpers';
|
||||||
import { copyQuoteContentFromOriginal } from '../messages/copyQuote';
|
import { copyQuoteContentFromOriginal } from '../messages/copyQuote';
|
||||||
import { queueUpdateMessage } from './messageBatcher';
|
import { queueUpdateMessage } from './messageBatcher';
|
||||||
|
import { drop } from './drop';
|
||||||
|
|
||||||
const log = createLogger('doubleCheckMissingQuoteReference');
|
const log = createLogger('doubleCheckMissingQuoteReference');
|
||||||
|
|
||||||
|
@ -85,6 +86,6 @@ export async function doubleCheckMissingQuoteReference(
|
||||||
referencedMessageNotFound: false,
|
referencedMessageNotFound: false,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
queueUpdateMessage(message.attributes);
|
drop(queueUpdateMessage(message.attributes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,17 +2,15 @@
|
||||||
// SPDX-License-Identifier: AGPL-3.0-only
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
import type { ReadonlyMessageAttributesType } from '../model-types.d';
|
import type { ReadonlyMessageAttributesType } from '../model-types.d';
|
||||||
import { createBatcher } from './batcher';
|
|
||||||
import { createWaitBatcher } from './waitBatcher';
|
import { createWaitBatcher } from './waitBatcher';
|
||||||
import { DataWriter } from '../sql/Client';
|
import { DataWriter } from '../sql/Client';
|
||||||
import { createLogger } from '../logging/log';
|
import { createLogger } from '../logging/log';
|
||||||
import { postSaveUpdates } from './cleanup';
|
import { postSaveUpdates } from './cleanup';
|
||||||
import { MessageModel } from '../models/messages';
|
import { MessageModel } from '../models/messages';
|
||||||
import { drop } from './drop';
|
|
||||||
|
|
||||||
const log = createLogger('messageBatcher');
|
const log = createLogger('messageBatcher');
|
||||||
|
|
||||||
const updateMessageBatcher = createBatcher<ReadonlyMessageAttributesType>({
|
const updateMessageBatcher = createWaitBatcher<ReadonlyMessageAttributesType>({
|
||||||
name: 'messageBatcher.updateMessageBatcher',
|
name: 'messageBatcher.updateMessageBatcher',
|
||||||
wait: 75,
|
wait: 75,
|
||||||
maxSize: 50,
|
maxSize: 50,
|
||||||
|
@ -33,13 +31,13 @@ const updateMessageBatcher = createBatcher<ReadonlyMessageAttributesType>({
|
||||||
|
|
||||||
let shouldBatch = true;
|
let shouldBatch = true;
|
||||||
|
|
||||||
export function queueUpdateMessage(
|
export async function queueUpdateMessage(
|
||||||
messageAttr: ReadonlyMessageAttributesType
|
messageAttr: ReadonlyMessageAttributesType
|
||||||
): void {
|
): Promise<void> {
|
||||||
if (shouldBatch) {
|
if (shouldBatch) {
|
||||||
updateMessageBatcher.add(messageAttr);
|
await updateMessageBatcher.add(messageAttr);
|
||||||
} else {
|
} else {
|
||||||
drop(window.MessageCache.saveMessage(messageAttr));
|
await window.MessageCache.saveMessage(messageAttr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -207,7 +207,7 @@ export async function onStoryRecipientUpdate(
|
||||||
message.set({
|
message.set({
|
||||||
sendStateByConversationId: nextSendStateByConversationId,
|
sendStateByConversationId: nextSendStateByConversationId,
|
||||||
});
|
});
|
||||||
queueUpdateMessage(message.attributes);
|
drop(queueUpdateMessage(message.attributes));
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -89,29 +89,28 @@ export async function handleAttachmentDownloadsForNewMessage(
|
||||||
if (shouldUseAttachmentDownloadQueue()) {
|
if (shouldUseAttachmentDownloadQueue()) {
|
||||||
addToAttachmentDownloadQueue(logId, message);
|
addToAttachmentDownloadQueue(logId, message);
|
||||||
} else {
|
} else {
|
||||||
await queueAttachmentDownloadsForMessage(message, {
|
await queueAttachmentDownloadsAndMaybeSaveMessage(message, {
|
||||||
isManualDownload: false,
|
isManualDownload: false,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function queueAttachmentDownloadsForMessage(
|
export async function queueAttachmentDownloadsAndMaybeSaveMessage(
|
||||||
message: MessageModel,
|
message: MessageModel,
|
||||||
options: {
|
options: {
|
||||||
urgency?: AttachmentDownloadUrgency;
|
|
||||||
source?: AttachmentDownloadSource;
|
|
||||||
isManualDownload: boolean;
|
isManualDownload: boolean;
|
||||||
|
urgency?: AttachmentDownloadUrgency;
|
||||||
|
signaturesToQueue?: Set<string>;
|
||||||
|
source?: AttachmentDownloadSource;
|
||||||
}
|
}
|
||||||
): Promise<boolean> {
|
): Promise<void> {
|
||||||
const updated = await queueAttachmentDownloads(message, options);
|
const updated = await queueAttachmentDownloads(message, options);
|
||||||
if (!updated) {
|
if (!updated) {
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
queueUpdateMessage(message.attributes);
|
await queueUpdateMessage(message.attributes);
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receive logic
|
// Receive logic
|
||||||
|
@ -120,17 +119,28 @@ export async function queueAttachmentDownloadsForMessage(
|
||||||
export async function queueAttachmentDownloads(
|
export async function queueAttachmentDownloads(
|
||||||
message: MessageModel,
|
message: MessageModel,
|
||||||
{
|
{
|
||||||
attachmentSignatureForImmediate,
|
|
||||||
isManualDownload,
|
isManualDownload,
|
||||||
source = AttachmentDownloadSource.STANDARD,
|
source = AttachmentDownloadSource.STANDARD,
|
||||||
|
signaturesToQueue,
|
||||||
urgency = AttachmentDownloadUrgency.STANDARD,
|
urgency = AttachmentDownloadUrgency.STANDARD,
|
||||||
}: {
|
}: {
|
||||||
attachmentSignatureForImmediate?: string;
|
|
||||||
isManualDownload: boolean;
|
isManualDownload: boolean;
|
||||||
|
signaturesToQueue?: Set<string>;
|
||||||
source?: AttachmentDownloadSource;
|
source?: AttachmentDownloadSource;
|
||||||
urgency?: AttachmentDownloadUrgency;
|
urgency?: AttachmentDownloadUrgency;
|
||||||
}
|
}
|
||||||
): Promise<boolean> {
|
): Promise<boolean> {
|
||||||
|
function shouldQueueAttachmentBasedOnSignature(
|
||||||
|
attachment: AttachmentType
|
||||||
|
): boolean {
|
||||||
|
if (!signaturesToQueue) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return signaturesToQueue.has(
|
||||||
|
getUndownloadedAttachmentSignature(attachment)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
const autoDownloadAttachment = window.storage.get(
|
const autoDownloadAttachment = window.storage.get(
|
||||||
'auto-download-attachment',
|
'auto-download-attachment',
|
||||||
DEFAULT_AUTO_DOWNLOAD_ATTACHMENT
|
DEFAULT_AUTO_DOWNLOAD_ATTACHMENT
|
||||||
|
@ -159,6 +169,7 @@ export async function queueAttachmentDownloads(
|
||||||
.map(editHistory => editHistory.bodyAttachment) ?? []),
|
.map(editHistory => editHistory.bodyAttachment) ?? []),
|
||||||
]
|
]
|
||||||
.filter(isNotNil)
|
.filter(isNotNil)
|
||||||
|
.filter(shouldQueueAttachmentBasedOnSignature)
|
||||||
.filter(attachment => !isDownloaded(attachment));
|
.filter(attachment => !isDownloaded(attachment));
|
||||||
|
|
||||||
if (bodyAttachmentsToDownload.length) {
|
if (bodyAttachmentsToDownload.length) {
|
||||||
|
@ -185,7 +196,6 @@ export async function queueAttachmentDownloads(
|
||||||
const startingAttachments = message.get('attachments') || [];
|
const startingAttachments = message.get('attachments') || [];
|
||||||
const { attachments, count: attachmentsCount } = await queueNormalAttachments(
|
const { attachments, count: attachmentsCount } = await queueNormalAttachments(
|
||||||
{
|
{
|
||||||
attachmentSignatureForImmediate,
|
|
||||||
attachments: startingAttachments,
|
attachments: startingAttachments,
|
||||||
isManualDownload,
|
isManualDownload,
|
||||||
logId,
|
logId,
|
||||||
|
@ -197,6 +207,7 @@ export async function queueAttachmentDownloads(
|
||||||
sentAt: message.get('sent_at'),
|
sentAt: message.get('sent_at'),
|
||||||
source,
|
source,
|
||||||
urgency,
|
urgency,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -210,24 +221,25 @@ export async function queueAttachmentDownloads(
|
||||||
}
|
}
|
||||||
count += attachmentsCount;
|
count += attachmentsCount;
|
||||||
|
|
||||||
const previewsToQueue = message.get('preview') || [];
|
const previews = message.get('preview') || [];
|
||||||
const { preview, count: previewCount } = await queuePreviews({
|
const { preview, count: previewCount } = await queuePreviews({
|
||||||
logId,
|
logId,
|
||||||
isManualDownload,
|
isManualDownload,
|
||||||
messageId,
|
messageId,
|
||||||
previews: previewsToQueue,
|
previews,
|
||||||
otherPreviews: message.get('editHistory')?.flatMap(x => x.preview ?? []),
|
otherPreviews: message.get('editHistory')?.flatMap(x => x.preview ?? []),
|
||||||
receivedAt: message.get('received_at'),
|
receivedAt: message.get('received_at'),
|
||||||
sentAt: message.get('sent_at'),
|
sentAt: message.get('sent_at'),
|
||||||
urgency,
|
urgency,
|
||||||
source,
|
source,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
});
|
});
|
||||||
if (previewCount > 0) {
|
if (previewCount > 0) {
|
||||||
message.set({ preview });
|
message.set({ preview });
|
||||||
}
|
}
|
||||||
if (previewsToQueue.length > 0) {
|
if (previews.length > 0) {
|
||||||
log.info(
|
log.info(
|
||||||
`${logId}: Queued ${previewCount} (of ${previewsToQueue.length}) preview attachment downloads`
|
`${logId}: Queued ${previewCount} (of ${previews.length}) preview attachment downloads`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
count += previewCount;
|
count += previewCount;
|
||||||
|
@ -247,6 +259,7 @@ export async function queueAttachmentDownloads(
|
||||||
sentAt: message.get('sent_at'),
|
sentAt: message.get('sent_at'),
|
||||||
source,
|
source,
|
||||||
urgency,
|
urgency,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
});
|
});
|
||||||
if (thumbnailCount > 0) {
|
if (thumbnailCount > 0) {
|
||||||
message.set({ quote });
|
message.set({ quote });
|
||||||
|
@ -265,6 +278,11 @@ export async function queueAttachmentDownloads(
|
||||||
if (!item.avatar || !item.avatar.avatar) {
|
if (!item.avatar || !item.avatar.avatar) {
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!shouldQueueAttachmentBasedOnSignature(item.avatar.avatar)) {
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
// We've already downloaded this!
|
// We've already downloaded this!
|
||||||
if (item.avatar.avatar.path) {
|
if (item.avatar.avatar.path) {
|
||||||
log.info(`${logId}: Contact attachment already downloaded`);
|
log.info(`${logId}: Contact attachment already downloaded`);
|
||||||
|
@ -311,7 +329,6 @@ export async function queueAttachmentDownloads(
|
||||||
if (sticker && sticker.data && sticker.data.path) {
|
if (sticker && sticker.data && sticker.data.path) {
|
||||||
log.info(`${logId}: Sticker attachment already downloaded`);
|
log.info(`${logId}: Sticker attachment already downloaded`);
|
||||||
} else if (sticker) {
|
} else if (sticker) {
|
||||||
count += 1;
|
|
||||||
const { packId, stickerId, packKey } = sticker;
|
const { packId, stickerId, packKey } = sticker;
|
||||||
|
|
||||||
const status = getStickerPackStatus(packId);
|
const status = getStickerPackStatus(packId);
|
||||||
|
@ -320,6 +337,7 @@ export async function queueAttachmentDownloads(
|
||||||
try {
|
try {
|
||||||
log.info(`${logId}: Copying sticker from installed pack`);
|
log.info(`${logId}: Copying sticker from installed pack`);
|
||||||
copiedSticker = true;
|
copiedSticker = true;
|
||||||
|
count += 1;
|
||||||
const data = await copyStickerToAttachments(packId, stickerId);
|
const data = await copyStickerToAttachments(packId, stickerId);
|
||||||
|
|
||||||
// Refresh sticker attachment since we had to await above
|
// Refresh sticker attachment since we had to await above
|
||||||
|
@ -342,17 +360,20 @@ export async function queueAttachmentDownloads(
|
||||||
|
|
||||||
if (!copiedSticker) {
|
if (!copiedSticker) {
|
||||||
if (sticker.data) {
|
if (sticker.data) {
|
||||||
log.info(`${logId}: Queueing sticker download`);
|
if (shouldQueueAttachmentBasedOnSignature(sticker.data)) {
|
||||||
await AttachmentDownloadManager.addJob({
|
log.info(`${logId}: Queueing sticker download`);
|
||||||
attachment: sticker.data,
|
count += 1;
|
||||||
attachmentType: 'sticker',
|
await AttachmentDownloadManager.addJob({
|
||||||
isManualDownload,
|
attachment: sticker.data,
|
||||||
messageId,
|
attachmentType: 'sticker',
|
||||||
receivedAt: message.get('received_at'),
|
isManualDownload,
|
||||||
sentAt: message.get('sent_at'),
|
messageId,
|
||||||
source,
|
receivedAt: message.get('received_at'),
|
||||||
urgency,
|
sentAt: message.get('sent_at'),
|
||||||
});
|
source,
|
||||||
|
urgency,
|
||||||
|
});
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log.error(`${logId}: Sticker data was missing`);
|
log.error(`${logId}: Sticker data was missing`);
|
||||||
}
|
}
|
||||||
|
@ -389,6 +410,7 @@ export async function queueAttachmentDownloads(
|
||||||
sentAt: message.get('sent_at'),
|
sentAt: message.get('sent_at'),
|
||||||
source,
|
source,
|
||||||
urgency,
|
urgency,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
});
|
});
|
||||||
count += editAttachmentsCount;
|
count += editAttachmentsCount;
|
||||||
allEditsAttachmentCount += editAttachmentsCount;
|
allEditsAttachmentCount += editAttachmentsCount;
|
||||||
|
@ -410,6 +432,7 @@ export async function queueAttachmentDownloads(
|
||||||
sentAt: message.get('sent_at'),
|
sentAt: message.get('sent_at'),
|
||||||
urgency,
|
urgency,
|
||||||
source,
|
source,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
});
|
});
|
||||||
count += editPreviewCount;
|
count += editPreviewCount;
|
||||||
allEditsAttachmentCount += editPreviewCount;
|
allEditsAttachmentCount += editPreviewCount;
|
||||||
|
@ -442,7 +465,6 @@ export async function queueAttachmentDownloads(
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function queueNormalAttachments({
|
export async function queueNormalAttachments({
|
||||||
attachmentSignatureForImmediate,
|
|
||||||
attachments = [],
|
attachments = [],
|
||||||
isManualDownload,
|
isManualDownload,
|
||||||
logId,
|
logId,
|
||||||
|
@ -452,8 +474,8 @@ export async function queueNormalAttachments({
|
||||||
sentAt,
|
sentAt,
|
||||||
source,
|
source,
|
||||||
urgency,
|
urgency,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
}: {
|
}: {
|
||||||
attachmentSignatureForImmediate?: string;
|
|
||||||
attachments: MessageAttributesType['attachments'];
|
attachments: MessageAttributesType['attachments'];
|
||||||
isManualDownload: boolean;
|
isManualDownload: boolean;
|
||||||
logId: string;
|
logId: string;
|
||||||
|
@ -463,6 +485,9 @@ export async function queueNormalAttachments({
|
||||||
sentAt: number;
|
sentAt: number;
|
||||||
source: AttachmentDownloadSource;
|
source: AttachmentDownloadSource;
|
||||||
urgency: AttachmentDownloadUrgency;
|
urgency: AttachmentDownloadUrgency;
|
||||||
|
shouldQueueAttachmentBasedOnSignature: (
|
||||||
|
attachment: AttachmentType
|
||||||
|
) => boolean;
|
||||||
}): Promise<{
|
}): Promise<{
|
||||||
attachments: Array<AttachmentType>;
|
attachments: Array<AttachmentType>;
|
||||||
count: number;
|
count: number;
|
||||||
|
@ -486,6 +511,10 @@ export async function queueNormalAttachments({
|
||||||
return attachment;
|
return attachment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!shouldQueueAttachmentBasedOnSignature(attachment)) {
|
||||||
|
return attachment;
|
||||||
|
}
|
||||||
|
|
||||||
if (isLongMessage(attachment.contentType)) {
|
if (isLongMessage(attachment.contentType)) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
`${logId}: queueNormalAttachments passed long-message attachment`
|
`${logId}: queueNormalAttachments passed long-message attachment`
|
||||||
|
@ -545,12 +574,6 @@ export async function queueNormalAttachments({
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
|
|
||||||
const urgencyForAttachment =
|
|
||||||
attachmentSignatureForImmediate &&
|
|
||||||
attachmentSignatureForImmediate ===
|
|
||||||
getUndownloadedAttachmentSignature(attachment)
|
|
||||||
? AttachmentDownloadUrgency.IMMEDIATE
|
|
||||||
: urgency;
|
|
||||||
return AttachmentDownloadManager.addJob({
|
return AttachmentDownloadManager.addJob({
|
||||||
attachment,
|
attachment,
|
||||||
attachmentType: 'attachment',
|
attachmentType: 'attachment',
|
||||||
|
@ -559,7 +582,7 @@ export async function queueNormalAttachments({
|
||||||
receivedAt,
|
receivedAt,
|
||||||
sentAt,
|
sentAt,
|
||||||
source,
|
source,
|
||||||
urgency: urgencyForAttachment,
|
urgency,
|
||||||
});
|
});
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
@ -580,6 +603,7 @@ async function queuePreviews({
|
||||||
sentAt,
|
sentAt,
|
||||||
source,
|
source,
|
||||||
urgency,
|
urgency,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
}: {
|
}: {
|
||||||
isManualDownload: boolean;
|
isManualDownload: boolean;
|
||||||
logId: string;
|
logId: string;
|
||||||
|
@ -590,6 +614,9 @@ async function queuePreviews({
|
||||||
sentAt: number;
|
sentAt: number;
|
||||||
source: AttachmentDownloadSource;
|
source: AttachmentDownloadSource;
|
||||||
urgency: AttachmentDownloadUrgency;
|
urgency: AttachmentDownloadUrgency;
|
||||||
|
shouldQueueAttachmentBasedOnSignature: (
|
||||||
|
attachment: AttachmentType
|
||||||
|
) => boolean;
|
||||||
}): Promise<{ preview: Array<LinkPreviewType>; count: number }> {
|
}): Promise<{ preview: Array<LinkPreviewType>; count: number }> {
|
||||||
const log = getLogger(source);
|
const log = getLogger(source);
|
||||||
const previewSignatures: Map<string, AttachmentType> = new Map();
|
const previewSignatures: Map<string, AttachmentType> = new Map();
|
||||||
|
@ -606,6 +633,11 @@ async function queuePreviews({
|
||||||
if (!item.image) {
|
if (!item.image) {
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!shouldQueueAttachmentBasedOnSignature(item.image)) {
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
// We've already downloaded this!
|
// We've already downloaded this!
|
||||||
if (isDownloaded(item.image)) {
|
if (isDownloaded(item.image)) {
|
||||||
log.info(`${logId}: Preview attachment already downloaded`);
|
log.info(`${logId}: Preview attachment already downloaded`);
|
||||||
|
@ -673,6 +705,7 @@ async function queueQuoteAttachments({
|
||||||
sentAt,
|
sentAt,
|
||||||
source,
|
source,
|
||||||
urgency,
|
urgency,
|
||||||
|
shouldQueueAttachmentBasedOnSignature,
|
||||||
}: {
|
}: {
|
||||||
logId: string;
|
logId: string;
|
||||||
isManualDownload: boolean;
|
isManualDownload: boolean;
|
||||||
|
@ -683,6 +716,9 @@ async function queueQuoteAttachments({
|
||||||
sentAt: number;
|
sentAt: number;
|
||||||
source: AttachmentDownloadSource;
|
source: AttachmentDownloadSource;
|
||||||
urgency: AttachmentDownloadUrgency;
|
urgency: AttachmentDownloadUrgency;
|
||||||
|
shouldQueueAttachmentBasedOnSignature: (
|
||||||
|
attachment: AttachmentType
|
||||||
|
) => boolean;
|
||||||
}): Promise<{ quote?: QuotedMessageType; count: number }> {
|
}): Promise<{ quote?: QuotedMessageType; count: number }> {
|
||||||
const log = getLogger(source);
|
const log = getLogger(source);
|
||||||
let count = 0;
|
let count = 0;
|
||||||
|
@ -715,7 +751,11 @@ async function queueQuoteAttachments({
|
||||||
if (!item.thumbnail) {
|
if (!item.thumbnail) {
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
// We've already downloaded this!
|
|
||||||
|
if (!shouldQueueAttachmentBasedOnSignature(item.thumbnail)) {
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
if (isDownloaded(item.thumbnail)) {
|
if (isDownloaded(item.thumbnail)) {
|
||||||
log.info(`${logId}: Quote attachment already downloaded`);
|
log.info(`${logId}: Quote attachment already downloaded`);
|
||||||
return item;
|
return item;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue