diff --git a/ts/jobs/callLinkRefreshJobQueue.ts b/ts/jobs/callLinkRefreshJobQueue.ts index be76c891e24..456ec135b60 100644 --- a/ts/jobs/callLinkRefreshJobQueue.ts +++ b/ts/jobs/callLinkRefreshJobQueue.ts @@ -17,15 +17,24 @@ import type { CallLinkType } from '../types/CallLink'; import { calling } from '../services/calling'; import { sleeper } from '../util/sleeper'; import { parseUnknown } from '../util/schemas'; +import { getRoomIdFromRootKey } from '../util/callLinksRingrtc'; +import { toCallHistoryFromUnusedCallLink } from '../util/callLinks'; const MAX_RETRY_TIME = DAY; -const MAX_PARALLEL_JOBS = 5; +const MAX_PARALLEL_JOBS = 10; const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME); const DEFAULT_SLEEP_TIME = 20 * SECOND; +// Only rootKey is required. Other fields are only used if the call link doesn't +// exist locally, in order to create a call link. This is useful for storage sync when +// we download call link data, but we don't want to insert a record until +// the call link is confirmed valid on the calling server. const callLinkRefreshJobDataSchema = z.object({ - roomId: z.string(), - deleteLocallyIfMissingOnCallingServer: z.boolean(), + rootKey: z.string(), + adminKey: z.string().nullable().optional(), + storageID: z.string().nullable().optional(), + storageVersion: z.number().int().nullable().optional(), + storageUnknownFields: z.instanceof(Uint8Array).nullable().optional(), source: z.string(), }); @@ -57,7 +66,10 @@ export class CallLinkRefreshJobQueue extends JobQueue { }: Readonly<{ data: CallLinkRefreshJobData; timestamp: number }>, { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> ): Promise { - const { roomId, deleteLocallyIfMissingOnCallingServer, source } = data; + const { rootKey, source } = data; + const callLinkRootKey = CallLinkRootKey.parse(rootKey); + const roomId = getRoomIdFromRootKey(callLinkRootKey); + const logId = `callLinkRefreshJobQueue(${roomId}, source=${source}).run`; log.info(`${logId}: Starting`); @@ -72,35 +84,61 @@ export class CallLinkRefreshJobQueue extends JobQueue { return undefined; } - const existingCallLink = await DataReader.getCallLinkByRoomId(roomId); - if (!existingCallLink) { - log.warn(`${logId}: Call link missing locally, can't refresh`); - return undefined; - } - let error: Error | undefined; - const callLinkRootKey = CallLinkRootKey.parse(existingCallLink.rootKey); try { // This will either return the fresh call link state, // null (link deleted from server), or err (connection error) const freshCallLinkState = await calling.readCallLink(callLinkRootKey); + const existingCallLink = await DataReader.getCallLinkByRoomId(roomId); + if (freshCallLinkState != null) { - log.info(`${logId}: Refreshed call link`); - const callLink: CallLinkType = { - ...existingCallLink, - ...freshCallLinkState, - }; - await DataWriter.updateCallLinkState(roomId, freshCallLinkState); - window.reduxActions.calling.handleCallLinkUpdateLocal(callLink); - } else if (deleteLocallyIfMissingOnCallingServer) { + if (existingCallLink) { + log.info(`${logId}: Updating call link with fresh state`); + const callLink: CallLinkType = { + ...existingCallLink, + ...freshCallLinkState, + }; + await DataWriter.updateCallLinkState(roomId, freshCallLinkState); + window.reduxActions.calling.handleCallLinkUpdateLocal(callLink); + } else { + log.info(`${logId}: Creating new call link`); + const { adminKey, storageID, storageVersion, storageUnknownFields } = + data; + const callLink: CallLinkType = { + ...freshCallLinkState, + roomId, + rootKey, + adminKey: adminKey ?? null, + storageID: storageID ?? undefined, + storageVersion: storageVersion ?? undefined, + storageUnknownFields, + storageNeedsSync: false, + }; + + const callHistory = toCallHistoryFromUnusedCallLink(callLink); + await Promise.all([ + DataWriter.insertCallLink(callLink), + DataWriter.saveCallHistory(callHistory), + ]); + window.reduxActions.callHistory.addCallHistory(callHistory); + window.reduxActions.calling.handleCallLinkUpdateLocal(callLink); + } + } else if (!existingCallLink) { + // When the call link is missing from the server, and we don't have a local + // call link record, that means we discovered a defunct link from storage service. + // Save this state to DefunctCallLink. log.info( - `${logId}: Call link not found on server and deleteLocallyIfMissingOnCallingServer; deleting local call link` + `${logId}: Call link not found on server but absent locally, saving DefunctCallLink` ); - // This will leave a storage service record, and it's up to primary to delete it - await DataWriter.deleteCallLinkAndHistory(roomId); - window.reduxActions.calling.handleCallLinkDelete({ roomId }); + await DataWriter.insertDefunctCallLink({ + roomId, + rootKey, + adminKey: data.adminKey ?? null, + }); } else { - log.info(`${logId}: Call link not found on server, ignoring`); + log.info( + `${logId}: Call link not found on server but present locally, ignoring` + ); } } catch (err) { error = err; diff --git a/ts/services/storageRecordOps.ts b/ts/services/storageRecordOps.ts index 2edb8324bc6..581165a445d 100644 --- a/ts/services/storageRecordOps.ts +++ b/ts/services/storageRecordOps.ts @@ -75,7 +75,6 @@ import { import { CALL_LINK_DELETED_STORAGE_RECORD_TTL, fromAdminKeyBytes, - toCallHistoryFromUnusedCallLink, } from '../util/callLinks'; import { isOlderThan } from '../util/timestamp'; import { callLinkRefreshJobQueue } from '../jobs/callLinkRefreshJobQueue'; @@ -2009,31 +2008,26 @@ export async function mergeCallLinkRecord( if (!localCallLinkDbRecord) { if (deletedAt) { - log.info( - `${logId}: Found deleted call link with no matching local record, skipping` - ); + details.push('skipping deleted call link with no matching local record'); + } else if (await DataReader.defunctCallLinkExists(roomId)) { + details.push('skipping known defunct call link'); } else { - log.info(`${logId}: Discovered new call link, creating locally`); - details.push('creating call link'); + details.push('new call link, enqueueing call link refresh and create'); - // Create CallLink and call history item + // Queue a job to refresh the call link to confirm its existence. + // Include the bundle of call link data so we can insert the call link + // after confirmation. const callLink = callLinkFromRecord(callLinkDbRecord); - const callHistory = toCallHistoryFromUnusedCallLink(callLink); - await Promise.all([ - DataWriter.insertCallLink(callLink), - DataWriter.saveCallHistory(callHistory), - ]); - - // The local DB record is a placeholder until confirmed refreshed. If it's gone from - // the calling server then delete the local record. drop( callLinkRefreshJobQueue.add({ - roomId: callLink.roomId, - deleteLocallyIfMissingOnCallingServer: true, + rootKey: callLink.rootKey, + adminKey: callLink.adminKey, + storageID: callLink.storageID, + storageVersion: callLink.storageVersion, + storageUnknownFields: callLink.storageUnknownFields, source: 'storage.mergeCallLinkRecord', }) ); - window.reduxActions.callHistory.addCallHistory(callHistory); } return { diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 2a5e8a40383..8c266ac5f3e 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -35,6 +35,7 @@ import type { CallLinkRecord, CallLinkStateType, CallLinkType, + DefunctCallLinkType, } from '../types/CallLink'; import type { AttachmentDownloadJobType } from '../types/AttachmentDownload'; import type { @@ -580,6 +581,7 @@ type ReadableInterface = { eraId: string ) => boolean; callLinkExists(roomId: string): boolean; + defunctCallLinkExists(roomId: string): boolean; getAllCallLinks: () => ReadonlyArray; getCallLinkByRoomId: (roomId: string) => CallLinkType | undefined; getCallLinkRecordByRoomId: (roomId: string) => CallLinkRecord | undefined; @@ -819,6 +821,7 @@ type WritableInterface = { deleteCallLinkAndHistory(roomId: string): void; finalizeDeleteCallLink(roomId: string): void; _removeAllCallLinks(): void; + insertDefunctCallLink(callLink: DefunctCallLinkType): void; deleteCallLinkFromSync(roomId: string): void; migrateConversationMessages: (obsoleteId: string, currentId: string) => void; saveEditedMessage: ( diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 03733ca3f42..fc8c81c7fa0 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -172,6 +172,7 @@ import { } from '../types/CallDisposition'; import { callLinkExists, + defunctCallLinkExists, getAllCallLinks, getCallLinkByRoomId, getCallLinkRecordByRoomId, @@ -189,6 +190,7 @@ import { beginDeleteCallLink, deleteCallLinkFromSync, _removeAllCallLinks, + insertDefunctCallLink, } from './server/callLinks'; import { replaceAllEndorsementsForGroup, @@ -313,6 +315,7 @@ export const DataReader: ServerReadableInterface = { hasGroupCallHistoryMessage, callLinkExists, + defunctCallLinkExists, getAllCallLinks, getCallLinkByRoomId, getCallLinkRecordByRoomId, @@ -460,6 +463,7 @@ export const DataWriter: ServerWritableInterface = { finalizeDeleteCallLink, _removeAllCallLinks, deleteCallLinkFromSync, + insertDefunctCallLink, migrateConversationMessages, saveEditedMessage, saveEditedMessages, @@ -6436,6 +6440,7 @@ function removeAll(db: WritableDB): void { DELETE FROM callLinks; DELETE FROM callsHistory; DELETE FROM conversations; + DELETE FROM defunctCallLinks; DELETE FROM emojis; DELETE FROM groupCallRingCancellations; DELETE FROM groupSendCombinedEndorsement; diff --git a/ts/sql/migrations/1240-defunct-call-links-table.ts b/ts/sql/migrations/1240-defunct-call-links-table.ts new file mode 100644 index 00000000000..24f0e2c6924 --- /dev/null +++ b/ts/sql/migrations/1240-defunct-call-links-table.ts @@ -0,0 +1,35 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { Database } from '@signalapp/better-sqlite3'; + +import type { LoggerType } from '../../types/Logging'; +import { sql } from '../util'; + +export const version = 1240; + +export function updateToSchemaVersion1240( + currentVersion: number, + db: Database, + logger: LoggerType +): void { + if (currentVersion >= 1240) { + return; + } + + db.transaction(() => { + const [createTable] = sql` + CREATE TABLE defunctCallLinks ( + roomId TEXT NOT NULL PRIMARY KEY, + rootKey BLOB NOT NULL, + adminKey BLOB + ) STRICT; + `; + + db.exec(createTable); + + db.pragma('user_version = 1240'); + })(); + + logger.info('updateToSchemaVersion1240: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index 8bee9e34f0b..05d1a773ccd 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -99,10 +99,11 @@ import { updateToSchemaVersion1190 } from './1190-call-links-storage'; import { updateToSchemaVersion1200 } from './1200-attachment-download-source-index'; import { updateToSchemaVersion1210 } from './1210-call-history-started-id'; import { updateToSchemaVersion1220 } from './1220-blob-sessions'; +import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index'; import { - updateToSchemaVersion1230, + updateToSchemaVersion1240, version as MAX_VERSION, -} from './1230-call-links-admin-key-index'; +} from './1240-defunct-call-links-table'; function updateToSchemaVersion1( currentVersion: number, @@ -2071,6 +2072,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion1210, updateToSchemaVersion1220, updateToSchemaVersion1230, + updateToSchemaVersion1240, ]; export class DBVersionFromFutureError extends Error { diff --git a/ts/sql/server/callLinks.ts b/ts/sql/server/callLinks.ts index f9917df155e..b44a6158167 100644 --- a/ts/sql/server/callLinks.ts +++ b/ts/sql/server/callLinks.ts @@ -6,6 +6,7 @@ import type { CallLinkRecord, CallLinkStateType, CallLinkType, + DefunctCallLinkType, } from '../../types/CallLink'; import { callLinkRestrictionsSchema, @@ -15,6 +16,7 @@ import { toAdminKeyBytes } from '../../util/callLinks'; import { callLinkToRecord, callLinkFromRecord, + toRootKeyBytes, } from '../../util/callLinksRingrtc'; import type { ReadableDB, WritableDB } from '../Interface'; import { prepare } from '../Server'; @@ -376,3 +378,41 @@ export function _removeAllCallLinks(db: WritableDB): void { `; db.prepare(query).run(params); } + +export function defunctCallLinkExists(db: ReadableDB, roomId: string): boolean { + const [query, params] = sql` + SELECT 1 + FROM defunctCallLinks + WHERE roomId = ${roomId}; + `; + return db.prepare(query).pluck(true).get(params) === 1; +} + +export function insertDefunctCallLink( + db: WritableDB, + callLink: DefunctCallLinkType +): void { + const { roomId, rootKey } = callLink; + assertRoomIdMatchesRootKey(roomId, rootKey); + + const rootKeyData = toRootKeyBytes(callLink.rootKey); + const adminKeyData = callLink.adminKey + ? toAdminKeyBytes(callLink.adminKey) + : null; + + prepare( + db, + ` + INSERT INTO defunctCallLinks ( + roomId, + rootKey, + adminKey + ) VALUES ( + $roomId, + $rootKeyData, + $adminKeyData + ) + ON CONFLICT (roomId) DO NOTHING; + ` + ).run({ roomId, rootKeyData, adminKeyData }); +} diff --git a/ts/state/ducks/calling.ts b/ts/state/ducks/calling.ts index c0b72b3002e..4adc9bff9d2 100644 --- a/ts/state/ducks/calling.ts +++ b/ts/state/ducks/calling.ts @@ -1556,8 +1556,7 @@ function handleCallLinkUpdate( // This job will throttle requests to the calling server. drop( callLinkRefreshJobQueue.add({ - roomId: callLink.roomId, - deleteLocallyIfMissingOnCallingServer: false, + rootKey, source: 'handleCallLinkUpdate', }) ); diff --git a/ts/types/CallLink.ts b/ts/types/CallLink.ts index 584b87557a8..48116d74761 100644 --- a/ts/types/CallLink.ts +++ b/ts/types/CallLink.ts @@ -83,6 +83,13 @@ export type CallLinkConversationType = ReadonlyDeep< } >; +// Call links discovered missing after server refresh +export type DefunctCallLinkType = Readonly<{ + roomId: string; + rootKey: string; + adminKey: string | null; +}>; + // DB Record export type CallLinkRecord = Readonly<{ roomId: string;