// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import * as z from 'zod';
import { MINUTE } from '../util/durations';
import {
  explodePromise,
  type ExplodePromiseResultType,
} from '../util/explodePromise';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
import { drop } from '../util/drop';
import * as log from '../logging/log';
import { missingCaseError } from '../util/missingCaseError';
import {
  type ExponentialBackoffOptionsType,
  exponentialBackoffSleepTime,
} from '../util/exponentialBackoff';
import * as Errors from '../types/errors';

/**
 * Bookkeeping fields that JobManager stores alongside every job it persists.
 */
export type JobManagerJobType = {
  /** Saved as `true` when a job is started, `false` when it is queued for retry. */
  active: boolean;
  /** Number of failed/retried attempts so far; incremented by each retry. */
  attempts: number;
  /** Epoch ms (Date.now() + backoff) set when the job is scheduled for retry. */
  retryAfter: number | null;
  /** Epoch ms of the most recent attempt that was scheduled for retry. */
  lastAttemptTimestamp: number | null;
};

export const jobManagerJobSchema = z.object({
  attempts: z.number(),
  active: z.boolean(),
  retryAfter: z.number().nullable(),
  lastAttemptTimestamp: z.number().nullable(),
}) satisfies z.ZodType<JobManagerJobType>;

/**
 * Storage and execution hooks supplied by a concrete JobManager.
 *
 * `CoreJobType` is the caller's job payload; `JobType` is that payload plus
 * the JobManager bookkeeping fields.
 */
export type JobManagerParamsType<
  CoreJobType,
  JobType = CoreJobType & JobManagerJobType,
> = {
  /** Awaited by `start()` the first time the manager becomes enabled. */
  markAllJobsInactive: () => Promise<void>;
  /**
   * Returns up to `limit` jobs to start. `timestamp` is `Date.now()` at the
   * time of the call (presumably compared against `retryAfter` — confirm in
   * the implementation).
   */
  getNextJobs: (options: {
    limit: number;
    timestamp: number;
  }) => Promise<ReadonlyArray<JobType>>;
  /**
   * Persists a job. `allowBatching` is false only for jobs that are about to
   * be force-started.
   */
  saveJob: (
    job: JobType,
    options?: { allowBatching?: boolean }
  ) => Promise<void>;
  /** Deletes a job that finished or failed its last attempt. */
  removeJob: (job: JobType) => Promise<void>;
  /** Performs the work; `isLastAttempt` is true when no retry will follow. */
  runJob: (
    job: JobType,
    isLastAttempt: boolean
  ) => Promise<JobManagerJobResultType<CoreJobType>>;
  /** When this returns true, fetched queued jobs are not started this round. */
  shouldHoldOffOnStartingQueuedJobs?: () => boolean;
  /** Identity key used to detect an already-running job. */
  getJobId: (job: CoreJobType) => string;
  getJobIdForLogging: (job: JobType) => string;
  getRetryConfig: (job: JobType) => {
    maxAttempts: number;
    backoffConfig: ExponentialBackoffOptionsType;
  };
  /** Upper bound on the number of simultaneously running jobs. */
  maxConcurrentJobs: number;
};

const DEFAULT_TICK_INTERVAL = MINUTE;

/** Outcome reported by `runJob`. */
export type JobManagerJobResultType<CoreJobType> =
  | {
      status: 'retry';
    }
  | { status: 'finished'; newJob?: CoreJobType }
  | { status: 'rate-limited'; pauseDurationMs: number };

/**
 * Persistent job queue with bounded concurrency, exponential-backoff retries
 * and rate-limit pauses. Storage and execution are delegated to `params`.
 */
export abstract class JobManager<CoreJobType> {
  private enabled: boolean = false;
  // True between start() and stop(). Unlike `enabled`, it is NOT cleared by a
  // rate-limit pause, so it tells "paused" apart from "stopped".
  private isStarted: boolean = false;
  private activeJobs: Map<
    string,
    {
      completionPromise: ExplodePromiseResultType<void>;
      job: CoreJobType & JobManagerJobType;
    }
  > = new Map();
  private jobStartPromises: Map<string, ExplodePromiseResultType<void>> =
    new Map();
  private jobCompletePromises: Map<string, ExplodePromiseResultType<void>> =
    new Map();
  private tickTimeout: NodeJS.Timeout | null = null;
  private idleCallbacks = new Array<() => void>();

