diff --git a/ts/jobs/CallLinkDeleteManager.ts b/ts/jobs/CallLinkDeleteManager.ts new file mode 100644 index 000000000000..f567581eb9e4 --- /dev/null +++ b/ts/jobs/CallLinkDeleteManager.ts @@ -0,0 +1,213 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +import * as durations from '../util/durations'; +import * as log from '../logging/log'; +import { DataReader, DataWriter } from '../sql/Client'; +import { + JobManager, + type JobManagerParamsType, + type JobManagerJobResultType, + type JobManagerJobType, +} from './JobManager'; +import { calling } from '../services/calling'; +import { callLinkFromRecord } from '../util/callLinksRingrtc'; + +// Type for adding a new job +export type NewCallLinkDeleteJobType = { + roomId: string; + options?: { delay: number }; +}; + +export type CoreCallLinkDeleteJobType = { + roomId: string; +}; + +export type CallLinkDeleteJobType = CoreCallLinkDeleteJobType & + JobManagerJobType; + +const MAX_CONCURRENT_JOBS = 5; + +const DEFAULT_RETRY_CONFIG = { + maxAttempts: Infinity, + backoffConfig: { + // 1 min, 5 min, 25 min, (max) 1 day + multiplier: 5, + firstBackoffs: [durations.MINUTE], + maxBackoffTime: durations.DAY, + }, +}; + +type CallLinkDeleteManagerParamsType = + JobManagerParamsType; + +function getJobId(job: CoreCallLinkDeleteJobType): string { + return job.roomId; +} + +export class CallLinkDeleteManager extends JobManager { + jobs: Map = new Map(); + private static _instance: CallLinkDeleteManager | undefined; + override logPrefix = 'CallLinkDeleteManager'; + + static defaultParams: CallLinkDeleteManagerParamsType = { + markAllJobsInactive: () => Promise.resolve(), + getNextJobs, + saveJob, + removeJob, + runJob, + getJobId, + getJobIdForLogging: getJobId, + getRetryConfig: () => DEFAULT_RETRY_CONFIG, + maxConcurrentJobs: MAX_CONCURRENT_JOBS, + }; + + constructor(params: CallLinkDeleteManagerParamsType) { + super({ + ...params, + getNextJobs: ({ limit, timestamp }) => + params.getNextJobs.call(this, { limit, timestamp }), + saveJob: (job: CallLinkDeleteJobType) => params.saveJob.call(this, job), + removeJob: (job: CallLinkDeleteJobType) => + params.removeJob.call(this, job), + }); + } + + override async addJob( + jobData: CoreCallLinkDeleteJobType, + options?: { delay: number } + ): Promise { + const { delay } = options || {}; + if (delay) { + log.info( + `CallLinkDeleteJobType/addJob/${getJobId(jobData)}: Adding with delay ${delay}` + ); + const job: CallLinkDeleteJobType = { + ...jobData, + attempts: 0, + retryAfter: Date.now() + delay, + lastAttemptTimestamp: null, + active: false, + }; + await this.params.saveJob(job); + return; + } + + await this._addJob(jobData); + } + + async enqueueAllDeletedCallLinks(options?: { delay: number }): Promise { + const roomIds = await DataReader.getAllMarkedDeletedCallLinkRoomIds(); + log.info( + `CallLinkDeleteJobType/enqueueAllDeletedCallLinks: Found ${roomIds.length} call links to delete` + ); + roomIds.forEach(roomId => this.addJob({ roomId }, options)); + } + + static get instance(): CallLinkDeleteManager { + if (!CallLinkDeleteManager._instance) { + CallLinkDeleteManager._instance = new CallLinkDeleteManager( + CallLinkDeleteManager.defaultParams + ); + } + return CallLinkDeleteManager._instance; + } + + static async start(): Promise { + await CallLinkDeleteManager.instance.enqueueAllDeletedCallLinks(); + await CallLinkDeleteManager.instance.start(); + } + + static async stop(): Promise { + return CallLinkDeleteManager._instance?.stop(); + } + + static async addJob( + newJob: CoreCallLinkDeleteJobType, + options?: { delay: number } + ): Promise { + return CallLinkDeleteManager.instance.addJob(newJob, options); + } + + static async enqueueAllDeletedCallLinks(options?: { + delay: number; + }): Promise { + return CallLinkDeleteManager.instance.enqueueAllDeletedCallLinks(options); + } +} + +async function getNextJobs( + this: CallLinkDeleteManager, + { + limit, + timestamp, + }: { + limit: number; + timestamp: number; + } +): Promise> { + let countRemaining = limit; + const nextJobs: Array = []; + for (const job of this.jobs.values()) { + if (job.active || (job.retryAfter && job.retryAfter > timestamp)) { + continue; + } + + nextJobs.push(job); + countRemaining -= 1; + if (countRemaining <= 0) { + break; + } + } + return nextJobs; +} + +async function saveJob( + this: CallLinkDeleteManager, + job: CallLinkDeleteJobType +): Promise { + const { roomId } = job; + this.jobs.set(roomId, job); +} + +async function removeJob( + this: CallLinkDeleteManager, + job: CallLinkDeleteJobType +): Promise { + const logId = `CallLinkDeleteJobType/removeJob/${getJobId(job)}`; + const { roomId } = job; + await DataWriter.finalizeDeleteCallLink(job.roomId); + log.info(`${logId}: Finalized local delete`); + this.jobs.delete(roomId); +} + +async function runJob( + job: CallLinkDeleteJobType, + _isLastAttempt: boolean +): Promise> { + const logId = `CallLinkDeleteJobType/runJob/${getJobId(job)}`; + + const callLinkRecord = await DataReader.getCallLinkRecordByRoomId(job.roomId); + if (callLinkRecord == null) { + log.warn(`${logId}: Call link gone from DB`); + return { status: 'finished' }; + } + if (callLinkRecord.adminKey == null) { + log.error( + `${logId}: No admin key available, deletion on server not possible. Giving up.` + ); + return { status: 'finished' }; + } + + // For consistency between devices, wait for storage sync + if (callLinkRecord.storageNeedsSync !== 0) { + log.info(`${logId}: Call link storage needs sync; retrying later`); + return { status: 'retry' }; + } + + // Delete link on calling server. May 200 or 404 and both are OK. + // Errs if call link is active or network is unavailable. + const callLink = callLinkFromRecord(callLinkRecord); + await calling.deleteCallLink(callLink); + log.info(`${logId}: Deleted call link on server`); + return { status: 'finished' }; +} diff --git a/ts/jobs/callLinksDeleteJobQueue.ts b/ts/jobs/callLinksDeleteJobQueue.ts deleted file mode 100644 index f159c9681f01..000000000000 --- a/ts/jobs/callLinksDeleteJobQueue.ts +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2024 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only -import { z } from 'zod'; -import type { LoggerType } from '../types/Logging'; -import { DataReader, DataWriter } from '../sql/Client'; -import type { JOB_STATUS } from './JobQueue'; -import { JobQueue } from './JobQueue'; -import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; -import { calling } from '../services/calling'; -import { toLogFormat } from '../types/errors'; - -const callLinksDeleteJobData = z.object({ - source: z.string(), -}); - -type CallLinksDeleteJobData = z.infer; - -export class CallLinksDeleteJobQueue extends JobQueue { - protected parseData(data: unknown): CallLinksDeleteJobData { - return callLinksDeleteJobData.parse(data); - } - - protected async run( - { data }: { data: CallLinksDeleteJobData }, - { attempt, log }: { attempt: number; log: LoggerType } - ): Promise { - const { source } = data; - const logId = `callLinksDeleteJobQueue(${source}, attempt=${attempt})`; - const deletedCallLinks = await DataReader.getAllMarkedDeletedCallLinks(); - if (deletedCallLinks.length === 0) { - log.info(`${logId}: no call links to delete`); - return undefined; - } - log.info(`${logId}: deleting ${deletedCallLinks.length} call links`); - const errors: Array = []; - await Promise.all( - deletedCallLinks.map(async deletedCallLink => { - try { - // May 200 or 404 and that's fine - // Sends a CallLinkUpdate with type set to DELETE - await calling.deleteCallLink(deletedCallLink); - await DataWriter.finalizeDeleteCallLink(deletedCallLink.roomId); - log.info(`${logId}: deleted call link ${deletedCallLink.roomId}`); - } catch (error) { - log.error( - `${logId}: failed to delete call link ${deletedCallLink.roomId}`, - toLogFormat(error) - ); - errors.push(error); - } - }) - ); - log.info( - `${logId}: Deleted ${deletedCallLinks.length} call links, failed to delete ${errors.length} call links` - ); - if (errors.length > 0) { - throw new AggregateError( - errors, - `Failed to delete ${errors.length} call links` - ); - } - return undefined; - } -} - -export const callLinksDeleteJobQueue = new CallLinksDeleteJobQueue({ - store: jobQueueDatabaseStore, - queueType: 'callLinksDelete', - maxAttempts: 5, -}); diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index f5da45fb8a7c..45c8dc8eb1bd 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -3,7 +3,7 @@ import type { WebAPIType } from '../textsecure/WebAPI'; import { drop } from '../util/drop'; -import { callLinksDeleteJobQueue } from './callLinksDeleteJobQueue'; +import { CallLinkDeleteManager } from './CallLinkDeleteManager'; import { conversationJobQueue } from './conversationJobQueue'; import { groupAvatarJobQueue } from './groupAvatarJobQueue'; @@ -41,7 +41,7 @@ export function initializeAllJobQueues({ // Other queues drop(removeStorageKeyJobQueue.streamJobs()); drop(reportSpamJobQueue.streamJobs()); - drop(callLinksDeleteJobQueue.streamJobs()); + drop(CallLinkDeleteManager.start()); } export async function shutdownAllJobQueues(): Promise { @@ -54,6 +54,6 @@ export async function shutdownAllJobQueues(): Promise { viewOnceOpenJobQueue.shutdown(), removeStorageKeyJobQueue.shutdown(), reportSpamJobQueue.shutdown(), - callLinksDeleteJobQueue.shutdown(), + CallLinkDeleteManager.stop(), ]); } diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 7c16507b66c6..26e5d183ed53 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -586,7 +586,7 @@ type ReadableInterface = { getCallLinkByRoomId: (roomId: string) => CallLinkType | undefined; getCallLinkRecordByRoomId: (roomId: string) => CallLinkRecord | undefined; getAllCallLinkRecordsWithAdminKey(): ReadonlyArray; - getAllMarkedDeletedCallLinks(): ReadonlyArray; + getAllMarkedDeletedCallLinkRoomIds(): ReadonlyArray; getMessagesBetween: ( conversationId: string, options: GetMessagesBetweenOptions diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 52c5050dd18b..3a35f42af899 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -181,7 +181,7 @@ import { updateCallLinkState, beginDeleteAllCallLinks, getAllCallLinkRecordsWithAdminKey, - getAllMarkedDeletedCallLinks, + getAllMarkedDeletedCallLinkRoomIds, finalizeDeleteCallLink, beginDeleteCallLink, deleteCallLinkFromSync, @@ -313,7 +313,7 @@ export const DataReader: ServerReadableInterface = { getCallLinkByRoomId, getCallLinkRecordByRoomId, getAllCallLinkRecordsWithAdminKey, - getAllMarkedDeletedCallLinks, + getAllMarkedDeletedCallLinkRoomIds, getMessagesBetween, getNearbyMessageFromDeletedSet, getMostRecentAddressableMessages, diff --git a/ts/sql/server/callLinks.ts b/ts/sql/server/callLinks.ts index 959670e9ec6a..9b00dbcae50d 100644 --- a/ts/sql/server/callLinks.ts +++ b/ts/sql/server/callLinks.ts @@ -305,16 +305,13 @@ export function getAllCallLinkRecordsWithAdminKey( .map(item => callLinkRecordSchema.parse(item)); } -export function getAllMarkedDeletedCallLinks( +export function getAllMarkedDeletedCallLinkRoomIds( db: ReadableDB -): ReadonlyArray { +): ReadonlyArray { const [query] = sql` - SELECT * FROM callLinks WHERE deleted = 1; + SELECT roomId FROM callLinks WHERE deleted = 1; `; - return db - .prepare(query) - .all() - .map(item => callLinkFromRecord(callLinkRecordSchema.parse(item))); + return db.prepare(query).pluck().all(); } // TODO: Run this after uploading storage records, maybe periodically on startup diff --git a/ts/state/ducks/calling.ts b/ts/state/ducks/calling.ts index fbeba763213a..6a6ebdc53a84 100644 --- a/ts/state/ducks/calling.ts +++ b/ts/state/ducks/calling.ts @@ -100,9 +100,9 @@ import { addCallHistory, reloadCallHistory } from './callHistory'; import { saveDraftRecordingIfNeeded } from './composer'; import type { CallHistoryDetails } from '../../types/CallDisposition'; import type { StartCallData } from '../../components/ConfirmLeaveCallModal'; -import { callLinksDeleteJobQueue } from '../../jobs/callLinksDeleteJobQueue'; import { getCallLinksByRoomId } from '../selectors/calling'; import { storageServiceUploadJob } from '../../services/storage'; +import { CallLinkDeleteManager } from '../../jobs/CallLinkDeleteManager'; // State @@ -2031,7 +2031,8 @@ function deleteCallLink( return async dispatch => { await DataWriter.beginDeleteCallLink(roomId, { storageNeedsSync: true }); storageServiceUploadJob(); - await callLinksDeleteJobQueue.add({ source: 'deleteCallLink' }); + // Wait for storage service sync before finalizing delete + drop(CallLinkDeleteManager.addJob({ roomId }, { delay: 10000 })); dispatch(handleCallLinkDelete({ roomId })); }; } diff --git a/ts/util/callDisposition.ts b/ts/util/callDisposition.ts index 9ab675223ba2..7cd2de04b6e9 100644 --- a/ts/util/callDisposition.ts +++ b/ts/util/callDisposition.ts @@ -66,8 +66,8 @@ import type { ConversationType } from '../state/ducks/conversations'; import type { ConversationModel } from '../models/conversations'; import { drop } from './drop'; import { sendCallLinkUpdateSync } from './sendCallLinkUpdateSync'; -import { callLinksDeleteJobQueue } from '../jobs/callLinksDeleteJobQueue'; import { storageServiceUploadJob } from '../services/storage'; +import { CallLinkDeleteManager } from '../jobs/CallLinkDeleteManager'; // utils // ----- @@ -1305,14 +1305,13 @@ export async function clearCallHistoryDataAndSync( const messageIds = await DataWriter.clearCallHistory(latestCall); await DataWriter.beginDeleteAllCallLinks(); storageServiceUploadJob(); + // Wait for storage sync before finalizing delete + drop(CallLinkDeleteManager.enqueueAllDeletedCallLinks({ delay: 10000 })); updateDeletedMessages(messageIds); log.info('clearCallHistory: Queueing sync message'); await singleProtoJobQueue.add( MessageSender.getClearCallHistoryMessage(latestCall) ); - await callLinksDeleteJobQueue.add({ - source: 'clearCallHistoryDataAndSync', - }); } catch (error) { log.error('clearCallHistory: Failed to clear call history', error); }