Removed hard limit on unprocessed messages in cache

This commit is contained in:
Alvaro 2023-02-02 12:39:07 -07:00 committed by GitHub
parent 1381e8df5d
commit e51f582bfb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 44 deletions

View file

@ -2012,12 +2012,24 @@ export class SignalProtocolStore extends EventEmitter {
}); });
} }
getAllUnprocessedAndIncrementAttempts(): Promise<Array<UnprocessedType>> { getAllUnprocessedIds(): Promise<Array<string>> {
return this.withZone(GLOBAL_ZONE, 'getAllUnprocessed', async () => { return this.withZone(GLOBAL_ZONE, 'getAllUnprocessedIds', () => {
return window.Signal.Data.getAllUnprocessedAndIncrementAttempts(); return window.Signal.Data.getAllUnprocessedIds();
}); });
} }
getUnprocessedByIdsAndIncrementAttempts(
ids: ReadonlyArray<string>
): Promise<Array<UnprocessedType>> {
return this.withZone(
GLOBAL_ZONE,
'getAllUnprocessedByIdsAndIncrementAttempts',
async () => {
return window.Signal.Data.getUnprocessedByIdsAndIncrementAttempts(ids);
}
);
}
getUnprocessedById(id: string): Promise<UnprocessedType | undefined> { getUnprocessedById(id: string): Promise<UnprocessedType | undefined> {
return this.withZone(GLOBAL_ZONE, 'getUnprocessedById', async () => { return this.withZone(GLOBAL_ZONE, 'getUnprocessedById', async () => {
return window.Signal.Data.getUnprocessedById(id); return window.Signal.Data.getUnprocessedById(id);
@ -2080,7 +2092,9 @@ export class SignalProtocolStore extends EventEmitter {
}); });
} }
/** only for testing */
removeAllUnprocessed(): Promise<void> { removeAllUnprocessed(): Promise<void> {
log.info('removeAllUnprocessed');
return this.withZone(GLOBAL_ZONE, 'removeAllUnprocessed', async () => { return this.withZone(GLOBAL_ZONE, 'removeAllUnprocessed', async () => {
await window.Signal.Data.removeAllUnprocessed(); await window.Signal.Data.removeAllUnprocessed();
}); });

View file

@ -560,7 +560,10 @@ export type DataInterface = {
) => Promise<void>; ) => Promise<void>;
getUnprocessedCount: () => Promise<number>; getUnprocessedCount: () => Promise<number>;
getAllUnprocessedAndIncrementAttempts: () => Promise<Array<UnprocessedType>>; getUnprocessedByIdsAndIncrementAttempts: (
ids: ReadonlyArray<string>
) => Promise<Array<UnprocessedType>>;
getAllUnprocessedIds: () => Promise<Array<string>>;
updateUnprocessedWithData: ( updateUnprocessedWithData: (
id: string, id: string,
data: UnprocessedUpdateType data: UnprocessedUpdateType
@ -570,6 +573,8 @@ export type DataInterface = {
) => Promise<void>; ) => Promise<void>;
getUnprocessedById: (id: string) => Promise<UnprocessedType | undefined>; getUnprocessedById: (id: string) => Promise<UnprocessedType | undefined>;
removeUnprocessed: (id: string | Array<string>) => Promise<void>; removeUnprocessed: (id: string | Array<string>) => Promise<void>;
/** only for testing */
removeAllUnprocessed: () => Promise<void>; removeAllUnprocessed: () => Promise<void>;
getAttachmentDownloadJobById: ( getAttachmentDownloadJobById: (

View file

@ -261,7 +261,8 @@ const dataInterface: ServerInterface = {
migrateConversationMessages, migrateConversationMessages,
getUnprocessedCount, getUnprocessedCount,
getAllUnprocessedAndIncrementAttempts, getUnprocessedByIdsAndIncrementAttempts,
getAllUnprocessedIds,
updateUnprocessedWithData, updateUnprocessedWithData,
updateUnprocessedsWithData, updateUnprocessedsWithData,
getUnprocessedById, getUnprocessedById,
@ -3391,12 +3392,12 @@ async function getUnprocessedCount(): Promise<number> {
return getCountFromTable(getInstance(), 'unprocessed'); return getCountFromTable(getInstance(), 'unprocessed');
} }
async function getAllUnprocessedAndIncrementAttempts(): Promise< async function getAllUnprocessedIds(): Promise<Array<string>> {
Array<UnprocessedType> log.info('getAllUnprocessedIds');
> {
const db = getInstance(); const db = getInstance();
return db.transaction(() => { return db.transaction(() => {
// cleanup first
const { changes: deletedStaleCount } = db const { changes: deletedStaleCount } = db
.prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo') .prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo')
.run({ .run({
@ -3410,18 +3411,11 @@ async function getAllUnprocessedAndIncrementAttempts(): Promise<
); );
} }
db.prepare<EmptyQuery>(
`
UPDATE unprocessed
SET attempts = attempts + 1
`
).run();
const { changes: deletedInvalidCount } = db const { changes: deletedInvalidCount } = db
.prepare<Query>( .prepare<Query>(
` `
DELETE FROM unprocessed DELETE FROM unprocessed
WHERE attempts > $MAX_UNPROCESSED_ATTEMPTS WHERE attempts >= $MAX_UNPROCESSED_ATTEMPTS
` `
) )
.run({ MAX_UNPROCESSED_ATTEMPTS }); .run({ MAX_UNPROCESSED_ATTEMPTS });
@ -3435,22 +3429,57 @@ async function getAllUnprocessedAndIncrementAttempts(): Promise<
return db return db
.prepare<EmptyQuery>( .prepare<EmptyQuery>(
`
SELECT id
FROM unprocessed
ORDER BY receivedAtCounter ASC
`
)
.pluck()
.all();
})();
}
async function getUnprocessedByIdsAndIncrementAttempts(
ids: ReadonlyArray<string>
): Promise<Array<UnprocessedType>> {
log.info('getUnprocessedByIdsAndIncrementAttempts', { totalIds: ids.length });
const db = getInstance();
batchMultiVarQuery(db, ids, batch => {
return db
.prepare<ArrayQuery>(
`
UPDATE unprocessed
SET attempts = attempts + 1
WHERE id IN (${batch.map(() => '?').join(', ')})
`
)
.run(batch);
});
return batchMultiVarQuery(db, ids, batch => {
return db
.prepare<ArrayQuery>(
` `
SELECT * SELECT *
FROM unprocessed FROM unprocessed
WHERE id IN (${batch.map(() => '?').join(', ')})
ORDER BY receivedAtCounter ASC; ORDER BY receivedAtCounter ASC;
` `
) )
.all() .all(batch)
.map(row => ({ .map(row => ({
...row, ...row,
urgent: isNumber(row.urgent) ? Boolean(row.urgent) : true, urgent: isNumber(row.urgent) ? Boolean(row.urgent) : true,
story: Boolean(row.story), story: Boolean(row.story),
})); }));
})(); });
} }
function removeUnprocessedsSync(ids: ReadonlyArray<string>): void { function removeUnprocessedsSync(ids: ReadonlyArray<string>): void {
log.info('removeUnprocessedsSync', { totalIds: ids.length });
const db = getInstance(); const db = getInstance();
db.prepare<ArrayQuery>( db.prepare<ArrayQuery>(
@ -3462,6 +3491,7 @@ function removeUnprocessedsSync(ids: ReadonlyArray<string>): void {
} }
function removeUnprocessedSync(id: string | Array<string>): void { function removeUnprocessedSync(id: string | Array<string>): void {
log.info('removeUnprocessedSync', { id });
const db = getInstance(); const db = getInstance();
if (!Array.isArray(id)) { if (!Array.isArray(id)) {

View file

@ -1273,7 +1273,10 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey); assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);
const allUnprocessed = const allUnprocessed =
await store.getAllUnprocessedAndIncrementAttempts(); await store.getUnprocessedByIdsAndIncrementAttempts(
await store.getAllUnprocessedIds()
);
assert.deepEqual( assert.deepEqual(
allUnprocessed.map(({ envelope }) => envelope), allUnprocessed.map(({ envelope }) => envelope),
['second'] ['second']
@ -1327,7 +1330,12 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.loadSession(id), testSession); assert.equal(await store.loadSession(id), testSession);
assert.equal(await store.getSenderKey(id, distributionId), testSenderKey); assert.equal(await store.getSenderKey(id, distributionId), testSenderKey);
assert.deepEqual(await store.getAllUnprocessedAndIncrementAttempts(), []); assert.deepEqual(
await store.getUnprocessedByIdsAndIncrementAttempts(
await store.getAllUnprocessedIds()
),
[]
);
}); });
it('can be re-entered', async () => { it('can be re-entered', async () => {
@ -1423,7 +1431,9 @@ describe('SignalProtocolStore', () => {
beforeEach(async () => { beforeEach(async () => {
await store.removeAllUnprocessed(); await store.removeAllUnprocessed();
const items = await store.getAllUnprocessedAndIncrementAttempts(); const items = await store.getUnprocessedByIdsAndIncrementAttempts(
await store.getAllUnprocessedIds()
);
assert.strictEqual(items.length, 0); assert.strictEqual(items.length, 0);
}); });
@ -1471,7 +1481,9 @@ describe('SignalProtocolStore', () => {
}), }),
]); ]);
const items = await store.getAllUnprocessedAndIncrementAttempts(); const items = await store.getUnprocessedByIdsAndIncrementAttempts(
await store.getAllUnprocessedIds()
);
assert.strictEqual(items.length, 3); assert.strictEqual(items.length, 3);
// they are in the proper order because the collection comparator is // they are in the proper order because the collection comparator is
@ -1495,7 +1507,9 @@ describe('SignalProtocolStore', () => {
}); });
await store.updateUnprocessedWithData(id, { decrypted: 'updated' }); await store.updateUnprocessedWithData(id, { decrypted: 'updated' });
const items = await store.getAllUnprocessedAndIncrementAttempts(); const items = await store.getUnprocessedByIdsAndIncrementAttempts(
await store.getAllUnprocessedIds()
);
assert.strictEqual(items.length, 1); assert.strictEqual(items.length, 1);
assert.strictEqual(items[0].decrypted, 'updated'); assert.strictEqual(items[0].decrypted, 'updated');
assert.strictEqual(items[0].timestamp, NOW + 1); assert.strictEqual(items[0].timestamp, NOW + 1);
@ -1517,7 +1531,9 @@ describe('SignalProtocolStore', () => {
}); });
await store.removeUnprocessed(id); await store.removeUnprocessed(id);
const items = await store.getAllUnprocessedAndIncrementAttempts(); const items = await store.getUnprocessedByIdsAndIncrementAttempts(
await store.getAllUnprocessedIds()
);
assert.strictEqual(items.length, 0); assert.strictEqual(items.length, 0);
}); });
@ -1533,7 +1549,9 @@ describe('SignalProtocolStore', () => {
urgent: true, urgent: true,
}); });
const items = await store.getAllUnprocessedAndIncrementAttempts(); const items = await store.getUnprocessedByIdsAndIncrementAttempts(
await store.getAllUnprocessedIds()
);
assert.strictEqual(items.length, 0); assert.strictEqual(items.length, 0);
}); });
}); });

