Update sendStateByConversationId after merge
This commit is contained in:
parent
da9e657211
commit
99fd03078e
3 changed files with 240 additions and 11 deletions
|
@ -16,6 +16,7 @@ import { isNotNil } from '../util/isNotNil';
|
||||||
import { map } from '../util/iterables';
|
import { map } from '../util/iterables';
|
||||||
import { softAssert, strictAssert } from '../util/assert';
|
import { softAssert, strictAssert } from '../util/assert';
|
||||||
import { isStory } from '../messages/helpers';
|
import { isStory } from '../messages/helpers';
|
||||||
|
import type { SendStateByConversationId } from '../messages/MessageSendState';
|
||||||
import { getStoryDataFromMessageAttributes } from './storyLoader';
|
import { getStoryDataFromMessageAttributes } from './storyLoader';
|
||||||
|
|
||||||
const MAX_THROTTLED_REDUX_UPDATERS = 200;
|
const MAX_THROTTLED_REDUX_UPDATERS = 200;
|
||||||
|
@ -95,13 +96,42 @@ export class MessageCache {
|
||||||
conversationId: string;
|
conversationId: string;
|
||||||
obsoleteId: string;
|
obsoleteId: string;
|
||||||
}): void {
|
}): void {
|
||||||
|
const updateSendState = (
|
||||||
|
sendState?: SendStateByConversationId
|
||||||
|
): SendStateByConversationId | undefined => {
|
||||||
|
if (!sendState?.[obsoleteId]) {
|
||||||
|
return sendState;
|
||||||
|
}
|
||||||
|
const { [obsoleteId]: obsoleteSendState, ...rest } = sendState;
|
||||||
|
return {
|
||||||
|
[conversationId]: obsoleteSendState,
|
||||||
|
...rest,
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
for (const [messageId, messageAttributes] of this.state.messages) {
|
for (const [messageId, messageAttributes] of this.state.messages) {
|
||||||
if (messageAttributes.conversationId !== obsoleteId) {
|
if (messageAttributes.conversationId !== obsoleteId) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const editHistory = messageAttributes.editHistory?.map(history => {
|
||||||
|
return {
|
||||||
|
...history,
|
||||||
|
sendStateByConversationId: updateSendState(
|
||||||
|
history.sendStateByConversationId
|
||||||
|
),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
this.setAttributes({
|
this.setAttributes({
|
||||||
messageId,
|
messageId,
|
||||||
messageAttributes: { conversationId },
|
messageAttributes: {
|
||||||
|
conversationId,
|
||||||
|
sendStateByConversationId: updateSendState(
|
||||||
|
messageAttributes.sendStateByConversationId
|
||||||
|
),
|
||||||
|
editHistory,
|
||||||
|
},
|
||||||
skipSaveToDatabase: true,
|
skipSaveToDatabase: true,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
108
ts/sql/Server.ts
108
ts/sql/Server.ts
|
@ -26,6 +26,7 @@ import {
|
||||||
last,
|
last,
|
||||||
map,
|
map,
|
||||||
mapValues,
|
mapValues,
|
||||||
|
noop,
|
||||||
omit,
|
omit,
|
||||||
partition,
|
partition,
|
||||||
pick,
|
pick,
|
||||||
|
@ -664,6 +665,28 @@ async function initialize({
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function setupTests(db: Database): void {
|
||||||
|
if (globalWritableInstance || globalReadonlyInstance) {
|
||||||
|
throw new Error('Cannot initialize more than once!');
|
||||||
|
}
|
||||||
|
|
||||||
|
globalWritableInstance = db;
|
||||||
|
globalReadonlyInstance = db;
|
||||||
|
|
||||||
|
const silentLogger = {
|
||||||
|
...consoleLogger,
|
||||||
|
info: noop,
|
||||||
|
};
|
||||||
|
logger = silentLogger;
|
||||||
|
|
||||||
|
updateSchema(db, logger);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function teardownTests(): void {
|
||||||
|
globalWritableInstance = undefined;
|
||||||
|
globalReadonlyInstance = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
async function close(): Promise<void> {
|
async function close(): Promise<void> {
|
||||||
globalReadonlyInstance?.close();
|
globalReadonlyInstance?.close();
|
||||||
globalReadonlyInstance = undefined;
|
globalReadonlyInstance = undefined;
|
||||||
|
@ -3969,17 +3992,82 @@ async function migrateConversationMessages(
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const db = await getWritableInstance();
|
const db = await getWritableInstance();
|
||||||
|
|
||||||
db.prepare<Query>(
|
const PAGE_SIZE = 1000;
|
||||||
`
|
|
||||||
UPDATE messages SET
|
const getPage = db.prepare(`
|
||||||
|
SELECT
|
||||||
|
rowid,
|
||||||
|
json -> '$.sendStateByConversationId' AS sendStateJson,
|
||||||
|
json -> '$.editHistory' AS editHistoryJson
|
||||||
|
FROM messages
|
||||||
|
WHERE conversationId IS $obsoleteId
|
||||||
|
ORDER BY rowid
|
||||||
|
LIMIT $pageSize OFFSET $offset`);
|
||||||
|
|
||||||
|
const updateOne = db.prepare(`
|
||||||
|
UPDATE messages
|
||||||
|
SET
|
||||||
conversationId = $currentId,
|
conversationId = $currentId,
|
||||||
json = json_set(json, '$.conversationId', $currentId)
|
json = json_patch(json, $patch)
|
||||||
WHERE conversationId = $obsoleteId;
|
WHERE
|
||||||
`
|
rowid IS $rowid
|
||||||
).run({
|
`);
|
||||||
obsoleteId,
|
|
||||||
currentId,
|
db.transaction(() => {
|
||||||
});
|
// eslint-disable-next-line no-constant-condition
|
||||||
|
for (let offset = 0; true; offset += PAGE_SIZE) {
|
||||||
|
const parts: Array<{
|
||||||
|
rowid: number;
|
||||||
|
sendStateJson?: string;
|
||||||
|
editHistoryJson?: string;
|
||||||
|
}> = getPage.all({ obsoleteId, pageSize: PAGE_SIZE, offset });
|
||||||
|
|
||||||
|
for (const { rowid, sendStateJson, editHistoryJson } of parts) {
|
||||||
|
const editHistory = JSON.parse(editHistoryJson || '[]') as Array<{
|
||||||
|
sendStateByConversationId?: Record<string, unknown>;
|
||||||
|
}>;
|
||||||
|
const sendState = JSON.parse(sendStateJson || '{}');
|
||||||
|
const patch = {
|
||||||
|
conversationId: currentId,
|
||||||
|
sendStateByConversationId: {
|
||||||
|
[obsoleteId]: null,
|
||||||
|
[currentId]: sendState[obsoleteId],
|
||||||
|
},
|
||||||
|
|
||||||
|
// Unlike above here we have to provide the full object with all
|
||||||
|
// existing properties because arrays can't be patched and can only
|
||||||
|
// be replaced.
|
||||||
|
editHistory: editHistory.map(
|
||||||
|
({ sendStateByConversationId, ...rest }) => {
|
||||||
|
const existingState = sendStateByConversationId?.[obsoleteId];
|
||||||
|
if (!existingState) {
|
||||||
|
return rest;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
...rest,
|
||||||
|
sendStateByConversationId: {
|
||||||
|
...sendStateByConversationId,
|
||||||
|
[obsoleteId]: undefined,
|
||||||
|
[currentId]: existingState,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
updateOne.run({
|
||||||
|
rowid,
|
||||||
|
patch: JSON.stringify(patch),
|
||||||
|
currentId,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (parts.length < PAGE_SIZE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})();
|
||||||
}
|
}
|
||||||
|
|
||||||
async function getMessagesBySentAt(
|
async function getMessagesBySentAt(
|
||||||
|
|
111
ts/test-node/sql/migrateConversationMessages_test.ts
Normal file
111
ts/test-node/sql/migrateConversationMessages_test.ts
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
// Copyright 2024 Signal Messenger, LLC
|
||||||
|
// SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
import { assert } from 'chai';
|
||||||
|
import type { Database } from '@signalapp/better-sqlite3';
|
||||||
|
import SQL from '@signalapp/better-sqlite3';
|
||||||
|
|
||||||
|
import data, { setupTests, teardownTests } from '../../sql/Server';
|
||||||
|
import { insertData, getTableData } from './helpers';
|
||||||
|
|
||||||
|
describe('SQL/migrateConversationMessages', () => {
|
||||||
|
let db: Database;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
db = new SQL(':memory:');
|
||||||
|
setupTests(db);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
db.close();
|
||||||
|
teardownTests();
|
||||||
|
});
|
||||||
|
|
||||||
|
function compactify(
|
||||||
|
message: Record<string, unknown>
|
||||||
|
): Record<string, unknown> {
|
||||||
|
const { id, conversationId, json } = message;
|
||||||
|
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
conversationId,
|
||||||
|
json,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
it('should leave irrelevant messages intact', async () => {
|
||||||
|
insertData(db, 'messages', [
|
||||||
|
{
|
||||||
|
id: 'irrelevant',
|
||||||
|
conversationId: 'other',
|
||||||
|
json: {
|
||||||
|
conversationId: 'other',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
await data.migrateConversationMessages('obsolete', 'current');
|
||||||
|
|
||||||
|
assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [
|
||||||
|
{
|
||||||
|
id: 'irrelevant',
|
||||||
|
conversationId: 'other',
|
||||||
|
json: {
|
||||||
|
conversationId: 'other',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should update conversationId and send state', async () => {
|
||||||
|
insertData(db, 'messages', [
|
||||||
|
{
|
||||||
|
id: 'no-send-state',
|
||||||
|
conversationId: 'obsolete',
|
||||||
|
json: {
|
||||||
|
conversationId: 'obsolete',
|
||||||
|
body: 'test',
|
||||||
|
sendStateByConversationId: {
|
||||||
|
other: 'Failed',
|
||||||
|
obsolete: 'Read',
|
||||||
|
},
|
||||||
|
editHistory: [
|
||||||
|
{
|
||||||
|
body: 'test2',
|
||||||
|
sendStateByConversationId: {
|
||||||
|
other: 'Failed',
|
||||||
|
obsolete: 'Read',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
await data.migrateConversationMessages('obsolete', 'current');
|
||||||
|
|
||||||
|
assert.deepStrictEqual(getTableData(db, 'messages').map(compactify), [
|
||||||
|
{
|
||||||
|
id: 'no-send-state',
|
||||||
|
conversationId: 'current',
|
||||||
|
json: {
|
||||||
|
body: 'test',
|
||||||
|
conversationId: 'current',
|
||||||
|
sendStateByConversationId: {
|
||||||
|
other: 'Failed',
|
||||||
|
current: 'Read',
|
||||||
|
},
|
||||||
|
editHistory: [
|
||||||
|
{
|
||||||
|
body: 'test2',
|
||||||
|
sendStateByConversationId: {
|
||||||
|
other: 'Failed',
|
||||||
|
current: 'Read',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
});
|
Loading…
Add table
Add a link
Reference in a new issue