Message Send Log to enable comprehensive resend

This commit is contained in:
Scott Nonnenberg 2021-07-15 16:48:09 -07:00 committed by GitHub
parent 0fe68b57b1
commit a42c41ed01
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 3154 additions and 1266 deletions

View file

@ -14,7 +14,6 @@ import {
cloneDeep,
compact,
fromPairs,
toPairs,
get,
groupBy,
isFunction,
@ -22,6 +21,8 @@ import {
map,
omit,
set,
toPairs,
uniq,
} from 'lodash';
import { arrayBufferToBase64, base64ToArrayBuffer } from '../Crypto';
@ -41,8 +42,8 @@ import { StoredJob } from '../jobs/types';
import {
AttachmentDownloadJobType,
ClientInterface,
ClientSearchResultMessageType,
ClientJobType,
ClientSearchResultMessageType,
ConversationType,
IdentityKeyType,
ItemKeyType,
@ -52,6 +53,12 @@ import {
PreKeyType,
SearchResultMessageType,
SenderKeyType,
SentMessageDBType,
SentMessagesType,
SentProtoType,
SentProtoWithMessageIdsType,
SentRecipientsDBType,
SentRecipientsType,
ServerInterface,
SessionType,
SignedPreKeyType,
@ -143,6 +150,17 @@ const dataInterface: ClientInterface = {
getAllSenderKeys,
removeSenderKeyById,
insertSentProto,
deleteSentProtosOlderThan,
deleteSentProtoByMessageId,
insertProtoRecipients,
deleteSentProtoRecipient,
getSentProtoByRecipient,
removeAllSentProtos,
getAllSentProtos,
_getAllSentProtoRecipients,
_getAllSentProtoMessageIds,
createOrUpdateSession,
createOrUpdateSessions,
commitSessionsAndUnprocessed,
@ -771,6 +789,66 @@ async function removeSenderKeyById(id: string): Promise<void> {
return channels.removeSenderKeyById(id);
}
// Sent Protos
async function insertSentProto(
proto: SentProtoType,
options: {
messageIds: SentMessagesType;
recipients: SentRecipientsType;
}
): Promise<number> {
return channels.insertSentProto(proto, {
...options,
messageIds: uniq(options.messageIds),
});
}
async function deleteSentProtosOlderThan(timestamp: number): Promise<void> {
await channels.deleteSentProtosOlderThan(timestamp);
}
async function deleteSentProtoByMessageId(messageId: string): Promise<void> {
await channels.deleteSentProtoByMessageId(messageId);
}
async function insertProtoRecipients(options: {
id: number;
recipientUuid: string;
deviceIds: Array<number>;
}): Promise<void> {
await channels.insertProtoRecipients(options);
}
async function deleteSentProtoRecipient(options: {
timestamp: number;
recipientUuid: string;
deviceId: number;
}): Promise<void> {
await channels.deleteSentProtoRecipient(options);
}
async function getSentProtoByRecipient(options: {
now: number;
recipientUuid: string;
timestamp: number;
}): Promise<SentProtoWithMessageIdsType | undefined> {
return channels.getSentProtoByRecipient(options);
}
async function removeAllSentProtos(): Promise<void> {
await channels.removeAllSentProtos();
}
async function getAllSentProtos(): Promise<Array<SentProtoType>> {
return channels.getAllSentProtos();
}
// Test-only:
async function _getAllSentProtoRecipients(): Promise<
Array<SentRecipientsDBType>
> {
return channels._getAllSentProtoRecipients();
}
async function _getAllSentProtoMessageIds(): Promise<Array<SentMessageDBType>> {
return channels._getAllSentProtoMessageIds();
}
// Sessions
async function createOrUpdateSession(data: SessionType) {

View file

@ -17,6 +17,7 @@ import type { ReactionType } from '../types/Reactions';
import type { ConversationColorType, CustomColorType } from '../types/Colors';
import { StorageAccessType } from '../types/Storage.d';
import type { AttachmentType } from '../types/Attachment';
import { BodyRangesType } from '../types/Util';
export type AttachmentDownloadJobTypeType =
| 'long-message'
@ -83,9 +84,32 @@ export type SearchResultMessageType = {
};
export type ClientSearchResultMessageType = MessageType & {
json: string;
bodyRanges: [];
bodyRanges: BodyRangesType;
snippet: string;
};
export type SentProtoType = {
contentHint: number;
proto: Buffer;
timestamp: number;
};
export type SentProtoWithMessageIdsType = SentProtoType & {
messageIds: Array<string>;
};
export type SentRecipientsType = Record<string, Array<number>>;
export type SentMessagesType = Array<string>;
// These two are for test only
export type SentRecipientsDBType = {
payloadId: number;
recipientUuid: string;
deviceId: number;
};
export type SentMessageDBType = {
payloadId: number;
messageId: string;
};
export type SenderKeyType = {
// Primary key
id: string;
@ -215,6 +239,36 @@ export type DataInterface = {
getAllSenderKeys: () => Promise<Array<SenderKeyType>>;
removeSenderKeyById: (id: string) => Promise<void>;
insertSentProto: (
proto: SentProtoType,
options: {
recipients: SentRecipientsType;
messageIds: SentMessagesType;
}
) => Promise<number>;
deleteSentProtosOlderThan: (timestamp: number) => Promise<void>;
deleteSentProtoByMessageId: (messageId: string) => Promise<void>;
insertProtoRecipients: (options: {
id: number;
recipientUuid: string;
deviceIds: Array<number>;
}) => Promise<void>;
deleteSentProtoRecipient: (options: {
timestamp: number;
recipientUuid: string;
deviceId: number;
}) => Promise<void>;
getSentProtoByRecipient: (options: {
now: number;
recipientUuid: string;
timestamp: number;
}) => Promise<SentProtoWithMessageIdsType | undefined>;
removeAllSentProtos: () => Promise<void>;
getAllSentProtos: () => Promise<Array<SentProtoType>>;
// Test-only
_getAllSentProtoRecipients: () => Promise<Array<SentRecipientsDBType>>;
_getAllSentProtoMessageIds: () => Promise<Array<SentMessageDBType>>;
createOrUpdateSession: (data: SessionType) => Promise<void>;
createOrUpdateSessions: (array: Array<SessionType>) => Promise<void>;
commitSessionsAndUnprocessed(options: {
@ -255,6 +309,36 @@ export type DataInterface = {
) => Promise<void>;
getNextTapToViewMessageTimestampToAgeOut: () => Promise<undefined | number>;
getUnreadCountForConversation: (conversationId: string) => Promise<number>;
getUnreadByConversationAndMarkRead: (
conversationId: string,
newestUnreadId: number,
readAt?: number
) => Promise<
Array<
Pick<MessageType, 'id' | 'source' | 'sourceUuid' | 'sent_at' | 'type'>
>
>;
getUnreadReactionsAndMarkRead: (
conversationId: string,
newestUnreadId: number
) => Promise<
Array<
Pick<ReactionType, 'targetAuthorUuid' | 'targetTimestamp' | 'messageId'>
>
>;
markReactionAsRead: (
targetAuthorUuid: string,
targetTimestamp: number
) => Promise<ReactionType | undefined>;
removeReactionFromConversation: (reaction: {
emoji: string;
fromId: string;
targetAuthorUuid: string;
targetTimestamp: number;
}) => Promise<void>;
addReaction: (reactionObj: ReactionType) => Promise<void>;
getUnprocessedCount: () => Promise<number>;
getAllUnprocessed: () => Promise<Array<UnprocessedType>>;
updateUnprocessedAttempts: (id: string, attempts: number) => Promise<void>;
@ -391,33 +475,6 @@ export type ServerInterface = DataInterface & {
ourConversationId: string;
}) => Promise<MessageType | undefined>;
getTapToViewMessagesNeedingErase: () => Promise<Array<MessageType>>;
getUnreadCountForConversation: (conversationId: string) => Promise<number>;
getUnreadByConversationAndMarkRead: (
conversationId: string,
newestUnreadId: number,
readAt?: number
) => Promise<
Array<
Pick<MessageType, 'id' | 'source' | 'sourceUuid' | 'sent_at' | 'type'>
>
>;
getUnreadReactionsAndMarkRead: (
conversationId: string,
newestUnreadId: number
) => Promise<
Array<Pick<ReactionType, 'targetAuthorUuid' | 'targetTimestamp'>>
>;
markReactionAsRead: (
targetAuthorUuid: string,
targetTimestamp: number
) => Promise<ReactionType | undefined>;
removeReactionFromConversation: (reaction: {
emoji: string;
fromId: string;
targetAuthorUuid: string;
targetTimestamp: number;
}) => Promise<void>;
addReaction: (reactionObj: ReactionType) => Promise<void>;
removeConversation: (id: Array<string> | string) => Promise<void>;
removeMessage: (id: string) => Promise<void>;
removeMessages: (ids: Array<string>) => Promise<void>;
@ -530,33 +587,6 @@ export type ClientInterface = DataInterface & {
getTapToViewMessagesNeedingErase: (options: {
MessageCollection: typeof MessageModelCollectionType;
}) => Promise<MessageModelCollectionType>;
getUnreadCountForConversation: (conversationId: string) => Promise<number>;
getUnreadByConversationAndMarkRead: (
conversationId: string,
newestUnreadId: number,
readAt?: number
) => Promise<
Array<
Pick<MessageType, 'id' | 'source' | 'sourceUuid' | 'sent_at' | 'type'>
>
>;
getUnreadReactionsAndMarkRead: (
conversationId: string,
newestUnreadId: number
) => Promise<
Array<Pick<ReactionType, 'targetAuthorUuid' | 'targetTimestamp'>>
>;
markReactionAsRead: (
targetAuthorUuid: string,
targetTimestamp: number
) => Promise<ReactionType | undefined>;
removeReactionFromConversation: (reaction: {
emoji: string;
fromId: string;
targetAuthorUuid: string;
targetTimestamp: number;
}) => Promise<void>;
addReaction: (reactionObj: ReactionType) => Promise<void>;
removeConversation: (
id: string,
options: { Conversation: typeof ConversationModel }

View file

@ -36,23 +36,30 @@ import { combineNames } from '../util/combineNames';
import { dropNull } from '../util/dropNull';
import { isNormalNumber } from '../util/isNormalNumber';
import { isNotNil } from '../util/isNotNil';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { ConversationColorType, CustomColorType } from '../types/Colors';
import {
AllItemsType,
AttachmentDownloadJobType,
ConversationMetricsType,
ConversationType,
EmojiType,
IdentityKeyType,
AllItemsType,
ItemKeyType,
ItemType,
MessageMetricsType,
MessageType,
MessageTypeUnhydrated,
MessageMetricsType,
PreKeyType,
SearchResultMessageType,
SenderKeyType,
SentMessageDBType,
SentMessagesType,
SentProtoType,
SentProtoWithMessageIdsType,
SentRecipientsDBType,
SentRecipientsType,
ServerInterface,
SessionType,
SignedPreKeyType,
@ -63,14 +70,6 @@ import {
UnprocessedUpdateType,
} from './Interface';
declare global {
// We want to extend `Function`'s properties, so we need to use an interface.
// eslint-disable-next-line no-restricted-syntax
interface Function {
needsSerial?: boolean;
}
}
type JSONRows = Array<{ readonly json: string }>;
type ConversationRow = Readonly<{
json: string;
@ -137,6 +136,17 @@ const dataInterface: ServerInterface = {
getAllSenderKeys,
removeSenderKeyById,
insertSentProto,
deleteSentProtosOlderThan,
deleteSentProtoByMessageId,
insertProtoRecipients,
deleteSentProtoRecipient,
getSentProtoByRecipient,
removeAllSentProtos,
getAllSentProtos,
_getAllSentProtoRecipients,
_getAllSentProtoMessageIds,
createOrUpdateSession,
createOrUpdateSessions,
commitSessionsAndUnprocessed,
@ -253,16 +263,16 @@ type DatabaseQueryCache = Map<string, Statement<Array<any>>>;
const statementCache = new WeakMap<Database, DatabaseQueryCache>();
function prepare(db: Database, query: string): Statement<Query> {
function prepare<T>(db: Database, query: string): Statement<T> {
let dbCache = statementCache.get(db);
if (!dbCache) {
dbCache = new Map();
statementCache.set(db, dbCache);
}
let result = dbCache.get(query);
let result = dbCache.get(query) as Statement<T>;
if (!result) {
result = db.prepare(query);
result = db.prepare<T>(query);
dbCache.set(query, result);
}
@ -1947,6 +1957,84 @@ function updateToSchemaVersion36(currentVersion: number, db: Database) {
console.log('updateToSchemaVersion36: success!');
}
function updateToSchemaVersion37(currentVersion: number, db: Database) {
if (currentVersion >= 37) {
return;
}
db.transaction(() => {
db.exec(`
-- Create send log primary table
CREATE TABLE sendLogPayloads(
id INTEGER PRIMARY KEY ASC,
timestamp INTEGER NOT NULL,
contentHint INTEGER NOT NULL,
proto BLOB NOT NULL
);
CREATE INDEX sendLogPayloadsByTimestamp ON sendLogPayloads (timestamp);
-- Create send log recipients table with foreign key relationship to payloads
CREATE TABLE sendLogRecipients(
payloadId INTEGER NOT NULL,
recipientUuid STRING NOT NULL,
deviceId INTEGER NOT NULL,
PRIMARY KEY (payloadId, recipientUuid, deviceId),
CONSTRAINT sendLogRecipientsForeignKey
FOREIGN KEY (payloadId)
REFERENCES sendLogPayloads(id)
ON DELETE CASCADE
);
CREATE INDEX sendLogRecipientsByRecipient
ON sendLogRecipients (recipientUuid, deviceId);
-- Create send log messages table with foreign key relationship to payloads
CREATE TABLE sendLogMessageIds(
payloadId INTEGER NOT NULL,
messageId STRING NOT NULL,
PRIMARY KEY (payloadId, messageId),
CONSTRAINT sendLogMessageIdsForeignKey
FOREIGN KEY (payloadId)
REFERENCES sendLogPayloads(id)
ON DELETE CASCADE
);
CREATE INDEX sendLogMessageIdsByMessage
ON sendLogMessageIds (messageId);
-- Recreate messages table delete trigger with send log support
DROP TRIGGER messages_on_delete;
CREATE TRIGGER messages_on_delete AFTER DELETE ON messages BEGIN
DELETE FROM messages_fts WHERE rowid = old.rowid;
DELETE FROM sendLogPayloads WHERE id IN (
SELECT payloadId FROM sendLogMessageIds
WHERE messageId = old.id
);
END;
--- Add messageId column to reactions table to properly track proto associations
ALTER TABLE reactions ADD column messageId STRING;
`);
db.pragma('user_version = 37');
})();
console.log('updateToSchemaVersion37: success!');
}
const SCHEMA_VERSIONS = [
updateToSchemaVersion1,
updateToSchemaVersion2,
@ -1984,6 +2072,7 @@ const SCHEMA_VERSIONS = [
updateToSchemaVersion34,
updateToSchemaVersion35,
updateToSchemaVersion36,
updateToSchemaVersion37,
];
function updateSchema(db: Database): void {
@ -2350,11 +2439,11 @@ async function getSenderKeyById(
}
async function removeAllSenderKeys(): Promise<void> {
const db = getInstance();
prepare(db, 'DELETE FROM senderKeys').run({});
prepare<EmptyQuery>(db, 'DELETE FROM senderKeys').run();
}
async function getAllSenderKeys(): Promise<Array<SenderKeyType>> {
const db = getInstance();
const rows = prepare(db, 'SELECT * FROM senderKeys').all({});
const rows = prepare<EmptyQuery>(db, 'SELECT * FROM senderKeys').all();
return rows;
}
@ -2363,6 +2452,317 @@ async function removeSenderKeyById(id: string): Promise<void> {
prepare(db, 'DELETE FROM senderKeys WHERE id = $id').run({ id });
}
async function insertSentProto(
proto: SentProtoType,
options: {
recipients: SentRecipientsType;
messageIds: SentMessagesType;
}
): Promise<number> {
const db = getInstance();
const { recipients, messageIds } = options;
// Note: we use `pluck` in this function to fetch only the first column of returned row.
return db.transaction(() => {
// 1. Insert the payload, fetching its primary key id
const info = prepare(
db,
`
INSERT INTO sendLogPayloads (
contentHint,
proto,
timestamp
) VALUES (
$contentHint,
$proto,
$timestamp
);
`
).run(proto);
const id = parseIntOrThrow(
info.lastInsertRowid,
'insertSentProto/lastInsertRowid'
);
// 2. Insert a record for each recipient device.
const recipientStatement = prepare(
db,
`
INSERT INTO sendLogRecipients (
payloadId,
recipientUuid,
deviceId
) VALUES (
$id,
$recipientUuid,
$deviceId
);
`
);
const recipientUuids = Object.keys(recipients);
for (const recipientUuid of recipientUuids) {
const deviceIds = recipients[recipientUuid];
for (const deviceId of deviceIds) {
recipientStatement.run({
id,
recipientUuid,
deviceId,
});
}
}
// 2. Insert a record for each message referenced by this payload.
const messageStatement = prepare(
db,
`
INSERT INTO sendLogMessageIds (
payloadId,
messageId
) VALUES (
$id,
$messageId
);
`
);
for (const messageId of messageIds) {
messageStatement.run({
id,
messageId,
});
}
return id;
})();
}
async function deleteSentProtosOlderThan(timestamp: number): Promise<void> {
const db = getInstance();
prepare(
db,
`
DELETE FROM sendLogPayloads
WHERE
timestamp IS NULL OR
timestamp < $timestamp;
`
).run({
timestamp,
});
}
async function deleteSentProtoByMessageId(messageId: string): Promise<void> {
const db = getInstance();
prepare(
db,
`
DELETE FROM sendLogPayloads WHERE id IN (
SELECT payloadId FROM sendLogMessageIds
WHERE messageId = $messageId
);
`
).run({
messageId,
});
}
async function insertProtoRecipients({
id,
recipientUuid,
deviceIds,
}: {
id: number;
recipientUuid: string;
deviceIds: Array<number>;
}): Promise<void> {
const db = getInstance();
db.transaction(() => {
const statement = prepare(
db,
`
INSERT INTO sendLogRecipients (
payloadId,
recipientUuid,
deviceId
) VALUES (
$id,
$recipientUuid,
$deviceId
);
`
);
for (const deviceId of deviceIds) {
statement.run({
id,
recipientUuid,
deviceId,
});
}
})();
}
async function deleteSentProtoRecipient({
timestamp,
recipientUuid,
deviceId,
}: {
timestamp: number;
recipientUuid: string;
deviceId: number;
}): Promise<void> {
const db = getInstance();
// Note: we use `pluck` in this function to fetch only the first column of returned row.
db.transaction(() => {
// 1. Figure out what payload we're talking about.
const rows = prepare(
db,
`
SELECT sendLogPayloads.id FROM sendLogPayloads
INNER JOIN sendLogRecipients
ON sendLogRecipients.payloadId = sendLogPayloads.id
WHERE
sendLogPayloads.timestamp = $timestamp AND
sendLogRecipients.recipientUuid = $recipientUuid AND
sendLogRecipients.deviceId = $deviceId;
`
).all({ timestamp, recipientUuid, deviceId });
if (!rows.length) {
return;
}
if (rows.length > 1) {
console.warn(
`deleteSentProtoRecipient: More than one payload matches recipient and timestamp ${timestamp}. Using the first.`
);
return;
}
const { id } = rows[0];
// 2. Delete the recipient/device combination in question.
prepare(
db,
`
DELETE FROM sendLogRecipients
WHERE
payloadId = $id AND
recipientUuid = $recipientUuid AND
deviceId = $deviceId;
`
).run({ id, recipientUuid, deviceId });
// 3. See how many more recipient devices there were for this payload.
const remaining = prepare(
db,
'SELECT count(*) FROM sendLogRecipients WHERE payloadId = $id;'
)
.pluck(true)
.get({ id });
if (!isNumber(remaining)) {
throw new Error(
'deleteSentProtoRecipient: select count() returned non-number!'
);
}
if (remaining > 0) {
return;
}
// 4. Delete the entire payload if there are no more recipients left.
console.info(
`deleteSentProtoRecipient: Deleting proto payload for timestamp ${timestamp}`
);
prepare(db, 'DELETE FROM sendLogPayloads WHERE id = $id;').run({
id,
});
})();
}
async function getSentProtoByRecipient({
now,
recipientUuid,
timestamp,
}: {
now: number;
recipientUuid: string;
timestamp: number;
}): Promise<SentProtoWithMessageIdsType | undefined> {
const db = getInstance();
const HOUR = 1000 * 60 * 60;
const oneDayAgo = now - HOUR * 24;
await deleteSentProtosOlderThan(oneDayAgo);
const row = prepare(
db,
`
SELECT
sendLogPayloads.*,
GROUP_CONCAT(DISTINCT sendLogMessageIds.messageId) AS messageIds
FROM sendLogPayloads
INNER JOIN sendLogRecipients ON sendLogRecipients.payloadId = sendLogPayloads.id
LEFT JOIN sendLogMessageIds ON sendLogMessageIds.payloadId = sendLogPayloads.id
WHERE
sendLogPayloads.timestamp = $timestamp AND
sendLogRecipients.recipientUuid = $recipientUuid
GROUP BY sendLogPayloads.id;
`
).get({
timestamp,
recipientUuid,
});
if (!row) {
return undefined;
}
const { messageIds } = row;
return {
...row,
messageIds: messageIds ? messageIds.split(',') : [],
};
}
async function removeAllSentProtos(): Promise<void> {
const db = getInstance();
prepare<EmptyQuery>(db, 'DELETE FROM sendLogPayloads;').run();
}
async function getAllSentProtos(): Promise<Array<SentProtoType>> {
const db = getInstance();
const rows = prepare<EmptyQuery>(db, 'SELECT * FROM sendLogPayloads;').all();
return rows;
}
async function _getAllSentProtoRecipients(): Promise<
Array<SentRecipientsDBType>
> {
const db = getInstance();
const rows = prepare<EmptyQuery>(
db,
'SELECT * FROM sendLogRecipients;'
).all();
return rows;
}
async function _getAllSentProtoMessageIds(): Promise<Array<SentMessageDBType>> {
const db = getInstance();
const rows = prepare<EmptyQuery>(
db,
'SELECT * FROM sendLogMessageIds;'
).all();
return rows;
}
const SESSIONS_TABLE = 'sessions';
function createOrUpdateSessionSync(data: SessionType): void {
const db = getInstance();
@ -2717,8 +3117,7 @@ function updateConversationSync(data: ConversationType): void {
? members.join(' ')
: null;
prepare(
db,
db.prepare(
`
UPDATE conversations SET
json = $json,
@ -3470,13 +3869,18 @@ async function getUnreadByConversationAndMarkRead(
async function getUnreadReactionsAndMarkRead(
conversationId: string,
newestUnreadId: number
): Promise<Array<Pick<ReactionType, 'targetAuthorUuid' | 'targetTimestamp'>>> {
): Promise<
Array<
Pick<ReactionType, 'targetAuthorUuid' | 'targetTimestamp' | 'messageId'>
>
> {
const db = getInstance();
return db.transaction(() => {
const unreadMessages = db
.prepare<Query>(
`
SELECT targetAuthorUuid, targetTimestamp
SELECT targetAuthorUuid, targetTimestamp, messageId
FROM reactions WHERE
unread = 1 AND
conversationId = $conversationId AND
@ -3548,6 +3952,7 @@ async function addReaction({
conversationId,
emoji,
fromId,
messageId,
messageReceivedAt,
targetAuthorUuid,
targetTimestamp,
@ -3559,6 +3964,7 @@ async function addReaction({
conversationId,
emoji,
fromId,
messageId,
messageReceivedAt,
targetAuthorUuid,
targetTimestamp,
@ -3567,6 +3973,7 @@ async function addReaction({
$conversationId,
$emoji,
$fromId,
$messageId,
$messageReceivedAt,
$targetAuthorUuid,
$targetTimestamp,
@ -3577,6 +3984,7 @@ async function addReaction({
conversationId,
emoji,
fromId,
messageId,
messageReceivedAt,
targetAuthorUuid,
targetTimestamp,