Fix memory usage by batching syncTasks

Co-authored-by: Jamie Kyle <113370520+jamiebuilds-signal@users.noreply.github.com>
This commit is contained in:
automated-signal 2024-12-04 16:16:18 -06:00 committed by GitHub
parent d0131eb54f
commit cb53e17ae4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 154 additions and 75 deletions

View file

@ -186,7 +186,7 @@ import { AttachmentDownloadManager } from './jobs/AttachmentDownloadManager';
import { onCallLinkUpdateSync } from './util/onCallLinkUpdateSync'; import { onCallLinkUpdateSync } from './util/onCallLinkUpdateSync';
import { CallMode } from './types/CallDisposition'; import { CallMode } from './types/CallDisposition';
import type { SyncTaskType } from './util/syncTasks'; import type { SyncTaskType } from './util/syncTasks';
import { queueSyncTasks } from './util/syncTasks'; import { queueSyncTasks, runAllSyncTasks } from './util/syncTasks';
import type { ViewSyncTaskType } from './messageModifiers/ViewSyncs'; import type { ViewSyncTaskType } from './messageModifiers/ViewSyncs';
import type { ReceiptSyncTaskType } from './messageModifiers/MessageReceipts'; import type { ReceiptSyncTaskType } from './messageModifiers/MessageReceipts';
import type { ReadSyncTaskType } from './messageModifiers/ReadSyncs'; import type { ReadSyncTaskType } from './messageModifiers/ReadSyncs';
@ -1469,15 +1469,7 @@ export async function startApp(): Promise<void> {
} }
log.info('Expiration start timestamp cleanup: complete'); log.info('Expiration start timestamp cleanup: complete');
{ await runAllSyncTasks();
log.info('Startup/syncTasks: Fetching tasks');
const syncTasks = await DataWriter.getAllSyncTasks();
log.info(`Startup/syncTasks: Queueing ${syncTasks.length} sync tasks`);
await queueSyncTasks(syncTasks, DataWriter.removeSyncTaskById);
log.info('Startup/syncTasks: Done');
}
log.info('listening for registration events'); log.info('listening for registration events');
window.Whisper.events.on('registration_done', () => { window.Whisper.events.on('registration_done', () => {

View file

@ -845,7 +845,10 @@ type WritableInterface = {
removeSyncTaskById: (id: string) => void; removeSyncTaskById: (id: string) => void;
saveSyncTasks: (tasks: Array<SyncTaskType>) => void; saveSyncTasks: (tasks: Array<SyncTaskType>) => void;
getAllSyncTasks: () => Array<SyncTaskType>; dequeueOldestSyncTasks: (previousRowId: number | null) => {
tasks: Array<SyncTaskType>;
lastRowId: number | null;
};
getAllUnprocessedIds: () => Array<string>; getAllUnprocessedIds: () => Array<string>;
getUnprocessedByIdsAndIncrementAttempts: ( getUnprocessedByIdsAndIncrementAttempts: (

View file

@ -119,7 +119,6 @@ import {
} from '../util/search'; } from '../util/search';
import type { SyncTaskType } from '../util/syncTasks'; import type { SyncTaskType } from '../util/syncTasks';
import { MAX_SYNC_TASK_ATTEMPTS } from '../util/syncTasks.types'; import { MAX_SYNC_TASK_ATTEMPTS } from '../util/syncTasks.types';
import { isMoreRecentThan } from '../util/timestamp';
import type { import type {
AdjacentMessagesByConversationOptionsType, AdjacentMessagesByConversationOptionsType,
BackupCdnMediaObjectType, BackupCdnMediaObjectType,
@ -475,7 +474,7 @@ export const DataWriter: ServerWritableInterface = {
removeSyncTaskById, removeSyncTaskById,
saveSyncTasks, saveSyncTasks,
getAllSyncTasks, dequeueOldestSyncTasks,
getUnprocessedByIdsAndIncrementAttempts, getUnprocessedByIdsAndIncrementAttempts,
getAllUnprocessedIds, getAllUnprocessedIds,
@ -2158,47 +2157,68 @@ function saveSyncTask(db: WritableDB, task: SyncTaskType): void {
db.prepare(query).run(parameters); db.prepare(query).run(parameters);
} }
export function getAllSyncTasks(db: WritableDB): Array<SyncTaskType> {
export function dequeueOldestSyncTasks(
db: WritableDB,
previousRowId: number | null
): { tasks: Array<SyncTaskType>; lastRowId: number | null } {
return db.transaction(() => { return db.transaction(() => {
const [selectAllQuery] = sql` const orderBy = sqlFragment`ORDER BY rowid ASC`;
SELECT * FROM syncTasks ORDER BY createdAt ASC, sentAt ASC, id ASC const limit = sqlFragment`LIMIT 10000`;
const predicate = sqlFragment`rowid > ${previousRowId ?? 0}`;
const [deleteOldQuery, deleteOldParams] = sql`
DELETE FROM syncTasks
WHERE
attempts >= ${MAX_SYNC_TASK_ATTEMPTS} AND
createdAt < ${Date.now() - durations.WEEK}
`; `;
const rows = db.prepare(selectAllQuery).all(); const result = db.prepare(deleteOldQuery).run(deleteOldParams);
const tasks: Array<SyncTaskType> = rows.map(row => ({ if (result.changes > 0) {
...row, logger.info(
data: jsonToObject(row.data), `dequeueOldestSyncTasks: Deleted ${result.changes} expired sync tasks`
})); );
const [query] = sql`
UPDATE syncTasks
SET attempts = attempts + 1
`;
db.prepare(query).run();
const [toDelete, toReturn] = partition(tasks, task => {
if (
isNormalNumber(task.attempts) &&
task.attempts < MAX_SYNC_TASK_ATTEMPTS
) {
return false;
}
if (isMoreRecentThan(task.createdAt, durations.WEEK)) {
return false;
}
return true;
});
if (toDelete.length > 0) {
logger.warn(`getAllSyncTasks: Removing ${toDelete.length} expired tasks`);
toDelete.forEach(task => {
removeSyncTaskById(db, task.id);
});
} }
return toReturn; const [selectAllQuery, selectAllParams] = sql`
SELECT rowid, * FROM syncTasks
WHERE ${predicate}
${orderBy}
${limit}
`;
const rows = db.prepare(selectAllQuery).all(selectAllParams);
if (!rows.length) {
return { tasks: [], lastRowId: null };
}
const firstRowId = rows.at(0)?.rowid;
const lastRowId = rows.at(-1)?.rowid;
strictAssert(firstRowId, 'dequeueOldestSyncTasks: firstRowId is null');
strictAssert(lastRowId, 'dequeueOldestSyncTasks: lastRowId is null');
const tasks: Array<SyncTaskType> = rows.map(row => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { rowid: _rowid, ...rest } = row;
return {
...rest,
data: jsonToObject(row.data),
};
});
const [updateQuery, updateParams] = sql`
UPDATE syncTasks
SET attempts = attempts + 1
WHERE rowid >= ${firstRowId}
AND rowid <= ${lastRowId}
`;
db.prepare(updateQuery).run(updateParams);
return { tasks, lastRowId };
})(); })();
} }
@ -7498,7 +7518,7 @@ function enableMessageInsertTriggersAndBackfill(db: WritableDB): void {
VALUES VALUES
(new.rowid, new.body); (new.rowid, new.body);
END; END;
DROP TRIGGER IF EXISTS messages_on_insert_insert_mentions; DROP TRIGGER IF EXISTS messages_on_insert_insert_mentions;
CREATE TRIGGER messages_on_insert_insert_mentions AFTER INSERT ON messages CREATE TRIGGER messages_on_insert_insert_mentions AFTER INSERT ON messages
BEGIN BEGIN

View file

@ -0,0 +1,30 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { Database } from '@signalapp/better-sqlite3';
import type { LoggerType } from '../../types/Logging';
import { sql } from '../util';
export const version = 1260;
export function updateToSchemaVersion1260(
currentVersion: number,
db: Database,
logger: LoggerType
): void {
if (currentVersion >= 1260) {
return;
}
db.transaction(() => {
const [query] = sql`
DROP INDEX IF EXISTS syncTasks_order;
CREATE INDEX syncTasks_delete ON syncTasks (attempts DESC);
`;
db.exec(query);
db.pragma('user_version = 1260');
})();
logger.info('updateToSchemaVersion1260: success!');
}

View file

@ -101,10 +101,11 @@ import { updateToSchemaVersion1210 } from './1210-call-history-started-id';
import { updateToSchemaVersion1220 } from './1220-blob-sessions'; import { updateToSchemaVersion1220 } from './1220-blob-sessions';
import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index'; import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index';
import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table'; import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table';
import { updateToSchemaVersion1250 } from './1250-defunct-call-links-storage';
import { import {
updateToSchemaVersion1250, updateToSchemaVersion1260,
version as MAX_VERSION, version as MAX_VERSION,
} from './1250-defunct-call-links-storage'; } from './1260-sync-tasks-rowid';
import { DataWriter } from '../Server'; import { DataWriter } from '../Server';
function updateToSchemaVersion1( function updateToSchemaVersion1(
@ -2076,6 +2077,7 @@ export const SCHEMA_VERSIONS = [
updateToSchemaVersion1230, updateToSchemaVersion1230,
updateToSchemaVersion1240, updateToSchemaVersion1240,
updateToSchemaVersion1250, updateToSchemaVersion1250,
updateToSchemaVersion1260,
]; ];
export class DBVersionFromFutureError extends Error { export class DBVersionFromFutureError extends Error {

View file

@ -4,6 +4,7 @@
import { isNumber, last } from 'lodash'; import { isNumber, last } from 'lodash';
import type { ReadableDB, WritableDB } from './Interface'; import type { ReadableDB, WritableDB } from './Interface';
import type { LoggerType } from '../types/Logging';
export type EmptyQuery = []; export type EmptyQuery = [];
export type ArrayQuery = Array<ReadonlyArray<null | number | bigint | string>>; export type ArrayQuery = Array<ReadonlyArray<null | number | bigint | string>>;
@ -162,17 +163,6 @@ export function sql(
return [fragment, fragmentParams]; return [fragment, fragmentParams];
} }
type QueryPlanRow = Readonly<{
id: number;
parent: number;
details: string;
}>;
type QueryPlan = Readonly<{
query: string;
plan: ReadonlyArray<QueryPlanRow>;
}>;
/** /**
* Returns typed objects of the query plan for the given query. * Returns typed objects of the query plan for the given query.
* *
@ -189,11 +179,19 @@ type QueryPlan = Readonly<{
*/ */
export function explainQueryPlan( export function explainQueryPlan(
db: ReadableDB, db: ReadableDB,
logger: LoggerType,
template: QueryTemplate template: QueryTemplate
): QueryPlan { ): QueryTemplate {
const [query, params] = template; const [query, params] = template;
const plan = db.prepare(`EXPLAIN QUERY PLAN ${query}`).all(params); const plan = db.prepare(`EXPLAIN QUERY PLAN ${query}`).all(params);
return { query, plan }; logger.info('EXPLAIN QUERY PLAN');
for (const line of query.split('\n')) {
logger.info(line);
}
for (const row of plan) {
logger.info(`id=${row.id}, parent=${row.parent}, detail=${row.detail}`);
}
return [query, params];
} }
// //

View file

@ -5,7 +5,7 @@ import { assert } from 'chai';
import { v4 as generateGuid } from 'uuid'; import { v4 as generateGuid } from 'uuid';
import { import {
getAllSyncTasks, dequeueOldestSyncTasks,
getMostRecentAddressableMessages, getMostRecentAddressableMessages,
removeSyncTaskById, removeSyncTaskById,
saveSyncTasks, saveSyncTasks,
@ -217,23 +217,23 @@ describe('SQL/updateToSchemaVersion1060', () => {
saveSyncTasks(db, expected); saveSyncTasks(db, expected);
const actual = getAllSyncTasks(db); const actual = dequeueOldestSyncTasks(db, null);
assert.deepEqual(expected, actual, 'before delete'); assert.deepEqual(expected, actual.tasks, 'before delete');
removeSyncTaskById(db, expected[1].id); removeSyncTaskById(db, expected[1].id);
const actualAfterDelete = getAllSyncTasks(db); const actualAfterDelete = dequeueOldestSyncTasks(db, null);
assert.deepEqual( assert.deepEqual(
[ [
{ ...expected[0], attempts: 2 }, { ...expected[0], attempts: 2 },
{ ...expected[2], attempts: 4 }, { ...expected[2], attempts: 4 },
], ],
actualAfterDelete, actualAfterDelete.tasks,
'after delete' 'after delete'
); );
}); });
it('getAllSyncTasksSync expired tasks', () => { it('dequeueOldestSyncTasks expired tasks', () => {
const now = Date.now(); const now = Date.now();
const twoWeeksAgo = now - WEEK * 2; const twoWeeksAgo = now - WEEK * 2;
const expected: Array<SyncTaskType> = [ const expected: Array<SyncTaskType> = [
@ -289,10 +289,10 @@ describe('SQL/updateToSchemaVersion1060', () => {
saveSyncTasks(db, expected); saveSyncTasks(db, expected);
const actual = getAllSyncTasks(db); const actual = dequeueOldestSyncTasks(db, null);
assert.lengthOf(actual, 3); assert.lengthOf(actual.tasks, 3);
assert.deepEqual([expected[1], expected[2], expected[3]], actual); assert.deepEqual([expected[1], expected[2], expected[3]], actual.tasks);
}); });
}); });
}); });

View file

@ -31,6 +31,7 @@ import {
viewSyncTaskSchema, viewSyncTaskSchema,
} from '../messageModifiers/ViewSyncs'; } from '../messageModifiers/ViewSyncs';
import { safeParseUnknown } from './schemas'; import { safeParseUnknown } from './schemas';
import { DataWriter } from '../sql/Client';
const syncTaskDataSchema = z.union([ const syncTaskDataSchema = z.union([
deleteMessageSchema, deleteMessageSchema,
@ -85,7 +86,7 @@ export async function queueSyncTasks(
log.error(`${innerLogId}: Schema not found. Deleting.`); log.error(`${innerLogId}: Schema not found. Deleting.`);
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await removeSyncTaskById(id); await removeSyncTaskById(id);
return; continue;
} }
const parseResult = safeParseUnknown(syncTaskDataSchema, data); const parseResult = safeParseUnknown(syncTaskDataSchema, data);
if (!parseResult.success) { if (!parseResult.success) {
@ -94,7 +95,7 @@ export async function queueSyncTasks(
); );
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await removeSyncTaskById(id); await removeSyncTaskById(id);
return; continue;
} }
const { data: parsed } = parseResult; const { data: parsed } = parseResult;
@ -223,4 +224,37 @@ export async function queueSyncTasks(
await removeSyncTaskById(id); await removeSyncTaskById(id);
} }
} }
// Note: There may still be some tasks in the database, but we expect to be
// called again some time later to process them.
}
async function processSyncTasksBatch(
logId: string,
previousRowId: number | null
): Promise<number | null> {
log.info('syncTasks: Fetching tasks');
const result = await DataWriter.dequeueOldestSyncTasks(previousRowId);
const syncTasks = result.tasks;
if (syncTasks.length === 0) {
log.info(`${logId}/syncTasks: No sync tasks to process, stopping`);
} else {
log.info(`${logId}/syncTasks: Queueing ${syncTasks.length} sync tasks`);
await queueSyncTasks(syncTasks, DataWriter.removeSyncTaskById);
}
return result.lastRowId;
}
const A_TICK = Promise.resolve();
export async function runAllSyncTasks(): Promise<void> {
let lastRowId: number | null = null;
do {
// eslint-disable-next-line no-await-in-loop
lastRowId = await processSyncTasksBatch('Startup', lastRowId);
// eslint-disable-next-line no-await-in-loop
await A_TICK;
} while (lastRowId != null);
} }