Fix memory usage by batching syncTasks
This commit is contained in:
parent
ab1e6f847d
commit
4027f4604f
8 changed files with 154 additions and 75 deletions
|
@ -845,7 +845,10 @@ type WritableInterface = {
|
|||
|
||||
removeSyncTaskById: (id: string) => void;
|
||||
saveSyncTasks: (tasks: Array<SyncTaskType>) => void;
|
||||
getAllSyncTasks: () => Array<SyncTaskType>;
|
||||
dequeueOldestSyncTasks: (previousRowId: number | null) => {
|
||||
tasks: Array<SyncTaskType>;
|
||||
lastRowId: number | null;
|
||||
};
|
||||
|
||||
getAllUnprocessedIds: () => Array<string>;
|
||||
getUnprocessedByIdsAndIncrementAttempts: (
|
||||
|
|
|
@ -119,7 +119,6 @@ import {
|
|||
} from '../util/search';
|
||||
import type { SyncTaskType } from '../util/syncTasks';
|
||||
import { MAX_SYNC_TASK_ATTEMPTS } from '../util/syncTasks.types';
|
||||
import { isMoreRecentThan } from '../util/timestamp';
|
||||
import type {
|
||||
AdjacentMessagesByConversationOptionsType,
|
||||
BackupCdnMediaObjectType,
|
||||
|
@ -475,7 +474,7 @@ export const DataWriter: ServerWritableInterface = {
|
|||
|
||||
removeSyncTaskById,
|
||||
saveSyncTasks,
|
||||
getAllSyncTasks,
|
||||
dequeueOldestSyncTasks,
|
||||
|
||||
getUnprocessedByIdsAndIncrementAttempts,
|
||||
getAllUnprocessedIds,
|
||||
|
@ -2158,47 +2157,68 @@ function saveSyncTask(db: WritableDB, task: SyncTaskType): void {
|
|||
|
||||
db.prepare(query).run(parameters);
|
||||
}
|
||||
export function getAllSyncTasks(db: WritableDB): Array<SyncTaskType> {
|
||||
|
||||
export function dequeueOldestSyncTasks(
|
||||
db: WritableDB,
|
||||
previousRowId: number | null
|
||||
): { tasks: Array<SyncTaskType>; lastRowId: number | null } {
|
||||
return db.transaction(() => {
|
||||
const [selectAllQuery] = sql`
|
||||
SELECT * FROM syncTasks ORDER BY createdAt ASC, sentAt ASC, id ASC
|
||||
const orderBy = sqlFragment`ORDER BY rowid ASC`;
|
||||
const limit = sqlFragment`LIMIT 10000`;
|
||||
const predicate = sqlFragment`rowid > ${previousRowId ?? 0}`;
|
||||
|
||||
const [deleteOldQuery, deleteOldParams] = sql`
|
||||
DELETE FROM syncTasks
|
||||
WHERE
|
||||
attempts >= ${MAX_SYNC_TASK_ATTEMPTS} AND
|
||||
createdAt < ${Date.now() - durations.WEEK}
|
||||
`;
|
||||
|
||||
const rows = db.prepare(selectAllQuery).all();
|
||||
const result = db.prepare(deleteOldQuery).run(deleteOldParams);
|
||||
|
||||
const tasks: Array<SyncTaskType> = rows.map(row => ({
|
||||
...row,
|
||||
data: jsonToObject(row.data),
|
||||
}));
|
||||
|
||||
const [query] = sql`
|
||||
UPDATE syncTasks
|
||||
SET attempts = attempts + 1
|
||||
`;
|
||||
db.prepare(query).run();
|
||||
|
||||
const [toDelete, toReturn] = partition(tasks, task => {
|
||||
if (
|
||||
isNormalNumber(task.attempts) &&
|
||||
task.attempts < MAX_SYNC_TASK_ATTEMPTS
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
if (isMoreRecentThan(task.createdAt, durations.WEEK)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if (toDelete.length > 0) {
|
||||
logger.warn(`getAllSyncTasks: Removing ${toDelete.length} expired tasks`);
|
||||
toDelete.forEach(task => {
|
||||
removeSyncTaskById(db, task.id);
|
||||
});
|
||||
if (result.changes > 0) {
|
||||
logger.info(
|
||||
`dequeueOldestSyncTasks: Deleted ${result.changes} expired sync tasks`
|
||||
);
|
||||
}
|
||||
|
||||
return toReturn;
|
||||
const [selectAllQuery, selectAllParams] = sql`
|
||||
SELECT rowid, * FROM syncTasks
|
||||
WHERE ${predicate}
|
||||
${orderBy}
|
||||
${limit}
|
||||
`;
|
||||
|
||||
const rows = db.prepare(selectAllQuery).all(selectAllParams);
|
||||
if (!rows.length) {
|
||||
return { tasks: [], lastRowId: null };
|
||||
}
|
||||
|
||||
const firstRowId = rows.at(0)?.rowid;
|
||||
const lastRowId = rows.at(-1)?.rowid;
|
||||
|
||||
strictAssert(firstRowId, 'dequeueOldestSyncTasks: firstRowId is null');
|
||||
strictAssert(lastRowId, 'dequeueOldestSyncTasks: lastRowId is null');
|
||||
|
||||
const tasks: Array<SyncTaskType> = rows.map(row => {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
const { rowid: _rowid, ...rest } = row;
|
||||
return {
|
||||
...rest,
|
||||
data: jsonToObject(row.data),
|
||||
};
|
||||
});
|
||||
|
||||
const [updateQuery, updateParams] = sql`
|
||||
UPDATE syncTasks
|
||||
SET attempts = attempts + 1
|
||||
WHERE rowid >= ${firstRowId}
|
||||
AND rowid <= ${lastRowId}
|
||||
`;
|
||||
|
||||
db.prepare(updateQuery).run(updateParams);
|
||||
|
||||
return { tasks, lastRowId };
|
||||
})();
|
||||
}
|
||||
|
||||
|
@ -7498,7 +7518,7 @@ function enableMessageInsertTriggersAndBackfill(db: WritableDB): void {
|
|||
VALUES
|
||||
(new.rowid, new.body);
|
||||
END;
|
||||
|
||||
|
||||
DROP TRIGGER IF EXISTS messages_on_insert_insert_mentions;
|
||||
CREATE TRIGGER messages_on_insert_insert_mentions AFTER INSERT ON messages
|
||||
BEGIN
|
||||
|
|
30
ts/sql/migrations/1260-sync-tasks-rowid.ts
Normal file
30
ts/sql/migrations/1260-sync-tasks-rowid.ts
Normal file
|
@ -0,0 +1,30 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import type { Database } from '@signalapp/better-sqlite3';
|
||||
import type { LoggerType } from '../../types/Logging';
|
||||
import { sql } from '../util';
|
||||
|
||||
export const version = 1260;
|
||||
|
||||
export function updateToSchemaVersion1260(
|
||||
currentVersion: number,
|
||||
db: Database,
|
||||
logger: LoggerType
|
||||
): void {
|
||||
if (currentVersion >= 1260) {
|
||||
return;
|
||||
}
|
||||
|
||||
db.transaction(() => {
|
||||
const [query] = sql`
|
||||
DROP INDEX IF EXISTS syncTasks_order;
|
||||
CREATE INDEX syncTasks_delete ON syncTasks (attempts DESC);
|
||||
`;
|
||||
|
||||
db.exec(query);
|
||||
|
||||
db.pragma('user_version = 1260');
|
||||
})();
|
||||
|
||||
logger.info('updateToSchemaVersion1260: success!');
|
||||
}
|
|
@ -101,10 +101,11 @@ import { updateToSchemaVersion1210 } from './1210-call-history-started-id';
|
|||
import { updateToSchemaVersion1220 } from './1220-blob-sessions';
|
||||
import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index';
|
||||
import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table';
|
||||
import { updateToSchemaVersion1250 } from './1250-defunct-call-links-storage';
|
||||
import {
|
||||
updateToSchemaVersion1250,
|
||||
updateToSchemaVersion1260,
|
||||
version as MAX_VERSION,
|
||||
} from './1250-defunct-call-links-storage';
|
||||
} from './1260-sync-tasks-rowid';
|
||||
import { DataWriter } from '../Server';
|
||||
|
||||
function updateToSchemaVersion1(
|
||||
|
@ -2076,6 +2077,7 @@ export const SCHEMA_VERSIONS = [
|
|||
updateToSchemaVersion1230,
|
||||
updateToSchemaVersion1240,
|
||||
updateToSchemaVersion1250,
|
||||
updateToSchemaVersion1260,
|
||||
];
|
||||
|
||||
export class DBVersionFromFutureError extends Error {
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
import { isNumber, last } from 'lodash';
|
||||
import type { ReadableDB, WritableDB } from './Interface';
|
||||
import type { LoggerType } from '../types/Logging';
|
||||
|
||||
export type EmptyQuery = [];
|
||||
export type ArrayQuery = Array<ReadonlyArray<null | number | bigint | string>>;
|
||||
|
@ -162,17 +163,6 @@ export function sql(
|
|||
return [fragment, fragmentParams];
|
||||
}
|
||||
|
||||
type QueryPlanRow = Readonly<{
|
||||
id: number;
|
||||
parent: number;
|
||||
details: string;
|
||||
}>;
|
||||
|
||||
type QueryPlan = Readonly<{
|
||||
query: string;
|
||||
plan: ReadonlyArray<QueryPlanRow>;
|
||||
}>;
|
||||
|
||||
/**
|
||||
* Returns typed objects of the query plan for the given query.
|
||||
*
|
||||
|
@ -189,11 +179,19 @@ type QueryPlan = Readonly<{
|
|||
*/
|
||||
export function explainQueryPlan(
|
||||
db: ReadableDB,
|
||||
logger: LoggerType,
|
||||
template: QueryTemplate
|
||||
): QueryPlan {
|
||||
): QueryTemplate {
|
||||
const [query, params] = template;
|
||||
const plan = db.prepare(`EXPLAIN QUERY PLAN ${query}`).all(params);
|
||||
return { query, plan };
|
||||
logger.info('EXPLAIN QUERY PLAN');
|
||||
for (const line of query.split('\n')) {
|
||||
logger.info(line);
|
||||
}
|
||||
for (const row of plan) {
|
||||
logger.info(`id=${row.id}, parent=${row.parent}, detail=${row.detail}`);
|
||||
}
|
||||
return [query, params];
|
||||
}
|
||||
|
||||
//
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue