Support for local deletes synced to all your devices

This commit is contained in:
Scott Nonnenberg 2024-05-29 01:56:00 +10:00 committed by GitHub
parent 06f71a7ef8
commit 11eb1782a7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 2094 additions and 72 deletions

View file

@ -115,7 +115,7 @@ const exclusiveInterface: ClientExclusiveInterface = {
flushUpdateConversationBatcher,
shutdown,
removeAllMessagesInConversation,
removeMessagesInConversation,
removeOtherData,
cleanupOrphanedAttachments,
@ -592,6 +592,21 @@ async function removeMessage(id: string): Promise<void> {
}
}
export async function deleteAndCleanup(
messages: Array<MessageAttributesType>,
logId: string
): Promise<void> {
const ids = messages.map(message => message.id);
log.info(`deleteAndCleanup/${logId}: Deleting ${ids.length} messages...`);
await channels.removeMessages(ids);
log.info(`deleteAndCleanup/${logId}: Cleanup for ${ids.length} messages...`);
await _cleanupMessages(messages);
log.info(`deleteAndCleanup/${logId}: Complete`);
}
async function _cleanupMessages(
messages: ReadonlyArray<MessageAttributesType>
): Promise<void> {
@ -664,12 +679,14 @@ async function getConversationRangeCenteredOnMessage(
};
}
async function removeAllMessagesInConversation(
async function removeMessagesInConversation(
conversationId: string,
{
logId,
receivedAt,
}: {
logId: string;
receivedAt?: number;
}
): Promise<void> {
let messages;
@ -685,6 +702,7 @@ async function removeAllMessagesInConversation(
conversationId,
limit: chunkSize,
includeStoryReplies: true,
receivedAt,
storyId: undefined,
});
@ -692,15 +710,8 @@ async function removeAllMessagesInConversation(
return;
}
const ids = messages.map(message => message.id);
log.info(`removeAllMessagesInConversation/${logId}: Cleanup...`);
// eslint-disable-next-line no-await-in-loop
await _cleanupMessages(messages);
log.info(`removeAllMessagesInConversation/${logId}: Deleting...`);
// eslint-disable-next-line no-await-in-loop
await channels.removeMessages(ids);
await deleteAndCleanup(messages, logId);
} while (messages.length > 0);
}

View file

