Optimize read messages query
This commit is contained in:
parent
0ccc607100
commit
1276368f94
1 changed files with 99 additions and 64 deletions
163
ts/sql/Server.ts
163
ts/sql/Server.ts
|
@ -24,8 +24,8 @@ import {
|
||||||
keyBy,
|
keyBy,
|
||||||
last,
|
last,
|
||||||
map,
|
map,
|
||||||
pick,
|
|
||||||
omit,
|
omit,
|
||||||
|
pick,
|
||||||
} from 'lodash';
|
} from 'lodash';
|
||||||
|
|
||||||
import { GroupV2MemberType } from '../model-types.d';
|
import { GroupV2MemberType } from '../model-types.d';
|
||||||
|
@ -3001,7 +3001,7 @@ async function getMessageBySender({
|
||||||
}
|
}
|
||||||
|
|
||||||
function getExpireData(
|
function getExpireData(
|
||||||
messageExpireTimer: number,
|
expireTimer: number,
|
||||||
readAt?: number
|
readAt?: number
|
||||||
): {
|
): {
|
||||||
expirationStartTimestamp: number;
|
expirationStartTimestamp: number;
|
||||||
|
@ -3009,7 +3009,7 @@ function getExpireData(
|
||||||
} {
|
} {
|
||||||
const expirationStartTimestamp = Math.min(Date.now(), readAt || Date.now());
|
const expirationStartTimestamp = Math.min(Date.now(), readAt || Date.now());
|
||||||
const expiresAt = getExpiresAt({
|
const expiresAt = getExpiresAt({
|
||||||
expireTimer: messageExpireTimer,
|
expireTimer,
|
||||||
expirationStartTimestamp,
|
expirationStartTimestamp,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -3026,40 +3026,70 @@ function getExpireData(
|
||||||
}
|
}
|
||||||
|
|
||||||
function updateExpirationTimers(
|
function updateExpirationTimers(
|
||||||
messageExpireTimer: number,
|
conversationId: string,
|
||||||
messagesWithExpireTimer: Set<string>,
|
newestUnreadId: number,
|
||||||
|
messagesWithExpireTimer: Map<
|
||||||
|
string,
|
||||||
|
{
|
||||||
|
id: string;
|
||||||
|
expireTimer: number;
|
||||||
|
}
|
||||||
|
>,
|
||||||
readAt?: number
|
readAt?: number
|
||||||
) {
|
) {
|
||||||
const { expirationStartTimestamp, expiresAt } = getExpireData(
|
|
||||||
messageExpireTimer,
|
|
||||||
readAt
|
|
||||||
);
|
|
||||||
|
|
||||||
const db = getInstance();
|
const db = getInstance();
|
||||||
const stmt = db.prepare<Query>(
|
|
||||||
`
|
const rowsWithSameExpireTimer = new Map();
|
||||||
UPDATE messages
|
for (const row of messagesWithExpireTimer.values()) {
|
||||||
SET
|
const { expireTimer } = row;
|
||||||
unread = 0,
|
const expireTimerRow = rowsWithSameExpireTimer.get(expireTimer);
|
||||||
expires_at = $expiresAt,
|
|
||||||
expirationStartTimestamp = $expirationStartTimestamp,
|
if (expireTimerRow) {
|
||||||
json = json_patch(json, $jsonPatch)
|
return;
|
||||||
WHERE
|
}
|
||||||
id = $id
|
|
||||||
`
|
const { expirationStartTimestamp, expiresAt } = getExpireData(
|
||||||
);
|
expireTimer,
|
||||||
messagesWithExpireTimer.forEach(id => {
|
readAt
|
||||||
stmt.run({
|
);
|
||||||
id,
|
|
||||||
|
rowsWithSameExpireTimer.set(expireTimer, {
|
||||||
expirationStartTimestamp,
|
expirationStartTimestamp,
|
||||||
|
expireTimer,
|
||||||
expiresAt,
|
expiresAt,
|
||||||
jsonPatch: JSON.stringify({
|
|
||||||
expirationStartTimestamp,
|
|
||||||
expires_at: expiresAt,
|
|
||||||
unread: 0,
|
|
||||||
}),
|
|
||||||
});
|
});
|
||||||
});
|
}
|
||||||
|
|
||||||
|
rowsWithSameExpireTimer.forEach(
|
||||||
|
({ expirationStartTimestamp, expireTimer, expiresAt }) => {
|
||||||
|
db.prepare<Query>(
|
||||||
|
`
|
||||||
|
UPDATE messages
|
||||||
|
SET
|
||||||
|
unread = 0,
|
||||||
|
expires_at = $expiresAt,
|
||||||
|
expirationStartTimestamp = $expirationStartTimestamp,
|
||||||
|
json = json_patch(json, $jsonPatch)
|
||||||
|
WHERE
|
||||||
|
unread = 0 AND
|
||||||
|
conversationId = $conversationId AND
|
||||||
|
received_at <= $newestUnreadId AND
|
||||||
|
expireTimer = $expireTimer
|
||||||
|
`
|
||||||
|
).run({
|
||||||
|
conversationId,
|
||||||
|
expirationStartTimestamp,
|
||||||
|
expireTimer,
|
||||||
|
expiresAt,
|
||||||
|
jsonPatch: JSON.stringify({
|
||||||
|
expirationStartTimestamp,
|
||||||
|
expires_at: expiresAt,
|
||||||
|
unread: 0,
|
||||||
|
}),
|
||||||
|
newestUnreadId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function getUnreadByConversationAndMarkRead(
|
async function getUnreadByConversationAndMarkRead(
|
||||||
|
@ -3074,7 +3104,7 @@ async function getUnreadByConversationAndMarkRead(
|
||||||
const rows = db
|
const rows = db
|
||||||
.prepare<Query>(
|
.prepare<Query>(
|
||||||
`
|
`
|
||||||
SELECT id, expireTimer, expirationStartTimestamp, json
|
SELECT id, expires_at, expireTimer, expirationStartTimestamp, json
|
||||||
FROM messages WHERE
|
FROM messages WHERE
|
||||||
unread = $unread AND
|
unread = $unread AND
|
||||||
conversationId = $conversationId AND
|
conversationId = $conversationId AND
|
||||||
|
@ -3088,44 +3118,48 @@ async function getUnreadByConversationAndMarkRead(
|
||||||
newestUnreadId,
|
newestUnreadId,
|
||||||
});
|
});
|
||||||
|
|
||||||
let messageExpireTimer: number | undefined;
|
const messagesWithExpireTimer: Map<
|
||||||
const messagesWithExpireTimer: Set<string> = new Set();
|
string,
|
||||||
const messagesToMarkRead: Array<string> = [];
|
{
|
||||||
|
id: string;
|
||||||
|
expireTimer: number;
|
||||||
|
}
|
||||||
|
> = new Map();
|
||||||
|
|
||||||
rows.forEach(row => {
|
rows.forEach(row => {
|
||||||
if (row.expireTimer && !row.expirationStartTimestamp) {
|
if (
|
||||||
messageExpireTimer = row.expireTimer;
|
row.expireTimer &&
|
||||||
messagesWithExpireTimer.add(row.id);
|
(!row.expirationStartTimestamp || !row.expires_at)
|
||||||
|
) {
|
||||||
|
messagesWithExpireTimer.set(row.id, {
|
||||||
|
id: row.id,
|
||||||
|
expireTimer: row.expireTimer,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
messagesToMarkRead.push(row.id);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
if (messagesToMarkRead.length) {
|
db.prepare<Query>(
|
||||||
const stmt = db.prepare<Query>(
|
`
|
||||||
|
UPDATE messages
|
||||||
|
SET
|
||||||
|
unread = 0,
|
||||||
|
json = json_patch(json, $jsonPatch)
|
||||||
|
WHERE
|
||||||
|
unread = $unread AND
|
||||||
|
conversationId = $conversationId AND
|
||||||
|
received_at <= $newestUnreadId;
|
||||||
`
|
`
|
||||||
UPDATE messages
|
).run({
|
||||||
SET
|
conversationId,
|
||||||
unread = 0,
|
jsonPatch: JSON.stringify({ unread: 0 }),
|
||||||
json = json_patch(json, $jsonPatch)
|
newestUnreadId,
|
||||||
WHERE
|
unread: 1,
|
||||||
id = $id;
|
});
|
||||||
`
|
|
||||||
);
|
|
||||||
|
|
||||||
messagesToMarkRead.forEach(id =>
|
if (messagesWithExpireTimer.size) {
|
||||||
stmt.run({
|
|
||||||
id,
|
|
||||||
jsonPatch: JSON.stringify({ unread: 0 }),
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (messageExpireTimer && messagesWithExpireTimer.size) {
|
|
||||||
// We use the messageExpireTimer set above from whichever row we have
|
|
||||||
// in the database. Since this is the same conversation the expireTimer
|
|
||||||
// should be the same for all messages within it.
|
|
||||||
updateExpirationTimers(
|
updateExpirationTimers(
|
||||||
messageExpireTimer,
|
conversationId,
|
||||||
|
newestUnreadId,
|
||||||
messagesWithExpireTimer,
|
messagesWithExpireTimer,
|
||||||
readAt
|
readAt
|
||||||
);
|
);
|
||||||
|
@ -3134,9 +3168,10 @@ async function getUnreadByConversationAndMarkRead(
|
||||||
return rows.map(row => {
|
return rows.map(row => {
|
||||||
const json = jsonToObject(row.json);
|
const json = jsonToObject(row.json);
|
||||||
const expireAttrs = {};
|
const expireAttrs = {};
|
||||||
if (messageExpireTimer && messagesWithExpireTimer.has(row.id)) {
|
const expiringMessage = messagesWithExpireTimer.get(row.id);
|
||||||
|
if (expiringMessage) {
|
||||||
const { expirationStartTimestamp, expiresAt } = getExpireData(
|
const { expirationStartTimestamp, expiresAt } = getExpireData(
|
||||||
messageExpireTimer,
|
expiringMessage.expireTimer,
|
||||||
readAt
|
readAt
|
||||||
);
|
);
|
||||||
Object.assign(expireAttrs, {
|
Object.assign(expireAttrs, {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue