From 4027f4604fff5654f8bc5b962a2ca55940384017 Mon Sep 17 00:00:00 2001 From: Jamie Kyle <113370520+jamiebuilds-signal@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:03:29 -0800 Subject: [PATCH] Fix memory usage by batching syncTasks --- ts/background.ts | 12 +-- ts/sql/Interface.ts | 5 +- ts/sql/Server.ts | 96 +++++++++++++--------- ts/sql/migrations/1260-sync-tasks-rowid.ts | 30 +++++++ ts/sql/migrations/index.ts | 6 +- ts/sql/util.ts | 24 +++--- ts/test-node/sql/migration_1060_test.ts | 18 ++-- ts/util/syncTasks.ts | 38 ++++++++- 8 files changed, 154 insertions(+), 75 deletions(-) create mode 100644 ts/sql/migrations/1260-sync-tasks-rowid.ts diff --git a/ts/background.ts b/ts/background.ts index 329192758..2a67b9d61 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -186,7 +186,7 @@ import { AttachmentDownloadManager } from './jobs/AttachmentDownloadManager'; import { onCallLinkUpdateSync } from './util/onCallLinkUpdateSync'; import { CallMode } from './types/CallDisposition'; import type { SyncTaskType } from './util/syncTasks'; -import { queueSyncTasks } from './util/syncTasks'; +import { queueSyncTasks, runAllSyncTasks } from './util/syncTasks'; import type { ViewSyncTaskType } from './messageModifiers/ViewSyncs'; import type { ReceiptSyncTaskType } from './messageModifiers/MessageReceipts'; import type { ReadSyncTaskType } from './messageModifiers/ReadSyncs'; @@ -1469,15 +1469,7 @@ export async function startApp(): Promise { } log.info('Expiration start timestamp cleanup: complete'); - { - log.info('Startup/syncTasks: Fetching tasks'); - const syncTasks = await DataWriter.getAllSyncTasks(); - - log.info(`Startup/syncTasks: Queueing ${syncTasks.length} sync tasks`); - await queueSyncTasks(syncTasks, DataWriter.removeSyncTaskById); - - log.info('Startup/syncTasks: Done'); - } + await runAllSyncTasks(); log.info('listening for registration events'); window.Whisper.events.on('registration_done', () => { diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index be83d6164..715b99ad4 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -845,7 +845,10 @@ type WritableInterface = { removeSyncTaskById: (id: string) => void; saveSyncTasks: (tasks: Array) => void; - getAllSyncTasks: () => Array; + dequeueOldestSyncTasks: (previousRowId: number | null) => { + tasks: Array; + lastRowId: number | null; + }; getAllUnprocessedIds: () => Array; getUnprocessedByIdsAndIncrementAttempts: ( diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 9982b762f..791082113 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -119,7 +119,6 @@ import { } from '../util/search'; import type { SyncTaskType } from '../util/syncTasks'; import { MAX_SYNC_TASK_ATTEMPTS } from '../util/syncTasks.types'; -import { isMoreRecentThan } from '../util/timestamp'; import type { AdjacentMessagesByConversationOptionsType, BackupCdnMediaObjectType, @@ -475,7 +474,7 @@ export const DataWriter: ServerWritableInterface = { removeSyncTaskById, saveSyncTasks, - getAllSyncTasks, + dequeueOldestSyncTasks, getUnprocessedByIdsAndIncrementAttempts, getAllUnprocessedIds, @@ -2158,47 +2157,68 @@ function saveSyncTask(db: WritableDB, task: SyncTaskType): void { db.prepare(query).run(parameters); } -export function getAllSyncTasks(db: WritableDB): Array { + +export function dequeueOldestSyncTasks( + db: WritableDB, + previousRowId: number | null +): { tasks: Array; lastRowId: number | null } { return db.transaction(() => { - const [selectAllQuery] = sql` - SELECT * FROM syncTasks ORDER BY createdAt ASC, sentAt ASC, id ASC + const orderBy = sqlFragment`ORDER BY rowid ASC`; + const limit = sqlFragment`LIMIT 10000`; + const predicate = sqlFragment`rowid > ${previousRowId ?? 0}`; + + const [deleteOldQuery, deleteOldParams] = sql` + DELETE FROM syncTasks + WHERE + attempts >= ${MAX_SYNC_TASK_ATTEMPTS} AND + createdAt < ${Date.now() - durations.WEEK} `; - const rows = db.prepare(selectAllQuery).all(); + const result = db.prepare(deleteOldQuery).run(deleteOldParams); - const tasks: Array = rows.map(row => ({ - ...row, - data: jsonToObject(row.data), - })); - - const [query] = sql` - UPDATE syncTasks - SET attempts = attempts + 1 - `; - db.prepare(query).run(); - - const [toDelete, toReturn] = partition(tasks, task => { - if ( - isNormalNumber(task.attempts) && - task.attempts < MAX_SYNC_TASK_ATTEMPTS - ) { - return false; - } - if (isMoreRecentThan(task.createdAt, durations.WEEK)) { - return false; - } - - return true; - }); - - if (toDelete.length > 0) { - logger.warn(`getAllSyncTasks: Removing ${toDelete.length} expired tasks`); - toDelete.forEach(task => { - removeSyncTaskById(db, task.id); - }); + if (result.changes > 0) { + logger.info( + `dequeueOldestSyncTasks: Deleted ${result.changes} expired sync tasks` + ); } - return toReturn; + const [selectAllQuery, selectAllParams] = sql` + SELECT rowid, * FROM syncTasks + WHERE ${predicate} + ${orderBy} + ${limit} + `; + + const rows = db.prepare(selectAllQuery).all(selectAllParams); + if (!rows.length) { + return { tasks: [], lastRowId: null }; + } + + const firstRowId = rows.at(0)?.rowid; + const lastRowId = rows.at(-1)?.rowid; + + strictAssert(firstRowId, 'dequeueOldestSyncTasks: firstRowId is null'); + strictAssert(lastRowId, 'dequeueOldestSyncTasks: lastRowId is null'); + + const tasks: Array = rows.map(row => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { rowid: _rowid, ...rest } = row; + return { + ...rest, + data: jsonToObject(row.data), + }; + }); + + const [updateQuery, updateParams] = sql` + UPDATE syncTasks + SET attempts = attempts + 1 + WHERE rowid >= ${firstRowId} + AND rowid <= ${lastRowId} + `; + + db.prepare(updateQuery).run(updateParams); + + return { tasks, lastRowId }; })(); } @@ -7498,7 +7518,7 @@ function enableMessageInsertTriggersAndBackfill(db: WritableDB): void { VALUES (new.rowid, new.body); END; - + DROP TRIGGER IF EXISTS messages_on_insert_insert_mentions; CREATE TRIGGER messages_on_insert_insert_mentions AFTER INSERT ON messages BEGIN diff --git a/ts/sql/migrations/1260-sync-tasks-rowid.ts b/ts/sql/migrations/1260-sync-tasks-rowid.ts new file mode 100644 index 000000000..1e9958fa0 --- /dev/null +++ b/ts/sql/migrations/1260-sync-tasks-rowid.ts @@ -0,0 +1,30 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import type { Database } from '@signalapp/better-sqlite3'; +import type { LoggerType } from '../../types/Logging'; +import { sql } from '../util'; + +export const version = 1260; + +export function updateToSchemaVersion1260( + currentVersion: number, + db: Database, + logger: LoggerType +): void { + if (currentVersion >= 1260) { + return; + } + + db.transaction(() => { + const [query] = sql` + DROP INDEX IF EXISTS syncTasks_order; + CREATE INDEX syncTasks_delete ON syncTasks (attempts DESC); + `; + + db.exec(query); + + db.pragma('user_version = 1260'); + })(); + + logger.info('updateToSchemaVersion1260: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index cc08e26c1..6e559882a 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -101,10 +101,11 @@ import { updateToSchemaVersion1210 } from './1210-call-history-started-id'; import { updateToSchemaVersion1220 } from './1220-blob-sessions'; import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index'; import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table'; +import { updateToSchemaVersion1250 } from './1250-defunct-call-links-storage'; import { - updateToSchemaVersion1250, + updateToSchemaVersion1260, version as MAX_VERSION, -} from './1250-defunct-call-links-storage'; +} from './1260-sync-tasks-rowid'; import { DataWriter } from '../Server'; function updateToSchemaVersion1( @@ -2076,6 +2077,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion1230, updateToSchemaVersion1240, updateToSchemaVersion1250, + updateToSchemaVersion1260, ]; export class DBVersionFromFutureError extends Error { diff --git a/ts/sql/util.ts b/ts/sql/util.ts index 452e163b5..8b98bbea4 100644 --- a/ts/sql/util.ts +++ b/ts/sql/util.ts @@ -4,6 +4,7 @@ import { isNumber, last } from 'lodash'; import type { ReadableDB, WritableDB } from './Interface'; +import type { LoggerType } from '../types/Logging'; export type EmptyQuery = []; export type ArrayQuery = Array>; @@ -162,17 +163,6 @@ export function sql( return [fragment, fragmentParams]; } -type QueryPlanRow = Readonly<{ - id: number; - parent: number; - details: string; -}>; - -type QueryPlan = Readonly<{ - query: string; - plan: ReadonlyArray; -}>; - /** * Returns typed objects of the query plan for the given query. * @@ -189,11 +179,19 @@ type QueryPlan = Readonly<{ */ export function explainQueryPlan( db: ReadableDB, + logger: LoggerType, template: QueryTemplate -): QueryPlan { +): QueryTemplate { const [query, params] = template; const plan = db.prepare(`EXPLAIN QUERY PLAN ${query}`).all(params); - return { query, plan }; + logger.info('EXPLAIN QUERY PLAN'); + for (const line of query.split('\n')) { + logger.info(line); + } + for (const row of plan) { + logger.info(`id=${row.id}, parent=${row.parent}, detail=${row.detail}`); + } + return [query, params]; } // diff --git a/ts/test-node/sql/migration_1060_test.ts b/ts/test-node/sql/migration_1060_test.ts index 2110ff385..4b48f9c5d 100644 --- a/ts/test-node/sql/migration_1060_test.ts +++ b/ts/test-node/sql/migration_1060_test.ts @@ -5,7 +5,7 @@ import { assert } from 'chai'; import { v4 as generateGuid } from 'uuid'; import { - getAllSyncTasks, + dequeueOldestSyncTasks, getMostRecentAddressableMessages, removeSyncTaskById, saveSyncTasks, @@ -217,23 +217,23 @@ describe('SQL/updateToSchemaVersion1060', () => { saveSyncTasks(db, expected); - const actual = getAllSyncTasks(db); - assert.deepEqual(expected, actual, 'before delete'); + const actual = dequeueOldestSyncTasks(db, null); + assert.deepEqual(expected, actual.tasks, 'before delete'); removeSyncTaskById(db, expected[1].id); - const actualAfterDelete = getAllSyncTasks(db); + const actualAfterDelete = dequeueOldestSyncTasks(db, null); assert.deepEqual( [ { ...expected[0], attempts: 2 }, { ...expected[2], attempts: 4 }, ], - actualAfterDelete, + actualAfterDelete.tasks, 'after delete' ); }); - it('getAllSyncTasksSync expired tasks', () => { + it('dequeueOldestSyncTasks expired tasks', () => { const now = Date.now(); const twoWeeksAgo = now - WEEK * 2; const expected: Array = [ @@ -289,10 +289,10 @@ describe('SQL/updateToSchemaVersion1060', () => { saveSyncTasks(db, expected); - const actual = getAllSyncTasks(db); + const actual = dequeueOldestSyncTasks(db, null); - assert.lengthOf(actual, 3); - assert.deepEqual([expected[1], expected[2], expected[3]], actual); + assert.lengthOf(actual.tasks, 3); + assert.deepEqual([expected[1], expected[2], expected[3]], actual.tasks); }); }); }); diff --git a/ts/util/syncTasks.ts b/ts/util/syncTasks.ts index ec1764dd1..99561029b 100644 --- a/ts/util/syncTasks.ts +++ b/ts/util/syncTasks.ts @@ -31,6 +31,7 @@ import { viewSyncTaskSchema, } from '../messageModifiers/ViewSyncs'; import { safeParseUnknown } from './schemas'; +import { DataWriter } from '../sql/Client'; const syncTaskDataSchema = z.union([ deleteMessageSchema, @@ -85,7 +86,7 @@ export async function queueSyncTasks( log.error(`${innerLogId}: Schema not found. Deleting.`); // eslint-disable-next-line no-await-in-loop await removeSyncTaskById(id); - return; + continue; } const parseResult = safeParseUnknown(syncTaskDataSchema, data); if (!parseResult.success) { @@ -94,7 +95,7 @@ export async function queueSyncTasks( ); // eslint-disable-next-line no-await-in-loop await removeSyncTaskById(id); - return; + continue; } const { data: parsed } = parseResult; @@ -223,4 +224,37 @@ export async function queueSyncTasks( await removeSyncTaskById(id); } } + + // Note: There may still be some tasks in the database, but we expect to be + // called again some time later to process them. +} + +async function processSyncTasksBatch( + logId: string, + previousRowId: number | null +): Promise { + log.info('syncTasks: Fetching tasks'); + const result = await DataWriter.dequeueOldestSyncTasks(previousRowId); + const syncTasks = result.tasks; + + if (syncTasks.length === 0) { + log.info(`${logId}/syncTasks: No sync tasks to process, stopping`); + } else { + log.info(`${logId}/syncTasks: Queueing ${syncTasks.length} sync tasks`); + await queueSyncTasks(syncTasks, DataWriter.removeSyncTaskById); + } + + return result.lastRowId; +} + +const A_TICK = Promise.resolve(); + +export async function runAllSyncTasks(): Promise { + let lastRowId: number | null = null; + do { + // eslint-disable-next-line no-await-in-loop + lastRowId = await processSyncTasksBatch('Startup', lastRowId); + // eslint-disable-next-line no-await-in-loop + await A_TICK; + } while (lastRowId != null); }