Add CallLinkDeleteManager to retry and ensure deletion

This commit is contained in:
ayumi-signal 2024-09-16 12:22:01 -07:00 committed by GitHub
parent 8b627b3f1a
commit a40d54099c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 229 additions and 89 deletions

View file

@ -0,0 +1,213 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import * as durations from '../util/durations';
import * as log from '../logging/log';
import { DataReader, DataWriter } from '../sql/Client';
import {
JobManager,
type JobManagerParamsType,
type JobManagerJobResultType,
type JobManagerJobType,
} from './JobManager';
import { calling } from '../services/calling';
import { callLinkFromRecord } from '../util/callLinksRingrtc';
// Type for adding a new job
export type NewCallLinkDeleteJobType = {
roomId: string;
options?: { delay: number };
};
export type CoreCallLinkDeleteJobType = {
roomId: string;
};
export type CallLinkDeleteJobType = CoreCallLinkDeleteJobType &
JobManagerJobType;
const MAX_CONCURRENT_JOBS = 5;
const DEFAULT_RETRY_CONFIG = {
maxAttempts: Infinity,
backoffConfig: {
// 1 min, 5 min, 25 min, (max) 1 day
multiplier: 5,
firstBackoffs: [durations.MINUTE],
maxBackoffTime: durations.DAY,
},
};
type CallLinkDeleteManagerParamsType =
JobManagerParamsType<CoreCallLinkDeleteJobType>;
function getJobId(job: CoreCallLinkDeleteJobType): string {
return job.roomId;
}
export class CallLinkDeleteManager extends JobManager<CoreCallLinkDeleteJobType> {
jobs: Map<string, CallLinkDeleteJobType> = new Map();
private static _instance: CallLinkDeleteManager | undefined;
override logPrefix = 'CallLinkDeleteManager';
static defaultParams: CallLinkDeleteManagerParamsType = {
markAllJobsInactive: () => Promise.resolve(),
getNextJobs,
saveJob,
removeJob,
runJob,
getJobId,
getJobIdForLogging: getJobId,
getRetryConfig: () => DEFAULT_RETRY_CONFIG,
maxConcurrentJobs: MAX_CONCURRENT_JOBS,
};
constructor(params: CallLinkDeleteManagerParamsType) {
super({
...params,
getNextJobs: ({ limit, timestamp }) =>
params.getNextJobs.call(this, { limit, timestamp }),
saveJob: (job: CallLinkDeleteJobType) => params.saveJob.call(this, job),
removeJob: (job: CallLinkDeleteJobType) =>
params.removeJob.call(this, job),
});
}
override async addJob(
jobData: CoreCallLinkDeleteJobType,
options?: { delay: number }
): Promise<void> {
const { delay } = options || {};
if (delay) {
log.info(
`CallLinkDeleteJobType/addJob/${getJobId(jobData)}: Adding with delay ${delay}`
);
const job: CallLinkDeleteJobType = {
...jobData,
attempts: 0,
retryAfter: Date.now() + delay,
lastAttemptTimestamp: null,
active: false,
};
await this.params.saveJob(job);
return;
}
await this._addJob(jobData);
}
async enqueueAllDeletedCallLinks(options?: { delay: number }): Promise<void> {
const roomIds = await DataReader.getAllMarkedDeletedCallLinkRoomIds();
log.info(
`CallLinkDeleteJobType/enqueueAllDeletedCallLinks: Found ${roomIds.length} call links to delete`
);
roomIds.forEach(roomId => this.addJob({ roomId }, options));
}
static get instance(): CallLinkDeleteManager {
if (!CallLinkDeleteManager._instance) {
CallLinkDeleteManager._instance = new CallLinkDeleteManager(
CallLinkDeleteManager.defaultParams
);
}
return CallLinkDeleteManager._instance;
}
static async start(): Promise<void> {
await CallLinkDeleteManager.instance.enqueueAllDeletedCallLinks();
await CallLinkDeleteManager.instance.start();
}
static async stop(): Promise<void> {
return CallLinkDeleteManager._instance?.stop();
}
static async addJob(
newJob: CoreCallLinkDeleteJobType,
options?: { delay: number }
): Promise<void> {
return CallLinkDeleteManager.instance.addJob(newJob, options);
}
static async enqueueAllDeletedCallLinks(options?: {
delay: number;
}): Promise<void> {
return CallLinkDeleteManager.instance.enqueueAllDeletedCallLinks(options);
}
}
async function getNextJobs(
this: CallLinkDeleteManager,
{
limit,
timestamp,
}: {
limit: number;
timestamp: number;
}
): Promise<Array<CallLinkDeleteJobType>> {
let countRemaining = limit;
const nextJobs: Array<CallLinkDeleteJobType> = [];
for (const job of this.jobs.values()) {
if (job.active || (job.retryAfter && job.retryAfter > timestamp)) {
continue;
}
nextJobs.push(job);
countRemaining -= 1;
if (countRemaining <= 0) {
break;
}
}
return nextJobs;
}
async function saveJob(
this: CallLinkDeleteManager,
job: CallLinkDeleteJobType
): Promise<void> {
const { roomId } = job;
this.jobs.set(roomId, job);
}
async function removeJob(
this: CallLinkDeleteManager,
job: CallLinkDeleteJobType
): Promise<void> {
const logId = `CallLinkDeleteJobType/removeJob/${getJobId(job)}`;
const { roomId } = job;
await DataWriter.finalizeDeleteCallLink(job.roomId);
log.info(`${logId}: Finalized local delete`);
this.jobs.delete(roomId);
}
async function runJob(
job: CallLinkDeleteJobType,
_isLastAttempt: boolean
): Promise<JobManagerJobResultType<CoreCallLinkDeleteJobType>> {
const logId = `CallLinkDeleteJobType/runJob/${getJobId(job)}`;
const callLinkRecord = await DataReader.getCallLinkRecordByRoomId(job.roomId);
if (callLinkRecord == null) {
log.warn(`${logId}: Call link gone from DB`);
return { status: 'finished' };
}
if (callLinkRecord.adminKey == null) {
log.error(
`${logId}: No admin key available, deletion on server not possible. Giving up.`
);
return { status: 'finished' };
}
// For consistency between devices, wait for storage sync
if (callLinkRecord.storageNeedsSync !== 0) {
log.info(`${logId}: Call link storage needs sync; retrying later`);
return { status: 'retry' };
}
// Delete link on calling server. May 200 or 404 and both are OK.
// Errs if call link is active or network is unavailable.
const callLink = callLinkFromRecord(callLinkRecord);
await calling.deleteCallLink(callLink);
log.info(`${logId}: Deleted call link on server`);
return { status: 'finished' };
}

View file

@ -1,70 +0,0 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { z } from 'zod';
import type { LoggerType } from '../types/Logging';
import { DataReader, DataWriter } from '../sql/Client';
import type { JOB_STATUS } from './JobQueue';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
import { calling } from '../services/calling';
import { toLogFormat } from '../types/errors';
const callLinksDeleteJobData = z.object({
source: z.string(),
});
type CallLinksDeleteJobData = z.infer<typeof callLinksDeleteJobData>;
export class CallLinksDeleteJobQueue extends JobQueue<CallLinksDeleteJobData> {
protected parseData(data: unknown): CallLinksDeleteJobData {
return callLinksDeleteJobData.parse(data);
}
protected async run(
{ data }: { data: CallLinksDeleteJobData },
{ attempt, log }: { attempt: number; log: LoggerType }
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
const { source } = data;
const logId = `callLinksDeleteJobQueue(${source}, attempt=${attempt})`;
const deletedCallLinks = await DataReader.getAllMarkedDeletedCallLinks();
if (deletedCallLinks.length === 0) {
log.info(`${logId}: no call links to delete`);
return undefined;
}
log.info(`${logId}: deleting ${deletedCallLinks.length} call links`);
const errors: Array<unknown> = [];
await Promise.all(
deletedCallLinks.map(async deletedCallLink => {
try {
// May 200 or 404 and that's fine
// Sends a CallLinkUpdate with type set to DELETE
await calling.deleteCallLink(deletedCallLink);
await DataWriter.finalizeDeleteCallLink(deletedCallLink.roomId);
log.info(`${logId}: deleted call link ${deletedCallLink.roomId}`);
} catch (error) {
log.error(
`${logId}: failed to delete call link ${deletedCallLink.roomId}`,
toLogFormat(error)
);
errors.push(error);
}
})
);
log.info(
`${logId}: Deleted ${deletedCallLinks.length} call links, failed to delete ${errors.length} call links`
);
if (errors.length > 0) {
throw new AggregateError(
errors,
`Failed to delete ${errors.length} call links`
);
}
return undefined;
}
}
export const callLinksDeleteJobQueue = new CallLinksDeleteJobQueue({
store: jobQueueDatabaseStore,
queueType: 'callLinksDelete',
maxAttempts: 5,
});

View file

@ -3,7 +3,7 @@
import type { WebAPIType } from '../textsecure/WebAPI'; import type { WebAPIType } from '../textsecure/WebAPI';
import { drop } from '../util/drop'; import { drop } from '../util/drop';
import { callLinksDeleteJobQueue } from './callLinksDeleteJobQueue'; import { CallLinkDeleteManager } from './CallLinkDeleteManager';
import { conversationJobQueue } from './conversationJobQueue'; import { conversationJobQueue } from './conversationJobQueue';
import { groupAvatarJobQueue } from './groupAvatarJobQueue'; import { groupAvatarJobQueue } from './groupAvatarJobQueue';
@ -41,7 +41,7 @@ export function initializeAllJobQueues({
// Other queues // Other queues
drop(removeStorageKeyJobQueue.streamJobs()); drop(removeStorageKeyJobQueue.streamJobs());
drop(reportSpamJobQueue.streamJobs()); drop(reportSpamJobQueue.streamJobs());
drop(callLinksDeleteJobQueue.streamJobs()); drop(CallLinkDeleteManager.start());
} }
export async function shutdownAllJobQueues(): Promise<void> { export async function shutdownAllJobQueues(): Promise<void> {
@ -54,6 +54,6 @@ export async function shutdownAllJobQueues(): Promise<void> {
viewOnceOpenJobQueue.shutdown(), viewOnceOpenJobQueue.shutdown(),
removeStorageKeyJobQueue.shutdown(), removeStorageKeyJobQueue.shutdown(),
reportSpamJobQueue.shutdown(), reportSpamJobQueue.shutdown(),
callLinksDeleteJobQueue.shutdown(), CallLinkDeleteManager.stop(),
]); ]);
} }

View file

@ -586,7 +586,7 @@ type ReadableInterface = {
getCallLinkByRoomId: (roomId: string) => CallLinkType | undefined; getCallLinkByRoomId: (roomId: string) => CallLinkType | undefined;
getCallLinkRecordByRoomId: (roomId: string) => CallLinkRecord | undefined; getCallLinkRecordByRoomId: (roomId: string) => CallLinkRecord | undefined;
getAllCallLinkRecordsWithAdminKey(): ReadonlyArray<CallLinkRecord>; getAllCallLinkRecordsWithAdminKey(): ReadonlyArray<CallLinkRecord>;
getAllMarkedDeletedCallLinks(): ReadonlyArray<CallLinkType>; getAllMarkedDeletedCallLinkRoomIds(): ReadonlyArray<string>;
getMessagesBetween: ( getMessagesBetween: (
conversationId: string, conversationId: string,
options: GetMessagesBetweenOptions options: GetMessagesBetweenOptions

View file

@ -181,7 +181,7 @@ import {
updateCallLinkState, updateCallLinkState,
beginDeleteAllCallLinks, beginDeleteAllCallLinks,
getAllCallLinkRecordsWithAdminKey, getAllCallLinkRecordsWithAdminKey,
getAllMarkedDeletedCallLinks, getAllMarkedDeletedCallLinkRoomIds,
finalizeDeleteCallLink, finalizeDeleteCallLink,
beginDeleteCallLink, beginDeleteCallLink,
deleteCallLinkFromSync, deleteCallLinkFromSync,
@ -313,7 +313,7 @@ export const DataReader: ServerReadableInterface = {
getCallLinkByRoomId, getCallLinkByRoomId,
getCallLinkRecordByRoomId, getCallLinkRecordByRoomId,
getAllCallLinkRecordsWithAdminKey, getAllCallLinkRecordsWithAdminKey,
getAllMarkedDeletedCallLinks, getAllMarkedDeletedCallLinkRoomIds,
getMessagesBetween, getMessagesBetween,
getNearbyMessageFromDeletedSet, getNearbyMessageFromDeletedSet,
getMostRecentAddressableMessages, getMostRecentAddressableMessages,

View file

@ -305,16 +305,13 @@ export function getAllCallLinkRecordsWithAdminKey(
.map(item => callLinkRecordSchema.parse(item)); .map(item => callLinkRecordSchema.parse(item));
} }
export function getAllMarkedDeletedCallLinks( export function getAllMarkedDeletedCallLinkRoomIds(
db: ReadableDB db: ReadableDB
): ReadonlyArray<CallLinkType> { ): ReadonlyArray<string> {
const [query] = sql` const [query] = sql`
SELECT * FROM callLinks WHERE deleted = 1; SELECT roomId FROM callLinks WHERE deleted = 1;
`; `;
return db return db.prepare(query).pluck().all();
.prepare(query)
.all()
.map(item => callLinkFromRecord(callLinkRecordSchema.parse(item)));
} }
// TODO: Run this after uploading storage records, maybe periodically on startup // TODO: Run this after uploading storage records, maybe periodically on startup

View file

@ -100,9 +100,9 @@ import { addCallHistory, reloadCallHistory } from './callHistory';
import { saveDraftRecordingIfNeeded } from './composer'; import { saveDraftRecordingIfNeeded } from './composer';
import type { CallHistoryDetails } from '../../types/CallDisposition'; import type { CallHistoryDetails } from '../../types/CallDisposition';
import type { StartCallData } from '../../components/ConfirmLeaveCallModal'; import type { StartCallData } from '../../components/ConfirmLeaveCallModal';
import { callLinksDeleteJobQueue } from '../../jobs/callLinksDeleteJobQueue';
import { getCallLinksByRoomId } from '../selectors/calling'; import { getCallLinksByRoomId } from '../selectors/calling';
import { storageServiceUploadJob } from '../../services/storage'; import { storageServiceUploadJob } from '../../services/storage';
import { CallLinkDeleteManager } from '../../jobs/CallLinkDeleteManager';
// State // State
@ -2031,7 +2031,8 @@ function deleteCallLink(
return async dispatch => { return async dispatch => {
await DataWriter.beginDeleteCallLink(roomId, { storageNeedsSync: true }); await DataWriter.beginDeleteCallLink(roomId, { storageNeedsSync: true });
storageServiceUploadJob(); storageServiceUploadJob();
await callLinksDeleteJobQueue.add({ source: 'deleteCallLink' }); // Wait for storage service sync before finalizing delete
drop(CallLinkDeleteManager.addJob({ roomId }, { delay: 10000 }));
dispatch(handleCallLinkDelete({ roomId })); dispatch(handleCallLinkDelete({ roomId }));
}; };
} }

View file

@ -66,8 +66,8 @@ import type { ConversationType } from '../state/ducks/conversations';
import type { ConversationModel } from '../models/conversations'; import type { ConversationModel } from '../models/conversations';
import { drop } from './drop'; import { drop } from './drop';
import { sendCallLinkUpdateSync } from './sendCallLinkUpdateSync'; import { sendCallLinkUpdateSync } from './sendCallLinkUpdateSync';
import { callLinksDeleteJobQueue } from '../jobs/callLinksDeleteJobQueue';
import { storageServiceUploadJob } from '../services/storage'; import { storageServiceUploadJob } from '../services/storage';
import { CallLinkDeleteManager } from '../jobs/CallLinkDeleteManager';
// utils // utils
// ----- // -----
@ -1305,14 +1305,13 @@ export async function clearCallHistoryDataAndSync(
const messageIds = await DataWriter.clearCallHistory(latestCall); const messageIds = await DataWriter.clearCallHistory(latestCall);
await DataWriter.beginDeleteAllCallLinks(); await DataWriter.beginDeleteAllCallLinks();
storageServiceUploadJob(); storageServiceUploadJob();
// Wait for storage sync before finalizing delete
drop(CallLinkDeleteManager.enqueueAllDeletedCallLinks({ delay: 10000 }));
updateDeletedMessages(messageIds); updateDeletedMessages(messageIds);
log.info('clearCallHistory: Queueing sync message'); log.info('clearCallHistory: Queueing sync message');
await singleProtoJobQueue.add( await singleProtoJobQueue.add(
MessageSender.getClearCallHistoryMessage(latestCall) MessageSender.getClearCallHistoryMessage(latestCall)
); );
await callLinksDeleteJobQueue.add({
source: 'clearCallHistoryDataAndSync',
});
} catch (error) { } catch (error) {
log.error('clearCallHistory: Failed to clear call history', error); log.error('clearCallHistory: Failed to clear call history', error);
} }