diff --git a/ts/jobs/callLinkRefreshJobQueue.ts b/ts/jobs/callLinkRefreshJobQueue.ts new file mode 100644 index 000000000000..326c8fdc876a --- /dev/null +++ b/ts/jobs/callLinkRefreshJobQueue.ts @@ -0,0 +1,126 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import * as z from 'zod'; +import PQueue from 'p-queue'; +import { CallLinkRootKey } from '@signalapp/ringrtc'; +import type { LoggerType } from '../types/Logging'; +import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; +import type { ParsedJob } from './types'; +import type { JOB_STATUS } from './JobQueue'; +import { JobQueue } from './JobQueue'; +import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; +import { DAY, SECOND } from '../util/durations'; +import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; +import { DataReader, DataWriter } from '../sql/Client'; +import { storageServiceUploadJob } from '../services/storage'; +import type { CallLinkType } from '../types/CallLink'; +import { calling } from '../services/calling'; +import { sleeper } from '../util/sleeper'; + +const MAX_RETRY_TIME = DAY; +const MAX_PARALLEL_JOBS = 5; +const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME); +const DEFAULT_SLEEP_TIME = 20 * SECOND; + +const callLinkRefreshJobDataSchema = z.object({ + roomId: z.string(), +}); + +export type CallLinkRefreshJobData = z.infer< + typeof callLinkRefreshJobDataSchema +>; + +export class CallLinkRefreshJobQueue extends JobQueue { + private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS }); + + protected override getQueues(): ReadonlySet { + return new Set([this.parallelQueue]); + } + + protected override getInMemoryQueue( + _parsedJob: ParsedJob + ): PQueue { + return this.parallelQueue; + } + + protected parseData(data: unknown): CallLinkRefreshJobData { + return callLinkRefreshJobDataSchema.parse(data); + } + + protected async run( + { + data, + timestamp, + }: Readonly<{ data: CallLinkRefreshJobData; timestamp: number }>, + { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> + ): Promise { + const { roomId } = data; + const logId = `callLinkRefreshJobQueue(${roomId}).run`; + log.info(`${logId}: Starting`); + + const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); + const shouldContinue = await commonShouldJobContinue({ + attempt, + log, + timeRemaining, + skipWait: false, + }); + if (!shouldContinue) { + return undefined; + } + + const existingCallLink = await DataReader.getCallLinkByRoomId(roomId); + if (!existingCallLink) { + log.warn(`${logId}: Call link missing locally, can't refresh`); + return undefined; + } + + let error: Error | undefined; + const callLinkRootKey = CallLinkRootKey.parse(existingCallLink.rootKey); + try { + // This will either return the fresh call link state, + // null (link deleted from server), or err (connection error) + const freshCallLinkState = await calling.readCallLink(callLinkRootKey); + if (freshCallLinkState != null) { + log.info(`${logId}: Refreshed call link`); + const callLink: CallLinkType = { + ...existingCallLink, + ...freshCallLinkState, + }; + await DataWriter.updateCallLinkState(roomId, freshCallLinkState); + window.reduxActions.calling.handleCallLinkUpdateLocal(callLink); + } else { + log.info( + `${logId}: Call link not found on server, deleting local call link` + ); + await DataWriter.beginDeleteCallLink(roomId, { + storageNeedsSync: true, + }); + storageServiceUploadJob(); + window.reduxActions.calling.handleCallLinkDelete({ roomId }); + } + } catch (err) { + error = err; + } + + // Always throttle API calls to the calling server, but if shutting down and job + // was successful then resolve and dequeue it on app shutdown, otherwise reject + await sleeper.sleep(DEFAULT_SLEEP_TIME, `${logId}: Default sleep`, { + resolveOnShutdown: error === undefined, + }); + + // Repropagate error so JobQueue handles it + if (error) { + throw error; + } + + return undefined; + } +} + +export const callLinkRefreshJobQueue = new CallLinkRefreshJobQueue({ + maxAttempts: MAX_ATTEMPTS, + queueType: 'call link refresh', + store: jobQueueDatabaseStore, +}); diff --git a/ts/jobs/helpers/commonShouldJobContinue.ts b/ts/jobs/helpers/commonShouldJobContinue.ts index 59d63e163aaf..ae6a4caa9518 100644 --- a/ts/jobs/helpers/commonShouldJobContinue.ts +++ b/ts/jobs/helpers/commonShouldJobContinue.ts @@ -44,11 +44,13 @@ export async function commonShouldJobContinue({ } const sleepTime = exponentialBackoffSleepTime(attempt); - log.info(`sleeping for ${sleepTime}`); - await sleeper.sleep( - sleepTime, - `commonShouldJobContinue: attempt ${attempt}, skipWait ${skipWait}` - ); + if (sleepTime > 0) { + log.info(`sleeping for ${sleepTime}`); + await sleeper.sleep( + sleepTime, + `commonShouldJobContinue: attempt ${attempt}, skipWait ${skipWait}` + ); + } return true; } diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index 45c8dc8eb1bd..e1a9396b3694 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -5,6 +5,7 @@ import type { WebAPIType } from '../textsecure/WebAPI'; import { drop } from '../util/drop'; import { CallLinkDeleteManager } from './CallLinkDeleteManager'; +import { callLinkRefreshJobQueue } from './callLinkRefreshJobQueue'; import { conversationJobQueue } from './conversationJobQueue'; import { groupAvatarJobQueue } from './groupAvatarJobQueue'; import { readSyncJobQueue } from './readSyncJobQueue'; @@ -41,11 +42,13 @@ export function initializeAllJobQueues({ // Other queues drop(removeStorageKeyJobQueue.streamJobs()); drop(reportSpamJobQueue.streamJobs()); + drop(callLinkRefreshJobQueue.streamJobs()); drop(CallLinkDeleteManager.start()); } export async function shutdownAllJobQueues(): Promise { await Promise.allSettled([ + callLinkRefreshJobQueue.shutdown(), conversationJobQueue.shutdown(), groupAvatarJobQueue.shutdown(), singleProtoJobQueue.shutdown(), diff --git a/ts/services/storageRecordOps.ts b/ts/services/storageRecordOps.ts index b894b32ddf27..5d56e2855fd9 100644 --- a/ts/services/storageRecordOps.ts +++ b/ts/services/storageRecordOps.ts @@ -78,6 +78,7 @@ import { toCallHistoryFromUnusedCallLink, } from '../util/callLinks'; import { isOlderThan } from '../util/timestamp'; +import { callLinkRefreshJobQueue } from '../jobs/callLinkRefreshJobQueue'; const MY_STORY_BYTES = uuidToBytes(MY_STORY_ID); @@ -2024,11 +2025,7 @@ export async function mergeCallLinkRecord( DataWriter.saveCallHistory(callHistory), ]); - // Refresh call link state via RingRTC and update in redux - window.reduxActions.calling.handleCallLinkUpdate({ - rootKey: rootKeyString, - adminKey: adminKeyString, - }); + drop(callLinkRefreshJobQueue.add({ roomId: callLink.roomId })); window.reduxActions.callHistory.addCallHistory(callHistory); }