Add CallLinkDeleteManager to retry and ensure deletion
Co-authored-by: ayumi-signal <143036029+ayumi-signal@users.noreply.github.com>
This commit is contained in:
parent
8d42bd55e2
commit
6a52ad2e94
8 changed files with 229 additions and 89 deletions
213
ts/jobs/CallLinkDeleteManager.ts
Normal file
213
ts/jobs/CallLinkDeleteManager.ts
Normal file
|
@ -0,0 +1,213 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import * as durations from '../util/durations';
|
||||
import * as log from '../logging/log';
|
||||
import { DataReader, DataWriter } from '../sql/Client';
|
||||
import {
|
||||
JobManager,
|
||||
type JobManagerParamsType,
|
||||
type JobManagerJobResultType,
|
||||
type JobManagerJobType,
|
||||
} from './JobManager';
|
||||
import { calling } from '../services/calling';
|
||||
import { callLinkFromRecord } from '../util/callLinksRingrtc';
|
||||
|
||||
// Type for adding a new job
|
||||
export type NewCallLinkDeleteJobType = {
|
||||
roomId: string;
|
||||
options?: { delay: number };
|
||||
};
|
||||
|
||||
export type CoreCallLinkDeleteJobType = {
|
||||
roomId: string;
|
||||
};
|
||||
|
||||
export type CallLinkDeleteJobType = CoreCallLinkDeleteJobType &
|
||||
JobManagerJobType;
|
||||
|
||||
const MAX_CONCURRENT_JOBS = 5;
|
||||
|
||||
const DEFAULT_RETRY_CONFIG = {
|
||||
maxAttempts: Infinity,
|
||||
backoffConfig: {
|
||||
// 1 min, 5 min, 25 min, (max) 1 day
|
||||
multiplier: 5,
|
||||
firstBackoffs: [durations.MINUTE],
|
||||
maxBackoffTime: durations.DAY,
|
||||
},
|
||||
};
|
||||
|
||||
type CallLinkDeleteManagerParamsType =
|
||||
JobManagerParamsType<CoreCallLinkDeleteJobType>;
|
||||
|
||||
function getJobId(job: CoreCallLinkDeleteJobType): string {
|
||||
return job.roomId;
|
||||
}
|
||||
|
||||
export class CallLinkDeleteManager extends JobManager<CoreCallLinkDeleteJobType> {
|
||||
jobs: Map<string, CallLinkDeleteJobType> = new Map();
|
||||
private static _instance: CallLinkDeleteManager | undefined;
|
||||
override logPrefix = 'CallLinkDeleteManager';
|
||||
|
||||
static defaultParams: CallLinkDeleteManagerParamsType = {
|
||||
markAllJobsInactive: () => Promise.resolve(),
|
||||
getNextJobs,
|
||||
saveJob,
|
||||
removeJob,
|
||||
runJob,
|
||||
getJobId,
|
||||
getJobIdForLogging: getJobId,
|
||||
getRetryConfig: () => DEFAULT_RETRY_CONFIG,
|
||||
maxConcurrentJobs: MAX_CONCURRENT_JOBS,
|
||||
};
|
||||
|
||||
constructor(params: CallLinkDeleteManagerParamsType) {
|
||||
super({
|
||||
...params,
|
||||
getNextJobs: ({ limit, timestamp }) =>
|
||||
params.getNextJobs.call(this, { limit, timestamp }),
|
||||
saveJob: (job: CallLinkDeleteJobType) => params.saveJob.call(this, job),
|
||||
removeJob: (job: CallLinkDeleteJobType) =>
|
||||
params.removeJob.call(this, job),
|
||||
});
|
||||
}
|
||||
|
||||
override async addJob(
|
||||
jobData: CoreCallLinkDeleteJobType,
|
||||
options?: { delay: number }
|
||||
): Promise<void> {
|
||||
const { delay } = options || {};
|
||||
if (delay) {
|
||||
log.info(
|
||||
`CallLinkDeleteJobType/addJob/${getJobId(jobData)}: Adding with delay ${delay}`
|
||||
);
|
||||
const job: CallLinkDeleteJobType = {
|
||||
...jobData,
|
||||
attempts: 0,
|
||||
retryAfter: Date.now() + delay,
|
||||
lastAttemptTimestamp: null,
|
||||
active: false,
|
||||
};
|
||||
await this.params.saveJob(job);
|
||||
return;
|
||||
}
|
||||
|
||||
await this._addJob(jobData);
|
||||
}
|
||||
|
||||
async enqueueAllDeletedCallLinks(options?: { delay: number }): Promise<void> {
|
||||
const roomIds = await DataReader.getAllMarkedDeletedCallLinkRoomIds();
|
||||
log.info(
|
||||
`CallLinkDeleteJobType/enqueueAllDeletedCallLinks: Found ${roomIds.length} call links to delete`
|
||||
);
|
||||
roomIds.forEach(roomId => this.addJob({ roomId }, options));
|
||||
}
|
||||
|
||||
static get instance(): CallLinkDeleteManager {
|
||||
if (!CallLinkDeleteManager._instance) {
|
||||
CallLinkDeleteManager._instance = new CallLinkDeleteManager(
|
||||
CallLinkDeleteManager.defaultParams
|
||||
);
|
||||
}
|
||||
return CallLinkDeleteManager._instance;
|
||||
}
|
||||
|
||||
static async start(): Promise<void> {
|
||||
await CallLinkDeleteManager.instance.enqueueAllDeletedCallLinks();
|
||||
await CallLinkDeleteManager.instance.start();
|
||||
}
|
||||
|
||||
static async stop(): Promise<void> {
|
||||
return CallLinkDeleteManager._instance?.stop();
|
||||
}
|
||||
|
||||
static async addJob(
|
||||
newJob: CoreCallLinkDeleteJobType,
|
||||
options?: { delay: number }
|
||||
): Promise<void> {
|
||||
return CallLinkDeleteManager.instance.addJob(newJob, options);
|
||||
}
|
||||
|
||||
static async enqueueAllDeletedCallLinks(options?: {
|
||||
delay: number;
|
||||
}): Promise<void> {
|
||||
return CallLinkDeleteManager.instance.enqueueAllDeletedCallLinks(options);
|
||||
}
|
||||
}
|
||||
|
||||
async function getNextJobs(
|
||||
this: CallLinkDeleteManager,
|
||||
{
|
||||
limit,
|
||||
timestamp,
|
||||
}: {
|
||||
limit: number;
|
||||
timestamp: number;
|
||||
}
|
||||
): Promise<Array<CallLinkDeleteJobType>> {
|
||||
let countRemaining = limit;
|
||||
const nextJobs: Array<CallLinkDeleteJobType> = [];
|
||||
for (const job of this.jobs.values()) {
|
||||
if (job.active || (job.retryAfter && job.retryAfter > timestamp)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
nextJobs.push(job);
|
||||
countRemaining -= 1;
|
||||
if (countRemaining <= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return nextJobs;
|
||||
}
|
||||
|
||||
async function saveJob(
|
||||
this: CallLinkDeleteManager,
|
||||
job: CallLinkDeleteJobType
|
||||
): Promise<void> {
|
||||
const { roomId } = job;
|
||||
this.jobs.set(roomId, job);
|
||||
}
|
||||
|
||||
async function removeJob(
|
||||
this: CallLinkDeleteManager,
|
||||
job: CallLinkDeleteJobType
|
||||
): Promise<void> {
|
||||
const logId = `CallLinkDeleteJobType/removeJob/${getJobId(job)}`;
|
||||
const { roomId } = job;
|
||||
await DataWriter.finalizeDeleteCallLink(job.roomId);
|
||||
log.info(`${logId}: Finalized local delete`);
|
||||
this.jobs.delete(roomId);
|
||||
}
|
||||
|
||||
async function runJob(
|
||||
job: CallLinkDeleteJobType,
|
||||
_isLastAttempt: boolean
|
||||
): Promise<JobManagerJobResultType<CoreCallLinkDeleteJobType>> {
|
||||
const logId = `CallLinkDeleteJobType/runJob/${getJobId(job)}`;
|
||||
|
||||
const callLinkRecord = await DataReader.getCallLinkRecordByRoomId(job.roomId);
|
||||
if (callLinkRecord == null) {
|
||||
log.warn(`${logId}: Call link gone from DB`);
|
||||
return { status: 'finished' };
|
||||
}
|
||||
if (callLinkRecord.adminKey == null) {
|
||||
log.error(
|
||||
`${logId}: No admin key available, deletion on server not possible. Giving up.`
|
||||
);
|
||||
return { status: 'finished' };
|
||||
}
|
||||
|
||||
// For consistency between devices, wait for storage sync
|
||||
if (callLinkRecord.storageNeedsSync !== 0) {
|
||||
log.info(`${logId}: Call link storage needs sync; retrying later`);
|
||||
return { status: 'retry' };
|
||||
}
|
||||
|
||||
// Delete link on calling server. May 200 or 404 and both are OK.
|
||||
// Errs if call link is active or network is unavailable.
|
||||
const callLink = callLinkFromRecord(callLinkRecord);
|
||||
await calling.deleteCallLink(callLink);
|
||||
log.info(`${logId}: Deleted call link on server`);
|
||||
return { status: 'finished' };
|
||||
}
|
|
@ -1,70 +0,0 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
import { z } from 'zod';
|
||||
import type { LoggerType } from '../types/Logging';
|
||||
import { DataReader, DataWriter } from '../sql/Client';
|
||||
import type { JOB_STATUS } from './JobQueue';
|
||||
import { JobQueue } from './JobQueue';
|
||||
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
||||
import { calling } from '../services/calling';
|
||||
import { toLogFormat } from '../types/errors';
|
||||
|
||||
const callLinksDeleteJobData = z.object({
|
||||
source: z.string(),
|
||||
});
|
||||
|
||||
type CallLinksDeleteJobData = z.infer<typeof callLinksDeleteJobData>;
|
||||
|
||||
export class CallLinksDeleteJobQueue extends JobQueue<CallLinksDeleteJobData> {
|
||||
protected parseData(data: unknown): CallLinksDeleteJobData {
|
||||
return callLinksDeleteJobData.parse(data);
|
||||
}
|
||||
|
||||
protected async run(
|
||||
{ data }: { data: CallLinksDeleteJobData },
|
||||
{ attempt, log }: { attempt: number; log: LoggerType }
|
||||
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
|
||||
const { source } = data;
|
||||
const logId = `callLinksDeleteJobQueue(${source}, attempt=${attempt})`;
|
||||
const deletedCallLinks = await DataReader.getAllMarkedDeletedCallLinks();
|
||||
if (deletedCallLinks.length === 0) {
|
||||
log.info(`${logId}: no call links to delete`);
|
||||
return undefined;
|
||||
}
|
||||
log.info(`${logId}: deleting ${deletedCallLinks.length} call links`);
|
||||
const errors: Array<unknown> = [];
|
||||
await Promise.all(
|
||||
deletedCallLinks.map(async deletedCallLink => {
|
||||
try {
|
||||
// May 200 or 404 and that's fine
|
||||
// Sends a CallLinkUpdate with type set to DELETE
|
||||
await calling.deleteCallLink(deletedCallLink);
|
||||
await DataWriter.finalizeDeleteCallLink(deletedCallLink.roomId);
|
||||
log.info(`${logId}: deleted call link ${deletedCallLink.roomId}`);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`${logId}: failed to delete call link ${deletedCallLink.roomId}`,
|
||||
toLogFormat(error)
|
||||
);
|
||||
errors.push(error);
|
||||
}
|
||||
})
|
||||
);
|
||||
log.info(
|
||||
`${logId}: Deleted ${deletedCallLinks.length} call links, failed to delete ${errors.length} call links`
|
||||
);
|
||||
if (errors.length > 0) {
|
||||
throw new AggregateError(
|
||||
errors,
|
||||
`Failed to delete ${errors.length} call links`
|
||||
);
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
export const callLinksDeleteJobQueue = new CallLinksDeleteJobQueue({
|
||||
store: jobQueueDatabaseStore,
|
||||
queueType: 'callLinksDelete',
|
||||
maxAttempts: 5,
|
||||
});
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
import type { WebAPIType } from '../textsecure/WebAPI';
|
||||
import { drop } from '../util/drop';
|
||||
import { callLinksDeleteJobQueue } from './callLinksDeleteJobQueue';
|
||||
import { CallLinkDeleteManager } from './CallLinkDeleteManager';
|
||||
|
||||
import { conversationJobQueue } from './conversationJobQueue';
|
||||
import { groupAvatarJobQueue } from './groupAvatarJobQueue';
|
||||
|
@ -41,7 +41,7 @@ export function initializeAllJobQueues({
|
|||
// Other queues
|
||||
drop(removeStorageKeyJobQueue.streamJobs());
|
||||
drop(reportSpamJobQueue.streamJobs());
|
||||
drop(callLinksDeleteJobQueue.streamJobs());
|
||||
drop(CallLinkDeleteManager.start());
|
||||
}
|
||||
|
||||
export async function shutdownAllJobQueues(): Promise<void> {
|
||||
|
@ -54,6 +54,6 @@ export async function shutdownAllJobQueues(): Promise<void> {
|
|||
viewOnceOpenJobQueue.shutdown(),
|
||||
removeStorageKeyJobQueue.shutdown(),
|
||||
reportSpamJobQueue.shutdown(),
|
||||
callLinksDeleteJobQueue.shutdown(),
|
||||
CallLinkDeleteManager.stop(),
|
||||
]);
|
||||
}
|
||||
|
|
|
@ -586,7 +586,7 @@ type ReadableInterface = {
|
|||
getCallLinkByRoomId: (roomId: string) => CallLinkType | undefined;
|
||||
getCallLinkRecordByRoomId: (roomId: string) => CallLinkRecord | undefined;
|
||||
getAllCallLinkRecordsWithAdminKey(): ReadonlyArray<CallLinkRecord>;
|
||||
getAllMarkedDeletedCallLinks(): ReadonlyArray<CallLinkType>;
|
||||
getAllMarkedDeletedCallLinkRoomIds(): ReadonlyArray<string>;
|
||||
getMessagesBetween: (
|
||||
conversationId: string,
|
||||
options: GetMessagesBetweenOptions
|
||||
|
|
|
@ -181,7 +181,7 @@ import {
|
|||
updateCallLinkState,
|
||||
beginDeleteAllCallLinks,
|
||||
getAllCallLinkRecordsWithAdminKey,
|
||||
getAllMarkedDeletedCallLinks,
|
||||
getAllMarkedDeletedCallLinkRoomIds,
|
||||
finalizeDeleteCallLink,
|
||||
beginDeleteCallLink,
|
||||
deleteCallLinkFromSync,
|
||||
|
@ -313,7 +313,7 @@ export const DataReader: ServerReadableInterface = {
|
|||
getCallLinkByRoomId,
|
||||
getCallLinkRecordByRoomId,
|
||||
getAllCallLinkRecordsWithAdminKey,
|
||||
getAllMarkedDeletedCallLinks,
|
||||
getAllMarkedDeletedCallLinkRoomIds,
|
||||
getMessagesBetween,
|
||||
getNearbyMessageFromDeletedSet,
|
||||
getMostRecentAddressableMessages,
|
||||
|
|
|
@ -305,16 +305,13 @@ export function getAllCallLinkRecordsWithAdminKey(
|
|||
.map(item => callLinkRecordSchema.parse(item));
|
||||
}
|
||||
|
||||
export function getAllMarkedDeletedCallLinks(
|
||||
export function getAllMarkedDeletedCallLinkRoomIds(
|
||||
db: ReadableDB
|
||||
): ReadonlyArray<CallLinkType> {
|
||||
): ReadonlyArray<string> {
|
||||
const [query] = sql`
|
||||
SELECT * FROM callLinks WHERE deleted = 1;
|
||||
SELECT roomId FROM callLinks WHERE deleted = 1;
|
||||
`;
|
||||
return db
|
||||
.prepare(query)
|
||||
.all()
|
||||
.map(item => callLinkFromRecord(callLinkRecordSchema.parse(item)));
|
||||
return db.prepare(query).pluck().all();
|
||||
}
|
||||
|
||||
// TODO: Run this after uploading storage records, maybe periodically on startup
|
||||
|
|
|
@ -100,9 +100,9 @@ import { addCallHistory, reloadCallHistory } from './callHistory';
|
|||
import { saveDraftRecordingIfNeeded } from './composer';
|
||||
import type { CallHistoryDetails } from '../../types/CallDisposition';
|
||||
import type { StartCallData } from '../../components/ConfirmLeaveCallModal';
|
||||
import { callLinksDeleteJobQueue } from '../../jobs/callLinksDeleteJobQueue';
|
||||
import { getCallLinksByRoomId } from '../selectors/calling';
|
||||
import { storageServiceUploadJob } from '../../services/storage';
|
||||
import { CallLinkDeleteManager } from '../../jobs/CallLinkDeleteManager';
|
||||
|
||||
// State
|
||||
|
||||
|
@ -2031,7 +2031,8 @@ function deleteCallLink(
|
|||
return async dispatch => {
|
||||
await DataWriter.beginDeleteCallLink(roomId, { storageNeedsSync: true });
|
||||
storageServiceUploadJob();
|
||||
await callLinksDeleteJobQueue.add({ source: 'deleteCallLink' });
|
||||
// Wait for storage service sync before finalizing delete
|
||||
drop(CallLinkDeleteManager.addJob({ roomId }, { delay: 10000 }));
|
||||
dispatch(handleCallLinkDelete({ roomId }));
|
||||
};
|
||||
}
|
||||
|
|
|
@ -66,8 +66,8 @@ import type { ConversationType } from '../state/ducks/conversations';
|
|||
import type { ConversationModel } from '../models/conversations';
|
||||
import { drop } from './drop';
|
||||
import { sendCallLinkUpdateSync } from './sendCallLinkUpdateSync';
|
||||
import { callLinksDeleteJobQueue } from '../jobs/callLinksDeleteJobQueue';
|
||||
import { storageServiceUploadJob } from '../services/storage';
|
||||
import { CallLinkDeleteManager } from '../jobs/CallLinkDeleteManager';
|
||||
|
||||
// utils
|
||||
// -----
|
||||
|
@ -1305,14 +1305,13 @@ export async function clearCallHistoryDataAndSync(
|
|||
const messageIds = await DataWriter.clearCallHistory(latestCall);
|
||||
await DataWriter.beginDeleteAllCallLinks();
|
||||
storageServiceUploadJob();
|
||||
// Wait for storage sync before finalizing delete
|
||||
drop(CallLinkDeleteManager.enqueueAllDeletedCallLinks({ delay: 10000 }));
|
||||
updateDeletedMessages(messageIds);
|
||||
log.info('clearCallHistory: Queueing sync message');
|
||||
await singleProtoJobQueue.add(
|
||||
MessageSender.getClearCallHistoryMessage(latestCall)
|
||||
);
|
||||
await callLinksDeleteJobQueue.add({
|
||||
source: 'clearCallHistoryDataAndSync',
|
||||
});
|
||||
} catch (error) {
|
||||
log.error('clearCallHistory: Failed to clear call history', error);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue