Batch deleteSentProtoRecipient queries
This commit is contained in:
parent
b71e4875e6
commit
6f3191117f
5 changed files with 151 additions and 81 deletions
|
@ -13,11 +13,13 @@ import { isOutgoing } from '../state/selectors/message';
|
|||
import { isDirectConversation } from '../util/whatTypeOfConversation';
|
||||
import { getOwn } from '../util/getOwn';
|
||||
import { missingCaseError } from '../util/missingCaseError';
|
||||
import { createWaitBatcher } from '../util/waitBatcher';
|
||||
import {
|
||||
SendActionType,
|
||||
SendStatus,
|
||||
sendStateReducer,
|
||||
} from '../messages/MessageSendState';
|
||||
import type { DeleteSentProtoRecipientOptionsType } from '../sql/Interface';
|
||||
import dataInterface from '../sql/Client';
|
||||
|
||||
const { deleteSentProtoRecipient } = dataInterface;
|
||||
|
@ -40,6 +42,18 @@ class MessageReceiptModel extends Model<MessageReceiptAttributesType> {}
|
|||
|
||||
let singleton: MessageReceipts | undefined;
|
||||
|
||||
const deleteSentProtoBatcher = createWaitBatcher({
|
||||
name: 'deleteSentProtoBatcher',
|
||||
wait: 250,
|
||||
maxSize: 30,
|
||||
async processBatch(items: Array<DeleteSentProtoRecipientOptionsType>) {
|
||||
window.log.info(
|
||||
`MessageReceipts: Batching ${items.length} sent proto recipients deletes`
|
||||
);
|
||||
await deleteSentProtoRecipient(items);
|
||||
},
|
||||
});
|
||||
|
||||
async function getTargetMessage(
|
||||
sourceId: string,
|
||||
messages: MessageModelCollectionType
|
||||
|
@ -202,7 +216,7 @@ export class MessageReceipts extends Collection<MessageReceiptModel> {
|
|||
const deviceId = receipt.get('sourceDevice');
|
||||
|
||||
if (recipientUuid && deviceId) {
|
||||
await deleteSentProtoRecipient({
|
||||
await deleteSentProtoBatcher.add({
|
||||
timestamp: messageSentAt,
|
||||
recipientUuid,
|
||||
deviceId,
|
||||
|
|
|
@ -48,6 +48,7 @@ import {
|
|||
ClientJobType,
|
||||
ClientSearchResultMessageType,
|
||||
ConversationType,
|
||||
DeleteSentProtoRecipientOptionsType,
|
||||
IdentityKeyType,
|
||||
ItemKeyType,
|
||||
ItemType,
|
||||
|
@ -825,11 +826,11 @@ async function insertProtoRecipients(options: {
|
|||
}): Promise<void> {
|
||||
await channels.insertProtoRecipients(options);
|
||||
}
|
||||
async function deleteSentProtoRecipient(options: {
|
||||
timestamp: number;
|
||||
recipientUuid: string;
|
||||
deviceId: number;
|
||||
}): Promise<void> {
|
||||
async function deleteSentProtoRecipient(
|
||||
options:
|
||||
| DeleteSentProtoRecipientOptionsType
|
||||
| ReadonlyArray<DeleteSentProtoRecipientOptionsType>
|
||||
): Promise<void> {
|
||||
await channels.deleteSentProtoRecipient(options);
|
||||
}
|
||||
|
||||
|
|
|
@ -215,6 +215,12 @@ export type LastConversationMessagesType = {
|
|||
hasUserInitiatedMessages: boolean;
|
||||
};
|
||||
|
||||
export type DeleteSentProtoRecipientOptionsType = Readonly<{
|
||||
timestamp: number;
|
||||
recipientUuid: string;
|
||||
deviceId: number;
|
||||
}>;
|
||||
|
||||
export type DataInterface = {
|
||||
close: () => Promise<void>;
|
||||
removeDB: () => Promise<void>;
|
||||
|
@ -267,11 +273,11 @@ export type DataInterface = {
|
|||
recipientUuid: string;
|
||||
deviceIds: Array<number>;
|
||||
}) => Promise<void>;
|
||||
deleteSentProtoRecipient: (options: {
|
||||
timestamp: number;
|
||||
recipientUuid: string;
|
||||
deviceId: number;
|
||||
}) => Promise<void>;
|
||||
deleteSentProtoRecipient: (
|
||||
options:
|
||||
| DeleteSentProtoRecipientOptionsType
|
||||
| ReadonlyArray<DeleteSentProtoRecipientOptionsType>
|
||||
) => Promise<void>;
|
||||
getSentProtoByRecipient: (options: {
|
||||
now: number;
|
||||
recipientUuid: string;
|
||||
|
|
146
ts/sql/Server.ts
146
ts/sql/Server.ts
|
@ -52,6 +52,7 @@ import {
|
|||
AttachmentDownloadJobType,
|
||||
ConversationMetricsType,
|
||||
ConversationType,
|
||||
DeleteSentProtoRecipientOptionsType,
|
||||
EmojiType,
|
||||
IdentityKeyType,
|
||||
ItemKeyType,
|
||||
|
@ -2708,82 +2709,87 @@ async function insertProtoRecipients({
|
|||
})();
|
||||
}
|
||||
|
||||
async function deleteSentProtoRecipient({
|
||||
timestamp,
|
||||
recipientUuid,
|
||||
deviceId,
|
||||
}: {
|
||||
timestamp: number;
|
||||
recipientUuid: string;
|
||||
deviceId: number;
|
||||
}): Promise<void> {
|
||||
async function deleteSentProtoRecipient(
|
||||
options:
|
||||
| DeleteSentProtoRecipientOptionsType
|
||||
| ReadonlyArray<DeleteSentProtoRecipientOptionsType>
|
||||
): Promise<void> {
|
||||
const db = getInstance();
|
||||
|
||||
// Note: we use `pluck` in this function to fetch only the first column of returned row.
|
||||
const items = Array.isArray(options) ? options : [options];
|
||||
|
||||
// Note: we use `pluck` in this function to fetch only the first column of
|
||||
// returned row.
|
||||
|
||||
db.transaction(() => {
|
||||
// 1. Figure out what payload we're talking about.
|
||||
const rows = prepare(
|
||||
db,
|
||||
`
|
||||
SELECT sendLogPayloads.id FROM sendLogPayloads
|
||||
INNER JOIN sendLogRecipients
|
||||
ON sendLogRecipients.payloadId = sendLogPayloads.id
|
||||
WHERE
|
||||
sendLogPayloads.timestamp = $timestamp AND
|
||||
sendLogRecipients.recipientUuid = $recipientUuid AND
|
||||
sendLogRecipients.deviceId = $deviceId;
|
||||
`
|
||||
).all({ timestamp, recipientUuid, deviceId });
|
||||
if (!rows.length) {
|
||||
return;
|
||||
}
|
||||
if (rows.length > 1) {
|
||||
console.warn(
|
||||
`deleteSentProtoRecipient: More than one payload matches recipient and timestamp ${timestamp}. Using the first.`
|
||||
for (const item of items) {
|
||||
const { timestamp, recipientUuid, deviceId } = item;
|
||||
|
||||
// 1. Figure out what payload we're talking about.
|
||||
const rows = prepare(
|
||||
db,
|
||||
`
|
||||
SELECT sendLogPayloads.id FROM sendLogPayloads
|
||||
INNER JOIN sendLogRecipients
|
||||
ON sendLogRecipients.payloadId = sendLogPayloads.id
|
||||
WHERE
|
||||
sendLogPayloads.timestamp = $timestamp AND
|
||||
sendLogRecipients.recipientUuid = $recipientUuid AND
|
||||
sendLogRecipients.deviceId = $deviceId;
|
||||
`
|
||||
).all({ timestamp, recipientUuid, deviceId });
|
||||
if (!rows.length) {
|
||||
continue;
|
||||
}
|
||||
if (rows.length > 1) {
|
||||
console.warn(
|
||||
'deleteSentProtoRecipient: More than one payload matches ' +
|
||||
`recipient and timestamp ${timestamp}. Using the first.`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const { id } = rows[0];
|
||||
|
||||
// 2. Delete the recipient/device combination in question.
|
||||
prepare(
|
||||
db,
|
||||
`
|
||||
DELETE FROM sendLogRecipients
|
||||
WHERE
|
||||
payloadId = $id AND
|
||||
recipientUuid = $recipientUuid AND
|
||||
deviceId = $deviceId;
|
||||
`
|
||||
).run({ id, recipientUuid, deviceId });
|
||||
|
||||
// 3. See how many more recipient devices there were for this payload.
|
||||
const remaining = prepare(
|
||||
db,
|
||||
'SELECT count(*) FROM sendLogRecipients WHERE payloadId = $id;'
|
||||
)
|
||||
.pluck(true)
|
||||
.get({ id });
|
||||
|
||||
if (!isNumber(remaining)) {
|
||||
throw new Error(
|
||||
'deleteSentProtoRecipient: select count() returned non-number!'
|
||||
);
|
||||
}
|
||||
|
||||
if (remaining > 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 4. Delete the entire payload if there are no more recipients left.
|
||||
console.info(
|
||||
'deleteSentProtoRecipient: ' +
|
||||
`Deleting proto payload for timestamp ${timestamp}`
|
||||
);
|
||||
return;
|
||||
prepare(db, 'DELETE FROM sendLogPayloads WHERE id = $id;').run({
|
||||
id,
|
||||
});
|
||||
}
|
||||
|
||||
const { id } = rows[0];
|
||||
|
||||
// 2. Delete the recipient/device combination in question.
|
||||
prepare(
|
||||
db,
|
||||
`
|
||||
DELETE FROM sendLogRecipients
|
||||
WHERE
|
||||
payloadId = $id AND
|
||||
recipientUuid = $recipientUuid AND
|
||||
deviceId = $deviceId;
|
||||
`
|
||||
).run({ id, recipientUuid, deviceId });
|
||||
|
||||
// 3. See how many more recipient devices there were for this payload.
|
||||
const remaining = prepare(
|
||||
db,
|
||||
'SELECT count(*) FROM sendLogRecipients WHERE payloadId = $id;'
|
||||
)
|
||||
.pluck(true)
|
||||
.get({ id });
|
||||
|
||||
if (!isNumber(remaining)) {
|
||||
throw new Error(
|
||||
'deleteSentProtoRecipient: select count() returned non-number!'
|
||||
);
|
||||
}
|
||||
|
||||
if (remaining > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. Delete the entire payload if there are no more recipients left.
|
||||
console.info(
|
||||
`deleteSentProtoRecipient: Deleting proto payload for timestamp ${timestamp}`
|
||||
);
|
||||
prepare(db, 'DELETE FROM sendLogPayloads WHERE id = $id;').run({
|
||||
id,
|
||||
});
|
||||
})();
|
||||
}
|
||||
|
||||
|
|
|
@ -416,6 +416,49 @@ describe('sendLog', () => {
|
|||
assert.lengthOf(await getAllSentProtos(), 0);
|
||||
assert.lengthOf(await _getAllSentProtoRecipients(), 0);
|
||||
});
|
||||
|
||||
it('deletes multiple recipients in a single transaction', async () => {
|
||||
const timestamp = Date.now();
|
||||
|
||||
const recipientUuid1 = getGuid();
|
||||
const recipientUuid2 = getGuid();
|
||||
const proto = {
|
||||
contentHint: 1,
|
||||
proto: Buffer.from(getRandomBytes(128)),
|
||||
timestamp,
|
||||
};
|
||||
await insertSentProto(proto, {
|
||||
messageIds: [getGuid()],
|
||||
recipients: {
|
||||
[recipientUuid1]: [1, 2],
|
||||
[recipientUuid2]: [1],
|
||||
},
|
||||
});
|
||||
|
||||
assert.lengthOf(await getAllSentProtos(), 1);
|
||||
assert.lengthOf(await _getAllSentProtoRecipients(), 3);
|
||||
|
||||
await deleteSentProtoRecipient([
|
||||
{
|
||||
timestamp,
|
||||
recipientUuid: recipientUuid1,
|
||||
deviceId: 1,
|
||||
},
|
||||
{
|
||||
timestamp,
|
||||
recipientUuid: recipientUuid1,
|
||||
deviceId: 2,
|
||||
},
|
||||
{
|
||||
timestamp,
|
||||
recipientUuid: recipientUuid2,
|
||||
deviceId: 1,
|
||||
},
|
||||
]);
|
||||
|
||||
assert.lengthOf(await getAllSentProtos(), 0);
|
||||
assert.lengthOf(await _getAllSentProtoRecipients(), 0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('#getSentProtoByRecipient', () => {
|
||||
|
|
Loading…
Reference in a new issue