Use job queue when restoring call links from storage
This commit is contained in:
parent
278866f3b7
commit
cbfbe8302c
4 changed files with 138 additions and 10 deletions
126
ts/jobs/callLinkRefreshJobQueue.ts
Normal file
126
ts/jobs/callLinkRefreshJobQueue.ts
Normal file
|
@ -0,0 +1,126 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import * as z from 'zod';
|
||||
import PQueue from 'p-queue';
|
||||
import { CallLinkRootKey } from '@signalapp/ringrtc';
|
||||
import type { LoggerType } from '../types/Logging';
|
||||
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
||||
import type { ParsedJob } from './types';
|
||||
import type { JOB_STATUS } from './JobQueue';
|
||||
import { JobQueue } from './JobQueue';
|
||||
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
||||
import { DAY, SECOND } from '../util/durations';
|
||||
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
|
||||
import { DataReader, DataWriter } from '../sql/Client';
|
||||
import { storageServiceUploadJob } from '../services/storage';
|
||||
import type { CallLinkType } from '../types/CallLink';
|
||||
import { calling } from '../services/calling';
|
||||
import { sleeper } from '../util/sleeper';
|
||||
|
||||
const MAX_RETRY_TIME = DAY;
|
||||
const MAX_PARALLEL_JOBS = 5;
|
||||
const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME);
|
||||
const DEFAULT_SLEEP_TIME = 20 * SECOND;
|
||||
|
||||
const callLinkRefreshJobDataSchema = z.object({
|
||||
roomId: z.string(),
|
||||
});
|
||||
|
||||
export type CallLinkRefreshJobData = z.infer<
|
||||
typeof callLinkRefreshJobDataSchema
|
||||
>;
|
||||
|
||||
export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> {
|
||||
private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS });
|
||||
|
||||
protected override getQueues(): ReadonlySet<PQueue> {
|
||||
return new Set([this.parallelQueue]);
|
||||
}
|
||||
|
||||
protected override getInMemoryQueue(
|
||||
_parsedJob: ParsedJob<CallLinkRefreshJobData>
|
||||
): PQueue {
|
||||
return this.parallelQueue;
|
||||
}
|
||||
|
||||
protected parseData(data: unknown): CallLinkRefreshJobData {
|
||||
return callLinkRefreshJobDataSchema.parse(data);
|
||||
}
|
||||
|
||||
protected async run(
|
||||
{
|
||||
data,
|
||||
timestamp,
|
||||
}: Readonly<{ data: CallLinkRefreshJobData; timestamp: number }>,
|
||||
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||||
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
|
||||
const { roomId } = data;
|
||||
const logId = `callLinkRefreshJobQueue(${roomId}).run`;
|
||||
log.info(`${logId}: Starting`);
|
||||
|
||||
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
|
||||
const shouldContinue = await commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
timeRemaining,
|
||||
skipWait: false,
|
||||
});
|
||||
if (!shouldContinue) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const existingCallLink = await DataReader.getCallLinkByRoomId(roomId);
|
||||
if (!existingCallLink) {
|
||||
log.warn(`${logId}: Call link missing locally, can't refresh`);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
let error: Error | undefined;
|
||||
const callLinkRootKey = CallLinkRootKey.parse(existingCallLink.rootKey);
|
||||
try {
|
||||
// This will either return the fresh call link state,
|
||||
// null (link deleted from server), or err (connection error)
|
||||
const freshCallLinkState = await calling.readCallLink(callLinkRootKey);
|
||||
if (freshCallLinkState != null) {
|
||||
log.info(`${logId}: Refreshed call link`);
|
||||
const callLink: CallLinkType = {
|
||||
...existingCallLink,
|
||||
...freshCallLinkState,
|
||||
};
|
||||
await DataWriter.updateCallLinkState(roomId, freshCallLinkState);
|
||||
window.reduxActions.calling.handleCallLinkUpdateLocal(callLink);
|
||||
} else {
|
||||
log.info(
|
||||
`${logId}: Call link not found on server, deleting local call link`
|
||||
);
|
||||
await DataWriter.beginDeleteCallLink(roomId, {
|
||||
storageNeedsSync: true,
|
||||
});
|
||||
storageServiceUploadJob();
|
||||
window.reduxActions.calling.handleCallLinkDelete({ roomId });
|
||||
}
|
||||
} catch (err) {
|
||||
error = err;
|
||||
}
|
||||
|
||||
// Always throttle API calls to the calling server, but if shutting down and job
|
||||
// was successful then resolve and dequeue it on app shutdown, otherwise reject
|
||||
await sleeper.sleep(DEFAULT_SLEEP_TIME, `${logId}: Default sleep`, {
|
||||
resolveOnShutdown: error === undefined,
|
||||
});
|
||||
|
||||
// Repropagate error so JobQueue handles it
|
||||
if (error) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export const callLinkRefreshJobQueue = new CallLinkRefreshJobQueue({
|
||||
maxAttempts: MAX_ATTEMPTS,
|
||||
queueType: 'call link refresh',
|
||||
store: jobQueueDatabaseStore,
|
||||
});
|
|
@ -44,11 +44,13 @@ export async function commonShouldJobContinue({
|
|||
}
|
||||
|
||||
const sleepTime = exponentialBackoffSleepTime(attempt);
|
||||
log.info(`sleeping for ${sleepTime}`);
|
||||
await sleeper.sleep(
|
||||
sleepTime,
|
||||
`commonShouldJobContinue: attempt ${attempt}, skipWait ${skipWait}`
|
||||
);
|
||||
if (sleepTime > 0) {
|
||||
log.info(`sleeping for ${sleepTime}`);
|
||||
await sleeper.sleep(
|
||||
sleepTime,
|
||||
`commonShouldJobContinue: attempt ${attempt}, skipWait ${skipWait}`
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import type { WebAPIType } from '../textsecure/WebAPI';
|
|||
import { drop } from '../util/drop';
|
||||
import { CallLinkDeleteManager } from './CallLinkDeleteManager';
|
||||
|
||||
import { callLinkRefreshJobQueue } from './callLinkRefreshJobQueue';
|
||||
import { conversationJobQueue } from './conversationJobQueue';
|
||||
import { groupAvatarJobQueue } from './groupAvatarJobQueue';
|
||||
import { readSyncJobQueue } from './readSyncJobQueue';
|
||||
|
@ -41,11 +42,13 @@ export function initializeAllJobQueues({
|
|||
// Other queues
|
||||
drop(removeStorageKeyJobQueue.streamJobs());
|
||||
drop(reportSpamJobQueue.streamJobs());
|
||||
drop(callLinkRefreshJobQueue.streamJobs());
|
||||
drop(CallLinkDeleteManager.start());
|
||||
}
|
||||
|
||||
export async function shutdownAllJobQueues(): Promise<void> {
|
||||
await Promise.allSettled([
|
||||
callLinkRefreshJobQueue.shutdown(),
|
||||
conversationJobQueue.shutdown(),
|
||||
groupAvatarJobQueue.shutdown(),
|
||||
singleProtoJobQueue.shutdown(),
|
||||
|
|
|
@ -78,6 +78,7 @@ import {
|
|||
toCallHistoryFromUnusedCallLink,
|
||||
} from '../util/callLinks';
|
||||
import { isOlderThan } from '../util/timestamp';
|
||||
import { callLinkRefreshJobQueue } from '../jobs/callLinkRefreshJobQueue';
|
||||
|
||||
const MY_STORY_BYTES = uuidToBytes(MY_STORY_ID);
|
||||
|
||||
|
@ -2024,11 +2025,7 @@ export async function mergeCallLinkRecord(
|
|||
DataWriter.saveCallHistory(callHistory),
|
||||
]);
|
||||
|
||||
// Refresh call link state via RingRTC and update in redux
|
||||
window.reduxActions.calling.handleCallLinkUpdate({
|
||||
rootKey: rootKeyString,
|
||||
adminKey: adminKeyString,
|
||||
});
|
||||
drop(callLinkRefreshJobQueue.add({ roomId: callLink.roomId }));
|
||||
window.reduxActions.callHistory.addCallHistory(callHistory);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue