Ensure SQL worker messages are not dropped

This commit is contained in:
trevor-signal 2024-08-19 14:50:48 -04:00 committed by GitHub
parent 563451b5ef
commit 2c92591b59
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -55,7 +55,6 @@ import type { BadgeType, BadgeImageType } from '../badges/types';
import { parseBadgeCategory } from '../badges/BadgeCategory'; import { parseBadgeCategory } from '../badges/BadgeCategory';
import { parseBadgeImageTheme } from '../badges/BadgeImageTheme'; import { parseBadgeImageTheme } from '../badges/BadgeImageTheme';
import type { LoggerType } from '../types/Logging'; import type { LoggerType } from '../types/Logging';
import * as log from '../logging/log';
import type { import type {
EmptyQuery, EmptyQuery,
ArrayQuery, ArrayQuery,
@ -2132,7 +2131,7 @@ export function getAllSyncTasks(db: WritableDB): Array<SyncTaskType> {
}); });
if (toDelete.length > 0) { if (toDelete.length > 0) {
log.warn(`getAllSyncTasks: Removing ${toDelete.length} expired tasks`); logger.warn(`getAllSyncTasks: Removing ${toDelete.length} expired tasks`);
toDelete.forEach(task => { toDelete.forEach(task => {
removeSyncTaskById(db, task.id); removeSyncTaskById(db, task.id);
}); });
@ -2204,7 +2203,7 @@ export function saveMessage(
} }
if (readStatus === ReadStatus.Unread && seenStatus !== SeenStatus.Unseen) { if (readStatus === ReadStatus.Unread && seenStatus !== SeenStatus.Unseen) {
log.warn( logger.warn(
`saveMessage: Message ${id}/${type} is unread but had seenStatus=${seenStatus}. Forcing to UnseenStatus.Unseen.` `saveMessage: Message ${id}/${type} is unread but had seenStatus=${seenStatus}. Forcing to UnseenStatus.Unseen.`
); );
@ -2485,7 +2484,7 @@ function getMessageBySender(
}); });
if (rows.length > 1) { if (rows.length > 1) {
log.warn('getMessageBySender: More than one message found for', { logger.warn('getMessageBySender: More than one message found for', {
sent_at, sent_at,
source, source,
sourceServiceId, sourceServiceId,
@ -3671,7 +3670,7 @@ function getMessageTimestampForCallLogEventTarget(
const row = db.prepare(selectQuery).get(selectParams); const row = db.prepare(selectQuery).get(selectParams);
if (row == null) { if (row == null) {
log.warn('getTimestampForCallLogEventTarget: Target call not found'); logger.warn('getTimestampForCallLogEventTarget: Target call not found');
return timestamp; return timestamp;
} }
callId = row.callId as string; callId = row.callId as string;
@ -3690,7 +3689,7 @@ function getMessageTimestampForCallLogEventTarget(
const messageTimestamp = db.prepare(selectQuery).pluck().get(selectParams); const messageTimestamp = db.prepare(selectQuery).pluck().get(selectParams);
if (messageTimestamp == null) { if (messageTimestamp == null) {
log.warn( logger.warn(
'getTimestampForCallLogEventTarget: Target call message not found' 'getTimestampForCallLogEventTarget: Target call message not found'
); );
} }
@ -4510,7 +4509,7 @@ function getUnprocessedCount(db: ReadableDB): number {
} }
function getAllUnprocessedIds(db: WritableDB): Array<string> { function getAllUnprocessedIds(db: WritableDB): Array<string> {
log.info('getAllUnprocessedIds'); logger.info('getAllUnprocessedIds');
return db.transaction(() => { return db.transaction(() => {
// cleanup first // cleanup first
const { changes: deletedStaleCount } = db const { changes: deletedStaleCount } = db
@ -4559,7 +4558,9 @@ function getUnprocessedByIdsAndIncrementAttempts(
db: WritableDB, db: WritableDB,
ids: ReadonlyArray<string> ids: ReadonlyArray<string>
): Array<UnprocessedType> { ): Array<UnprocessedType> {
log.info('getUnprocessedByIdsAndIncrementAttempts', { totalIds: ids.length }); logger.info('getUnprocessedByIdsAndIncrementAttempts', {
totalIds: ids.length,
});
batchMultiVarQuery(db, ids, batch => { batchMultiVarQuery(db, ids, batch => {
return db return db
@ -4593,7 +4594,7 @@ function getUnprocessedByIdsAndIncrementAttempts(
} }
function removeUnprocesseds(db: WritableDB, ids: ReadonlyArray<string>): void { function removeUnprocesseds(db: WritableDB, ids: ReadonlyArray<string>): void {
log.info('removeUnprocesseds', { totalIds: ids.length }); logger.info('removeUnprocesseds', { totalIds: ids.length });
db.prepare<ArrayQuery>( db.prepare<ArrayQuery>(
` `
DELETE FROM unprocessed DELETE FROM unprocessed
@ -4603,7 +4604,7 @@ function removeUnprocesseds(db: WritableDB, ids: ReadonlyArray<string>): void {
} }
function removeUnprocessed(db: WritableDB, id: string | Array<string>): void { function removeUnprocessed(db: WritableDB, id: string | Array<string>): void {
log.info('removeUnprocessedSync', { id }); logger.info('removeUnprocessedSync', { id });
if (!Array.isArray(id)) { if (!Array.isArray(id)) {
prepare(db, 'DELETE FROM unprocessed WHERE id = $id;').run({ id }); prepare(db, 'DELETE FROM unprocessed WHERE id = $id;').run({ id });