Revert "Refactor outbound delivery state"

This reverts commit 9c48a95eb5.
This commit is contained in:
Fedor Indutny 2021-07-12 16:51:45 -07:00 committed by GitHub
parent 77668c3247
commit ad217c808d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 694 additions and 3197 deletions

View file

@ -48,6 +48,7 @@ import {
ItemKeyType,
ItemType,
MessageType,
MessageTypeUnhydrated,
MessageMetricsType,
PreKeyType,
SearchResultMessageType,
@ -61,11 +62,6 @@ import {
UnprocessedType,
UnprocessedUpdateType,
} from './Interface';
import {
SendState,
serializeSendStateForDatabase,
} from '../messages/MessageSendState';
import { MessageRowWithJoinedSends, rowToMessage } from './rowToMessage';
declare global {
// We want to extend `Function`'s properties, so we need to use an interface.
@ -183,7 +179,6 @@ const dataInterface: ServerInterface = {
getMessageBySender,
getMessageById,
_getAllMessages,
_getSendStates,
getAllMessageIds,
getMessagesBySentAt,
getExpiredMessages,
@ -199,8 +194,6 @@ const dataInterface: ServerInterface = {
hasGroupCallHistoryMessage,
migrateConversationMessages,
updateMessageSendState,
getUnprocessedCount,
getAllUnprocessed,
updateUnprocessedAttempts,
@ -286,7 +279,6 @@ function objectToJSON(data: any) {
function jsonToObject(json: string): any {
return JSON.parse(json);
}
function rowToConversation(row: ConversationRow): ConversationType {
const parsedJson = JSON.parse(row.json);
@ -306,7 +298,6 @@ function rowToConversation(row: ConversationRow): ConversationType {
profileLastFetchedAt,
};
}
function rowToSticker(row: StickerRow): StickerType {
return {
...row,
@ -1946,56 +1937,6 @@ function updateToSchemaVersion35(currentVersion: number, db: Database) {
console.log('updateToSchemaVersion35: success!');
}
function updateToSchemaVersion36(currentVersion: number, db: Database) {
if (currentVersion >= 36) {
return;
}
db.transaction(() => {
db.exec(`
CREATE TABLE sendStates(
messageId STRING NOT NULL,
destinationConversationId STRING NOT NULL,
updatedAt INTEGER NOT NULL,
-- This should match the in-code enum.
status TEXT CHECK(
status IN (
'Failed',
'Pending',
'Sent',
'Delivered',
'Read',
'Viewed'
)
) NOT NULL,
UNIQUE(messageId, destinationConversationId),
FOREIGN KEY (messageId)
REFERENCES messages(id) ON DELETE CASCADE,
FOREIGN KEY (destinationConversationId)
REFERENCES conversations(id) ON DELETE CASCADE
);
CREATE INDEX message_sends ON sendStates (
messageId,
destinationConversationId
);
CREATE VIEW messagesWithSendStates AS
SELECT
messages.*,
GROUP_CONCAT(sendStates.destinationConversationId) AS sendConversationIdsJoined,
GROUP_CONCAT(sendStates.status) AS sendStatusesJoined,
GROUP_CONCAT(sendStates.updatedAt) AS sendUpdatedAtsJoined
FROM messages
LEFT JOIN sendStates ON messages.id = sendStates.messageId
GROUP BY messages.id;
`);
db.pragma('user_version = 36');
})();
console.log('updateToSchemaVersion36: success!');
}
const SCHEMA_VERSIONS = [
updateToSchemaVersion1,
updateToSchemaVersion2,
@ -2032,7 +1973,6 @@ const SCHEMA_VERSIONS = [
updateToSchemaVersion33,
updateToSchemaVersion34,
updateToSchemaVersion35,
updateToSchemaVersion36,
];
function updateSchema(db: Database): void {
@ -3044,24 +2984,21 @@ async function searchMessages(
// give us the right results. We can't call `snippet()` in the query above
// because it would bloat the temporary table with text data and we want
// to keep its size minimal for `ORDER BY` + `LIMIT` to be fast.
const result: Array<SearchResultMessageType> = db
const result = db
.prepare<Query>(
`
SELECT
messagesWithSendStates.json,
messagesWithSendStates.sendConversationIdsJoined,
messagesWithSendStates.sendStatusesJoined,
messagesWithSendStates.sendUpdatedAtsJoined,
messages.json,
snippet(messages_fts, -1, '<<left>>', '<<right>>', '...', 10)
AS snippet
FROM tmp_filtered_results
INNER JOIN messages_fts
ON messages_fts.rowid = tmp_filtered_results.rowid
INNER JOIN messagesWithSendStates
ON messagesWithSendStates.rowid = tmp_filtered_results.rowid
INNER JOIN messages
ON messages.rowid = tmp_filtered_results.rowid
WHERE
messages_fts.body MATCH $query
ORDER BY messagesWithSendStates.received_at DESC, messagesWithSendStates.sent_at DESC;
ORDER BY messages.received_at DESC, messages.sent_at DESC;
`
)
.all({ query });
@ -3188,11 +3125,9 @@ function saveMessageSync(
expirationStartTimestamp,
} = data;
const { sendStateByConversationId, ...dataToSaveInJsonField } = data;
const payload = {
id,
json: objectToJSON(dataToSaveInJsonField),
json: objectToJSON(data),
body: body || null,
conversationId,
@ -3214,8 +3149,6 @@ function saveMessageSync(
unread: unread ? 1 : 0,
};
let messageId: string;
if (id && !forceSave) {
prepare(
db,
@ -3246,94 +3179,70 @@ function saveMessageSync(
`
).run(payload);
messageId = id;
} else {
messageId = id || generateUUID();
const toCreate = {
...dataToSaveInJsonField,
id: messageId,
};
prepare(
db,
`
INSERT INTO messages (
id,
json,
body,
conversationId,
expirationStartTimestamp,
expireTimer,
hasAttachments,
hasFileAttachments,
hasVisualMediaAttachments,
isErased,
isViewOnce,
received_at,
schemaVersion,
serverGuid,
sent_at,
source,
sourceUuid,
sourceDevice,
type,
unread
) values (
$id,
$json,
$body,
$conversationId,
$expirationStartTimestamp,
$expireTimer,
$hasAttachments,
$hasFileAttachments,
$hasVisualMediaAttachments,
$isErased,
$isViewOnce,
$received_at,
$schemaVersion,
$serverGuid,
$sent_at,
$source,
$sourceUuid,
$sourceDevice,
$type,
$unread
);
`
).run({
...payload,
id: messageId,
json: objectToJSON(toCreate),
});
return id;
}
if (sendStateByConversationId) {
const upsertSendStateStmt = prepare(
db,
`
INSERT OR REPLACE INTO sendStates
(messageId, destinationConversationId, updatedAt, status) VALUES
($messageId, $destinationConversationId, $updatedAt, $status);
`
);
Object.entries(sendStateByConversationId).forEach(
([destinationConversationId, sendState]) => {
upsertSendStateStmt.run(
serializeSendStateForDatabase({
messageId,
destinationConversationId,
...sendState,
})
);
}
);
}
const toCreate = {
...data,
id: id || generateUUID(),
};
return messageId;
prepare(
db,
`
INSERT INTO messages (
id,
json,
body,
conversationId,
expirationStartTimestamp,
expireTimer,
hasAttachments,
hasFileAttachments,
hasVisualMediaAttachments,
isErased,
isViewOnce,
received_at,
schemaVersion,
serverGuid,
sent_at,
source,
sourceUuid,
sourceDevice,
type,
unread
) values (
$id,
$json,
$body,
$conversationId,
$expirationStartTimestamp,
$expireTimer,
$hasAttachments,
$hasFileAttachments,
$hasVisualMediaAttachments,
$isErased,
$isViewOnce,
$received_at,
$schemaVersion,
$serverGuid,
$sent_at,
$source,
$sourceUuid,
$sourceDevice,
$type,
$unread
);
`
).run({
...payload,
id: toCreate.id,
json: objectToJSON(toCreate),
});
return toCreate.id;
}
async function saveMessage(
@ -3381,18 +3290,8 @@ async function removeMessages(ids: Array<string>): Promise<void> {
async function getMessageById(id: string): Promise<MessageType | undefined> {
const db = getInstance();
const row: null | MessageRowWithJoinedSends = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE id = $id;
`
)
const row = db
.prepare<Query>('SELECT json FROM messages WHERE id = $id;')
.get({
id,
});
@ -3401,45 +3300,16 @@ async function getMessageById(id: string): Promise<MessageType | undefined> {
return undefined;
}
return rowToMessage(row);
return jsonToObject(row.json);
}
async function _getAllMessages(): Promise<Array<MessageType>> {
const db = getInstance();
const rows: Array<MessageRowWithJoinedSends> = db
.prepare<EmptyQuery>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
ORDER BY id asc;
`
)
const rows: JSONRows = db
.prepare<EmptyQuery>('SELECT json FROM messages ORDER BY id ASC;')
.all();
return rows.map(row => rowToMessage(row));
}
async function _getSendStates({
messageId,
destinationConversationId,
}: Readonly<{
messageId: string;
destinationConversationId: string;
}>) {
const db = getInstance();
return db
.prepare(
`
SELECT status, updatedAt FROM sendStates
WHERE messageId = $messageId
AND destinationConversationId = $destinationConversationId;
`
)
.all({ messageId, destinationConversationId });
return rows.map(row => jsonToObject(row.json));
}
async function getAllMessageIds(): Promise<Array<string>> {
@ -3463,16 +3333,10 @@ async function getMessageBySender({
sent_at: number;
}): Promise<Array<MessageType>> {
const db = getInstance();
const rows: Array<MessageRowWithJoinedSends> = prepare(
const rows: JSONRows = prepare(
db,
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
SELECT json FROM messages WHERE
(source = $source OR sourceUuid = $sourceUuid) AND
sourceDevice = $sourceDevice AND
sent_at = $sent_at;
@ -3484,7 +3348,7 @@ async function getMessageBySender({
sent_at,
});
return rows.map(row => rowToMessage(row));
return rows.map(row => jsonToObject(row.json));
}
async function getUnreadCountForConversation(
@ -3750,21 +3614,15 @@ async function getOlderMessagesByConversation(
sentAt?: number;
messageId?: string;
} = {}
): Promise<Array<MessageRowWithJoinedSends>> {
): Promise<Array<MessageTypeUnhydrated>> {
const db = getInstance();
let rows: Array<MessageRowWithJoinedSends>;
let rows: JSONRows;
if (messageId) {
rows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
SELECT json FROM messages WHERE
conversationId = $conversationId AND
id != $messageId AND
(
@ -3786,12 +3644,7 @@ async function getOlderMessagesByConversation(
rows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
SELECT json FROM messages WHERE
conversationId = $conversationId AND
(
(received_at = $received_at AND sent_at < $sent_at) OR
@ -3819,17 +3672,12 @@ async function getNewerMessagesByConversation(
receivedAt = 0,
sentAt = 0,
}: { limit?: number; receivedAt?: number; sentAt?: number } = {}
): Promise<Array<MessageRowWithJoinedSends>> {
): Promise<Array<MessageTypeUnhydrated>> {
const db = getInstance();
return db
const rows: JSONRows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
SELECT json FROM messages WHERE
conversationId = $conversationId AND
(
(received_at = $received_at AND sent_at > $sent_at) OR
@ -3845,8 +3693,9 @@ async function getNewerMessagesByConversation(
sent_at: sentAt,
limit,
});
}
return rows;
}
function getOldestMessageForConversation(
conversationId: string
): MessageMetricsType | undefined {
@ -3902,15 +3751,10 @@ async function getLastConversationActivity({
ourConversationId: string;
}): Promise<MessageType | undefined> {
const db = getInstance();
const row: undefined | MessageRowWithJoinedSends = prepare(
const row = prepare(
db,
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
SELECT json FROM messages
WHERE
conversationId = $conversationId AND
(type IS NULL
@ -3948,7 +3792,7 @@ async function getLastConversationActivity({
return undefined;
}
return rowToMessage(row);
return jsonToObject(row.json);
}
async function getLastConversationPreview({
conversationId,
@ -3958,15 +3802,10 @@ async function getLastConversationPreview({
ourConversationId: string;
}): Promise<MessageType | undefined> {
const db = getInstance();
const row: undefined | MessageRowWithJoinedSends = prepare(
const row = prepare(
db,
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
SELECT json FROM messages
WHERE
conversationId = $conversationId AND
(
@ -3999,9 +3838,8 @@ async function getLastConversationPreview({
return undefined;
}
return rowToMessage(row);
return jsonToObject(row.json);
}
function getOldestUnreadMessageForConversation(
conversationId: string
): MessageMetricsType | undefined {
@ -4115,38 +3953,14 @@ async function migrateConversationMessages(
});
}
async function updateMessageSendState(
params: Readonly<
{
messageId: string;
destinationConversationId: string;
} & SendState
>
): Promise<void> {
const db = getInstance();
db.prepare<Query>(
`
INSERT OR REPLACE INTO sendStates
(messageId, destinationConversationId, updatedAt, status) VALUES
($messageId, $destinationConversationId, $updatedAt, $status);
`
).run(serializeSendStateForDatabase(params));
}
async function getMessagesBySentAt(
sentAt: number
): Promise<Array<MessageType>> {
const db = getInstance();
const rows: Array<MessageRowWithJoinedSends> = db
const rows: JSONRows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
SELECT json FROM messages
WHERE sent_at = $sent_at
ORDER BY received_at DESC, sent_at DESC;
`
@ -4155,23 +3969,17 @@ async function getMessagesBySentAt(
sent_at: sentAt,
});
return rows.map(row => rowToMessage(row));
return rows.map(row => jsonToObject(row.json));
}
async function getExpiredMessages(): Promise<Array<MessageType>> {
const db = getInstance();
const now = Date.now();
const rows: Array<MessageRowWithJoinedSends> = db
const rows: JSONRows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
WHERE
SELECT json FROM messages WHERE
expiresAt IS NOT NULL AND
expiresAt <= $now
ORDER BY expiresAt ASC;
@ -4179,22 +3987,18 @@ async function getExpiredMessages(): Promise<Array<MessageType>> {
)
.all({ now });
return rows.map(row => rowToMessage(row));
return rows.map(row => jsonToObject(row.json));
}
async function getMessagesUnexpectedlyMissingExpirationStartTimestamp(): Promise<
Array<MessageType>
> {
const db = getInstance();
const rows: Array<MessageRowWithJoinedSends> = db
const rows: JSONRows = db
.prepare<EmptyQuery>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
SELECT json FROM messages
INDEXED BY messages_unexpectedly_missing_expiration_start_timestamp
WHERE
expireTimer > 0 AND
expirationStartTimestamp IS NULL AND
@ -4209,7 +4013,7 @@ async function getMessagesUnexpectedlyMissingExpirationStartTimestamp(): Promise
)
.all();
return rows.map(row => rowToMessage(row));
return rows.map(row => jsonToObject(row.json));
}
async function getSoonestMessageExpiry(): Promise<undefined | number> {
@ -4259,15 +4063,11 @@ async function getTapToViewMessagesNeedingErase(): Promise<Array<MessageType>> {
const db = getInstance();
const THIRTY_DAYS_AGO = Date.now() - 30 * 24 * 60 * 60 * 1000;
const rows: Array<MessageRowWithJoinedSends> = db
const rows: JSONRows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
SELECT json
FROM messages
WHERE
isViewOnce = 1
AND (isErased IS NULL OR isErased != 1)
@ -4279,7 +4079,7 @@ async function getTapToViewMessagesNeedingErase(): Promise<Array<MessageType>> {
THIRTY_DAYS_AGO,
});
return rows.map(row => rowToMessage(row));
return rows.map(row => jsonToObject(row.json));
}
function saveUnprocessedSync(data: UnprocessedType): string {
@ -5113,7 +4913,6 @@ async function removeAll(): Promise<void> {
DELETE FROM sticker_packs;
DELETE FROM sticker_references;
DELETE FROM jobs;
DELETE FROM sendStates;
`);
})();
}
@ -5134,7 +4933,6 @@ async function removeAllConfiguration(): Promise<void> {
DELETE FROM signedPreKeys;
DELETE FROM unprocessed;
DELETE FROM jobs;
DELETE FROM sendStates;
`
);
db.prepare('UPDATE conversations SET json = json_patch(json, $patch);').run(
@ -5150,15 +4948,11 @@ async function getMessagesNeedingUpgrade(
{ maxVersion }: { maxVersion: number }
): Promise<Array<MessageType>> {
const db = getInstance();
const rows: Array<MessageRowWithJoinedSends> = db
const rows: JSONRows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates
SELECT json
FROM messages
WHERE schemaVersion IS NULL OR schemaVersion < $maxVersion
LIMIT $limit;
`
@ -5168,7 +4962,7 @@ async function getMessagesNeedingUpgrade(
limit,
});
return rows.map(row => rowToMessage(row));
return rows.map(row => jsonToObject(row.json));
}
async function getMessagesWithVisualMediaAttachments(
@ -5176,15 +4970,10 @@ async function getMessagesWithVisualMediaAttachments(
{ limit }: { limit: number }
): Promise<Array<MessageType>> {
const db = getInstance();
const rows: Array<MessageRowWithJoinedSends> = db
const rows: JSONRows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
SELECT json FROM messages WHERE
conversationId = $conversationId AND
hasVisualMediaAttachments = 1
ORDER BY received_at DESC, sent_at DESC
@ -5196,7 +4985,7 @@ async function getMessagesWithVisualMediaAttachments(
limit,
});
return rows.map(row => rowToMessage(row));
return rows.map(row => jsonToObject(row.json));
}
async function getMessagesWithFileAttachments(
@ -5204,15 +4993,10 @@ async function getMessagesWithFileAttachments(
{ limit }: { limit: number }
): Promise<Array<MessageType>> {
const db = getInstance();
const rows: Array<MessageRowWithJoinedSends> = db
const rows = db
.prepare<Query>(
`
SELECT
json,
sendConversationIdsJoined,
sendStatusesJoined,
sendUpdatedAtsJoined
FROM messagesWithSendStates WHERE
SELECT json FROM messages WHERE
conversationId = $conversationId AND
hasFileAttachments = 1
ORDER BY received_at DESC, sent_at DESC
@ -5224,7 +5008,7 @@ async function getMessagesWithFileAttachments(
limit,
});
return rows.map(row => rowToMessage(row));
return map(rows, row => jsonToObject(row.json));
}
async function getMessageServerGuidsForSpam(