// Copyright 2020-2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import { debounce, isNumber } from 'lodash'; import pMap from 'p-map'; import Crypto from '../textsecure/Crypto'; import dataInterface from '../sql/Client'; import * as Bytes from '../Bytes'; import { arrayBufferToBase64, base64ToArrayBuffer, deriveStorageItemKey, deriveStorageManifestKey, typedArrayToArrayBuffer, } from '../Crypto'; import { mergeAccountRecord, mergeContactRecord, mergeGroupV1Record, mergeGroupV2Record, toAccountRecord, toContactRecord, toGroupV1Record, toGroupV2Record, } from './storageRecordOps'; import { ConversationModel } from '../models/conversations'; import { strictAssert } from '../util/assert'; import * as durations from '../util/durations'; import { BackOff } from '../util/BackOff'; import { handleMessageSend } from '../util/handleMessageSend'; import { storageJobQueue } from '../util/JobQueue'; import { sleep } from '../util/sleep'; import { isMoreRecentThan } from '../util/timestamp'; import { normalizeNumber } from '../util/normalizeNumber'; import { isStorageWriteFeatureEnabled } from '../storage/isFeatureEnabled'; import { ourProfileKeyService } from './ourProfileKey'; import { ConversationTypes, typeofConversation, } from '../util/whatTypeOfConversation'; import { SignalService as Proto } from '../protobuf'; type IManifestRecordIdentifier = Proto.ManifestRecord.IIdentifier; // TODO: remove once we move away from ArrayBuffers const FIXMEU8 = Uint8Array; const { eraseStorageServiceStateFromConversations, updateConversation, } = dataInterface; const uploadBucket: Array = []; const validRecordTypes = new Set([ 0, // UNKNOWN 1, // CONTACT 2, // GROUPV1 3, // GROUPV2 4, // ACCOUNT ]); const backOff = new BackOff([ durations.SECOND, 5 * durations.SECOND, 30 * durations.SECOND, 2 * durations.MINUTE, 5 * durations.MINUTE, ]); const conflictBackOff = new BackOff([ durations.SECOND, 5 * durations.SECOND, 30 * durations.SECOND, ]); function redactStorageID(storageID: string): string { return storageID.substring(0, 3); } type RemoteRecord = { itemType: number; storageID: string; }; type UnknownRecord = RemoteRecord; async function encryptRecord( storageID: string | undefined, storageRecord: Proto.IStorageRecord ): Promise { const storageItem = new Proto.StorageItem(); const storageKeyBuffer = storageID ? base64ToArrayBuffer(String(storageID)) : generateStorageID(); const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = base64ToArrayBuffer(storageKeyBase64); const storageItemKey = await deriveStorageItemKey( storageKey, arrayBufferToBase64(storageKeyBuffer) ); const encryptedRecord = await Crypto.encryptProfile( typedArrayToArrayBuffer(Proto.StorageRecord.encode(storageRecord).finish()), storageItemKey ); storageItem.key = new FIXMEU8(storageKeyBuffer); storageItem.value = new FIXMEU8(encryptedRecord); return storageItem; } function generateStorageID(): ArrayBuffer { return Crypto.getRandomBytes(16); } type GeneratedManifestType = { conversationsToUpdate: Array<{ conversation: ConversationModel; storageID: string | undefined; }>; deleteKeys: Array; newItems: Set; storageManifest: Proto.IStorageManifest; }; async function generateManifest( version: number, previousManifest?: Proto.IManifestRecord, isNewManifest = false ): Promise { window.log.info( 'storageService.generateManifest: generating manifest', version, isNewManifest ); await window.ConversationController.checkForConflicts(); const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type; const conversationsToUpdate = []; const insertKeys: Array = []; const deleteKeys: Array = []; const manifestRecordKeys: Set = new Set(); const newItems: Set = new Set(); const conversations = window.getConversations(); for (let i = 0; i < conversations.length; i += 1) { const conversation = conversations.models[i]; const identifier = new Proto.ManifestRecord.Identifier(); let storageRecord; const conversationType = typeofConversation(conversation.attributes); if (conversationType === ConversationTypes.Me) { storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.account = await toAccountRecord(conversation); identifier.type = ITEM_TYPE.ACCOUNT; } else if (conversationType === ConversationTypes.Direct) { storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.contact = await toContactRecord(conversation); identifier.type = ITEM_TYPE.CONTACT; } else if (conversationType === ConversationTypes.GroupV2) { storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.groupV2 = await toGroupV2Record(conversation); identifier.type = ITEM_TYPE.GROUPV2; } else if (conversationType === ConversationTypes.GroupV1) { storageRecord = new Proto.StorageRecord(); // eslint-disable-next-line no-await-in-loop storageRecord.groupV1 = await toGroupV1Record(conversation); identifier.type = ITEM_TYPE.GROUPV1; } else { window.log.info( 'storageService.generateManifest: unknown conversation', conversation.idForLogging() ); } if (storageRecord) { const currentStorageID = conversation.get('storageID'); const isNewItem = isNewManifest || Boolean(conversation.get('needsStorageServiceSync')) || !currentStorageID; const storageID = isNewItem ? arrayBufferToBase64(generateStorageID()) : currentStorageID; let storageItem; try { // eslint-disable-next-line no-await-in-loop storageItem = await encryptRecord(storageID, storageRecord); } catch (err) { window.log.error( 'storageService.generateManifest: encrypt record failed:', err && err.stack ? err.stack : String(err) ); throw err; } identifier.raw = storageItem.key; // When a client needs to update a given record it should create it // under a new key and delete the existing key. if (isNewItem) { newItems.add(storageItem); if (storageID) { insertKeys.push(storageID); window.log.info( 'storageService.generateManifest: new key', conversation.idForLogging(), redactStorageID(storageID) ); } else { window.log.info( 'storageService.generateManifest: no storage id', conversation.idForLogging() ); } const oldStorageID = conversation.get('storageID'); if (oldStorageID) { window.log.info( 'storageService.generateManifest: deleting key', redactStorageID(oldStorageID) ); deleteKeys.push(base64ToArrayBuffer(oldStorageID)); } conversationsToUpdate.push({ conversation, storageID, }); } manifestRecordKeys.add(identifier); } } const unknownRecordsArray: ReadonlyArray = ( window.storage.get('storage-service-unknown-records') || [] ).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType)); window.log.info( 'storageService.generateManifest: adding unknown records:', unknownRecordsArray.length ); // When updating the manifest, ensure all "unknown" keys are added to the // new manifest, so we don't inadvertently delete something we don't understand unknownRecordsArray.forEach((record: UnknownRecord) => { const identifier = new Proto.ManifestRecord.Identifier(); identifier.type = record.itemType; identifier.raw = Bytes.fromBase64(record.storageID); manifestRecordKeys.add(identifier); }); const recordsWithErrors: ReadonlyArray = window.storage.get( 'storage-service-error-records', new Array() ); window.log.info( 'storageService.generateManifest: adding records that had errors in the previous merge', recordsWithErrors.length ); // These records failed to merge in the previous fetchManifest, but we still // need to include them so that the manifest is complete recordsWithErrors.forEach((record: UnknownRecord) => { const identifier = new Proto.ManifestRecord.Identifier(); identifier.type = record.itemType; identifier.raw = Bytes.fromBase64(record.storageID); manifestRecordKeys.add(identifier); }); // Validate before writing const rawDuplicates = new Set(); const typeRawDuplicates = new Set(); let hasAccountType = false; manifestRecordKeys.forEach(identifier => { // Ensure there are no duplicate StorageIdentifiers in your manifest // This can be broken down into two parts: // There are no duplicate type+raw pairs // There are no duplicate raw bytes strictAssert(identifier.raw, 'manifest record key without raw identifier'); const storageID = Bytes.toBase64(identifier.raw); const typeAndRaw = `${identifier.type}+${storageID}`; if ( rawDuplicates.has(identifier.raw) || typeRawDuplicates.has(typeAndRaw) ) { window.log.info( 'storageService.generateManifest: removing duplicate identifier from manifest', identifier.type ); manifestRecordKeys.delete(identifier); } rawDuplicates.add(identifier.raw); typeRawDuplicates.add(typeAndRaw); // Ensure all deletes are not present in the manifest const hasDeleteKey = deleteKeys.find( key => arrayBufferToBase64(key) === storageID ); if (hasDeleteKey) { window.log.info( 'storageService.generateManifest: removing key which has been deleted', identifier.type ); manifestRecordKeys.delete(identifier); } // Ensure that there is *exactly* one Account type in the manifest if (identifier.type === ITEM_TYPE.ACCOUNT) { if (hasAccountType) { window.log.info( 'storageService.generateManifest: removing duplicate account' ); manifestRecordKeys.delete(identifier); } hasAccountType = true; } }); rawDuplicates.clear(); typeRawDuplicates.clear(); const storageKeyDuplicates = new Set(); newItems.forEach(storageItem => { // Ensure there are no duplicate StorageIdentifiers in your list of inserts strictAssert(storageItem.key, 'New storage item without key'); const storageID = Bytes.toBase64(storageItem.key); if (storageKeyDuplicates.has(storageID)) { window.log.info( 'storageService.generateManifest: removing duplicate identifier from inserts', redactStorageID(storageID) ); newItems.delete(storageItem); } storageKeyDuplicates.add(storageID); }); storageKeyDuplicates.clear(); // If we have a copy of what the current remote manifest is then we run these // additional validations comparing our pending manifest to the remote // manifest: if (previousManifest) { const pendingInserts: Set = new Set(); const pendingDeletes: Set = new Set(); const remoteKeys: Set = new Set(); (previousManifest.keys ?? []).forEach( (identifier: IManifestRecordIdentifier) => { strictAssert(identifier.raw, 'Identifier without raw field'); const storageID = Bytes.toBase64(identifier.raw); remoteKeys.add(storageID); } ); const localKeys: Set = new Set(); manifestRecordKeys.forEach((identifier: IManifestRecordIdentifier) => { strictAssert(identifier.raw, 'Identifier without raw field'); const storageID = Bytes.toBase64(identifier.raw); localKeys.add(storageID); if (!remoteKeys.has(storageID)) { pendingInserts.add(storageID); } }); remoteKeys.forEach(storageID => { if (!localKeys.has(storageID)) { pendingDeletes.add(storageID); } }); if (deleteKeys.length !== pendingDeletes.size) { throw new Error('invalid write delete keys length do not match'); } if (newItems.size !== pendingInserts.size) { throw new Error('invalid write insert items length do not match'); } deleteKeys.forEach(key => { const storageID = arrayBufferToBase64(key); if (!pendingDeletes.has(storageID)) { throw new Error( 'invalid write delete key missing from pending deletes' ); } }); insertKeys.forEach(storageID => { if (!pendingInserts.has(storageID)) { throw new Error( 'invalid write insert key missing from pending inserts' ); } }); } const manifestRecord = new Proto.ManifestRecord(); manifestRecord.version = version; manifestRecord.keys = Array.from(manifestRecordKeys); const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = base64ToArrayBuffer(storageKeyBase64); const storageManifestKey = await deriveStorageManifestKey( storageKey, version ); const encryptedManifest = await Crypto.encryptProfile( typedArrayToArrayBuffer( Proto.ManifestRecord.encode(manifestRecord).finish() ), storageManifestKey ); const storageManifest = new Proto.StorageManifest(); storageManifest.version = version; storageManifest.value = new FIXMEU8(encryptedManifest); return { conversationsToUpdate, deleteKeys, newItems, storageManifest, }; } async function uploadManifest( version: number, { conversationsToUpdate, deleteKeys, newItems, storageManifest, }: GeneratedManifestType ): Promise { if (!window.textsecure.messaging) { throw new Error('storageService.uploadManifest: We are offline!'); } if (newItems.size === 0 && deleteKeys.length === 0) { window.log.info('storageService.uploadManifest: nothing to upload'); return; } const credentials = window.storage.get('storageCredentials'); try { window.log.info( 'storageService.uploadManifest: keys inserting, deleting:', newItems.size, deleteKeys.length ); const writeOperation = new Proto.WriteOperation(); writeOperation.manifest = storageManifest; writeOperation.insertItem = Array.from(newItems); writeOperation.deleteKey = deleteKeys.map(key => new FIXMEU8(key)); window.log.info('storageService.uploadManifest: uploading...', version); await window.textsecure.messaging.modifyStorageRecords( typedArrayToArrayBuffer( Proto.WriteOperation.encode(writeOperation).finish() ), { credentials, } ); window.log.info( 'storageService.uploadManifest: upload done, updating conversation(s) with new storageIDs:', conversationsToUpdate.length ); // update conversations with the new storageID conversationsToUpdate.forEach(({ conversation, storageID }) => { conversation.set({ needsStorageServiceSync: false, storageID, }); updateConversation(conversation.attributes); }); } catch (err) { window.log.error( 'storageService.uploadManifest: failed!', err && err.stack ? err.stack : String(err) ); if (err.code === 409) { if (conflictBackOff.isFull()) { window.log.error( 'storageService.uploadManifest: Exceeded maximum consecutive conflicts' ); return; } window.log.info( `storageService.uploadManifest: Conflict found with v${version}, running sync job times(${conflictBackOff.getIndex()})` ); throw err; } throw err; } window.log.info( 'storageService.uploadManifest: setting new manifestVersion', version ); window.storage.put('manifestVersion', version); conflictBackOff.reset(); backOff.reset(); if (window.ConversationController.areWePrimaryDevice()) { window.log.warn( 'uploadManifest: We are primary device; not sending sync manifest' ); return; } await handleMessageSend( window.textsecure.messaging.sendFetchManifestSyncMessage(), { messageIds: [], sendType: 'otherSync' } ); } async function stopStorageServiceSync() { window.log.info('storageService.stopStorageServiceSync'); await window.storage.remove('storageKey'); if (backOff.isFull()) { window.log.info( 'storageService.stopStorageServiceSync: too many consecutive stops' ); return; } await sleep(backOff.getAndIncrement()); window.log.info('storageService.stopStorageServiceSync: requesting new keys'); setTimeout(() => { if (!window.textsecure.messaging) { throw new Error('storageService.stopStorageServiceSync: We are offline!'); } if (window.ConversationController.areWePrimaryDevice()) { window.log.warn( 'stopStorageServiceSync: We are primary device; not sending key sync request' ); return; } handleMessageSend(window.textsecure.messaging.sendRequestKeySyncMessage(), { messageIds: [], sendType: 'otherSync', }); }); } async function createNewManifest() { window.log.info('storageService.createNewManifest: creating new manifest'); const version = window.storage.get('manifestVersion', 0); const { conversationsToUpdate, newItems, storageManifest, } = await generateManifest(version, undefined, true); await uploadManifest(version, { conversationsToUpdate, // we have created a new manifest, there should be no keys to delete deleteKeys: [], newItems, storageManifest, }); } async function decryptManifest( encryptedManifest: Proto.IStorageManifest ): Promise { const { version, value } = encryptedManifest; const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = base64ToArrayBuffer(storageKeyBase64); const storageManifestKey = await deriveStorageManifestKey( storageKey, normalizeNumber(version ?? 0) ); strictAssert(value, 'StorageManifest has no value field'); const decryptedManifest = await Crypto.decryptProfile( typedArrayToArrayBuffer(value), storageManifestKey ); return Proto.ManifestRecord.decode(new FIXMEU8(decryptedManifest)); } async function fetchManifest( manifestVersion: number ): Promise { window.log.info('storageService.fetchManifest'); if (!window.textsecure.messaging) { throw new Error('storageService.fetchManifest: We are offline!'); } try { const credentials = await window.textsecure.messaging.getStorageCredentials(); window.storage.put('storageCredentials', credentials); const manifestBinary = await window.textsecure.messaging.getStorageManifest( { credentials, greaterThanVersion: manifestVersion, } ); const encryptedManifest = Proto.StorageManifest.decode( new FIXMEU8(manifestBinary) ); // if we don't get a value we're assuming that there's no newer manifest if (!encryptedManifest.value || !encryptedManifest.version) { window.log.info('storageService.fetchManifest: nothing changed'); return; } try { // eslint-disable-next-line consistent-return return decryptManifest(encryptedManifest); } catch (err) { await stopStorageServiceSync(); return; } } catch (err) { window.log.error( 'storageService.fetchManifest: failed!', err && err.stack ? err.stack : String(err) ); if (err.code === 404) { await createNewManifest(); return; } if (err.code === 204) { // noNewerManifest we're ok return; } throw err; } } type MergeableItemType = { itemType: number; storageID: string; storageRecord: Proto.IStorageRecord; }; type MergedRecordType = UnknownRecord & { hasConflict: boolean; hasError: boolean; isUnsupported: boolean; }; async function mergeRecord( itemToMerge: MergeableItemType ): Promise { const { itemType, storageID, storageRecord } = itemToMerge; const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type; let hasConflict = false; let isUnsupported = false; let hasError = false; try { if (itemType === ITEM_TYPE.UNKNOWN) { window.log.info( 'storageService.mergeRecord: Unknown item type', storageID ); } else if (itemType === ITEM_TYPE.CONTACT && storageRecord.contact) { hasConflict = await mergeContactRecord(storageID, storageRecord.contact); } else if (itemType === ITEM_TYPE.GROUPV1 && storageRecord.groupV1) { hasConflict = await mergeGroupV1Record(storageID, storageRecord.groupV1); } else if (itemType === ITEM_TYPE.GROUPV2 && storageRecord.groupV2) { hasConflict = await mergeGroupV2Record(storageID, storageRecord.groupV2); } else if (itemType === ITEM_TYPE.ACCOUNT && storageRecord.account) { hasConflict = await mergeAccountRecord(storageID, storageRecord.account); } else { isUnsupported = true; window.log.info('storageService.mergeRecord: Unknown record:', itemType); } window.log.info( 'storageService.mergeRecord: merged', redactStorageID(storageID), itemType, hasConflict ); } catch (err) { hasError = true; window.log.error( 'storageService.mergeRecord: Error with', redactStorageID(storageID), itemType, String(err) ); } return { hasConflict, hasError, isUnsupported, itemType, storageID, }; } async function processManifest( manifest: Proto.IManifestRecord ): Promise { if (!window.textsecure.messaging) { throw new Error('storageService.processManifest: We are offline!'); } const remoteKeysTypeMap = new Map(); (manifest.keys || []).forEach(({ raw, type }: IManifestRecordIdentifier) => { strictAssert(raw, 'Identifier without raw field'); remoteKeysTypeMap.set(Bytes.toBase64(raw), type); }); const remoteKeys = new Set(remoteKeysTypeMap.keys()); const localKeys: Set = new Set(); const conversations = window.getConversations(); conversations.forEach((conversation: ConversationModel) => { const storageID = conversation.get('storageID'); if (storageID) { localKeys.add(storageID); } }); const unknownRecordsArray: ReadonlyArray = window.storage.get('storage-service-unknown-records') || []; const stillUnknown = unknownRecordsArray.filter((record: UnknownRecord) => { // Do not include any unknown records that we already support if (!validRecordTypes.has(record.itemType)) { localKeys.add(record.storageID); return false; } return true; }); window.log.info( 'storageService.processManifest: local records:', conversations.length ); window.log.info( 'storageService.processManifest: local keys:', localKeys.size ); window.log.info( 'storageService.processManifest: unknown records:', stillUnknown.length ); window.log.info( 'storageService.processManifest: remote keys:', remoteKeys.size ); const remoteOnlySet: Set = new Set(); remoteKeys.forEach((key: string) => { if (!localKeys.has(key)) { remoteOnlySet.add(key); } }); window.log.info( 'storageService.processManifest: remote ids:', Array.from(remoteOnlySet).map(redactStorageID).join(',') ); const remoteOnlyRecords = new Map(); remoteOnlySet.forEach(storageID => { remoteOnlyRecords.set(storageID, { storageID, itemType: remoteKeysTypeMap.get(storageID), }); }); const conflictCount = await processRemoteRecords(remoteOnlyRecords); // Post-merge, if our local records contain any storage IDs that were not // present in the remote manifest then we'll need to clear it, generate a // new storageID for that record, and upload. // This might happen if a device pushes a manifest which doesn't contain // the keys that we have in our local database. window.getConversations().forEach((conversation: ConversationModel) => { const storageID = conversation.get('storageID'); if (storageID && !remoteKeys.has(storageID)) { window.log.info( 'storageService.processManifest: local key was not in remote manifest', redactStorageID(storageID), conversation.idForLogging() ); conversation.unset('storageID'); updateConversation(conversation.attributes); } }); return conflictCount !== 0; } async function processRemoteRecords( remoteOnlyRecords: Map ): Promise { const storageKeyBase64 = window.storage.get('storageKey'); if (!storageKeyBase64) { throw new Error('No storage key'); } const storageKey = base64ToArrayBuffer(storageKeyBase64); window.log.info( 'storageService.processRemoteRecords: remote only keys', remoteOnlyRecords.size ); const readOperation = new Proto.ReadOperation(); readOperation.readKey = Array.from(remoteOnlyRecords.keys()).map( Bytes.fromBase64 ); const credentials = window.storage.get('storageCredentials'); const storageItemsBuffer = await window.textsecure.messaging.getStorageRecords( typedArrayToArrayBuffer(Proto.ReadOperation.encode(readOperation).finish()), { credentials, } ); const storageItems = Proto.StorageItems.decode( new FIXMEU8(storageItemsBuffer) ); if (!storageItems.items) { window.log.info( 'storageService.processRemoteRecords: No storage items retrieved' ); return 0; } const decryptedStorageItems = await pMap( storageItems.items, async ( storageRecordWrapper: Proto.IStorageItem ): Promise => { const { key, value: storageItemCiphertext } = storageRecordWrapper; if (!key || !storageItemCiphertext) { window.log.error( 'storageService.processRemoteRecords: No key or Ciphertext available' ); await stopStorageServiceSync(); throw new Error( 'storageService.processRemoteRecords: Missing key and/or Ciphertext' ); } const base64ItemID = Bytes.toBase64(key); const storageItemKey = await deriveStorageItemKey( storageKey, base64ItemID ); let storageItemPlaintext; try { storageItemPlaintext = await Crypto.decryptProfile( typedArrayToArrayBuffer(storageItemCiphertext), storageItemKey ); } catch (err) { window.log.error( 'storageService.processRemoteRecords: Error decrypting storage item' ); await stopStorageServiceSync(); throw err; } const storageRecord = Proto.StorageRecord.decode( new FIXMEU8(storageItemPlaintext) ); const remoteRecord = remoteOnlyRecords.get(base64ItemID); if (!remoteRecord) { throw new Error( "Got a remote record that wasn't requested with " + `storageID: ${base64ItemID}` ); } return { itemType: remoteRecord.itemType, storageID: base64ItemID, storageRecord, }; }, { concurrency: 5 } ); // Merge Account records last since it contains the pinned conversations // and we need all other records merged first before we can find the pinned // records in our db const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type; const sortedStorageItems = decryptedStorageItems.sort((_, b) => b.itemType === ITEM_TYPE.ACCOUNT ? -1 : 1 ); try { window.log.info( `storageService.processRemoteRecords: Attempting to merge ${sortedStorageItems.length} records` ); const mergedRecords = await pMap(sortedStorageItems, mergeRecord, { concurrency: 5, }); window.log.info( `storageService.processRemoteRecords: Processed ${mergedRecords.length} records` ); // Collect full map of previously and currently unknown records const unknownRecords: Map = new Map(); const unknownRecordsArray: ReadonlyArray = window.storage.get( 'storage-service-unknown-records', new Array() ); unknownRecordsArray.forEach((record: UnknownRecord) => { unknownRecords.set(record.storageID, record); }); const newRecordsWithErrors: Array = []; let conflictCount = 0; mergedRecords.forEach((mergedRecord: MergedRecordType) => { if (mergedRecord.isUnsupported) { unknownRecords.set(mergedRecord.storageID, { itemType: mergedRecord.itemType, storageID: mergedRecord.storageID, }); } else if (mergedRecord.hasError) { newRecordsWithErrors.push({ itemType: mergedRecord.itemType, storageID: mergedRecord.storageID, }); } if (mergedRecord.hasConflict) { conflictCount += 1; } }); // Filter out all the unknown records we're already supporting const newUnknownRecords = Array.from(unknownRecords.values()).filter( (record: UnknownRecord) => !validRecordTypes.has(record.itemType) ); window.log.info( 'storageService.processRemoteRecords: Unknown records found:', newUnknownRecords.length ); window.storage.put('storage-service-unknown-records', newUnknownRecords); window.log.info( 'storageService.processRemoteRecords: Records with errors:', newRecordsWithErrors.length ); // Refresh the list of records that had errors with every push, that way // this list doesn't grow unbounded and we keep the list of storage keys // fresh. window.storage.put('storage-service-error-records', newRecordsWithErrors); if (conflictCount !== 0) { window.log.info( 'storageService.processRemoteRecords: ' + `${conflictCount} conflicts found, uploading changes` ); return conflictCount; } conflictBackOff.reset(); } catch (err) { window.log.error( 'storageService.processRemoteRecords: failed!', err && err.stack ? err.stack : String(err) ); } return 0; } async function sync(): Promise { if (!isStorageWriteFeatureEnabled()) { window.log.info( 'storageService.sync: Not starting desktop.storage is falsey' ); return undefined; } if (!window.storage.get('storageKey')) { throw new Error('storageService.sync: Cannot start; no storage key!'); } window.log.info('storageService.sync: starting...'); let manifest: Proto.ManifestRecord | undefined; try { // If we've previously interacted with strage service, update 'fetchComplete' record const previousFetchComplete = window.storage.get('storageFetchComplete'); const manifestFromStorage = window.storage.get('manifestVersion'); if (!previousFetchComplete && isNumber(manifestFromStorage)) { window.storage.put('storageFetchComplete', true); } const localManifestVersion = manifestFromStorage || 0; manifest = await fetchManifest(localManifestVersion); // Guarding against no manifests being returned, everything should be ok if (!manifest) { window.log.info('storageService.sync: no new manifest'); return undefined; } strictAssert( manifest.version !== undefined && manifest.version !== null, 'Manifest without version' ); const version = normalizeNumber(manifest.version); window.log.info( `storageService.sync: manifest versions - previous: ${localManifestVersion}, current: ${version}` ); window.storage.put('manifestVersion', version); const hasConflicts = await processManifest(manifest); if (hasConflicts) { await upload(true); } // We now know that we've successfully completed a storage service fetch window.storage.put('storageFetchComplete', true); } catch (err) { window.log.error( 'storageService.sync: error processing manifest', err && err.stack ? err.stack : String(err) ); } window.Signal.Util.postLinkExperience.stop(); window.log.info('storageService.sync: complete'); return manifest; } async function upload(fromSync = false): Promise { if (!isStorageWriteFeatureEnabled()) { window.log.info( 'storageService.upload: Not starting because the feature is not enabled' ); return; } if (!window.textsecure.messaging) { throw new Error('storageService.upload: We are offline!'); } // Rate limit uploads coming from syncing if (fromSync) { uploadBucket.push(Date.now()); if (uploadBucket.length >= 3) { const [firstMostRecentWrite] = uploadBucket; if (isMoreRecentThan(5 * durations.MINUTE, firstMostRecentWrite)) { throw new Error( 'storageService.uploadManifest: too many writes too soon.' ); } uploadBucket.shift(); } } if (!window.storage.get('storageKey')) { // requesting new keys runs the sync job which will detect the conflict // and re-run the upload job once we're merged and up-to-date. window.log.info( 'storageService.upload: no storageKey, requesting new keys' ); backOff.reset(); if (window.ConversationController.areWePrimaryDevice()) { window.log.warn( 'upload: We are primary device; not sending key sync request' ); return; } await handleMessageSend( window.textsecure.messaging.sendRequestKeySyncMessage(), { messageIds: [], sendType: 'otherSync' } ); return; } let previousManifest: Proto.ManifestRecord | undefined; if (!fromSync) { // Syncing before we upload so that we repair any unknown records and // records with errors as well as ensure that we have the latest up to date // manifest. previousManifest = await sync(); } const localManifestVersion = window.storage.get('manifestVersion', 0); const version = Number(localManifestVersion) + 1; window.log.info( 'storageService.upload: will update to manifest version', version ); try { const generatedManifest = await generateManifest(version, previousManifest); await uploadManifest(version, generatedManifest); } catch (err) { if (err.code === 409) { await sleep(conflictBackOff.getAndIncrement()); window.log.info('storageService.upload: pushing sync on the queue'); // The sync job will check for conflicts and as part of that conflict // check if an item needs sync and doesn't match with the remote record // it'll kick off another upload. setTimeout(runStorageServiceSyncJob); return; } window.log.error( 'storageService.upload', err && err.stack ? err.stack : String(err) ); } } let storageServiceEnabled = false; export function enableStorageService(): void { storageServiceEnabled = true; } // Note: this function is meant to be called before ConversationController is hydrated. // It goes directly to the database, so in-memory conversations will be out of date. export async function eraseAllStorageServiceState(): Promise { window.log.info('storageService.eraseAllStorageServiceState: starting...'); await Promise.all([ window.storage.remove('manifestVersion'), window.storage.remove('storage-service-unknown-records'), window.storage.remove('storageCredentials'), ]); await eraseStorageServiceStateFromConversations(); window.log.info('storageService.eraseAllStorageServiceState: complete'); } export const storageServiceUploadJob = debounce(() => { if (!storageServiceEnabled) { window.log.info( 'storageService.storageServiceUploadJob: called before enabled' ); return; } storageJobQueue(async () => { await upload(); }, `upload v${window.storage.get('manifestVersion')}`); }, 500); export const runStorageServiceSyncJob = debounce(() => { if (!storageServiceEnabled) { window.log.info( 'storageService.runStorageServiceSyncJob: called before enabled' ); return; } ourProfileKeyService.blockGetWithPromise( storageJobQueue(async () => { await sync(); }, `sync v${window.storage.get('manifestVersion')}`) ); }, 500);