diff --git a/ts/background.ts b/ts/background.ts index 5c7e958d23a..91f65845ec9 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -666,7 +666,10 @@ export async function startApp(): Promise { drop(onRetryRequestQueue.add(() => onRetryRequest(event))); }) ); - messageReceiver.addEventListener('empty', queuedEventListener(onEmpty)); + messageReceiver.addEventListener( + 'empty', + queuedEventListener(() => onEmpty({ isFromMessageReceiver: true })) + ); messageReceiver.addEventListener( 'configuration', queuedEventListener(onConfiguration) @@ -1629,7 +1632,7 @@ export async function startApp(): Promise { log.info( 'background: offline; initial load not completed; triggering onEmpty' ); - drop(onEmpty()); // this ensures that the inbox loading progress bar is dismissed + drop(onEmpty({ isFromMessageReceiver: false })); // this ensures that the inbox loading progress bar is dismissed } } }; @@ -2027,7 +2030,9 @@ export async function startApp(): Promise { window.waitForEmptyEventQueue = waitForEmptyEventQueue; - async function onEmpty(): Promise { + async function onEmpty({ + isFromMessageReceiver, + }: { isFromMessageReceiver?: boolean } = {}): Promise { const { storage } = window.textsecure; await Promise.all([ @@ -2075,6 +2080,40 @@ export async function startApp(): Promise { drop(usernameIntegrity.start()); drop(ReleaseNotesFetcher.init(window.Whisper.events, newVersion)); + + if (isFromMessageReceiver) { + drop( + (async () => { + let lastRowId: number | null = 0; + while (lastRowId != null) { + const result = + // eslint-disable-next-line no-await-in-loop + await DataWriter.dequeueOldestSyncTasks({ + previousRowId: lastRowId, + incrementAttempts: false, + syncTaskTypes: [ + 'delete-conversation', + 'delete-local-conversation', + ], + }); + const syncTasks = result.tasks; + if (syncTasks.length > 0) { + log.info( + `onEmpty/syncTasks: Queueing ${syncTasks.length} sync tasks for reattempt` + ); + // eslint-disable-next-line no-await-in-loop + await queueSyncTasks(syncTasks, DataWriter.removeSyncTaskById); + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(); // one tick + } + + lastRowId = result.lastRowId; + } + log.info('onEmpty/syncTasks: Incrementing all sync task attempts'); + drop(DataWriter.incrementAllSyncTaskAttempts()); + })() + ); + } } let initialStartupCount = 0; @@ -3022,7 +3061,7 @@ export async function startApp(): Promise { await window.waitForAllBatchers(); } - void onEmpty(); + void onEmpty({ isFromMessageReceiver: false }); void Registration.remove(); diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 89000324137..5e9d8a63c3b 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -896,7 +896,13 @@ type WritableInterface = { removeSyncTaskById: (id: string) => void; saveSyncTasks: (tasks: Array) => void; - dequeueOldestSyncTasks: (previousRowId: number | null) => { + + incrementAllSyncTaskAttempts: () => void; + dequeueOldestSyncTasks: (options: { + previousRowId: number | null; + incrementAttempts?: boolean; + syncTaskTypes?: Array; + }) => { tasks: Array; lastRowId: number | null; }; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index ecad9647e6c..bf200557f9a 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -470,6 +470,7 @@ export const DataWriter: ServerWritableInterface = { removeSyncTaskById, saveSyncTasks, + incrementAllSyncTaskAttempts, dequeueOldestSyncTasks, getUnprocessedByIdsAndIncrementAttempts, @@ -2171,20 +2172,38 @@ function saveSyncTask(db: WritableDB, task: SyncTaskType): void { db.prepare(query).run(parameters); } +export function incrementAllSyncTaskAttempts(db: WritableDB): void { + const [updateQuery, updateParams] = sql` + UPDATE syncTasks + SET attempts = attempts + 1 + `; + return db.transaction(() => { + db.prepare(updateQuery).run(updateParams); + })(); +} + export function dequeueOldestSyncTasks( db: WritableDB, - previousRowId: number | null + options: { + previousRowId: number | null; + incrementAttempts?: boolean; + syncTaskTypes?: Array; + } ): { tasks: Array; lastRowId: number | null } { + const { previousRowId, incrementAttempts = true, syncTaskTypes } = options; return db.transaction(() => { const orderBy = sqlFragment`ORDER BY rowid ASC`; const limit = sqlFragment`LIMIT 10000`; - const predicate = sqlFragment`rowid > ${previousRowId ?? 0}`; + let predicate = sqlFragment`rowid > ${previousRowId ?? 0}`; + if (syncTaskTypes && syncTaskTypes.length > 0) { + predicate = sqlFragment`${predicate} AND type IN (${sqlJoin(syncTaskTypes)})`; + } const [deleteOldQuery, deleteOldParams] = sql` DELETE FROM syncTasks WHERE attempts >= ${MAX_SYNC_TASK_ATTEMPTS} AND - createdAt < ${Date.now() - durations.WEEK} + createdAt < ${Date.now() - durations.DAY * 2} `; const result = db.prepare(deleteOldQuery).run(deleteOldParams); @@ -2213,7 +2232,7 @@ export function dequeueOldestSyncTasks( strictAssert(firstRowId, 'dequeueOldestSyncTasks: firstRowId is null'); strictAssert(lastRowId, 'dequeueOldestSyncTasks: lastRowId is null'); - const tasks: Array = rows.map(row => { + let tasks: Array = rows.map(row => { // eslint-disable-next-line @typescript-eslint/no-unused-vars const { rowid: _rowid, ...rest } = row; return { @@ -2222,14 +2241,35 @@ export function dequeueOldestSyncTasks( }; }); - const [updateQuery, updateParams] = sql` - UPDATE syncTasks - SET attempts = attempts + 1 - WHERE rowid >= ${firstRowId} - AND rowid <= ${lastRowId} - `; + if (incrementAttempts) { + let updatePredicate = sqlFragment`rowid >= ${firstRowId} AND rowid <= ${lastRowId}`; + if (syncTaskTypes && syncTaskTypes.length > 0) { + updatePredicate = sqlFragment`${updatePredicate} AND type IN (${sqlJoin(syncTaskTypes)})`; + } + const [updateQuery, updateParams] = sql` + UPDATE syncTasks + SET attempts = attempts + 1 + WHERE ${updatePredicate} + RETURNING id, attempts; + `; - db.prepare(updateQuery).run(updateParams); + const res = db.prepare(updateQuery).raw().all(updateParams) as Array< + [string, number] + >; + + if (Array.isArray(res)) { + const idToAttempts = new Map(res); + tasks = tasks.map(task => { + const { id } = task; + const attempts = idToAttempts.get(id) ?? task.attempts; + return { ...task, attempts }; + }); + } else { + logger.error( + 'dequeueOldestSyncTasks: failed to get sync task attempts' + ); + } + } return { tasks, lastRowId }; })(); diff --git a/ts/sql/migrations/1330-sync-tasks-type-index.ts b/ts/sql/migrations/1330-sync-tasks-type-index.ts new file mode 100644 index 00000000000..1a452c14365 --- /dev/null +++ b/ts/sql/migrations/1330-sync-tasks-type-index.ts @@ -0,0 +1,29 @@ +// Copyright 2025 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import type { Database } from '@signalapp/better-sqlite3'; +import type { LoggerType } from '../../types/Logging'; +import { sql } from '../util'; + +export const version = 1330; + +export function updateToSchemaVersion1330( + currentVersion: number, + db: Database, + logger: LoggerType +): void { + if (currentVersion >= 1330) { + return; + } + + db.transaction(() => { + const [query] = sql` + CREATE INDEX syncTasks_type ON syncTasks (type); + `; + + db.exec(query); + + db.pragma('user_version = 1330'); + })(); + + logger.info('updateToSchemaVersion1330: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index a7db78ca817..e48273536e9 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -108,10 +108,12 @@ import { updateToSchemaVersion1280 } from './1280-blob-unprocessed'; import { updateToSchemaVersion1290 } from './1290-int-unprocessed-source-device'; import { updateToSchemaVersion1300 } from './1300-sticker-pack-refs'; import { updateToSchemaVersion1310 } from './1310-muted-fixup'; +import { updateToSchemaVersion1320 } from './1320-unprocessed-received-at-date'; import { - updateToSchemaVersion1320, + updateToSchemaVersion1330, version as MAX_VERSION, -} from './1320-unprocessed-received-at-date'; +} from './1330-sync-tasks-type-index'; + import { DataWriter } from '../Server'; function updateToSchemaVersion1( @@ -2091,6 +2093,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion1300, updateToSchemaVersion1310, updateToSchemaVersion1320, + updateToSchemaVersion1330, ]; export class DBVersionFromFutureError extends Error { diff --git a/ts/test-node/sql/migration_1060_test.ts b/ts/test-node/sql/migration_1060_test.ts index 6b19d793486..9bf5310f715 100644 --- a/ts/test-node/sql/migration_1060_test.ts +++ b/ts/test-node/sql/migration_1060_test.ts @@ -238,12 +238,19 @@ describe('SQL/updateToSchemaVersion1060', () => { saveSyncTasks(db, expected); - const actual = dequeueOldestSyncTasks(db, null); - assert.deepEqual(expected, actual.tasks, 'before delete'); + const actual = dequeueOldestSyncTasks(db, { previousRowId: null }); + assert.deepEqual( + expected.map(t => ({ ...t, attempts: t.attempts + 1 })), + actual.tasks, + 'before delete' + ); removeSyncTaskById(db, expected[1].id); - const actualAfterDelete = dequeueOldestSyncTasks(db, null); + const actualAfterDelete = dequeueOldestSyncTasks(db, { + previousRowId: null, + incrementAttempts: false, + }); assert.deepEqual( [ { ...expected[0], attempts: 2 }, @@ -310,10 +317,17 @@ describe('SQL/updateToSchemaVersion1060', () => { saveSyncTasks(db, expected); - const actual = dequeueOldestSyncTasks(db, null); + const actual = dequeueOldestSyncTasks(db, { previousRowId: null }); assert.lengthOf(actual.tasks, 3); - assert.deepEqual([expected[1], expected[2], expected[3]], actual.tasks); + assert.deepEqual( + [ + { ...expected[1], attempts: 3 }, + { ...expected[2], attempts: 11 }, + { ...expected[3], attempts: 5 }, + ], + actual.tasks + ); }); }); }); diff --git a/ts/test-node/sql/migration_1330_test.ts b/ts/test-node/sql/migration_1330_test.ts new file mode 100644 index 00000000000..f106b591c02 --- /dev/null +++ b/ts/test-node/sql/migration_1330_test.ts @@ -0,0 +1,168 @@ +// Copyright 2025 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import { v4 as generateGuid } from 'uuid'; + +import { + dequeueOldestSyncTasks, + saveSyncTasks, + incrementAllSyncTaskAttempts, +} from '../../sql/Server'; +import type { WritableDB } from '../../sql/Interface'; +import { updateToVersion, createDB } from './helpers'; + +import type { SyncTaskType } from '../../util/syncTasks'; + +describe('SQL/updateToSchemaVersion1330', () => { + let db: WritableDB; + beforeEach(() => { + db = createDB(); + updateToVersion(db, 1330); + }); + + afterEach(() => { + db.close(); + }); + + describe('Sync Tasks task index', () => { + it('uses the task index for queries', () => { + const { detail } = db + .prepare( + ` + EXPLAIN QUERY PLAN + SELECT rowid, * FROM syncTasks + WHERE rowid > 0 AND type IN ('delete-converation', 'delete-local-conversation') + ORDER BY rowid ASC + LIMIT 10000 + ` + ) + .get(); + assert.include(detail, 'USING INDEX syncTasks_type'); + }); + }); + + describe('#dequeueOldestSyncTasks', () => { + it('returns and increments tasks by type', () => { + const now = Date.now(); + const expected: Array = [ + { + id: generateGuid(), + attempts: 1, + createdAt: now + 1, + data: { + jsonField: 'one', + }, + envelopeId: generateGuid(), + sentAt: 1, + type: 'Delivery', + }, + { + id: generateGuid(), + attempts: 2, + createdAt: now + 2, + data: { + jsonField: 'two', + }, + envelopeId: generateGuid(), + sentAt: 2, + type: 'delete-local-conversation', + }, + { + id: generateGuid(), + attempts: 3, + createdAt: now + 3, + data: { + jsonField: 'three', + }, + envelopeId: generateGuid(), + sentAt: 3, + type: 'delete-conversation', + }, + ]; + + saveSyncTasks(db, expected); + + const deleteTasks = dequeueOldestSyncTasks(db, { + previousRowId: null, + syncTaskTypes: ['delete-conversation', 'delete-local-conversation'], + incrementAttempts: true, + }); + assert.deepEqual( + [ + { ...expected[1], attempts: 3 }, + { ...expected[2], attempts: 4 }, + ], + deleteTasks.tasks + ); + + const allTasks = dequeueOldestSyncTasks(db, { + previousRowId: 0, + incrementAttempts: false, + }); + + assert.deepEqual(allTasks.tasks[0], expected[0]); + }); + }); + + describe('#incrementAllSyncTaskAttempts', () => { + it('increments all sync task attempts', () => { + const now = Date.now(); + const expected: Array = [ + { + id: generateGuid(), + attempts: 1, + createdAt: now + 1, + data: { + jsonField: 'one', + data: 1, + }, + envelopeId: 'envelope-id-1', + sentAt: 1, + type: 'delete-conversation', + }, + { + id: generateGuid(), + attempts: 2, + createdAt: now + 2, + data: { + jsonField: 'two', + data: 2, + }, + envelopeId: 'envelope-id-2', + sentAt: 2, + type: 'delete-conversation', + }, + { + id: generateGuid(), + attempts: 3, + createdAt: now + 3, + data: { + jsonField: 'three', + data: 3, + }, + envelopeId: 'envelope-id-3', + sentAt: 3, + type: 'delete-conversation', + }, + ]; + saveSyncTasks(db, expected); + + incrementAllSyncTaskAttempts(db); + + const tasksAfterIncrement = dequeueOldestSyncTasks(db, { + previousRowId: 0, + incrementAttempts: false, + }); + + assert.deepEqual( + [ + { ...expected[0], attempts: 2 }, + { ...expected[1], attempts: 3 }, + { ...expected[2], attempts: 4 }, + ], + tasksAfterIncrement.tasks + ); + }); + }); +}); diff --git a/ts/util/syncTasks.ts b/ts/util/syncTasks.ts index 99561029b5e..d48ff31a5c2 100644 --- a/ts/util/syncTasks.ts +++ b/ts/util/syncTasks.ts @@ -234,7 +234,7 @@ async function processSyncTasksBatch( previousRowId: number | null ): Promise { log.info('syncTasks: Fetching tasks'); - const result = await DataWriter.dequeueOldestSyncTasks(previousRowId); + const result = await DataWriter.dequeueOldestSyncTasks({ previousRowId }); const syncTasks = result.tasks; if (syncTasks.length === 0) {