  protected logPrefix = 'JobManager';
  public tickInterval = DEFAULT_TICK_INTERVAL;

  constructor(readonly params: JobManagerParamsType<CoreJobType>) {}

  /**
   * Enables the manager, starts any eligible jobs and begins periodic ticks.
   * `markAllJobsInactive` is only called on a disabled -> enabled transition.
   */
  async start(): Promise<void> {
    log.info(`${this.logPrefix}: starting`);
    this.isStarted = true;
    if (!this.enabled) {
      this.enabled = true;
      await this.params.markAllJobsInactive();
    }
    await this.maybeStartJobs();
    this.tick();
  }

  /**
   * Disables the manager, cancels the tick/pause timer and resolves once
   * every job that was running at call time has been removed from the
   * running set.
   */
  async stop(): Promise<void> {
    const activeJobs = [...this.activeJobs.values()];
    log.info(
      `${this.logPrefix}: stopping. There are ` +
        `${activeJobs.length} active job(s)`
    );
    this.enabled = false;
    this.isStarted = false;
    clearTimeoutIfNecessary(this.tickTimeout);
    this.tickTimeout = null;
    await Promise.all(
      activeJobs.map(({ completionPromise }) => completionPromise.promise)
    );
  }

  /**
   * Resolves immediately when nothing is running; otherwise resolves the next
   * time maybeStartJobs observes no running jobs and no fetchable jobs.
   */
  async waitForIdle(): Promise<void> {
    if (this.activeJobs.size === 0) {
      return;
    }

    await new Promise<void>(resolve => {
      this.idleCallbacks.push(resolve);
    });
  }

  /** Runs maybeStartJobs now and reschedules itself every `tickInterval` ms. */
  private tick(): void {
    clearTimeoutIfNecessary(this.tickTimeout);
    this.tickTimeout = null;
    drop(this.maybeStartJobs());
    this.tickTimeout = setTimeout(() => this.tick(), this.tickInterval);
  }

  /**
   * Disables job starts for `durationMs`, then re-enables and ticks.
   * Ignored after stop() so a job that reports `rate-limited` while the
   * manager is shutting down cannot silently restart it.
   */
  private pauseForDuration(durationMs: number): void {
    if (!this.isStarted) {
      log.info(`${this.logPrefix}: not started; ignoring pause request`);
      return;
    }

    this.enabled = false;
    clearTimeoutIfNecessary(this.tickTimeout);
    this.tickTimeout = setTimeout(() => {
      this.enabled = true;
      this.tick();
    }, durationMs);
  }

  // used in testing
  /**
   * Resolves once `runJob` has been invoked for the job with this id and
   * attempt count.
   */
  waitForJobToBeStarted(
    job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
  ): Promise<void> {
    const id = this.getJobIdIncludingAttempts(job);
    const existingPromise = this.jobStartPromises.get(id)?.promise;
    if (existingPromise) {
      return existingPromise;
    }
    const { promise, resolve, reject } = explodePromise<void>();
    this.jobStartPromises.set(id, { promise, resolve, reject });
    return promise;
  }

  /**
   * Resolves once the job with this id and attempt count is removed from the
   * running set (after it finishes, is rescheduled, or fails).
   */
  waitForJobToBeCompleted(
    job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
  ): Promise<void> {
    const id = this.getJobIdIncludingAttempts(job);
    const existingPromise = this.jobCompletePromises.get(id)?.promise;
    if (existingPromise) {
      return existingPromise;
    }
    const { promise, resolve, reject } = explodePromise<void>();
    this.jobCompletePromises.set(id, { promise, resolve, reject });
    return promise;
  }

  /** Persists a new job and lets the scheduler pick it up. */
  async addJob(newJob: CoreJobType): Promise<void> {
    await this._addJob(newJob);
  }

  // Protected methods

