Store receivedAtCounter separately for unprocessed
This commit is contained in:
parent
0f5a01f2b2
commit
ca3f8b7df0
8 changed files with 50 additions and 4 deletions
|
@ -205,7 +205,8 @@ export abstract class JobQueue<T> {
|
|||
parsedData = this.parseData(storedJob.data);
|
||||
} catch (err) {
|
||||
log.error(
|
||||
`${this.logPrefix} failed to parse data for job ${storedJob.id}`
|
||||
`${this.logPrefix} failed to parse data for job ${storedJob.id}`,
|
||||
Errors.toLogFormat(err)
|
||||
);
|
||||
reject(
|
||||
new Error(
|
||||
|
|
|
@ -196,6 +196,7 @@ export type StickerPackType = Readonly<{
|
|||
export type UnprocessedType = {
|
||||
id: string;
|
||||
timestamp: number;
|
||||
receivedAtCounter: number | null;
|
||||
version: number;
|
||||
attempts: number;
|
||||
envelope?: string;
|
||||
|
|
|
@ -2969,6 +2969,7 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
const {
|
||||
id,
|
||||
timestamp,
|
||||
receivedAtCounter,
|
||||
version,
|
||||
attempts,
|
||||
envelope,
|
||||
|
@ -2994,6 +2995,7 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
INSERT OR REPLACE INTO unprocessed (
|
||||
id,
|
||||
timestamp,
|
||||
receivedAtCounter,
|
||||
version,
|
||||
attempts,
|
||||
envelope,
|
||||
|
@ -3006,6 +3008,7 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
) values (
|
||||
$id,
|
||||
$timestamp,
|
||||
$receivedAtCounter,
|
||||
$version,
|
||||
$attempts,
|
||||
$envelope,
|
||||
|
@ -3020,6 +3023,7 @@ function saveUnprocessedSync(data: UnprocessedType): string {
|
|||
).run({
|
||||
id,
|
||||
timestamp,
|
||||
receivedAtCounter: receivedAtCounter ?? null,
|
||||
version,
|
||||
attempts,
|
||||
envelope: envelope || null,
|
||||
|
|
27
ts/sql/migrations/54-unprocessed-received-at-counter.ts
Normal file
27
ts/sql/migrations/54-unprocessed-received-at-counter.ts
Normal file
|
@ -0,0 +1,27 @@
|
|||
// Copyright 2022 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { Database } from 'better-sqlite3';
|
||||
|
||||
import type { LoggerType } from '../../types/Logging';
|
||||
|
||||
export default function updateToSchemaVersion54(
|
||||
currentVersion: number,
|
||||
db: Database,
|
||||
logger: LoggerType
|
||||
): void {
|
||||
if (currentVersion >= 54) {
|
||||
return;
|
||||
}
|
||||
|
||||
db.transaction(() => {
|
||||
db.exec(
|
||||
`
|
||||
ALTER TABLE unprocessed ADD COLUMN receivedAtCounter INTEGER;
|
||||
`
|
||||
);
|
||||
|
||||
db.pragma('user_version = 54');
|
||||
})();
|
||||
logger.info('updateToSchemaVersion54: success!');
|
||||
}
|
|
@ -29,6 +29,7 @@ import updateToSchemaVersion50 from './50-fix-messages-unread-index';
|
|||
import updateToSchemaVersion51 from './51-centralize-conversation-jobs';
|
||||
import updateToSchemaVersion52 from './52-optimize-stories';
|
||||
import updateToSchemaVersion53 from './53-gv2-banned-members';
|
||||
import updateToSchemaVersion54 from './54-unprocessed-received-at-counter';
|
||||
|
||||
function updateToSchemaVersion1(
|
||||
currentVersion: number,
|
||||
|
@ -1921,6 +1922,7 @@ export const SCHEMA_VERSIONS = [
|
|||
updateToSchemaVersion51,
|
||||
updateToSchemaVersion52,
|
||||
updateToSchemaVersion53,
|
||||
updateToSchemaVersion54,
|
||||
];
|
||||
|
||||
export function updateSchema(db: Database, logger: LoggerType): void {
|
||||
|
|
|
@ -1482,6 +1482,7 @@ describe('SignalProtocolStore', () => {
|
|||
id: '2-two',
|
||||
envelope: 'second',
|
||||
timestamp: Date.now() + 2,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
},
|
||||
|
@ -1536,6 +1537,7 @@ describe('SignalProtocolStore', () => {
|
|||
id: '2-two',
|
||||
envelope: 'second',
|
||||
timestamp: 2,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
},
|
||||
|
@ -1655,6 +1657,7 @@ describe('SignalProtocolStore', () => {
|
|||
id: '0-dropped',
|
||||
envelope: 'old envelope',
|
||||
timestamp: NOW - 2 * durations.MONTH,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
}),
|
||||
|
@ -1662,6 +1665,7 @@ describe('SignalProtocolStore', () => {
|
|||
id: '2-two',
|
||||
envelope: 'second',
|
||||
timestamp: NOW + 2,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
}),
|
||||
|
@ -1669,6 +1673,7 @@ describe('SignalProtocolStore', () => {
|
|||
id: '3-three',
|
||||
envelope: 'third',
|
||||
timestamp: NOW + 3,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
}),
|
||||
|
@ -1676,6 +1681,7 @@ describe('SignalProtocolStore', () => {
|
|||
id: '1-one',
|
||||
envelope: 'first',
|
||||
timestamp: NOW + 1,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
}),
|
||||
|
@ -1696,6 +1702,7 @@ describe('SignalProtocolStore', () => {
|
|||
id,
|
||||
envelope: 'first',
|
||||
timestamp: NOW + 1,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
});
|
||||
|
@ -1713,6 +1720,7 @@ describe('SignalProtocolStore', () => {
|
|||
id,
|
||||
envelope: 'first',
|
||||
timestamp: NOW + 1,
|
||||
receivedAtCounter: 0,
|
||||
version: 2,
|
||||
attempts: 0,
|
||||
});
|
||||
|
|
1
ts/textsecure.d.ts
vendored
1
ts/textsecure.d.ts
vendored
|
@ -23,6 +23,7 @@ export type UnprocessedType = {
|
|||
decrypted?: string;
|
||||
envelope?: string;
|
||||
id: string;
|
||||
receivedAtCounter: number | null;
|
||||
timestamp: number;
|
||||
serverGuid?: string;
|
||||
serverTimestamp?: number;
|
||||
|
|
|
@ -677,8 +677,9 @@ export default class MessageReceiver
|
|||
|
||||
const envelope: ProcessedEnvelope = {
|
||||
id: item.id,
|
||||
receivedAtCounter: item.timestamp,
|
||||
receivedAtDate: Date.now(),
|
||||
receivedAtCounter: item.receivedAtCounter ?? item.timestamp,
|
||||
receivedAtDate:
|
||||
item.receivedAtCounter === null ? Date.now() : item.timestamp,
|
||||
messageAgeSec: item.messageAgeSec || 0,
|
||||
|
||||
// Proto.Envelope fields
|
||||
|
@ -975,7 +976,8 @@ export default class MessageReceiver
|
|||
id,
|
||||
version: 2,
|
||||
envelope: Bytes.toBase64(plaintext),
|
||||
timestamp: envelope.receivedAtCounter,
|
||||
receivedAtCounter: envelope.receivedAtCounter,
|
||||
timestamp: envelope.timestamp,
|
||||
attempts: 1,
|
||||
messageAgeSec: envelope.messageAgeSec,
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue