// Copyright 2020-2022 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { debounce, isNumber, chunk } from 'lodash'; import pMap from 'p-map'; import Long from 'long'; import dataInterface from '../sql/Client'; import * as Bytes from '../Bytes'; import { getRandomBytes, deriveStorageItemKey, deriveStorageManifestKey, encryptProfile, decryptProfile, deriveMasterKeyFromGroupV1, } from '../Crypto'; import { mergeAccountRecord, mergeContactRecord, mergeGroupV1Record, mergeGroupV2Record, toAccountRecord, toContactRecord, toGroupV1Record, toGroupV2Record, } from './storageRecordOps'; import type { MergeResultType } from './storageRecordOps'; import { MAX_READ_KEYS } from './storageConstants'; import type { ConversationModel } from '../models/conversations'; import { strictAssert } from '../util/assert'; import { dropNull } from '../util/dropNull'; import * as durations from '../util/durations'; import { BackOff } from '../util/BackOff'; import { storageJobQueue } from '../util/JobQueue'; import { sleep } from '../util/sleep'; import { isMoreRecentThan } from '../util/timestamp'; import { ourProfileKeyService } from './ourProfileKey'; import { ConversationTypes, typeofConversation, } from '../util/whatTypeOfConversation'; import { SignalService as Proto } from '../protobuf'; import * as log from '../logging/log'; import { singleProtoJobQueue } from '../jobs/singleProtoJobQueue'; import * as Errors from '../types/errors'; import type { ExtendedStorageID, RemoteRecord, UnknownRecord, } from '../types/StorageService.d'; type IManifestRecordIdentifier = Proto.ManifestRecord.IIdentifier; const { eraseStorageServiceStateFromConversations, updateConversation, updateConversations, } = dataInterface; const uploadBucket: Array = []; const validRecordTypes = new Set([ 0, // UNKNOWN 1, // CONTACT 2, // GROUPV1 3, // GROUPV2 4, // ACCOUNT ]); const backOff = new BackOff([ durations.SECOND, 5 * durations.SECOND, 30 * durations.SECOND, 2 * durations.MINUTE, 5 * durations.MINUTE, ]); const conflictBackOff = new BackOff([ durations.SECOND, 5 * durations.SECOND, 30 * durations.SECOND, ]); function redactStorageID( storageID: string, version?: number, conversation?: ConversationModel ): string { const convoId = conversation ? ` ${conversation?.idForLogging()}` : ''; return `${version ?? '?'}:${storageID.substring(0, 3)}${convoId}`; } function redactExtendedStorageID({ storageID, storageVersion, }: ExtendedStorageID): string { return redactStorageID(storageID, storageVersion); } async function encryptRecord( storageID: string | undefined, storageRecord: Proto.IStorageRecord ): Promise { const storageItem = new Proto.StorageItem(); const storageKeyBuffer = storageID ? Bytes.fromBase64(String(storageID)) : generateStorageID(); const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = Bytes.fromBase64(storageKeyBase64); const storageItemKey = deriveStorageItemKey( storageKey, Bytes.toBase64(storageKeyBuffer) ); const encryptedRecord = encryptProfile( Proto.StorageRecord.encode(storageRecord).finish(), storageItemKey ); storageItem.key = storageKeyBuffer; storageItem.value = encryptedRecord; return storageItem; } function generateStorageID(): Uint8Array { return getRandomBytes(16); } type GeneratedManifestType = { postUploadUpdateFunctions: Array<() => unknown>; deleteKeys: Array; newItems: Set; storageManifest: Proto.IStorageManifest; }; async function generateManifest( version: number, previousManifest?: Proto.IManifestRecord, isNewManifest = false ): Promise { log.info( `storageService.upload(${version}): generating manifest ` + `new=${isNewManifest}` ); await window.ConversationController.checkForConflicts(); const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type; const postUploadUpdateFunctions: Array<() => unknown> = []; const insertKeys: Array = []; const deleteKeys: Array = []; const manifestRecordKeys: Set = new Set(); const newItems: Set = new Set(); const conversations = window.getConversations(); for (let i = 0; i < conversations.length; i += 1) { const conversation = conversations.models[i]; const identifier = new Proto.ManifestRecord.Identifier(); let storageRecord; const conversationType = typeofConversation(conversation.attributes); if (conversationType === ConversationTypes.Me) { storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.account = await toAccountRecord(conversation); identifier.type = ITEM_TYPE.ACCOUNT; } else if (conversationType === ConversationTypes.Direct) { // Contacts must have UUID if (!conversation.get('uuid')) { continue; } const validationError = conversation.validate(); if (validationError) { const droppedID = conversation.get('storageID'); const droppedVersion = conversation.get('storageVersion'); if (!droppedID) { continue; } const recordID = redactStorageID( droppedID, droppedVersion, conversation ); log.warn( `storageService.generateManifest(${version}): ` + `skipping contact=${recordID} ` + `due to local validation error=${validationError}` ); conversation.unset('storageID'); deleteKeys.push(Bytes.fromBase64(droppedID)); continue; } storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.contact = await toContactRecord(conversation); identifier.type = ITEM_TYPE.CONTACT; } else if (conversationType === ConversationTypes.GroupV2) { storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.groupV2 = await toGroupV2Record(conversation); identifier.type = ITEM_TYPE.GROUPV2; } else if (conversationType === ConversationTypes.GroupV1) { storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.groupV1 = await toGroupV1Record(conversation); identifier.type = ITEM_TYPE.GROUPV1; } else { log.warn( `storageService.upload(${version}): ` + `unknown conversation=${conversation.idForLogging()}` ); } if (!storageRecord) { continue; } const currentStorageID = conversation.get('storageID'); const currentStorageVersion = conversation.get('storageVersion'); const currentRedactedID = currentStorageID ? redactStorageID(currentStorageID, currentStorageVersion) : undefined; const isNewItem = isNewManifest || Boolean(conversation.get('needsStorageServiceSync')) || !currentStorageID; const storageID = isNewItem ? Bytes.toBase64(generateStorageID()) : currentStorageID; let storageItem; try { // eslint-disable-next-line no-await-in-loop storageItem = await encryptRecord(storageID, storageRecord); } catch (err) { log.error( `storageService.upload(${version}): encrypt record failed:`, Errors.toLogFormat(err) ); throw err; } identifier.raw = storageItem.key; // When a client needs to update a given record it should create it // under a new key and delete the existing key. if (isNewItem) { newItems.add(storageItem); insertKeys.push(storageID); const newRedactedID = redactStorageID(storageID, version, conversation); if (currentStorageID) { log.info( `storageService.upload(${version}): ` + `updating from=${currentRedactedID} ` + `to=${newRedactedID}` ); deleteKeys.push(Bytes.fromBase64(currentStorageID)); } else { log.info( `storageService.upload(${version}): adding key=${newRedactedID}` ); } postUploadUpdateFunctions.push(() => { conversation.set({ needsStorageServiceSync: false, storageVersion: version, storageID, }); updateConversation(conversation.attributes); }); } manifestRecordKeys.add(identifier); } const unknownRecordsArray: ReadonlyArray = ( window.storage.get('storage-service-unknown-records') || [] ).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType)); const redactedUnknowns = unknownRecordsArray.map(redactExtendedStorageID); log.info( `storageService.upload(${version}): adding unknown ` + `records=${JSON.stringify(redactedUnknowns)} ` + `count=${redactedUnknowns.length}` ); // When updating the manifest, ensure all "unknown" keys are added to the // new manifest, so we don't inadvertently delete something we don't understand unknownRecordsArray.forEach((record: UnknownRecord) => { const identifier = new Proto.ManifestRecord.Identifier(); identifier.type = record.itemType; identifier.raw = Bytes.fromBase64(record.storageID); manifestRecordKeys.add(identifier); }); const recordsWithErrors: ReadonlyArray = window.storage.get( 'storage-service-error-records', new Array() ); const redactedErrors = recordsWithErrors.map(redactExtendedStorageID); log.info( `storageService.upload(${version}): adding error ` + `records=${JSON.stringify(redactedErrors)} count=${redactedErrors.length}` ); // These records failed to merge in the previous fetchManifest, but we still // need to include them so that the manifest is complete recordsWithErrors.forEach((record: UnknownRecord) => { const identifier = new Proto.ManifestRecord.Identifier(); identifier.type = record.itemType; identifier.raw = Bytes.fromBase64(record.storageID); manifestRecordKeys.add(identifier); }); // Delete keys that we wanted to drop during the processing of the manifest. const storedPendingDeletes = window.storage.get( 'storage-service-pending-deletes', [] ); const redactedPendingDeletes = storedPendingDeletes.map( redactExtendedStorageID ); log.info( `storageService.upload(${version}): ` + `deleting extra keys=${JSON.stringify(redactedPendingDeletes)} ` + `count=${redactedPendingDeletes.length}` ); for (const { storageID } of storedPendingDeletes) { deleteKeys.push(Bytes.fromBase64(storageID)); } // Validate before writing const rawDuplicates = new Set(); const typeRawDuplicates = new Set(); let hasAccountType = false; manifestRecordKeys.forEach(identifier => { // Ensure there are no duplicate StorageIdentifiers in your manifest // This can be broken down into two parts: // There are no duplicate type+raw pairs // There are no duplicate raw bytes strictAssert(identifier.raw, 'manifest record key without raw identifier'); const storageID = Bytes.toBase64(identifier.raw); const typeAndRaw = `${identifier.type}+${storageID}`; if ( rawDuplicates.has(identifier.raw) || typeRawDuplicates.has(typeAndRaw) ) { log.warn( `storageService.upload(${version}): removing from duplicate item ` + 'from the manifest', redactStorageID(storageID), identifier.type ); manifestRecordKeys.delete(identifier); } rawDuplicates.add(identifier.raw); typeRawDuplicates.add(typeAndRaw); // Ensure all deletes are not present in the manifest const hasDeleteKey = deleteKeys.find( key => Bytes.toBase64(key) === storageID ); if (hasDeleteKey) { log.warn( `storageService.upload(${version}): removing key which has been deleted`, redactStorageID(storageID), identifier.type ); manifestRecordKeys.delete(identifier); } // Ensure that there is *exactly* one Account type in the manifest if (identifier.type === ITEM_TYPE.ACCOUNT) { if (hasAccountType) { log.warn( `storageService.upload(${version}): removing duplicate account`, redactStorageID(storageID) ); manifestRecordKeys.delete(identifier); } hasAccountType = true; } }); rawDuplicates.clear(); typeRawDuplicates.clear(); const storageKeyDuplicates = new Set(); newItems.forEach(storageItem => { // Ensure there are no duplicate StorageIdentifiers in your list of inserts strictAssert(storageItem.key, 'New storage item without key'); const storageID = Bytes.toBase64(storageItem.key); if (storageKeyDuplicates.has(storageID)) { log.warn( `storageService.upload(${version}): ` + 'removing duplicate identifier from inserts', redactStorageID(storageID) ); newItems.delete(storageItem); } storageKeyDuplicates.add(storageID); }); storageKeyDuplicates.clear(); // If we have a copy of what the current remote manifest is then we run these // additional validations comparing our pending manifest to the remote // manifest: if (previousManifest) { const pendingInserts: Set = new Set(); const pendingDeletes: Set = new Set(); const remoteKeys: Set = new Set(); (previousManifest.keys ?? []).forEach( (identifier: IManifestRecordIdentifier) => { strictAssert(identifier.raw, 'Identifier without raw field'); const storageID = Bytes.toBase64(identifier.raw); remoteKeys.add(storageID); } ); const localKeys: Set = new Set(); manifestRecordKeys.forEach((identifier: IManifestRecordIdentifier) => { strictAssert(identifier.raw, 'Identifier without raw field'); const storageID = Bytes.toBase64(identifier.raw); localKeys.add(storageID); if (!remoteKeys.has(storageID)) { pendingInserts.add(storageID); } }); remoteKeys.forEach(storageID => { if (!localKeys.has(storageID)) { pendingDeletes.add(storageID); } }); if (deleteKeys.length !== pendingDeletes.size) { const localDeletes = deleteKeys.map(key => redactStorageID(Bytes.toBase64(key)) ); const remoteDeletes: Array = []; pendingDeletes.forEach(id => remoteDeletes.push(redactStorageID(id))); log.error( `storageService.upload(${version}): delete key sizes do not match`, 'local', localDeletes.join(','), 'remote', remoteDeletes.join(',') ); throw new Error('invalid write delete keys length do not match'); } if (newItems.size !== pendingInserts.size) { throw new Error('invalid write insert items length do not match'); } deleteKeys.forEach(key => { const storageID = Bytes.toBase64(key); if (!pendingDeletes.has(storageID)) { throw new Error( 'invalid write delete key missing from pending deletes' ); } }); insertKeys.forEach(storageID => { if (!pendingInserts.has(storageID)) { throw new Error( 'invalid write insert key missing from pending inserts' ); } }); } const manifestRecord = new Proto.ManifestRecord(); manifestRecord.version = Long.fromNumber(version); manifestRecord.keys = Array.from(manifestRecordKeys); const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = Bytes.fromBase64(storageKeyBase64); const storageManifestKey = deriveStorageManifestKey( storageKey, Long.fromNumber(version) ); const encryptedManifest = encryptProfile( Proto.ManifestRecord.encode(manifestRecord).finish(), storageManifestKey ); const storageManifest = new Proto.StorageManifest(); storageManifest.version = manifestRecord.version; storageManifest.value = encryptedManifest; return { postUploadUpdateFunctions, deleteKeys, newItems, storageManifest, }; } async function uploadManifest( version: number, { postUploadUpdateFunctions, deleteKeys, newItems, storageManifest, }: GeneratedManifestType ): Promise { if (!window.textsecure.messaging) { throw new Error('storageService.uploadManifest: We are offline!'); } if (newItems.size === 0 && deleteKeys.length === 0) { log.info(`storageService.upload(${version}): nothing to upload`); return; } const credentials = window.storage.get('storageCredentials'); try { log.info( `storageService.upload(${version}): inserting=${newItems.size} ` + `deleting=${deleteKeys.length}` ); const writeOperation = new Proto.WriteOperation(); writeOperation.manifest = storageManifest; writeOperation.insertItem = Array.from(newItems); writeOperation.deleteKey = deleteKeys; await window.textsecure.messaging.modifyStorageRecords( Proto.WriteOperation.encode(writeOperation).finish(), { credentials, } ); log.info( `storageService.upload(${version}): upload complete, updating ` + `items=${postUploadUpdateFunctions.length}` ); // update conversations with the new storageID postUploadUpdateFunctions.forEach(fn => fn()); } catch (err) { log.error( `storageService.upload(${version}): failed!`, Errors.toLogFormat(err) ); if (err.code === 409) { if (conflictBackOff.isFull()) { log.error( `storageService.upload(${version}): exceeded maximum consecutive ` + 'conflicts' ); return; } log.info( `storageService.upload(${version}): conflict found with ` + `version=${version}, running sync job ` + `times=${conflictBackOff.getIndex()}` ); throw err; } throw err; } log.info(`storageService.upload(${version}): setting new manifestVersion`); window.storage.put('manifestVersion', version); conflictBackOff.reset(); backOff.reset(); try { await singleProtoJobQueue.add( window.textsecure.messaging.getFetchManifestSyncMessage() ); } catch (error) { log.error( `storageService.upload(${version}): Failed to queue sync message`, Errors.toLogFormat(error) ); } } async function stopStorageServiceSync(reason: Error) { log.warn('storageService.stopStorageServiceSync', Errors.toLogFormat(reason)); await window.storage.remove('storageKey'); if (backOff.isFull()) { log.warn( 'storageService.stopStorageServiceSync: too many consecutive stops' ); return; } await sleep(backOff.getAndIncrement()); log.info('storageService.stopStorageServiceSync: requesting new keys'); setTimeout(async () => { if (!window.textsecure.messaging) { throw new Error('storageService.stopStorageServiceSync: We are offline!'); } if (window.ConversationController.areWePrimaryDevice()) { log.warn( 'stopStorageServiceSync: We are primary device; not sending key sync request' ); return; } try { await singleProtoJobQueue.add( window.textsecure.messaging.getRequestKeySyncMessage() ); } catch (error) { log.error( 'storageService.stopStorageServiceSync: Failed to queue sync message', Errors.toLogFormat(error) ); } }); } async function createNewManifest() { log.info('storageService.createNewManifest: creating new manifest'); const version = window.storage.get('manifestVersion', 0); const { postUploadUpdateFunctions, newItems, storageManifest } = await generateManifest(version, undefined, true); await uploadManifest(version, { postUploadUpdateFunctions, // we have created a new manifest, there should be no keys to delete deleteKeys: [], newItems, storageManifest, }); } async function decryptManifest( encryptedManifest: Proto.IStorageManifest ): Promise { const { version, value } = encryptedManifest; const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = Bytes.fromBase64(storageKeyBase64); const storageManifestKey = deriveStorageManifestKey( storageKey, dropNull(version) ); strictAssert(value, 'StorageManifest has no value field'); const decryptedManifest = decryptProfile(value, storageManifestKey); return Proto.ManifestRecord.decode(decryptedManifest); } async function fetchManifest( manifestVersion: number ): Promise { log.info('storageService.sync: fetch start'); if (!window.textsecure.messaging) { throw new Error('storageService.sync: we are offline!'); } try { const credentials = await window.textsecure.messaging.getStorageCredentials(); window.storage.put('storageCredentials', credentials); const manifestBinary = await window.textsecure.messaging.getStorageManifest( { credentials, greaterThanVersion: manifestVersion, } ); const encryptedManifest = Proto.StorageManifest.decode(manifestBinary); try { return decryptManifest(encryptedManifest); } catch (err) { await stopStorageServiceSync(err); return; } } catch (err) { if (err.code === 204) { log.info('storageService.sync: no newer manifest, ok'); return; } log.error('storageService.sync: failed!', Errors.toLogFormat(err)); if (err.code === 404) { await createNewManifest(); return; } throw err; } } type MergeableItemType = { itemType: number; storageID: string; storageRecord: Proto.IStorageRecord; }; type MergedRecordType = UnknownRecord & { hasConflict: boolean; shouldDrop: boolean; hasError: boolean; isUnsupported: boolean; updatedConversations: ReadonlyArray; needProfileFetch: ReadonlyArray; }; async function mergeRecord( storageVersion: number, itemToMerge: MergeableItemType ): Promise { const { itemType, storageID, storageRecord } = itemToMerge; const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type; let mergeResult: MergeResultType = { hasConflict: false, details: [] }; let isUnsupported = false; let hasError = false; let updatedConversations = new Array(); const needProfileFetch = new Array(); try { if (itemType === ITEM_TYPE.UNKNOWN) { log.warn('storageService.mergeRecord: Unknown item type', storageID); } else if (itemType === ITEM_TYPE.CONTACT && storageRecord.contact) { mergeResult = await mergeContactRecord( storageID, storageVersion, storageRecord.contact ); } else if (itemType === ITEM_TYPE.GROUPV1 && storageRecord.groupV1) { mergeResult = await mergeGroupV1Record( storageID, storageVersion, storageRecord.groupV1 ); } else if (itemType === ITEM_TYPE.GROUPV2 && storageRecord.groupV2) { mergeResult = await mergeGroupV2Record( storageID, storageVersion, storageRecord.groupV2 ); } else if (itemType === ITEM_TYPE.ACCOUNT && storageRecord.account) { mergeResult = await mergeAccountRecord( storageID, storageVersion, storageRecord.account ); } else { isUnsupported = true; log.warn( `storageService.merge(${redactStorageID( storageID, storageVersion )}): unknown item type=${itemType}` ); } const redactedID = redactStorageID( storageID, storageVersion, mergeResult.conversation ); const oldID = mergeResult.oldStorageID ? redactStorageID(mergeResult.oldStorageID, mergeResult.oldStorageVersion) : '?'; updatedConversations = [ ...updatedConversations, ...(mergeResult.updatedConversations ?? []), ]; if (mergeResult.needsProfileFetch) { strictAssert(mergeResult.conversation, 'needsProfileFetch, but no convo'); needProfileFetch.push(mergeResult.conversation); } log.info( `storageService.merge(${redactedID}): merged item type=${itemType} ` + `oldID=${oldID} ` + `conflict=${mergeResult.hasConflict} ` + `shouldDrop=${Boolean(mergeResult.shouldDrop)} ` + `details=${JSON.stringify(mergeResult.details)}` ); } catch (err) { hasError = true; const redactedID = redactStorageID(storageID, storageVersion); log.error( `storageService.merge(${redactedID}): error with ` + `item type=${itemType} ` + `details=${Errors.toLogFormat(err)}` ); } return { hasConflict: mergeResult.hasConflict, shouldDrop: Boolean(mergeResult.shouldDrop), hasError, isUnsupported, itemType, storageID, updatedConversations, needProfileFetch, }; } async function processManifest( manifest: Proto.IManifestRecord, version: number ): Promise { if (!window.textsecure.messaging) { throw new Error('storageService.processManifest: We are offline!'); } const remoteKeysTypeMap = new Map(); (manifest.keys || []).forEach(({ raw, type }: IManifestRecordIdentifier) => { strictAssert(raw, 'Identifier without raw field'); remoteKeysTypeMap.set(Bytes.toBase64(raw), type); }); const remoteKeys = new Set(remoteKeysTypeMap.keys()); const localVersions = new Map(); const conversations = window.getConversations(); conversations.forEach((conversation: ConversationModel) => { const storageID = conversation.get('storageID'); if (storageID) { localVersions.set(storageID, conversation.get('storageVersion')); } }); const unknownRecordsArray: ReadonlyArray = window.storage.get('storage-service-unknown-records') || []; const stillUnknown = unknownRecordsArray.filter((record: UnknownRecord) => { // Do not include any unknown records that we already support if (!validRecordTypes.has(record.itemType)) { localVersions.set(record.storageID, record.storageVersion); return false; } return true; }); const remoteOnlySet = new Set(); for (const key of remoteKeys) { if (!localVersions.has(key)) { remoteOnlySet.add(key); } } const localOnlySet = new Set(); for (const key of localVersions.keys()) { if (!remoteKeys.has(key)) { localOnlySet.add(key); } } const redactedRemoteOnly = Array.from(remoteOnlySet).map(id => redactStorageID(id, version) ); const redactedLocalOnly = Array.from(localOnlySet).map(id => redactStorageID(id, localVersions.get(id)) ); log.info( `storageService.process(${version}): localRecords=${conversations.length} ` + `localKeys=${localVersions.size} unknownKeys=${stillUnknown.length} ` + `remoteKeys=${remoteKeys.size}` ); log.info( `storageService.process(${version}): ` + `remoteOnlyCount=${remoteOnlySet.size} ` + `remoteOnlyKeys=${JSON.stringify(redactedRemoteOnly)}` ); log.info( `storageService.process(${version}): ` + `localOnlyCount=${localOnlySet.size} ` + `localOnlyKeys=${JSON.stringify(redactedLocalOnly)}` ); const remoteOnlyRecords = new Map(); remoteOnlySet.forEach(storageID => { remoteOnlyRecords.set(storageID, { storageID, itemType: remoteKeysTypeMap.get(storageID), }); }); let conflictCount = 0; if (remoteOnlyRecords.size) { conflictCount = await processRemoteRecords(version, remoteOnlyRecords); } // Post-merge, if our local records contain any storage IDs that were not // present in the remote manifest then we'll need to clear it, generate a // new storageID for that record, and upload. // This might happen if a device pushes a manifest which doesn't contain // the keys that we have in our local database. window.getConversations().forEach((conversation: ConversationModel) => { const storageID = conversation.get('storageID'); if (storageID && !remoteKeys.has(storageID)) { const storageVersion = conversation.get('storageVersion'); const missingKey = redactStorageID( storageID, storageVersion, conversation ); log.info( `storageService.process(${version}): localKey=${missingKey} was not ` + 'in remote manifest' ); conversation.unset('storageID'); conversation.unset('storageVersion'); updateConversation(conversation.attributes); } }); log.info( `storageService.process(${version}): conflictCount=${conflictCount}` ); return conflictCount; } async function processRemoteRecords( storageVersion: number, remoteOnlyRecords: Map ): Promise { const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = Bytes.fromBase64(storageKeyBase64); log.info( `storageService.process(${storageVersion}): fetching remote keys ` + `count=${remoteOnlyRecords.size}` ); const credentials = window.storage.get('storageCredentials'); const batches = chunk(Array.from(remoteOnlyRecords.keys()), MAX_READ_KEYS); const storageItems = ( await pMap( batches, async ( batch: ReadonlyArray ): Promise> => { const readOperation = new Proto.ReadOperation(); readOperation.readKey = batch.map(Bytes.fromBase64); const storageItemsBuffer = await window.textsecure.messaging.getStorageRecords( Proto.ReadOperation.encode(readOperation).finish(), { credentials, } ); return Proto.StorageItems.decode(storageItemsBuffer).items ?? []; }, { concurrency: 5 } ) ).flat(); const missingKeys = new Set(remoteOnlyRecords.keys()); const decryptedStorageItems = await pMap( storageItems, async ( storageRecordWrapper: Proto.IStorageItem ): Promise => { const { key, value: storageItemCiphertext } = storageRecordWrapper; if (!key || !storageItemCiphertext) { const error = new Error( `storageService.process(${storageVersion}): ` + 'missing key and/or Ciphertext' ); await stopStorageServiceSync(error); throw error; } const base64ItemID = Bytes.toBase64(key); missingKeys.delete(base64ItemID); const storageItemKey = deriveStorageItemKey(storageKey, base64ItemID); let storageItemPlaintext; try { storageItemPlaintext = decryptProfile( storageItemCiphertext, storageItemKey ); } catch (err) { log.error( `storageService.process(${storageVersion}): ` + 'Error decrypting storage item', Errors.toLogFormat(err) ); await stopStorageServiceSync(err); throw err; } const storageRecord = Proto.StorageRecord.decode(storageItemPlaintext); const remoteRecord = remoteOnlyRecords.get(base64ItemID); if (!remoteRecord) { throw new Error( "Got a remote record that wasn't requested with " + `storageID: ${base64ItemID}` ); } return { itemType: remoteRecord.itemType, storageID: base64ItemID, storageRecord, }; }, { concurrency: 5 } ); const redactedMissingKeys = Array.from(missingKeys).map(id => redactStorageID(id, storageVersion) ); log.info( `storageService.process(${storageVersion}): missing remote ` + `keys=${JSON.stringify(redactedMissingKeys)} ` + `count=${missingKeys.size}` ); const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type; const droppedKeys = new Set(); // Drop all GV1 records for which we have GV2 record in the same manifest const masterKeys = new Map(); for (const { itemType, storageID, storageRecord } of decryptedStorageItems) { if (itemType === ITEM_TYPE.GROUPV2 && storageRecord.groupV2?.masterKey) { masterKeys.set( Bytes.toBase64(storageRecord.groupV2.masterKey), storageID ); } } let accountItem: MergeableItemType | undefined; const prunedStorageItems = decryptedStorageItems.filter(item => { const { itemType, storageID, storageRecord } = item; if (itemType === ITEM_TYPE.ACCOUNT) { if (accountItem !== undefined) { log.warn( `storageService.process(${storageVersion}): duplicate account ` + `record=${redactStorageID(storageID, storageVersion)} ` + `previous=${redactStorageID(accountItem.storageID, storageVersion)}` ); droppedKeys.add(accountItem.storageID); } accountItem = item; return false; } if (itemType !== ITEM_TYPE.GROUPV1 || !storageRecord.groupV1?.id) { return true; } const masterKey = deriveMasterKeyFromGroupV1(storageRecord.groupV1.id); const gv2StorageID = masterKeys.get(Bytes.toBase64(masterKey)); if (!gv2StorageID) { return true; } log.warn( `storageService.process(${storageVersion}): dropping ` + `GV1 record=${redactStorageID(storageID, storageVersion)} ` + `GV2 record=${redactStorageID(gv2StorageID, storageVersion)} ` + 'is in the same manifest' ); droppedKeys.add(storageID); return false; }); try { log.info( `storageService.process(${storageVersion}): ` + `attempting to merge records=${prunedStorageItems.length}` ); if (accountItem !== undefined) { log.info( `storageService.process(${storageVersion}): account ` + `record=${redactStorageID(accountItem.storageID, storageVersion)}` ); } const mergedRecords = [ ...(await pMap( prunedStorageItems, (item: MergeableItemType) => mergeRecord(storageVersion, item), { concurrency: 32 } )), // Merge Account records last since it contains the pinned conversations // and we need all other records merged first before we can find the pinned // records in our db ...(accountItem ? [await mergeRecord(storageVersion, accountItem)] : []), ]; log.info( `storageService.process(${storageVersion}): ` + `processed records=${mergedRecords.length}` ); const updatedConversations = mergedRecords .map(record => record.updatedConversations) .flat() .map(convo => convo.attributes); await updateConversations(updatedConversations); log.info( `storageService.process(${storageVersion}): ` + `updated conversations=${updatedConversations.length}` ); const needProfileFetch = mergedRecords .map(record => record.needProfileFetch) .flat(); log.info( `storageService.process(${storageVersion}): ` + `kicking off profile fetches=${needProfileFetch.length}` ); // Intentionally not awaiting pMap(needProfileFetch, convo => convo.getProfiles(), { concurrency: 3 }); // Collect full map of previously and currently unknown records const unknownRecords: Map = new Map(); const previousUnknownRecords: ReadonlyArray = window.storage.get( 'storage-service-unknown-records', new Array() ); previousUnknownRecords.forEach((record: UnknownRecord) => { unknownRecords.set(record.storageID, record); }); const newRecordsWithErrors: Array = []; let conflictCount = 0; mergedRecords.forEach((mergedRecord: MergedRecordType) => { if (mergedRecord.isUnsupported) { unknownRecords.set(mergedRecord.storageID, { itemType: mergedRecord.itemType, storageID: mergedRecord.storageID, storageVersion, }); } else if (mergedRecord.hasError) { newRecordsWithErrors.push({ itemType: mergedRecord.itemType, storageID: mergedRecord.storageID, storageVersion, }); } if (mergedRecord.hasConflict) { conflictCount += 1; } if (mergedRecord.shouldDrop) { droppedKeys.add(mergedRecord.storageID); } }); const redactedDroppedKeys = Array.from(droppedKeys.values()).map(key => redactStorageID(key, storageVersion) ); log.info( `storageService.process(${storageVersion}): ` + `dropped keys=${JSON.stringify(redactedDroppedKeys)} ` + `count=${redactedDroppedKeys.length}` ); // Filter out all the unknown records we're already supporting const newUnknownRecords = Array.from(unknownRecords.values()).filter( (record: UnknownRecord) => !validRecordTypes.has(record.itemType) ); const redactedNewUnknowns = newUnknownRecords.map(redactExtendedStorageID); log.info( `storageService.process(${storageVersion}): ` + `unknown records=${JSON.stringify(redactedNewUnknowns)} ` + `count=${redactedNewUnknowns.length}` ); await window.storage.put( 'storage-service-unknown-records', newUnknownRecords ); const redactedErrorRecords = newRecordsWithErrors.map( redactExtendedStorageID ); log.info( `storageService.process(${storageVersion}): ` + `error records=${JSON.stringify(redactedErrorRecords)} ` + `count=${redactedErrorRecords.length}` ); // Refresh the list of records that had errors with every push, that way // this list doesn't grow unbounded and we keep the list of storage keys // fresh. await window.storage.put( 'storage-service-error-records', newRecordsWithErrors ); // Store/overwrite keys pending deletion, but use them only when we have to // upload a new manifest to avoid oscillation. const pendingDeletes = [...missingKeys, ...droppedKeys].map(storageID => ({ storageID, storageVersion, })); const redactedPendingDeletes = pendingDeletes.map(redactExtendedStorageID); log.info( `storageService.process(${storageVersion}): ` + `pending deletes=${JSON.stringify(redactedPendingDeletes)} ` + `count=${redactedPendingDeletes.length}` ); await window.storage.put('storage-service-pending-deletes', pendingDeletes); if (conflictCount === 0) { conflictBackOff.reset(); } return conflictCount; } catch (err) { log.error( `storageService.process(${storageVersion}): ` + 'failed to process remote records', Errors.toLogFormat(err) ); } // conflictCount return 0; } async function sync( ignoreConflicts = false ): Promise { if (!window.storage.get('storageKey')) { throw new Error('storageService.sync: Cannot start; no storage key!'); } log.info( `storageService.sync: starting... ignoreConflicts=${ignoreConflicts}` ); let manifest: Proto.ManifestRecord | undefined; try { // If we've previously interacted with strage service, update 'fetchComplete' record const previousFetchComplete = window.storage.get('storageFetchComplete'); const manifestFromStorage = window.storage.get('manifestVersion'); if (!previousFetchComplete && isNumber(manifestFromStorage)) { window.storage.put('storageFetchComplete', true); } const localManifestVersion = manifestFromStorage || 0; log.info( 'storageService.sync: fetching latest ' + `after version=${localManifestVersion}` ); manifest = await fetchManifest(localManifestVersion); // Guarding against no manifests being returned, everything should be ok if (!manifest) { log.info( `storageService.sync: no updates, version=${localManifestVersion}` ); return undefined; } strictAssert( manifest.version !== undefined && manifest.version !== null, 'Manifest without version' ); const version = manifest.version?.toNumber() ?? 0; log.info( `storageService.sync: updating to remoteVersion=${version} from ` + `version=${localManifestVersion}` ); const conflictCount = await processManifest(manifest, version); log.info( `storageService.sync: updated to version=${version} ` + `conflicts=${conflictCount}` ); await window.storage.put('manifestVersion', version); const hasConflicts = conflictCount !== 0; if (hasConflicts && !ignoreConflicts) { await upload(true); } // We now know that we've successfully completed a storage service fetch await window.storage.put('storageFetchComplete', true); } catch (err) { log.error( 'storageService.sync: error processing manifest', Errors.toLogFormat(err) ); } log.info('storageService.sync: complete'); return manifest; } async function upload(fromSync = false): Promise { if (!window.textsecure.messaging) { throw new Error('storageService.upload: We are offline!'); } // Rate limit uploads coming from syncing if (fromSync) { uploadBucket.push(Date.now()); if (uploadBucket.length >= 3) { const [firstMostRecentWrite] = uploadBucket; if (isMoreRecentThan(5 * durations.MINUTE, firstMostRecentWrite)) { throw new Error( 'storageService.uploadManifest: too many writes too soon.' ); } uploadBucket.shift(); } } if (!window.storage.get('storageKey')) { // requesting new keys runs the sync job which will detect the conflict // and re-run the upload job once we're merged and up-to-date. log.info('storageService.upload: no storageKey, requesting new keys'); backOff.reset(); if (window.ConversationController.areWePrimaryDevice()) { log.warn( 'storageService.upload: We are primary device; not sending key sync request' ); return; } try { await singleProtoJobQueue.add( window.textsecure.messaging.getRequestKeySyncMessage() ); } catch (error) { log.error( 'storageService.upload: Failed to queue sync message', Errors.toLogFormat(error) ); } return; } let previousManifest: Proto.ManifestRecord | undefined; if (!fromSync) { // Syncing before we upload so that we repair any unknown records and // records with errors as well as ensure that we have the latest up to date // manifest. // We are going to upload after this sync so we can ignore any conflicts // that arise during the sync. const ignoreConflicts = true; previousManifest = await sync(ignoreConflicts); } const localManifestVersion = window.storage.get('manifestVersion', 0); const version = Number(localManifestVersion) + 1; log.info( `storageService.upload(${version}): will update to manifest version` ); try { const generatedManifest = await generateManifest( version, previousManifest, false ); await uploadManifest(version, generatedManifest); // Clear pending delete keys after successful upload await window.storage.put('storage-service-pending-deletes', []); } catch (err) { if (err.code === 409) { await sleep(conflictBackOff.getAndIncrement()); log.info('storageService.upload: pushing sync on the queue'); // The sync job will check for conflicts and as part of that conflict // check if an item needs sync and doesn't match with the remote record // it'll kick off another upload. setTimeout(runStorageServiceSyncJob); return; } log.error( `storageService.upload(${version}): error`, Errors.toLogFormat(err) ); } } let storageServiceEnabled = false; export function enableStorageService(): void { storageServiceEnabled = true; } // Note: this function is meant to be called before ConversationController is hydrated. // It goes directly to the database, so in-memory conversations will be out of date. export async function eraseAllStorageServiceState({ keepUnknownFields = false, }: { keepUnknownFields?: boolean } = {}): Promise { log.info('storageService.eraseAllStorageServiceState: starting...'); await Promise.all([ window.storage.remove('manifestVersion'), keepUnknownFields ? Promise.resolve() : window.storage.remove('storage-service-unknown-records'), window.storage.remove('storageCredentials'), ]); await eraseStorageServiceStateFromConversations(); log.info('storageService.eraseAllStorageServiceState: complete'); } export const storageServiceUploadJob = debounce(() => { if (!storageServiceEnabled) { log.info('storageService.storageServiceUploadJob: called before enabled'); return; } storageJobQueue(async () => { await upload(); }, `upload v${window.storage.get('manifestVersion')}`); }, 500); export const runStorageServiceSyncJob = debounce(() => { if (!storageServiceEnabled) { log.info('storageService.runStorageServiceSyncJob: called before enabled'); return; } ourProfileKeyService.blockGetWithPromise( storageJobQueue(async () => { await sync(); // Notify listeners about sync completion window.Whisper.events.trigger('storageService:syncComplete'); }, `sync v${window.storage.get('manifestVersion')}`) ); }, 500);