conversationJobQueue: Introduce RUNNING status, attempts and backoff

This commit is contained in:
Scott Nonnenberg 2024-05-06 17:33:50 -07:00 committed by GitHub
parent 99022e7e6b
commit 3e51e4ef5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 133 additions and 27 deletions

View file

@ -4263,7 +4263,7 @@
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from--one--unknown": {
"messageformat": "An admin revoked an invitation to the group for 1 person invited by {memberName}.",
"messageformat": "An admin revoked an invitation to the group for # person invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from-you--one--other": {
@ -4279,19 +4279,19 @@
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from--many--other": {
"messageformat": "{adminName} revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.",
"messageformat": "{adminName} revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from--many--you": {
"messageformat": "You revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.",
"messageformat": "You revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from--many--unknown": {
"messageformat": "An admin revoked {count, plural, one {an invitation to the group for 1 person} other {invitations to the group for # people}} invited by {memberName}.",
"messageformat": "An admin revoked {count, plural, one {an invitation to the group for # person} other {invitations to the group for # people}} invited by {memberName}.",
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from-you--many--other": {
"messageformat": "{adminName} revoked the {count, plural, one {invitation to the group you sent to 1 person} other {invitations to the group you sent to # people}}.",
"messageformat": "{adminName} revoked the {count, plural, one {invitation to the group you sent to # person} other {invitations to the group you sent to # people}}.",
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from-you--many--you": {
@ -4299,7 +4299,7 @@
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--pending-remove--revoke-invite-from-you--many--unknown": {
"messageformat": "An admin revoked the {count, plural, one {invitation to the group you sent to 1 person} other {invitations to the group you sent to # people}}.",
"messageformat": "An admin revoked the {count, plural, one {invitation to the group you sent to # person} other {invitations to the group you sent to # people}}.",
"description": "Shown in timeline or conversation preview when v2 group changes"
},
"icu:GroupV2--admin-approval-add-one--you": {

View file

@ -49,6 +49,7 @@ import { sendSavedProto } from './helpers/sendSavedProto';
import { drop } from '../util/drop';
import { isInPast } from '../util/timestamp';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
import { FIBONACCI } from '../util/BackOff';
// Note: generally, we only want to add to this list. If you do need to change one of
// these values, you'll likely need to write a database migration.
@ -321,6 +322,7 @@ enum RETRY_STATUS {
BLOCKED = 'BLOCKED',
BLOCKED_WITH_JOBS = 'BLOCKED_WITH_JOBS',
UNBLOCKED = 'UNBLOCKED',
RUNNING = 'RUNNING',
}
type ConversationData = Readonly<
@ -329,6 +331,7 @@ type ConversationData = Readonly<
// yet have a job to retry. We should, very soon, when the job returns
// JOB_STATUS.NEEDS_RETRY. This should be a very short-lived state.
status: RETRY_STATUS.BLOCKED;
attempts: number;
callback: undefined;
jobsNeedingRetry: undefined;
retryAt: number;
@ -337,6 +340,7 @@ type ConversationData = Readonly<
// This is the next stage, when we've added at least one job needing retry, and we
// have a callback registered to run on queue idle (or be called directly).
status: RETRY_STATUS.BLOCKED_WITH_JOBS;
attempts: number;
callback: () => void;
jobsNeedingRetry: Array<Readonly<StoredJob>>;
retryAt: number;
@ -345,13 +349,27 @@ type ConversationData = Readonly<
| {
// When we discover that we can now run these deferred jobs, we flip into this
// state, which should be short-lived. We very quickly re-enqueue all
// jobsNeedingRetry, and erase perConversationData for this conversation.
// jobsNeedingRetry, and move to RETRY_STATUS.RUNNING for this conversation.
status: RETRY_STATUS.UNBLOCKED;
attempts: number;
callback: () => void;
jobsNeedingRetry: Array<Readonly<StoredJob>>;
retryAt: undefined;
retryAtTimeout?: NodeJS.Timeout;
}
| {
// When we've queued all jobs needing retry, and we're waiting for the results
// of our next set of attempted sends, we are in this state. Its only real purpose
// is to keep track of our attempts, so we can exponentially back off. Once a send
// goes through successfully, we erase perConversationData for this conversation.
// Otherwise, we go back to RETRY_STATUS.BLOCKED.
status: RETRY_STATUS.RUNNING;
attempts: number;
callback: undefined;
jobsNeedingRetry: undefined;
retryAt: undefined;
retryAtTimeout: undefined;
}
>;
export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
@ -450,15 +468,22 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
return;
}
const { status, callback } = perConversationData;
const { attempts, status, callback } = perConversationData;
if (status === RETRY_STATUS.BLOCKED) {
globalLogger.info(
`${logId}: Deleting previous BLOCKED state; had no jobs`
`${logId}: Previously BLOCKED, moving to RUNNING state`
);
this.perConversationData.delete(conversationId);
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.RUNNING,
attempts,
callback: undefined,
jobsNeedingRetry: undefined,
retryAt: undefined,
retryAtTimeout: undefined,
});
} else if (status === RETRY_STATUS.BLOCKED_WITH_JOBS) {
globalLogger.info(
`${logId}: Moving previous WAITING state to UNBLOCKED, calling callback directly`
`${logId}: Moving previous BLOCKED state to UNBLOCKED, calling callback directly`
);
this.perConversationData.set(conversationId, {
...perConversationData,
@ -471,17 +496,54 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
`${logId}: We're still in UNBLOCKED state; calling callback directly`
);
callback();
} else if (status === RETRY_STATUS.RUNNING) {
globalLogger.warn(
`${logId}: We're already in RUNNING state; doing nothing`
);
} else {
throw missingCaseError(status);
}
}
private recordSuccessfulSend(conversationId: string) {
const logId = `recordSuccessfulSend/${conversationId}`;
const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) {
return;
}
const { status } = perConversationData;
if (status === RETRY_STATUS.RUNNING || status === RETRY_STATUS.BLOCKED) {
globalLogger.info(`${logId}: Previously ${status}; clearing state`);
this.perConversationData.delete(conversationId);
} else if (
status === RETRY_STATUS.BLOCKED_WITH_JOBS ||
status === RETRY_STATUS.UNBLOCKED
) {
globalLogger.warn(
`${logId}: We're still in ${status} state; calling unblockConversationRetries`
);
// We have to do this because in these states there are jobs that need to be retried
this.unblockConversationRetries(conversationId);
} else {
throw missingCaseError(status);
}
}
private getRetryWithBackoff(attempts: number) {
return (
Date.now() +
MINUTE * (FIBONACCI[attempts] ?? FIBONACCI[FIBONACCI.length - 1])
);
}
private captureRetryAt(conversationId: string, retryAt: number | undefined) {
const logId = `captureRetryAt/${conversationId}`;
const newRetryAt = retryAt || Date.now() + MINUTE;
const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) {
const newRetryAt = retryAt || Date.now() + MINUTE;
if (!retryAt) {
globalLogger.warn(
`${logId}: No existing data, using retryAt of ${newRetryAt}`
@ -489,6 +551,7 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
}
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED,
attempts: 1,
retryAt: newRetryAt,
callback: undefined,
jobsNeedingRetry: undefined,
@ -498,9 +561,12 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
}
const { status, retryAt: existingRetryAt } = perConversationData;
if (existingRetryAt && existingRetryAt >= newRetryAt) {
const attempts = perConversationData.attempts + 1;
const retryWithBackoff = this.getRetryWithBackoff(attempts);
if (existingRetryAt && existingRetryAt >= retryWithBackoff) {
globalLogger.warn(
`${logId}: New newRetryAt ${newRetryAt} isn't after existing retryAt ${existingRetryAt}, dropping`
`${logId}: New newRetryAt ${retryWithBackoff} isn't after existing retryAt ${existingRetryAt}, dropping`
);
return;
}
@ -510,20 +576,31 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
status === RETRY_STATUS.BLOCKED_WITH_JOBS
) {
globalLogger.info(
`${logId}: Updating to newRetryAt ${newRetryAt} from existing retryAt ${existingRetryAt}, status ${status}`
`${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from existing retryAt ${existingRetryAt}, status ${status}`
);
this.perConversationData.set(conversationId, {
...perConversationData,
retryAt: newRetryAt,
retryAt: retryWithBackoff,
});
} else if (status === RETRY_STATUS.UNBLOCKED) {
globalLogger.info(
`${logId}: Updating to newRetryAt ${newRetryAt} from previous UNBLOCKED status`
`${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from previous UNBLOCKED status`
);
this.perConversationData.set(conversationId, {
...perConversationData,
status: RETRY_STATUS.BLOCKED_WITH_JOBS,
retryAt: newRetryAt,
retryAt: retryWithBackoff,
});
} else if (status === RETRY_STATUS.RUNNING) {
globalLogger.info(
`${logId}: Updating to new retryAt ${retryWithBackoff} (attempts ${attempts}) from previous RUNNING status`
);
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED,
attempts,
retryAt: retryWithBackoff,
callback: undefined,
jobsNeedingRetry: undefined,
});
} else {
throw missingCaseError(status);
@ -551,10 +628,12 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
);
}
const { status, retryAt, jobsNeedingRetry, callback } =
const defaultRetryAt = Date.now() + MINUTE;
const { attempts, callback, jobsNeedingRetry, status, retryAt } =
perConversationData || {
attempts: 1,
status: RETRY_STATUS.BLOCKED,
retryAt: Date.now() + MINUTE,
retryAt: defaultRetryAt,
};
const newJobsNeedingRetry = (jobsNeedingRetry || []).concat([storedJob]);
@ -571,13 +650,28 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
) {
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED_WITH_JOBS,
attempts,
retryAt,
jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback,
});
} else if (status === RETRY_STATUS.RUNNING) {
const newAttempts = attempts + 1;
const newRetryAt = this.getRetryWithBackoff(newAttempts);
logger.warn(
`${logId}: Moving from state RUNNING to BLOCKED_WITH_JOBS, with retryAt ${newRetryAt}, (attempts ${newAttempts})`
);
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED_WITH_JOBS,
attempts: newAttempts,
retryAt: newRetryAt,
jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback,
});
} else {
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.UNBLOCKED,
attempts,
retryAt,
jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback,
@ -616,15 +710,12 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
}
const { status, retryAt } = perConversationData;
if (status === RETRY_STATUS.BLOCKED) {
globalLogger.warn(
`${logId}: Still in blocked state, no jobs to retry. Clearing perConversationData.`
);
this.perConversationData.delete(conversationId);
if (status === RETRY_STATUS.BLOCKED || status === RETRY_STATUS.RUNNING) {
globalLogger.warn(`${logId}: In ${status} state; no jobs to retry.`);
return;
}
const { callback, jobsNeedingRetry, retryAtTimeout } =
const { attempts, callback, jobsNeedingRetry, retryAtTimeout } =
perConversationData;
if (retryAtTimeout) {
@ -639,10 +730,18 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
// We're starting to retry jobs; remove the challenge handler
drop(window.Signal.challengeHandler?.unregister(conversationId, logId));
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.RUNNING,
attempts,
callback: undefined,
jobsNeedingRetry: undefined,
retryAt: undefined,
retryAtTimeout: undefined,
});
jobsNeedingRetry?.forEach(job => {
drop(this.enqueueStoredJob(job));
});
this.perConversationData.delete(conversationId);
return;
}
@ -701,6 +800,7 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
skipWait: count > 1,
});
if (!shouldContinue) {
// We don't return here because each sub-task has its own cleanup sequence
break;
}
@ -873,6 +973,10 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
}
}
if (shouldContinue && !this.isShuttingDown) {
this.recordSuccessfulSend(conversationId);
}
return undefined;
} catch (error: unknown) {
const untrustedServiceIds: Array<ServiceIdString> = [];

View file

@ -3,6 +3,8 @@
const SECOND = 1000;
export const FIBONACCI: ReadonlyArray<number> = [1, 2, 3, 5, 8, 13, 21, 34, 55];
export const FIBONACCI_TIMEOUTS: ReadonlyArray<number> = [
1 * SECOND,
2 * SECOND,