View file

@ -121,6 +121,7 @@ import { TEXT_ATTACHMENT } from '../types/MIME';
import type { SendTypesType } from '../util/handleMessageSend'; import type { SendTypesType } from '../util/handleMessageSend';
import { getStoriesBlocked } from '../util/stories'; import { getStoriesBlocked } from '../util/stories';
import { isNotNil } from '../util/isNotNil'; import { isNotNil } from '../util/isNotNil';
import { chunk } from '../util/iterables';
const GROUPV1_ID_LENGTH = 16; const GROUPV1_ID_LENGTH = 16;
const GROUPV2_ID_LENGTH = 32; const GROUPV2_ID_LENGTH = 32;
@ -444,7 +445,9 @@ export default class MessageReceiver
createTaskWithTimeout( createTaskWithTimeout(
async () => this.queueAllCached(), async () => this.queueAllCached(),
'incomingQueue/queueAllCached', 'incomingQueue/queueAllCached',
TASK_WITH_TIMEOUT_OPTIONS {
timeout: 10 * durations.MINUTE,
}
) )
) )
); );
@ -789,12 +792,14 @@ export default class MessageReceiver
return; return;
} }
const items = await this.getAllFromCache(); for await (const batch of this.getAllFromCache()) {
const max = items.length; const max = batch.length;
for (let i = 0; i < max; i += 1) { for (let i = 0; i < max; i += 1) {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await this.queueCached(items[i]); await this.queueCached(batch[i]);
}
} }
log.info('MessageReceiver.queueAllCached - finished');
} }
private async queueCached(item: UnprocessedType): Promise<void> { private async queueCached(item: UnprocessedType): Promise<void> {
@ -928,23 +933,20 @@ export default class MessageReceiver
} }
} }
private async getAllFromCache(): Promise<Array<UnprocessedType>> { private async *getAllFromCache(): AsyncIterable<Array<UnprocessedType>> {
log.info('getAllFromCache'); log.info('getAllFromCache');
const count = await this.storage.protocol.getUnprocessedCount();
if (count > 1500) { const ids = await this.storage.protocol.getAllUnprocessedIds();
await this.storage.protocol.removeAllUnprocessed();
log.warn( log.info(`getAllFromCache - ${ids.length} unprocessed`);
`There were ${count} messages in cache. Deleted all instead of reprocessing`
for (const batch of chunk(ids, 1000)) {
log.info(`getAllFromCache - yielding batch of ${batch.length}`);
yield this.storage.protocol.getUnprocessedByIdsAndIncrementAttempts(
batch
); );
return [];
} }
log.info(`getAllFromCache - done retrieving ${ids.length} unprocessed`);
const items =
await this.storage.protocol.getAllUnprocessedAndIncrementAttempts();
log.info('getAllFromCache loaded', items.length, 'saved envelopes');
return items;
} }
private async decryptAndCacheBatch( private async decryptAndCacheBatch(

View file

@ -257,6 +257,23 @@ export function repeat<T>(value: T): Iterable<T> {
return new RepeatIterable(value); return new RepeatIterable(value);
} }
export function* chunk<A>(
iterable: Iterable<A>,
chunkSize: number
): Iterable<Array<A>> {
let aChunk: Array<A> = [];
for (const item of iterable) {
aChunk.push(item);
if (aChunk.length === chunkSize) {
yield aChunk;
aChunk = [];
}
}
if (aChunk.length > 0) {
yield aChunk;
}
}
class RepeatIterable<T> implements Iterable<T> { class RepeatIterable<T> implements Iterable<T> {
constructor(private readonly value: T) {} constructor(private readonly value: T) {}