Move to MessageCache.saveMessage, queue downloads piecemeal

This commit is contained in:
Scott Nonnenberg 2025-01-24 06:37:18 -10:00 committed by GitHub
parent 197660a966
commit ed30059bd5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
32 changed files with 138 additions and 298 deletions

View file

@ -3437,10 +3437,7 @@ async function appendChangeMessages(
strictAssert(first !== undefined, 'First message must be there');
log.info(`appendChangeMessages/${logId}: updating ${first.id}`);
await DataWriter.saveMessage(first, {
ourAci,
postSaveUpdates,
await window.MessageCache.saveMessage(first, {
// We don't use forceSave here because this is an update of existing
// message.
});

View file

@ -53,7 +53,6 @@ import {
import { safeParsePartial } from '../util/schemas';
import { deleteDownloadsJobQueue } from './deleteDownloadsJobQueue';
import { createBatcher } from '../util/batcher';
import { postSaveUpdates } from '../util/cleanup';
export enum AttachmentDownloadUrgency {
IMMEDIATE = 'immediate',
@ -431,10 +430,7 @@ async function runDownloadAttachmentJob({
} finally {
// This will fail if the message has been deleted before the download finished, which
// is good
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
}

View file

@ -17,7 +17,6 @@ import {
} from './handleMultipleSendErrors';
import { ourProfileKeyService } from '../../services/ourProfileKey';
import { wrapWithSyncMessageSend } from '../../util/wrapWithSyncMessageSend';
import { DataWriter } from '../../sql/Client';
import type { ConversationModel } from '../../models/conversations';
import type {
@ -38,7 +37,6 @@ import type { LoggerType } from '../../types/Logging';
import type { ServiceIdString } from '../../types/ServiceId';
import { isStory } from '../../messages/helpers';
import { sendToGroup } from '../../util/sendToGroup';
import { postSaveUpdates } from '../../util/cleanup';
export async function sendDeleteForEveryone(
conversation: ConversationModel,
@ -306,10 +304,7 @@ async function updateMessageWithSuccessfulSends(
deletedForEveryoneSendStatus: {},
deletedForEveryoneFailed: undefined,
});
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
return;
}
@ -330,10 +325,7 @@ async function updateMessageWithSuccessfulSends(
deletedForEveryoneSendStatus,
deletedForEveryoneFailed: undefined,
});
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
async function updateMessageWithFailure(
@ -347,8 +339,5 @@ async function updateMessageWithFailure(
);
message.set({ deletedForEveryoneFailed: true });
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}

View file

@ -10,7 +10,6 @@ import {
maybeExpandErrors,
} from './handleMultipleSendErrors';
import { ourProfileKeyService } from '../../services/ourProfileKey';
import { DataWriter } from '../../sql/Client';
import type { ConversationModel } from '../../models/conversations';
import type {
@ -29,7 +28,6 @@ import { SendMessageProtoError } from '../../textsecure/Errors';
import { strictAssert } from '../../util/assert';
import type { LoggerType } from '../../types/Logging';
import { isStory } from '../../messages/helpers';
import { postSaveUpdates } from '../../util/cleanup';
export async function sendDeleteStoryForEveryone(
ourConversation: ConversationModel,
@ -280,10 +278,7 @@ async function updateMessageWithSuccessfulSends(
deletedForEveryoneSendStatus: {},
deletedForEveryoneFailed: undefined,
});
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
return;
}
@ -304,10 +299,7 @@ async function updateMessageWithSuccessfulSends(
deletedForEveryoneSendStatus,
deletedForEveryoneFailed: undefined,
});
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
async function updateMessageWithFailure(
@ -321,8 +313,5 @@ async function updateMessageWithFailure(
);
message.set({ deletedForEveryoneFailed: true });
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}

View file

@ -4,7 +4,6 @@
import { isNumber } from 'lodash';
import PQueue from 'p-queue';
import { DataWriter } from '../../sql/Client';
import * as Errors from '../../types/errors';
import { strictAssert } from '../../util/assert';
import type { MessageModel } from '../../models/messages';
@ -61,7 +60,6 @@ import {
saveErrorsOnMessage,
} from '../../test-node/util/messageFailures';
import { getMessageIdForLogging } from '../../util/idForLogging';
import { postSaveUpdates } from '../../util/cleanup';
import { send, sendSyncMessageOnly } from '../../messages/send';
const MAX_CONCURRENT_ATTACHMENT_UPLOADS = 5;
@ -667,10 +665,7 @@ async function getMessageSendData({
]);
// Save message after uploading attachments
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
const storyReaction = message.get('storyReaction');
const storySourceServiceId = storyMessage?.get('sourceServiceId');

View file

@ -10,7 +10,6 @@ import type { CallbackResultType } from '../../textsecure/Types.d';
import { MessageModel } from '../../models/messages';
import type { MessageReactionType } from '../../model-types.d';
import type { ConversationModel } from '../../models/conversations';
import { DataWriter } from '../../sql/Client';
import * as reactionUtil from '../../reactions/util';
import { isSent, SendStatus } from '../../messages/MessageSendState';
@ -42,7 +41,6 @@ import { isConversationUnregistered } from '../../util/isConversationUnregistere
import type { LoggerType } from '../../types/Logging';
import { sendToGroup } from '../../util/sendToGroup';
import { hydrateStoryContext } from '../../util/hydrateStoryContext';
import { postSaveUpdates } from '../../util/cleanup';
import { send, sendSyncMessageOnly } from '../../messages/send';
export async function sendReaction(
@ -90,10 +88,7 @@ export async function sendReaction(
if (!canReact(message.attributes, ourConversationId, findAndFormatContact)) {
log.info(`could not react to ${messageId}. Removing this pending reaction`);
markReactionFailed(message, pendingReaction);
await DataWriter.saveMessage(message.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
return;
}
@ -102,10 +97,7 @@ export async function sendReaction(
`reacting to message ${messageId} ran out of time. Giving up on sending it`
);
markReactionFailed(message, pendingReaction);
await DataWriter.saveMessage(message.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
return;
}
@ -347,10 +339,8 @@ export async function sendReaction(
await hydrateStoryContext(reactionMessage.id, message.attributes, {
shouldSave: false,
});
await DataWriter.saveMessage(reactionMessage.attributes, {
ourAci,
await window.MessageCache.saveMessage(reactionMessage.attributes, {
forceSave: true,
postSaveUpdates,
});
window.MessageCache.register(reactionMessage);
@ -382,10 +372,7 @@ export async function sendReaction(
toThrow: originalError || thrownError,
});
} finally {
await DataWriter.saveMessage(message.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
}

View file

@ -23,7 +23,7 @@ import type { ServiceIdString } from '../../types/ServiceId';
import type { StoryDistributionIdString } from '../../types/StoryDistributionId';
import * as Errors from '../../types/errors';
import type { StoryMessageRecipientsType } from '../../types/Stories';
import { DataReader, DataWriter } from '../../sql/Client';
import { DataReader } from '../../sql/Client';
import { SignalService as Proto } from '../../protobuf';
import { getMessagesById } from '../../messages/getMessagesById';
import {
@ -44,7 +44,6 @@ import {
notifyStorySendFailed,
saveErrorsOnMessage,
} from '../../test-node/util/messageFailures';
import { postSaveUpdates } from '../../util/cleanup';
import { send } from '../../messages/send';
export async function sendStory(
@ -550,10 +549,7 @@ export async function sendStory(
}
message.set({ sendStateByConversationId: newSendStateByConversationId });
return DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
return window.MessageCache.saveMessage(message.attributes);
})
);

View file

@ -33,7 +33,6 @@ import {
} from '../types/Receipt';
import { drop } from '../util/drop';
import { getMessageById } from '../messages/getMessageById';
import { postSaveUpdates } from '../util/cleanup';
import { MessageModel } from '../models/messages';
const { deleteSentProtoRecipient, removeSyncTaskById } = DataWriter;
@ -200,8 +199,7 @@ async function processReceiptsForMessage(
const { validReceipts } = await updateMessageWithReceipts(message, receipts);
const ourAci = window.textsecure.storage.user.getCheckedAci();
await DataWriter.saveMessage(message.attributes, { ourAci, postSaveUpdates });
await window.MessageCache.saveMessage(message.attributes);
// Confirm/remove receipts, and delete sent protos
for (const receipt of validReceipts) {

View file

@ -39,7 +39,6 @@ import {
conversationJobQueue,
conversationQueueJobEnum,
} from '../jobs/conversationJobQueue';
import { postSaveUpdates } from '../util/cleanup';
export type ReactionAttributesType = {
emoji: string;
@ -389,10 +388,8 @@ export async function handleReaction(
shouldSave: false,
});
// Note: generatedMessage comes with an id, so we have to force this save
await DataWriter.saveMessage(generatedMessage.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
await window.MessageCache.saveMessage(generatedMessage.attributes, {
forceSave: true,
postSaveUpdates,
});
log.info('Reactions.onReaction adding reaction to story', {
@ -555,10 +552,8 @@ export async function handleReaction(
await hydrateStoryContext(generatedMessage.id, message.attributes, {
shouldSave: false,
});
await DataWriter.saveMessage(generatedMessage.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
await window.MessageCache.saveMessage(generatedMessage.attributes, {
forceSave: true,
postSaveUpdates,
});
window.MessageCache.register(generatedMessage);
@ -586,20 +581,15 @@ export async function handleReaction(
jobToInsert.id
}`
);
await DataWriter.saveMessage(message.attributes, {
await window.MessageCache.saveMessage(message.attributes, {
jobToInsert,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
});
} else {
await conversationJobQueue.add(jobData);
}
} else if (shouldPersist && !isStory(message.attributes)) {
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
window.reduxActions.conversations.markOpenConversationRead(conversation.id);
}
}

View file

@ -124,12 +124,11 @@ export async function onSync(sync: ViewSyncAttributesType): Promise<void> {
const attachments = message.get('attachments');
if (!attachments?.every(isDownloaded)) {
const updatedFields = await queueAttachmentDownloads(
message.attributes,
{ urgency: AttachmentDownloadUrgency.STANDARD }
);
if (updatedFields) {
message.set(updatedFields);
const didQueueDownload = await queueAttachmentDownloads(message, {
urgency: AttachmentDownloadUrgency.STANDARD,
});
if (didQueueDownload) {
didChangeMessage = true;
}
}
}

View file

@ -19,7 +19,7 @@ import {
SendStatus,
} from './MessageSendState';
import { DataReader, DataWriter } from '../sql/Client';
import { eraseMessageContents, postSaveUpdates } from '../util/cleanup';
import { eraseMessageContents } from '../util/cleanup';
import {
isDirectConversation,
isGroup,
@ -200,10 +200,7 @@ export async function handleDataMessage(
sendStateByConversationId,
unidentifiedDeliveries: [...unidentifiedDeliveriesSet],
});
await DataWriter.saveMessage(toUpdate.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(toUpdate.attributes);
confirm();
return;

View file

@ -11,7 +11,6 @@ import { isGroup } from '../util/whatTypeOfConversation';
import { handleMessageSend } from '../util/handleMessageSend';
import { getSendOptions } from '../util/getSendOptions';
import * as log from '../logging/log';
import { DataWriter } from '../sql/Client';
import {
getPropForTimestamp,
getChangesForPropAtTimestamp,
@ -21,7 +20,6 @@ import {
notifyStorySendFailed,
saveErrorsOnMessage,
} from '../test-node/util/messageFailures';
import { postSaveUpdates } from '../util/cleanup';
import { isCustomError } from './helpers';
import { SendActionType, isSent, sendStateReducer } from './MessageSendState';
@ -77,10 +75,7 @@ export async function send(
}
if (!message.doNotSave) {
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
const sendStateByConversationId = {
@ -317,10 +312,7 @@ export async function sendSyncMessageOnly(
}
throw error;
} finally {
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
if (updateLeftPane) {
updateLeftPane();
@ -476,10 +468,7 @@ export async function sendSyncMessage(
return result;
}
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
return result;
});
};

View file

@ -190,7 +190,7 @@ import { migrateLegacyReadStatus } from '../messages/migrateLegacyReadStatus';
import { migrateLegacySendAttributes } from '../messages/migrateLegacySendAttributes';
import { getIsInitialSync } from '../services/contactSync';
import { queueAttachmentDownloadsForMessage } from '../util/queueAttachmentDownloads';
import { cleanupMessages, postSaveUpdates } from '../util/cleanup';
import { cleanupMessages } from '../util/cleanup';
import { MessageModel } from './messages';
/* eslint-disable more/no-then */
@ -2015,11 +2015,7 @@ export class ConversationModel extends window.Backbone
if (updated) {
upgraded += 1;
const ourAci = window.textsecure.storage.user.getCheckedAci();
await DataWriter.saveMessage(model.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(model.attributes);
}
return model.attributes;
@ -2270,7 +2266,6 @@ export class ConversationModel extends window.Backbone
options: { isLocalAction?: boolean } = {}
): Promise<void> {
const { isLocalAction } = options;
const ourAci = window.textsecure.storage.user.getCheckedAci();
let messages: Array<MessageAttributesType> | undefined;
do {
@ -2324,10 +2319,7 @@ export class ConversationModel extends window.Backbone
const shouldSave =
await queueAttachmentDownloadsForMessage(registered);
if (shouldSave) {
await DataWriter.saveMessage(registered.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(registered.attributes);
}
})
);

View file

@ -131,6 +131,7 @@ import { isConversationAccepted } from '../../util/isConversationAccepted';
import { saveBackupsSubscriberData } from '../../util/backupSubscriptionData';
import { postSaveUpdates } from '../../util/cleanup';
import type { LinkPreviewType } from '../../types/message/LinkPreviews';
import { MessageModel } from '../../models/messages';
const MAX_CONCURRENCY = 10;
@ -641,8 +642,9 @@ export class BackupImportStream extends Writable {
if (hasAttachmentDownloads(attributes)) {
const conversation = this.#conversations.get(attributes.conversationId);
if (conversation && isConversationAccepted(conversation)) {
const model = new MessageModel(attributes);
attachmentDownloadJobPromises.push(
queueAttachmentDownloads(attributes, {
queueAttachmentDownloads(model, {
source: AttachmentDownloadSource.BACKUP_IMPORT,
})
);

View file

@ -6,7 +6,7 @@ import type { ReadonlyMessageAttributesType } from '../model-types.d';
import type { StoryDataType } from '../state/ducks/stories';
import * as durations from '../util/durations';
import * as log from '../logging/log';
import { DataReader, DataWriter } from '../sql/Client';
import { DataReader } from '../sql/Client';
import type { GetAllStoriesResultType } from '../sql/Interface';
import {
getAttachmentsForMessage,
@ -18,7 +18,6 @@ import { strictAssert } from '../util/assert';
import { dropNull } from '../util/dropNull';
import { DurationInSeconds } from '../util/durations';
import { SIGNAL_ACI } from '../types/SignalConversation';
import { postSaveUpdates } from '../util/cleanup';
let storyData: GetAllStoriesResultType | undefined;
@ -173,10 +172,7 @@ async function repairUnexpiredStories(): Promise<void> {
await Promise.all(
storiesWithExpiry.map(messageAttributes => {
return DataWriter.saveMessage(messageAttributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
return window.MessageCache.saveMessage(messageAttributes);
})
);
}

View file

@ -218,7 +218,7 @@ import {
sendStateReducer,
} from '../../messages/MessageSendState';
import { markFailed } from '../../test-node/util/messageFailures';
import { cleanupMessages, postSaveUpdates } from '../../util/cleanup';
import { cleanupMessages } from '../../util/cleanup';
import { MessageModel } from '../../models/messages';
// State
@ -2312,12 +2312,7 @@ function kickOffAttachmentDownload(
);
if (didUpdateValues) {
drop(
DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
})
);
drop(window.MessageCache.saveMessage(message.attributes));
}
dispatch({
@ -2347,11 +2342,7 @@ function cancelAttachmentDownload({
})),
});
const ourAci = window.textsecure.storage.user.getCheckedAci();
await DataWriter.saveMessage(message.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
// A click kicks off downloads for every attachment in a message, so cancel does too
@ -2486,10 +2477,8 @@ function retryMessageSend(
timestamp: message.attributes.timestamp,
},
async jobToInsert => {
await DataWriter.saveMessage(message.attributes, {
await window.MessageCache.saveMessage(message.attributes, {
jobToInsert,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
}
);
@ -2502,10 +2491,8 @@ function retryMessageSend(
revision: conversation.get('revision'),
},
async jobToInsert => {
await DataWriter.saveMessage(message.attributes, {
await window.MessageCache.saveMessage(message.attributes, {
jobToInsert,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
}
);

View file

@ -284,13 +284,12 @@ function showLightbox(opts: {
if (isIncremental(attachment)) {
// Queue all attachments, but this target attachment should be IMMEDIATE
const updatedFields = await queueAttachmentDownloads(message.attributes, {
const wasUpdated = await queueAttachmentDownloads(message, {
urgency: AttachmentDownloadUrgency.STANDARD,
attachmentDigestForImmediate: attachment.digest,
});
if (updatedFields) {
message.set(updatedFields);
await window.MessageCache.saveMessage(message.attributes);
if (wasUpdated) {
await window.MessageCache.saveMessage(message);
}
}

View file

@ -69,7 +69,7 @@ import {
conversationQueueJobEnum,
} from '../../jobs/conversationJobQueue';
import { ReceiptType } from '../../types/Receipt';
import { cleanupMessages, postSaveUpdates } from '../../util/cleanup';
import { cleanupMessages } from '../../util/cleanup';
export type StoryDataType = ReadonlyDeep<
{
@ -421,12 +421,7 @@ function markStoryRead(
const storyReadDate = Date.now();
message.set(markViewed(message.attributes, storyReadDate));
drop(
DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
})
);
drop(window.MessageCache.saveMessage(message.attributes));
const conversationId = message.get('conversationId');
@ -534,10 +529,11 @@ function queueStoryDownload(
payload: storyId,
});
const updatedFields = await queueAttachmentDownloads(message.attributes);
if (updatedFields) {
message.set(updatedFields);
const wasUpdated = await queueAttachmentDownloads(message);
if (wasUpdated) {
await window.MessageCache.saveMessage(message);
}
return;
}

View file

@ -17,7 +17,6 @@ import {
messageReceiptTypeSchema,
} from '../messageModifiers/MessageReceipts';
import { ReadStatus } from '../messages/MessageReadStatus';
import { postSaveUpdates } from '../util/cleanup';
describe('MessageReceipts', () => {
let ourAci: AciString;
@ -79,10 +78,8 @@ describe('MessageReceipts', () => {
},
};
await DataWriter.saveMessage(messageAttributes, {
await window.MessageCache.saveMessage(messageAttributes, {
forceSave: true,
ourAci,
postSaveUpdates,
});
await Promise.all([
@ -157,10 +154,8 @@ describe('MessageReceipts', () => {
],
};
await DataWriter.saveMessage(messageAttributes, {
await window.MessageCache.saveMessage(messageAttributes, {
forceSave: true,
ourAci,
postSaveUpdates,
});
await DataWriter.saveEditedMessage(messageAttributes, ourAci, {
conversationId: messageAttributes.conversationId,

View file

@ -8,7 +8,6 @@ import { DataWriter } from '../../sql/Client';
import { SendStatus } from '../../messages/MessageSendState';
import { IMAGE_PNG } from '../../types/MIME';
import { generateAci, generatePni } from '../../types/ServiceId';
import { postSaveUpdates } from '../../util/cleanup';
import { MessageModel } from '../../models/messages';
describe('Conversations', () => {
@ -84,10 +83,8 @@ describe('Conversations', () => {
});
// Saving to db and updating the convo's last message
await DataWriter.saveMessage(message.attributes, {
await window.MessageCache.saveMessage(message.attributes, {
forceSave: true,
ourAci,
postSaveUpdates,
});
message = window.MessageCache.register(message);
await DataWriter.updateConversation(conversation.attributes);

View file

@ -17,12 +17,10 @@ import {
import type { AttachmentDownloadJobType } from '../../types/AttachmentDownload';
import { DataReader, DataWriter } from '../../sql/Client';
import { MINUTE } from '../../util/durations';
import { type AciString } from '../../types/ServiceId';
import { type AttachmentType, AttachmentVariant } from '../../types/Attachment';
import { strictAssert } from '../../util/assert';
import { AttachmentDownloadSource } from '../../sql/Interface';
import { getAttachmentCiphertextLength } from '../../AttachmentCrypto';
import { postSaveUpdates } from '../../util/cleanup';
function composeJob({
messageId,
@ -108,7 +106,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
urgency: AttachmentDownloadUrgency
) {
// Save message first to satisfy foreign key constraint
await DataWriter.saveMessage(
await window.MessageCache.saveMessage(
{
id: job.messageId,
type: 'incoming',
@ -118,9 +116,7 @@ describe('AttachmentDownloadManager/JobManager', () => {
conversationId: 'convoId',
},
{
ourAci: 'ourAci' as AciString,
forceSave: true,
postSaveUpdates,
}
);
await downloadManager?.addJob({

View file

@ -5,7 +5,6 @@ import * as sinon from 'sinon';
import casual from 'casual';
import { v4 as generateUuid } from 'uuid';
import { DataWriter } from '../../../sql/Client';
import type {
DispatchableViewStoryType,
StoryDataType,
@ -27,7 +26,6 @@ import { actions, getEmptyState } from '../../../state/ducks/stories';
import { noopAction } from '../../../state/ducks/noop';
import { reducer as rootReducer } from '../../../state/reducer';
import { dropNull } from '../../../util/dropNull';
import { postSaveUpdates } from '../../../util/cleanup';
import { MessageModel } from '../../../models/messages';
describe('both/state/ducks/stories', () => {
@ -928,10 +926,8 @@ describe('both/state/ducks/stories', () => {
},
],
};
await DataWriter.saveMessage(messageAttributes, {
await window.MessageCache.saveMessage(messageAttributes, {
forceSave: true,
ourAci: generateAci(),
postSaveUpdates,
});
const rootState = getEmptyRootState();
@ -991,10 +987,8 @@ describe('both/state/ducks/stories', () => {
preview: [preview],
};
await DataWriter.saveMessage(messageAttributes, {
await window.MessageCache.saveMessage(messageAttributes, {
forceSave: true,
ourAci: generateAci(),
postSaveUpdates,
});
const rootState = getEmptyRootState();

View file

@ -71,10 +71,7 @@ export async function eraseMessageContents(
)?.debouncedUpdateLastMessage();
if (shouldPersist) {
await DataWriter.saveMessage(message.attributes, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
await DataWriter.deleteSentProtoByMessageId(message.id);

View file

@ -10,7 +10,6 @@ import type { StoryMessageRecipientsType } from '../types/Stories';
import type { StoryDistributionIdString } from '../types/StoryDistributionId';
import type { ServiceIdString } from '../types/ServiceId';
import * as log from '../logging/log';
import { DataWriter } from '../sql/Client';
import { DAY } from './durations';
import { StoryRecipientUpdateEvent } from '../textsecure/messageReceiverEvents';
import {
@ -24,7 +23,6 @@ import { getMessageById } from '../messages/getMessageById';
import { strictAssert } from './assert';
import { repeat, zipObject } from './iterables';
import { isOlderThan } from './timestamp';
import { postSaveUpdates } from './cleanup';
export async function deleteStoryForEveryone(
stories: ReadonlyArray<StoryDataType>,
@ -192,10 +190,8 @@ export async function deleteStoryForEveryone(
await conversationJobQueue.add(jobData, async jobToInsert => {
log.info(`${logId}: Deleting message with job ${jobToInsert.id}`);
await DataWriter.saveMessage(message.attributes, {
await window.MessageCache.saveMessage(message.attributes, {
jobToInsert,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
});
} catch (error) {

View file

@ -289,32 +289,27 @@ export async function handleEditMessage(
});
// Queue up any downloads in case they're different, update the fields if so.
const updatedFields = await queueAttachmentDownloads(
mainMessageModel.attributes
);
const wasUpdated = await queueAttachmentDownloads(mainMessageModel);
// If we've scheduled a bodyAttachment download, we need that edit to know about it
if (updatedFields?.bodyAttachment) {
const existing =
updatedFields.editHistory || mainMessageModel.get('editHistory') || [];
if (wasUpdated && mainMessageModel.get('bodyAttachment')) {
const existing = mainMessageModel.get('editHistory') || [];
updatedFields.editHistory = existing.map(item => {
if (item.timestamp !== editedMessage.timestamp) {
return item;
}
mainMessageModel.set({
editHistory: existing.map(item => {
if (item.timestamp !== editedMessage.timestamp) {
return item;
}
return {
...item,
attachments: updatedFields.attachments,
bodyAttachment: updatedFields.bodyAttachment,
};
return {
...item,
attachments: mainMessageModel.get('attachments'),
bodyAttachment: mainMessageModel.get('bodyAttachment'),
};
}),
});
}
if (updatedFields) {
mainMessageModel.set(updatedFields);
}
const conversation = window.ConversationController.get(
editAttributes.conversationId
);

View file

@ -14,8 +14,6 @@ import { isOlderThan } from './timestamp';
import { DAY } from './durations';
import { getMessageById } from '../messages/getMessageById';
import { MessageModel } from '../models/messages';
import { DataWriter } from '../sql/Client';
import { postSaveUpdates } from './cleanup';
export async function hydrateStoryContext(
messageId: string,
@ -82,11 +80,7 @@ export async function hydrateStoryContext(
};
message.set(newMessageAttributes);
if (shouldSave) {
const ourAci = window.textsecure.storage.user.getCheckedAci();
await DataWriter.saveMessage(message.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
return newMessageAttributes;
@ -109,11 +103,7 @@ export async function hydrateStoryContext(
};
message.set(newMessageAttributes);
if (shouldSave) {
const ourAci = window.textsecure.storage.user.getCheckedAci();
await DataWriter.saveMessage(message.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
return newMessageAttributes;

View file

@ -8,6 +8,7 @@ import { DataWriter } from '../sql/Client';
import * as log from '../logging/log';
import { postSaveUpdates } from './cleanup';
import { MessageModel } from '../models/messages';
import { drop } from './drop';
const updateMessageBatcher = createBatcher<ReadonlyMessageAttributesType>({
name: 'messageBatcher.updateMessageBatcher',
@ -36,10 +37,7 @@ export function queueUpdateMessage(
if (shouldBatch) {
updateMessageBatcher.add(messageAttr);
} else {
void DataWriter.saveMessage(messageAttr, {
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
drop(window.MessageCache.saveMessage(messageAttr));
}
}

View file

@ -9,7 +9,6 @@ import type { SendStateByConversationId } from '../messages/MessageSendState';
import * as Edits from '../messageModifiers/Edits';
import * as log from '../logging/log';
import { DataWriter } from '../sql/Client';
import * as Deletes from '../messageModifiers/Deletes';
import * as DeletesForMe from '../messageModifiers/DeletesForMe';
import * as MessageReceipts from '../messageModifiers/MessageReceipts';
@ -38,7 +37,6 @@ import {
import { getMessageIdForLogging } from './idForLogging';
import { markViewOnceMessageViewed } from '../services/MessageUpdater';
import { handleReaction } from '../messageModifiers/Reactions';
import { postSaveUpdates } from './cleanup';
export enum ModifyTargetMessageResult {
Modified = 'Modified',
@ -326,10 +324,7 @@ export async function modifyTargetMessage(
// We save here before handling any edits because handleEditMessage does its own saves
if (changed && !isFirstRun) {
log.info(`${logId}: Changes in second run; saving.`);
await DataWriter.saveMessage(message.attributes, {
ourAci,
postSaveUpdates,
});
await window.MessageCache.saveMessage(message.attributes);
}
// We want to make sure the message is saved first before applying any edits

View file

@ -90,14 +90,13 @@ export async function queueAttachmentDownloadsForMessage(
message: MessageModel,
urgency?: AttachmentDownloadUrgency
): Promise<boolean> {
const updates = await queueAttachmentDownloads(message.attributes, {
const updated = await queueAttachmentDownloads(message, {
urgency,
});
if (!updates) {
if (!updated) {
return false;
}
message.set(updates);
queueUpdateMessage(message.attributes);
return true;
@ -107,7 +106,7 @@ export async function queueAttachmentDownloadsForMessage(
// NOTE: If you're changing any logic in this function that deals with the
// count then you'll also have to modify ./hasAttachmentsDownloads
export async function queueAttachmentDownloads(
message: MessageAttributesType,
message: MessageModel,
{
urgency = AttachmentDownloadUrgency.STANDARD,
source = AttachmentDownloadSource.STANDARD,
@ -117,13 +116,12 @@ export async function queueAttachmentDownloads(
source?: AttachmentDownloadSource;
attachmentDigestForImmediate?: string;
} = {}
): Promise<MessageAttachmentsDownloadedType | undefined> {
const attachmentsToQueue = message.attachments || [];
): Promise<boolean> {
const attachmentsToQueue = message.get('attachments') || [];
const messageId = message.id;
const idForLogging = getMessageIdForLogging(message);
const idForLogging = getMessageIdForLogging(message.attributes);
let count = 0;
let bodyAttachment;
const idLog = `queueAttachmentDownloads(${idForLogging}})`;
const log = getLogger(source);
@ -138,16 +136,13 @@ export async function queueAttachmentDownloads(
}
if (longMessageAttachments.length > 0) {
[bodyAttachment] = longMessageAttachments;
}
if (!bodyAttachment && message.bodyAttachment) {
bodyAttachment = message.bodyAttachment;
message.set({ bodyAttachment: longMessageAttachments[0] });
}
const bodyAttachmentsToDownload = [
bodyAttachment,
...(message.editHistory
message.get('bodyAttachment'),
...(message
.get('editHistory')
?.slice(1) // first entry is the same as the root level message!
.map(editHistory => editHistory.bodyAttachment) ?? []),
]
@ -164,8 +159,8 @@ export async function queueAttachmentDownloads(
attachment,
messageId,
attachmentType: 'long-message',
receivedAt: message.received_at,
sentAt: message.sent_at,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
})
@ -184,17 +179,22 @@ export async function queueAttachmentDownloads(
idLog,
messageId,
attachments: normalAttachments,
otherAttachments: message.editHistory?.flatMap(x => x.attachments ?? []),
receivedAt: message.received_at,
sentAt: message.sent_at,
otherAttachments: message
.get('editHistory')
?.flatMap(x => x.attachments ?? []),
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
attachmentDigestForImmediate,
}
);
if (attachmentsCount > 0) {
message.set({ attachments });
}
count += attachmentsCount;
const previewsToQueue = message.preview || [];
const previewsToQueue = message.get('preview') || [];
if (previewsToQueue.length > 0) {
log.info(
`${idLog}: Queueing ${previewsToQueue.length} preview attachment downloads`
@ -204,15 +204,18 @@ export async function queueAttachmentDownloads(
idLog,
messageId,
previews: previewsToQueue,
otherPreviews: message.editHistory?.flatMap(x => x.preview ?? []),
receivedAt: message.received_at,
sentAt: message.sent_at,
otherPreviews: message.get('editHistory')?.flatMap(x => x.preview ?? []),
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
});
if (previewCount > 0) {
message.set({ preview });
}
count += previewCount;
const numQuoteAttachments = message.quote?.attachments?.length ?? 0;
const numQuoteAttachments = message.get('quote')?.attachments?.length ?? 0;
if (numQuoteAttachments > 0) {
log.info(
`${idLog}: Queueing ${numQuoteAttachments} ` +
@ -222,16 +225,23 @@ export async function queueAttachmentDownloads(
const { quote, count: thumbnailCount } = await queueQuoteAttachments({
idLog,
messageId,
quote: message.quote,
otherQuotes: message.editHistory?.map(x => x.quote).filter(isNotNil) ?? [],
receivedAt: message.received_at,
sentAt: message.sent_at,
quote: message.get('quote'),
otherQuotes:
message
.get('editHistory')
?.map(x => x.quote)
.filter(isNotNil) ?? [],
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
});
if (thumbnailCount > 0) {
message.set({ quote });
}
count += thumbnailCount;
const contactsToQueue = message.contact || [];
const contactsToQueue = message.get('contact') || [];
if (contactsToQueue.length > 0) {
log.info(
`${idLog}: Queueing ${contactsToQueue.length} contact attachment downloads`
@ -257,8 +267,8 @@ export async function queueAttachmentDownloads(
attachment: item.avatar.avatar,
messageId,
attachmentType: 'contact',
receivedAt: message.received_at,
sentAt: message.sent_at,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
}),
@ -266,8 +276,9 @@ export async function queueAttachmentDownloads(
};
})
);
message.set({ contact });
let { sticker } = message;
let sticker = message.get('sticker');
if (sticker && sticker.data && sticker.data.path) {
log.info(`${idLog}: Sticker attachment already downloaded`);
} else if (sticker) {
@ -294,8 +305,8 @@ export async function queueAttachmentDownloads(
attachment: sticker.data,
messageId,
attachmentType: 'sticker',
receivedAt: message.received_at,
sentAt: message.sent_at,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
});
@ -320,8 +331,9 @@ export async function queueAttachmentDownloads(
data,
};
}
message.set({ sticker });
let { editHistory } = message;
let editHistory = message.get('editHistory');
if (editHistory) {
log.info(`${idLog}: Looping through ${editHistory.length} edits`);
editHistory = await Promise.all(
@ -332,8 +344,8 @@ export async function queueAttachmentDownloads(
messageId,
attachments: edit.attachments,
otherAttachments: attachments,
receivedAt: message.received_at,
sentAt: message.sent_at,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
});
@ -351,8 +363,8 @@ export async function queueAttachmentDownloads(
messageId,
previews: edit.preview,
otherPreviews: preview,
receivedAt: message.received_at,
sentAt: message.sent_at,
receivedAt: message.get('received_at'),
sentAt: message.get('sent_at'),
urgency,
source,
});
@ -372,22 +384,15 @@ export async function queueAttachmentDownloads(
})
);
}
message.set({ editHistory });
if (count <= 0) {
return;
return false;
}
log.info(`${idLog}: Queued ${count} total attachment downloads`);
return {
attachments,
bodyAttachment,
contact,
editHistory,
preview,
quote,
sticker,
};
return true;
}
export async function queueNormalAttachments({

View file

@ -3,7 +3,6 @@
import type { ConversationAttributesType } from '../model-types.d';
import type { ConversationQueueJobData } from '../jobs/conversationJobQueue';
import { DataWriter } from '../sql/Client';
import * as Errors from '../types/errors';
import { DAY } from './durations';
import * as log from '../logging/log';
@ -21,7 +20,6 @@ import { getRecipientConversationIds } from './getRecipientConversationIds';
import { getRecipients } from './getRecipients';
import { repeat, zipObject } from './iterables';
import { isMe } from './whatTypeOfConversation';
import { postSaveUpdates } from './cleanup';
export async function sendDeleteForEveryoneMessage(
conversationAttributes: ConversationAttributesType,
@ -83,10 +81,8 @@ export async function sendDeleteForEveryoneMessage(
`sendDeleteForEveryoneMessage: Deleting message ${idForLogging} ` +
`in conversation ${conversationIdForLogging} with job ${jobToInsert.id}`
);
await DataWriter.saveMessage(message.attributes, {
await window.MessageCache.saveMessage(message.attributes, {
jobToInsert,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
});
} catch (error) {

View file

@ -34,7 +34,6 @@ import { strictAssert } from './assert';
import { timeAndLogIfTooLong } from './timeAndLogIfTooLong';
import { makeQuote } from './makeQuote';
import { getMessageSentTimestamp } from './getMessageSentTimestamp';
import { postSaveUpdates } from './cleanup';
const SEND_REPORT_THRESHOLD_MS = 25;
@ -224,10 +223,8 @@ export async function sendEditedMessage(
log.info(
`${idLog}: saving message ${targetMessageId} and job ${jobToInsert.id}`
);
await DataWriter.saveMessage(targetMessage.attributes, {
await window.MessageCache.saveMessage(targetMessage.attributes, {
jobToInsert,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
}
),

View file

@ -31,7 +31,6 @@ import { collect } from './iterables';
import { DurationInSeconds } from './durations';
import { sanitizeLinkPreview } from '../services/LinkPreview';
import type { DraftBodyRanges } from '../types/BodyRange';
import { postSaveUpdates } from './cleanup';
import { MessageModel } from '../models/messages';
export async function sendStoryMessage(
@ -315,10 +314,8 @@ export async function sendStoryMessage(
void ourConversation.addSingleMessage(message, { isJustSent: true });
log.info(`stories.sendStoryMessage: saving message ${message.timestamp}`);
return DataWriter.saveMessage(message, {
return window.MessageCache.saveMessage(message, {
forceSave: true,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
})
);
@ -368,11 +365,9 @@ export async function sendStoryMessage(
log.info(
`stories.sendStoryMessage: saving message ${messageAttributes.timestamp}`
);
await DataWriter.saveMessage(messageAttributes, {
await window.MessageCache.saveMessage(messageAttributes, {
forceSave: true,
jobToInsert,
ourAci: window.textsecure.storage.user.getCheckedAci(),
postSaveUpdates,
});
}
);