Increase max attempt count for unprocessed items

This commit is contained in:
Fedor Indutny 2022-12-21 16:06:50 -08:00 committed by GitHub
parent a80c6d89a8
commit ecbf84638d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 3 deletions

View file

@ -3209,7 +3209,10 @@ function saveUnprocessedSync(data: UnprocessedType): string {
throw new Error('saveUnprocessedSync: id was falsey');
}
if (attempts >= MAX_UNPROCESSED_ATTEMPTS) {
if (attempts > MAX_UNPROCESSED_ATTEMPTS) {
logger.warn(
`saveUnprocessedSync: not saving ${id} due to exhausted attempts`
);
removeUnprocessedSync(id);
return id;
}
@ -3376,7 +3379,7 @@ async function getAllUnprocessedAndIncrementAttempts(): Promise<
.prepare<Query>(
`
DELETE FROM unprocessed
WHERE attempts >= $MAX_UNPROCESSED_ATTEMPTS
WHERE attempts > $MAX_UNPROCESSED_ATTEMPTS
`
)
.run({ MAX_UNPROCESSED_ATTEMPTS });

View file

@ -776,6 +776,13 @@ export default class MessageReceiver
}
private async queueAllCached(): Promise<void> {
if (this.stoppingProcessing) {
log.info(
'MessageReceiver.queueAllCached: not running due to stopped processing'
);
return;
}
const items = await this.getAllFromCache();
const max = items.length;
for (let i = 0; i < max; i += 1) {
@ -1098,7 +1105,7 @@ export default class MessageReceiver
id,
version: 2,
attempts: 1,
attempts: 0,
envelope: Bytes.toBase64(plaintext),
messageAgeSec: envelope.messageAgeSec,
receivedAtCounter: envelope.receivedAtCounter,