Save storage for defunct and pending call links
This commit is contained in:
		
					parent
					
						
							
								a7be33b201
							
						
					
				
			
			
				commit
				
					
						c6902ec26a
					
				
			
		
					 10 changed files with 474 additions and 31 deletions
				
			
		|  | @ -4,21 +4,23 @@ | |||
| import * as z from 'zod'; | ||||
| import PQueue from 'p-queue'; | ||||
| import { CallLinkRootKey } from '@signalapp/ringrtc'; | ||||
| import * as globalLogger from '../logging/log'; | ||||
| import type { LoggerType } from '../types/Logging'; | ||||
| import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; | ||||
| import type { ParsedJob } from './types'; | ||||
| import type { ParsedJob, StoredJob } from './types'; | ||||
| import type { JOB_STATUS } from './JobQueue'; | ||||
| import { JobQueue } from './JobQueue'; | ||||
| import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; | ||||
| import { DAY, SECOND } from '../util/durations'; | ||||
| import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; | ||||
| import { DataReader, DataWriter } from '../sql/Client'; | ||||
| import type { CallLinkType } from '../types/CallLink'; | ||||
| import type { CallLinkType, PendingCallLinkType } from '../types/CallLink'; | ||||
| import { calling } from '../services/calling'; | ||||
| import { sleeper } from '../util/sleeper'; | ||||
| import { parseUnknown } from '../util/schemas'; | ||||
| import { getRoomIdFromRootKey } from '../util/callLinksRingrtc'; | ||||
| import { toCallHistoryFromUnusedCallLink } from '../util/callLinks'; | ||||
| import type { StorageServiceFieldsType } from '../sql/Interface'; | ||||
| 
 | ||||
