Drop old unprocessed envelopes
This commit is contained in:
parent
465b387a13
commit
71ca3c0764
3 changed files with 31 additions and 7 deletions
|
@ -3028,6 +3028,19 @@ async function getUnprocessedCount(): Promise<number> {
|
||||||
|
|
||||||
async function getAllUnprocessed(): Promise<Array<UnprocessedType>> {
|
async function getAllUnprocessed(): Promise<Array<UnprocessedType>> {
|
||||||
const db = getInstance();
|
const db = getInstance();
|
||||||
|
|
||||||
|
const { changes: deletedCount } = db
|
||||||
|
.prepare<Query>('DELETE FROM unprocessed WHERE timestamp < $monthAgo')
|
||||||
|
.run({
|
||||||
|
monthAgo: Date.now() - durations.MONTH,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (deletedCount !== 0) {
|
||||||
|
logger.warn(
|
||||||
|
`getAllUnprocessed: deleting ${deletedCount} old unprocessed envelopes`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
const rows = db
|
const rows = db
|
||||||
.prepare<EmptyQuery>(
|
.prepare<EmptyQuery>(
|
||||||
`
|
`
|
||||||
|
|
|
@ -13,6 +13,7 @@ import {
|
||||||
|
|
||||||
import { signal } from '../protobuf/compiled';
|
import { signal } from '../protobuf/compiled';
|
||||||
import { sessionStructureToBytes } from '../util/sessionTranslation';
|
import { sessionStructureToBytes } from '../util/sessionTranslation';
|
||||||
|
import * as durations from '../util/durations';
|
||||||
import { Zone } from '../util/Zone';
|
import { Zone } from '../util/Zone';
|
||||||
|
|
||||||
import * as Bytes from '../Bytes';
|
import * as Bytes from '../Bytes';
|
||||||
|
@ -1477,7 +1478,7 @@ describe('SignalProtocolStore', () => {
|
||||||
{
|
{
|
||||||
id: '2-two',
|
id: '2-two',
|
||||||
envelope: 'second',
|
envelope: 'second',
|
||||||
timestamp: 2,
|
timestamp: Date.now() + 2,
|
||||||
version: 2,
|
version: 2,
|
||||||
attempts: 0,
|
attempts: 0,
|
||||||
},
|
},
|
||||||
|
@ -1617,6 +1618,8 @@ describe('SignalProtocolStore', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('Not yet processed messages', () => {
|
describe('Not yet processed messages', () => {
|
||||||
|
const NOW = Date.now();
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
await store.removeAllUnprocessed();
|
await store.removeAllUnprocessed();
|
||||||
const items = await store.getAllUnprocessed();
|
const items = await store.getAllUnprocessed();
|
||||||
|
@ -1625,24 +1628,31 @@ describe('SignalProtocolStore', () => {
|
||||||
|
|
||||||
it('adds three and gets them back', async () => {
|
it('adds three and gets them back', async () => {
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
|
store.addUnprocessed({
|
||||||
|
id: '0-dropped',
|
||||||
|
envelope: 'old envelope',
|
||||||
|
timestamp: NOW - 2 * durations.MONTH,
|
||||||
|
version: 2,
|
||||||
|
attempts: 0,
|
||||||
|
}),
|
||||||
store.addUnprocessed({
|
store.addUnprocessed({
|
||||||
id: '2-two',
|
id: '2-two',
|
||||||
envelope: 'second',
|
envelope: 'second',
|
||||||
timestamp: 2,
|
timestamp: NOW + 2,
|
||||||
version: 2,
|
version: 2,
|
||||||
attempts: 0,
|
attempts: 0,
|
||||||
}),
|
}),
|
||||||
store.addUnprocessed({
|
store.addUnprocessed({
|
||||||
id: '3-three',
|
id: '3-three',
|
||||||
envelope: 'third',
|
envelope: 'third',
|
||||||
timestamp: 3,
|
timestamp: NOW + 3,
|
||||||
version: 2,
|
version: 2,
|
||||||
attempts: 0,
|
attempts: 0,
|
||||||
}),
|
}),
|
||||||
store.addUnprocessed({
|
store.addUnprocessed({
|
||||||
id: '1-one',
|
id: '1-one',
|
||||||
envelope: 'first',
|
envelope: 'first',
|
||||||
timestamp: 1,
|
timestamp: NOW + 1,
|
||||||
version: 2,
|
version: 2,
|
||||||
attempts: 0,
|
attempts: 0,
|
||||||
}),
|
}),
|
||||||
|
@ -1662,7 +1672,7 @@ describe('SignalProtocolStore', () => {
|
||||||
await store.addUnprocessed({
|
await store.addUnprocessed({
|
||||||
id,
|
id,
|
||||||
envelope: 'first',
|
envelope: 'first',
|
||||||
timestamp: 1,
|
timestamp: NOW + 1,
|
||||||
version: 2,
|
version: 2,
|
||||||
attempts: 0,
|
attempts: 0,
|
||||||
});
|
});
|
||||||
|
@ -1671,7 +1681,7 @@ describe('SignalProtocolStore', () => {
|
||||||
const items = await store.getAllUnprocessed();
|
const items = await store.getAllUnprocessed();
|
||||||
assert.strictEqual(items.length, 1);
|
assert.strictEqual(items.length, 1);
|
||||||
assert.strictEqual(items[0].decrypted, 'updated');
|
assert.strictEqual(items[0].decrypted, 'updated');
|
||||||
assert.strictEqual(items[0].timestamp, 1);
|
assert.strictEqual(items[0].timestamp, NOW + 1);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('removeUnprocessed successfully deletes item', async () => {
|
it('removeUnprocessed successfully deletes item', async () => {
|
||||||
|
@ -1679,7 +1689,7 @@ describe('SignalProtocolStore', () => {
|
||||||
await store.addUnprocessed({
|
await store.addUnprocessed({
|
||||||
id,
|
id,
|
||||||
envelope: 'first',
|
envelope: 'first',
|
||||||
timestamp: 1,
|
timestamp: NOW + 1,
|
||||||
version: 2,
|
version: 2,
|
||||||
attempts: 0,
|
attempts: 0,
|
||||||
});
|
});
|
||||||
|
|
|
@ -6,3 +6,4 @@ export const MINUTE = SECOND * 60;
|
||||||
export const HOUR = MINUTE * 60;
|
export const HOUR = MINUTE * 60;
|
||||||
export const DAY = HOUR * 24;
|
export const DAY = HOUR * 24;
|
||||||
export const WEEK = DAY * 7;
|
export const WEEK = DAY * 7;
|
||||||
|
export const MONTH = DAY * 30;
|
||||||
|
|
Loading…
Reference in a new issue