  /**
   * Persists `newJob` with fresh bookkeeping fields. If a job with the same
   * id is already running, its attempt counter is reset instead.
   * With `forceStart`, the job is started immediately (when enabled) rather
   * than waiting for maybeStartJobs.
   *
   * @throws rethrows any error from `saveJob` after logging it
   */
  protected async _addJob(
    newJob: CoreJobType,
    options?: { forceStart: boolean }
  ): Promise<{ isAlreadyRunning: boolean }> {
    const job: CoreJobType & JobManagerJobType = {
      ...newJob,
      attempts: 0,
      retryAfter: null,
      lastAttemptTimestamp: null,
      active: false,
    };
    const logId = this.params.getJobIdForLogging(job);
    try {
      const runningJob = this.getRunningJob(job);
      if (runningJob) {
        log.info(`${logId}: already running; resetting attempts`);
        runningJob.attempts = 0;

        await this.params.saveJob({
          ...runningJob,
          attempts: 0,
        });
        return { isAlreadyRunning: true };
      }

      // Allow batching of all saves except those that we will start immediately
      await this.params.saveJob(job, { allowBatching: !options?.forceStart });

      if (options?.forceStart) {
        if (!this.enabled) {
          log.warn(
            `${logId}: added but jobManager not enabled, can't start immediately`
          );
        } else {
          log.info(`${logId}: starting job immediately`);
          drop(this.startJob(job));
        }
      } else if (this.enabled) {
        drop(this.maybeStartJobs());
      }

      return { isAlreadyRunning: false };
    } catch (e) {
      log.error(`${logId}: error saving job`, Errors.toLogFormat(e));
      throw e;
    }
  }

  // maybeStartJobs is called:
  // 1. every minute (via tick)
  // 2. after a job is added (via addJob)
  // 3. after a job finishes (via startJob)
  // Preventing re-entrancy allows us to simplify some logic and ensures we
  // don't try to start too many jobs.
  private _inMaybeStartJobs = false;

  /**
   * Fetches up to the number of free concurrency slots and starts them.
   * Also fires waitForIdle callbacks when nothing is running or fetchable.
   */
  protected async maybeStartJobs(): Promise<void> {
    if (this._inMaybeStartJobs) {
      return;
    }

    try {
      this._inMaybeStartJobs = true;
      if (!this.enabled) {
        log.info(`${this.logPrefix}/_maybeStartJobs: not enabled, returning`);
        return;
      }

      const numJobsToStart = this.getMaximumNumberOfJobsToStart();

      if (numJobsToStart <= 0) {
        return;
      }

      const nextJobs = await this.params.getNextJobs({
        limit: numJobsToStart,
        timestamp: Date.now(),
      });

      if (nextJobs.length === 0 && this.activeJobs.size === 0) {
        if (this.idleCallbacks.length > 0) {
          // Swap the array out first so callbacks registered while draining
          // are kept for the next idle observation.
          const callbacks = this.idleCallbacks;
          this.idleCallbacks = [];
          for (const callback of callbacks) {
            callback();
          }
        }
        return;
      }

      if (this.params.shouldHoldOffOnStartingQueuedJobs?.()) {
        log.info(
          `${this.logPrefix}/_maybeStartJobs: holding off on starting ${nextJobs.length} new job(s)`
        );
        return;
      }

      for (const job of nextJobs) {
        drop(this.startJob(job));
      }
    } finally {
      this._inMaybeStartJobs = false;
    }
  }

