Edit message import/export
This commit is contained in:
parent
d47b46500e
commit
fa1530debf
8 changed files with 562 additions and 158 deletions
|
@ -34,7 +34,7 @@ export async function migrateMessageData({
|
|||
saveMessages: (
|
||||
data: ReadonlyArray<MessageAttributesType>,
|
||||
options: { ourAci: AciString }
|
||||
) => Promise<void>;
|
||||
) => Promise<unknown>;
|
||||
maxVersion?: number;
|
||||
}>): Promise<
|
||||
| {
|
||||
|
|
|
@ -679,7 +679,9 @@ export class BackupExportStream extends Readable {
|
|||
const result: Backups.IChatItem = {
|
||||
chatId,
|
||||
authorId,
|
||||
dateSent: getSafeLongFromTimestamp(message.sent_at),
|
||||
dateSent: getSafeLongFromTimestamp(
|
||||
message.editMessageTimestamp || message.sent_at
|
||||
),
|
||||
expireStartDate,
|
||||
expiresInMs,
|
||||
revisions: [],
|
||||
|
@ -708,7 +710,10 @@ export class BackupExportStream extends Readable {
|
|||
});
|
||||
|
||||
if (authorId === me) {
|
||||
result.outgoing = this.getOutgoingMessageDetails(message);
|
||||
result.outgoing = this.getOutgoingMessageDetails(
|
||||
message.sent_at,
|
||||
message
|
||||
);
|
||||
} else {
|
||||
result.incoming = this.getIncomingMessageDetails(message);
|
||||
}
|
||||
|
@ -811,51 +816,22 @@ export class BackupExportStream extends Readable {
|
|||
reactions: this.getMessageReactions(message),
|
||||
};
|
||||
} else {
|
||||
result.standardMessage = {
|
||||
quote: await this.toQuote(message.quote),
|
||||
attachments: message.attachments
|
||||
? await Promise.all(
|
||||
message.attachments.map(attachment => {
|
||||
return this.processMessageAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
messageReceivedAt: message.received_at,
|
||||
});
|
||||
})
|
||||
)
|
||||
: undefined,
|
||||
text: {
|
||||
// Note that we store full text on the message model so we have to
|
||||
// trim it before serializing.
|
||||
body: message.body?.slice(0, LONG_ATTACHMENT_LIMIT),
|
||||
bodyRanges: message.bodyRanges?.map(range => this.toBodyRange(range)),
|
||||
},
|
||||
|
||||
linkPreview: message.preview
|
||||
? await Promise.all(
|
||||
message.preview.map(async preview => {
|
||||
return {
|
||||
url: preview.url,
|
||||
title: preview.title,
|
||||
description: preview.description,
|
||||
date: getSafeLongFromTimestamp(preview.date),
|
||||
image: preview.image
|
||||
? await this.processAttachment({
|
||||
attachment: preview.image,
|
||||
backupLevel,
|
||||
messageReceivedAt: message.received_at,
|
||||
})
|
||||
: undefined,
|
||||
};
|
||||
})
|
||||
)
|
||||
: undefined,
|
||||
reactions: this.getMessageReactions(message),
|
||||
};
|
||||
result.standardMessage = await this.toStandardMessage(
|
||||
message,
|
||||
backupLevel
|
||||
);
|
||||
result.revisions = await this.toChatItemRevisions(
|
||||
result,
|
||||
message,
|
||||
backupLevel
|
||||
);
|
||||
}
|
||||
|
||||
if (isOutgoing) {
|
||||
result.outgoing = this.getOutgoingMessageDetails(message);
|
||||
result.outgoing = this.getOutgoingMessageDetails(
|
||||
message.sent_at,
|
||||
message
|
||||
);
|
||||
} else {
|
||||
result.incoming = this.getIncomingMessageDetails(message);
|
||||
}
|
||||
|
@ -1792,7 +1768,9 @@ export class BackupExportStream extends Readable {
|
|||
|
||||
private getMessageReactions({
|
||||
reactions,
|
||||
}: MessageAttributesType): Array<Backups.IReaction> | undefined {
|
||||
}: Pick<MessageAttributesType, 'reactions'>):
|
||||
| Array<Backups.IReaction>
|
||||
| undefined {
|
||||
return reactions?.map(reaction => {
|
||||
return {
|
||||
emoji: reaction.emoji,
|
||||
|
@ -1809,12 +1787,20 @@ export class BackupExportStream extends Readable {
|
|||
|
||||
private getIncomingMessageDetails({
|
||||
received_at_ms: receivedAtMs,
|
||||
editMessageReceivedAtMs,
|
||||
serverTimestamp,
|
||||
readStatus,
|
||||
}: MessageAttributesType): Backups.ChatItem.IIncomingMessageDetails {
|
||||
}: Pick<
|
||||
MessageAttributesType,
|
||||
| 'received_at_ms'
|
||||
| 'editMessageReceivedAtMs'
|
||||
| 'serverTimestamp'
|
||||
| 'readStatus'
|
||||
>): Backups.ChatItem.IIncomingMessageDetails {
|
||||
const dateReceived = editMessageReceivedAtMs || receivedAtMs;
|
||||
return {
|
||||
dateReceived:
|
||||
receivedAtMs != null ? getSafeLongFromTimestamp(receivedAtMs) : null,
|
||||
dateReceived != null ? getSafeLongFromTimestamp(dateReceived) : null,
|
||||
dateServerSent:
|
||||
serverTimestamp != null
|
||||
? getSafeLongFromTimestamp(serverTimestamp)
|
||||
|
@ -1823,10 +1809,12 @@ export class BackupExportStream extends Readable {
|
|||
};
|
||||
}
|
||||
|
||||
private getOutgoingMessageDetails({
|
||||
sent_at: sentAt,
|
||||
sendStateByConversationId = {},
|
||||
}: MessageAttributesType): Backups.ChatItem.IOutgoingMessageDetails {
|
||||
private getOutgoingMessageDetails(
|
||||
sentAt: number,
|
||||
{
|
||||
sendStateByConversationId = {},
|
||||
}: Pick<MessageAttributesType, 'sendStateByConversationId'>
|
||||
): Backups.ChatItem.IOutgoingMessageDetails {
|
||||
const BackupSendStatus = Backups.SendStatus.Status;
|
||||
|
||||
const sendStatus = new Array<Backups.ISendStatus>();
|
||||
|
@ -1874,6 +1862,106 @@ export class BackupExportStream extends Readable {
|
|||
sendStatus,
|
||||
};
|
||||
}
|
||||
|
||||
private async toStandardMessage(
|
||||
message: Pick<
|
||||
MessageAttributesType,
|
||||
| 'quote'
|
||||
| 'attachments'
|
||||
| 'body'
|
||||
| 'bodyRanges'
|
||||
| 'preview'
|
||||
| 'reactions'
|
||||
| 'received_at'
|
||||
>,
|
||||
backupLevel: BackupLevel
|
||||
): Promise<Backups.IStandardMessage> {
|
||||
return {
|
||||
quote: await this.toQuote(message.quote),
|
||||
attachments: message.attachments
|
||||
? await Promise.all(
|
||||
message.attachments.map(attachment => {
|
||||
return this.processMessageAttachment({
|
||||
attachment,
|
||||
backupLevel,
|
||||
messageReceivedAt: message.received_at,
|
||||
});
|
||||
})
|
||||
)
|
||||
: undefined,
|
||||
text: {
|
||||
// Note that we store full text on the message model so we have to
|
||||
// trim it before serializing.
|
||||
body: message.body?.slice(0, LONG_ATTACHMENT_LIMIT),
|
||||
bodyRanges: message.bodyRanges?.map(range => this.toBodyRange(range)),
|
||||
},
|
||||
|
||||
linkPreview: message.preview
|
||||
? await Promise.all(
|
||||
message.preview.map(async preview => {
|
||||
return {
|
||||
url: preview.url,
|
||||
title: preview.title,
|
||||
description: preview.description,
|
||||
date: getSafeLongFromTimestamp(preview.date),
|
||||
image: preview.image
|
||||
? await this.processAttachment({
|
||||
attachment: preview.image,
|
||||
backupLevel,
|
||||
messageReceivedAt: message.received_at,
|
||||
})
|
||||
: undefined,
|
||||
};
|
||||
})
|
||||
)
|
||||
: undefined,
|
||||
reactions: this.getMessageReactions(message),
|
||||
};
|
||||
}
|
||||
|
||||
private async toChatItemRevisions(
|
||||
parent: Backups.IChatItem,
|
||||
message: MessageAttributesType,
|
||||
backupLevel: BackupLevel
|
||||
): Promise<Array<Backups.IChatItem> | undefined> {
|
||||
const { editHistory } = message;
|
||||
if (editHistory == null) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const isOutgoing = message.type === 'outgoing';
|
||||
|
||||
return Promise.all(
|
||||
editHistory
|
||||
// The first history is the copy of the current message
|
||||
.slice(1)
|
||||
.map(async history => {
|
||||
return {
|
||||
// Required fields
|
||||
chatId: parent.chatId,
|
||||
authorId: parent.authorId,
|
||||
dateSent: getSafeLongFromTimestamp(history.timestamp),
|
||||
expireStartDate: parent.expireStartDate,
|
||||
expiresInMs: parent.expiresInMs,
|
||||
sms: parent.sms,
|
||||
|
||||
// Directional details
|
||||
outgoing: isOutgoing
|
||||
? this.getOutgoingMessageDetails(history.timestamp, history)
|
||||
: undefined,
|
||||
incoming: isOutgoing
|
||||
? undefined
|
||||
: this.getIncomingMessageDetails(history),
|
||||
|
||||
// Message itself
|
||||
standardMessage: await this.toStandardMessage(history, backupLevel),
|
||||
};
|
||||
|
||||
// Backups use oldest to newest order
|
||||
})
|
||||
.reverse()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function checkServiceIdEquivalence(
|
||||
|
|
|
@ -11,7 +11,7 @@ import { Backups, SignalService } from '../../protobuf';
|
|||
import Data from '../../sql/Client';
|
||||
import * as log from '../../logging/log';
|
||||
import { StorySendMode } from '../../types/Stories';
|
||||
import type { ServiceIdString } from '../../types/ServiceId';
|
||||
import type { ServiceIdString, AciString } from '../../types/ServiceId';
|
||||
import { fromAciObject, fromPniObject } from '../../types/ServiceId';
|
||||
import { isStoryDistributionId } from '../../types/StoryDistributionId';
|
||||
import * as Errors from '../../types/errors';
|
||||
|
@ -28,6 +28,7 @@ import type {
|
|||
ConversationAttributesType,
|
||||
MessageAttributesType,
|
||||
MessageReactionType,
|
||||
EditHistoryType,
|
||||
} from '../../model-types.d';
|
||||
import { assertDev, strictAssert } from '../../util/assert';
|
||||
import { getTimestampFromLong } from '../../util/timestampLongUtils';
|
||||
|
@ -90,6 +91,45 @@ async function processConversationOpBatch(
|
|||
await Data.saveConversations(saves);
|
||||
await Data.updateConversations(updates);
|
||||
}
|
||||
async function processMessagesBatch(
|
||||
ourAci: AciString,
|
||||
batch: ReadonlyArray<MessageAttributesType>
|
||||
): Promise<void> {
|
||||
const ids = await Data.saveMessages(batch, {
|
||||
forceSave: true,
|
||||
ourAci,
|
||||
});
|
||||
strictAssert(ids.length === batch.length, 'Should get same number of ids');
|
||||
|
||||
// TODO (DESKTOP-7402): consider re-saving after updating the pending state
|
||||
for (const [index, rawAttributes] of batch.entries()) {
|
||||
const attributes = {
|
||||
...rawAttributes,
|
||||
id: ids[index],
|
||||
};
|
||||
|
||||
const { editHistory } = attributes;
|
||||
|
||||
if (editHistory?.length) {
|
||||
drop(
|
||||
Data.saveEditedMessages(
|
||||
attributes,
|
||||
ourAci,
|
||||
editHistory.slice(0, -1).map(({ timestamp }) => ({
|
||||
conversationId: attributes.conversationId,
|
||||
messageId: attributes.id,
|
||||
|
||||
// Main message will track this
|
||||
readStatus: ReadStatus.Read,
|
||||
sentAt: timestamp,
|
||||
}))
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
drop(queueAttachmentDownloads(attributes));
|
||||
}
|
||||
}
|
||||
|
||||
function phoneToContactFormType(
|
||||
type: Backups.ContactAttachment.Phone.Type | null | undefined
|
||||
|
@ -181,18 +221,11 @@ export class BackupImportStream extends Writable {
|
|||
name: 'BackupImport.saveMessageBatcher',
|
||||
wait: 0,
|
||||
maxSize: 1000,
|
||||
processBatch: async batch => {
|
||||
processBatch: batch => {
|
||||
const ourAci = this.ourConversation?.serviceId;
|
||||
assertDev(isAciString(ourAci), 'Our conversation must have ACI');
|
||||
await Data.saveMessages(batch, {
|
||||
forceSave: true,
|
||||
ourAci,
|
||||
});
|
||||
|
||||
// TODO (DESKTOP-7402): consider re-saving after updating the pending state
|
||||
for (const messageAttributes of batch) {
|
||||
drop(queueAttachmentDownloads(messageAttributes));
|
||||
}
|
||||
return processMessagesBatch(ourAci, batch);
|
||||
},
|
||||
});
|
||||
private ourConversation?: ConversationAttributesType;
|
||||
|
@ -715,6 +748,19 @@ export class BackupImportStream extends Writable {
|
|||
? this.recipientIdToConvo.get(item.authorId.toNumber())
|
||||
: undefined;
|
||||
|
||||
const {
|
||||
patch: directionDetails,
|
||||
newActiveAt,
|
||||
unread,
|
||||
} = this.fromDirectionDetails(item, timestamp);
|
||||
|
||||
if (newActiveAt != null) {
|
||||
chatConvo.active_at = newActiveAt;
|
||||
}
|
||||
if (unread != null) {
|
||||
chatConvo.unreadCount = (chatConvo.unreadCount ?? 0) + 1;
|
||||
}
|
||||
|
||||
let attributes: MessageAttributesType = {
|
||||
id: generateUuid(),
|
||||
conversationId: chatConvo.id,
|
||||
|
@ -732,16 +778,109 @@ export class BackupImportStream extends Writable {
|
|||
item.expiresInMs && !item.expiresInMs.isZero()
|
||||
? DurationInSeconds.fromMillis(item.expiresInMs.toNumber())
|
||||
: undefined,
|
||||
...directionDetails,
|
||||
};
|
||||
const additionalMessages: Array<MessageAttributesType> = [];
|
||||
|
||||
const { outgoing, incoming, directionless } = item;
|
||||
if (outgoing) {
|
||||
if (item.incoming) {
|
||||
strictAssert(
|
||||
authorConvo && this.ourConversation.id !== authorConvo?.id,
|
||||
`${logId}: message with incoming field must be incoming`
|
||||
);
|
||||
} else if (item.outgoing) {
|
||||
strictAssert(
|
||||
authorConvo && this.ourConversation.id === authorConvo?.id,
|
||||
`${logId}: outgoing message must have outgoing field`
|
||||
);
|
||||
}
|
||||
|
||||
if (item.standardMessage) {
|
||||
// TODO (DESKTOP-6964): gift badge
|
||||
|
||||
attributes = {
|
||||
...attributes,
|
||||
...this.fromStandardMessage(item.standardMessage),
|
||||
};
|
||||
} else {
|
||||
const result = await this.fromNonBubbleChatItem(item, {
|
||||
aboutMe,
|
||||
author: authorConvo,
|
||||
conversation: chatConvo,
|
||||
timestamp,
|
||||
});
|
||||
|
||||
if (!result) {
|
||||
throw new Error(`${logId}: fromNonBubbleChat item returned nothing!`);
|
||||
}
|
||||
|
||||
attributes = {
|
||||
...attributes,
|
||||
...result.message,
|
||||
};
|
||||
|
||||
let sentAt = attributes.sent_at;
|
||||
(result.additionalMessages || []).forEach(additional => {
|
||||
sentAt -= 1;
|
||||
additionalMessages.push({
|
||||
...attributes,
|
||||
sent_at: sentAt,
|
||||
...additional,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (item.revisions?.length) {
|
||||
strictAssert(
|
||||
item.standardMessage,
|
||||
'Only standard message can have revisions'
|
||||
);
|
||||
|
||||
const history = this.fromRevisions(attributes, item.revisions);
|
||||
attributes.editHistory = history;
|
||||
|
||||
// Update timestamps on the parent message
|
||||
const oldest = history.at(-1);
|
||||
|
||||
assertDev(oldest != null, 'History is non-empty');
|
||||
|
||||
attributes.editMessageReceivedAt = attributes.received_at;
|
||||
attributes.editMessageReceivedAtMs = attributes.received_at_ms;
|
||||
attributes.editMessageTimestamp = attributes.timestamp;
|
||||
|
||||
attributes.received_at = oldest.received_at;
|
||||
attributes.received_at_ms = oldest.received_at_ms;
|
||||
attributes.timestamp = oldest.timestamp;
|
||||
attributes.sent_at = oldest.timestamp;
|
||||
}
|
||||
|
||||
assertDev(
|
||||
isAciString(this.ourConversation.serviceId),
|
||||
`${logId}: Our conversation must have ACI`
|
||||
);
|
||||
this.saveMessage(attributes);
|
||||
additionalMessages.forEach(additional => this.saveMessage(additional));
|
||||
|
||||
// TODO (DESKTOP-6964): We'll want to increment for more types here - stickers, etc.
|
||||
if (item.standardMessage) {
|
||||
if (item.outgoing != null) {
|
||||
chatConvo.sentMessageCount = (chatConvo.sentMessageCount ?? 0) + 1;
|
||||
} else {
|
||||
chatConvo.messageCount = (chatConvo.messageCount ?? 0) + 1;
|
||||
}
|
||||
}
|
||||
this.updateConversation(chatConvo);
|
||||
}
|
||||
|
||||
private fromDirectionDetails(
|
||||
item: Backups.IChatItem,
|
||||
timestamp: number
|
||||
): {
|
||||
patch: Partial<MessageAttributesType>;
|
||||
newActiveAt?: number;
|
||||
unread?: boolean;
|
||||
} {
|
||||
const { outgoing, incoming, directionless } = item;
|
||||
if (outgoing) {
|
||||
const sendStateByConversationId: SendStateByConversationId = {};
|
||||
|
||||
const BackupSendStatus = Backups.SendStatus.Status;
|
||||
|
@ -785,93 +924,54 @@ export class BackupImportStream extends Writable {
|
|||
sendStateByConversationId[target.id] = {
|
||||
status: sendStatus,
|
||||
updatedAt:
|
||||
status.lastStatusUpdateTimestamp != null
|
||||
status.lastStatusUpdateTimestamp != null &&
|
||||
!status.lastStatusUpdateTimestamp.isZero()
|
||||
? getTimestampFromLong(status.lastStatusUpdateTimestamp)
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
attributes.sendStateByConversationId = sendStateByConversationId;
|
||||
chatConvo.active_at = attributes.sent_at;
|
||||
} else if (incoming) {
|
||||
strictAssert(
|
||||
authorConvo && this.ourConversation.id !== authorConvo?.id,
|
||||
`${logId}: message with incoming field must be incoming`
|
||||
);
|
||||
attributes.received_at_ms =
|
||||
incoming.dateReceived?.toNumber() ?? Date.now();
|
||||
return {
|
||||
patch: {
|
||||
sendStateByConversationId,
|
||||
received_at_ms: timestamp,
|
||||
},
|
||||
newActiveAt: timestamp,
|
||||
};
|
||||
}
|
||||
if (incoming) {
|
||||
const receivedAtMs = incoming.dateReceived?.toNumber() ?? Date.now();
|
||||
|
||||
if (incoming.read) {
|
||||
attributes.readStatus = ReadStatus.Read;
|
||||
attributes.seenStatus = SeenStatus.Seen;
|
||||
} else {
|
||||
attributes.readStatus = ReadStatus.Unread;
|
||||
attributes.seenStatus = SeenStatus.Unseen;
|
||||
chatConvo.unreadCount = (chatConvo.unreadCount ?? 0) + 1;
|
||||
return {
|
||||
patch: {
|
||||
readStatus: ReadStatus.Read,
|
||||
seenStatus: SeenStatus.Seen,
|
||||
received_at_ms: receivedAtMs,
|
||||
},
|
||||
newActiveAt: receivedAtMs,
|
||||
};
|
||||
}
|
||||
|
||||
chatConvo.active_at = attributes.received_at_ms;
|
||||
} else if (directionless) {
|
||||
// Nothing to do
|
||||
}
|
||||
|
||||
if (item.standardMessage) {
|
||||
// TODO (DESKTOP-6964): add revisions to editHistory
|
||||
// gift badge
|
||||
|
||||
attributes = {
|
||||
...attributes,
|
||||
...this.fromStandardMessage(item.standardMessage),
|
||||
return {
|
||||
patch: {
|
||||
readStatus: ReadStatus.Unread,
|
||||
seenStatus: SeenStatus.Unseen,
|
||||
received_at_ms: receivedAtMs,
|
||||
},
|
||||
newActiveAt: receivedAtMs,
|
||||
unread: true,
|
||||
};
|
||||
} else {
|
||||
const result = await this.fromNonBubbleChatItem(item, {
|
||||
aboutMe,
|
||||
author: authorConvo,
|
||||
conversation: chatConvo,
|
||||
timestamp,
|
||||
});
|
||||
|
||||
if (!result) {
|
||||
throw new Error(`${logId}: fromNonBubbleChat item returned nothing!`);
|
||||
}
|
||||
|
||||
attributes = {
|
||||
...attributes,
|
||||
...result.message,
|
||||
};
|
||||
|
||||
let sentAt = attributes.sent_at;
|
||||
(result.additionalMessages || []).forEach(additional => {
|
||||
sentAt -= 1;
|
||||
additionalMessages.push({
|
||||
...attributes,
|
||||
sent_at: sentAt,
|
||||
...additional,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
assertDev(
|
||||
isAciString(this.ourConversation.serviceId),
|
||||
`${logId}: Our conversation must have ACI`
|
||||
);
|
||||
this.saveMessage(attributes);
|
||||
additionalMessages.forEach(additional => this.saveMessage(additional));
|
||||
|
||||
// TODO (DESKTOP-6964): We'll want to increment for more types here - stickers, etc.
|
||||
if (item.standardMessage) {
|
||||
if (item.outgoing != null) {
|
||||
chatConvo.sentMessageCount = (chatConvo.sentMessageCount ?? 0) + 1;
|
||||
} else {
|
||||
chatConvo.messageCount = (chatConvo.messageCount ?? 0) + 1;
|
||||
}
|
||||
}
|
||||
this.updateConversation(chatConvo);
|
||||
strictAssert(directionless, 'Absent direction state');
|
||||
return { patch: {} };
|
||||
}
|
||||
|
||||
private fromStandardMessage(
|
||||
data: Backups.IStandardMessage
|
||||
): Partial<MessageAttributesType> {
|
||||
// TODO (DESKTOP-6964): Quote, link preview
|
||||
return {
|
||||
body: data.text?.body || undefined,
|
||||
attachments: data.attachments?.length
|
||||
|
@ -903,13 +1003,63 @@ export class BackupImportStream extends Writable {
|
|||
};
|
||||
}
|
||||
|
||||
private fromRevisions(
|
||||
mainMessage: MessageAttributesType,
|
||||
revisions: ReadonlyArray<Backups.IChatItem>
|
||||
): Array<EditHistoryType> {
|
||||
const result = revisions
|
||||
.map(rev => {
|
||||
strictAssert(
|
||||
rev.standardMessage,
|
||||
'Edit history has non-standard messages'
|
||||
);
|
||||
|
||||
const timestamp = getTimestampFromLong(rev.dateSent);
|
||||
|
||||
const {
|
||||
// eslint-disable-next-line camelcase
|
||||
patch: { sendStateByConversationId, received_at_ms },
|
||||
} = this.fromDirectionDetails(rev, timestamp);
|
||||
|
||||
return {
|
||||
...this.fromStandardMessage(rev.standardMessage),
|
||||
timestamp,
|
||||
received_at: incrementMessageCounter(),
|
||||
sendStateByConversationId,
|
||||
// eslint-disable-next-line camelcase
|
||||
received_at_ms,
|
||||
};
|
||||
})
|
||||
// Fix order: from newest to oldest
|
||||
.reverse();
|
||||
|
||||
// See `ts/util/handleEditMessage.ts`, the first history entry is always
|
||||
// the current message.
|
||||
result.unshift({
|
||||
attachments: mainMessage.attachments,
|
||||
body: mainMessage.body,
|
||||
bodyAttachment: mainMessage.bodyAttachment,
|
||||
bodyRanges: mainMessage.bodyRanges,
|
||||
preview: mainMessage.preview,
|
||||
quote: mainMessage.quote,
|
||||
sendStateByConversationId: mainMessage.sendStateByConversationId
|
||||
? { ...mainMessage.sendStateByConversationId }
|
||||
: undefined,
|
||||
timestamp: mainMessage.timestamp,
|
||||
received_at: mainMessage.received_at,
|
||||
received_at_ms: mainMessage.received_at_ms,
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private fromReactions(
|
||||
reactions: ReadonlyArray<Backups.IReaction> | null | undefined
|
||||
): Array<MessageReactionType> | undefined {
|
||||
if (!reactions?.length) {
|
||||
return undefined;
|
||||
}
|
||||
return reactions?.map(
|
||||
return reactions.map(
|
||||
({ emoji, authorId, sentTimestamp, receivedTimestamp }) => {
|
||||
strictAssert(emoji != null, 'reaction must have an emoji');
|
||||
strictAssert(authorId != null, 'reaction must have authorId');
|
||||
|
|
|
@ -571,14 +571,16 @@ async function saveMessage(
|
|||
async function saveMessages(
|
||||
arrayOfMessages: ReadonlyArray<MessageType>,
|
||||
options: { forceSave?: boolean; ourAci: AciString }
|
||||
): Promise<void> {
|
||||
await channels.saveMessages(
|
||||
): Promise<Array<string>> {
|
||||
const result = await channels.saveMessages(
|
||||
arrayOfMessages.map(message => _cleanMessageData(message)),
|
||||
options
|
||||
);
|
||||
|
||||
void expiringMessagesDeletionService.update();
|
||||
void tapToViewMessagesDeletionService.update();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
async function removeMessage(id: string): Promise<void> {
|
||||
|
|
|
@ -556,7 +556,7 @@ export type DataInterface = {
|
|||
saveMessages: (
|
||||
arrayOfMessages: ReadonlyArray<MessageType>,
|
||||
options: { forceSave?: boolean; ourAci: AciString }
|
||||
) => Promise<void>;
|
||||
) => Promise<Array<string>>;
|
||||
removeMessage: (id: string) => Promise<void>;
|
||||
removeMessages: (ids: ReadonlyArray<string>) => Promise<void>;
|
||||
pageMessages: (
|
||||
|
@ -724,6 +724,11 @@ export type DataInterface = {
|
|||
ourAci: AciString,
|
||||
opts: EditedMessageType
|
||||
) => Promise<void>;
|
||||
saveEditedMessages: (
|
||||
mainMessage: MessageType,
|
||||
ourAci: AciString,
|
||||
history: ReadonlyArray<EditedMessageType>
|
||||
) => Promise<void>;
|
||||
getMostRecentAddressableMessages: (
|
||||
conversationId: string,
|
||||
limit?: number
|
||||
|
|
|
@ -369,6 +369,7 @@ const dataInterface: ServerInterface = {
|
|||
getMessagesBetween,
|
||||
getNearbyMessageFromDeletedSet,
|
||||
saveEditedMessage,
|
||||
saveEditedMessages,
|
||||
getMostRecentAddressableMessages,
|
||||
|
||||
removeSyncTaskById,
|
||||
|
@ -2450,15 +2451,17 @@ async function saveMessage(
|
|||
async function saveMessages(
|
||||
arrayOfMessages: ReadonlyArray<MessageType>,
|
||||
options: { forceSave?: boolean; ourAci: AciString }
|
||||
): Promise<void> {
|
||||
): Promise<Array<string>> {
|
||||
const db = await getWritableInstance();
|
||||
|
||||
db.transaction(() => {
|
||||
return db.transaction(() => {
|
||||
const result = new Array<string>();
|
||||
for (const message of arrayOfMessages) {
|
||||
assertSync(
|
||||
result.push(
|
||||
saveMessageSync(db, message, { ...options, alreadyInTransaction: true })
|
||||
);
|
||||
}
|
||||
return result;
|
||||
})();
|
||||
}
|
||||
|
||||
|
@ -7165,10 +7168,10 @@ async function removeAllProfileKeyCredentials(): Promise<void> {
|
|||
);
|
||||
}
|
||||
|
||||
async function saveEditedMessage(
|
||||
async function saveEditedMessagesSync(
|
||||
mainMessage: MessageType,
|
||||
ourAci: AciString,
|
||||
{ conversationId, messageId, readStatus, sentAt }: EditedMessageType
|
||||
history: ReadonlyArray<EditedMessageType>
|
||||
): Promise<void> {
|
||||
const db = await getWritableInstance();
|
||||
|
||||
|
@ -7180,24 +7183,42 @@ async function saveEditedMessage(
|
|||
})
|
||||
);
|
||||
|
||||
const [query, params] = sql`
|
||||
INSERT INTO edited_messages (
|
||||
conversationId,
|
||||
messageId,
|
||||
sentAt,
|
||||
readStatus
|
||||
) VALUES (
|
||||
${conversationId},
|
||||
${messageId},
|
||||
${sentAt},
|
||||
${readStatus}
|
||||
);
|
||||
`;
|
||||
for (const { conversationId, messageId, readStatus, sentAt } of history) {
|
||||
const [query, params] = sql`
|
||||
INSERT INTO edited_messages (
|
||||
conversationId,
|
||||
messageId,
|
||||
sentAt,
|
||||
readStatus
|
||||
) VALUES (
|
||||
${conversationId},
|
||||
${messageId},
|
||||
${sentAt},
|
||||
${readStatus}
|
||||
);
|
||||
`;
|
||||
|
||||
db.prepare(query).run(params);
|
||||
db.prepare(query).run(params);
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
async function saveEditedMessage(
|
||||
mainMessage: MessageType,
|
||||
ourAci: AciString,
|
||||
editedMessage: EditedMessageType
|
||||
): Promise<void> {
|
||||
return saveEditedMessagesSync(mainMessage, ourAci, [editedMessage]);
|
||||
}
|
||||
|
||||
async function saveEditedMessages(
|
||||
mainMessage: MessageType,
|
||||
ourAci: AciString,
|
||||
editedMessages: ReadonlyArray<EditedMessageType>
|
||||
): Promise<void> {
|
||||
return saveEditedMessagesSync(mainMessage, ourAci, editedMessages);
|
||||
}
|
||||
|
||||
async function _getAllEditedMessages(): Promise<
|
||||
Array<{ messageId: string; sentAt: number }>
|
||||
> {
|
||||
|
|
134
ts/test-electron/backup/bubble_test.ts
Normal file
134
ts/test-electron/backup/bubble_test.ts
Normal file
|
@ -0,0 +1,134 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { v4 as generateGuid } from 'uuid';
|
||||
|
||||
import { SendStatus } from '../../messages/MessageSendState';
|
||||
import type { ConversationModel } from '../../models/conversations';
|
||||
|
||||
import Data from '../../sql/Client';
|
||||
import { generateAci } from '../../types/ServiceId';
|
||||
import { ReadStatus } from '../../messages/MessageReadStatus';
|
||||
import { SeenStatus } from '../../MessageSeenStatus';
|
||||
import { loadCallsHistory } from '../../services/callHistoryLoader';
|
||||
import { setupBasics, symmetricRoundtripHarness, OUR_ACI } from './helpers';
|
||||
|
||||
const CONTACT_A = generateAci();
|
||||
|
||||
describe('backup/bubble messages', () => {
|
||||
let contactA: ConversationModel;
|
||||
|
||||
beforeEach(async () => {
|
||||
await Data._removeAllMessages();
|
||||
await Data._removeAllConversations();
|
||||
window.storage.reset();
|
||||
|
||||
await setupBasics();
|
||||
|
||||
contactA = await window.ConversationController.getOrCreateAndWait(
|
||||
CONTACT_A,
|
||||
'private',
|
||||
{ systemGivenName: 'CONTACT_A' }
|
||||
);
|
||||
|
||||
await loadCallsHistory();
|
||||
});
|
||||
|
||||
it('roundtrips incoming edited message', async () => {
|
||||
await symmetricRoundtripHarness([
|
||||
{
|
||||
conversationId: contactA.id,
|
||||
id: generateGuid(),
|
||||
type: 'incoming',
|
||||
received_at: 3,
|
||||
received_at_ms: 3,
|
||||
sent_at: 3,
|
||||
timestamp: 3,
|
||||
sourceServiceId: CONTACT_A,
|
||||
body: 'd',
|
||||
readStatus: ReadStatus.Unread,
|
||||
seenStatus: SeenStatus.Unseen,
|
||||
editMessageTimestamp: 5,
|
||||
editMessageReceivedAtMs: 5,
|
||||
editHistory: [
|
||||
{
|
||||
body: 'd',
|
||||
timestamp: 5,
|
||||
received_at: 5,
|
||||
received_at_ms: 5,
|
||||
},
|
||||
{
|
||||
body: 'c',
|
||||
timestamp: 4,
|
||||
received_at: 4,
|
||||
received_at_ms: 4,
|
||||
},
|
||||
{
|
||||
body: 'b',
|
||||
timestamp: 3,
|
||||
received_at: 3,
|
||||
received_at_ms: 3,
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it('roundtrips outgoing edited message', async () => {
|
||||
await symmetricRoundtripHarness([
|
||||
{
|
||||
conversationId: contactA.id,
|
||||
id: generateGuid(),
|
||||
type: 'outgoing',
|
||||
received_at: 3,
|
||||
received_at_ms: 3,
|
||||
sent_at: 3,
|
||||
sourceServiceId: OUR_ACI,
|
||||
sendStateByConversationId: {
|
||||
[contactA.id]: {
|
||||
status: SendStatus.Delivered,
|
||||
},
|
||||
},
|
||||
timestamp: 3,
|
||||
editMessageTimestamp: 5,
|
||||
editMessageReceivedAtMs: 5,
|
||||
body: 'd',
|
||||
editHistory: [
|
||||
{
|
||||
body: 'd',
|
||||
timestamp: 5,
|
||||
received_at: 5,
|
||||
received_at_ms: 5,
|
||||
sendStateByConversationId: {
|
||||
[contactA.id]: {
|
||||
status: SendStatus.Delivered,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
body: 'c',
|
||||
timestamp: 4,
|
||||
received_at: 4,
|
||||
received_at_ms: 4,
|
||||
sendStateByConversationId: {
|
||||
[contactA.id]: {
|
||||
status: SendStatus.Viewed,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
body: 'b',
|
||||
timestamp: 3,
|
||||
received_at: 3,
|
||||
received_at_ms: 3,
|
||||
sendStateByConversationId: {
|
||||
[contactA.id]: {
|
||||
status: SendStatus.Viewed,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
|
@ -60,6 +60,8 @@ function sortAndNormalize(
|
|||
received_at: _receivedAt,
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
sourceDevice: _sourceDevice,
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
editMessageReceivedAt: _editMessageReceivedAt,
|
||||
|
||||
...rest
|
||||
} = message;
|
||||
|
@ -94,6 +96,8 @@ function sortAndNormalize(
|
|||
editHistory: editHistory?.map(history => {
|
||||
const {
|
||||
sendStateByConversationId: historySendState,
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
received_at: _receivedAtHistory,
|
||||
...restOfHistory
|
||||
} = history;
|
||||
|
||||
|
@ -155,7 +159,7 @@ export async function asymmetricRoundtripHarness(
|
|||
|
||||
const expected = sortAndNormalize(after);
|
||||
const actual = sortAndNormalize(messagesFromDatabase);
|
||||
assert.deepEqual(expected, actual);
|
||||
assert.deepEqual(actual, expected);
|
||||
} finally {
|
||||
fetchAndSaveBackupCdnObjectMetadata.restore();
|
||||
await rm(outDir, { recursive: true });
|
||||
|
|
Loading…
Add table
Reference in a new issue