Refactor outbound delivery state

This commit is contained in:
Evan Hahn 2021-07-09 16:38:51 -05:00 committed by GitHub
parent 831ec98418
commit 9c48a95eb5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 3200 additions and 697 deletions

View file

@ -48,7 +48,6 @@ import {
ItemKeyType,
ItemType,
MessageType,
MessageTypeUnhydrated,
MessageMetricsType,
PreKeyType,
SearchResultMessageType,
@ -62,6 +61,11 @@ import {
UnprocessedType,
UnprocessedUpdateType,
} from './Interface';
import {
SendState,
serializeSendStateForDatabase,
} from '../messages/MessageSendState';
import { MessageRowWithJoinedSends, rowToMessage } from './rowToMessage';
declare global {
// We want to extend `Function`'s properties, so we need to use an interface.
@ -179,6 +183,7 @@ const dataInterface: ServerInterface = {
getMessageBySender,
getMessageById,
_getAllMessages,
_getSendStates,
getAllMessageIds,
getMessagesBySentAt,
getExpiredMessages,
@ -194,6 +199,8 @@ const dataInterface: ServerInterface = {
hasGroupCallHistoryMessage,
migrateConversationMessages,
updateMessageSendState,
getUnprocessedCount,
getAllUnprocessed,
updateUnprocessedAttempts,
@ -279,6 +286,7 @@ function objectToJSON(data: any) {
function jsonToObject(json: string): any {
return JSON.parse(json);
}
function rowToConversation(row: ConversationRow): ConversationType {
const parsedJson = JSON.parse(row.json);
@ -298,6 +306,7 @@ function rowToConversation(row: ConversationRow): ConversationType {
profileLastFetchedAt,
};
}
function rowToSticker(row: StickerRow): StickerType {
return {
...row,
@ -1937,6 +1946,56 @@ function updateToSchemaVersion35(currentVersion: number, db: Database) {
console.log('updateToSchemaVersion35: success!');
}
function updateToSchemaVersion36(currentVersion: number, db: Database) {
if (currentVersion >= 36) {
return;
}
db.transaction(() => {
db.exec(`
CREATE TABLE sendStates(
messageId STRING NOT NULL,
destinationConversationId STRING NOT NULL,
updatedAt INTEGER NOT NULL,
-- This should match the in-code enum.
status TEXT CHECK(
status IN (
'Failed',
'Pending',
'Sent',
'Delivered',
'Read',
'Viewed'
)
) NOT NULL,
UNIQUE(messageId, destinationConversationId),
FOREIGN KEY (messageId)
REFERENCES messages(id) ON DELETE CASCADE,
FOREIGN KEY (destinationConversationId)
REFERENCES conversations(id) ON DELETE CASCADE
);
CREATE INDEX message_sends ON sendStates (
messageId,
destinationConversationId
);
CREATE VIEW messagesWithSendStates AS
SELECT
messages.*,
GROUP_CONCAT(sendStates.destinationConversationId) AS sendConversationIdsJoined,
GROUP_CONCAT(sendStates.status) AS sendStatusesJoined,
GROUP_CONCAT(sendStates.updatedAt) AS sendUpdatedAtsJoined
FROM messages
LEFT JOIN sendStates ON messages.id = sendStates.messageId
GROUP BY messages.id;
`);
db.pragma('user_version = 36');
})();
console.log('updateToSchemaVersion36: success!');
}
const SCHEMA_VERSIONS = [
updateToSchemaVersion1,
updateToSchemaVersion2,
@ -1973,6 +2032,7 @@ const SCHEMA_VERSIONS = [
updateToSchemaVersion33,
updateToSchemaVersion34,
updateToSchemaVersion35,
updateToSchemaVersion36,
];
function updateSchema(db: Database): void {
@ -2984,21 +3044,24 @@ async function searchMessages(
// give us the right results. We can't call `snippet()` in the query above
// because it would bloat the temporary table with text data and we want
// to keep its size minimal for `ORDER BY` + `LIMIT` to be fast.
const result = db
const result: Array<SearchResultMessageType> = db
.prepare<Query>(
`
SELECT
messages.json,
messagesWithSendStates.json,
messagesWithSendStates.sendConversationIdsJoined,
messagesWithSendStates.sendStatusesJoined,
messagesWithSendStates.sendUpdatedAtsJoined,
snippet(messages_fts, -1, '<<left>>', '<<right>>', '...', 10)
AS snippet
FROM tmp_filtered_results
INNER JOIN messages_fts
ON messages_fts.rowid = tmp_filtered_results.rowid
INNER JOIN messages
ON messages.rowid = tmp_filtered_results.rowid
INNER JOIN messagesWithSendStates
ON messagesWithSendStates.rowid = tmp_filtered_results.rowid
WHERE
messages_fts.body MATCH $query
ORDER BY messages.received_at DESC, messages.sent_at DESC;
ORDER BY messagesWithSendStates.received_at DESC, messagesWithSendStates.sent_at DESC;
`
)
.all({ query });
@ -3125,9 +3188,11 @@ function saveMessageSync(
expirationStartTimestamp,
} = data;
const { sendStateByConversationId, ...dataToSaveInJsonField } = data;
const payload = {
id,
json: objectToJSON(data),
json: objectToJSON(dataToSaveInJsonField),
body: body || null,
conversationId,
@ -3149,6 +3214,8 @@ function saveMessageSync(
unread: unread ? 1 : 0,
};
let messageId: string;
if (id && !forceSave) {
prepare(
db,
@ -3179,70 +3246,94 @@ function saveMessageSync(
`
).run(payload);
return id;
messageId = id;
} else {
messageId = id || generateUUID();
const toCreate = {
...dataToSaveInJsonField,
id: messageId,
};
prepare(
db,
`
INSERT INTO messages (
id,
json,
body,
conversationId,
expirationStartTimestamp,
expireTimer,
hasAttachments,
hasFileAttachments,
hasVisualMediaAttachments,
isErased,
isViewOnce,
received_at,
schemaVersion,
serverGuid,
sent_at,
source,
sourceUuid,
sourceDevice,
type,
unread
) values (
$id,
$json,
$body,
$conversationId,
$expirationStartTimestamp,
$expireTimer,
$hasAttachments,
$hasFileAttachments,
$hasVisualMediaAttachments,
$isErased,
$isViewOnce,
$received_at,
$schemaVersion,
$serverGuid,
$sent_at,
$source,
$sourceUuid,
$sourceDevice,
$type,
$unread
);
`
).run({
...payload,
id: messageId,
json: objectToJSON(toCreate),
});
}
const toCreate = {
...data,
id: id || generateUUID(),
};
prepare(
db,
`
INSERT INTO messages (
id,
json,
body,
conversationId,
expirationStartTimestamp,
expireTimer,
hasAttachments,
hasFileAttachments,
hasVisualMediaAttachments,
isErased,
isViewOnce,
received_at,
schemaVersion,
serverGuid,
sent_at,
source,
sourceUuid,
sourceDevice,
type,
unread
) values (
$id,
$json,
$body,
$conversationId,
$expirationStartTimestamp,
$expireTimer,
$hasAttachments,
$hasFileAttachments,
$hasVisualMediaAttachments,
$isErased,
$isViewOnce,
$received_at,
$schemaVersion,
$serverGuid,
$sent_at,
$source,
$sourceUuid,
$sourceDevice,
$type,
$unread
if (sendStateByConversationId) {
const upsertSendStateStmt = prepare(
db,
`
INSERT OR REPLACE INTO sendStates
(messageId, destinationConversationId, updatedAt, status) VALUES
($messageId, $destinationConversationId, $updatedAt, $status);
`
);
`
).run({
...payload,
id: toCreate.id,
json: objectToJSON(toCreate),
});
Object.entries(sendStateByConversationId).forEach(
([destinationConversationId, sendState]) => {
upsertSendStateStmt.run(
serializeSendStateForDatabase({
messageId,
destinationConversationId,
...sendState,
})
);
}
);
}
return toCreate.id;
return messageId;
}
async function saveMessage(
@ -3290,8 +3381,18 @@ async function removeMessages(ids: Array<string>): Promise<void> {
async function getMessageById(id: string): Promise<MessageType | undefined> {
const db = getInstance();
const row = db
.prepare<Query>('SELECT json FROM messages WHERE id = $id;')
const row: null | MessageRowWithJoinedSends = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE id = $id;
`
)
.get({
id,
});
@ -3300,16 +3401,45 @@ async function getMessageById(id: string): Promise<MessageType | undefined> {
return undefined;
}
return jsonToObject(row.json);
return rowToMessage(row);
}
async function _getAllMessages(): Promise<Array<MessageType>> {
const db = getInstance();
const rows: JSONRows = db
.prepare<EmptyQuery>('SELECT json FROM messages ORDER BY id ASC;')
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<EmptyQuery>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
ORDER BY id asc;
`
)
.all();
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function _getSendStates({
messageId,
destinationConversationId,
}: Readonly<{
messageId: string;
destinationConversationId: string;
}>) {
const db = getInstance();
return db
.prepare(
`
SELECT status, updatedAt FROM sendStates
WHERE messageId = $messageId
AND destinationConversationId = $destinationConversationId;
`
)
.all({ messageId, destinationConversationId });
}
async function getAllMessageIds(): Promise<Array<string>> {
@ -3333,10 +3463,16 @@ async function getMessageBySender({
sent_at: number;
}): Promise<Array<MessageType>> {
const db = getInstance();
const rows: JSONRows = prepare(
const rows: Array<MessageRowWithJoinedSends> = prepare(
db,
`
SELECT json FROM messages WHERE
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
(source = $source OR sourceUuid = $sourceUuid) AND
sourceDevice = $sourceDevice AND
sent_at = $sent_at;
@ -3348,7 +3484,7 @@ async function getMessageBySender({
sent_at,
});
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function getUnreadCountForConversation(
@ -3614,15 +3750,21 @@ async function getOlderMessagesByConversation(
sentAt?: number;
messageId?: string;
} = {}
): Promise<Array<MessageTypeUnhydrated>> {
): Promise<Array<MessageRowWithJoinedSends>> {
const db = getInstance();
let rows: JSONRows;
let rows: Array<MessageRowWithJoinedSends>;
if (messageId) {
rows = db
.prepare<Query>(
`
SELECT json FROM messages WHERE
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
conversationId = $conversationId AND
id != $messageId AND
(
@ -3644,7 +3786,12 @@ async function getOlderMessagesByConversation(
rows = db
.prepare<Query>(
`
SELECT json FROM messages WHERE
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
conversationId = $conversationId AND
(
(received_at = $received_at AND sent_at < $sent_at) OR
@ -3672,12 +3819,17 @@ async function getNewerMessagesByConversation(
receivedAt = 0,
sentAt = 0,
}: { limit?: number; receivedAt?: number; sentAt?: number } = {}
): Promise<Array<MessageTypeUnhydrated>> {
): Promise<Array<MessageRowWithJoinedSends>> {
const db = getInstance();
const rows: JSONRows = db
return db
.prepare<Query>(
`
SELECT json FROM messages WHERE
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
conversationId = $conversationId AND
(
(received_at = $received_at AND sent_at > $sent_at) OR
@ -3693,9 +3845,8 @@ async function getNewerMessagesByConversation(
sent_at: sentAt,
limit,
});
return rows;
}
function getOldestMessageForConversation(
conversationId: string
): MessageMetricsType | undefined {
@ -3751,10 +3902,15 @@ async function getLastConversationActivity({
ourConversationId: string;
}): Promise<MessageType | undefined> {
const db = getInstance();
const row = prepare(
const row: undefined | MessageRowWithJoinedSends = prepare(
db,
`
SELECT json FROM messages
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
conversationId = $conversationId AND
(type IS NULL
@ -3792,7 +3948,7 @@ async function getLastConversationActivity({
return undefined;
}
return jsonToObject(row.json);
return rowToMessage(row);
}
async function getLastConversationPreview({
conversationId,
@ -3802,10 +3958,15 @@ async function getLastConversationPreview({
ourConversationId: string;
}): Promise<MessageType | undefined> {
const db = getInstance();
const row = prepare(
const row: undefined | MessageRowWithJoinedSends = prepare(
db,
`
SELECT json FROM messages
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
conversationId = $conversationId AND
(
@ -3838,8 +3999,9 @@ async function getLastConversationPreview({
return undefined;
}
return jsonToObject(row.json);
return rowToMessage(row);
}
function getOldestUnreadMessageForConversation(
conversationId: string
): MessageMetricsType | undefined {
@ -3953,14 +4115,38 @@ async function migrateConversationMessages(
});
}
async function updateMessageSendState(
params: Readonly<
{
messageId: string;
destinationConversationId: string;
} & SendState
>
): Promise<void> {
const db = getInstance();
db.prepare<Query>(
`
INSERT OR REPLACE INTO sendStates
(messageId, destinationConversationId, updatedAt, status) VALUES
($messageId, $destinationConversationId, $updatedAt, $status);
`
).run(serializeSendStateForDatabase(params));
}
async function getMessagesBySentAt(
sentAt: number
): Promise<Array<MessageType>> {
const db = getInstance();
const rows: JSONRows = db
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<Query>(
`
SELECT json FROM messages
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE sent_at = $sent_at
ORDER BY received_at DESC, sent_at DESC;
`
@ -3969,17 +4155,23 @@ async function getMessagesBySentAt(
sent_at: sentAt,
});
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function getExpiredMessages(): Promise<Array<MessageType>> {
const db = getInstance();
const now = Date.now();
const rows: JSONRows = db
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<Query>(
`
SELECT json FROM messages WHERE
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
expiresAt IS NOT NULL AND
expiresAt <= $now
ORDER BY expiresAt ASC;
@ -3987,18 +4179,22 @@ async function getExpiredMessages(): Promise<Array<MessageType>> {
)
.all({ now });
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function getMessagesUnexpectedlyMissingExpirationStartTimestamp(): Promise<
Array<MessageType>
> {
const db = getInstance();
const rows: JSONRows = db
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<EmptyQuery>(
`
SELECT json FROM messages
INDEXED BY messages_unexpectedly_missing_expiration_start_timestamp
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
expireTimer > 0 AND
expirationStartTimestamp IS NULL AND
@ -4013,7 +4209,7 @@ async function getMessagesUnexpectedlyMissingExpirationStartTimestamp(): Promise
)
.all();
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function getSoonestMessageExpiry(): Promise<undefined | number> {
@ -4063,11 +4259,15 @@ async function getTapToViewMessagesNeedingErase(): Promise<Array<MessageType>> {
const db = getInstance();
const THIRTY_DAYS_AGO = Date.now() - 30 * 24 * 60 * 60 * 1000;
const rows: JSONRows = db
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<Query>(
`
SELECT json
FROM messages
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
isViewOnce = 1
AND (isErased IS NULL OR isErased != 1)
@ -4079,7 +4279,7 @@ async function getTapToViewMessagesNeedingErase(): Promise<Array<MessageType>> {
THIRTY_DAYS_AGO,
});
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
function saveUnprocessedSync(data: UnprocessedType): string {
@ -4913,6 +5113,7 @@ async function removeAll(): Promise<void> {
DELETE FROM sticker_packs;
DELETE FROM sticker_references;
DELETE FROM jobs;
DELETE FROM sendStates;
`);
})();
}
@ -4933,6 +5134,7 @@ async function removeAllConfiguration(): Promise<void> {
DELETE FROM signedPreKeys;
DELETE FROM unprocessed;
DELETE FROM jobs;
DELETE FROM sendStates;
`
);
db.prepare('UPDATE conversations SET json = json_patch(json, $patch);').run(
@ -4948,11 +5150,15 @@ async function getMessagesNeedingUpgrade(
{ maxVersion }: { maxVersion: number }
): Promise<Array<MessageType>> {
const db = getInstance();
const rows: JSONRows = db
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<Query>(
`
SELECT json
FROM messages
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE schemaVersion IS NULL OR schemaVersion < $maxVersion
LIMIT $limit;
`
@ -4962,7 +5168,7 @@ async function getMessagesNeedingUpgrade(
limit,
});
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function getMessagesWithVisualMediaAttachments(
@ -4970,10 +5176,15 @@ async function getMessagesWithVisualMediaAttachments(
{ limit }: { limit: number }
): Promise<Array<MessageType>> {
const db = getInstance();
const rows: JSONRows = db
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<Query>(
`
SELECT json FROM messages WHERE
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
conversationId = $conversationId AND
hasVisualMediaAttachments = 1
ORDER BY received_at DESC, sent_at DESC
@ -4985,7 +5196,7 @@ async function getMessagesWithVisualMediaAttachments(
limit,
});
return rows.map(row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function getMessagesWithFileAttachments(
@ -4993,10 +5204,15 @@ async function getMessagesWithFileAttachments(
{ limit }: { limit: number }
): Promise<Array<MessageType>> {
const db = getInstance();
const rows = db
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<Query>(
`
SELECT json FROM messages WHERE
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
conversationId = $conversationId AND
hasFileAttachments = 1
ORDER BY received_at DESC, sent_at DESC
@ -5008,7 +5224,7 @@ async function getMessagesWithFileAttachments(
limit,
});
return map(rows, row => jsonToObject(row.json));
return rows.map(row => rowToMessage(row));
}
async function getMessageServerGuidsForSpam(