// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only

import PQueue from 'p-queue';
import { v4 as uuid } from 'uuid';
import { noop } from 'lodash';

import { Job } from './Job';
import { JobError } from './JobError';
import type { ParsedJob, StoredJob, JobQueueStore } from './types';
import { assertDev } from '../util/assert';
import * as log from '../logging/log';
import { JobLogger } from './JobLogger';
import * as Errors from '../types/errors';
import type { LoggerType } from '../types/Logging';
import { drop } from '../util/drop';

/**
 * Fallback completion callbacks, used when a stored job has no registered
 * callbacks in this process.
 */
const noopOnCompleteCallbacks = {
  resolve: noop,
  reject: noop,
};

type JobQueueOptions = {
  /**
   * The backing store for jobs. Typically a wrapper around the database.
   */
  store: JobQueueStore;

  /**
   * A unique name for this job queue. For example, might be "attachment downloads" or
   * "message send".
   */
  queueType: string;

  /**
   * The maximum number of attempts for a job in this queue. A value of 1 will not allow
   * the job to fail; a value of 2 will allow the job to fail once; etc.
   */
  maxAttempts: number;

  /**
   * A custom logger. Might be overwritten in test.
   */
  logger?: LoggerType;
};

/**
 * Outcome of running a job. `run` may report `NEEDS_RETRY`; `SUCCESS` and `ERROR` are
 * produced by the queue itself.
 */
export enum JOB_STATUS {
  SUCCESS = 'SUCCESS',
  NEEDS_RETRY = 'NEEDS_RETRY',
  ERROR = 'ERROR',
}

/**
 * Abstract base class for a store-backed job queue.
 *
 * Jobs are streamed from `store`, parsed with `parseData`, and executed through an
 * in-memory `PQueue`, with up to `maxAttempts` attempts per job.
 *
 * @typeParam T - The parsed job data type produced by `parseData`.
 */
export abstract class JobQueue<T> {
  private readonly maxAttempts: number;

  private readonly queueType: string;

  private readonly store: JobQueueStore;

  private readonly logger: LoggerType;

  private readonly logPrefix: string;

  private shuttingDown = false;

  // Completion callbacks for jobs created in this process, keyed by job ID.
  private readonly onCompleteCallbacks = new Map<
    string,
    {
      resolve: () => void;
      reject: (err: unknown) => void;
    }
  >();

  private readonly defaultInMemoryQueue = new PQueue({ concurrency: 1 });

  private started = false;

  /** Whether `shutdown` has been called on this queue. */
  get isShuttingDown(): boolean {
    return this.shuttingDown;
  }

  /**
   * @param options - Queue configuration. `maxAttempts` must be a positive safe integer
   *   and `queueType` must be a non-blank string; both are checked with `assertDev`.
   */
  constructor(options: Readonly<JobQueueOptions>) {
    assertDev(
      Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1,
      'maxAttempts should be a positive integer'
    );
    assertDev(
      options.maxAttempts <= Number.MAX_SAFE_INTEGER,
      'maxAttempts is too large'
    );
    assertDev(
      options.queueType.trim().length,
      'queueType should be a non-blank string'
    );

    this.maxAttempts = options.maxAttempts;
    this.queueType = options.queueType;
    this.store = options.store;
    this.logger = options.logger ?? log;

    this.logPrefix = `${this.queueType} job queue:`;
  }

  /**
   * `parseData` will be called with the raw data from `store`. For example, if the job
   * takes a single number, `parseData` should throw if `data` is not a number and should
   * return the number otherwise.
   *
   * If it throws, the job will be deleted from the database and the job will not be run.
   *
   * Will only be called once per job, even if `maxAttempts > 1`.
   */
  protected abstract parseData(data: unknown): T;

  /**
   * Run the job, given data.
   *
   * If it resolves, the job will be deleted from the store.
   *
   * If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it
   * will be deleted from the store.
   *
   * If it resolves with `JOB_STATUS.NEEDS_RETRY`, the job is handed to
   * `retryJobOnQueueIdle` instead of being attempted again immediately.
   *
   * If your job logs things, you're encouraged to use the logger provided, as it
   * automatically includes debugging information.
   */
  protected abstract run(
    job: Readonly<ParsedJob<T>>,
    extra?: Readonly<{ attempt?: number; log?: LoggerType }>
  ): Promise<JOB_STATUS.NEEDS_RETRY | undefined>;

