Send/Receive support for reaction read syncs

This commit is contained in:
Josh Perez 2021-05-06 18:15:25 -07:00 committed by GitHub
parent 82a9705010
commit e0c324e4ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 1188 additions and 498 deletions

View file

@ -28,13 +28,14 @@ import {
omit,
} from 'lodash';
import { assert } from '../util/assert';
import { isNormalNumber } from '../util/isNormalNumber';
import { combineNames } from '../util/combineNames';
import { isNotNil } from '../util/isNotNil';
import { GroupV2MemberType } from '../model-types.d';
import { ReactionType } from '../types/Reactions';
import { StoredJob } from '../jobs/types';
import { assert } from '../util/assert';
import { combineNames } from '../util/combineNames';
import { getExpiresAt } from '../services/MessageUpdater';
import { isNormalNumber } from '../util/isNormalNumber';
import { isNotNil } from '../util/isNotNil';
import {
AttachmentDownloadJobType,
@ -156,7 +157,11 @@ const dataInterface: ServerInterface = {
saveMessages,
removeMessage,
removeMessages,
getUnreadByConversation,
getUnreadByConversationAndMarkRead,
getUnreadReactionsAndMarkRead,
markReactionAsRead,
addReaction,
removeReactionFromConversation,
getMessageBySender,
getMessageById,
_getAllMessages,
@ -1714,6 +1719,39 @@ function updateToSchemaVersion28(currentVersion: number, db: Database) {
})();
}
function updateToSchemaVersion29(currentVersion: number, db: Database) {
if (currentVersion >= 29) {
return;
}
db.transaction(() => {
db.exec(`
CREATE TABLE reactions(
conversationId STRING,
emoji STRING,
fromId STRING,
messageReceivedAt INTEGER,
targetAuthorUuid STRING,
targetTimestamp INTEGER,
unread INTEGER
);
CREATE INDEX reactions_unread ON reactions (
unread,
conversationId
);
CREATE INDEX reaction_identifier ON reactions (
emoji,
targetAuthorUuid,
targetTimestamp
);
`);
db.pragma('user_version = 29');
})();
}
const SCHEMA_VERSIONS = [
updateToSchemaVersion1,
updateToSchemaVersion2,
@ -1743,6 +1781,7 @@ const SCHEMA_VERSIONS = [
updateToSchemaVersion26,
updateToSchemaVersion27,
updateToSchemaVersion28,
updateToSchemaVersion29,
];
function updateSchema(db: Database): void {
@ -2961,25 +3000,298 @@ async function getMessageBySender({
return rows.map(row => jsonToObject(row.json));
}
async function getUnreadByConversation(
conversationId: string
): Promise<Array<MessageType>> {
function getExpireData(
messageExpireTimer: number,
readAt?: number
): {
expirationStartTimestamp: number;
expiresAt: number;
} {
const expirationStartTimestamp = Math.min(Date.now(), readAt || Date.now());
const expiresAt = getExpiresAt({
expireTimer: messageExpireTimer,
expirationStartTimestamp,
});
// We are guaranteeing an expirationStartTimestamp above so this should
// definitely return a number.
if (!expiresAt || typeof expiresAt !== 'number') {
assert(false, 'Expected expiresAt to be a number');
}
return {
expirationStartTimestamp,
expiresAt,
};
}
function updateExpirationTimers(
messageExpireTimer: number,
messagesWithExpireTimer: Set<string>,
readAt?: number
) {
const { expirationStartTimestamp, expiresAt } = getExpireData(
messageExpireTimer,
readAt
);
const db = getInstance();
const rows: JSONRows = db
.prepare<Query>(
`
SELECT json FROM messages WHERE
unread = $unread AND
conversationId = $conversationId
ORDER BY received_at DESC, sent_at DESC;
`
)
.all({
unread: 1,
conversationId,
const stmt = db.prepare<Query>(
`
UPDATE messages
SET
unread = 0,
expires_at = $expiresAt,
expirationStartTimestamp = $expirationStartTimestamp,
json = json_patch(json, $jsonPatch)
WHERE
id = $id
`
);
messagesWithExpireTimer.forEach(id => {
stmt.run({
id,
expirationStartTimestamp,
expiresAt,
jsonPatch: JSON.stringify({
expirationStartTimestamp,
expires_at: expiresAt,
unread: 0,
}),
});
});
}
async function getUnreadByConversationAndMarkRead(
conversationId: string,
newestUnreadId: number,
readAt?: number
): Promise<
Array<Pick<MessageType, 'id' | 'source' | 'sourceUuid' | 'sent_at' | 'type'>>
> {
const db = getInstance();
return db.transaction(() => {
const rows = db
.prepare<Query>(
`
SELECT id, expireTimer, expirationStartTimestamp, json
FROM messages WHERE
unread = $unread AND
conversationId = $conversationId AND
received_at <= $newestUnreadId
ORDER BY received_at DESC, sent_at DESC;
`
)
.all({
unread: 1,
conversationId,
newestUnreadId,
});
let messageExpireTimer: number | undefined;
const messagesWithExpireTimer: Set<string> = new Set();
const messagesToMarkRead: Array<string> = [];
rows.forEach(row => {
if (row.expireTimer && !row.expirationStartTimestamp) {
messageExpireTimer = row.expireTimer;
messagesWithExpireTimer.add(row.id);
}
messagesToMarkRead.push(row.id);
});
return rows.map(row => jsonToObject(row.json));
if (messagesToMarkRead.length) {
const stmt = db.prepare<Query>(
`
UPDATE messages
SET
unread = 0,
json = json_patch(json, $jsonPatch)
WHERE
id = $id;
`
);
messagesToMarkRead.forEach(id =>
stmt.run({
id,
jsonPatch: JSON.stringify({ unread: 0 }),
})
);
}
if (messageExpireTimer && messagesWithExpireTimer.size) {
// We use the messageExpireTimer set above from whichever row we have
// in the database. Since this is the same conversation the expireTimer
// should be the same for all messages within it.
updateExpirationTimers(
messageExpireTimer,
messagesWithExpireTimer,
readAt
);
}
return rows.map(row => {
const json = jsonToObject(row.json);
const expireAttrs = {};
if (messageExpireTimer && messagesWithExpireTimer.has(row.id)) {
const { expirationStartTimestamp, expiresAt } = getExpireData(
messageExpireTimer,
readAt
);
Object.assign(expireAttrs, {
expirationStartTimestamp,
expires_at: expiresAt,
});
}
return {
unread: false,
...pick(json, ['id', 'sent_at', 'source', 'sourceUuid', 'type']),
...expireAttrs,
};
});
})();
}
async function getUnreadReactionsAndMarkRead(
conversationId: string,
newestUnreadId: number
): Promise<Array<Pick<ReactionType, 'targetAuthorUuid' | 'targetTimestamp'>>> {
const db = getInstance();
return db.transaction(() => {
const unreadMessages = db
.prepare<Query>(
`
SELECT targetAuthorUuid, targetTimestamp
FROM reactions WHERE
unread = 1 AND
conversationId = $conversationId AND
messageReceivedAt <= $newestUnreadId;
`
)
.all({
conversationId,
newestUnreadId,
});
db.exec(`
UPDATE reactions SET
unread = 0 WHERE
$conversationId = conversationId AND
$messageReceivedAt <= messageReceivedAt;
`);
return unreadMessages;
})();
}
async function markReactionAsRead(
targetAuthorUuid: string,
targetTimestamp: number
): Promise<ReactionType | undefined> {
const db = getInstance();
return db.transaction(() => {
const readReaction = db
.prepare(
`
SELECT *
FROM reactions
WHERE
targetAuthorUuid = $targetAuthorUuid AND
targetTimestamp = $targetTimestamp AND
unread = 1
ORDER BY rowId DESC
LIMIT 1;
`
)
.get({
targetAuthorUuid,
targetTimestamp,
});
db.prepare(
`
UPDATE reactions SET
unread = 0 WHERE
$targetAuthorUuid = targetAuthorUuid AND
$targetTimestamp = targetTimestamp;
`
).run({
targetAuthorUuid,
targetTimestamp,
});
return readReaction;
})();
}
async function addReaction({
conversationId,
emoji,
fromId,
messageReceivedAt,
targetAuthorUuid,
targetTimestamp,
}: ReactionType): Promise<void> {
const db = getInstance();
await db
.prepare(
`INSERT INTO reactions (
conversationId,
emoji,
fromId,
messageReceivedAt,
targetAuthorUuid,
targetTimestamp,
unread
) VALUES (
$conversationId,
$emoji,
$fromId,
$messageReceivedAt,
$targetAuthorUuid,
$targetTimestamp,
$unread
);`
)
.run({
conversationId,
emoji,
fromId,
messageReceivedAt,
targetAuthorUuid,
targetTimestamp,
unread: 1,
});
}
async function removeReactionFromConversation({
emoji,
fromId,
targetAuthorUuid,
targetTimestamp,
}: {
emoji: string;
fromId: string;
targetAuthorUuid: string;
targetTimestamp: number;
}): Promise<void> {
const db = getInstance();
await db
.prepare(
`DELETE FROM reactions WHERE
emoji = $emoji AND
fromId = $fromId AND
targetAuthorUuid = $targetAuthorUuid AND
targetTimestamp = $targetTimestamp;`
)
.run({
emoji,
fromId,
targetAuthorUuid,
targetTimestamp,
});
}
async function getOlderMessagesByConversation(