Only queue backfilled attachments after backfill response

Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
Co-authored-by: trevor-signal <trevor@signal.org>
This commit is contained in:
automated-signal 2025-07-15 11:58:33 -05:00 committed by GitHub
parent 0a2f341602
commit 233d5e6e28
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 131 additions and 88 deletions

View file

@ -429,7 +429,7 @@ jobs:
run: |
set -o pipefail
xvfb-run --auto-servernum pnpm run test-mock
timeout-minutes: 10
timeout-minutes: 15
env:
NODE_ENV: production
DEBUG: mock:test:*

View file

@ -193,11 +193,6 @@ export abstract class JobManager<CoreJobType> {
log.info(`${logId}: already running; resetting attempts`);
runningJob.attempts = 0;
await this.params.saveJob({
...runningJob,
attempts: 0,
});
return { isAlreadyRunning: true };
}

View file

@ -10,6 +10,7 @@ import {
isDownloading,
isDownloaded,
isDownloadable,
getUndownloadedAttachmentSignature,
} from '../../types/Attachment';
import {
type AttachmentDownloadJobTypeType,
@ -30,7 +31,7 @@ import { missingCaseError } from '../../util/missingCaseError';
import { isStagingServer } from '../../util/isStagingServer';
import {
ensureBodyAttachmentsAreSeparated,
queueAttachmentDownloadsForMessage,
queueAttachmentDownloads,
} from '../../util/queueAttachmentDownloads';
import { SECOND } from '../../util/durations';
import { showDownloadFailedToast } from '../../util/showDownloadFailedToast';
@ -174,8 +175,7 @@ export class AttachmentBackfill {
// If `true` - show a toast at the end of the process
let showToast = false;
// If `true` - queue downloads at the end of the process
let shouldDownload = false;
const attachmentSignaturesToDownload = new Set<string>();
// Track number of pending attachments to decide when the request is
// fully processed by the phone.
@ -213,7 +213,9 @@ export class AttachmentBackfill {
// other device's backfill request. Update the CDN info without queueing
// a download.
if (isDownloading(updatedSticker.data)) {
shouldDownload = true;
attachmentSignaturesToDownload.add(
getUndownloadedAttachmentSignature(updatedSticker.data)
);
}
updatedSticker = {
...updatedSticker,
@ -261,7 +263,9 @@ export class AttachmentBackfill {
} else {
// See sticker handling code above for the reasoning
if (isDownloading(updatedBodyAttachment)) {
shouldDownload = true;
attachmentSignaturesToDownload.add(
getUndownloadedAttachmentSignature(updatedBodyAttachment)
);
}
updatedBodyAttachment = response.longText.attachment;
changeCount += 1;
@ -298,7 +302,9 @@ export class AttachmentBackfill {
// See sticker handling code above for the reasoning
if (isDownloading(existing)) {
shouldDownload = true;
attachmentSignaturesToDownload.add(
getUndownloadedAttachmentSignature(existing)
);
}
updatedAttachments[index] = entry.attachment;
}
@ -326,17 +332,18 @@ export class AttachmentBackfill {
editHistory: message.get('editHistory')?.map(edit => ({
...edit,
attachments: updatedAttachments,
bodyAttachment: updatedBodyAttachment,
})),
});
// It is fine to await below this line
if (shouldDownload) {
log.info(`${logId}: queueing downloads`);
await queueAttachmentDownloadsForMessage(message, {
if (attachmentSignaturesToDownload.size) {
log.info(
`${logId}: queueing ${attachmentSignaturesToDownload.size} download(s)`
);
await queueAttachmentDownloads(message, {
source: AttachmentDownloadSource.BACKFILL,
urgency: AttachmentDownloadUrgency.IMMEDIATE,
signaturesToQueue: attachmentSignaturesToDownload,
isManualDownload: true,
});
}

View file

@ -203,7 +203,7 @@ export async function onSync(sync: ReadSyncAttributesType): Promise<void> {
message.set({ expirationStartTimestamp });
}
queueUpdateMessage(message.attributes);
drop(queueUpdateMessage(message.attributes));
await remove(sync);
} catch (error) {

View file

@ -17,6 +17,7 @@ import { queueUpdateMessage } from '../util/messageBatcher';
import { isAciString } from '../util/isAciString';
import { DataReader, DataWriter } from '../sql/Client';
import { MessageModel } from '../models/messages';
import { drop } from '../util/drop';
const log = createLogger('ViewSyncs');
@ -136,7 +137,7 @@ export async function onSync(sync: ViewSyncAttributesType): Promise<void> {
}
if (didChangeMessage) {
queueUpdateMessage(message.attributes);
drop(queueUpdateMessage(message.attributes));
}
await remove(sync);

View file

@ -184,7 +184,7 @@ import { getCallHistorySelector } from '../state/selectors/callHistory';
import { migrateLegacyReadStatus } from '../messages/migrateLegacyReadStatus';
import { migrateLegacySendAttributes } from '../messages/migrateLegacySendAttributes';
import { getIsInitialContactSync } from '../services/contactSync';
import { queueAttachmentDownloadsForMessage } from '../util/queueAttachmentDownloads';
import { queueAttachmentDownloadsAndMaybeSaveMessage } from '../util/queueAttachmentDownloads';
import { cleanupMessages } from '../util/cleanup';
import { MessageModel } from './messages';
import { applyNewAvatar } from '../groups';
@ -2321,13 +2321,9 @@ export class ConversationModel extends window.Backbone
await Promise.all(
readMessages.map(async m => {
const registered = window.MessageCache.register(new MessageModel(m));
const shouldSave = await queueAttachmentDownloadsForMessage(
registered,
{ isManualDownload: false }
);
if (shouldSave) {
await window.MessageCache.saveMessage(registered.attributes);
}
await queueAttachmentDownloadsAndMaybeSaveMessage(registered, {
isManualDownload: false,
});
})
);
} while (messages.length > 0);

View file

@ -15,6 +15,7 @@ import { eraseMessageContents } from '../util/cleanup';
import { getSource, getSourceServiceId } from '../messages/helpers';
import { isAciString } from '../util/isAciString';
import { viewOnceOpenJobQueue } from '../jobs/viewOnceOpenJobQueue';
import { drop } from '../util/drop';
const log = createLogger('MessageUpdater');
@ -46,7 +47,7 @@ function markReadOrViewed(
notificationService.removeBy({ messageId });
if (!skipSave) {
queueUpdateMessage(nextMessageAttributes);
drop(queueUpdateMessage(nextMessageAttributes));
}
return nextMessageAttributes;

View file

@ -210,7 +210,7 @@ import { actions as searchActions } from './search';
import type { SearchActionType } from './search';
import { getNotificationTextForMessage } from '../../util/getNotificationTextForMessage';
import { doubleCheckMissingQuoteReference as doDoubleCheckMissingQuoteReference } from '../../util/doubleCheckMissingQuoteReference';
import { queueAttachmentDownloadsForMessage } from '../../util/queueAttachmentDownloads';
import { queueAttachmentDownloads } from '../../util/queueAttachmentDownloads';
import { markAttachmentAsCorrupted as doMarkAttachmentAsCorrupted } from '../../messageModifiers/AttachmentDownloads';
import {
isSent,
@ -2344,7 +2344,7 @@ function kickOffAttachmentDownload(
`kickOffAttachmentDownload: Message ${options.messageId} missing!`
);
}
const didUpdateValues = await queueAttachmentDownloadsForMessage(message, {
const didUpdateValues = await queueAttachmentDownloads(message, {
urgency: AttachmentDownloadUrgency.IMMEDIATE,
isManualDownload: true,
});

View file

@ -46,7 +46,7 @@ import { useBoundActions } from '../../hooks/useBoundActions';
import { DataReader } from '../../sql/Client';
import { deleteDownloadsJobQueue } from '../../jobs/deleteDownloadsJobQueue';
import { AttachmentDownloadUrgency } from '../../types/AttachmentDownload';
import { queueAttachmentDownloads } from '../../util/queueAttachmentDownloads';
import { queueAttachmentDownloadsAndMaybeSaveMessage } from '../../util/queueAttachmentDownloads';
import { getMessageIdForLogging } from '../../util/idForLogging';
import { markViewOnceMessageViewed } from '../../services/MessageUpdater';
@ -289,16 +289,20 @@ function showLightbox(opts: {
}
if (isIncremental(attachment)) {
// Queue all attachments, but this target attachment should be IMMEDIATE
const wasUpdated = await queueAttachmentDownloads(message, {
attachmentSignatureForImmediate:
// Queue this target attachment with urgency IMMEDIATE
await queueAttachmentDownloadsAndMaybeSaveMessage(message, {
signaturesToQueue: new Set([
getUndownloadedAttachmentSignature(attachment),
]),
isManualDownload: true,
urgency: AttachmentDownloadUrgency.IMMEDIATE,
});
// Queue all the remaining with standard urgency.
await queueAttachmentDownloadsAndMaybeSaveMessage(message, {
isManualDownload: true,
urgency: AttachmentDownloadUrgency.STANDARD,
});
if (wasUpdated) {
await window.MessageCache.saveMessage(message);
}
}
const attachments = filterValidAttachments(message.attributes);

View file

@ -16,7 +16,7 @@ import type { App } from '../playwright';
import { Bootstrap } from '../bootstrap';
import { sendTextMessage, getTimelineMessageWithText } from '../helpers';
export const debug = createDebug('mock:test:edit');
export const debug = createDebug('mock:test:backfill');
const FIXTURES_PATH = join(__dirname, '..', '..', '..', 'fixtures');
@ -288,7 +288,7 @@ describe('attachment backfill', function (this: Mocha.Suite) {
});
await conversationListItem.getByText('Message Request').click();
debug('dowloading attachment');
debug('downloading attachment');
const conversationStack = page.locator('.Inbox__conversation-stack');
const startDownload = conversationStack.getByRole('button', {
name: 'Start Download',

View file

@ -11,7 +11,7 @@ import * as MIME from '../types/MIME';
import { DataWriter } from '../sql/Client';
import { isMoreRecentThan } from './timestamp';
import { isNotNil } from './isNotNil';
import { queueAttachmentDownloadsForMessage } from './queueAttachmentDownloads';
import { queueAttachmentDownloads } from './queueAttachmentDownloads';
import { postSaveUpdates } from './cleanup';
const log = createLogger('attachmentDownloadQueue');
@ -90,7 +90,7 @@ export async function flushAttachmentDownloadQueue(): Promise<void> {
// to display the message properly.
hasRequiredAttachmentDownloads(message.attributes)
) {
const shouldSave = await queueAttachmentDownloadsForMessage(message, {
const shouldSave = await queueAttachmentDownloads(message, {
isManualDownload: false,
});
if (shouldSave) {

View file

@ -14,6 +14,7 @@ import {
} from '../messages/helpers';
import { copyQuoteContentFromOriginal } from '../messages/copyQuote';
import { queueUpdateMessage } from './messageBatcher';
import { drop } from './drop';
const log = createLogger('doubleCheckMissingQuoteReference');
@ -85,6 +86,6 @@ export async function doubleCheckMissingQuoteReference(
referencedMessageNotFound: false,
},
});
queueUpdateMessage(message.attributes);
drop(queueUpdateMessage(message.attributes));
}
}

View file

@ -2,17 +2,15 @@
// SPDX-License-Identifier: AGPL-3.0-only
import type { ReadonlyMessageAttributesType } from '../model-types.d';
import { createBatcher } from './batcher';
import { createWaitBatcher } from './waitBatcher';
import { DataWriter } from '../sql/Client';
import { createLogger } from '../logging/log';
import { postSaveUpdates } from './cleanup';
import { MessageModel } from '../models/messages';
import { drop } from './drop';
const log = createLogger('messageBatcher');
const updateMessageBatcher = createBatcher<ReadonlyMessageAttributesType>({
const updateMessageBatcher = createWaitBatcher<ReadonlyMessageAttributesType>({
name: 'messageBatcher.updateMessageBatcher',
wait: 75,
maxSize: 50,
@ -33,13 +31,13 @@ const updateMessageBatcher = createBatcher<ReadonlyMessageAttributesType>({
let shouldBatch = true;
export function queueUpdateMessage(
export async function queueUpdateMessage(
messageAttr: ReadonlyMessageAttributesType
): void {
): Promise<void> {
if (shouldBatch) {
updateMessageBatcher.add(messageAttr);
await updateMessageBatcher.add(messageAttr);
} else {
drop(window.MessageCache.saveMessage(messageAttr));
await window.MessageCache.saveMessage(messageAttr);
}
}

View file

@ -207,7 +207,7 @@ export async function onStoryRecipientUpdate(
message.set({
sendStateByConversationId: nextSendStateByConversationId,
});
queueUpdateMessage(message.attributes);
drop(queueUpdateMessage(message.attributes));
}
return true;

View file

@ -89,29 +89,28 @@ export async function handleAttachmentDownloadsForNewMessage(
if (shouldUseAttachmentDownloadQueue()) {
addToAttachmentDownloadQueue(logId, message);
} else {
await queueAttachmentDownloadsForMessage(message, {
await queueAttachmentDownloadsAndMaybeSaveMessage(message, {
isManualDownload: false,
});
}
}
}
export async function queueAttachmentDownloadsForMessage(
export async function queueAttachmentDownloadsAndMaybeSaveMessage(
message: MessageModel,
options: {
urgency?: AttachmentDownloadUrgency;
source?: AttachmentDownloadSource;
isManualDownload: boolean;
urgency?: AttachmentDownloadUrgency;
signaturesToQueue?: Set<string>;
source?: AttachmentDownloadSource;
}
): Promise<boolean> {
): Promise<void> {
const updated = await queueAttachmentDownloads(message, options);
if (!updated) {
return false;
return;
}
queueUpdateMessage(message.attributes);
return true;
await queueUpdateMessage(message.attributes);
}
// Receive logic
@ -120,17 +119,28 @@ export async function queueAttachmentDownloadsForMessage(
export async function queueAttachmentDownloads(
message: MessageModel,
{
attachmentSignatureForImmediate,
isManualDownload,
source = AttachmentDownloadSource.STANDARD,
signaturesToQueue,
urgency = AttachmentDownloadUrgency.STANDARD,
}: {
attachmentSignatureForImmediate?: string;
isManualDownload: boolean;
signaturesToQueue?: Set<string>;
source?: AttachmentDownloadSource;
urgency?: AttachmentDownloadUrgency;
}
): Promise<boolean> {
function shouldQueueAttachmentBasedOnSignature(
attachment: AttachmentType
): boolean {
if (!signaturesToQueue) {
return true;
}
return signaturesToQueue.has(
getUndownloadedAttachmentSignature(attachment)
);
}
const autoDownloadAttachment = window.storage.get(
'auto-download-attachment',
DEFAULT_AUTO_DOWNLOAD_ATTACHMENT
@ -159,6 +169,7 @@ export async function queueAttachmentDownloads(
.map(editHistory => editHistory.bodyAttachment) ?? []),
]
.filter(isNotNil)
.filter(shouldQueueAttachmentBasedOnSignature)
.filter(attachment => !isDownloaded(attachment));
if (bodyAttachmentsToDownload.length) {
@ -185,7 +196,6 @@ export async function queueAttachmentDownloads(
const startingAttachments = message.get('attachments') || [];
const { attachments, count: attachmentsCount } = await queueNormalAttachments(
{
attachmentSignatureForImmediate,
attachments: startingAttachments,
isManualDownload,
logId,
@ -197,6 +207,7 @@ export async function queueAttachmentDownloads(
sentAt: message.get('sent_at'),
source,
urgency,
shouldQueueAttachmentBasedOnSignature,
}
);
@ -210,24 +221,25 @@ export async function queueAttachmentDownloads(
}
count += attachmentsCount;
const previewsToQueue = message.get('preview') || [];
const previews = message.get('preview') || [];
const { preview, count: previewCount } = await queuePreviews({
logId,
isManualDownload,
messageId,
previews: previewsToQueue,
previews,
otherPreviews: message.get('editHistory')?.flatMap(x => x.preview ?? []),
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
shouldQueueAttachmentBasedOnSignature,
});
if (previewCount > 0) {
message.set({ preview });
}
if (previewsToQueue.length > 0) {
if (previews.length > 0) {
log.info(
`${logId}: Queued ${previewCount} (of ${previewsToQueue.length}) preview attachment downloads`
`${logId}: Queued ${previewCount} (of ${previews.length}) preview attachment downloads`
);
}
count += previewCount;
@ -247,6 +259,7 @@ export async function queueAttachmentDownloads(
sentAt: message.get('sent_at'),
source,
urgency,
shouldQueueAttachmentBasedOnSignature,
});
if (thumbnailCount > 0) {
message.set({ quote });
@ -265,6 +278,11 @@ export async function queueAttachmentDownloads(
if (!item.avatar || !item.avatar.avatar) {
return item;
}
if (!shouldQueueAttachmentBasedOnSignature(item.avatar.avatar)) {
return item;
}
// We've already downloaded this!
if (item.avatar.avatar.path) {
log.info(`${logId}: Contact attachment already downloaded`);
@ -311,7 +329,6 @@ export async function queueAttachmentDownloads(
if (sticker && sticker.data && sticker.data.path) {
log.info(`${logId}: Sticker attachment already downloaded`);
} else if (sticker) {
count += 1;
const { packId, stickerId, packKey } = sticker;
const status = getStickerPackStatus(packId);
@ -320,6 +337,7 @@ export async function queueAttachmentDownloads(
try {
log.info(`${logId}: Copying sticker from installed pack`);
copiedSticker = true;
count += 1;
const data = await copyStickerToAttachments(packId, stickerId);
// Refresh sticker attachment since we had to await above
@ -342,17 +360,20 @@ export async function queueAttachmentDownloads(
if (!copiedSticker) {
if (sticker.data) {
log.info(`${logId}: Queueing sticker download`);
await AttachmentDownloadManager.addJob({
attachment: sticker.data,
attachmentType: 'sticker',
isManualDownload,
messageId,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
source,
urgency,
});
if (shouldQueueAttachmentBasedOnSignature(sticker.data)) {
log.info(`${logId}: Queueing sticker download`);
count += 1;
await AttachmentDownloadManager.addJob({
attachment: sticker.data,
attachmentType: 'sticker',
isManualDownload,
messageId,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
source,
urgency,
});
}
} else {
log.error(`${logId}: Sticker data was missing`);
}
@ -389,6 +410,7 @@ export async function queueAttachmentDownloads(
sentAt: message.get('sent_at'),
source,
urgency,
shouldQueueAttachmentBasedOnSignature,
});
count += editAttachmentsCount;
allEditsAttachmentCount += editAttachmentsCount;
@ -410,6 +432,7 @@ export async function queueAttachmentDownloads(
sentAt: message.get('sent_at'),
urgency,
source,
shouldQueueAttachmentBasedOnSignature,
});
count += editPreviewCount;
allEditsAttachmentCount += editPreviewCount;
@ -442,7 +465,6 @@ export async function queueAttachmentDownloads(
}
export async function queueNormalAttachments({
attachmentSignatureForImmediate,
attachments = [],
isManualDownload,
logId,
@ -452,8 +474,8 @@ export async function queueNormalAttachments({
sentAt,
source,
urgency,
shouldQueueAttachmentBasedOnSignature,
}: {
attachmentSignatureForImmediate?: string;
attachments: MessageAttributesType['attachments'];
isManualDownload: boolean;
logId: string;
@ -463,6 +485,9 @@ export async function queueNormalAttachments({
sentAt: number;
source: AttachmentDownloadSource;
urgency: AttachmentDownloadUrgency;
shouldQueueAttachmentBasedOnSignature: (
attachment: AttachmentType
) => boolean;
}): Promise<{
attachments: Array<AttachmentType>;
count: number;
@ -486,6 +511,10 @@ export async function queueNormalAttachments({
return attachment;
}
if (!shouldQueueAttachmentBasedOnSignature(attachment)) {
return attachment;
}
if (isLongMessage(attachment.contentType)) {
throw new Error(
`${logId}: queueNormalAttachments passed long-message attachment`
@ -545,12 +574,6 @@ export async function queueNormalAttachments({
count += 1;
const urgencyForAttachment =
attachmentSignatureForImmediate &&
attachmentSignatureForImmediate ===
getUndownloadedAttachmentSignature(attachment)
? AttachmentDownloadUrgency.IMMEDIATE
: urgency;
return AttachmentDownloadManager.addJob({
attachment,
attachmentType: 'attachment',
@ -559,7 +582,7 @@ export async function queueNormalAttachments({
receivedAt,
sentAt,
source,
urgency: urgencyForAttachment,
urgency,
});
})
);
@ -580,6 +603,7 @@ async function queuePreviews({
sentAt,
source,
urgency,
shouldQueueAttachmentBasedOnSignature,
}: {
isManualDownload: boolean;
logId: string;
@ -590,6 +614,9 @@ async function queuePreviews({
sentAt: number;
source: AttachmentDownloadSource;
urgency: AttachmentDownloadUrgency;
shouldQueueAttachmentBasedOnSignature: (
attachment: AttachmentType
) => boolean;
}): Promise<{ preview: Array<LinkPreviewType>; count: number }> {
const log = getLogger(source);
const previewSignatures: Map<string, AttachmentType> = new Map();
@ -606,6 +633,11 @@ async function queuePreviews({
if (!item.image) {
return item;
}
if (!shouldQueueAttachmentBasedOnSignature(item.image)) {
return item;
}
// We've already downloaded this!
if (isDownloaded(item.image)) {
log.info(`${logId}: Preview attachment already downloaded`);
@ -673,6 +705,7 @@ async function queueQuoteAttachments({
sentAt,
source,
urgency,
shouldQueueAttachmentBasedOnSignature,
}: {
logId: string;
isManualDownload: boolean;
@ -683,6 +716,9 @@ async function queueQuoteAttachments({
sentAt: number;
source: AttachmentDownloadSource;
urgency: AttachmentDownloadUrgency;
shouldQueueAttachmentBasedOnSignature: (
attachment: AttachmentType
) => boolean;
}): Promise<{ quote?: QuotedMessageType; count: number }> {
const log = getLogger(source);
let count = 0;
@ -715,7 +751,11 @@ async function queueQuoteAttachments({
if (!item.thumbnail) {
return item;
}
// We've already downloaded this!
if (!shouldQueueAttachmentBasedOnSignature(item.thumbnail)) {
return item;
}
if (isDownloaded(item.thumbnail)) {
log.info(`${logId}: Quote attachment already downloaded`);
return item;