Fix db error from concurrent processing of call link update sync
This commit is contained in:
parent
746b22d3dc
commit
bcb1a614ea
6 changed files with 217 additions and 40 deletions
|
@ -53,6 +53,7 @@ import type { AttachmentBackupJobType } from '../types/AttachmentBackup';
|
|||
import type { GifType } from '../components/fun/panels/FunPanelGifs';
|
||||
import type { NotificationProfileType } from '../types/NotificationProfile';
|
||||
import type { DonationReceipt } from '../types/Donations';
|
||||
import type { InsertOrUpdateCallLinkFromSyncResult } from './server/callLinks';
|
||||
|
||||
export type ReadableDB = Database & { __readable_db: never };
|
||||
export type WritableDB = ReadableDB & { __writable_db: never };
|
||||
|
@ -1030,8 +1031,10 @@ type WritableInterface = {
|
|||
markCallHistoryMissed(callIds: ReadonlyArray<string>): void;
|
||||
getRecentStaleRingsAndMarkOlderMissed(): ReadonlyArray<MaybeStaleCallHistory>;
|
||||
insertCallLink(callLink: CallLinkType): void;
|
||||
insertOrUpdateCallLinkFromSync(
|
||||
callLink: CallLinkType
|
||||
): InsertOrUpdateCallLinkFromSyncResult;
|
||||
updateCallLink(callLink: CallLinkType): void;
|
||||
updateCallLinkAdminKeyByRoomId(roomId: string, adminKey: string): void;
|
||||
updateCallLinkState(
|
||||
roomId: string,
|
||||
callLinkState: CallLinkStateType
|
||||
|
|
|
@ -213,8 +213,8 @@ import {
|
|||
getCallLinkRecordByRoomId,
|
||||
insertCallLink,
|
||||
insertDefunctCallLink,
|
||||
insertOrUpdateCallLinkFromSync,
|
||||
updateCallLink,
|
||||
updateCallLinkAdminKeyByRoomId,
|
||||
updateCallLinkState,
|
||||
updateDefunctCallLink,
|
||||
} from './server/callLinks';
|
||||
|
@ -546,8 +546,8 @@ export const DataWriter: ServerWritableInterface = {
|
|||
saveCallHistory,
|
||||
markCallHistoryMissed,
|
||||
insertCallLink,
|
||||
insertOrUpdateCallLinkFromSync,
|
||||
updateCallLink,
|
||||
updateCallLinkAdminKeyByRoomId,
|
||||
updateCallLinkState,
|
||||
beginDeleteAllCallLinks,
|
||||
beginDeleteCallLink,
|
||||
|
|
|
@ -121,6 +121,41 @@ export function insertCallLink(db: WritableDB, callLink: CallLinkType): void {
|
|||
_insertCallLink(db, callLink);
|
||||
}
|
||||
|
||||
export type InsertOrUpdateCallLinkFromSyncResult = Readonly<{
|
||||
callLink: CallLinkType;
|
||||
inserted: boolean;
|
||||
updated: boolean;
|
||||
}>;
|
||||
|
||||
export function insertOrUpdateCallLinkFromSync(
|
||||
db: WritableDB,
|
||||
callLink: CallLinkType
|
||||
): InsertOrUpdateCallLinkFromSyncResult {
|
||||
const { roomId, adminKey } = callLink;
|
||||
return db.transaction(() => {
|
||||
const existingCallLink = getCallLinkByRoomId(db, roomId);
|
||||
if (existingCallLink) {
|
||||
if (adminKey && adminKey !== existingCallLink.adminKey) {
|
||||
updateCallLinkAdminKeyByRoomId(db, roomId, adminKey);
|
||||
return {
|
||||
callLink: { ...existingCallLink, adminKey },
|
||||
inserted: false,
|
||||
updated: true,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
callLink: existingCallLink,
|
||||
inserted: false,
|
||||
updated: false,
|
||||
};
|
||||
}
|
||||
|
||||
insertCallLink(db, callLink);
|
||||
return { callLink, inserted: true, updated: false };
|
||||
})();
|
||||
}
|
||||
|
||||
export function updateCallLink(db: WritableDB, callLink: CallLinkType): void {
|
||||
const { roomId, rootKey } = callLink;
|
||||
assertRoomIdMatchesRootKey(roomId, rootKey);
|
||||
|
|
|
@ -100,7 +100,6 @@ import { isAciString } from '../../util/isAciString';
|
|||
import type { CallHistoryAdd } from './callHistory';
|
||||
import { addCallHistory, reloadCallHistory } from './callHistory';
|
||||
import { saveDraftRecordingIfNeeded } from './composer';
|
||||
import type { CallHistoryDetails } from '../../types/CallDisposition';
|
||||
import type { StartCallData } from '../../components/ConfirmLeaveCallModal';
|
||||
import {
|
||||
getCallLinksByRoomId,
|
||||
|
@ -1600,39 +1599,38 @@ function handleCallLinkUpdate(
|
|||
const roomId = getRoomIdFromRootKey(callLinkRootKey);
|
||||
const logId = `handleCallLinkUpdate(${roomId})`;
|
||||
|
||||
const existingCallLink = await DataReader.getCallLinkByRoomId(roomId);
|
||||
|
||||
const callLink: CallLinkType = {
|
||||
...CALL_LINK_DEFAULT_STATE,
|
||||
storageNeedsSync: false,
|
||||
...existingCallLink,
|
||||
roomId,
|
||||
rootKey,
|
||||
adminKey,
|
||||
};
|
||||
|
||||
let callHistory: CallHistoryDetails | null = null;
|
||||
const result = await DataWriter.insertOrUpdateCallLinkFromSync(callLink);
|
||||
|
||||
if (existingCallLink) {
|
||||
if (adminKey && adminKey !== existingCallLink.adminKey) {
|
||||
log.info(`${logId}: Updating existing call link with new adminKey`);
|
||||
await DataWriter.updateCallLinkAdminKeyByRoomId(roomId, adminKey);
|
||||
}
|
||||
if (result.inserted) {
|
||||
log.info(`${logId}: Saved new call link`);
|
||||
} else if (result.updated) {
|
||||
log.info(`${logId}: Updated existing call link with new adminKey`);
|
||||
} else {
|
||||
log.info(`${logId}: Saving new call link`);
|
||||
await DataWriter.insertCallLink(callLink);
|
||||
if (adminKey != null) {
|
||||
callHistory = toCallHistoryFromUnusedCallLink(callLink);
|
||||
await DataWriter.saveCallHistory(callHistory);
|
||||
}
|
||||
// This case happens when concurrently processing a batch of call link sync messages
|
||||
// for the same roomId.
|
||||
log.info(
|
||||
`${logId}: Discarding call link update because we are already up to date`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
dispatch({
|
||||
type: HANDLE_CALL_LINK_UPDATE,
|
||||
payload: { callLink },
|
||||
payload: { callLink: result.callLink },
|
||||
});
|
||||
|
||||
if (callHistory != null) {
|
||||
const isPlaceholderCallHistoryNeeded = result.inserted && adminKey != null;
|
||||
if (isPlaceholderCallHistoryNeeded) {
|
||||
const callHistory = toCallHistoryFromUnusedCallLink(callLink);
|
||||
await DataWriter.saveCallHistory(callHistory);
|
||||
dispatch(addCallHistory(callHistory));
|
||||
}
|
||||
|
||||
|
|
130
ts/test-electron/sql/callLinks_test.ts
Normal file
130
ts/test-electron/sql/callLinks_test.ts
Normal file
|
@ -0,0 +1,130 @@
|
|||
// Copyright 2025 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { assert } from 'chai';
|
||||
|
||||
import { DataReader, DataWriter } from '../../sql/Client';
|
||||
|
||||
import {
|
||||
FAKE_CALL_LINK,
|
||||
FAKE_CALL_LINK_WITH_ADMIN_KEY,
|
||||
} from '../../test-helpers/fakeCallLink';
|
||||
|
||||
const { getCallLinkByRoomId } = DataReader;
|
||||
const { removeAll, insertCallLink, insertOrUpdateCallLinkFromSync } =
|
||||
DataWriter;
|
||||
|
||||
describe('sql/insertOrUpdateCallLinkFromSync', () => {
|
||||
beforeEach(async () => {
|
||||
await removeAll();
|
||||
});
|
||||
after(async () => {
|
||||
await removeAll();
|
||||
});
|
||||
|
||||
it('inserts call links', async () => {
|
||||
const {
|
||||
callLink: resultCallLink,
|
||||
inserted,
|
||||
updated,
|
||||
} = await insertOrUpdateCallLinkFromSync(FAKE_CALL_LINK);
|
||||
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
FAKE_CALL_LINK,
|
||||
'return value call link should match input'
|
||||
);
|
||||
assert.equal(inserted, true, 'result.inserted value should be true');
|
||||
assert.equal(updated, false, 'result.updated value should be false');
|
||||
|
||||
const dbCallLink = await getCallLinkByRoomId(FAKE_CALL_LINK.roomId);
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
dbCallLink,
|
||||
'database call link should match input'
|
||||
);
|
||||
});
|
||||
|
||||
it('inserts admin call links', async () => {
|
||||
const {
|
||||
callLink: resultCallLink,
|
||||
inserted,
|
||||
updated,
|
||||
} = await insertOrUpdateCallLinkFromSync(FAKE_CALL_LINK_WITH_ADMIN_KEY);
|
||||
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
FAKE_CALL_LINK_WITH_ADMIN_KEY,
|
||||
'return value call link should match input'
|
||||
);
|
||||
assert.equal(inserted, true, 'result.inserted value should be true');
|
||||
assert.equal(updated, false, 'result.updated value should be false');
|
||||
|
||||
const dbCallLink = await getCallLinkByRoomId(
|
||||
FAKE_CALL_LINK_WITH_ADMIN_KEY.roomId
|
||||
);
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
dbCallLink,
|
||||
'database call link should match input'
|
||||
);
|
||||
});
|
||||
|
||||
it('updates call links with admin key', async () => {
|
||||
await insertCallLink(FAKE_CALL_LINK);
|
||||
|
||||
const newAdminKey = FAKE_CALL_LINK_WITH_ADMIN_KEY.adminKey;
|
||||
const callLinkUpdateData = {
|
||||
...FAKE_CALL_LINK,
|
||||
adminKey: newAdminKey,
|
||||
};
|
||||
const {
|
||||
callLink: resultCallLink,
|
||||
inserted,
|
||||
updated,
|
||||
} = await insertOrUpdateCallLinkFromSync(callLinkUpdateData);
|
||||
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
callLinkUpdateData,
|
||||
'return value call link should match input'
|
||||
);
|
||||
|
||||
const dbCallLink = await getCallLinkByRoomId(FAKE_CALL_LINK.roomId);
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
dbCallLink,
|
||||
'database call link should match input'
|
||||
);
|
||||
|
||||
assert.equal(inserted, false, 'result.inserted value should be false');
|
||||
assert.equal(updated, true, 'result.updated value should be true');
|
||||
});
|
||||
|
||||
it('no ops when the db is up to date', async () => {
|
||||
await insertCallLink(FAKE_CALL_LINK_WITH_ADMIN_KEY);
|
||||
|
||||
const {
|
||||
callLink: resultCallLink,
|
||||
inserted,
|
||||
updated,
|
||||
} = await insertOrUpdateCallLinkFromSync(FAKE_CALL_LINK_WITH_ADMIN_KEY);
|
||||
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
FAKE_CALL_LINK_WITH_ADMIN_KEY,
|
||||
'return value call link should match input'
|
||||
);
|
||||
assert.equal(inserted, false, 'result.inserted value should be false');
|
||||
assert.equal(updated, false, 'result.updated value should be true');
|
||||
|
||||
const dbCallLink = await getCallLinkByRoomId(
|
||||
FAKE_CALL_LINK_WITH_ADMIN_KEY.roomId
|
||||
);
|
||||
assert.deepEqual(
|
||||
resultCallLink,
|
||||
dbCallLink,
|
||||
'database call link should match input'
|
||||
);
|
||||
});
|
||||
});
|
|
@ -50,6 +50,7 @@ import {
|
|||
import { strictAssert } from '../../../util/assert';
|
||||
import { callLinkRefreshJobQueue } from '../../../jobs/callLinkRefreshJobQueue';
|
||||
import { CALL_LINK_DEFAULT_STATE } from '../../../util/callLinks';
|
||||
import { DataWriter } from '../../../sql/Client';
|
||||
|
||||
const ACI_1 = generateAci();
|
||||
const NOW = new Date('2020-01-23T04:56:00.000');
|
||||
|
@ -1466,11 +1467,13 @@ describe('calling duck', () => {
|
|||
describe('handleCallLinkUpdate', () => {
|
||||
const { roomId, rootKey, adminKey } = FAKE_CALL_LINK;
|
||||
|
||||
beforeEach(function (this: Mocha.Context) {
|
||||
beforeEach(async function (this: Mocha.Context) {
|
||||
await DataWriter.removeAll();
|
||||
this.callLinkRefreshJobQueueAdd = this.sandbox.stub(
|
||||
callLinkRefreshJobQueue,
|
||||
'add'
|
||||
);
|
||||
this.clock = this.sandbox.useFakeTimers();
|
||||
});
|
||||
|
||||
const doAction = async (
|
||||
|
@ -1500,9 +1503,6 @@ describe('calling duck', () => {
|
|||
roomId,
|
||||
rootKey,
|
||||
adminKey,
|
||||
storageID: undefined,
|
||||
storageVersion: undefined,
|
||||
storageUnknownFields: undefined,
|
||||
storageNeedsSync: false,
|
||||
},
|
||||
},
|
||||
|
@ -1512,22 +1512,33 @@ describe('calling duck', () => {
|
|||
it('can save adminKey', async () => {
|
||||
const { dispatch } = await doAction({ rootKey, adminKey: 'banana' });
|
||||
|
||||
sinon.assert.calledOnce(dispatch);
|
||||
sinon.assert.calledWith(dispatch, {
|
||||
type: 'calling/HANDLE_CALL_LINK_UPDATE',
|
||||
payload: {
|
||||
callLink: {
|
||||
...CALL_LINK_DEFAULT_STATE,
|
||||
roomId,
|
||||
rootKey,
|
||||
adminKey: 'banana',
|
||||
storageID: undefined,
|
||||
storageVersion: undefined,
|
||||
storageUnknownFields: undefined,
|
||||
storageNeedsSync: false,
|
||||
sinon.assert.calledTwice(dispatch);
|
||||
assert(
|
||||
dispatch.getCall(0).calledWithExactly({
|
||||
type: 'calling/HANDLE_CALL_LINK_UPDATE',
|
||||
payload: {
|
||||
callLink: {
|
||||
...CALL_LINK_DEFAULT_STATE,
|
||||
roomId,
|
||||
rootKey,
|
||||
adminKey: 'banana',
|
||||
storageNeedsSync: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
}),
|
||||
'dispatches HANDLE_CALL_LINK_UPDATE'
|
||||
);
|
||||
const secondCall = dispatch.getCall(1);
|
||||
assert.strictEqual(
|
||||
secondCall.args[0].type,
|
||||
'callHistory/ADD',
|
||||
'dispatches CALL_HISTORY_ADD'
|
||||
);
|
||||
assert.strictEqual(
|
||||
secondCall.args[0].payload.peerId,
|
||||
roomId,
|
||||
'CALL_HISTORY_ADD peerId is call link roomId'
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue