conversationJobQueue: Introduce RUNNING status, attempts and backoff

Co-authored-by: Scott Nonnenberg <scott@signal.org>
This commit is contained in:
automated-signal 2024-05-07 11:58:58 -05:00 committed by GitHub
parent 0119478d85
commit 55111c581d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 133 additions and 27 deletions

View file

@ -4263,7 +4263,7 @@
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from--one--unknown": { "icu:GroupV2--pending-remove--revoke-invite-from--one--unknown": {
"messageformat": "An admin revoked an invitation to the group for 1 person invited by {memberName}.", "messageformat": "An admin revoked an invitation to the group for # person invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from-you--one--other": { "icu:GroupV2--pending-remove--revoke-invite-from-you--one--other": {
@ -4279,19 +4279,19 @@
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from--many--other": { "icu:GroupV2--pending-remove--revoke-invite-from--many--other": {
"messageformat": "{adminName} revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.", "messageformat": "{adminName} revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from--many--you": { "icu:GroupV2--pending-remove--revoke-invite-from--many--you": {
"messageformat": "You revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.", "messageformat": "You revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from--many--unknown": { "icu:GroupV2--pending-remove--revoke-invite-from--many--unknown": {
"messageformat": "An admin revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.", "messageformat": "An admin revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from-you--many--other": { "icu:GroupV2--pending-remove--revoke-invite-from-you--many--other": {
"messageformat": "{adminName} revoked the {count, plural, one {invitation to the group you sent to 1 person} other {invitations to the group you sent to # people}}.", "messageformat": "{adminName} revoked the {count, plural, one {invitation to the group you sent to # person} other {invitations to the group you sent to # people}}.",
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from-you--many--you": { "icu:GroupV2--pending-remove--revoke-invite-from-you--many--you": {
@ -4299,7 +4299,7 @@
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--pending-remove--revoke-invite-from-you--many--unknown": { "icu:GroupV2--pending-remove--revoke-invite-from-you--many--unknown": {
"messageformat": "An admin revoked the {count, plural, one {invitation to the group you sent to 1 person} other {invitations to the group you sent to # people}}.", "messageformat": "An admin revoked the {count, plural, one {invitation to the group you sent to # person} other {invitations to the group you sent to # people}}.",
"description": "Shown in timeline or conversation preview when v2 group changes" "description": "Shown in timeline or conversation preview when v2 group changes"
}, },
"icu:GroupV2--admin-approval-add-one--you": { "icu:GroupV2--admin-approval-add-one--you": {

View file

@ -49,6 +49,7 @@ import { sendSavedProto } from './helpers/sendSavedProto';
import { drop } from '../util/drop'; import { drop } from '../util/drop';
import { isInPast } from '../util/timestamp'; import { isInPast } from '../util/timestamp';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
import { FIBONACCI } from '../util/BackOff';
// Note: generally, we only want to add to this list. If you do need to change one of // Note: generally, we only want to add to this list. If you do need to change one of
// these values, you'll likely need to write a database migration. // these values, you'll likely need to write a database migration.
@ -321,6 +322,7 @@ enum RETRY_STATUS {
BLOCKED = 'BLOCKED', BLOCKED = 'BLOCKED',
BLOCKED_WITH_JOBS = 'BLOCKED_WITH_JOBS', BLOCKED_WITH_JOBS = 'BLOCKED_WITH_JOBS',
UNBLOCKED = 'UNBLOCKED', UNBLOCKED = 'UNBLOCKED',
RUNNING = 'RUNNING',
} }
type ConversationData = Readonly< type ConversationData = Readonly<
@ -329,6 +331,7 @@ type ConversationData = Readonly<
// yet have a job to retry. We should, very soon, when the job returns // yet have a job to retry. We should, very soon, when the job returns
// JOB_STATUS.NEEDS_RETRY. This should be a very short-lived state. // JOB_STATUS.NEEDS_RETRY. This should be a very short-lived state.
status: RETRY_STATUS.BLOCKED; status: RETRY_STATUS.BLOCKED;
attempts: number;
callback: undefined; callback: undefined;
jobsNeedingRetry: undefined; jobsNeedingRetry: undefined;
retryAt: number; retryAt: number;
@ -337,6 +340,7 @@ type ConversationData = Readonly<
// This is the next stage, when we've added at least one job needing retry, and we // This is the next stage, when we've added at least one job needing retry, and we
// have a callback registered to run on queue idle (or be called directly). // have a callback registered to run on queue idle (or be called directly).
status: RETRY_STATUS.BLOCKED_WITH_JOBS; status: RETRY_STATUS.BLOCKED_WITH_JOBS;
attempts: number;
callback: () => void; callback: () => void;
jobsNeedingRetry: Array<Readonly<StoredJob>>; jobsNeedingRetry: Array<Readonly<StoredJob>>;
retryAt: number; retryAt: number;
@ -345,13 +349,27 @@ type ConversationData = Readonly<
| { | {
// When we discover that we can now run these deferred jobs, we flip into this // When we discover that we can now run these deferred jobs, we flip into this
// state, which should be short-lived. We very quickly re-enqueue all // state, which should be short-lived. We very quickly re-enqueue all
// jobsNeedingRetry, and erase perConversationData for this conversation. // jobsNeedingRetry, and move to RETRY_STATUS.RUNNING for this conversation.
status: RETRY_STATUS.UNBLOCKED; status: RETRY_STATUS.UNBLOCKED;
attempts: number;
callback: () => void; callback: () => void;
jobsNeedingRetry: Array<Readonly<StoredJob>>; jobsNeedingRetry: Array<Readonly<StoredJob>>;
retryAt: undefined; retryAt: undefined;
retryAtTimeout?: NodeJS.Timeout; retryAtTimeout?: NodeJS.Timeout;
} }
| {
// When we've queued all jobs needing retry, and we're waiting for the results
// of our next set of attempted sends, we are in this state. Its only real purpose
// is to keep track of our attempts, so we can exponentially back off. Once a send
// goes through successfully, we erase perConversationData for this conversation.
// Otherwise, we go back to RETRY_STATUS.BLOCKED.
status: RETRY_STATUS.RUNNING;
attempts: number;
callback: undefined;
jobsNeedingRetry: undefined;
retryAt: undefined;
retryAtTimeout: undefined;
}
>; >;
export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> { export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
@ -450,15 +468,22 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
return; return;
} }
const { status, callback } = perConversationData; const { attempts, status, callback } = perConversationData;
if (status === RETRY_STATUS.BLOCKED) { if (status === RETRY_STATUS.BLOCKED) {
globalLogger.info( globalLogger.info(
`${logId}: Deleting previous BLOCKED state; had no jobs` `${logId}: Previously BLOCKED, moving to RUNNING state`
); );
this.perConversationData.delete(conversationId); this.perConversationData.set(conversationId, {
status: RETRY_STATUS.RUNNING,
attempts,
callback: undefined,
jobsNeedingRetry: undefined,
retryAt: undefined,
retryAtTimeout: undefined,
});
} else if (status === RETRY_STATUS.BLOCKED_WITH_JOBS) { } else if (status === RETRY_STATUS.BLOCKED_WITH_JOBS) {
globalLogger.info( globalLogger.info(
`${logId}: Moving previous WAITING state to UNBLOCKED, calling callback directly` `${logId}: Moving previous BLOCKED state to UNBLOCKED, calling callback directly`
); );
this.perConversationData.set(conversationId, { this.perConversationData.set(conversationId, {
...perConversationData, ...perConversationData,
@ -471,17 +496,54 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
`${logId}: We're still in UNBLOCKED state; calling callback directly` `${logId}: We're still in UNBLOCKED state; calling callback directly`
); );
callback(); callback();
} else if (status === RETRY_STATUS.RUNNING) {
globalLogger.warn(
`${logId}: We're already in RUNNING state; doing nothing`
);
} else { } else {
throw missingCaseError(status); throw missingCaseError(status);
} }
} }
private recordSuccessfulSend(conversationId: string) {
const logId = `recordSuccessfulSend/${conversationId}`;
const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) {
return;
}
const { status } = perConversationData;
if (status === RETRY_STATUS.RUNNING || status === RETRY_STATUS.BLOCKED) {
globalLogger.info(`${logId}: Previously ${status}; clearing state`);
this.perConversationData.delete(conversationId);
} else if (
status === RETRY_STATUS.BLOCKED_WITH_JOBS ||
status === RETRY_STATUS.UNBLOCKED
) {
globalLogger.warn(
`${logId}: We're still in ${status} state; calling unblockConversationRetries`
);
// We have to do this because in these states there are jobs that need to be retried
this.unblockConversationRetries(conversationId);
} else {
throw missingCaseError(status);
}
}
private getRetryWithBackoff(attempts: number) {
return (
Date.now() +
MINUTE * (FIBONACCI[attempts] ?? FIBONACCI[FIBONACCI.length - 1])
);
}
private captureRetryAt(conversationId: string, retryAt: number | undefined) { private captureRetryAt(conversationId: string, retryAt: number | undefined) {
const logId = `captureRetryAt/${conversationId}`; const logId = `captureRetryAt/${conversationId}`;
const newRetryAt = retryAt || Date.now() + MINUTE;
const perConversationData = this.perConversationData.get(conversationId); const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) { if (!perConversationData) {
const newRetryAt = retryAt || Date.now() + MINUTE;
if (!retryAt) { if (!retryAt) {
globalLogger.warn( globalLogger.warn(
`${logId}: No existing data, using retryAt of ${newRetryAt}` `${logId}: No existing data, using retryAt of ${newRetryAt}`
@ -489,6 +551,7 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
} }
this.perConversationData.set(conversationId, { this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED, status: RETRY_STATUS.BLOCKED,
attempts: 1,
retryAt: newRetryAt, retryAt: newRetryAt,
callback: undefined, callback: undefined,
jobsNeedingRetry: undefined, jobsNeedingRetry: undefined,
@ -498,9 +561,12 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
} }
const { status, retryAt: existingRetryAt } = perConversationData; const { status, retryAt: existingRetryAt } = perConversationData;
if (existingRetryAt && existingRetryAt >= newRetryAt) { const attempts = perConversationData.attempts + 1;
const retryWithBackoff = this.getRetryWithBackoff(attempts);
if (existingRetryAt && existingRetryAt >= retryWithBackoff) {
globalLogger.warn( globalLogger.warn(
`${logId}: New newRetryAt ${newRetryAt} isn't after existing retryAt ${existingRetryAt}, dropping` `${logId}: New newRetryAt ${retryWithBackoff} isn't after existing retryAt ${existingRetryAt}, dropping`
); );
return; return;
} }
@ -510,20 +576,31 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
status === RETRY_STATUS.BLOCKED_WITH_JOBS status === RETRY_STATUS.BLOCKED_WITH_JOBS
) { ) {
globalLogger.info( globalLogger.info(
`${logId}: Updating to newRetryAt ${newRetryAt} from existing retryAt ${existingRetryAt}, status ${status}` `${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from existing retryAt ${existingRetryAt}, status ${status}`
); );
this.perConversationData.set(conversationId, { this.perConversationData.set(conversationId, {
...perConversationData, ...perConversationData,
retryAt: newRetryAt, retryAt: retryWithBackoff,
}); });
} else if (status === RETRY_STATUS.UNBLOCKED) { } else if (status === RETRY_STATUS.UNBLOCKED) {
globalLogger.info( globalLogger.info(
`${logId}: Updating to newRetryAt ${newRetryAt} from previous UNBLOCKED status` `${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from previous UNBLOCKED status`
); );
this.perConversationData.set(conversationId, { this.perConversationData.set(conversationId, {
...perConversationData, ...perConversationData,
status: RETRY_STATUS.BLOCKED_WITH_JOBS, status: RETRY_STATUS.BLOCKED_WITH_JOBS,
retryAt: newRetryAt, retryAt: retryWithBackoff,
});
} else if (status === RETRY_STATUS.RUNNING) {
globalLogger.info(
`${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from previous RUNNING status`
);
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED,
attempts,
retryAt: retryWithBackoff,
callback: undefined,
jobsNeedingRetry: undefined,
}); });
} else { } else {
throw missingCaseError(status); throw missingCaseError(status);
@ -551,10 +628,12 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
); );
} }
const { status, retryAt, jobsNeedingRetry, callback } = const defaultRetryAt = Date.now() + MINUTE;
const { attempts, callback, jobsNeedingRetry, status, retryAt } =
perConversationData || { perConversationData || {
attempts: 1,
status: RETRY_STATUS.BLOCKED, status: RETRY_STATUS.BLOCKED,
retryAt: Date.now() + MINUTE, retryAt: defaultRetryAt,
}; };
const newJobsNeedingRetry = (jobsNeedingRetry || []).concat([storedJob]); const newJobsNeedingRetry = (jobsNeedingRetry || []).concat([storedJob]);
@ -571,13 +650,28 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
) { ) {
this.perConversationData.set(conversationId, { this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED_WITH_JOBS, status: RETRY_STATUS.BLOCKED_WITH_JOBS,
attempts,
retryAt, retryAt,
jobsNeedingRetry: newJobsNeedingRetry, jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback, callback: newCallback,
}); });
} else if (status === RETRY_STATUS.RUNNING) {
const newAttempts = attempts + 1;
const newRetryAt = this.getRetryWithBackoff(newAttempts);
logger.warn(
`${logId}: Moving from state RUNNING to BLOCKED_WITH_JOBS, with retryAt ${newRetryAt}, (attempts ${newAttempts})`
);
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED_WITH_JOBS,
attempts: newAttempts,
retryAt: newRetryAt,
jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback,
});
} else { } else {
this.perConversationData.set(conversationId, { this.perConversationData.set(conversationId, {
status: RETRY_STATUS.UNBLOCKED, status: RETRY_STATUS.UNBLOCKED,
attempts,
retryAt, retryAt,
jobsNeedingRetry: newJobsNeedingRetry, jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback, callback: newCallback,
@ -616,15 +710,12 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
} }
const { status, retryAt } = perConversationData; const { status, retryAt } = perConversationData;
if (status === RETRY_STATUS.BLOCKED) { if (status === RETRY_STATUS.BLOCKED || status === RETRY_STATUS.RUNNING) {
globalLogger.warn( globalLogger.warn(`${logId}: In ${status} state; no jobs to retry.`);
`${logId}: Still in blocked state, no jobs to retry. Clearing perConversationData.`
);
this.perConversationData.delete(conversationId);
return; return;
} }
const { callback, jobsNeedingRetry, retryAtTimeout } = const { attempts, callback, jobsNeedingRetry, retryAtTimeout } =
perConversationData; perConversationData;
if (retryAtTimeout) { if (retryAtTimeout) {
@ -639,10 +730,18 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
// We're starting to retry jobs; remove the challenge handler // We're starting to retry jobs; remove the challenge handler
drop(window.Signal.challengeHandler?.unregister(conversationId, logId)); drop(window.Signal.challengeHandler?.unregister(conversationId, logId));
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.RUNNING,
attempts,
callback: undefined,
jobsNeedingRetry: undefined,
retryAt: undefined,
retryAtTimeout: undefined,
});
jobsNeedingRetry?.forEach(job => { jobsNeedingRetry?.forEach(job => {
drop(this.enqueueStoredJob(job)); drop(this.enqueueStoredJob(job));
}); });
this.perConversationData.delete(conversationId);
return; return;
} }
@ -701,6 +800,7 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
skipWait: count > 1, skipWait: count > 1,
}); });
if (!shouldContinue) { if (!shouldContinue) {
// We don't return here because each sub-task has its own cleanup sequence
break; break;
} }
@ -873,6 +973,10 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
} }
} }
if (shouldContinue && !this.isShuttingDown) {
this.recordSuccessfulSend(conversationId);
}
return undefined; return undefined;
} catch (error: unknown) { } catch (error: unknown) {
const untrustedServiceIds: Array<ServiceIdString> = []; const untrustedServiceIds: Array<ServiceIdString> = [];

View file

@ -3,6 +3,8 @@
const SECOND = 1000; const SECOND = 1000;
export const FIBONACCI: ReadonlyArray<number> = [1, 2, 3, 5, 8, 13, 21, 34, 55];
export const FIBONACCI_TIMEOUTS: ReadonlyArray<number> = [ export const FIBONACCI_TIMEOUTS: ReadonlyArray<number> = [
1 * SECOND, 1 * SECOND,
2 * SECOND, 2 * SECOND,