Defer unprocessed item attempt update

This commit is contained in:
Fedor Indutny 2021-09-17 16:11:24 -07:00 committed by GitHub
parent 7b5faa1cc1
commit b83c00f43f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 21 additions and 55 deletions

View file

@ -1948,12 +1948,6 @@ export class SignalProtocolStore extends EventsMixin {
}); });
} }
updateUnprocessedAttempts(id: string, attempts: number): Promise<void> {
return this.withZone(GLOBAL_ZONE, 'updateUnprocessedAttempts', async () => {
await window.Signal.Data.updateUnprocessedAttempts(id, attempts);
});
}
updateUnprocessedWithData( updateUnprocessedWithData(
id: string, id: string,
data: UnprocessedUpdateType data: UnprocessedUpdateType

View file

@ -233,7 +233,6 @@ const dataInterface: ClientInterface = {
getUnprocessedCount, getUnprocessedCount,
getAllUnprocessed, getAllUnprocessed,
getUnprocessedById, getUnprocessedById,
updateUnprocessedAttempts,
updateUnprocessedWithData, updateUnprocessedWithData,
updateUnprocessedsWithData, updateUnprocessedsWithData,
removeUnprocessed, removeUnprocessed,
@ -1422,9 +1421,6 @@ async function getUnprocessedById(id: string) {
return channels.getUnprocessedById(id); return channels.getUnprocessedById(id);
} }
async function updateUnprocessedAttempts(id: string, attempts: number) {
await channels.updateUnprocessedAttempts(id, attempts);
}
async function updateUnprocessedWithData( async function updateUnprocessedWithData(
id: string, id: string,
data: UnprocessedUpdateType data: UnprocessedUpdateType

View file

@ -392,7 +392,6 @@ export type DataInterface = {
getUnprocessedCount: () => Promise<number>; getUnprocessedCount: () => Promise<number>;
getAllUnprocessed: () => Promise<Array<UnprocessedType>>; getAllUnprocessed: () => Promise<Array<UnprocessedType>>;
updateUnprocessedAttempts: (id: string, attempts: number) => Promise<void>;
updateUnprocessedWithData: ( updateUnprocessedWithData: (
id: string, id: string,
data: UnprocessedUpdateType data: UnprocessedUpdateType

View file

@ -223,7 +223,6 @@ const dataInterface: ServerInterface = {
getUnprocessedCount, getUnprocessedCount,
getAllUnprocessed, getAllUnprocessed,
updateUnprocessedAttempts,
updateUnprocessedWithData, updateUnprocessedWithData,
updateUnprocessedsWithData, updateUnprocessedsWithData,
getUnprocessedById, getUnprocessedById,
@ -5118,6 +5117,8 @@ async function getTapToViewMessagesNeedingErase(): Promise<Array<MessageType>> {
return rows.map(row => jsonToObject(row.json)); return rows.map(row => jsonToObject(row.json));
} }
const MAX_UNPROCESSED_ATTEMPTS = 3;
function saveUnprocessedSync(data: UnprocessedType): string { function saveUnprocessedSync(data: UnprocessedType): string {
const db = getInstance(); const db = getInstance();
const { const {
@ -5137,6 +5138,11 @@ function saveUnprocessedSync(data: UnprocessedType): string {
throw new Error('saveUnprocessedSync: id was falsey'); throw new Error('saveUnprocessedSync: id was falsey');
} }
if (attempts >= MAX_UNPROCESSED_ATTEMPTS) {
removeUnprocessedSync(id);
return id;
}
prepare( prepare(
db, db,
` `
@ -5183,23 +5189,6 @@ function saveUnprocessedSync(data: UnprocessedType): string {
return id; return id;
} }
async function updateUnprocessedAttempts(
id: string,
attempts: number
): Promise<void> {
const db = getInstance();
db.prepare<Query>(
`
UPDATE unprocessed
SET attempts = $attempts
WHERE id = $id;
`
).run({
id,
attempts,
});
}
function updateUnprocessedWithDataSync( function updateUnprocessedWithDataSync(
id: string, id: string,
data: UnprocessedUpdateType data: UnprocessedUpdateType
@ -5299,7 +5288,7 @@ function removeUnprocessedsSync(ids: Array<string>): void {
).run(ids); ).run(ids);
} }
async function removeUnprocessed(id: string | Array<string>): Promise<void> { function removeUnprocessedSync(id: string | Array<string>): void {
if (!Array.isArray(id)) { if (!Array.isArray(id)) {
const db = getInstance(); const db = getInstance();
@ -5309,10 +5298,14 @@ async function removeUnprocessed(id: string | Array<string>): Promise<void> {
} }
if (!id.length) { if (!id.length) {
throw new Error('removeUnprocessed: No ids to delete!'); throw new Error('removeUnprocessedSync: No ids to delete!');
} }
batchMultiVarQuery(id, removeUnprocessedsSync); assertSync(batchMultiVarQuery(id, removeUnprocessedsSync));
}
async function removeUnprocessed(id: string | Array<string>): Promise<void> {
removeUnprocessedSync(id);
} }
async function removeAllUnprocessed(): Promise<void> { async function removeAllUnprocessed(): Promise<void> {

View file

@ -706,30 +706,14 @@ export default class MessageReceiver
const items = await this.storage.protocol.getAllUnprocessed(); const items = await this.storage.protocol.getAllUnprocessed();
log.info('getAllFromCache loaded', items.length, 'saved envelopes'); log.info('getAllFromCache loaded', items.length, 'saved envelopes');
return Promise.all( return items.map(item => {
map(items, async item => { const { attempts = 0 } = item;
const attempts = 1 + (item.attempts || 0);
try { return {
if (attempts >= 3) { ...item,
log.warn('getAllFromCache final attempt for envelope', item.id); attempts: attempts + 1,
await this.storage.protocol.removeUnprocessed(item.id); };
} else { });
await this.storage.protocol.updateUnprocessedAttempts(
item.id,
attempts
);
}
} catch (error) {
log.error(
'getAllFromCache error updating item after load:',
Errors.toLogFormat(error)
);
}
return item;
})
);
} }
private async decryptAndCacheBatch( private async decryptAndCacheBatch(