| const MAX_RETRY_TIME = DAY; | ||||
| const MAX_PARALLEL_JOBS = 10; | ||||
|  | @ -45,6 +47,8 @@ export type CallLinkRefreshJobData = z.infer< | |||
| export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> { | ||||
|   private parallelQueue = new PQueue({ concurrency: MAX_PARALLEL_JOBS }); | ||||
| 
 | ||||
|   private readonly pendingCallLinks = new Map<string, PendingCallLinkType>(); | ||||
| 
 | ||||
|   protected override getQueues(): ReadonlySet<PQueue> { | ||||
|     return new Set([this.parallelQueue]); | ||||
|   } | ||||
|  | @ -59,6 +63,97 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> { | |||
|     return parseUnknown(callLinkRefreshJobDataSchema, data); | ||||
|   } | ||||
| 
 | ||||
|   // Called for every job; wrap it to save pending storage data
 | ||||
|   protected override async enqueueStoredJob( | ||||
|     storedJob: Readonly<StoredJob> | ||||
|   ): Promise<void> { | ||||
|     let parsedData: CallLinkRefreshJobData | undefined; | ||||
|     try { | ||||
|       parsedData = this.parseData(storedJob.data); | ||||
|     } catch { | ||||
|       // No need to err, it will fail below during super
 | ||||
|     } | ||||
|     const { | ||||
|       storageID, | ||||
|       storageVersion, | ||||
|       storageUnknownFields, | ||||
|       rootKey, | ||||
|       adminKey, | ||||
|     } = parsedData ?? {}; | ||||
|     if (storageID && storageVersion && rootKey) { | ||||
|       this.pendingCallLinks.set(rootKey, { | ||||
|         rootKey, | ||||
|         adminKey: adminKey ?? null, | ||||
|         storageID: storageID ?? undefined, | ||||
|         storageVersion: storageVersion ?? undefined, | ||||
|         storageUnknownFields, | ||||
|         storageNeedsSync: false, | ||||
|       }); | ||||
|     } | ||||
| 
 | ||||
|     await super.enqueueStoredJob(storedJob); | ||||
| 
 | ||||
|     if (rootKey) { | ||||
|       this.pendingCallLinks.delete(rootKey); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   // Return pending call links with storageIDs and versions. They're pending because
 | ||||
|   // depending on the refresh result, we will create either CallLinks or DefunctCallLinks,
 | ||||
|   // and we'll save storageID and version onto those records.
 | ||||
|   public getPendingAdminCallLinks(): ReadonlyArray<PendingCallLinkType> { | ||||
|     return Array.from(this.pendingCallLinks.values()).filter( | ||||
|       callLink => callLink.adminKey != null | ||||
|     ); | ||||
|   } | ||||
| 
 | ||||
|   public hasPendingCallLink(rootKey: string): boolean { | ||||
|     return this.pendingCallLinks.has(rootKey); | ||||
|   } | ||||
| 
 | ||||
|   // If a new version of storage is uploaded before we get a chance to refresh the
 | ||||
|   // call link, then we need to refresh pending storage fields so when the job
 | ||||
|   // completes it will save with the latest storage fields.
 | ||||
|   public updatePendingCallLinkStorageFields( | ||||
|     rootKey: string, | ||||
|     storageFields: StorageServiceFieldsType | ||||
|   ): void { | ||||
|     const existingStorageFields = this.pendingCallLinks.get(rootKey); | ||||
|     if (!existingStorageFields) { | ||||
|       globalLogger.warn( | ||||
|         'callLinkRefreshJobQueue.updatePendingCallLinkStorageFields: unknown rootKey' | ||||
|       ); | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     this.pendingCallLinks.set(rootKey, { | ||||
|       ...existingStorageFields, | ||||
|       ...storageFields, | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
|   protected getPendingCallLinkStorageFields( | ||||
|     storageID: string, | ||||
|     jobData: CallLinkRefreshJobData | ||||
|   ): StorageServiceFieldsType | undefined { | ||||
|     const storageFields = this.pendingCallLinks.get(storageID); | ||||
|     if (storageFields) { | ||||
|       return { | ||||
|         storageID: storageFields.storageID, | ||||
|         storageVersion: storageFields.storageVersion, | ||||
|         storageUnknownFields: storageFields.storageUnknownFields, | ||||
|         storageNeedsSync: storageFields.storageNeedsSync, | ||||
|       }; | ||||
|     } | ||||
| 
 | ||||
|     return { | ||||
|       storageID: jobData.storageID ?? undefined, | ||||
|       storageVersion: jobData.storageVersion ?? undefined, | ||||
|       storageUnknownFields: jobData.storageUnknownFields ?? undefined, | ||||
|       storageNeedsSync: false, | ||||
|     }; | ||||
|   } | ||||
| 
 | ||||
|   protected async run( | ||||
|     { | ||||
|       data, | ||||
|  | @ -102,16 +197,18 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> { | |||
|           window.reduxActions.calling.handleCallLinkUpdateLocal(callLink); | ||||
|         } else { | ||||
|           log.info(`${logId}: Creating new call link`); | ||||
|           const { adminKey, storageID, storageVersion, storageUnknownFields } = | ||||
|             data; | ||||
|           const { adminKey } = data; | ||||
|           // Refresh the latest storage fields, since they may have changed.
 | ||||
|           const storageFields = this.getPendingCallLinkStorageFields( | ||||
|             rootKey, | ||||
|             data | ||||
|           ); | ||||
|           const callLink: CallLinkType = { | ||||
|             ...freshCallLinkState, | ||||
|             roomId, | ||||
|             rootKey, | ||||
|             adminKey: adminKey ?? null, | ||||
|             storageID: storageID ?? undefined, | ||||
|             storageVersion: storageVersion ?? undefined, | ||||
|             storageUnknownFields, | ||||
|             ...storageFields, | ||||
|             storageNeedsSync: false, | ||||
|           }; | ||||
| 
 | ||||
|  | @ -130,10 +227,17 @@ export class CallLinkRefreshJobQueue extends JobQueue<CallLinkRefreshJobData> { | |||
|         log.info( | ||||
|           `${logId}: Call link not found on server but absent locally, saving DefunctCallLink` | ||||
|         ); | ||||
|         // Refresh the latest storage fields, since they may have changed.
 | ||||
|         const storageFields = this.getPendingCallLinkStorageFields( | ||||
|           rootKey, | ||||
|           data | ||||
|         ); | ||||
|         await DataWriter.insertDefunctCallLink({ | ||||
|           roomId, | ||||
|           rootKey, | ||||
|           adminKey: data.adminKey ?? null, | ||||
|           ...storageFields, | ||||
|           storageNeedsSync: false, | ||||
|         }); | ||||
|       } else { | ||||
|         log.info( | ||||
|  |  | |||
|  | @ -31,6 +31,7 @@ import { | |||
|   toStickerPackRecord, | ||||
|   toCallLinkRecord, | ||||
|   mergeCallLinkRecord, | ||||
|   toDefunctOrPendingCallLinkRecord, | ||||
| } from './storageRecordOps'; | ||||
| import type { MergeResultType } from './storageRecordOps'; | ||||
| import { MAX_READ_KEYS } from './storageConstants'; | ||||
|  | @ -71,8 +72,16 @@ import { MY_STORY_ID } from '../types/Stories'; | |||
| import { isNotNil } from '../util/isNotNil'; | ||||
| import { isSignalConversation } from '../util/isSignalConversation'; | ||||
| import { redactExtendedStorageID, redactStorageID } from '../util/privacy'; | ||||
| import type { CallLinkRecord } from '../types/CallLink'; | ||||
| import { callLinkFromRecord } from '../util/callLinksRingrtc'; | ||||
| import type { | ||||
|   CallLinkRecord, | ||||
|   DefunctCallLinkType, | ||||
|   PendingCallLinkType, | ||||
| } from '../types/CallLink'; | ||||
| import { | ||||
|   callLinkFromRecord, | ||||
|   getRoomIdFromRootKeyString, | ||||
| } from '../util/callLinksRingrtc'; | ||||
| import { callLinkRefreshJobQueue } from '../jobs/callLinkRefreshJobQueue'; | ||||
| 
 | ||||
| type IManifestRecordIdentifier = Proto.ManifestRecord.IIdentifier; | ||||
| 
 | ||||
|  | @ -333,6 +342,8 @@ async function generateManifest( | |||
| 
 | ||||
|   const { | ||||
|     callLinkDbRecords, | ||||
|     defunctCallLinks, | ||||
|     pendingCallLinks, | ||||
|     storyDistributionLists, | ||||
|     installedStickerPacks, | ||||
|     uninstalledStickerPacks, | ||||
|  | @ -475,6 +486,8 @@ async function generateManifest( | |||
|       `adding callLinks=${callLinkDbRecords.length}` | ||||
|   ); | ||||
| 
 | ||||
|   const callLinkRoomIds = new Set<string>(); | ||||
| 
 | ||||
|   for (const callLinkDbRecord of callLinkDbRecords) { | ||||
|     const { roomId } = callLinkDbRecord; | ||||
|     if (callLinkDbRecord.adminKey == null || callLinkDbRecord.rootKey == null) { | ||||
|  | @ -487,8 +500,10 @@ async function generateManifest( | |||
| 
 | ||||
|     const storageRecord = new Proto.StorageRecord(); | ||||
|     storageRecord.callLink = toCallLinkRecord(callLinkDbRecord); | ||||
| 
 | ||||
|     const callLink = callLinkFromRecord(callLinkDbRecord); | ||||
| 
 | ||||
|     callLinkRoomIds.add(callLink.roomId); | ||||
| 
 | ||||
|     const { isNewItem, storageID } = processStorageRecord({ | ||||
|       currentStorageID: callLink.storageID, | ||||
|       currentStorageVersion: callLink.storageVersion, | ||||
|  | @ -521,6 +536,76 @@ async function generateManifest( | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   log.info( | ||||
|     `storageService.upload(${version}): ` + | ||||
|       `adding defunctCallLinks=${defunctCallLinks.length}` | ||||
|   ); | ||||
| 
 | ||||
|   defunctCallLinks.forEach(defunctCallLink => { | ||||
|     const storageRecord = new Proto.StorageRecord(); | ||||
|     storageRecord.callLink = toDefunctOrPendingCallLinkRecord(defunctCallLink); | ||||
| 
 | ||||
|     callLinkRoomIds.add(defunctCallLink.roomId); | ||||
| 
 | ||||
|     const { isNewItem, storageID } = processStorageRecord({ | ||||
|       currentStorageID: defunctCallLink.storageID, | ||||
|       currentStorageVersion: defunctCallLink.storageVersion, | ||||
|       identifierType: ITEM_TYPE.CALL_LINK, | ||||
|       storageNeedsSync: defunctCallLink.storageNeedsSync, | ||||
|       storageRecord, | ||||
|     }); | ||||
| 
 | ||||
|     if (isNewItem) { | ||||
|       postUploadUpdateFunctions.push(() => { | ||||
|         drop( | ||||
|           DataWriter.updateDefunctCallLink({ | ||||
|             ...defunctCallLink, | ||||
|             storageID, | ||||
|             storageVersion: version, | ||||
|             storageNeedsSync: false, | ||||
|           }) | ||||
|         ); | ||||
|       }); | ||||
|     } | ||||
|   }); | ||||
| 
 | ||||
|   log.info( | ||||
|     `storageService.upload(${version}): ` + | ||||
|       `adding pendingCallLinks=${pendingCallLinks.length}` | ||||
|   ); | ||||
| 
 | ||||
|   pendingCallLinks.forEach(pendingCallLink => { | ||||
|     const storageRecord = new Proto.StorageRecord(); | ||||
|     storageRecord.callLink = toDefunctOrPendingCallLinkRecord(pendingCallLink); | ||||
| 
 | ||||
|     const roomId = getRoomIdFromRootKeyString(pendingCallLink.rootKey); | ||||
|     if (callLinkRoomIds.has(roomId)) { | ||||
|       return; | ||||
|     } | ||||
| 
 | ||||
|     const { isNewItem, storageID } = processStorageRecord({ | ||||
|       currentStorageID: pendingCallLink.storageID, | ||||
|       currentStorageVersion: pendingCallLink.storageVersion, | ||||
|       identifierType: ITEM_TYPE.CALL_LINK, | ||||
|       storageNeedsSync: pendingCallLink.storageNeedsSync, | ||||
|       storageRecord, | ||||
|     }); | ||||
| 
 | ||||
|     if (isNewItem) { | ||||
|       postUploadUpdateFunctions.push(() => { | ||||
|         callLinkRefreshJobQueue.updatePendingCallLinkStorageFields( | ||||
|           pendingCallLink.rootKey, | ||||
|           { | ||||
|             ...pendingCallLink, | ||||
|             storageID, | ||||
|             storageVersion: version, | ||||
|             storageNeedsSync: false, | ||||
|           } | ||||
|         ); | ||||
|       }); | ||||
|     } | ||||
|   }); | ||||
| 
 | ||||
|   const unknownRecordsArray: ReadonlyArray<UnknownRecord> = ( | ||||
|     window.storage.get('storage-service-unknown-records') || [] | ||||
|   ).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType)); | ||||
|  | @ -1145,6 +1230,8 @@ async function mergeRecord( | |||
| 
 | ||||
| type NonConversationRecordsResultType = Readonly<{ | ||||
|   callLinkDbRecords: ReadonlyArray<CallLinkRecord>; | ||||
|   defunctCallLinks: ReadonlyArray<DefunctCallLinkType>; | ||||
|   pendingCallLinks: ReadonlyArray<PendingCallLinkType>; | ||||
|   installedStickerPacks: ReadonlyArray<StickerPackType>; | ||||
|   uninstalledStickerPacks: ReadonlyArray<UninstalledStickerPackType>; | ||||
|   storyDistributionLists: ReadonlyArray<StoryDistributionWithMembersType>; | ||||
|  | @ -1154,11 +1241,15 @@ type NonConversationRecordsResultType = Readonly<{ | |||
| async function getNonConversationRecords(): Promise<NonConversationRecordsResultType> { | ||||
|   const [ | ||||
|     callLinkDbRecords, | ||||
|     defunctCallLinks, | ||||
|     pendingCallLinks, | ||||
|     storyDistributionLists, | ||||
|     uninstalledStickerPacks, | ||||
|     installedStickerPacks, | ||||
|   ] = await Promise.all([ | ||||
|     DataReader.getAllCallLinkRecordsWithAdminKey(), | ||||
|     DataReader.getAllDefunctCallLinksWithAdminKey(), | ||||
|     callLinkRefreshJobQueue.getPendingAdminCallLinks(), | ||||
|     DataReader.getAllStoryDistributionsWithMembers(), | ||||
|     DataReader.getUninstalledStickerPacks(), | ||||
|     DataReader.getInstalledStickerPacks(), | ||||
|  | @ -1166,6 +1257,8 @@ async function getNonConversationRecords(): Promise<NonConversationRecordsResult | |||
| 
 | ||||
|   return { | ||||
|     callLinkDbRecords, | ||||
|     defunctCallLinks, | ||||
|     pendingCallLinks, | ||||
|     storyDistributionLists, | ||||
|     uninstalledStickerPacks, | ||||
|     installedStickerPacks, | ||||
|  | @ -1202,6 +1295,8 @@ async function processManifest( | |||
|   { | ||||
|     const { | ||||
|       callLinkDbRecords, | ||||
|       defunctCallLinks, | ||||
|       pendingCallLinks, | ||||
|       storyDistributionLists, | ||||
|       installedStickerPacks, | ||||
|       uninstalledStickerPacks, | ||||
|  | @ -1221,6 +1316,12 @@ async function processManifest( | |||
|     ); | ||||
|     localRecordCount += callLinkDbRecords.length; | ||||
| 
 | ||||
|     defunctCallLinks.forEach(collectLocalKeysFromFields); | ||||
|     localRecordCount += defunctCallLinks.length; | ||||
| 
 | ||||
|     pendingCallLinks.forEach(collectLocalKeysFromFields); | ||||
|     localRecordCount += pendingCallLinks.length; | ||||
| 
 | ||||
|     storyDistributionLists.forEach(collectLocalKeysFromFields); | ||||
|     localRecordCount += storyDistributionLists.length; | ||||
| 
 | ||||
|  | @ -1342,6 +1443,8 @@ async function processManifest( | |||
|   { | ||||
|     const { | ||||
|       callLinkDbRecords, | ||||
|       defunctCallLinks, | ||||
|       pendingCallLinks, | ||||
|       storyDistributionLists, | ||||
|       installedStickerPacks, | ||||
|       uninstalledStickerPacks, | ||||
|  | @ -1454,6 +1557,47 @@ async function processManifest( | |||
|         }) | ||||
|       ); | ||||
|     }); | ||||
| 
 | ||||
|     defunctCallLinks.forEach(defunctCallLink => { | ||||
|       const { storageID, storageVersion } = defunctCallLink; | ||||
|       if (!storageID || remoteKeys.has(storageID)) { | ||||
|         return; | ||||
|       } | ||||
| 
 | ||||
|       const missingKey = redactStorageID(storageID, storageVersion); | ||||
|       log.info( | ||||
|         `storageService.process(${version}): localKey=${missingKey} was not ` + | ||||
|           'in remote manifest' | ||||
|       ); | ||||
|       drop( | ||||
|         DataWriter.updateDefunctCallLink({ | ||||
|           ...defunctCallLink, | ||||
|           storageID: undefined, | ||||
|           storageVersion: undefined, | ||||
|         }) | ||||
|       ); | ||||
|     }); | ||||
| 
 | ||||
|     pendingCallLinks.forEach(pendingCallLink => { | ||||
|       const { storageID, storageVersion } = pendingCallLink; | ||||
|       if (!storageID || remoteKeys.has(storageID)) { | ||||
|         return; | ||||
|       } | ||||
| 
 | ||||
|       const missingKey = redactStorageID(storageID, storageVersion); | ||||
|       log.info( | ||||
|         `storageService.process(${version}): localKey=${missingKey} was not ` + | ||||
|           'in remote manifest' | ||||
|       ); | ||||
|       callLinkRefreshJobQueue.updatePendingCallLinkStorageFields( | ||||
|         pendingCallLink.rootKey, | ||||
|         { | ||||
|           ...pendingCallLink, | ||||
|           storageID: undefined, | ||||
|           storageVersion: undefined, | ||||
|         } | ||||
|       ); | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
|   log.info( | ||||
|  |  | |||
|  | @ -4,7 +4,6 @@ | |||
| import { isEqual } from 'lodash'; | ||||
| import Long from 'long'; | ||||
| 
 | ||||
| import { CallLinkRootKey } from '@signalapp/ringrtc'; | ||||
| import { uuidToBytes, bytesToUuid } from '../util/uuidToBytes'; | ||||
| import { deriveMasterKeyFromGroupV1 } from '../Crypto'; | ||||
| import * as Bytes from '../Bytes'; | ||||
|  | @ -66,13 +65,18 @@ import { findAndDeleteOnboardingStoryIfExists } from '../util/findAndDeleteOnboa | |||
| import { downloadOnboardingStory } from '../util/downloadOnboardingStory'; | ||||
| import { drop } from '../util/drop'; | ||||
| import { redactExtendedStorageID } from '../util/privacy'; | ||||
| import type { CallLinkRecord } from '../types/CallLink'; | ||||
| import type { | ||||
|   CallLinkRecord, | ||||
|   DefunctCallLinkType, | ||||
|   PendingCallLinkType, | ||||
| } from '../types/CallLink'; | ||||
| import { | ||||
|   callLinkFromRecord, | ||||
|   fromRootKeyBytes, | ||||
|   getRoomIdFromRootKey, | ||||
|   getRoomIdFromRootKeyString, | ||||
|   toRootKeyBytes, | ||||
| } from '../util/callLinksRingrtc'; | ||||
| import { fromAdminKeyBytes } from '../util/callLinks'; | ||||
| import { fromAdminKeyBytes, toAdminKeyBytes } from '../util/callLinks'; | ||||
| import { isOlderThan } from '../util/timestamp'; | ||||
| import { getMessageQueueTime } from '../util/getMessageQueueTime'; | ||||
| import { callLinkRefreshJobQueue } from '../jobs/callLinkRefreshJobQueue'; | ||||
|  | @ -643,6 +647,29 @@ export function toCallLinkRecord( | |||
|   return callLinkRecord; | ||||
| } | ||||
| 
 | ||||
| export function toDefunctOrPendingCallLinkRecord( | ||||
|   callLink: DefunctCallLinkType | PendingCallLinkType | ||||
| ): Proto.CallLinkRecord { | ||||
|   const rootKey = toRootKeyBytes(callLink.rootKey); | ||||
|   const adminKey = callLink.adminKey | ||||
|     ? toAdminKeyBytes(callLink.adminKey) | ||||
|     : null; | ||||
| 
 | ||||
|   strictAssert(rootKey, 'toDefunctOrPendingCallLinkRecord: no rootKey'); | ||||
|   strictAssert(adminKey, 'toDefunctOrPendingCallLinkRecord: no adminPasskey'); | ||||
| 
 | ||||
|   const callLinkRecord = new Proto.CallLinkRecord(); | ||||
| 
 | ||||
|   callLinkRecord.rootKey = rootKey; | ||||
|   callLinkRecord.adminPasskey = adminKey; | ||||
| 
 | ||||
|   if (callLink.storageUnknownFields) { | ||||
|     callLinkRecord.$unknownFields = [callLink.storageUnknownFields]; | ||||
|   } | ||||
| 
 | ||||
|   return callLinkRecord; | ||||
| } | ||||
| 
 | ||||
| type MessageRequestCapableRecord = Proto.IContactRecord | Proto.IGroupV1Record; | ||||
| 
 | ||||
| function applyMessageRequestState( | ||||
|  | @ -1967,8 +1994,7 @@ export async function mergeCallLinkRecord( | |||
|     ? fromAdminKeyBytes(callLinkRecord.adminPasskey) | ||||
|     : null; | ||||
| 
 | ||||
|   const callLinkRootKey = CallLinkRootKey.parse(rootKeyString); | ||||
|   const roomId = getRoomIdFromRootKey(callLinkRootKey); | ||||
|   const roomId = getRoomIdFromRootKeyString(rootKeyString); | ||||
|   const logId = `mergeCallLinkRecord(${redactedStorageID}, ${roomId})`; | ||||
| 
 | ||||
|   const localCallLinkDbRecord = | ||||
|  | @ -2012,6 +2038,17 @@ export async function mergeCallLinkRecord( | |||
|       ); | ||||
|     } else if (await DataReader.defunctCallLinkExists(roomId)) { | ||||
|       details.push('skipping known defunct call link'); | ||||
|     } else if (callLinkRefreshJobQueue.hasPendingCallLink(storageID)) { | ||||
|       details.push('pending call link refresh, updating storage fields'); | ||||
|       callLinkRefreshJobQueue.updatePendingCallLinkStorageFields( | ||||
|         rootKeyString, | ||||
|         { | ||||
|           storageID, | ||||
|           storageVersion, | ||||
|           storageUnknownFields: callLinkDbRecord.storageUnknownFields, | ||||
|           storageNeedsSync: false, | ||||
|         } | ||||
|       ); | ||||
|     } else { | ||||
|       details.push('new call link, enqueueing call link refresh and create'); | ||||
| 
 | ||||
|  |  | |||
|  | @ -589,6 +589,7 @@ type ReadableInterface = { | |||
|   getCallLinkRecordByRoomId: (roomId: string) => CallLinkRecord | undefined; | ||||
|   getAllAdminCallLinks(): ReadonlyArray<CallLinkType>; | ||||
|   getAllCallLinkRecordsWithAdminKey(): ReadonlyArray<CallLinkRecord>; | ||||
|   getAllDefunctCallLinksWithAdminKey(): ReadonlyArray<DefunctCallLinkType>; | ||||
|   getAllMarkedDeletedCallLinkRoomIds(): ReadonlyArray<string>; | ||||
|   getMessagesBetween: ( | ||||
|     conversationId: string, | ||||
|  | @ -823,7 +824,8 @@ type WritableInterface = { | |||
|   deleteCallLinkAndHistory(roomId: string): void; | ||||
|   finalizeDeleteCallLink(roomId: string): void; | ||||
|   _removeAllCallLinks(): void; | ||||
|   insertDefunctCallLink(callLink: DefunctCallLinkType): void; | ||||
|   insertDefunctCallLink(defunctCallLink: DefunctCallLinkType): void; | ||||
|   updateDefunctCallLink(defunctCallLink: DefunctCallLinkType): void; | ||||
|   deleteCallLinkFromSync(roomId: string): void; | ||||
|   migrateConversationMessages: (obsoleteId: string, currentId: string) => void; | ||||
|   saveEditedMessage: ( | ||||
|  |  | |||
|  | @ -185,12 +185,14 @@ import { | |||
|   deleteCallLinkAndHistory, | ||||
|   getAllAdminCallLinks, | ||||
|   getAllCallLinkRecordsWithAdminKey, | ||||
|   getAllDefunctCallLinksWithAdminKey, | ||||
|   getAllMarkedDeletedCallLinkRoomIds, | ||||
|   finalizeDeleteCallLink, | ||||
|   beginDeleteCallLink, | ||||
|   deleteCallLinkFromSync, | ||||
|   _removeAllCallLinks, | ||||
|   insertDefunctCallLink, | ||||
|   updateDefunctCallLink, | ||||
| } from './server/callLinks'; | ||||
| import { | ||||
|   replaceAllEndorsementsForGroup, | ||||
|  | @ -321,6 +323,7 @@ export const DataReader: ServerReadableInterface = { | |||
|   getCallLinkRecordByRoomId, | ||||
|   getAllAdminCallLinks, | ||||
|   getAllCallLinkRecordsWithAdminKey, | ||||
|   getAllDefunctCallLinksWithAdminKey, | ||||
|   getAllMarkedDeletedCallLinkRoomIds, | ||||
|   getMessagesBetween, | ||||
|   getNearbyMessageFromDeletedSet, | ||||
|  | @ -464,6 +467,7 @@ export const DataWriter: ServerWritableInterface = { | |||
|   _removeAllCallLinks, | ||||
|   deleteCallLinkFromSync, | ||||
|   insertDefunctCallLink, | ||||
|   updateDefunctCallLink, | ||||
|   migrateConversationMessages, | ||||
|   saveEditedMessage, | ||||
|   saveEditedMessages, | ||||
|  |  | |||
							
								
								
									
										28
									
								
								ts/sql/migrations/1250-defunct-call-links-storage.ts
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								ts/sql/migrations/1250-defunct-call-links-storage.ts
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,28 @@ | |||
| // Copyright 2024 Signal Messenger, LLC
 | ||||
| // SPDX-License-Identifier: AGPL-3.0-only
 | ||||
| import type { Database } from '@signalapp/better-sqlite3'; | ||||
| import type { LoggerType } from '../../types/Logging'; | ||||
| 
 | ||||
| export const version = 1250; | ||||
| 
 | ||||
| export function updateToSchemaVersion1250( | ||||
|   currentVersion: number, | ||||
|   db: Database, | ||||
|   logger: LoggerType | ||||
| ): void { | ||||
|   if (currentVersion >= 1250) { | ||||
|     return; | ||||
|   } | ||||
| 
 | ||||
|   db.transaction(() => { | ||||
|     db.exec(` | ||||
|       ALTER TABLE defunctCallLinks ADD COLUMN storageID TEXT; | ||||
|       ALTER TABLE defunctCallLinks ADD COLUMN storageVersion INTEGER; | ||||
|       ALTER TABLE defunctCallLinks ADD COLUMN storageUnknownFields BLOB; | ||||
|       ALTER TABLE defunctCallLinks ADD COLUMN storageNeedsSync INTEGER NOT NULL DEFAULT 0; | ||||
|     `);
 | ||||
| 
 | ||||
|     db.pragma('user_version = 1250'); | ||||
|   })(); | ||||
|   logger.info('updateToSchemaVersion1250: success!'); | ||||
| } | ||||
|  | @ -100,10 +100,11 @@ import { updateToSchemaVersion1200 } from './1200-attachment-download-source-ind | |||
| import { updateToSchemaVersion1210 } from './1210-call-history-started-id'; | ||||
| import { updateToSchemaVersion1220 } from './1220-blob-sessions'; | ||||
| import { updateToSchemaVersion1230 } from './1230-call-links-admin-key-index'; | ||||
| import { updateToSchemaVersion1240 } from './1240-defunct-call-links-table'; | ||||
| import { | ||||
|   updateToSchemaVersion1240, | ||||
|   updateToSchemaVersion1250, | ||||
|   version as MAX_VERSION, | ||||
| } from './1240-defunct-call-links-table'; | ||||
| } from './1250-defunct-call-links-storage'; | ||||
| 
 | ||||
| function updateToSchemaVersion1( | ||||
|   currentVersion: number, | ||||
|  | @ -2073,6 +2074,7 @@ export const SCHEMA_VERSIONS = [ | |||
|   updateToSchemaVersion1220, | ||||
|   updateToSchemaVersion1230, | ||||
|   updateToSchemaVersion1240, | ||||
|   updateToSchemaVersion1250, | ||||
| ]; | ||||
| 
 | ||||
| export class DBVersionFromFutureError extends Error { | ||||
|  |  | |||
|  | @ -11,12 +11,14 @@ import type { | |||
| import { | ||||
|   callLinkRestrictionsSchema, | ||||
|   callLinkRecordSchema, | ||||
|   defunctCallLinkRecordSchema, | ||||
| } from '../../types/CallLink'; | ||||
| import { toAdminKeyBytes } from '../../util/callLinks'; | ||||
| import { | ||||
|   callLinkToRecord, | ||||
|   callLinkFromRecord, | ||||
|   toRootKeyBytes, | ||||
|   defunctCallLinkToRecord, | ||||
|   defunctCallLinkFromRecord, | ||||
| } from '../../util/callLinksRingrtc'; | ||||
| import type { ReadableDB, WritableDB } from '../Interface'; | ||||
| import { prepare } from '../Server'; | ||||
|  | @ -388,31 +390,73 @@ export function defunctCallLinkExists(db: ReadableDB, roomId: string): boolean { | |||
|   return db.prepare(query).pluck(true).get(params) === 1; | ||||
| } | ||||
| 
 | ||||
| export function getAllDefunctCallLinksWithAdminKey( | ||||
|   db: ReadableDB | ||||
| ): ReadonlyArray<DefunctCallLinkType> { | ||||
|   const [query] = sql` | ||||
|     SELECT * | ||||
|     FROM defunctCallLinks | ||||
|     WHERE adminKey IS NOT NULL; | ||||
|   `;
 | ||||
|   return db | ||||
|     .prepare(query) | ||||
|     .all() | ||||
|     .map((item: unknown) => | ||||
|       defunctCallLinkFromRecord(parseUnknown(defunctCallLinkRecordSchema, item)) | ||||
|     ); | ||||
| } | ||||
| 
 | ||||
| export function insertDefunctCallLink( | ||||
|   db: WritableDB, | ||||
|   callLink: DefunctCallLinkType | ||||
|   defunctCallLink: DefunctCallLinkType | ||||
| ): void { | ||||
|   const { roomId, rootKey } = callLink; | ||||
|   const { roomId, rootKey } = defunctCallLink; | ||||
|   assertRoomIdMatchesRootKey(roomId, rootKey); | ||||
| 
 | ||||
|   const rootKeyData = toRootKeyBytes(callLink.rootKey); | ||||
|   const adminKeyData = callLink.adminKey | ||||
|     ? toAdminKeyBytes(callLink.adminKey) | ||||
|     : null; | ||||
| 
 | ||||
|   const data = defunctCallLinkToRecord(defunctCallLink); | ||||
|   prepare( | ||||
|     db, | ||||
|     ` | ||||
|     INSERT INTO defunctCallLinks ( | ||||
|       roomId, | ||||
|       rootKey, | ||||
|       adminKey | ||||
|       adminKey, | ||||
|       storageID, | ||||
|       storageVersion, | ||||
|       storageUnknownFields, | ||||
|       storageNeedsSync | ||||
|     ) VALUES ( | ||||
|       $roomId, | ||||
|       $rootKeyData, | ||||
|       $adminKeyData | ||||
|       $rootKey, | ||||
|       $adminKey, | ||||
|       $storageID, | ||||
|       $storageVersion, | ||||
|       $storageUnknownFields, | ||||
|       $storageNeedsSync | ||||
|     ) | ||||
|     ON CONFLICT (roomId) DO NOTHING; | ||||
|     ` | ||||
|   ).run({ roomId, rootKeyData, adminKeyData }); | ||||
|   ).run(data); | ||||
| } | ||||
| 
 | ||||
| export function updateDefunctCallLink( | ||||
|   db: WritableDB, | ||||
|   defunctCallLink: DefunctCallLinkType | ||||
| ): void { | ||||
|   const { roomId, rootKey } = defunctCallLink; | ||||
|   assertRoomIdMatchesRootKey(roomId, rootKey); | ||||
| 
 | ||||
|   const data = defunctCallLinkToRecord(defunctCallLink); | ||||
|   // Do not write roomId or rootKey since they should never change
 | ||||
|   db.prepare( | ||||
|     ` | ||||
|     UPDATE callLinks | ||||
|     SET | ||||
|       storageID = $storageID, | ||||
|       storageVersion = $storageVersion, | ||||
|       storageUnknownFields = $storageUnknownFields, | ||||
|       storageNeedsSync = $storageNeedsSync | ||||
|     WHERE roomId = $roomId | ||||
|     ` | ||||
|   ).run(data); | ||||
| } | ||||
|  |  | |||
|  | @ -83,13 +83,41 @@ export type CallLinkConversationType = ReadonlyDeep< | |||
|   } | ||||
| >; | ||||
| 
 | ||||
| // Call link discovered from sync, waiting to refresh state from the calling server
 | ||||
| export type PendingCallLinkType = Readonly<{ | ||||
|   rootKey: string; | ||||
|   adminKey: string | null; | ||||
| }> & | ||||
|   StorageServiceFieldsType; | ||||
| 
 | ||||
| // Call links discovered missing after server refresh
 | ||||
| export type DefunctCallLinkType = Readonly<{ | ||||
|   roomId: string; | ||||
|   rootKey: string; | ||||
|   adminKey: string | null; | ||||
| }> & | ||||
|   StorageServiceFieldsType; | ||||
| 
 | ||||
| export type DefunctCallLinkRecord = Readonly<{ | ||||
|   roomId: string; | ||||
|   rootKey: Uint8Array; | ||||
|   adminKey: Uint8Array | null; | ||||
|   storageID: string | null; | ||||
|   storageVersion: number | null; | ||||
|   storageUnknownFields: Uint8Array | null; | ||||
|   storageNeedsSync: 1 | 0; | ||||
| }>; | ||||
| 
 | ||||
| export const defunctCallLinkRecordSchema = z.object({ | ||||
|   roomId: z.string(), | ||||
|   rootKey: z.instanceof(Uint8Array), | ||||
|   adminKey: z.instanceof(Uint8Array).nullable(), | ||||
|   storageID: z.string().nullable(), | ||||
|   storageVersion: z.number().int().nullable(), | ||||
|   storageUnknownFields: z.instanceof(Uint8Array).nullable(), | ||||
|   storageNeedsSync: z.union([z.literal(1), z.literal(0)]), | ||||
| }) satisfies z.ZodType<DefunctCallLinkRecord>; | ||||
| 
 | ||||
| // DB Record
 | ||||
| export type CallLinkRecord = Readonly<{ | ||||
|   roomId: string; | ||||
|  |  | |||
|  | @ -12,11 +12,14 @@ import type { | |||
|   CallLinkRecord, | ||||
|   CallLinkRestrictions, | ||||
|   CallLinkType, | ||||
|   DefunctCallLinkRecord, | ||||
|   DefunctCallLinkType, | ||||
| } from '../types/CallLink'; | ||||
| import { | ||||
|   type CallLinkStateType, | ||||
|   CallLinkNameMaxByteLength, | ||||
|   callLinkRecordSchema, | ||||
|   defunctCallLinkRecordSchema, | ||||
|   toCallLinkRestrictions, | ||||
| } from '../types/CallLink'; | ||||
| import { unicodeSlice } from './unicodeSlice'; | ||||
|  | @ -64,6 +67,11 @@ export function getRoomIdFromRootKey(rootKey: CallLinkRootKey): string { | |||
|   return rootKey.deriveRoomId().toString('hex'); | ||||
| } | ||||
| 
 | ||||
| export function getRoomIdFromRootKeyString(rootKeyString: string): string { | ||||
|   const callLinkRootKey = CallLinkRootKey.parse(rootKeyString); | ||||
|   return getRoomIdFromRootKey(callLinkRootKey); | ||||
| } | ||||
| 
 | ||||
| export function getCallLinkRootKeyFromUrlKey(key: string): Uint8Array { | ||||
|   // Returns `Buffer` which inherits from `Uint8Array`
 | ||||
|   return CallLinkRootKey.parse(key).bytes; | ||||
|  | @ -167,3 +175,45 @@ export function callLinkToRecord(callLink: CallLinkType): CallLinkRecord { | |||
|     storageNeedsSync: callLink.storageNeedsSync ? 1 : 0, | ||||
|   }); | ||||
| } | ||||
| 
 | ||||
| export function defunctCallLinkFromRecord( | ||||
|   record: DefunctCallLinkRecord | ||||
| ): DefunctCallLinkType { | ||||
|   if (record.rootKey == null) { | ||||
|     throw new Error('CallLink.defunctCallLinkFromRecord: rootKey is null'); | ||||
|   } | ||||
| 
 | ||||
|   const rootKey = fromRootKeyBytes(record.rootKey); | ||||
|   const adminKey = record.adminKey ? fromAdminKeyBytes(record.adminKey) : null; | ||||
|   return { | ||||
|     roomId: record.roomId, | ||||
|     rootKey, | ||||
|     adminKey, | ||||
|     storageID: record.storageID || undefined, | ||||
|     storageVersion: record.storageVersion || undefined, | ||||
|     storageUnknownFields: record.storageUnknownFields || undefined, | ||||
|     storageNeedsSync: record.storageNeedsSync === 1, | ||||
|   }; | ||||
| } | ||||
| 
 | ||||
| export function defunctCallLinkToRecord( | ||||
|   defunctCallLink: DefunctCallLinkType | ||||
| ): DefunctCallLinkRecord { | ||||
|   if (defunctCallLink.rootKey == null) { | ||||
|     throw new Error('CallLink.defunctCallLinkToRecord: rootKey is null'); | ||||
|   } | ||||
| 
 | ||||
|   const rootKey = toRootKeyBytes(defunctCallLink.rootKey); | ||||
|   const adminKey = defunctCallLink.adminKey | ||||
|     ? toAdminKeyBytes(defunctCallLink.adminKey) | ||||
|     : null; | ||||
|   return parseStrict(defunctCallLinkRecordSchema, { | ||||
|     roomId: defunctCallLink.roomId, | ||||
|     rootKey, | ||||
|     adminKey, | ||||
|     storageID: defunctCallLink.storageID || null, | ||||
|     storageVersion: defunctCallLink.storageVersion || null, | ||||
|     storageUnknownFields: defunctCallLink.storageUnknownFields || null, | ||||
|     storageNeedsSync: defunctCallLink.storageNeedsSync ? 1 : 0, | ||||
|   }); | ||||
| } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 ayumi-signal
				ayumi-signal