Normalize messages table

This commit is contained in:
Fedor Indutny 2025-01-16 13:34:35 -08:00 committed by GitHub
parent 9bec59b70a
commit 630a1fcc89
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 490 additions and 240 deletions

3
ts/model-types.d.ts vendored
View file

@ -168,6 +168,9 @@ type MessageType =
| 'verified-change' | 'verified-change'
| 'message-request-response-event'; | 'message-request-response-event';
// Note: when adding a property that is likely to be set across many messages,
// consider adding a database column as well and updating `MESSAGE_COLUMNS`
// in `ts/sql/Server.ts`
export type MessageAttributesType = { export type MessageAttributesType = {
bodyAttachment?: AttachmentType; bodyAttachment?: AttachmentType;
bodyRanges?: ReadonlyArray<RawBodyRange>; bodyRanges?: ReadonlyArray<RawBodyRange>;

View file

@ -62,6 +62,7 @@ import type {
ClientOnlyReadableInterface, ClientOnlyReadableInterface,
ClientOnlyWritableInterface, ClientOnlyWritableInterface,
} from './Interface'; } from './Interface';
import { hydrateMessage } from './hydration';
import type { MessageAttributesType } from '../model-types'; import type { MessageAttributesType } from '../model-types';
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload'; import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
@ -545,7 +546,7 @@ function handleSearchMessageJSON(
messages: Array<ServerSearchResultMessageType> messages: Array<ServerSearchResultMessageType>
): Array<ClientSearchResultMessageType> { ): Array<ClientSearchResultMessageType> {
return messages.map<ClientSearchResultMessageType>(message => { return messages.map<ClientSearchResultMessageType>(message => {
const parsedMessage = JSON.parse(message.json); const parsedMessage = hydrateMessage(message);
assertDev( assertDev(
message.ftsSnippet ?? typeof message.mentionStart === 'number', message.ftsSnippet ?? typeof message.mentionStart === 'number',
'Neither ftsSnippet nor matching mention returned from message search' 'Neither ftsSnippet nor matching mention returned from message search'
@ -553,14 +554,12 @@ function handleSearchMessageJSON(
const snippet = const snippet =
message.ftsSnippet ?? message.ftsSnippet ??
generateSnippetAroundMention({ generateSnippetAroundMention({
body: parsedMessage.body, body: parsedMessage.body || '',
mentionStart: message.mentionStart ?? 0, mentionStart: message.mentionStart ?? 0,
mentionLength: message.mentionLength ?? 1, mentionLength: message.mentionLength ?? 1,
}); });
return { return {
json: message.json,
// Empty array is a default value. `message.json` has the real field // Empty array is a default value. `message.json` has the real field
bodyRanges: [], bodyRanges: [],
...parsedMessage, ...parsedMessage,
@ -734,7 +733,7 @@ async function removeMessages(
function handleMessageJSON( function handleMessageJSON(
messages: Array<MessageTypeUnhydrated> messages: Array<MessageTypeUnhydrated>
): Array<MessageType> { ): Array<MessageType> {
return messages.map(message => JSON.parse(message.json)); return messages.map(message => hydrateMessage(message));
} }
async function getNewerMessagesByConversation( async function getNewerMessagesByConversation(

View file

@ -115,8 +115,76 @@ export type StoredItemType<K extends ItemKeyType> = {
value: BytesToStrings<StorageAccessType[K]>; value: BytesToStrings<StorageAccessType[K]>;
}; };
export type MessageType = MessageAttributesType; export type MessageType = MessageAttributesType;
// See: ts/sql/Interface.ts
//
// When adding a new column:
//
// - Make sure the name matches the one in `MessageAttributeTypes`
// - Update `hydrateMessage`
//
export const MESSAGE_COLUMNS = [
'json',
'id',
'body',
'conversationId',
'expirationStartTimestamp',
'expireTimer',
'hasAttachments',
'hasFileAttachments',
'hasVisualMediaAttachments',
'isChangeCreatedByUs',
'isErased',
'isViewOnce',
'mentionsMe',
'received_at',
'received_at_ms',
'schemaVersion',
'serverGuid',
'sent_at',
'source',
'sourceServiceId',
'sourceDevice',
'storyId',
'type',
'readStatus',
'seenStatus',
'serverTimestamp',
'timestamp',
'unidentifiedDeliveryReceived',
] as const;
export type MessageTypeUnhydrated = { export type MessageTypeUnhydrated = {
json: string; json: string;
id: string;
body: string | null;
conversationId: string | null;
expirationStartTimestamp: number | null;
expireTimer: number | null;
hasAttachments: 0 | 1 | null;
hasFileAttachments: 0 | 1 | null;
hasVisualMediaAttachments: 0 | 1 | null;
isChangeCreatedByUs: 0 | 1 | null;
isErased: 0 | 1 | null;
isViewOnce: 0 | 1 | null;
mentionsMe: 0 | 1 | null;
received_at: number | null;
received_at_ms: number | null;
schemaVersion: number | null;
serverGuid: string | null;
sent_at: number | null;
source: string | null;
sourceServiceId: string | null;
sourceDevice: number | null;
serverTimestamp: number | null;
storyId: string | null;
type: string;
timestamp: number | null;
readStatus: number | null;
seenStatus: number | null;
unidentifiedDeliveryReceived: 0 | 1 | null;
}; };
export type PreKeyIdType = `${ServiceIdString}:${number}`; export type PreKeyIdType = `${ServiceIdString}:${number}`;
@ -147,9 +215,7 @@ export type StoredPreKeyType = PreKeyType & {
privateKey: string; privateKey: string;
publicKey: string; publicKey: string;
}; };
export type ServerSearchResultMessageType = { export type ServerSearchResultMessageType = MessageTypeUnhydrated & {
json: string;
// If the FTS matches text in message.body, snippet will be populated // If the FTS matches text in message.body, snippet will be populated
ftsSnippet: string | null; ftsSnippet: string | null;
@ -159,7 +225,6 @@ export type ServerSearchResultMessageType = {
mentionLength: number | null; mentionLength: number | null;
}; };
export type ClientSearchResultMessageType = MessageType & { export type ClientSearchResultMessageType = MessageType & {
json: string;
bodyRanges: ReadonlyArray<RawBodyRange>; bodyRanges: ReadonlyArray<RawBodyRange>;
snippet: string; snippet: string;
}; };

View file

@ -55,13 +55,7 @@ import { isNormalNumber } from '../util/isNormalNumber';
import { isNotNil } from '../util/isNotNil'; import { isNotNil } from '../util/isNotNil';
import { parseIntOrThrow } from '../util/parseIntOrThrow'; import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { updateSchema } from './migrations'; import { updateSchema } from './migrations';
import type { import type { ArrayQuery, EmptyQuery, JSONRows, Query } from './util';
ArrayQuery,
EmptyQuery,
JSONRows,
Query,
QueryFragment,
} from './util';
import { import {
batchMultiVarQuery, batchMultiVarQuery,
bulkAdd, bulkAdd,
@ -80,7 +74,9 @@ import {
sqlConstant, sqlConstant,
sqlFragment, sqlFragment,
sqlJoin, sqlJoin,
QueryFragment,
} from './util'; } from './util';
import { hydrateMessage } from './hydration';
import { getAttachmentCiphertextLength } from '../AttachmentCrypto'; import { getAttachmentCiphertextLength } from '../AttachmentCrypto';
import { SeenStatus } from '../MessageSeenStatus'; import { SeenStatus } from '../MessageSeenStatus';
@ -181,7 +177,7 @@ import type {
UnprocessedUpdateType, UnprocessedUpdateType,
WritableDB, WritableDB,
} from './Interface'; } from './Interface';
import { AttachmentDownloadSource } from './Interface'; import { AttachmentDownloadSource, MESSAGE_COLUMNS } from './Interface';
import { import {
_removeAllCallLinks, _removeAllCallLinks,
beginDeleteAllCallLinks, beginDeleteAllCallLinks,
@ -582,6 +578,10 @@ export function prepare<T extends Array<unknown> | Record<string, unknown>>(
return result; return result;
} }
const MESSAGE_COLUMNS_FRAGMENTS = MESSAGE_COLUMNS.map(
column => new QueryFragment(column, [])
);
function rowToConversation(row: ConversationRow): ConversationType { function rowToConversation(row: ConversationRow): ConversationType {
const { expireTimerVersion } = row; const { expireTimerVersion } = row;
const parsedJson = JSON.parse(row.json); const parsedJson = JSON.parse(row.json);
@ -603,6 +603,7 @@ function rowToConversation(row: ConversationRow): ConversationType {
profileLastFetchedAt, profileLastFetchedAt,
}; };
} }
function rowToSticker(row: StickerRow): StickerType { function rowToSticker(row: StickerRow): StickerType {
return { return {
...row, ...row,
@ -1938,6 +1939,10 @@ function searchMessages(
.run({ conversationId, limit }); .run({ conversationId, limit });
} }
const prefixedColumns = sqlJoin(
MESSAGE_COLUMNS_FRAGMENTS.map(name => sqlFragment`messages.${name}`)
);
// The `MATCH` is necessary in order to for `snippet()` helper function to // The `MATCH` is necessary in order to for `snippet()` helper function to
// give us the right results. We can't call `snippet()` in the query above // give us the right results. We can't call `snippet()` in the query above
// because it would bloat the temporary table with text data and we want // because it would bloat the temporary table with text data and we want
@ -1945,9 +1950,7 @@ function searchMessages(
const ftsFragment = sqlFragment` const ftsFragment = sqlFragment`
SELECT SELECT
messages.rowid, messages.rowid,
messages.json, ${prefixedColumns},
messages.sent_at,
messages.received_at,
snippet(messages_fts, -1, ${SNIPPET_LEFT_PLACEHOLDER}, ${SNIPPET_RIGHT_PLACEHOLDER}, ${SNIPPET_TRUNCATION_PLACEHOLDER}, 10) AS ftsSnippet snippet(messages_fts, -1, ${SNIPPET_LEFT_PLACEHOLDER}, ${SNIPPET_RIGHT_PLACEHOLDER}, ${SNIPPET_TRUNCATION_PLACEHOLDER}, 10) AS ftsSnippet
FROM tmp_filtered_results FROM tmp_filtered_results
INNER JOIN messages_fts INNER JOIN messages_fts
@ -1966,6 +1969,12 @@ function searchMessages(
const [sqlQuery, params] = sql`${ftsFragment};`; const [sqlQuery, params] = sql`${ftsFragment};`;
result = writable.prepare(sqlQuery).all(params); result = writable.prepare(sqlQuery).all(params);
} else { } else {
const coalescedColumns = MESSAGE_COLUMNS_FRAGMENTS.map(
name => sqlFragment`
COALESCE(messages.${name}, ftsResults.${name}) AS ${name}
`
);
// If contactServiceIdsMatchingQuery is not empty, we due an OUTER JOIN // If contactServiceIdsMatchingQuery is not empty, we due an OUTER JOIN
// between: // between:
// 1) the messages that mention at least one of // 1) the messages that mention at least one of
@ -1978,9 +1987,7 @@ function searchMessages(
const [sqlQuery, params] = sql` const [sqlQuery, params] = sql`
SELECT SELECT
messages.rowid as rowid, messages.rowid as rowid,
COALESCE(messages.json, ftsResults.json) as json, ${sqlJoin(coalescedColumns)},
COALESCE(messages.sent_at, ftsResults.sent_at) as sent_at,
COALESCE(messages.received_at, ftsResults.received_at) as received_at,
ftsResults.ftsSnippet, ftsResults.ftsSnippet,
mentionAci, mentionAci,
start as mentionStart, start as mentionStart,
@ -2082,7 +2089,9 @@ export function getMostRecentAddressableMessages(
limit = 5 limit = 5
): Array<MessageType> { ): Array<MessageType> {
const [query, parameters] = sql` const [query, parameters] = sql`
SELECT json FROM messages SELECT
${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages
INDEXED BY messages_by_date_addressable INDEXED BY messages_by_date_addressable
WHERE WHERE
conversationId IS ${conversationId} AND conversationId IS ${conversationId} AND
@ -2093,7 +2102,7 @@ export function getMostRecentAddressableMessages(
const rows = db.prepare(query).all(parameters); const rows = db.prepare(query).all(parameters);
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
export function getMostRecentAddressableNondisappearingMessages( export function getMostRecentAddressableNondisappearingMessages(
@ -2102,7 +2111,9 @@ export function getMostRecentAddressableNondisappearingMessages(
limit = 5 limit = 5
): Array<MessageType> { ): Array<MessageType> {
const [query, parameters] = sql` const [query, parameters] = sql`
SELECT json FROM messages SELECT
${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages
INDEXED BY messages_by_date_addressable_nondisappearing INDEXED BY messages_by_date_addressable_nondisappearing
WHERE WHERE
expireTimer IS NULL AND expireTimer IS NULL AND
@ -2114,7 +2125,7 @@ export function getMostRecentAddressableNondisappearingMessages(
const rows = db.prepare(query).all(parameters); const rows = db.prepare(query).all(parameters);
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
export function removeSyncTaskById(db: WritableDB, id: string): void { export function removeSyncTaskById(db: WritableDB, id: string): void {
@ -2248,7 +2259,6 @@ export function saveMessage(
const { const {
body, body,
conversationId, conversationId,
groupV2Change,
hasAttachments, hasAttachments,
hasFileAttachments, hasFileAttachments,
hasVisualMediaAttachments, hasVisualMediaAttachments,
@ -2257,6 +2267,7 @@ export function saveMessage(
isViewOnce, isViewOnce,
mentionsMe, mentionsMe,
received_at, received_at,
received_at_ms,
schemaVersion, schemaVersion,
sent_at, sent_at,
serverGuid, serverGuid,
@ -2264,13 +2275,22 @@ export function saveMessage(
sourceServiceId, sourceServiceId,
sourceDevice, sourceDevice,
storyId, storyId,
timestamp,
type, type,
readStatus, readStatus,
expireTimer, expireTimer,
expirationStartTimestamp, expirationStartTimestamp,
attachments, seenStatus: originalSeenStatus,
serverTimestamp,
unidentifiedDeliveryReceived,
...json
} = data; } = data;
let { seenStatus } = data;
// Extracted separately since we store this field in JSON
const { attachments, groupV2Change } = data;
let seenStatus = originalSeenStatus;
if (attachments) { if (attachments) {
strictAssert( strictAssert(
@ -2313,6 +2333,7 @@ export function saveMessage(
isViewOnce: isViewOnce ? 1 : 0, isViewOnce: isViewOnce ? 1 : 0,
mentionsMe: mentionsMe ? 1 : 0, mentionsMe: mentionsMe ? 1 : 0,
received_at: received_at || null, received_at: received_at || null,
received_at_ms: received_at_ms || null,
schemaVersion: schemaVersion || 0, schemaVersion: schemaVersion || 0,
serverGuid: serverGuid || null, serverGuid: serverGuid || null,
sent_at: sent_at || null, sent_at: sent_at || null,
@ -2321,43 +2342,22 @@ export function saveMessage(
sourceDevice: sourceDevice || null, sourceDevice: sourceDevice || null,
storyId: storyId || null, storyId: storyId || null,
type: type || null, type: type || null,
timestamp: timestamp ?? 0,
readStatus: readStatus ?? null, readStatus: readStatus ?? null,
seenStatus: seenStatus ?? SeenStatus.NotApplicable, seenStatus: seenStatus ?? SeenStatus.NotApplicable,
}; serverTimestamp: serverTimestamp ?? null,
unidentifiedDeliveryReceived: unidentifiedDeliveryReceived ? 1 : 0,
} satisfies Omit<MessageTypeUnhydrated, 'json'>;
if (id && !forceSave) { if (id && !forceSave) {
prepare( prepare(
db, db,
` `
UPDATE messages SET UPDATE messages SET
id = $id, ${MESSAGE_COLUMNS.map(name => `${name} = $${name}`).join(', ')}
json = $json,
body = $body,
conversationId = $conversationId,
expirationStartTimestamp = $expirationStartTimestamp,
expireTimer = $expireTimer,
hasAttachments = $hasAttachments,
hasFileAttachments = $hasFileAttachments,
hasVisualMediaAttachments = $hasVisualMediaAttachments,
isChangeCreatedByUs = $isChangeCreatedByUs,
isErased = $isErased,
isViewOnce = $isViewOnce,
mentionsMe = $mentionsMe,
received_at = $received_at,
schemaVersion = $schemaVersion,
serverGuid = $serverGuid,
sent_at = $sent_at,
source = $source,
sourceServiceId = $sourceServiceId,
sourceDevice = $sourceDevice,
storyId = $storyId,
type = $type,
readStatus = $readStatus,
seenStatus = $seenStatus
WHERE id = $id; WHERE id = $id;
` `
).run({ ...payloadWithoutJson, json: objectToJSON(data) }); ).run({ ...payloadWithoutJson, json: objectToJSON(json) });
if (jobToInsert) { if (jobToInsert) {
insertJob(db, jobToInsert); insertJob(db, jobToInsert);
@ -2366,79 +2366,28 @@ export function saveMessage(
return id; return id;
} }
const toCreate = { const createdId = id || generateMessageId(data.received_at).id;
...data,
id: id || generateMessageId(data.received_at).id,
};
prepare( prepare(
db, db,
` `
INSERT INTO messages ( INSERT INTO messages (
id, ${MESSAGE_COLUMNS.join(', ')}
json, ) VALUES (
${MESSAGE_COLUMNS.map(name => `$${name}`).join(', ')}
body,
conversationId,
expirationStartTimestamp,
expireTimer,
hasAttachments,
hasFileAttachments,
hasVisualMediaAttachments,
isChangeCreatedByUs,
isErased,
isViewOnce,
mentionsMe,
received_at,
schemaVersion,
serverGuid,
sent_at,
source,
sourceServiceId,
sourceDevice,
storyId,
type,
readStatus,
seenStatus
) values (
$id,
$json,
$body,
$conversationId,
$expirationStartTimestamp,
$expireTimer,
$hasAttachments,
$hasFileAttachments,
$hasVisualMediaAttachments,
$isChangeCreatedByUs,
$isErased,
$isViewOnce,
$mentionsMe,
$received_at,
$schemaVersion,
$serverGuid,
$sent_at,
$source,
$sourceServiceId,
$sourceDevice,
$storyId,
$type,
$readStatus,
$seenStatus
); );
` `
).run({ ).run({
...payloadWithoutJson, ...payloadWithoutJson,
id: toCreate.id, id: createdId,
json: objectToJSON(toCreate), json: objectToJSON(json),
}); });
if (jobToInsert) { if (jobToInsert) {
insertJob(db, jobToInsert); insertJob(db, jobToInsert);
} }
return toCreate.id; return createdId;
} }
function saveMessages( function saveMessages(
@ -2507,7 +2456,13 @@ export function getMessageById(
id: string id: string
): MessageType | undefined { ): MessageType | undefined {
const row = db const row = db
.prepare<Query>('SELECT json FROM messages WHERE id = $id;') .prepare<Query>(
`
SELECT ${MESSAGE_COLUMNS.join(', ')}
FROM messages
WHERE id = $id;
`
)
.get({ .get({
id, id,
}); });
@ -2516,7 +2471,7 @@ export function getMessageById(
return undefined; return undefined;
} }
return jsonToObject(row.json); return hydrateMessage(row);
} }
function getMessagesById( function getMessagesById(
@ -2528,22 +2483,30 @@ function getMessagesById(
messageIds, messageIds,
(batch: ReadonlyArray<string>): Array<MessageType> => { (batch: ReadonlyArray<string>): Array<MessageType> => {
const query = db.prepare<ArrayQuery>( const query = db.prepare<ArrayQuery>(
`SELECT json FROM messages WHERE id IN (${Array(batch.length) `
.fill('?') SELECT ${MESSAGE_COLUMNS.join(', ')}
.join(',')});` FROM messages
WHERE id IN (
${Array(batch.length).fill('?').join(',')}
);`
); );
const rows: JSONRows = query.all(batch); const rows: Array<MessageTypeUnhydrated> = query.all(batch);
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
); );
} }
function _getAllMessages(db: ReadableDB): Array<MessageType> { function _getAllMessages(db: ReadableDB): Array<MessageType> {
const rows: JSONRows = db const rows: Array<MessageTypeUnhydrated> = db
.prepare<EmptyQuery>('SELECT json FROM messages ORDER BY id ASC;') .prepare<EmptyQuery>(
`
SELECT ${MESSAGE_COLUMNS.join(', ')}
FROM messages ORDER BY id ASC
`
)
.all(); .all();
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
function _removeAllMessages(db: WritableDB): void { function _removeAllMessages(db: WritableDB): void {
db.exec(` db.exec(`
@ -2574,10 +2537,10 @@ function getMessageBySender(
sent_at: number; sent_at: number;
} }
): MessageType | undefined { ): MessageType | undefined {
const rows: JSONRows = prepare( const rows: Array<MessageTypeUnhydrated> = prepare(
db, db,
` `
SELECT json FROM messages WHERE SELECT ${MESSAGE_COLUMNS.join(', ')} FROM messages WHERE
(source = $source OR sourceServiceId = $sourceServiceId) AND (source = $source OR sourceServiceId = $sourceServiceId) AND
sourceDevice = $sourceDevice AND sourceDevice = $sourceDevice AND
sent_at = $sent_at sent_at = $sent_at
@ -2603,7 +2566,7 @@ function getMessageBySender(
return undefined; return undefined;
} }
return jsonToObject(rows[0].json); return hydrateMessage(rows[0]);
} }
export function _storyIdPredicate( export function _storyIdPredicate(
@ -2667,7 +2630,9 @@ function getUnreadByConversationAndMarkRead(
db.prepare(updateExpirationQuery).run(updateExpirationParams); db.prepare(updateExpirationQuery).run(updateExpirationParams);
const [selectQuery, selectParams] = sql` const [selectQuery, selectParams] = sql`
SELECT id, json FROM messages SELECT
${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages
WHERE WHERE
conversationId = ${conversationId} AND conversationId = ${conversationId} AND
seenStatus = ${SeenStatus.Unseen} AND seenStatus = ${SeenStatus.Unseen} AND
@ -2701,7 +2666,7 @@ function getUnreadByConversationAndMarkRead(
db.prepare(updateStatusQuery).run(updateStatusParams); db.prepare(updateStatusQuery).run(updateStatusParams);
return rows.map(row => { return rows.map(row => {
const json = jsonToObject<MessageType>(row.json); const json = hydrateMessage(row);
return { return {
originalReadStatus: json.readStatus, originalReadStatus: json.readStatus,
readStatus: ReadStatus.Read, readStatus: ReadStatus.Read,
@ -2929,7 +2894,10 @@ function getRecentStoryReplies(
}; };
const createQuery = (timeFilter: QueryFragment): QueryFragment => sqlFragment` const createQuery = (timeFilter: QueryFragment): QueryFragment => sqlFragment`
SELECT json FROM messages WHERE SELECT
${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages
WHERE
(${messageId} IS NULL OR id IS NOT ${messageId}) AND (${messageId} IS NULL OR id IS NOT ${messageId}) AND
isStory IS 0 AND isStory IS 0 AND
storyId IS ${storyId} AND storyId IS ${storyId} AND
@ -2940,9 +2908,9 @@ function getRecentStoryReplies(
`; `;
const template = sqlFragment` const template = sqlFragment`
SELECT first.json FROM (${createQuery(timeFilters.first)}) as first SELECT first.* FROM (${createQuery(timeFilters.first)}) as first
UNION ALL UNION ALL
SELECT second.json FROM (${createQuery(timeFilters.second)}) as second SELECT second.* FROM (${createQuery(timeFilters.second)}) as second
`; `;
const [query, params] = sql`${template} LIMIT ${limit}`; const [query, params] = sql`${template} LIMIT ${limit}`;
@ -2988,7 +2956,9 @@ function getAdjacentMessagesByConversation(
requireFileAttachments; requireFileAttachments;
const createQuery = (timeFilter: QueryFragment): QueryFragment => sqlFragment` const createQuery = (timeFilter: QueryFragment): QueryFragment => sqlFragment`
SELECT json FROM messages WHERE SELECT
${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages WHERE
conversationId = ${conversationId} AND conversationId = ${conversationId} AND
${ ${
requireDifferentMessage requireDifferentMessage
@ -3014,15 +2984,15 @@ function getAdjacentMessagesByConversation(
`; `;
let template = sqlFragment` let template = sqlFragment`
SELECT first.json FROM (${createQuery(timeFilters.first)}) as first SELECT first.* FROM (${createQuery(timeFilters.first)}) as first
UNION ALL UNION ALL
SELECT second.json FROM (${createQuery(timeFilters.second)}) as second SELECT second.* FROM (${createQuery(timeFilters.second)}) as second
`; `;
// See `filterValidAttachments` in ts/state/ducks/lightbox.ts // See `filterValidAttachments` in ts/state/ducks/lightbox.ts
if (requireVisualMediaAttachments) { if (requireVisualMediaAttachments) {
template = sqlFragment` template = sqlFragment`
SELECT json SELECT messages.*
FROM (${template}) as messages FROM (${template}) as messages
WHERE WHERE
( (
@ -3037,7 +3007,7 @@ function getAdjacentMessagesByConversation(
`; `;
} else if (requireFileAttachments) { } else if (requireFileAttachments) {
template = sqlFragment` template = sqlFragment`
SELECT json SELECT messages.*
FROM (${template}) as messages FROM (${template}) as messages
WHERE WHERE
( (
@ -3086,7 +3056,7 @@ function getAllStories(
} }
): GetAllStoriesResultType { ): GetAllStoriesResultType {
const [storiesQuery, storiesParams] = sql` const [storiesQuery, storiesParams] = sql`
SELECT json, id SELECT ${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages FROM messages
WHERE WHERE
isStory = 1 AND isStory = 1 AND
@ -3094,10 +3064,7 @@ function getAllStories(
(${sourceServiceId} IS NULL OR sourceServiceId IS ${sourceServiceId}) (${sourceServiceId} IS NULL OR sourceServiceId IS ${sourceServiceId})
ORDER BY received_at ASC, sent_at ASC; ORDER BY received_at ASC, sent_at ASC;
`; `;
const rows: ReadonlyArray<{ const rows = db.prepare(storiesQuery).all(storiesParams);
id: string;
json: string;
}> = db.prepare(storiesQuery).all(storiesParams);
const [repliesQuery, repliesParams] = sql` const [repliesQuery, repliesParams] = sql`
SELECT DISTINCT storyId SELECT DISTINCT storyId
@ -3126,7 +3093,7 @@ function getAllStories(
); );
return rows.map(row => ({ return rows.map(row => ({
...jsonToObject(row.json), ...hydrateMessage(row),
hasReplies: Boolean(repliesLookup.has(row.id)), hasReplies: Boolean(repliesLookup.has(row.id)),
hasRepliesFromSelf: Boolean(repliesFromSelfLookup.has(row.id)), hasRepliesFromSelf: Boolean(repliesFromSelfLookup.has(row.id)),
})); }));
@ -3301,7 +3268,7 @@ function getLastConversationActivity(
const row = prepare( const row = prepare(
db, db,
` `
SELECT json FROM messages SELECT ${MESSAGE_COLUMNS.join(', ')} FROM messages
INDEXED BY messages_activity INDEXED BY messages_activity
WHERE WHERE
conversationId IS $conversationId AND conversationId IS $conversationId AND
@ -3320,7 +3287,7 @@ function getLastConversationActivity(
return undefined; return undefined;
} }
return jsonToObject(row.json); return hydrateMessage(row);
} }
function getLastConversationPreview( function getLastConversationPreview(
db: ReadableDB, db: ReadableDB,
@ -3332,19 +3299,15 @@ function getLastConversationPreview(
includeStoryReplies: boolean; includeStoryReplies: boolean;
} }
): MessageType | undefined { ): MessageType | undefined {
type Row = Readonly<{
json: string;
}>;
const index = includeStoryReplies const index = includeStoryReplies
? 'messages_preview' ? 'messages_preview'
: 'messages_preview_without_story'; : 'messages_preview_without_story';
const row: Row | undefined = prepare( const row: MessageTypeUnhydrated | undefined = prepare(
db, db,
` `
SELECT json FROM ( SELECT ${MESSAGE_COLUMNS.join(', ')}, expiresAt FROM (
SELECT json, expiresAt FROM messages SELECT ${MESSAGE_COLUMNS.join(', ')}, expiresAt FROM messages
INDEXED BY ${index} INDEXED BY ${index}
WHERE WHERE
conversationId IS $conversationId AND conversationId IS $conversationId AND
@ -3361,7 +3324,7 @@ function getLastConversationPreview(
now: Date.now(), now: Date.now(),
}); });
return row ? jsonToObject(row.json) : undefined; return row ? hydrateMessage(row) : undefined;
} }
function getConversationMessageStats( function getConversationMessageStats(
@ -3400,7 +3363,7 @@ function getLastConversationMessage(
const row = db const row = db
.prepare<Query>( .prepare<Query>(
` `
SELECT json FROM messages WHERE SELECT ${MESSAGE_COLUMNS.join(', ')} FROM messages WHERE
conversationId = $conversationId conversationId = $conversationId
ORDER BY received_at DESC, sent_at DESC ORDER BY received_at DESC, sent_at DESC
LIMIT 1; LIMIT 1;
@ -3414,7 +3377,7 @@ function getLastConversationMessage(
return undefined; return undefined;
} }
return jsonToObject(row.json); return hydrateMessage(row);
} }
function getOldestUnseenMessageForConversation( function getOldestUnseenMessageForConversation(
@ -3730,7 +3693,7 @@ function getCallHistoryMessageByCallId(
} }
): MessageType | undefined { ): MessageType | undefined {
const [query, params] = sql` const [query, params] = sql`
SELECT json SELECT ${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages FROM messages
WHERE conversationId = ${options.conversationId} WHERE conversationId = ${options.conversationId}
AND type = 'call-history' AND type = 'call-history'
@ -3740,7 +3703,7 @@ function getCallHistoryMessageByCallId(
if (row == null) { if (row == null) {
return; return;
} }
return jsonToObject(row.json); return hydrateMessage(row);
} }
function getCallHistory( function getCallHistory(
@ -4524,45 +4487,57 @@ function getMessagesBySentAt(
db: ReadableDB, db: ReadableDB,
sentAt: number sentAt: number
): Array<MessageType> { ): Array<MessageType> {
// Make sure to preserve order of columns
const editedColumns = MESSAGE_COLUMNS_FRAGMENTS.map(name => {
if (name.fragment === 'received_at' || name.fragment === 'sent_at') {
return name;
}
return sqlFragment`messages.${name}`;
});
const [query, params] = sql` const [query, params] = sql`
SELECT messages.json, received_at, sent_at FROM edited_messages SELECT ${sqlJoin(editedColumns)}
FROM edited_messages
INNER JOIN messages ON INNER JOIN messages ON
messages.id = edited_messages.messageId messages.id = edited_messages.messageId
WHERE edited_messages.sentAt = ${sentAt} WHERE edited_messages.sentAt = ${sentAt}
UNION UNION
SELECT json, received_at, sent_at FROM messages SELECT ${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}
FROM messages
WHERE sent_at = ${sentAt} WHERE sent_at = ${sentAt}
ORDER BY messages.received_at DESC, messages.sent_at DESC; ORDER BY messages.received_at DESC, messages.sent_at DESC;
`; `;
const rows = db.prepare(query).all(params); const rows = db.prepare(query).all(params);
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
function getExpiredMessages(db: ReadableDB): Array<MessageType> { function getExpiredMessages(db: ReadableDB): Array<MessageType> {
const now = Date.now(); const now = Date.now();
const rows: JSONRows = db const rows: Array<MessageTypeUnhydrated> = db
.prepare<Query>( .prepare<Query>(
` `
SELECT json FROM messages WHERE SELECT ${sqlJoin(MESSAGE_COLUMNS_FRAGMENTS)}, expiresAt
FROM messages
WHERE
expiresAt <= $now expiresAt <= $now
ORDER BY expiresAt ASC; ORDER BY expiresAt ASC;
` `
) )
.all({ now }); .all({ now });
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
function getMessagesUnexpectedlyMissingExpirationStartTimestamp( function getMessagesUnexpectedlyMissingExpirationStartTimestamp(
db: ReadableDB db: ReadableDB
): Array<MessageType> { ): Array<MessageType> {
const rows: JSONRows = db const rows: Array<MessageTypeUnhydrated> = db
.prepare<EmptyQuery>( .prepare<EmptyQuery>(
` `
SELECT json FROM messages SELECT ${MESSAGE_COLUMNS.join(', ')} FROM messages
INDEXED BY messages_unexpectedly_missing_expiration_start_timestamp INDEXED BY messages_unexpectedly_missing_expiration_start_timestamp
WHERE WHERE
expireTimer > 0 AND expireTimer > 0 AND
@ -4579,7 +4554,7 @@ function getMessagesUnexpectedlyMissingExpirationStartTimestamp(
) )
.all(); .all();
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
function getSoonestMessageExpiry(db: ReadableDB): undefined | number { function getSoonestMessageExpiry(db: ReadableDB): undefined | number {
@ -4607,7 +4582,7 @@ function getNextTapToViewMessageTimestampToAgeOut(
const row = db const row = db
.prepare<EmptyQuery>( .prepare<EmptyQuery>(
` `
SELECT json FROM messages SELECT ${MESSAGE_COLUMNS.join(', ')} FROM messages
WHERE WHERE
-- we want this query to use the messages_view_once index rather than received_at -- we want this query to use the messages_view_once index rather than received_at
likelihood(isViewOnce = 1, 0.01) likelihood(isViewOnce = 1, 0.01)
@ -4621,7 +4596,7 @@ function getNextTapToViewMessageTimestampToAgeOut(
if (!row) { if (!row) {
return undefined; return undefined;
} }
const data = jsonToObject<MessageType>(row.json); const data = hydrateMessage(row);
const result = data.received_at_ms; const result = data.received_at_ms;
return isNormalNumber(result) ? result : undefined; return isNormalNumber(result) ? result : undefined;
} }
@ -4630,16 +4605,16 @@ function getTapToViewMessagesNeedingErase(
db: ReadableDB, db: ReadableDB,
maxTimestamp: number maxTimestamp: number
): Array<MessageType> { ): Array<MessageType> {
const rows: JSONRows = db const rows: Array<MessageTypeUnhydrated> = db
.prepare<Query>( .prepare<Query>(
` `
SELECT json SELECT ${MESSAGE_COLUMNS.join(', ')}
FROM messages FROM messages
WHERE WHERE
isViewOnce = 1 isViewOnce = 1
AND (isErased IS NULL OR isErased != 1) AND (isErased IS NULL OR isErased != 1)
AND ( AND (
IFNULL(json ->> '$.received_at_ms', 0) <= $maxTimestamp IFNULL(received_at_ms, 0) <= $maxTimestamp
) )
` `
) )
@ -4647,7 +4622,7 @@ function getTapToViewMessagesNeedingErase(
maxTimestamp, maxTimestamp,
}); });
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
const MAX_UNPROCESSED_ATTEMPTS = 10; const MAX_UNPROCESSED_ATTEMPTS = 10;
@ -6716,10 +6691,10 @@ function getMessagesNeedingUpgrade(
limit: number, limit: number,
{ maxVersion }: { maxVersion: number } { maxVersion }: { maxVersion: number }
): Array<MessageType> { ): Array<MessageType> {
const rows: JSONRows = db const rows: Array<MessageTypeUnhydrated> = db
.prepare<Query>( .prepare<Query>(
` `
SELECT json SELECT ${MESSAGE_COLUMNS.join(', ')}
FROM messages FROM messages
WHERE WHERE
(schemaVersion IS NULL OR schemaVersion < $maxVersion) AND (schemaVersion IS NULL OR schemaVersion < $maxVersion) AND
@ -6736,7 +6711,7 @@ function getMessagesNeedingUpgrade(
limit, limit,
}); });
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
// Exported for tests // Exported for tests
@ -7023,12 +6998,14 @@ function pageMessages(
rowids, rowids,
(batch: ReadonlyArray<number>): Array<MessageType> => { (batch: ReadonlyArray<number>): Array<MessageType> => {
const query = writable.prepare<ArrayQuery>( const query = writable.prepare<ArrayQuery>(
`SELECT json FROM messages WHERE rowid IN (${Array(batch.length) `
.fill('?') SELECT ${MESSAGE_COLUMNS.join(', ')}
.join(',')});` FROM messages
WHERE rowid IN (${Array(batch.length).fill('?').join(',')});
`
); );
const rows: JSONRows = query.all(batch); const rows: Array<MessageTypeUnhydrated> = query.all(batch);
return rows.map(row => jsonToObject(row.json)); return rows.map(row => hydrateMessage(row));
} }
); );
@ -7446,11 +7423,14 @@ function getUnreadEditedMessagesAndMarkRead(
} }
): GetUnreadByConversationAndMarkReadResultType { ): GetUnreadByConversationAndMarkReadResultType {
return db.transaction(() => { return db.transaction(() => {
const editedColumns = MESSAGE_COLUMNS_FRAGMENTS.filter(
name => name.fragment !== 'sent_at' && name.fragment !== 'readStatus'
).map(name => sqlFragment`messages.${name}`);
const [selectQuery, selectParams] = sql` const [selectQuery, selectParams] = sql`
SELECT SELECT
messages.id, ${sqlJoin(editedColumns)},
messages.json, edited_messages.sentAt as sent_at,
edited_messages.sentAt,
edited_messages.readStatus edited_messages.readStatus
FROM edited_messages FROM edited_messages
JOIN messages JOIN messages
@ -7481,7 +7461,7 @@ function getUnreadEditedMessagesAndMarkRead(
} }
return rows.map(row => { return rows.map(row => {
const json = jsonToObject<MessageType>(row.json); const json = hydrateMessage(row);
return { return {
originalReadStatus: row.readStatus, originalReadStatus: row.readStatus,
readStatus: ReadStatus.Read, readStatus: ReadStatus.Read,
@ -7494,8 +7474,6 @@ function getUnreadEditedMessagesAndMarkRead(
'sourceServiceId', 'sourceServiceId',
'type', 'type',
]), ]),
// Use the edited message timestamp
sent_at: row.sentAt,
}; };
}); });
})(); })();

88
ts/sql/hydration.ts Normal file
View file

@ -0,0 +1,88 @@
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { ReadStatus } from '../messages/MessageReadStatus';
import type { SeenStatus } from '../MessageSeenStatus';
import type { ServiceIdString } from '../types/ServiceId';
import { dropNull } from '../util/dropNull';
/* eslint-disable camelcase */
import type {
MessageTypeUnhydrated,
MessageType,
MESSAGE_COLUMNS,
} from './Interface';
function toBoolean(value: number | null): boolean | undefined {
if (value == null) {
return undefined;
}
return value === 1;
}
export function hydrateMessage(row: MessageTypeUnhydrated): MessageType {
const {
json,
id,
body,
conversationId,
expirationStartTimestamp,
expireTimer,
hasAttachments,
hasFileAttachments,
hasVisualMediaAttachments,
isErased,
isViewOnce,
mentionsMe,
received_at,
received_at_ms,
schemaVersion,
serverGuid,
sent_at,
source,
sourceServiceId,
sourceDevice,
storyId,
type,
readStatus,
seenStatus,
timestamp,
serverTimestamp,
unidentifiedDeliveryReceived,
} = row;
return {
...(JSON.parse(json) as Omit<
MessageType,
(typeof MESSAGE_COLUMNS)[number]
>),
id,
body: dropNull(body),
conversationId: conversationId || '',
expirationStartTimestamp: dropNull(expirationStartTimestamp),
expireTimer: dropNull(expireTimer) as MessageType['expireTimer'],
hasAttachments: toBoolean(hasAttachments),
hasFileAttachments: toBoolean(hasFileAttachments),
hasVisualMediaAttachments: toBoolean(hasVisualMediaAttachments),
isErased: toBoolean(isErased),
isViewOnce: toBoolean(isViewOnce),
mentionsMe: toBoolean(mentionsMe),
received_at: received_at || 0,
received_at_ms: dropNull(received_at_ms),
schemaVersion: dropNull(schemaVersion),
serverGuid: dropNull(serverGuid),
sent_at: sent_at || 0,
source: dropNull(source),
sourceServiceId: dropNull(sourceServiceId) as ServiceIdString | undefined,
sourceDevice: dropNull(sourceDevice),
storyId: dropNull(storyId),
type: type as MessageType['type'],
readStatus: readStatus == null ? undefined : (readStatus as ReadStatus),
seenStatus: seenStatus == null ? undefined : (seenStatus as SeenStatus),
timestamp: timestamp || 0,
serverTimestamp: dropNull(serverTimestamp),
unidentifiedDeliveryReceived: toBoolean(unidentifiedDeliveryReceived),
};
}

View file

@ -0,0 +1,53 @@
// Copyright 2025 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { Database } from '@signalapp/better-sqlite3';
import type { LoggerType } from '../../types/Logging';
import { sql } from '../util';
export const version = 1270;
export function updateToSchemaVersion1270(
currentVersion: number,
db: Database,
logger: LoggerType
): void {
if (currentVersion >= 1270) {
return;
}
db.transaction(() => {
const [query] = sql`
ALTER TABLE messages
ADD COLUMN timestamp INTEGER;
ALTER TABLE messages
ADD COLUMN received_at_ms INTEGER;
ALTER TABLE messages
ADD COLUMN unidentifiedDeliveryReceived INTEGER;
ALTER TABLE messages
ADD COLUMN serverTimestamp INTEGER;
ALTER TABLE messages
RENAME COLUMN source TO legacySource;
ALTER TABLE messages
ADD COLUMN source TEXT;
UPDATE messages SET
timestamp = json_extract(json, '$.timestamp'),
received_at_ms = json_extract(json, '$.received_at_ms'),
unidentifiedDeliveryReceived =
json_extract(json, '$.unidentifiedDeliveryReceived'),
serverTimestamp =
json_extract(json, '$.serverTimestamp'),
source = IFNULL(json_extract(json, '$.source'), '+' || legacySource);
ALTER TABLE messages
DROP COLUMN legacySource;
`;
db.exec(query);
db.pragma('user_version = 1270');
})();
logger.info('updateToSchemaVersion1270: success!');
}

View file

@ -4,7 +4,7 @@
import type { LoggerType } from '../../types/Logging'; import type { LoggerType } from '../../types/Logging';
import { isRecord } from '../../util/isRecord'; import { isRecord } from '../../util/isRecord';
import type { WritableDB } from '../Interface'; import type { WritableDB } from '../Interface';
import { getJobsInQueue, getMessageById, insertJob } from '../Server'; import { getJobsInQueue, insertJob } from '../Server';
export default function updateToSchemaVersion51( export default function updateToSchemaVersion51(
currentVersion: number, currentVersion: number,
@ -24,6 +24,10 @@ export default function updateToSchemaVersion51(
const reactionsJobs = getJobsInQueue(db, 'reactions'); const reactionsJobs = getJobsInQueue(db, 'reactions');
deleteJobsInQueue.run({ queueType: 'reactions' }); deleteJobsInQueue.run({ queueType: 'reactions' });
const getMessageById = db.prepare(
'SELECT conversationId FROM messages WHERE id IS ?'
);
reactionsJobs.forEach(job => { reactionsJobs.forEach(job => {
const { data, id } = job; const { data, id } = job;
@ -42,7 +46,7 @@ export default function updateToSchemaVersion51(
return; return;
} }
const message = getMessageById(db, messageId); const message = getMessageById.get(messageId);
if (!message) { if (!message) {
logger.warn( logger.warn(
`updateToSchemaVersion51: Unable to find message for reaction job ${id}` `updateToSchemaVersion51: Unable to find message for reaction job ${id}`

View file

@ -4,7 +4,7 @@
import type { LoggerType } from '../../types/Logging'; import type { LoggerType } from '../../types/Logging';
import { isRecord } from '../../util/isRecord'; import { isRecord } from '../../util/isRecord';
import type { WritableDB } from '../Interface'; import type { WritableDB } from '../Interface';
import { getJobsInQueue, getMessageById, insertJob } from '../Server'; import { getJobsInQueue, insertJob } from '../Server';
export default function updateToSchemaVersion78( export default function updateToSchemaVersion78(
currentVersion: number, currentVersion: number,
@ -41,6 +41,10 @@ export default function updateToSchemaVersion78(
}, },
]; ];
const getMessageById = db.prepare(
'SELECT conversationId FROM messages WHERE id IS ?'
);
for (const queue of queues) { for (const queue of queues) {
const prevJobs = getJobsInQueue(db, queue.queueType); const prevJobs = getJobsInQueue(db, queue.queueType);
deleteJobsInQueue.run({ queueType: queue.queueType }); deleteJobsInQueue.run({ queueType: queue.queueType });
@ -62,7 +66,7 @@ export default function updateToSchemaVersion78(
return; return;
} }
const message = getMessageById(db, messageId); const message = getMessageById.get(messageId);
if (!message) { if (!message) {
logger.warn( logger.warn(
`updateToSchemaVersion78: Unable to find message for ${queue.queueType} job ${id}` `updateToSchemaVersion78: Unable to find message for ${queue.queueType} job ${id}`

View file

@ -102,10 +102,11 @@ import { updateToSchemaVersion1220 } from './1220-blob-sessions';
import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index'; import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index';
import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table'; import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table';
import { updateToSchemaVersion1250 } from './1250-defunct-call-links-storage'; import { updateToSchemaVersion1250 } from './1250-defunct-call-links-storage';
import { updateToSchemaVersion1260 } from './1260-sync-tasks-rowid';
import { import {
updateToSchemaVersion1260, updateToSchemaVersion1270,
version as MAX_VERSION, version as MAX_VERSION,
} from './1260-sync-tasks-rowid'; } from './1270-normalize-messages';
import { DataWriter } from '../Server'; import { DataWriter } from '../Server';
function updateToSchemaVersion1( function updateToSchemaVersion1(
@ -2078,6 +2079,7 @@ export const SCHEMA_VERSIONS = [
updateToSchemaVersion1240, updateToSchemaVersion1240,
updateToSchemaVersion1250, updateToSchemaVersion1250,
updateToSchemaVersion1260, updateToSchemaVersion1260,
updateToSchemaVersion1270,
]; ];
export class DBVersionFromFutureError extends Error { export class DBVersionFromFutureError extends Error {

View file

@ -11,7 +11,8 @@ export type ArrayQuery = Array<ReadonlyArray<null | number | bigint | string>>;
export type Query = { export type Query = {
[key: string]: null | number | bigint | string | Uint8Array; [key: string]: null | number | bigint | string | Uint8Array;
}; };
export type JSONRows = Array<{ readonly json: string }>; export type JSONRow = Readonly<{ json: string }>;
export type JSONRows = Array<JSONRow>;
export type TableType = export type TableType =
| 'attachment_downloads' | 'attachment_downloads'

View file

@ -166,8 +166,8 @@ describe('backup/attachments', () => {
// path & iv will not be roundtripped // path & iv will not be roundtripped
[ [
composeMessage(1, { composeMessage(1, {
hasAttachments: 1, hasAttachments: true,
hasVisualMediaAttachments: 1, hasVisualMediaAttachments: true,
attachments: [ attachments: [
omit(longMessageAttachment, NON_ROUNDTRIPPED_FIELDS), omit(longMessageAttachment, NON_ROUNDTRIPPED_FIELDS),
omit(normalAttachment, NON_ROUNDTRIPPED_FIELDS), omit(normalAttachment, NON_ROUNDTRIPPED_FIELDS),
@ -284,8 +284,8 @@ describe('backup/attachments', () => {
// path & iv will not be roundtripped // path & iv will not be roundtripped
[ [
composeMessage(1, { composeMessage(1, {
hasAttachments: 1, hasAttachments: true,
hasVisualMediaAttachments: 1, hasVisualMediaAttachments: true,
attachments: [ attachments: [
omit(attachment1, NON_ROUNDTRIPPED_FIELDS), omit(attachment1, NON_ROUNDTRIPPED_FIELDS),
omit(attachment2, NON_ROUNDTRIPPED_FIELDS), omit(attachment2, NON_ROUNDTRIPPED_FIELDS),
@ -307,8 +307,8 @@ describe('backup/attachments', () => {
], ],
[ [
composeMessage(1, { composeMessage(1, {
hasAttachments: 1, hasAttachments: true,
hasVisualMediaAttachments: 1, hasVisualMediaAttachments: true,
// path, iv, and uploadTimestamp will not be roundtripped, // path, iv, and uploadTimestamp will not be roundtripped,
// but there will be a backupLocator // but there will be a backupLocator
@ -341,7 +341,7 @@ describe('backup/attachments', () => {
], ],
[ [
composeMessage(1, { composeMessage(1, {
hasAttachments: 1, hasAttachments: true,
attachments: [ attachments: [
{ {
...omit(attachment, NON_ROUNDTRIPPED_BACKUP_LOCATOR_FIELDS), ...omit(attachment, NON_ROUNDTRIPPED_BACKUP_LOCATOR_FIELDS),
@ -604,8 +604,8 @@ describe('backup/attachments', () => {
[ [
{ {
...existingMessage, ...existingMessage,
hasAttachments: 1, hasAttachments: true,
hasVisualMediaAttachments: 1, hasVisualMediaAttachments: true,
attachments: [ attachments: [
{ {
...omit( ...omit(

View file

@ -110,10 +110,20 @@ function sortAndNormalize(
// Get rid of unserializable `undefined` values. // Get rid of unserializable `undefined` values.
return JSON.parse( return JSON.parse(
JSON.stringify({ JSON.stringify({
// Migration defaults // Defaults
hasAttachments: 0, hasAttachments: false,
hasFileAttachments: false,
hasVisualMediaAttachments: false,
isErased: false,
isViewOnce: false,
mentionsMe: false,
seenStatus: 0,
readStatus: 0,
unidentifiedDeliveryReceived: false,
// Drop more `undefined` values
...JSON.parse(JSON.stringify(rest)),
...rest,
conversationId: mapConvoId(conversationId), conversationId: mapConvoId(conversationId),
reactions: reactions?.map(({ fromId, ...restOfReaction }) => { reactions: reactions?.map(({ fromId, ...restOfReaction }) => {
return { return {

View file

@ -76,7 +76,6 @@ describe('backup/non-bubble messages', () => {
flags: Proto.DataMessage.Flags.END_SESSION, flags: Proto.DataMessage.Flags.END_SESSION,
attachments: [], attachments: [],
contact: [], contact: [],
hasAttachments: 0,
}, },
]); ]);
}); });

View file

@ -6,11 +6,11 @@ import { v4 as generateGuid } from 'uuid';
import { import {
dequeueOldestSyncTasks, dequeueOldestSyncTasks,
getMostRecentAddressableMessages,
removeSyncTaskById, removeSyncTaskById,
saveSyncTasks, saveSyncTasks,
} from '../../sql/Server'; } from '../../sql/Server';
import type { WritableDB } from '../../sql/Interface'; import type { WritableDB, ReadableDB, MessageType } from '../../sql/Interface';
import { sql, jsonToObject } from '../../sql/util';
import { insertData, updateToVersion, createDB } from './helpers'; import { insertData, updateToVersion, createDB } from './helpers';
import { MAX_SYNC_TASK_ATTEMPTS } from '../../util/syncTasks.types'; import { MAX_SYNC_TASK_ATTEMPTS } from '../../util/syncTasks.types';
import { WEEK } from '../../util/durations'; import { WEEK } from '../../util/durations';
@ -20,6 +20,27 @@ import type { SyncTaskType } from '../../util/syncTasks';
/* eslint-disable camelcase */ /* eslint-disable camelcase */
// Snapshot before: 1270
export function getMostRecentAddressableMessages(
db: ReadableDB,
conversationId: string,
limit = 5
): Array<MessageType> {
const [query, parameters] = sql`
SELECT json FROM messages
INDEXED BY messages_by_date_addressable
WHERE
conversationId IS ${conversationId} AND
isAddressableMessage = 1
ORDER BY received_at DESC, sent_at DESC
LIMIT ${limit};
`;
const rows = db.prepare(query).all(parameters);
return rows.map(row => jsonToObject(row.json));
}
function generateMessage(json: MessageAttributesType) { function generateMessage(json: MessageAttributesType) {
const { conversationId, received_at, sent_at, type } = json; const { conversationId, received_at, sent_at, type } = json;

View file

@ -4,8 +4,8 @@
import { assert } from 'chai'; import { assert } from 'chai';
import { v4 as generateGuid } from 'uuid'; import { v4 as generateGuid } from 'uuid';
import type { WritableDB } from '../../sql/Interface'; import type { WritableDB, ReadableDB, MessageType } from '../../sql/Interface';
import { getMostRecentAddressableNondisappearingMessages } from '../../sql/Server'; import { sql, jsonToObject } from '../../sql/util';
import { createDB, insertData, updateToVersion } from './helpers'; import { createDB, insertData, updateToVersion } from './helpers';
import type { MessageAttributesType } from '../../model-types'; import type { MessageAttributesType } from '../../model-types';
@ -26,6 +26,28 @@ function generateMessage(json: MessageAttributesType) {
}; };
} }
// Snapshot before: 1270
export function getMostRecentAddressableNondisappearingMessages(
db: ReadableDB,
conversationId: string,
limit = 5
): Array<MessageType> {
const [query, parameters] = sql`
SELECT json FROM messages
INDEXED BY messages_by_date_addressable_nondisappearing
WHERE
expireTimer IS NULL AND
conversationId IS ${conversationId} AND
isAddressableMessage = 1
ORDER BY received_at DESC, sent_at DESC
LIMIT ${limit};
`;
const rows = db.prepare(query).all(parameters);
return rows.map(row => jsonToObject(row.json));
}
describe('SQL/updateToSchemaVersion1080', () => { describe('SQL/updateToSchemaVersion1080', () => {
let db: WritableDB; let db: WritableDB;
beforeEach(() => { beforeEach(() => {

View file

@ -1351,10 +1351,8 @@ describe('SQL migrations test', () => {
db.exec( db.exec(
` `
INSERT INTO messages INSERT INTO messages
(id, json) (id, conversationId)
VALUES ('${MESSAGE_ID_1}', '${JSON.stringify({ VALUES ('${MESSAGE_ID_1}', '${CONVERSATION_ID_1}');
conversationId: CONVERSATION_ID_1,
})}')
` `
); );
@ -2482,10 +2480,8 @@ describe('SQL migrations test', () => {
db.exec( db.exec(
` `
INSERT INTO messages INSERT INTO messages
(id, json) (id, conversationId)
VALUES ('${MESSAGE_ID_1}', '${JSON.stringify({ VALUES ('${MESSAGE_ID_1}', '${CONVERSATION_ID_1}');
conversationId: CONVERSATION_ID_1,
})}')
` `
); );

View file

@ -126,11 +126,9 @@ describe('Message', () => {
it('should initialize schema version to zero', () => { it('should initialize schema version to zero', () => {
const input = getDefaultMessage({ const input = getDefaultMessage({
body: 'Imagine there is no heaven…', body: 'Imagine there is no heaven…',
attachments: [],
}); });
const expected = getDefaultMessage({ const expected = getDefaultMessage({
body: 'Imagine there is no heaven…', body: 'Imagine there is no heaven…',
attachments: [],
schemaVersion: 0, schemaVersion: 0,
}); });
@ -203,7 +201,6 @@ describe('Message', () => {
hasVisualMediaAttachments: undefined, hasVisualMediaAttachments: undefined,
hasFileAttachments: undefined, hasFileAttachments: undefined,
schemaVersion: Message.CURRENT_SCHEMA_VERSION, schemaVersion: Message.CURRENT_SCHEMA_VERSION,
contact: [],
}); });
const expectedAttachmentData = 'Its easy if you try'; const expectedAttachmentData = 'Its easy if you try';
@ -655,7 +652,6 @@ describe('Message', () => {
}); });
const expected = getDefaultMessage({ const expected = getDefaultMessage({
body: 'hey there!', body: 'hey there!',
contact: [],
}); });
const result = await upgradeVersion(message, getDefaultContext()); const result = await upgradeVersion(message, getDefaultContext());
assert.deepEqual(result, expected); assert.deepEqual(result, expected);
@ -848,7 +844,6 @@ describe('Message', () => {
key: 'key', key: 'key',
}, },
], ],
contact: [],
}); });
const result = await Message.upgradeSchema(message, { const result = await Message.upgradeSchema(message, {
...getDefaultContext(), ...getDefaultContext(),

View file

@ -285,6 +285,10 @@ export const _mapAttachments =
message: MessageAttributesType, message: MessageAttributesType,
context: ContextType context: ContextType
): Promise<MessageAttributesType> => { ): Promise<MessageAttributesType> => {
if (!message.attachments?.length) {
return message;
}
const upgradeWithContext = esbuildAnonymize((attachment: AttachmentType) => const upgradeWithContext = esbuildAnonymize((attachment: AttachmentType) =>
upgradeAttachment(attachment, context, message) upgradeAttachment(attachment, context, message)
); );
@ -356,6 +360,10 @@ export const _mapContact =
message: MessageAttributesType, message: MessageAttributesType,
context: ContextType context: ContextType
): Promise<MessageAttributesType> => { ): Promise<MessageAttributesType> => {
if (!message.contact?.length) {
return message;
}
const upgradeWithContext = esbuildAnonymize( const upgradeWithContext = esbuildAnonymize(
(contact: EmbeddedContactType) => (contact: EmbeddedContactType) =>
upgradeContact(contact, context, message) upgradeContact(contact, context, message)
@ -501,23 +509,25 @@ const toVersion10 = _withSchemaVersion({
schemaVersion: 10, schemaVersion: 10,
upgrade: async (message, context) => { upgrade: async (message, context) => {
const processPreviews = _mapPreviewAttachments(migrateDataToFileSystem); const processPreviews = _mapPreviewAttachments(migrateDataToFileSystem);
const processSticker = async ( const processSticker = esbuildAnonymize(
stickerMessage: MessageAttributesType, async (
stickerContext: ContextType stickerMessage: MessageAttributesType,
): Promise<MessageAttributesType> => { stickerContext: ContextType
const { sticker } = stickerMessage; ): Promise<MessageAttributesType> => {
if (!sticker || !sticker.data || !sticker.data.data) { const { sticker } = stickerMessage;
return stickerMessage; if (!sticker || !sticker.data || !sticker.data.data) {
} return stickerMessage;
}
return { return {
...stickerMessage, ...stickerMessage,
sticker: { sticker: {
...sticker, ...sticker,
data: await migrateDataToFileSystem(sticker.data, stickerContext), data: await migrateDataToFileSystem(sticker.data, stickerContext),
}, },
}; };
}; }
);
const previewProcessed = await processPreviews(message, context); const previewProcessed = await processPreviews(message, context);
const stickerProcessed = await processSticker(previewProcessed, context); const stickerProcessed = await processSticker(previewProcessed, context);