Only create call links from storage sync after refresh confirmed
Co-authored-by: ayumi-signal <143036029+ayumi-signal@users.noreply.github.com>
This commit is contained in:
parent
6859b1a220
commit
23e3a847d1
9 changed files with 169 additions and 46 deletions
|
@ -17,15 +17,24 @@ import type { CallLinkType } from '../types/CallLink';
|
|||
import { calling } from '../services/calling';
|
||||
import { sleeper } from '../util/sleeper';
|
||||
import { parseUnknown } from '../util/schemas';
|
||||
import { getRoomIdFromRootKey } from '../util/callLinksRingrtc';
|
||||
import { toCallHistoryFromUnusedCallLink } from '../util/callLinks';
|
||||
|
||||
const MAX_RETRY_TIME = DAY;
|
||||
const MAX_PARALLEL_JOBS = 5;
|
||||
const MAX_PARALLEL_JOBS = 10;
|
||||
const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME);
|
||||
const DEFAULT_SLEEP_TIME = 20 * SECOND;
|
||||
|
||||
// Only rootKey is required. Other fields are only used if the call link doesn't
|
||||
// exist locally, in order to create a call link. This is useful for storage sync when
|
||||
// we download call link data, but we don't want to insert a record until
|
||||
// the call link is confirmed valid on the calling server.
|
||||
const callLinkRefreshJobDataSchema = z.object({
|
||||
roomId: z.string(),
|
||||
deleteLocallyIfMissingOnCallingServer: z.boolean(),
|
||||
rootKey: z.string(),
|
||||
adminKey: z.string().nullable().optional(),
|
||||
storageID: z.string().nullable().optional(),
|
||||
storageVersion: z.number().int().nullable().optional(),
|
||||
storageUnknownFields: z.instanceof(Uint8Array).nullable().optional(),
|
||||
source: z.string(),
|
||||
});
|
||||
|
||||
|
@ -57,7 +66,10 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> {
|
|||
}: Readonly<{ data: CallLinkRefreshJobData; timestamp: number }>,
|
||||
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||||
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
|
||||
const { roomId, deleteLocallyIfMissingOnCallingServer, source } = data;
|
||||
const { rootKey, source } = data;
|
||||
const callLinkRootKey = CallLinkRootKey.parse(rootKey);
|
||||
const roomId = getRoomIdFromRootKey(callLinkRootKey);
|
||||
|
||||
const logId = `callLinkRefreshJobQueue(${roomId}, source=${source}).run`;
|
||||
log.info(`${logId}: Starting`);
|
||||
|
||||
|
@ -72,35 +84,61 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
const existingCallLink = await DataReader.getCallLinkByRoomId(roomId);
|
||||
if (!existingCallLink) {
|
||||
log.warn(`${logId}: Call link missing locally, can't refresh`);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
let error: Error | undefined;
|
||||
const callLinkRootKey = CallLinkRootKey.parse(existingCallLink.rootKey);
|
||||
try {
|
||||
// This will either return the fresh call link state,
|
||||
// null (link deleted from server), or err (connection error)
|
||||
const freshCallLinkState = await calling.readCallLink(callLinkRootKey);
|
||||
const existingCallLink = await DataReader.getCallLinkByRoomId(roomId);
|
||||
|
||||
if (freshCallLinkState != null) {
|
||||
log.info(`${logId}: Refreshed call link`);
|
||||
const callLink: CallLinkType = {
|
||||
...existingCallLink,
|
||||
...freshCallLinkState,
|
||||
};
|
||||
await DataWriter.updateCallLinkState(roomId, freshCallLinkState);
|
||||
window.reduxActions.calling.handleCallLinkUpdateLocal(callLink);
|
||||
} else if (deleteLocallyIfMissingOnCallingServer) {
|
||||
if (existingCallLink) {
|
||||
log.info(`${logId}: Updating call link with fresh state`);
|
||||
const callLink: CallLinkType = {
|
||||
...existingCallLink,
|
||||
...freshCallLinkState,
|
||||
};
|
||||
await DataWriter.updateCallLinkState(roomId, freshCallLinkState);
|
||||
window.reduxActions.calling.handleCallLinkUpdateLocal(callLink);
|
||||
} else {
|
||||
log.info(`${logId}: Creating new call link`);
|
||||
const { adminKey, storageID, storageVersion, storageUnknownFields } =
|
||||
data;
|
||||
const callLink: CallLinkType = {
|
||||
...freshCallLinkState,
|
||||
roomId,
|
||||
rootKey,
|
||||
adminKey: adminKey ?? null,
|
||||
storageID: storageID ?? undefined,
|
||||
storageVersion: storageVersion ?? undefined,
|
||||
storageUnknownFields,
|
||||
storageNeedsSync: false,
|
||||
};
|
||||
|
||||
const callHistory = toCallHistoryFromUnusedCallLink(callLink);
|
||||
await Promise.all([
|
||||
DataWriter.insertCallLink(callLink),
|
||||
DataWriter.saveCallHistory(callHistory),
|
||||
]);
|
||||
window.reduxActions.callHistory.addCallHistory(callHistory);
|
||||
window.reduxActions.calling.handleCallLinkUpdateLocal(callLink);
|
||||
}
|
||||
} else if (!existingCallLink) {
|
||||
// When the call link is missing from the server, and we don't have a local
|
||||
// call link record, that means we discovered a defunct link from storage service.
|
||||
// Save this state to DefunctCallLink.
|
||||
log.info(
|
||||
`${logId}: Call link not found on server and deleteLocallyIfMissingOnCallingServer; deleting local call link`
|
||||
`${logId}: Call link not found on server but absent locally, saving DefunctCallLink`
|
||||
);
|
||||
// This will leave a storage service record, and it's up to primary to delete it
|
||||
await DataWriter.deleteCallLinkAndHistory(roomId);
|
||||
window.reduxActions.calling.handleCallLinkDelete({ roomId });
|
||||
await DataWriter.insertDefunctCallLink({
|
||||
roomId,
|
||||
rootKey,
|
||||
adminKey: data.adminKey ?? null,
|
||||
});
|
||||
} else {
|
||||
log.info(`${logId}: Call link not found on server, ignoring`);
|
||||
log.info(
|
||||
`${logId}: Call link not found on server but present locally, ignoring`
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
error = err;
|
||||
|
|
|
@ -75,7 +75,6 @@ import {
|
|||
import {
|
||||
CALL_LINK_DELETED_STORAGE_RECORD_TTL,
|
||||
fromAdminKeyBytes,
|
||||
toCallHistoryFromUnusedCallLink,
|
||||
} from '../util/callLinks';
|
||||
import { isOlderThan } from '../util/timestamp';
|
||||
import { callLinkRefreshJobQueue } from '../jobs/callLinkRefreshJobQueue';
|
||||
|
@ -2009,31 +2008,26 @@ export async function mergeCallLinkRecord(
|
|||
|
||||
if (!localCallLinkDbRecord) {
|
||||
if (deletedAt) {
|
||||
log.info(
|
||||
`${logId}: Found deleted call link with no matching local record, skipping`
|
||||
);
|
||||
details.push('skipping deleted call link with no matching local record');
|
||||
} else if (await DataReader.defunctCallLinkExists(roomId)) {
|
||||
details.push('skipping known defunct call link');
|
||||
} else {
|
||||
log.info(`${logId}: Discovered new call link, creating locally`);
|
||||
details.push('creating call link');
|
||||
details.push('new call link, enqueueing call link refresh and create');
|
||||
|
||||
// Create CallLink and call history item
|
||||
// Queue a job to refresh the call link to confirm its existence.
|
||||
// Include the bundle of call link data so we can insert the call link
|
||||
// after confirmation.
|
||||
const callLink = callLinkFromRecord(callLinkDbRecord);
|
||||
const callHistory = toCallHistoryFromUnusedCallLink(callLink);
|
||||
await Promise.all([
|
||||
DataWriter.insertCallLink(callLink),
|
||||
DataWriter.saveCallHistory(callHistory),
|
||||
]);
|
||||
|
||||
// The local DB record is a placeholder until confirmed refreshed. If it's gone from
|
||||
// the calling server then delete the local record.
|
||||
drop(
|
||||
callLinkRefreshJobQueue.add({
|
||||
roomId: callLink.roomId,
|
||||
deleteLocallyIfMissingOnCallingServer: true,
|
||||
rootKey: callLink.rootKey,
|
||||
adminKey: callLink.adminKey,
|
||||
storageID: callLink.storageID,
|
||||
storageVersion: callLink.storageVersion,
|
||||
storageUnknownFields: callLink.storageUnknownFields,
|
||||
source: 'storage.mergeCallLinkRecord',
|
||||
})
|
||||
);
|
||||
window.reduxActions.callHistory.addCallHistory(callHistory);
|
||||
}
|
||||
|
||||
return {
|
||||
|
|
|
@ -35,6 +35,7 @@ import type {
|
|||
CallLinkRecord,
|
||||
CallLinkStateType,
|
||||
CallLinkType,
|
||||
DefunctCallLinkType,
|
||||
} from '../types/CallLink';
|
||||
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
|
||||
import type {
|
||||
|
@ -580,6 +581,7 @@ type ReadableInterface = {
|
|||
eraId: string
|
||||
) => boolean;
|
||||
callLinkExists(roomId: string): boolean;
|
||||
defunctCallLinkExists(roomId: string): boolean;
|
||||
getAllCallLinks: () => ReadonlyArray<CallLinkType>;
|
||||
getCallLinkByRoomId: (roomId: string) => CallLinkType | undefined;
|
||||
getCallLinkRecordByRoomId: (roomId: string) => CallLinkRecord | undefined;
|
||||
|
@ -819,6 +821,7 @@ type WritableInterface = {
|
|||
deleteCallLinkAndHistory(roomId: string): void;
|
||||
finalizeDeleteCallLink(roomId: string): void;
|
||||
_removeAllCallLinks(): void;
|
||||
insertDefunctCallLink(callLink: DefunctCallLinkType): void;
|
||||
deleteCallLinkFromSync(roomId: string): void;
|
||||
migrateConversationMessages: (obsoleteId: string, currentId: string) => void;
|
||||
saveEditedMessage: (
|
||||
|
|
|
@ -172,6 +172,7 @@ import {
|
|||
} from '../types/CallDisposition';
|
||||
import {
|
||||
callLinkExists,
|
||||
defunctCallLinkExists,
|
||||
getAllCallLinks,
|
||||
getCallLinkByRoomId,
|
||||
getCallLinkRecordByRoomId,
|
||||
|
@ -189,6 +190,7 @@ import {
|
|||
beginDeleteCallLink,
|
||||
deleteCallLinkFromSync,
|
||||
_removeAllCallLinks,
|
||||
insertDefunctCallLink,
|
||||
} from './server/callLinks';
|
||||
import {
|
||||
replaceAllEndorsementsForGroup,
|
||||
|
@ -313,6 +315,7 @@ export const DataReader: ServerReadableInterface = {
|
|||
hasGroupCallHistoryMessage,
|
||||
|
||||
callLinkExists,
|
||||
defunctCallLinkExists,
|
||||
getAllCallLinks,
|
||||
getCallLinkByRoomId,
|
||||
getCallLinkRecordByRoomId,
|
||||
|
@ -460,6 +463,7 @@ export const DataWriter: ServerWritableInterface = {
|
|||
finalizeDeleteCallLink,
|
||||
_removeAllCallLinks,
|
||||
deleteCallLinkFromSync,
|
||||
insertDefunctCallLink,
|
||||
migrateConversationMessages,
|
||||
saveEditedMessage,
|
||||
saveEditedMessages,
|
||||
|
@ -6436,6 +6440,7 @@ function removeAll(db: WritableDB): void {
|
|||
DELETE FROM callLinks;
|
||||
DELETE FROM callsHistory;
|
||||
DELETE FROM conversations;
|
||||
DELETE FROM defunctCallLinks;
|
||||
DELETE FROM emojis;
|
||||
DELETE FROM groupCallRingCancellations;
|
||||
DELETE FROM groupSendCombinedEndorsement;
|
||||
|
|
35
ts/sql/migrations/1240-defunct-call-links-table.ts
Normal file
35
ts/sql/migrations/1240-defunct-call-links-table.ts
Normal file
|
@ -0,0 +1,35 @@
|
|||
// Copyright 2024 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { Database } from '@signalapp/better-sqlite3';
|
||||
|
||||
import type { LoggerType } from '../../types/Logging';
|
||||
import { sql } from '../util';
|
||||
|
||||
export const version = 1240;
|
||||
|
||||
export function updateToSchemaVersion1240(
|
||||
currentVersion: number,
|
||||
db: Database,
|
||||
logger: LoggerType
|
||||
): void {
|
||||
if (currentVersion >= 1240) {
|
||||
return;
|
||||
}
|
||||
|
||||
db.transaction(() => {
|
||||
const [createTable] = sql`
|
||||
CREATE TABLE defunctCallLinks (
|
||||
roomId TEXT NOT NULL PRIMARY KEY,
|
||||
rootKey BLOB NOT NULL,
|
||||
adminKey BLOB
|
||||
) STRICT;
|
||||
`;
|
||||
|
||||
db.exec(createTable);
|
||||
|
||||
db.pragma('user_version = 1240');
|
||||
})();
|
||||
|
||||
logger.info('updateToSchemaVersion1240: success!');
|
||||
}
|
|
@ -99,10 +99,11 @@ import { updateToSchemaVersion1190 } from './1190-call-links-storage';
|
|||
import { updateToSchemaVersion1200 } from './1200-attachment-download-source-index';
|
||||
import { updateToSchemaVersion1210 } from './1210-call-history-started-id';
|
||||
import { updateToSchemaVersion1220 } from './1220-blob-sessions';
|
||||
import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index';
|
||||
import {
|
||||
updateToSchemaVersion1230,
|
||||
updateToSchemaVersion1240,
|
||||
version as MAX_VERSION,
|
||||
} from './1230-call-links-admin-key-index';
|
||||
} from './1240-defunct-call-links-table';
|
||||
|
||||
function updateToSchemaVersion1(
|
||||
currentVersion: number,
|
||||
|
@ -2071,6 +2072,7 @@ export const SCHEMA_VERSIONS = [
|
|||
updateToSchemaVersion1210,
|
||||
updateToSchemaVersion1220,
|
||||
updateToSchemaVersion1230,
|
||||
updateToSchemaVersion1240,
|
||||
];
|
||||
|
||||
export class DBVersionFromFutureError extends Error {
|
||||
|
|
|
@ -6,6 +6,7 @@ import type {
|
|||
CallLinkRecord,
|
||||
CallLinkStateType,
|
||||
CallLinkType,
|
||||
DefunctCallLinkType,
|
||||
} from '../../types/CallLink';
|
||||
import {
|
||||
callLinkRestrictionsSchema,
|
||||
|
@ -15,6 +16,7 @@ import { toAdminKeyBytes } from '../../util/callLinks';
|
|||
import {
|
||||
callLinkToRecord,
|
||||
callLinkFromRecord,
|
||||
toRootKeyBytes,
|
||||
} from '../../util/callLinksRingrtc';
|
||||
import type { ReadableDB, WritableDB } from '../Interface';
|
||||
import { prepare } from '../Server';
|
||||
|
@ -376,3 +378,41 @@ export function _removeAllCallLinks(db: WritableDB): void {
|
|||
`;
|
||||
db.prepare(query).run(params);
|
||||
}
|
||||
|
||||
export function defunctCallLinkExists(db: ReadableDB, roomId: string): boolean {
|
||||
const [query, params] = sql`
|
||||
SELECT 1
|
||||
FROM defunctCallLinks
|
||||
WHERE roomId = ${roomId};
|
||||
`;
|
||||
return db.prepare(query).pluck(true).get(params) === 1;
|
||||
}
|
||||
|
||||
export function insertDefunctCallLink(
|
||||
db: WritableDB,
|
||||
callLink: DefunctCallLinkType
|
||||
): void {
|
||||
const { roomId, rootKey } = callLink;
|
||||
assertRoomIdMatchesRootKey(roomId, rootKey);
|
||||
|
||||
const rootKeyData = toRootKeyBytes(callLink.rootKey);
|
||||
const adminKeyData = callLink.adminKey
|
||||
? toAdminKeyBytes(callLink.adminKey)
|
||||
: null;
|
||||
|
||||
prepare(
|
||||
db,
|
||||
`
|
||||
INSERT INTO defunctCallLinks (
|
||||
roomId,
|
||||
rootKey,
|
||||
adminKey
|
||||
) VALUES (
|
||||
$roomId,
|
||||
$rootKeyData,
|
||||
$adminKeyData
|
||||
)
|
||||
ON CONFLICT (roomId) DO NOTHING;
|
||||
`
|
||||
).run({ roomId, rootKeyData, adminKeyData });
|
||||
}
|
||||
|
|
|
@ -1556,8 +1556,7 @@ function handleCallLinkUpdate(
|
|||
// This job will throttle requests to the calling server.
|
||||
drop(
|
||||
callLinkRefreshJobQueue.add({
|
||||
roomId: callLink.roomId,
|
||||
deleteLocallyIfMissingOnCallingServer: false,
|
||||
rootKey,
|
||||
source: 'handleCallLinkUpdate',
|
||||
})
|
||||
);
|
||||
|
|
|
@ -83,6 +83,13 @@ export type CallLinkConversationType = ReadonlyDeep<
|
|||
}
|
||||
>;
|
||||
|
||||
// Call links discovered missing after server refresh
|
||||
export type DefunctCallLinkType = Readonly<{
|
||||
roomId: string;
|
||||
rootKey: string;
|
||||
adminKey: string | null;
|
||||
}>;
|
||||
|
||||
// DB Record
|
||||
export type CallLinkRecord = Readonly<{
|
||||
roomId: string;
|
||||
|
|
Loading…
Reference in a new issue