Increment unprocessed attempts when fetching

This commit is contained in:
Fedor Indutny 2022-04-28 15:28:30 -07:00 committed by GitHub
parent 6d576ed901
commit d6b58d23d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 77 additions and 41 deletions

View file

@ -1859,9 +1859,9 @@ export class SignalProtocolStore extends EventsMixin {
}); });
} }
getAllUnprocessed(): Promise<Array<UnprocessedType>> { getAllUnprocessedAndIncrementAttempts(): Promise<Array<UnprocessedType>> {
return this.withZone(GLOBAL_ZONE, 'getAllUnprocessed', async () => { return this.withZone(GLOBAL_ZONE, 'getAllUnprocessed', async () => {
return window.Signal.Data.getAllUnprocessed(); return window.Signal.Data.getAllUnprocessedAndIncrementAttempts();
}); });
} }

View file

@ -247,7 +247,7 @@ const dataInterface: ClientInterface = {
migrateConversationMessages, migrateConversationMessages,
getUnprocessedCount, getUnprocessedCount,
getAllUnprocessed, getAllUnprocessedAndIncrementAttempts,
getUnprocessedById, getUnprocessedById,
updateUnprocessedWithData, updateUnprocessedWithData,
updateUnprocessedsWithData, updateUnprocessedsWithData,
@ -1443,8 +1443,8 @@ async function getUnprocessedCount() {
return channels.getUnprocessedCount(); return channels.getUnprocessedCount();
} }
async function getAllUnprocessed() { async function getAllUnprocessedAndIncrementAttempts() {
return channels.getAllUnprocessed(); return channels.getAllUnprocessedAndIncrementAttempts();
} }
async function getUnprocessedById(id: string) { async function getUnprocessedById(id: string) {

View file

@ -474,7 +474,7 @@ export type DataInterface = {
) => Promise<void>; ) => Promise<void>;
getUnprocessedCount: () => Promise<number>; getUnprocessedCount: () => Promise<number>;
getAllUnprocessed: () => Promise<Array<UnprocessedType>>; getAllUnprocessedAndIncrementAttempts: () => Promise<Array<UnprocessedType>>;
updateUnprocessedWithData: ( updateUnprocessedWithData: (
id: string, id: string,
data: UnprocessedUpdateType data: UnprocessedUpdateType

View file

@ -244,7 +244,7 @@ const dataInterface: ServerInterface = {
migrateConversationMessages, migrateConversationMessages,
getUnprocessedCount, getUnprocessedCount,
getAllUnprocessed, getAllUnprocessedAndIncrementAttempts,
updateUnprocessedWithData, updateUnprocessedWithData,
updateUnprocessedsWithData, updateUnprocessedsWithData,
getUnprocessedById, getUnprocessedById,
@ -3181,32 +3181,58 @@ async function getUnprocessedCount(): Promise<number> {
return getCountFromTable(getInstance(), 'unprocessed'); return getCountFromTable(getInstance(), 'unprocessed');
} }
async function getAllUnprocessed(): Promise<Array<UnprocessedType>> { async function getAllUnprocessedAndIncrementAttempts(): Promise<
Array<UnprocessedType>
> {
const db = getInstance(); const db = getInstance();
const { changes: deletedCount } = db return db.transaction(() => {
.prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo') const { changes: deletedStaleCount } = db
.run({ .prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo')
monthAgo: Date.now() - durations.MONTH, .run({
}); monthAgo: Date.now() - durations.MONTH,
});
if (deletedCount !== 0) { if (deletedStaleCount !== 0) {
logger.warn( logger.warn(
`getAllUnprocessed: deleting ${deletedCount} old unprocessed envelopes` 'getAllUnprocessedAndIncrementAttempts: ' +
); `deleting ${deletedStaleCount} old unprocessed envelopes`
} );
}
const rows = db db.prepare<EmptyQuery>(
.prepare<EmptyQuery>(
` `
SELECT * UPDATE unprocessed
FROM unprocessed SET attempts = attempts + 1
ORDER BY timestamp ASC;
` `
) ).run();
.all();
return rows; const { changes: deletedInvalidCount } = db
.prepare<Query>(
`
DELETE FROM unprocessed
WHERE attempts >= $MAX_UNPROCESSED_ATTEMPTS
`
)
.run({ MAX_UNPROCESSED_ATTEMPTS });
if (deletedInvalidCount !== 0) {
logger.warn(
'getAllUnprocessedAndIncrementAttempts: ' +
`deleting ${deletedInvalidCount} invalid unprocessed envelopes`
);
}
return db
.prepare<EmptyQuery>(
`
SELECT *
FROM unprocessed
ORDER BY timestamp ASC;
`
)
.all();
})();
} }
function removeUnprocessedsSync(ids: Array<string>): void { function removeUnprocessedsSync(ids: Array<string>): void {

View file

@ -1499,7 +1499,8 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.loadSession(id), testSession); assert.equal(await store.loadSession(id), testSession);
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey); assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);
const allUnprocessed = await store.getAllUnprocessed(); const allUnprocessed =
await store.getAllUnprocessedAndIncrementAttempts();
assert.deepEqual( assert.deepEqual(
allUnprocessed.map(({ envelope }) => envelope), allUnprocessed.map(({ envelope }) => envelope),
['second'] ['second']
@ -1551,7 +1552,7 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.loadSession(id), testSession); assert.equal(await store.loadSession(id), testSession);
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey); assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);
assert.deepEqual(await store.getAllUnprocessed(), []); assert.deepEqual(await store.getAllUnprocessedAndIncrementAttempts(), []);
}); });
it('can be re-entered', async () => { it('can be re-entered', async () => {
@ -1647,7 +1648,7 @@ describe('SignalProtocolStore', () => {
beforeEach(async () => { beforeEach(async () => {
await store.removeAllUnprocessed(); await store.removeAllUnprocessed();
const items = await store.getAllUnprocessed(); const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0); assert.strictEqual(items.length, 0);
}); });
@ -1687,7 +1688,7 @@ describe('SignalProtocolStore', () => {
}), }),
]); ]);
const items = await store.getAllUnprocessed(); const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 3); assert.strictEqual(items.length, 3);
// they are in the proper order because the collection comparator is 'timestamp' // they are in the proper order because the collection comparator is 'timestamp'
@ -1708,10 +1709,11 @@ describe('SignalProtocolStore', () => {
}); });
await store.updateUnprocessedWithData(id, { decrypted: 'updated' }); await store.updateUnprocessedWithData(id, { decrypted: 'updated' });
const items = await store.getAllUnprocessed(); const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 1); assert.strictEqual(items.length, 1);
assert.strictEqual(items[0].decrypted, 'updated'); assert.strictEqual(items[0].decrypted, 'updated');
assert.strictEqual(items[0].timestamp, NOW + 1); assert.strictEqual(items[0].timestamp, NOW + 1);
assert.strictEqual(items[0].attempts, 1);
}); });
it('removeUnprocessed successfully deletes item', async () => { it('removeUnprocessed successfully deletes item', async () => {
@ -1726,7 +1728,21 @@ describe('SignalProtocolStore', () => {
}); });
await store.removeUnprocessed(id); await store.removeUnprocessed(id);
const items = await store.getAllUnprocessed(); const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0);
});
it('getAllUnprocessedAndIncrementAttempts deletes items', async () => {
await store.addUnprocessed({
id: '1-one',
envelope: 'first',
timestamp: NOW + 1,
receivedAtCounter: 0,
version: 2,
attempts: 3,
});
const items = await store.getAllUnprocessedAndIncrementAttempts();
assert.strictEqual(items.length, 0); assert.strictEqual(items.length, 0);
}); });
}); });

View file

@ -802,17 +802,11 @@ export default class MessageReceiver
return []; return [];
} }
const items = await this.storage.protocol.getAllUnprocessed(); const items =
await this.storage.protocol.getAllUnprocessedAndIncrementAttempts();
log.info('getAllFromCache loaded', items.length, 'saved envelopes'); log.info('getAllFromCache loaded', items.length, 'saved envelopes');
return items.map(item => { return items;
const { attempts = 0 } = item;
return {
...item,
attempts: attempts + 1,
};
});
} }
private async decryptAndCacheBatch( private async decryptAndCacheBatch(