  /**
   * Runs one attempt of `job` and then either removes it (finished, or failed
   * on its last attempt) or reschedules it with backoff. Afterwards, enqueues
   * any follow-up job and always gives the scheduler a chance to fill the
   * freed slot.
   */
  protected async startJob(
    job: CoreJobType & JobManagerJobType
  ): Promise<void> {
    const logId = `${this.logPrefix}/startJob(${this.params.getJobIdForLogging(
      job
    )})`;
    if (this.isJobRunning(job)) {
      log.info(`${logId}: job is already running`);
      return;
    }

    const isLastAttempt =
      job.attempts + 1 >=
      (this.params.getRetryConfig(job).maxAttempts ?? Infinity);

    let jobRunResult: JobManagerJobResultType<CoreJobType> | undefined;
    try {
      log.info(`${logId}: starting job`);
      this.addRunningJob(job);
      await this.params.saveJob({ ...job, active: true });
      const runJobPromise = this.params.runJob(job, isLastAttempt);
      this.handleJobStartPromises(job);
      jobRunResult = await runJobPromise;
      const { status } = jobRunResult;
      log.info(`${logId}: job completed with status: ${status}`);

      switch (status) {
        case 'finished':
          await this.params.removeJob(job);
          return;
        case 'retry':
          if (isLastAttempt) {
            throw new Error('Cannot retry on last attempt');
          }
          await this.retryJobLater(job);
          return;
        case 'rate-limited':
          log.info(
            `${logId}: rate-limited; retrying in ${jobRunResult.pauseDurationMs}`
          );
          this.pauseForDuration(jobRunResult.pauseDurationMs);
          await this.retryJobLater(job);
          return;
        default:
          throw missingCaseError(status);
      }
    } catch (e) {
      log.error(`${logId}: error when running job`, Errors.toLogFormat(e));
      if (isLastAttempt) {
        await this.params.removeJob(job);
      } else {
        await this.retryJobLater(job);
      }
    } finally {
      this.removeRunningJob(job);
      const newJob =
        jobRunResult?.status === 'finished' ? jobRunResult.newJob : undefined;
      try {
        if (newJob) {
          log.info(
            `${logId}: adding new job as a result of this one completing`
          );
          await this.addJob(newJob);
        }
      } finally {
        // Even if enqueueing the follow-up job throws, the slot this job held
        // must be offered to the queue; otherwise it would wait for the next
        // tick.
        drop(this.maybeStartJobs());
      }
    }
  }

  /** Persists `job` as inactive with an incremented attempt count and backoff. */
  private async retryJobLater(
    job: CoreJobType & JobManagerJobType
  ): Promise<void> {
    const now = Date.now();

    await this.params.saveJob({
      ...job,
      active: false,
      attempts: job.attempts + 1,
      retryAfter:
        now +
        exponentialBackoffSleepTime(
          job.attempts + 1,
          this.params.getRetryConfig(job).backoffConfig
        ),
      lastAttemptTimestamp: now,
    });
  }

  private getActiveJobCount(): number {
    return this.activeJobs.size;
  }

  private getMaximumNumberOfJobsToStart(): number {
    return Math.max(
      0,
      this.params.maxConcurrentJobs - this.getActiveJobCount()
    );
  }

  private getRunningJob(
    job: CoreJobType & JobManagerJobType
  ): (CoreJobType & JobManagerJobType) | undefined {
    const id = this.params.getJobId(job);
    return this.activeJobs.get(id)?.job;
  }

  private isJobRunning(job: CoreJobType & JobManagerJobType): boolean {
    return Boolean(this.getRunningJob(job));
  }

  /** Drops the job from the running set and resolves its completion waiters. */
  private removeRunningJob(job: CoreJobType & JobManagerJobType): void {
    const idWithAttempts = this.getJobIdIncludingAttempts(job);
    this.jobCompletePromises.get(idWithAttempts)?.resolve();
    this.jobCompletePromises.delete(idWithAttempts);

    const id = this.params.getJobId(job);
    this.activeJobs.get(id)?.completionPromise.resolve();
    this.activeJobs.delete(id);
  }

  private addRunningJob(job: CoreJobType & JobManagerJobType): void {
    if (this.isJobRunning(job)) {
      const jobIdForLogging = this.params.getJobIdForLogging(job);
      log.warn(
        `${this.logPrefix}/addRunningJob: job ${jobIdForLogging} is already running`
      );
    }
    this.activeJobs.set(this.params.getJobId(job), {
      completionPromise: explodePromise<void>(),
      job,
    });
  }

  private handleJobStartPromises(job: CoreJobType & JobManagerJobType): void {
    const id = this.getJobIdIncludingAttempts(job);
    this.jobStartPromises.get(id)?.resolve();
    this.jobStartPromises.delete(id);
  }

  /** Key for the test-only start/complete waiters: `<jobId>.<attempts>`. */
  private getJobIdIncludingAttempts(
    job: CoreJobType & Pick<JobManagerJobType, 'attempts'>
  ): string {
    return `${this.params.getJobId(job)}.${job.attempts}`;
  }
}