Improve Processing of Sync Tasks

This commit is contained in:
yash-signal 2025-02-25 09:18:42 -06:00 committed by GitHub
commit 0f767c0098
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 323 additions and 24 deletions

View file

@ -470,6 +470,7 @@ export const DataWriter: ServerWritableInterface = {
removeSyncTaskById,
saveSyncTasks,
incrementAllSyncTaskAttempts,
dequeueOldestSyncTasks,
getUnprocessedByIdsAndIncrementAttempts,
@ -2171,20 +2172,38 @@ function saveSyncTask(db: WritableDB, task: SyncTaskType): void {
db.prepare(query).run(parameters);
}
export function incrementAllSyncTaskAttempts(db: WritableDB): void {
const [updateQuery, updateParams] = sql`
UPDATE syncTasks
SET attempts = attempts + 1
`;
return db.transaction(() => {
db.prepare(updateQuery).run(updateParams);
})();
}
export function dequeueOldestSyncTasks(
db: WritableDB,
previousRowId: number | null
options: {
previousRowId: number | null;
incrementAttempts?: boolean;
syncTaskTypes?: Array<SyncTaskType['type']>;
}
): { tasks: Array<SyncTaskType>; lastRowId: number | null } {
const { previousRowId, incrementAttempts = true, syncTaskTypes } = options;
return db.transaction(() => {
const orderBy = sqlFragment`ORDER BY rowid ASC`;
const limit = sqlFragment`LIMIT 10000`;
const predicate = sqlFragment`rowid > ${previousRowId ?? 0}`;
let predicate = sqlFragment`rowid > ${previousRowId ?? 0}`;
if (syncTaskTypes && syncTaskTypes.length > 0) {
predicate = sqlFragment`${predicate} AND type IN (${sqlJoin(syncTaskTypes)})`;
}
const [deleteOldQuery, deleteOldParams] = sql`
DELETE FROM syncTasks
WHERE
attempts >= ${MAX_SYNC_TASK_ATTEMPTS} AND
createdAt < ${Date.now() - durations.WEEK}
createdAt < ${Date.now() - durations.DAY * 2}
`;
const result = db.prepare(deleteOldQuery).run(deleteOldParams);
@ -2213,7 +2232,7 @@ export function dequeueOldestSyncTasks(
strictAssert(firstRowId, 'dequeueOldestSyncTasks: firstRowId is null');
strictAssert(lastRowId, 'dequeueOldestSyncTasks: lastRowId is null');
const tasks: Array<SyncTaskType> = rows.map(row => {
let tasks: Array<SyncTaskType> = rows.map(row => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { rowid: _rowid, ...rest } = row;
return {
@ -2222,14 +2241,35 @@ export function dequeueOldestSyncTasks(
};
});
const [updateQuery, updateParams] = sql`
UPDATE syncTasks
SET attempts = attempts + 1
WHERE rowid >= ${firstRowId}
AND rowid <= ${lastRowId}
`;
if (incrementAttempts) {
let updatePredicate = sqlFragment`rowid >= ${firstRowId} AND rowid <= ${lastRowId}`;
if (syncTaskTypes && syncTaskTypes.length > 0) {
updatePredicate = sqlFragment`${updatePredicate} AND type IN (${sqlJoin(syncTaskTypes)})`;
}
const [updateQuery, updateParams] = sql`
UPDATE syncTasks
SET attempts = attempts + 1
WHERE ${updatePredicate}
RETURNING id, attempts;
`;
db.prepare(updateQuery).run(updateParams);
const res = db.prepare(updateQuery).raw().all(updateParams) as Array<
[string, number]
>;
if (Array.isArray(res)) {
const idToAttempts = new Map<string, number>(res);
tasks = tasks.map(task => {
const { id } = task;
const attempts = idToAttempts.get(id) ?? task.attempts;
return { ...task, attempts };
});
} else {
logger.error(
'dequeueOldestSyncTasks: failed to get sync task attempts'
);
}
}
return { tasks, lastRowId };
})();