  /**
   * Returns every in-memory queue that `shutdown` must wait on. The base
   * implementation returns only the default queue.
   */
  protected getQueues(): ReadonlySet<PQueue> {
    return new Set([this.defaultInMemoryQueue]);
  }

  /**
   * Start streaming jobs from the store.
   *
   * @throws If streaming has already been started on this queue.
   */
  async streamJobs(): Promise<void> {
    if (this.started) {
      throw new Error(
        `${this.logPrefix} should not start streaming more than once`
      );
    }
    this.started = true;

    log.info(`${this.logPrefix} starting to stream jobs`);

    const stream = this.store.stream(this.queueType);
    for await (const storedJob of stream) {
      if (this.shuttingDown) {
        log.info(`${this.logPrefix} is shutting down. Can't accept more work.`);
        break;
      }
      // Jobs are enqueued without awaiting so the stream keeps flowing while they run.
      drop(this.enqueueStoredJob(storedJob));
    }
  }

  /**
   * Add a job, which should cause it to be enqueued and run.
   *
   * If `streamJobs` has not been called yet, this will throw an error.
   *
   * You can override `insert` to change the way the job is added to the database. This is
   * useful if you're trying to save a message and a job in the same database transaction.
   *
   * If inserting the job fails, the error is rethrown and the job's completion
   * callbacks are discarded, since the job will never be enqueued.
   */
  async add(
    data: Readonly<T>,
    insert?: (job: ParsedJob<T>) => Promise<void>
  ): Promise<Job<T>> {
    if (!this.started) {
      throw new Error(
        `${this.logPrefix} has not started streaming. Make sure to call streamJobs().`
      );
    }

    const job = this.createJob(data);

    try {
      if (insert) {
        await insert(job);
      }
      // When a custom `insert` already saved the job, the store must not persist it again.
      await this.store.insert(job, { shouldPersist: !insert });
    } catch (err: unknown) {
      // `createJob` registered callbacks for this job; without this cleanup they would
      // stay in the map forever because the job never reaches `enqueueStoredJob`.
      this.onCompleteCallbacks.delete(job.id);
      throw err;
    }

    log.info(`${this.logPrefix} added new job ${job.id}`);
    return job;
  }

  /**
   * Builds a new `Job` with a fresh ID and the current timestamp, and registers the
   * callbacks that settle its `completion` promise. Failures surface from `completion`
   * wrapped in a `JobError`.
   */
  protected createJob(data: Readonly<T>): Job<T> {
    const id = uuid();
    const timestamp = Date.now();

    const completionPromise = new Promise<void>((resolve, reject) => {
      this.onCompleteCallbacks.set(id, { resolve, reject });
    });
    const completion = (async () => {
      try {
        await completionPromise;
      } catch (err: unknown) {
        throw new JobError(err);
      } finally {
        this.onCompleteCallbacks.delete(id);
      }
    })();

    return new Job(id, timestamp, this.queueType, data, completion);
  }

  /**
   * Chooses the in-memory queue a parsed job runs on. The base implementation always
   * returns the default queue, which has a concurrency of 1.
   */
  protected getInMemoryQueue(_parsedJob: ParsedJob<T>): PQueue {
    return this.defaultInMemoryQueue;
  }

  /**
   * Parses a stored job, runs it on its in-memory queue (retrying on rejection up to
   * `maxAttempts` times), updates the store according to the outcome, and settles the
   * job's completion callbacks.
   */
  protected async enqueueStoredJob(
    storedJob: Readonly<StoredJob>
  ): Promise<void> {
    assertDev(
      storedJob.queueType === this.queueType,
      'Received a mis-matched queue type'
    );

    log.info(`${this.logPrefix} enqueuing job ${storedJob.id}`);

    // It's okay if we don't have a callback; that likely means the job was created before
    // the process was started (e.g., from a previous run).
    const { resolve, reject } =
      this.onCompleteCallbacks.get(storedJob.id) || noopOnCompleteCallbacks;

    let parsedData: T;
    try {
      parsedData = this.parseData(storedJob.data);
    } catch (err) {
      log.error(
        `${this.logPrefix} failed to parse data for job ${storedJob.id}, created ${storedJob.timestamp}. Deleting job. Parse error:`,
        Errors.toLogFormat(err)
      );
      await this.store.delete(storedJob.id);
      reject(
        new Error(
          'Failed to parse job data. Was unexpected data loaded from the database?'
        )
      );
      return;
    }

    const parsedJob: ParsedJob<T> = {
      ...storedJob,
      data: parsedData,
    };

    const queue: PQueue = this.getInMemoryQueue(parsedJob);

    const logger = new JobLogger(parsedJob, this.logger);

    const result:
      | undefined
      | { status: JOB_STATUS.SUCCESS }
      | { status: JOB_STATUS.NEEDS_RETRY }
      | { status: JOB_STATUS.ERROR; err: unknown } = await queue.add(
      async () => {
        for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) {
          const isFinalAttempt = attempt === this.maxAttempts;

          logger.attempt = attempt;

          log.info(
            `${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}`
          );

          if (this.isShuttingDown) {
            log.warn(
              `${this.logPrefix} returning early for job ${storedJob.id}; shutting down`
            );
            return {
              status: JOB_STATUS.ERROR,
              err: new Error('Shutting down'),
            };
          }

          try {
            // We want an `await` in the loop, as we don't want a single job running more
            // than once at a time. Ideally, the job will succeed on the first attempt.
            // eslint-disable-next-line no-await-in-loop
            const jobStatus = await this.run(parsedJob, {
              attempt,
              log: logger,
            });
            if (!jobStatus) {
              log.info(
                `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}`
              );
              return { status: JOB_STATUS.SUCCESS };
            }
            log.info(
              `${this.logPrefix} job ${storedJob.id} returned status ${jobStatus} on attempt ${attempt}`
            );
            return { status: jobStatus };
          } catch (err: unknown) {
            log.error(
              `${this.logPrefix} job ${
                storedJob.id
              } failed on attempt ${attempt}. ${Errors.toLogFormat(err)}`
            );
            if (isFinalAttempt) {
              return { status: JOB_STATUS.ERROR, err };
            }
          }
        }

        // This should never happen. See the assertion below.
        return undefined;
      }
    );

    if (result?.status === JOB_STATUS.NEEDS_RETRY) {
      const addJobSuccess = await this.retryJobOnQueueIdle({
        storedJob,
        job: parsedJob,
        logger,
      });
      if (!addJobSuccess) {
        await this.store.delete(storedJob.id);
      }
    }

    // A job that errored only because we are shutting down stays in the store.
    if (
      result?.status === JOB_STATUS.SUCCESS ||
      (result?.status === JOB_STATUS.ERROR && !this.isShuttingDown)
    ) {
      await this.store.delete(storedJob.id);
    }

    assertDev(
      result,
      'The job never ran. This indicates a developer error in the job queue'
    );
    if (result.status === JOB_STATUS.ERROR) {
      reject(result.err);
    } else {
      resolve();
    }
  }

  /**
   * Called when `run` reports `JOB_STATUS.NEEDS_RETRY`. The base implementation does not
   * support retrying: it logs an error and returns `false`, which makes
   * `enqueueStoredJob` delete the job from the store.
   *
   * @returns `true` if the job was accepted for a later retry, `false` to drop it.
   */
  async retryJobOnQueueIdle({
    logger,
  }: {
    job: Readonly<ParsedJob<T>>;
    storedJob: Readonly<StoredJob>;
    logger: LoggerType;
  }): Promise<boolean> {
    logger.error(
      `retryJobOnQueueIdle: not implemented for queue ${this.queueType}; dropping job`
    );
    return false;
  }

  /**
   * Stops accepting new work and resolves once every queue returned by `getQueues` is
   * idle.
   */
  async shutdown(): Promise<void> {
    const queues = this.getQueues();
    log.info(
      `${this.logPrefix} shutdown: stop accepting new work and drain ${queues.size} promise queues`
    );
    this.shuttingDown = true;
    await Promise.all([...queues].map(q => q.onIdle()));
    log.info(`${this.logPrefix} shutdown: complete`);
  }
}