@ -32,6 +32,7 @@ import type {
import type { CallLinkStateType, CallLinkType } from '../types/CallLink';
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
import type { GroupSendEndorsementsData } from '../types/GroupSendEndorsements';
import type { SyncTaskType } from '../util/syncTasks';
export type AdjacentMessagesByConversationOptionsType = Readonly<{
conversationId: string;
@ -716,6 +717,15 @@ export type DataInterface = {
ourAci: AciString,
opts: EditedMessageType
) => Promise<void>;
getMostRecentAddressableMessages: (
conversationId: string,
limit?: number
) => Promise<Array<MessageType>>;
removeSyncTaskById: (id: string) => Promise<void>;
saveSyncTasks: (tasks: Array<SyncTaskType>) => Promise<void>;
getAllSyncTasks: () => Promise<Array<SyncTaskType>>;
getUnprocessedCount: () => Promise<number>;
getUnprocessedByIdsAndIncrementAttempts: (
ids: ReadonlyArray<string>
@ -1043,10 +1053,11 @@ export type ClientExclusiveInterface = {
// Client-side only
shutdown: () => Promise<void>;
removeAllMessagesInConversation: (
removeMessagesInConversation: (
conversationId: string,
options: {
logId: string;
receivedAt?: number;
}
) => Promise<void>;
removeOtherData: () => Promise<void>;

View file

@ -184,6 +184,9 @@ import {
attachmentDownloadJobSchema,
type AttachmentDownloadJobType,
} from '../types/AttachmentDownload';
import { MAX_SYNC_TASK_ATTEMPTS } from '../util/syncTasks.types';
import type { SyncTaskType } from '../util/syncTasks';
import { isMoreRecentThan } from '../util/timestamp';
type ConversationRow = Readonly<{
json: string;
@ -360,6 +363,11 @@ const dataInterface: ServerInterface = {
getMessagesBetween,
getNearbyMessageFromDeletedSet,
saveEditedMessage,
getMostRecentAddressableMessages,
removeSyncTaskById,
saveSyncTasks,
getAllSyncTasks,
getUnprocessedCount,
getUnprocessedByIdsAndIncrementAttempts,
@ -2066,6 +2074,131 @@ function hasUserInitiatedMessages(conversationId: string): boolean {
return exists !== 0;
}
async function getMostRecentAddressableMessages(
conversationId: string,
limit = 5
): Promise<Array<MessageType>> {
const db = getReadonlyInstance();
return getMostRecentAddressableMessagesSync(db, conversationId, limit);
}
export function getMostRecentAddressableMessagesSync(
db: Database,
conversationId: string,
limit = 5
): Array<MessageType> {
const [query, parameters] = sql`
SELECT json FROM messages
INDEXED BY messages_by_date_addressable
WHERE
conversationId IS ${conversationId} AND
isAddressableMessage = 1
ORDER BY received_at DESC, sent_at DESC
LIMIT ${limit};
`;
const rows = db.prepare(query).all(parameters);
return rows.map(row => jsonToObject(row.json));
}
async function removeSyncTaskById(id: string): Promise<void> {
const db = await getWritableInstance();
removeSyncTaskByIdSync(db, id);
}
export function removeSyncTaskByIdSync(db: Database, id: string): void {
const [query, parameters] = sql`
DELETE FROM syncTasks
WHERE id IS ${id}
`;
db.prepare(query).run(parameters);
}
async function saveSyncTasks(tasks: Array<SyncTaskType>): Promise<void> {
const db = await getWritableInstance();
return saveSyncTasksSync(db, tasks);
}
export function saveSyncTasksSync(
db: Database,
tasks: Array<SyncTaskType>
): void {
return db.transaction(() => {
tasks.forEach(task => assertSync(saveSyncTaskSync(db, task)));
})();
}
export function saveSyncTaskSync(db: Database, task: SyncTaskType): void {
const { id, attempts, createdAt, data, envelopeId, sentAt, type } = task;
const [query, parameters] = sql`
INSERT INTO syncTasks (
id,
attempts,
createdAt,
data,
envelopeId,
sentAt,
type
) VALUES (
${id},
${attempts},
${createdAt},
${objectToJSON(data)},
${envelopeId},
${sentAt},
${type}
)
`;
db.prepare(query).run(parameters);
}
async function getAllSyncTasks(): Promise<Array<SyncTaskType>> {
const db = await getWritableInstance();
return getAllSyncTasksSync(db);
}
export function getAllSyncTasksSync(db: Database): Array<SyncTaskType> {
return db.transaction(() => {
const [selectAllQuery] = sql`
SELECT * FROM syncTasks ORDER BY createdAt ASC, sentAt ASC, id ASC
`;
const rows = db.prepare(selectAllQuery).all();
const tasks: Array<SyncTaskType> = rows.map(row => ({
...row,
data: jsonToObject(row.data),
}));
const [query] = sql`
UPDATE syncTasks
SET attempts = attempts + 1
`;
db.prepare(query).run();
const [toDelete, toReturn] = partition(tasks, task => {
if (
isNormalNumber(task.attempts) &&
task.attempts < MAX_SYNC_TASK_ATTEMPTS
) {
return false;
}
if (isMoreRecentThan(task.createdAt, durations.WEEK)) {
return false;
}
return true;
});
if (toDelete.length > 0) {
log.warn(`getAllSyncTasks: Removing ${toDelete.length} expired tasks`);
toDelete.forEach(task => {
assertSync(removeSyncTaskByIdSync(db, task.id));
});
}
return toReturn;
})();
}
function saveMessageSync(
db: Database,
data: MessageType,
@ -6036,6 +6169,7 @@ async function removeAll(): Promise<void> {
DELETE FROM storyDistributionMembers;
DELETE FROM storyDistributions;
DELETE FROM storyReads;
DELETE FROM syncTasks;
DELETE FROM unprocessed;
DELETE FROM uninstalled_sticker_packs;
@ -6078,6 +6212,7 @@ async function removeAllConfiguration(): Promise<void> {
DELETE FROM sendLogRecipients;
DELETE FROM sessions;
DELETE FROM signedPreKeys;
DELETE FROM syncTasks;
DELETE FROM unprocessed;
`
);

View file

@ -0,0 +1,56 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { Database } from '@signalapp/better-sqlite3';
import type { LoggerType } from '../../types/Logging';
export const version = 1060;
export function updateToSchemaVersion1060(
currentVersion: number,
db: Database,
logger: LoggerType
): void {
if (currentVersion >= 1060) {
return;
}
db.transaction(() => {
db.exec(`
ALTER TABLE messages
ADD COLUMN isAddressableMessage INTEGER
GENERATED ALWAYS AS (
type IS NULL
OR
type IN (
'incoming',
'outgoing'
)
);
CREATE INDEX messages_by_date_addressable
ON messages (
conversationId, isAddressableMessage, received_at, sent_at
);
CREATE TABLE syncTasks(
id TEXT PRIMARY KEY NOT NULL,
attempts INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
data TEXT NOT NULL,
envelopeId TEXT NOT NULL,
sentAt INTEGER NOT NULL,
type TEXT NOT NULL
) STRICT;
CREATE INDEX syncTasks_order ON syncTasks (
createdAt, sentAt, id
)
`);
})();
db.pragma('user_version = 1060');
logger.info('updateToSchemaVersion1060: success!');
}

View file

@ -80,10 +80,11 @@ import { updateToSchemaVersion1010 } from './1010-call-links-table';
import { updateToSchemaVersion1020 } from './1020-self-merges';
import { updateToSchemaVersion1030 } from './1030-unblock-event';
import { updateToSchemaVersion1040 } from './1040-undownloaded-backed-up-media';
import { updateToSchemaVersion1050 } from './1050-group-send-endorsements';
import {
updateToSchemaVersion1050,
updateToSchemaVersion1060,
version as MAX_VERSION,
} from './1050-group-send-endorsements';
} from './1060-addressable-messages-and-sync-tasks';
function updateToSchemaVersion1(
currentVersion: number,
@ -2025,12 +2026,14 @@ export const SCHEMA_VERSIONS = [
updateToSchemaVersion970,
updateToSchemaVersion980,
updateToSchemaVersion990,
updateToSchemaVersion1000,
updateToSchemaVersion1010,
updateToSchemaVersion1020,
updateToSchemaVersion1030,
updateToSchemaVersion1040,
updateToSchemaVersion1050,
updateToSchemaVersion1060,
];
export class DBVersionFromFutureError extends Error {