diff --git a/ts/services/storage.ts b/ts/services/storage.ts index ee9e47de46..5f089a7bce 100644 --- a/ts/services/storage.ts +++ b/ts/services/storage.ts @@ -70,11 +70,13 @@ function backOff(count: number) { return sleep(ms); } -type UnknownRecord = { +type RemoteRecord = { itemType: number; storageID: string; }; +type UnknownRecord = RemoteRecord; + async function encryptRecord( storageID: string | undefined, storageRecord: StorageRecordClass @@ -129,6 +131,8 @@ async function generateManifest( await window.ConversationController.checkForConflicts(); + await repairUnknownAndErroredRecords(); + const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type; const conversationsToUpdate = []; @@ -214,7 +218,7 @@ async function generateManifest( } } - const unknownRecordsArray = ( + const unknownRecordsArray: ReadonlyArray = ( window.storage.get('storage-service-unknown-records') || [] ).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType)); @@ -233,7 +237,7 @@ async function generateManifest( manifestRecordKeys.add(identifier); }); - const recordsWithErrors = + const recordsWithErrors: ReadonlyArray = window.storage.get('storage-service-error-records') || []; window.log.info( @@ -347,6 +351,54 @@ async function generateManifest( }; } +async function repairUnknownAndErroredRecords() { + const unknownRecordsArray: ReadonlyArray = + window.storage.get('storage-service-unknown-records') || []; + + const recordsWithErrors: ReadonlyArray = + window.storage.get('storage-service-error-records') || []; + + const remoteRecords = unknownRecordsArray.concat(recordsWithErrors); + + // No repair necessary + if (remoteRecords.length === 0) { + return; + } + + // Process unknown and records with records from the past sync to see + // if they can be merged + const remoteRecordsMap: Map = new Map(); + remoteRecords.forEach(record => { + remoteRecordsMap.set(record.storageID, record); + }); + + window.log.info( + 'storageService.repairUnknownAndErroredRecords: found ' + + `${unknownRecordsArray.length} unknown records and ` + + `${recordsWithErrors.length} errored records, attempting repair` + ); + const conflictCount = await processRemoteRecords(remoteRecordsMap); + if (conflictCount !== 0) { + window.log.info( + 'storageService.repairUnknownAndErroredRecords: fixed ' + + `${conflictCount} conflicts` + ); + } + + const newUnknownCount = ( + window.storage.get('storage-service-unknown-records') || [] + ).length; + const newErroredCount = ( + window.storage.get('storage-service-error-records') || [] + ).length; + + window.log.info( + 'storageService.repairUnknownAndErroredRecords: ' + + `${newUnknownCount} unknown records and ` + + `${newErroredCount} errored records after repair` + ); +} + async function uploadManifest( version: number, { @@ -612,9 +664,6 @@ async function mergeRecord( async function processManifest( manifest: ManifestRecordClass ): Promise { - const storageKeyBase64 = window.storage.get('storageKey'); - const storageKey = base64ToArrayBuffer(storageKeyBase64); - if (!window.textsecure.messaging) { throw new Error('storageService.processManifest: We are offline!'); } @@ -632,7 +681,7 @@ async function processManifest( .map((conversation: ConversationModel) => conversation.get('storageID')) .filter(Boolean); - const unknownRecordsArray = + const unknownRecordsArray: ReadonlyArray = window.storage.get('storage-service-unknown-records') || []; unknownRecordsArray.forEach((record: UnknownRecord) => { @@ -642,7 +691,7 @@ async function processManifest( } }); - const recordsWithErrors = + const recordsWithErrors: ReadonlyArray = window.storage.get('storage-service-error-records') || []; // Do not fetch any records that we failed to merge in the previous fetch @@ -673,15 +722,44 @@ async function processManifest( } }); - const remoteOnly = Array.from(remoteOnlySet); + const remoteOnlyRecords = new Map(); + remoteOnlySet.forEach(storageID => { + remoteOnlyRecords.set(storageID, { + storageID, + itemType: remoteKeysTypeMap.get(storageID), + }); + }); + + // if the remote only keys are larger or equal to our local keys then it + // was likely a forced push of storage service. We keep track of these + // merges so that we can detect possible infinite loops + const isForcePushed = remoteOnlyRecords.size >= localKeys.length; + + const conflictCount = await processRemoteRecords( + remoteOnlyRecords, + isForcePushed + ); + const hasConflicts = conflictCount !== 0; + + return hasConflicts; +} + +async function processRemoteRecords( + remoteOnlyRecords: Map, + isForcePushed = false +): Promise { + const storageKeyBase64 = window.storage.get('storageKey'); + const storageKey = base64ToArrayBuffer(storageKeyBase64); window.log.info( - 'storageService.processManifest: remote keys', - remoteOnly.length + 'storageService.processRemoteRecords: remote keys', + remoteOnlyRecords.size ); const readOperation = new window.textsecure.protobuf.ReadOperation(); - readOperation.readKey = remoteOnly.map(base64ToArrayBuffer); + readOperation.readKey = Array.from(remoteOnlyRecords.keys()).map( + base64ToArrayBuffer + ); const credentials = window.storage.get('storageCredentials'); const storageItemsBuffer = await window.textsecure.messaging.getStorageRecords( @@ -697,9 +775,9 @@ async function processManifest( if (!storageItems.items) { window.log.info( - 'storageService.processManifest: No storage items retrieved' + 'storageService.processRemoteRecords: No storage items retrieved' ); - return false; + return 0; } const decryptedStorageItems = await pMap( @@ -711,11 +789,11 @@ async function processManifest( if (!key || !storageItemCiphertext) { window.log.error( - 'storageService.processManifest: No key or Ciphertext available' + 'storageService.processRemoteRecords: No key or Ciphertext available' ); await stopStorageServiceSync(); throw new Error( - 'storageService.processManifest: Missing key and/or Ciphertext' + 'storageService.processRemoteRecords: Missing key and/or Ciphertext' ); } @@ -734,7 +812,7 @@ async function processManifest( ); } catch (err) { window.log.error( - 'storageService.processManifest: Error decrypting storage item' + 'storageService.processRemoteRecords: Error decrypting storage item' ); await stopStorageServiceSync(); throw err; @@ -744,8 +822,16 @@ async function processManifest( storageItemPlaintext ); + const remoteRecord = remoteOnlyRecords.get(base64ItemID); + if (!remoteRecord) { + throw new Error( + "Got a remote record that wasn't requested with " + + `storageID: ${base64ItemID}` + ); + } + return { - itemType: remoteKeysTypeMap.get(base64ItemID), + itemType: remoteRecord.itemType, storageID: base64ItemID, storageRecord, }; @@ -763,23 +849,27 @@ async function processManifest( try { window.log.info( - `storageService.processManifest: Attempting to merge ${sortedStorageItems.length} records` + `storageService.processRemoteRecords: Attempting to merge ${sortedStorageItems.length} records` ); const mergedRecords = await pMap(sortedStorageItems, mergeRecord, { concurrency: 5, }); window.log.info( - `storageService.processManifest: Processed ${mergedRecords.length} records` + `storageService.processRemoteRecords: Processed ${mergedRecords.length} records` ); + // Collect full map of previously and currently unknown records const unknownRecords: Map = new Map(); + + const unknownRecordsArray: ReadonlyArray = + window.storage.get('storage-service-unknown-records') || []; unknownRecordsArray.forEach((record: UnknownRecord) => { unknownRecords.set(record.storageID, record); }); const newRecordsWithErrors: Array = []; - let hasConflict = false; + let conflictCount = 0; mergedRecords.forEach((mergedRecord: MergedRecordType) => { if (mergedRecord.isUnsupported) { @@ -794,7 +884,9 @@ async function processManifest( }); } - hasConflict = hasConflict || mergedRecord.hasConflict; + if (mergedRecord.hasConflict) { + conflictCount += 1; + } }); // Filter out all the unknown records we're already supporting @@ -803,13 +895,13 @@ async function processManifest( ); window.log.info( - 'storageService.processManifest: Unknown records found:', + 'storageService.processRemoteRecords: Unknown records found:', newUnknownRecords.length ); window.storage.put('storage-service-unknown-records', newUnknownRecords); window.log.info( - 'storageService.processManifest: Records with errors:', + 'storageService.processRemoteRecords: Records with errors:', newRecordsWithErrors.length ); // Refresh the list of records that had errors with every push, that way @@ -819,12 +911,9 @@ async function processManifest( const now = Date.now(); - // if the remote only keys are larger or equal to our local keys then it - // was likely a forced push of storage service. We keep track of these - // merges so that we can detect possible infinite loops - if (remoteOnly.length >= localKeys.length) { + if (isForcePushed) { window.log.info( - 'storageService.processManifest: remote manifest was likely force pushed', + 'storageService.processRemoteRecords: remote manifest was likely force pushed', now ); forcedPushBucket.push(now); @@ -834,9 +923,9 @@ async function processManifest( // key so that they can be included in the next update window.getConversations().forEach((conversation: ConversationModel) => { const storageID = conversation.get('storageID'); - if (storageID && !remoteOnlySet.has(storageID)) { + if (storageID && !remoteOnlyRecords.has(storageID)) { window.log.info( - 'storageService.processManifest: clearing storageID', + 'storageService.processRemoteRecords: clearing storageID', conversation.debugID() ); conversation.unset('storageID'); @@ -848,7 +937,7 @@ async function processManifest( if (now - firstMostRecentForcedPush < 5 * MINUTE) { window.log.info( - 'storageService.processManifest: thrasing? Backing off' + 'storageService.processRemoteRecords: thrasing? Backing off' ); const error = new Error(); error.code = 'E_BACKOFF'; @@ -856,7 +945,7 @@ async function processManifest( } window.log.info( - 'storageService.processManifest: thrash timestamp of first -> now', + 'storageService.processRemoteRecords: thrash timestamp of first -> now', firstMostRecentForcedPush, now ); @@ -864,23 +953,24 @@ async function processManifest( } } - if (hasConflict) { + if (conflictCount !== 0) { window.log.info( - 'storageService.processManifest: Conflict found, uploading changes' + 'storageService.processRemoteRecords: ' + + `${conflictCount} conflicts found, uploading changes` ); - return true; + return conflictCount; } consecutiveConflicts = 0; } catch (err) { window.log.error( - 'storageService.processManifest: failed!', + 'storageService.processRemoteRecords: failed!', err && err.stack ? err.stack : String(err) ); } - return false; + return 0; } async function sync(): Promise { diff --git a/ts/services/storageRecordOps.ts b/ts/services/storageRecordOps.ts index 44bf3895ca..dfd43eb923 100644 --- a/ts/services/storageRecordOps.ts +++ b/ts/services/storageRecordOps.ts @@ -6,6 +6,7 @@ import { isNumber } from 'lodash'; import { arrayBufferToBase64, base64ToArrayBuffer, + deriveMasterKeyFromGroupV1, fromEncodedBinaryToArrayBuffer, } from '../Crypto'; import dataInterface from '../sql/Client'; @@ -385,19 +386,38 @@ export async function mergeGroupV1Record( const groupId = groupV1Record.id.toBinary(); - // We do a get here because we don't get enough information from just this source to - // be able to do the right thing with this group. So we'll update the local group - // record if we have one; otherwise we'll just drop this update. - const conversation = window.ConversationController.get(groupId); + // Attempt to fetch an existing group pertaining to the `groupId` or create + // a new group and populate it with the attributes from the record. + let conversation = window.ConversationController.get(groupId); if (!conversation) { - throw new Error(`No conversation for group(${groupId})`); - } - window.log.info('storageService.mergeGroupV1Record:', conversation.debugID()); + const masterKeyBuffer = await deriveMasterKeyFromGroupV1(groupId); + const fields = deriveGroupFields(masterKeyBuffer); + const derivedGroupV2Id = arrayBufferToBase64(fields.id); - if (!conversation.isGroupV1()) { - throw new Error(`Record has group type mismatch ${conversation.debugID()}`); + window.log.info( + 'storageService.mergeGroupV1Record: failed to find group by v1 id ' + + `attempting lookup by v2 groupv2(${derivedGroupV2Id})` + ); + conversation = window.ConversationController.get(derivedGroupV2Id); + } + if (conversation) { + window.log.info( + 'storageService.mergeGroupV1Record: found existing group', + conversation.debugID() + ); + } else { + conversation = await window.ConversationController.getOrCreateAndWait( + groupId, + 'group' + ); + window.log.info( + 'storageService.mergeGroupV1Record: created a new group locally', + conversation.debugID() + ); } + // If we receive a group V1 record, remote data should take precendence + // even if the group is actually V2 on our end. conversation.set({ isArchived: Boolean(groupV1Record.archived), markedUnread: Boolean(groupV1Record.markedUnread), @@ -406,13 +426,29 @@ export async function mergeGroupV1Record( applyMessageRequestState(groupV1Record, conversation); - addUnknownFields(groupV1Record, conversation); + let hasPendingChanges: boolean; - const hasPendingChanges = doesRecordHavePendingChanges( - await toGroupV1Record(conversation), - groupV1Record, - conversation - ); + if (conversation.isGroupV1()) { + addUnknownFields(groupV1Record, conversation); + + hasPendingChanges = doesRecordHavePendingChanges( + await toGroupV1Record(conversation), + groupV1Record, + conversation + ); + } else { + // We cannot preserve unknown fields if local group is V2 and the remote is + // still V1, because the storageItem that we'll put into manifest will have + // a different record type. + window.log.info( + 'storageService.mergeGroupV1Record marking v1 ' + + ' group for an update to v2', + conversation.debugID() + ); + + // We want to upgrade group in the storage after merging it. + hasPendingChanges = true; + } updateConversation(conversation.attributes);