Add preliminary message backup harness
This commit is contained in:
parent
231bf91a22
commit
d85a1d5074
38 changed files with 2997 additions and 121 deletions
107
ts/sql/Server.ts
107
ts/sql/Server.ts
|
@ -10,6 +10,7 @@ import { randomBytes } from 'crypto';
|
|||
import type { Database, Statement } from '@signalapp/better-sqlite3';
|
||||
import SQL from '@signalapp/better-sqlite3';
|
||||
import pProps from 'p-props';
|
||||
import pTimeout from 'p-timeout';
|
||||
import { v4 as generateUuid } from 'uuid';
|
||||
import { z } from 'zod';
|
||||
|
||||
|
@ -48,6 +49,7 @@ import { isNormalNumber } from '../util/isNormalNumber';
|
|||
import { isNotNil } from '../util/isNotNil';
|
||||
import { parseIntOrThrow } from '../util/parseIntOrThrow';
|
||||
import * as durations from '../util/durations';
|
||||
import { explodePromise } from '../util/explodePromise';
|
||||
import { formatCountForLogging } from '../logging/formatCountForLogging';
|
||||
import type { ConversationColorType, CustomColorType } from '../types/Colors';
|
||||
import type { BadgeType, BadgeImageType } from '../badges/types';
|
||||
|
@ -106,9 +108,12 @@ import type {
|
|||
StoredItemType,
|
||||
ConversationMessageStatsType,
|
||||
MessageAttachmentsCursorType,
|
||||
MessageCursorType,
|
||||
MessageMetricsType,
|
||||
MessageType,
|
||||
MessageTypeUnhydrated,
|
||||
PageMessagesCursorType,
|
||||
PageMessagesResultType,
|
||||
PreKeyIdType,
|
||||
ReactionResultType,
|
||||
StoredPreKeyType,
|
||||
|
@ -184,6 +189,8 @@ type StickerRow = Readonly<{
|
|||
// https://github.com/microsoft/TypeScript/issues/420
|
||||
const dataInterface: ServerInterface = {
|
||||
close,
|
||||
pauseWriteAccess,
|
||||
resumeWriteAccess,
|
||||
removeDB,
|
||||
removeIndexedDBFiles,
|
||||
|
||||
|
@ -417,6 +424,8 @@ const dataInterface: ServerInterface = {
|
|||
|
||||
getKnownMessageAttachments,
|
||||
finishGetKnownMessageAttachments,
|
||||
pageMessages,
|
||||
finishPageMessages,
|
||||
getKnownConversationAttachments,
|
||||
removeKnownStickers,
|
||||
removeKnownDraftAttachments,
|
||||
|
@ -571,6 +580,8 @@ function openAndSetUpSQLCipher(
|
|||
return db;
|
||||
}
|
||||
|
||||
let pausedWriteQueue: Array<() => void> | undefined;
|
||||
|
||||
let globalWritableInstance: Database | undefined;
|
||||
let globalReadonlyInstance: Database | undefined;
|
||||
let logger = consoleLogger;
|
||||
|
@ -653,6 +664,33 @@ async function close(): Promise<void> {
|
|||
globalWritableInstance = undefined;
|
||||
}
|
||||
|
||||
async function pauseWriteAccess(): Promise<void> {
|
||||
strictAssert(
|
||||
pausedWriteQueue === undefined,
|
||||
'Database writes are already paused'
|
||||
);
|
||||
pausedWriteQueue = [];
|
||||
|
||||
logger.warn('pauseWriteAccess: pausing write access');
|
||||
}
|
||||
|
||||
async function resumeWriteAccess(): Promise<void> {
|
||||
strictAssert(
|
||||
pausedWriteQueue !== undefined,
|
||||
'Database writes are not paused'
|
||||
);
|
||||
const queue = pausedWriteQueue;
|
||||
pausedWriteQueue = undefined;
|
||||
|
||||
logger.warn(
|
||||
`resumeWriteAccess: resuming write access, queue.length=${queue.length}`
|
||||
);
|
||||
|
||||
for (const resumeOperation of queue) {
|
||||
resumeOperation();
|
||||
}
|
||||
}
|
||||
|
||||
async function removeDB(): Promise<void> {
|
||||
if (globalReadonlyInstance) {
|
||||
try {
|
||||
|
@ -702,7 +740,15 @@ function getReadonlyInstance(): Database {
|
|||
return globalReadonlyInstance;
|
||||
}
|
||||
|
||||
const WRITABLE_INSTANCE_MAX_WAIT = 5 * durations.MINUTE;
|
||||
|
||||
async function getWritableInstance(): Promise<Database> {
|
||||
if (pausedWriteQueue) {
|
||||
const { promise, resolve } = explodePromise<void>();
|
||||
pausedWriteQueue.push(resolve);
|
||||
await pTimeout(promise, WRITABLE_INSTANCE_MAX_WAIT);
|
||||
}
|
||||
|
||||
if (!globalWritableInstance) {
|
||||
throw new Error('getWritableInstance: globalWritableInstance not set!');
|
||||
}
|
||||
|
@ -6086,17 +6132,42 @@ function getExternalDraftFilesForConversation(
|
|||
async function getKnownMessageAttachments(
|
||||
cursor?: MessageAttachmentsCursorType
|
||||
): Promise<GetKnownMessageAttachmentsResultType> {
|
||||
const db = await getWritableInstance();
|
||||
const innerCursor = cursor as MessageCursorType | undefined as
|
||||
| PageMessagesCursorType
|
||||
| undefined;
|
||||
const result = new Set<string>();
|
||||
|
||||
const { messages, cursor: newCursor } = await pageMessages(innerCursor);
|
||||
|
||||
for (const message of messages) {
|
||||
const externalFiles = getExternalFilesForMessage(message);
|
||||
forEach(externalFiles, file => result.add(file));
|
||||
}
|
||||
|
||||
return {
|
||||
attachments: Array.from(result),
|
||||
cursor: newCursor as MessageCursorType as MessageAttachmentsCursorType,
|
||||
};
|
||||
}
|
||||
|
||||
async function finishGetKnownMessageAttachments(
|
||||
cursor: MessageAttachmentsCursorType
|
||||
): Promise<void> {
|
||||
const innerCursor = cursor as MessageCursorType as PageMessagesCursorType;
|
||||
|
||||
await finishPageMessages(innerCursor);
|
||||
}
|
||||
|
||||
async function pageMessages(
|
||||
cursor?: PageMessagesCursorType
|
||||
): Promise<PageMessagesResultType> {
|
||||
const db = getUnsafeWritableInstance('only temp table use');
|
||||
const chunkSize = 1000;
|
||||
|
||||
return db.transaction(() => {
|
||||
let count = cursor?.count ?? 0;
|
||||
|
||||
strictAssert(
|
||||
!cursor?.done,
|
||||
'getKnownMessageAttachments: iteration cannot be restarted'
|
||||
);
|
||||
strictAssert(!cursor?.done, 'pageMessages: iteration cannot be restarted');
|
||||
|
||||
let runId: string;
|
||||
if (cursor === undefined) {
|
||||
|
@ -6104,7 +6175,7 @@ async function getKnownMessageAttachments(
|
|||
|
||||
const total = getMessageCountSync();
|
||||
logger.info(
|
||||
`getKnownMessageAttachments(${runId}): ` +
|
||||
`pageMessages(${runId}): ` +
|
||||
`Starting iteration through ${total} messages`
|
||||
);
|
||||
|
||||
|
@ -6114,7 +6185,7 @@ async function getKnownMessageAttachments(
|
|||
(rowid INTEGER PRIMARY KEY ASC);
|
||||
|
||||
INSERT INTO tmp_${runId}_updated_messages (rowid)
|
||||
SELECT rowid FROM messages;
|
||||
SELECT rowid FROM messages ORDER BY rowid ASC;
|
||||
|
||||
CREATE TEMP TRIGGER tmp_${runId}_message_updates
|
||||
UPDATE OF json ON messages
|
||||
|
@ -6140,6 +6211,7 @@ async function getKnownMessageAttachments(
|
|||
`
|
||||
DELETE FROM tmp_${runId}_updated_messages
|
||||
RETURNING rowid
|
||||
ORDER BY rowid ASC
|
||||
LIMIT $chunkSize;
|
||||
`
|
||||
)
|
||||
|
@ -6160,28 +6232,25 @@ async function getKnownMessageAttachments(
|
|||
}
|
||||
);
|
||||
|
||||
for (const message of messages) {
|
||||
const externalFiles = getExternalFilesForMessage(message);
|
||||
forEach(externalFiles, file => result.add(file));
|
||||
count += 1;
|
||||
}
|
||||
|
||||
count += messages.length;
|
||||
const done = rowids.length < chunkSize;
|
||||
const newCursor: MessageCursorType = { runId, count, done };
|
||||
|
||||
return {
|
||||
attachments: Array.from(result),
|
||||
cursor: { runId, count, done },
|
||||
messages,
|
||||
cursor: newCursor as PageMessagesCursorType,
|
||||
};
|
||||
})();
|
||||
}
|
||||
|
||||
async function finishGetKnownMessageAttachments({
|
||||
async function finishPageMessages({
|
||||
runId,
|
||||
count,
|
||||
done,
|
||||
}: MessageAttachmentsCursorType): Promise<void> {
|
||||
const db = await getWritableInstance();
|
||||
}: PageMessagesCursorType): Promise<void> {
|
||||
const db = getUnsafeWritableInstance('only temp table use');
|
||||
|
||||
const logId = `finishGetKnownMessageAttachments(${runId})`;
|
||||
const logId = `finishPageMessages(${runId})`;
|
||||
if (!done) {
|
||||
logger.warn(`${logId}: iteration not finished`);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue