Delete Sync: Handle and send mostRecentNonExpiringMessages if needed

This commit is contained in:
Scott Nonnenberg 2024-06-24 10:49:36 -07:00 committed by GitHub
parent 9c0ea4d6ec
commit 08da49a0aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 356 additions and 46 deletions

View file

@ -670,6 +670,7 @@ message SyncMessage {
optional ConversationIdentifier conversation = 1;
repeated AddressableMessage mostRecentMessages = 2;
optional bool isFullDelete = 3;
repeated AddressableMessage mostRecentNonExpiringMessages = 4;
}
message LocalOnlyConversationDelete {

View file

@ -565,24 +565,6 @@ export async function startApp(): Promise<void> {
storage: window.storage,
serverTrustRoot: window.getServerTrustRoot(),
});
const onFirstEmpty = async () => {
log.info('onFirstEmpty: Starting');
// We want to remove this handler on the next tick so we don't interfere with
// the other handlers being notified of this instance of the 'empty' event.
setTimeout(() => {
messageReceiver?.removeEventListener('empty', onFirstEmpty);
}, 1);
log.info('onFirstEmpty: Fetching sync tasks');
const syncTasks = await window.Signal.Data.getAllSyncTasks();
log.info(`onFirstEmpty: Queuing ${syncTasks.length} sync tasks`);
await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById);
log.info('onFirstEmpty: Done');
};
messageReceiver.addEventListener('empty', onFirstEmpty);
function queuedEventListener<E extends Event>(
handler: (event: E) => Promise<void> | void,
@ -1459,6 +1441,16 @@ export async function startApp(): Promise<void> {
}
log.info('Expiration start timestamp cleanup: complete');
{
log.info('Startup/syncTasks: Fetching tasks');
const syncTasks = await window.Signal.Data.getAllSyncTasks();
log.info(`Startup/syncTasks: Queueing ${syncTasks.length} sync tasks`);
await queueSyncTasks(syncTasks, window.Signal.Data.removeSyncTaskById);
log.info('`Startup/syncTasks: Done');
}
log.info('listening for registration events');
window.Whisper.events.on('registration_done', () => {
log.info('handling registration event');

View file

@ -193,6 +193,7 @@ const {
getMessageMetricsForConversation,
getMessageById,
getMostRecentAddressableMessages,
getMostRecentAddressableNondisappearingMessages,
getNewerMessagesByConversation,
} = window.Signal.Data;
@ -4999,12 +5000,13 @@ export class ConversationModel extends window.Backbone
}
async destroyMessagesInner({
logId,
logId: providedLogId,
source,
}: {
logId: string;
source: 'message-request' | 'local-delete-sync' | 'local-delete';
}): Promise<void> {
const logId = `${providedLogId}/destroyMessagesInner`;
this.set({
lastMessage: null,
lastMessageAuthor: null,
@ -5032,6 +5034,26 @@ export class ConversationModel extends window.Backbone
.map(getMessageToDelete)
.filter(isNotNil)
.slice(0, 5);
log.info(
`${logId}: Found ${mostRecentMessages.length} most recent messages`
);
const areAnyDisappearing = addressableMessages.some(
item => item.expireTimer
);
let mostRecentNonExpiringMessages: Array<MessageToDelete> | undefined;
if (areAnyDisappearing) {
const nondisappearingAddressableMessages =
await getMostRecentAddressableNondisappearingMessages(this.id);
mostRecentNonExpiringMessages = nondisappearingAddressableMessages
.map(getMessageToDelete)
.filter(isNotNil)
.slice(0, 5);
log.info(
`${logId}: Found ${mostRecentNonExpiringMessages.length} most recent nondisappearing messages`
);
}
if (mostRecentMessages.length > 0) {
await singleProtoJobQueue.add(
@ -5041,6 +5063,7 @@ export class ConversationModel extends window.Backbone
conversation: getConversationToDelete(this.attributes),
isFullDelete: true,
mostRecentMessages,
mostRecentNonExpiringMessages,
timestamp,
},
])

View file

@ -733,6 +733,10 @@ export type DataInterface = {
conversationId: string,
limit?: number
) => Promise<Array<MessageType>>;
getMostRecentAddressableNondisappearingMessages: (
conversationId: string,
limit?: number
) => Promise<Array<MessageType>>;
removeSyncTaskById: (id: string) => Promise<void>;
saveSyncTasks: (tasks: Array<SyncTaskType>) => Promise<void>;

View file

@ -372,6 +372,7 @@ const dataInterface: ServerInterface = {
saveEditedMessage,
saveEditedMessages,
getMostRecentAddressableMessages,
getMostRecentAddressableNondisappearingMessages,
removeSyncTaskById,
saveSyncTasks,
@ -2119,6 +2120,39 @@ export function getMostRecentAddressableMessagesSync(
return rows.map(row => jsonToObject(row.json));
}
async function getMostRecentAddressableNondisappearingMessages(
conversationId: string,
limit = 5
): Promise<Array<MessageType>> {
const db = getReadonlyInstance();
return getMostRecentAddressableNondisappearingMessagesSync(
db,
conversationId,
limit
);
}
export function getMostRecentAddressableNondisappearingMessagesSync(
db: Database,
conversationId: string,
limit = 5
): Array<MessageType> {
const [query, parameters] = sql`
SELECT json FROM messages
INDEXED BY messages_by_date_addressable_nondisappearing
WHERE
expireTimer IS NULL AND
conversationId IS ${conversationId} AND
isAddressableMessage = 1
ORDER BY received_at DESC, sent_at DESC
LIMIT ${limit};
`;
const rows = db.prepare(query).all(parameters);
return rows.map(row => jsonToObject(row.json));
}
async function removeSyncTaskById(id: string): Promise<void> {
const db = await getWritableInstance();
removeSyncTaskByIdSync(db, id);

View file

@ -0,0 +1,31 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { Database } from '@signalapp/better-sqlite3';
import type { LoggerType } from '../../types/Logging';
export const version = 1080;
export function updateToSchemaVersion1080(
currentVersion: number,
db: Database,
logger: LoggerType
): void {
if (currentVersion >= 1080) {
return;
}
db.transaction(() => {
db.exec(`
CREATE INDEX messages_by_date_addressable_nondisappearing
ON messages (
conversationId, isAddressableMessage, received_at, sent_at
) WHERE expireTimer IS NULL;
`);
})();
db.pragma('user_version = 1080');
logger.info('updateToSchemaVersion1080: success!');
}

View file

@ -82,10 +82,11 @@ import { updateToSchemaVersion1030 } from './1030-unblock-event';
import { updateToSchemaVersion1040 } from './1040-undownloaded-backed-up-media';
import { updateToSchemaVersion1050 } from './1050-group-send-endorsements';
import { updateToSchemaVersion1060 } from './1060-addressable-messages-and-sync-tasks';
import { updateToSchemaVersion1070 } from './1070-attachment-backup';
import {
updateToSchemaVersion1070,
updateToSchemaVersion1080,
version as MAX_VERSION,
} from './1070-attachment-backup';
} from './1080-nondisappearing-addressable';
function updateToSchemaVersion1(
currentVersion: number,
@ -2036,6 +2037,7 @@ export const SCHEMA_VERSIONS = [
updateToSchemaVersion1050,
updateToSchemaVersion1060,
updateToSchemaVersion1070,
updateToSchemaVersion1080,
];
export class DBVersionFromFutureError extends Error {

View file

@ -0,0 +1,167 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { assert } from 'chai';
import type { Database } from '@signalapp/better-sqlite3';
import SQL from '@signalapp/better-sqlite3';
import { v4 as generateGuid } from 'uuid';
import { getMostRecentAddressableNondisappearingMessagesSync } from '../../sql/Server';
import { insertData, updateToVersion } from './helpers';
import type { MessageAttributesType } from '../../model-types';
import { DurationInSeconds } from '../../util/durations/duration-in-seconds';
/* eslint-disable camelcase */
function generateMessage(json: MessageAttributesType) {
const { conversationId, expireTimer, received_at, sent_at, type } = json;
return {
conversationId,
json,
received_at,
sent_at,
expireTimer: Number(expireTimer),
type,
};
}
describe('SQL/updateToSchemaVersion1080', () => {
let db: Database;
beforeEach(() => {
db = new SQL(':memory:');
updateToVersion(db, 1080);
});
afterEach(() => {
db.close();
});
describe('Addressable Messages', () => {
describe('Storing of new attachment jobs', () => {
it('returns only incoming/outgoing messages', () => {
const conversationId = generateGuid();
const otherConversationId = generateGuid();
insertData(db, 'messages', [
generateMessage({
id: '1',
conversationId,
type: 'incoming',
received_at: 1,
sent_at: 1,
timestamp: 1,
}),
generateMessage({
id: '2',
conversationId,
type: 'story',
received_at: 2,
sent_at: 2,
timestamp: 2,
}),
generateMessage({
id: '3',
conversationId,
type: 'outgoing',
received_at: 3,
sent_at: 3,
timestamp: 3,
}),
generateMessage({
id: '4',
conversationId,
type: 'group-v1-migration',
received_at: 4,
sent_at: 4,
timestamp: 4,
}),
generateMessage({
id: '5',
conversationId,
type: 'group-v2-change',
received_at: 5,
sent_at: 5,
timestamp: 5,
}),
generateMessage({
id: '6',
conversationId,
type: 'incoming',
received_at: 6,
sent_at: 6,
timestamp: 6,
expireTimer: DurationInSeconds.fromMinutes(10),
}),
generateMessage({
id: '7',
conversationId,
type: 'profile-change',
received_at: 7,
sent_at: 7,
timestamp: 7,
}),
generateMessage({
id: '8',
conversationId: otherConversationId,
type: 'incoming',
received_at: 8,
sent_at: 8,
timestamp: 8,
}),
]);
const messages = getMostRecentAddressableNondisappearingMessagesSync(
db,
conversationId
);
assert.lengthOf(messages, 2);
assert.deepEqual(messages, [
{
id: '3',
conversationId,
type: 'outgoing',
received_at: 3,
sent_at: 3,
timestamp: 3,
},
{
id: '1',
conversationId,
type: 'incoming',
received_at: 1,
sent_at: 1,
timestamp: 1,
},
]);
});
it('ensures that index is used for getMostRecentAddressableNondisappearingMessagesSync, with storyId', () => {
const { detail } = db
.prepare(
`
EXPLAIN QUERY PLAN
SELECT json FROM messages
INDEXED BY messages_by_date_addressable_nondisappearing
WHERE
expireTimer IS NULL AND
conversationId IS 'not-important' AND
isAddressableMessage = 1
ORDER BY received_at DESC, sent_at DESC
LIMIT 5;
`
)
.get();
assert.notInclude(detail, 'B-TREE');
assert.notInclude(detail, 'SCAN');
assert.include(
detail,
'SEARCH messages USING INDEX messages_by_date_addressable_nondisappearing (conversationId=? AND isAddressableMessage=?)'
);
});
});
});
});

View file

@ -3708,6 +3708,10 @@ export default class MessageReceiver
const mostRecentMessages = item.mostRecentMessages
?.map(message => processMessageToDelete(message, logId))
.filter(isNotNil);
const mostRecentNonExpiringMessages =
item.mostRecentNonExpiringMessages
?.map(message => processMessageToDelete(message, logId))
.filter(isNotNil);
const conversation = item.conversation
? processConversationToDelete(item.conversation, logId)
: undefined;
@ -3730,6 +3734,7 @@ export default class MessageReceiver
conversation,
isFullDelete: Boolean(item.isFullDelete),
mostRecentMessages,
mostRecentNonExpiringMessages,
timestamp,
};
})

View file

@ -97,6 +97,7 @@ import {
} from '../types/CallDisposition';
import { getProtoForCallHistory } from '../util/callDisposition';
import { CallMode } from '../types/Calling';
import { MAX_MESSAGE_COUNT } from '../util/deleteForMe.types';
export type SendMetadataType = {
[serviceId: ServiceIdString]: {
@ -1518,13 +1519,16 @@ export default class MessageSender {
} else if (item.type === 'delete-conversation') {
const mostRecentMessages =
item.mostRecentMessages.map(toAddressableMessage);
const mostRecentNonExpiringMessages =
item.mostRecentNonExpiringMessages?.map(toAddressableMessage);
const conversation = toConversationIdentifier(item.conversation);
deleteForMe.conversationDeletes = deleteForMe.conversationDeletes || [];
deleteForMe.conversationDeletes.push({
mostRecentMessages,
conversation,
isFullDelete: true,
mostRecentMessages,
mostRecentNonExpiringMessages,
});
} else if (item.type === 'delete-local-conversation') {
const conversation = toConversationIdentifier(item.conversation);
@ -1544,7 +1548,7 @@ export default class MessageSender {
});
if (messageDeletes.size > 0) {
for (const items of messageDeletes.values()) {
for (const [conversationId, items] of messageDeletes.entries()) {
const first = items[0];
if (!first) {
throw new Error('Failed to fetch first from items');
@ -1552,6 +1556,12 @@ export default class MessageSender {
const messages = items.map(item => toAddressableMessage(item.message));
const conversation = toConversationIdentifier(first.conversation);
if (items.length > MAX_MESSAGE_COUNT) {
log.warn(
`getDeleteForMeSyncMessage: Sending ${items.length} message deletes for conversationId ${conversationId}`
);
}
deleteForMe.messageDeletes = deleteForMe.messageDeletes || [];
deleteForMe.messageDeletes.push({
messages,

View file

@ -518,6 +518,7 @@ export const deleteConversationSchema = z.object({
type: z.literal('delete-conversation').readonly(),
conversation: conversationToDeleteSchema,
mostRecentMessages: z.array(messageToDeleteSchema),
mostRecentNonExpiringMessages: z.array(messageToDeleteSchema).optional(),
isFullDelete: z.boolean(),
timestamp: z.number(),
});

View file

@ -256,20 +256,36 @@ export async function applyDeleteAttachmentFromMessage(
return true;
}
export async function deleteConversation(
conversation: ConversationModel,
mostRecentMessages: Array<MessageToDelete>,
isFullDelete: boolean,
logId: string
): Promise<boolean> {
const queries = mostRecentMessages.map(getMessageQueryFromTarget);
async function getMostRecentMatchingMessage(
conversationId: string,
targetMessages: Array<MessageToDelete>
): Promise<MessageAttributesType | undefined> {
const queries = targetMessages.map(getMessageQueryFromTarget);
const found = await Promise.all(
queries.map(query => findMatchingMessage(conversation.id, query))
queries.map(query => findMatchingMessage(conversationId, query))
);
const sorted = sortBy(found, 'received_at');
const newestMessage = last(sorted);
if (newestMessage) {
return last(sorted);
}
export async function deleteConversation(
conversation: ConversationModel,
mostRecentMessages: Array<MessageToDelete>,
mostRecentNonExpiringMessages: Array<MessageToDelete> | undefined,
isFullDelete: boolean,
providedLogId: string
): Promise<boolean> {
const logId = `${providedLogId}/deleteConversation`;
const newestMessage = await getMostRecentMatchingMessage(
conversation.id,
mostRecentMessages
);
if (!newestMessage) {
log.warn(`${logId}: Found no messages from mostRecentMessages set`);
} else {
log.info(`${logId}: Found most recent message from mostRecentMessages set`);
const { received_at: receivedAt } = newestMessage;
await removeMessagesInConversation(conversation.id, {
@ -280,13 +296,34 @@ export async function deleteConversation(
});
}
if (!newestMessage) {
log.warn(`${logId}: Found no target messages for delete`);
if (!newestMessage && mostRecentNonExpiringMessages?.length) {
const newestNondisappearingMessage = await getMostRecentMatchingMessage(
conversation.id,
mostRecentNonExpiringMessages
);
if (!newestNondisappearingMessage) {
log.warn(
`${logId}: Found no messages from mostRecentNonExpiringMessages set`
);
} else {
log.info(
`${logId}: Found most recent message from mostRecentNonExpiringMessages set`
);
const { received_at: receivedAt } = newestNondisappearingMessage;
await removeMessagesInConversation(conversation.id, {
fromSync: true,
receivedAt,
logId: `${logId}(receivedAt=${receivedAt})`,
singleProtoJobQueue,
});
}
}
if (isFullDelete) {
log.info(`${logId}: isFullDelete=true, proceeding to local-only delete`);
return deleteLocalOnlyConversation(conversation, logId);
return deleteLocalOnlyConversation(conversation, providedLogId);
}
return true;
@ -294,17 +331,16 @@ export async function deleteConversation(
export async function deleteLocalOnlyConversation(
conversation: ConversationModel,
logId: string
providedLogId: string
): Promise<boolean> {
const logId = `${providedLogId}/deleteLocalOnlyConversation`;
const limit = 1;
const messages = await getMostRecentAddressableMessages(
conversation.id,
limit
);
if (messages.length > 0) {
log.warn(
`${logId}: Attempted local-only delete but found an addressable message`
);
log.warn(`${logId}: Cannot delete; found an addressable message`);
return false;
}

View file

@ -65,7 +65,7 @@ const SCHEMAS_BY_TYPE: Record<SyncTaskData['type'], ZodSchema> = {
};
function toLogId(task: SyncTaskType) {
return `task=${task.id},timestamp:${task},type=${task.type},envelopeId=${task.envelopeId}`;
return `type=${task.type},envelopeId=${task.envelopeId}`;
}
export async function queueSyncTasks(
@ -112,6 +112,7 @@ export async function queueSyncTasks(
const {
conversation: targetConversation,
mostRecentMessages,
mostRecentNonExpiringMessages,
isFullDelete,
} = parsed;
const conversation = getConversationFromTarget(targetConversation);
@ -121,17 +122,18 @@ export async function queueSyncTasks(
}
drop(
conversation.queueJob(innerLogId, async () => {
log.info(`${logId}: Starting...`);
log.info(`${innerLogId}: Starting...`);
const result = await deleteConversation(
conversation,
mostRecentMessages,
mostRecentNonExpiringMessages,
isFullDelete,
innerLogId
);
if (result) {
await removeSyncTaskById(id);
}
log.info(`${logId}: Done, result=${result}`);
log.info(`${innerLogId}: Done, result=${result}`);
})
);
} else if (parsed.type === 'delete-local-conversation') {
@ -143,7 +145,7 @@ export async function queueSyncTasks(
}
drop(
conversation.queueJob(innerLogId, async () => {
log.info(`${logId}: Starting...`);
log.info(`${innerLogId}: Starting...`);
const result = await deleteLocalOnlyConversation(
conversation,
innerLogId
@ -153,7 +155,7 @@ export async function queueSyncTasks(
// get more messages in this conversation from here!
await removeSyncTaskById(id);
log.info(`${logId}: Done; result=${result}`);
log.info(`${innerLogId}: Done; result=${result}`);
})
);
} else if (parsed.type === 'delete-single-attachment') {
@ -201,7 +203,9 @@ export async function queueSyncTasks(
);
} else {
const parsedType: never = parsed.type;
log.error(`${logId}: Encountered job of type ${parsedType}, removing`);
log.error(
`${innerLogId}: Encountered job of type ${parsedType}, removing`
);
// eslint-disable-next-line no-await-in-loop
await removeSyncTaskById(id);
}