signal-desktop/ts/services/storage.ts

1042 lines
30 KiB
TypeScript
Raw Normal View History

2021-03-03 20:09:58 +00:00
// Copyright 2020-2021 Signal Messenger, LLC
2020-10-30 20:34:04 +00:00
// SPDX-License-Identifier: AGPL-3.0-only
import { debounce, isNumber, partition } from 'lodash';
2020-09-09 00:56:23 +00:00
import pMap from 'p-map';
import Crypto from '../textsecure/Crypto';
import dataInterface from '../sql/Client';
import {
arrayBufferToBase64,
base64ToArrayBuffer,
deriveStorageItemKey,
deriveStorageManifestKey,
} from '../Crypto';
import {
ManifestRecordClass,
ManifestRecordIdentifierClass,
StorageItemClass,
StorageManifestClass,
StorageRecordClass,
} from '../textsecure.d';
import {
mergeAccountRecord,
mergeContactRecord,
mergeGroupV1Record,
2020-09-09 02:25:05 +00:00
mergeGroupV2Record,
2020-09-09 00:56:23 +00:00
toAccountRecord,
toContactRecord,
toGroupV1Record,
2020-09-09 02:25:05 +00:00
toGroupV2Record,
2020-09-09 00:56:23 +00:00
} from './storageRecordOps';
import { ConversationModel } from '../models/conversations';
import { storageJobQueue } from '../util/JobQueue';
import { sleep } from '../util/sleep';
2021-03-03 20:09:58 +00:00
import { isStorageWriteFeatureEnabled } from '../storage/isFeatureEnabled';
2020-09-09 00:56:23 +00:00
// Database helpers taken from the SQL client interface imported above.
const eraseStorageServiceStateFromConversations =
  dataInterface.eraseStorageServiceStateFromConversations;
const updateConversation = dataInterface.updateConversation;
// Number of times in a row stopStorageServiceSync has requested new keys;
// reset after a successful upload or when upload() requests keys itself.
let consecutiveStops = 0;

// Number of 409 (conflict) responses seen in a row while uploading.
let consecutiveConflicts = 0;

// Timestamps (ms) of recent merges that looked like remote forced pushes;
// processManifest uses it to detect thrashing.
const forcedPushBucket: Array<number> = [];

// Manifest identifier types this client knows how to handle.
const validRecordTypes = new Set([
  0, // UNKNOWN
  1, // CONTACT
  2, // GROUPV1
  3, // GROUPV2
  4, // ACCOUNT
]);
2020-09-09 00:56:23 +00:00
/**
 * Delay table for retries: numeric keys map an attempt count to a delay in
 * milliseconds, and `max` is the delay used when no entry matches.
 */
type BackoffType = {
  max: number;
  [key: number]: number | undefined;
};

// Time units, in milliseconds.
const SECOND = 1000;
const MINUTE = SECOND * 60;

// Delays grow with the attempt count and are capped at five minutes.
const BACKOFF: BackoffType = {
  0: 1 * SECOND,
  1: 5 * SECOND,
  2: 30 * SECOND,
  3: 2 * MINUTE,
  max: 5 * MINUTE,
};
/**
 * Sleeps for the delay that BACKOFF lists for `count`, or for BACKOFF.max
 * when there is no (truthy) entry for that count.
 *
 * @param count - attempt counter used as the lookup key
 * @returns whatever `sleep` returns for the chosen delay
 */
function backOff(count: number) {
  const configuredDelay = BACKOFF[count];
  return sleep(configuredDelay || BACKOFF.max);
}
2020-09-09 00:56:23 +00:00
/**
 * Minimal description of a remote storage record: its base64 storage ID and
 * the numeric manifest identifier type it was listed under.
 */
type UnknownRecord = {
  storageID: string;
  itemType: number;
};
/**
 * Encrypts a storage record into a StorageItem.
 *
 * The item key is the decoded `storageID` when one is given, otherwise a
 * freshly generated ID. The encryption key is derived from the locally
 * stored 'storageKey' and the base64 form of the item key.
 *
 * @param storageID - base64 storage ID to reuse, or undefined to create one
 * @param storageRecord - record whose serialized bytes are encrypted
 * @returns a StorageItem with `key` and `value` populated
 */
async function encryptRecord(
  storageID: string | undefined,
  storageRecord: StorageRecordClass
): Promise<StorageItemClass> {
  let itemIdBuffer: ArrayBuffer;
  if (storageID) {
    itemIdBuffer = base64ToArrayBuffer(String(storageID));
  } else {
    itemIdBuffer = generateStorageID();
  }

  const masterKey = base64ToArrayBuffer(window.storage.get('storageKey'));
  const itemKey = await deriveStorageItemKey(
    masterKey,
    arrayBufferToBase64(itemIdBuffer)
  );
  const ciphertext = await Crypto.encryptProfile(
    storageRecord.toArrayBuffer(),
    itemKey
  );

  const item = new window.textsecure.protobuf.StorageItem();
  item.key = itemIdBuffer;
  item.value = ciphertext;
  return item;
}
/** Returns 16 random bytes to be used as a new storage ID. */
function generateStorageID(): ArrayBuffer {
  const idByteLength = 16;
  return Crypto.getRandomBytes(idByteLength);
}
/**
 * Result of generateManifest: everything uploadManifest needs to perform
 * one write against the storage service.
 */
type GeneratedManifestType = {
  // Conversations that received a new storage ID; their attributes are
  // updated once the upload has succeeded.
  conversationsToUpdate: Array<{
    conversation: ConversationModel;
    storageID: string | undefined;
  }>;
  // Storage IDs that are replaced by this write and should be deleted.
  deleteKeys: Array<ArrayBuffer>;
  // Encrypted items to insert.
  newItems: Set<StorageItemClass>;
  // The encrypted manifest itself.
  storageManifest: StorageManifestClass;
};
/**
 * Builds the encrypted storage manifest for `version` from the local
 * conversations plus the remembered unknown/errored records.
 *
 * Every conversation of a supported kind is converted to a storage record and
 * encrypted. A conversation gets a brand-new storage ID (and its old ID is
 * queued for deletion) when the manifest is new, when it is flagged
 * `needsStorageServiceSync`, or when it has no storage ID yet.
 *
 * Before the manifest is encrypted its identifiers are validated: duplicate
 * storage IDs, keys that are being deleted and extra account identifiers are
 * removed.
 *
 * @param version - manifest version to stamp on the result
 * @param isNewManifest - when true every conversation gets a new storage ID
 * @returns the encrypted manifest together with the inserts, deletes and the
 *   conversations whose storage IDs must be updated after upload
 * @throws rethrows any error raised while encrypting a record
 */
async function generateManifest(
  version: number,
  isNewManifest = false
): Promise<GeneratedManifestType> {
  window.log.info(
    'storageService.generateManifest: generating manifest',
    version,
    isNewManifest
  );

  await window.ConversationController.checkForConflicts();

  const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type;

  const conversationsToUpdate: GeneratedManifestType['conversationsToUpdate'] = [];
  const deleteKeys: Array<ArrayBuffer> = [];
  const manifestRecordKeys: Set<ManifestRecordIdentifierClass> = new Set();
  const newItems: Set<StorageItemClass> = new Set();

  const conversations = window.getConversations();
  for (let i = 0; i < conversations.length; i += 1) {
    const conversation = conversations.models[i];
    const identifier = new window.textsecure.protobuf.ManifestRecord.Identifier();

    let storageRecord;
    if (conversation.isMe()) {
      storageRecord = new window.textsecure.protobuf.StorageRecord();
      // eslint-disable-next-line no-await-in-loop
      storageRecord.account = await toAccountRecord(conversation);
      identifier.type = ITEM_TYPE.ACCOUNT;
    } else if (conversation.isPrivate()) {
      storageRecord = new window.textsecure.protobuf.StorageRecord();
      // eslint-disable-next-line no-await-in-loop
      storageRecord.contact = await toContactRecord(conversation);
      identifier.type = ITEM_TYPE.CONTACT;
    } else if (conversation.isGroupV2()) {
      storageRecord = new window.textsecure.protobuf.StorageRecord();
      // eslint-disable-next-line no-await-in-loop
      storageRecord.groupV2 = await toGroupV2Record(conversation);
      identifier.type = ITEM_TYPE.GROUPV2;
    } else if (conversation.isGroupV1()) {
      storageRecord = new window.textsecure.protobuf.StorageRecord();
      // eslint-disable-next-line no-await-in-loop
      storageRecord.groupV1 = await toGroupV1Record(conversation);
      identifier.type = ITEM_TYPE.GROUPV1;
    } else {
      window.log.info(
        'storageService.generateManifest: unknown conversation',
        conversation.debugID()
      );
    }

    if (storageRecord) {
      const currentStorageID = conversation.get('storageID');

      const isNewItem =
        isNewManifest ||
        Boolean(conversation.get('needsStorageServiceSync')) ||
        !currentStorageID;

      const storageID = isNewItem
        ? arrayBufferToBase64(generateStorageID())
        : currentStorageID;

      let storageItem;
      try {
        // eslint-disable-next-line no-await-in-loop
        storageItem = await encryptRecord(storageID, storageRecord);
      } catch (err) {
        window.log.error(
          'storageService.generateManifest: encrypt record failed:',
          err && err.stack ? err.stack : String(err)
        );
        throw err;
      }

      identifier.raw = storageItem.key;

      // When a client needs to update a given record it should create it
      // under a new key and delete the existing key.
      if (isNewItem) {
        newItems.add(storageItem);

        if (currentStorageID) {
          deleteKeys.push(base64ToArrayBuffer(currentStorageID));
        }

        conversationsToUpdate.push({
          conversation,
          storageID,
        });
      }

      manifestRecordKeys.add(identifier);
    }
  }

  const unknownRecordsArray = (
    window.storage.get('storage-service-unknown-records') || []
  ).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType));

  window.log.info(
    'storageService.generateManifest: adding unknown records:',
    unknownRecordsArray.length
  );

  // When updating the manifest, ensure all "unknown" keys are added to the
  // new manifest, so we don't inadvertently delete something we don't understand
  unknownRecordsArray.forEach((record: UnknownRecord) => {
    const identifier = new window.textsecure.protobuf.ManifestRecord.Identifier();
    identifier.type = record.itemType;
    identifier.raw = base64ToArrayBuffer(record.storageID);

    manifestRecordKeys.add(identifier);
  });

  const recordsWithErrors =
    window.storage.get('storage-service-error-records') || [];

  window.log.info(
    'storageService.generateManifest: adding records that had errors in the previous merge',
    recordsWithErrors.length
  );

  // These records failed to merge in the previous fetchManifest, but we still
  // need to include them so that the manifest is complete
  recordsWithErrors.forEach((record: UnknownRecord) => {
    const identifier = new window.textsecure.protobuf.ManifestRecord.Identifier();
    identifier.type = record.itemType;
    identifier.raw = base64ToArrayBuffer(record.storageID);

    manifestRecordKeys.add(identifier);
  });

  // Validate before writing.
  //
  // All duplicate checks are keyed on the base64 form of the raw bytes: a Set
  // of ArrayBuffers compares by object identity, so two distinct buffers with
  // identical contents would never be recognised as duplicates.
  const rawDuplicates = new Set<string>();
  const typeRawDuplicates = new Set<string>();
  const deletedStorageIDs = new Set<string>(
    deleteKeys.map(key => arrayBufferToBase64(key))
  );
  let hasAccountType = false;

  // Deleting from a Set while iterating it with forEach is well defined.
  manifestRecordKeys.forEach(identifier => {
    // Ensure there are no duplicate StorageIdentifiers in your manifest
    //   This can be broken down into two parts:
    //     There are no duplicate type+raw pairs
    //     There are no duplicate raw bytes
    const storageID = arrayBufferToBase64(identifier.raw);
    const typeAndRaw = `${identifier.type}+${storageID}`;
    if (rawDuplicates.has(storageID) || typeRawDuplicates.has(typeAndRaw)) {
      window.log.info(
        'storageService.generateManifest: removing duplicate identifier from manifest',
        identifier.type
      );
      manifestRecordKeys.delete(identifier);
    }
    rawDuplicates.add(storageID);
    typeRawDuplicates.add(typeAndRaw);

    // Ensure all deletes are not present in the manifest
    if (deletedStorageIDs.has(storageID)) {
      window.log.info(
        'storageService.generateManifest: removing key which has been deleted',
        identifier.type
      );
      manifestRecordKeys.delete(identifier);
    }

    // Ensure that there is *exactly* one Account type in the manifest
    if (identifier.type === ITEM_TYPE.ACCOUNT) {
      if (hasAccountType) {
        window.log.info(
          'storageService.generateManifest: removing duplicate account'
        );
        manifestRecordKeys.delete(identifier);
      }
      hasAccountType = true;
    }
  });

  rawDuplicates.clear();
  typeRawDuplicates.clear();
  deletedStorageIDs.clear();

  const storageKeyDuplicates = new Set<string>();

  newItems.forEach(storageItem => {
    // Ensure there are no duplicate StorageIdentifiers in your list of inserts
    const storageID = arrayBufferToBase64(storageItem.key);
    if (storageKeyDuplicates.has(storageID)) {
      window.log.info(
        'storageService.generateManifest: removing duplicate identifier from inserts',
        storageID
      );
      newItems.delete(storageItem);
    }
    storageKeyDuplicates.add(storageID);
  });

  storageKeyDuplicates.clear();

  const manifestRecord = new window.textsecure.protobuf.ManifestRecord();
  manifestRecord.version = version;
  manifestRecord.keys = Array.from(manifestRecordKeys);

  const storageKeyBase64 = window.storage.get('storageKey');
  const storageKey = base64ToArrayBuffer(storageKeyBase64);
  const storageManifestKey = await deriveStorageManifestKey(
    storageKey,
    version
  );
  const encryptedManifest = await Crypto.encryptProfile(
    manifestRecord.toArrayBuffer(),
    storageManifestKey
  );

  const storageManifest = new window.textsecure.protobuf.StorageManifest();
  storageManifest.version = version;
  storageManifest.value = encryptedManifest;

  return {
    conversationsToUpdate,
    deleteKeys,
    newItems,
    storageManifest,
  };
}
/**
 * Sends a generated manifest and its inserts/deletes to the storage service.
 *
 * Nothing is sent when there are no inserts and no deletes. After a
 * successful write the affected conversations are given their new storage
 * IDs, the local 'manifestVersion' is set to `version`, the conflict/stop
 * counters are reset and a fetch-manifest sync message is sent.
 *
 * @param version - manifest version being written
 * @param generatedManifest - output of generateManifest (destructured)
 * @throws Error when `window.textsecure.messaging` is unavailable
 * @throws rethrows upload errors, except that a 409 is swallowed (the
 *   function just returns) once more than 3 consecutive conflicts were seen
 */
async function uploadManifest(
  version: number,
  {
    conversationsToUpdate,
    deleteKeys,
    newItems,
    storageManifest,
  }: GeneratedManifestType
): Promise<void> {
  if (!window.textsecure.messaging) {
    throw new Error('storageService.uploadManifest: We are offline!');
  }

  if (newItems.size === 0 && deleteKeys.length === 0) {
    window.log.info('storageService.uploadManifest: nothing to upload');
    return;
  }

  const credentials = window.storage.get('storageCredentials');
  try {
    window.log.info(
      'storageService.uploadManifest: keys inserting, deleting:',
      newItems.size,
      deleteKeys.length
    );

    const writeOperation = new window.textsecure.protobuf.WriteOperation();
    writeOperation.manifest = storageManifest;
    writeOperation.insertItem = Array.from(newItems);
    writeOperation.deleteKey = deleteKeys;

    window.log.info('storageService.uploadManifest: uploading...', version);

    await window.textsecure.messaging.modifyStorageRecords(
      writeOperation.toArrayBuffer(),
      {
        credentials,
      }
    );

    window.log.info(
      'storageService.uploadManifest: upload done, updating conversation(s) with new storageIDs:',
      conversationsToUpdate.length
    );

    // update conversations with the new storageID
    conversationsToUpdate.forEach(({ conversation, storageID }) => {
      conversation.set({
        needsStorageServiceSync: false,
        storageID,
      });
      // Not awaited: the database write is fire-and-forget here.
      updateConversation(conversation.attributes);
    });
  } catch (err) {
    window.log.error(
      'storageService.uploadManifest: failed!',
      err && err.stack ? err.stack : String(err)
    );

    if (err.code === 409) {
      if (consecutiveConflicts > 3) {
        window.log.error(
          'storageService.uploadManifest: Exceeded maximum consecutive conflicts'
        );
        // Give up quietly; the version and counters below are left untouched.
        return;
      }
      consecutiveConflicts += 1;

      window.log.info(
        `storageService.uploadManifest: Conflict found with v${version}, running sync job times(${consecutiveConflicts})`
      );
    }

    // Conflicts and all other failures are both handed to the caller.
    throw err;
  }

  window.log.info(
    'storageService.uploadManifest: setting new manifestVersion',
    version
  );
  window.storage.put('manifestVersion', version);
  consecutiveConflicts = 0;
  consecutiveStops = 0;

  await window.textsecure.messaging.sendFetchManifestSyncMessage();
}
/**
 * Drops the local 'storageKey' and, for the first five consecutive stops,
 * waits for the back-off delay and then schedules a request for new keys.
 *
 * The key request runs in a timer callback, so an error thrown there (for
 * example when messaging is unavailable) is not delivered to the caller of
 * this function.
 */
async function stopStorageServiceSync() {
  window.log.info('storageService.stopStorageServiceSync');

  await window.storage.remove('storageKey');

  if (consecutiveStops < 5) {
    await backOff(consecutiveStops);
    window.log.info(
      'storageService.stopStorageServiceSync: requesting new keys'
    );
    consecutiveStops += 1;
    setTimeout(() => {
      if (!window.textsecure.messaging) {
        throw new Error(
          'storageService.stopStorageServiceSync: We are offline!'
        );
      }
      window.textsecure.messaging.sendRequestKeySyncMessage();
    });
  }
}
/**
 * Generates a manifest in "new manifest" mode at the locally stored
 * 'manifestVersion' (0 when none is stored) and uploads it.
 */
async function createNewManifest() {
  window.log.info('storageService.createNewManifest: creating new manifest');

  const version = window.storage.get('manifestVersion') || 0;
  const generated = await generateManifest(version, true);

  await uploadManifest(version, {
    conversationsToUpdate: generated.conversationsToUpdate,
    // we have created a new manifest, there should be no keys to delete
    deleteKeys: [],
    newItems: generated.newItems,
    storageManifest: generated.storageManifest,
  });
}
/**
 * Decrypts an encrypted storage manifest and decodes it as a ManifestRecord.
 *
 * The manifest key is derived from the locally stored 'storageKey' and the
 * manifest version. `version` may be a plain number or an object exposing
 * `toNumber()`; `value` may expose `toArrayBuffer()` or be passed through
 * unchanged.
 *
 * @param encryptedManifest - manifest as received from the storage service
 * @returns the decoded manifest record
 */
async function decryptManifest(
  encryptedManifest: StorageManifestClass
): Promise<ManifestRecordClass> {
  const { version, value } = encryptedManifest;

  const masterKey = base64ToArrayBuffer(window.storage.get('storageKey'));

  const versionNumber =
    typeof version === 'number' ? version : version.toNumber();
  const manifestKey = await deriveStorageManifestKey(masterKey, versionNumber);

  const ciphertext =
    typeof value.toArrayBuffer === 'function' ? value.toArrayBuffer() : value;
  const plaintext = await Crypto.decryptProfile(ciphertext, manifestKey);

  return window.textsecure.protobuf.ManifestRecord.decode(plaintext);
}
/**
 * Fetches storage credentials and the remote manifest newer than
 * `manifestVersion`, then decrypts it.
 *
 * @param manifestVersion - only a manifest with a greater version is requested
 * @returns the decrypted manifest, or undefined when there is nothing newer
 *   (empty response or a 204), when a new manifest had to be created (404),
 *   or when decryption failed and storage service sync was stopped
 * @throws Error when `window.textsecure.messaging` is unavailable
 * @throws rethrows any other fetch error
 */
async function fetchManifest(
  manifestVersion: string
): Promise<ManifestRecordClass | undefined> {
  window.log.info('storageService.fetchManifest');

  if (!window.textsecure.messaging) {
    throw new Error('storageService.fetchManifest: We are offline!');
  }

  try {
    const credentials = await window.textsecure.messaging.getStorageCredentials();
    window.storage.put('storageCredentials', credentials);

    const manifestBinary = await window.textsecure.messaging.getStorageManifest(
      {
        credentials,
        greaterThanVersion: manifestVersion,
      }
    );
    const encryptedManifest = window.textsecure.protobuf.StorageManifest.decode(
      manifestBinary
    );

    // if we don't get a value we're assuming that there's no newer manifest
    if (!encryptedManifest.value || !encryptedManifest.version) {
      window.log.info('storageService.fetchManifest: nothing changed');
      return undefined;
    }

    try {
      // The `await` is required: returning the bare promise would let a
      // decryption failure escape this try/catch, so sync would never be
      // stopped and new keys would never be requested.
      return await decryptManifest(encryptedManifest);
    } catch (err) {
      window.log.error(
        'storageService.fetchManifest: failed to decrypt manifest',
        err && err.stack ? err.stack : String(err)
      );
      await stopStorageServiceSync();
      return undefined;
    }
  } catch (err) {
    window.log.error(
      'storageService.fetchManifest: failed!',
      err && err.stack ? err.stack : String(err)
    );

    if (err.code === 404) {
      await createNewManifest();
      return undefined;
    }
    if (err.code === 204) {
      // noNewerManifest we're ok
      return undefined;
    }

    throw err;
  }
}
/**
 * A decrypted remote storage item ready to be merged: the decoded record
 * plus the storage ID and manifest identifier type it was fetched under.
 */
type MergeableItemType = {
  storageID: string;
  itemType: number;
  storageRecord: StorageRecordClass;
};
/**
 * Outcome of merging a single record, as returned by mergeRecord.
 */
type MergedRecordType = UnknownRecord & {
  // Value returned by the record-specific merge function.
  hasConflict: boolean;
  // The merge function threw.
  hasError: boolean;
  // No merge branch matched the record's type/payload.
  isUnsupported: boolean;
};
/**
 * Merges one decrypted remote record into local state by dispatching on its
 * manifest identifier type.
 *
 * Errors thrown by the merge functions are logged and reported through
 * `hasError`; this function itself does not throw for them.
 *
 * @param itemToMerge - decrypted record with its storage ID and type
 * @returns flags describing the outcome together with the record's
 *   `itemType` and `storageID`
 */
async function mergeRecord(
  itemToMerge: MergeableItemType
): Promise<MergedRecordType> {
  const { itemType, storageID, storageRecord } = itemToMerge;

  const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type;

  let hasConflict = false;
  let isUnsupported = false;
  let hasError = false;

  try {
    if (itemType === ITEM_TYPE.UNKNOWN) {
      // Only logged: UNKNOWN items are neither merged nor marked unsupported.
      window.log.info(
        'storageService.mergeRecord: Unknown item type',
        storageID
      );
    } else if (itemType === ITEM_TYPE.CONTACT && storageRecord.contact) {
      hasConflict = await mergeContactRecord(storageID, storageRecord.contact);
    } else if (itemType === ITEM_TYPE.GROUPV1 && storageRecord.groupV1) {
      hasConflict = await mergeGroupV1Record(storageID, storageRecord.groupV1);
    } else if (itemType === ITEM_TYPE.GROUPV2 && storageRecord.groupV2) {
      hasConflict = await mergeGroupV2Record(storageID, storageRecord.groupV2);
    } else if (itemType === ITEM_TYPE.ACCOUNT && storageRecord.account) {
      hasConflict = await mergeAccountRecord(storageID, storageRecord.account);
    } else {
      // Either a type we do not know, or a known type whose payload is missing.
      isUnsupported = true;
      window.log.info('storageService.mergeRecord: Unknown record:', itemType);
    }
  } catch (err) {
    hasError = true;
    window.log.error(
      'storageService.mergeRecord: merging record failed',
      err && err.stack ? err.stack : String(err)
    );
  }

  return {
    hasConflict,
    hasError,
    isUnsupported,
    itemType,
    storageID,
  };
}
/**
 * Downloads, decrypts and merges every remote record that is listed in
 * `manifest` but not known locally.
 *
 * "Known locally" covers conversation storage IDs, remembered unknown records
 * of types we still do not support, and records that failed to merge during
 * the previous fetch. After merging, the stored lists of unknown and errored
 * records are rewritten.
 *
 * When the number of remote-only keys is at least the number of local keys
 * the manifest is treated as a likely forced push; three of those within five
 * minutes raise an error with `code === 'E_BACKOFF'`.
 *
 * @param manifest - decrypted remote manifest
 * @returns true when any merge reported a conflict (the caller should
 *   upload), false otherwise, including when the merge phase failed
 * @throws Error when `window.textsecure.messaging` is unavailable
 * @throws Error when a fetched item lacks a key or ciphertext, or cannot be
 *   decrypted (storage service sync is stopped first)
 * @throws an Error with `code === 'E_BACKOFF'` when thrashing is detected
 */
async function processManifest(
  manifest: ManifestRecordClass
): Promise<boolean> {
  const storageKeyBase64 = window.storage.get('storageKey');
  const storageKey = base64ToArrayBuffer(storageKeyBase64);

  if (!window.textsecure.messaging) {
    throw new Error('storageService.processManifest: We are offline!');
  }

  // base64 storage ID -> manifest identifier type
  const remoteKeysTypeMap = new Map();
  manifest.keys.forEach((identifier: ManifestRecordIdentifierClass) => {
    remoteKeysTypeMap.set(
      arrayBufferToBase64(identifier.raw.toArrayBuffer()),
      identifier.type
    );
  });

  const localKeys = window
    .getConversations()
    .map((conversation: ConversationModel) => conversation.get('storageID'))
    .filter(Boolean);

  const unknownRecordsArray =
    window.storage.get('storage-service-unknown-records') || [];

  unknownRecordsArray.forEach((record: UnknownRecord) => {
    // Do not include any unknown records that we already support
    if (!validRecordTypes.has(record.itemType)) {
      localKeys.push(record.storageID);
    }
  });

  const recordsWithErrors =
    window.storage.get('storage-service-error-records') || [];

  // Do not fetch any records that we failed to merge in the previous fetch
  recordsWithErrors.forEach((record: UnknownRecord) => {
    localKeys.push(record.storageID);
  });

  window.log.info(
    'storageService.processManifest: local keys:',
    localKeys.length
  );
  window.log.info(
    'storageService.processManifest: incl. unknown records:',
    unknownRecordsArray.length
  );
  window.log.info(
    'storageService.processManifest: incl. records with errors:',
    recordsWithErrors.length
  );

  // Set lookup keeps the remote/local comparison linear instead of scanning
  // the local key array once per remote key.
  const localKeySet = new Set(localKeys);

  const remoteKeys = Array.from(remoteKeysTypeMap.keys());
  const remoteOnlySet: Set<string> = new Set();
  remoteKeys.forEach((key: string) => {
    if (!localKeySet.has(key)) {
      remoteOnlySet.add(key);
    }
  });

  const remoteOnly = Array.from(remoteOnlySet);

  window.log.info(
    'storageService.processManifest: remote keys',
    remoteOnly.length
  );

  const readOperation = new window.textsecure.protobuf.ReadOperation();
  readOperation.readKey = remoteOnly.map(base64ToArrayBuffer);

  const credentials = window.storage.get('storageCredentials');
  const storageItemsBuffer = await window.textsecure.messaging.getStorageRecords(
    readOperation.toArrayBuffer(),
    {
      credentials,
    }
  );

  const storageItems = window.textsecure.protobuf.StorageItems.decode(
    storageItemsBuffer
  );

  if (!storageItems.items) {
    window.log.info(
      'storageService.processManifest: No storage items retrieved'
    );
    return false;
  }

  const decryptedStorageItems = await pMap(
    storageItems.items,
    async (
      storageRecordWrapper: StorageItemClass
    ): Promise<MergeableItemType> => {
      const { key, value: storageItemCiphertext } = storageRecordWrapper;

      if (!key || !storageItemCiphertext) {
        window.log.error(
          'storageService.processManifest: No key or Ciphertext available'
        );
        await stopStorageServiceSync();
        throw new Error(
          'storageService.processManifest: Missing key and/or Ciphertext'
        );
      }

      const base64ItemID = arrayBufferToBase64(key.toArrayBuffer());

      const storageItemKey = await deriveStorageItemKey(
        storageKey,
        base64ItemID
      );

      let storageItemPlaintext;
      try {
        storageItemPlaintext = await Crypto.decryptProfile(
          storageItemCiphertext.toArrayBuffer(),
          storageItemKey
        );
      } catch (err) {
        window.log.error(
          'storageService.processManifest: Error decrypting storage item'
        );
        await stopStorageServiceSync();
        throw err;
      }

      const storageRecord = window.textsecure.protobuf.StorageRecord.decode(
        storageItemPlaintext
      );

      return {
        itemType: remoteKeysTypeMap.get(base64ItemID),
        storageID: base64ItemID,
        storageRecord,
      };
    },
    { concurrency: 5 }
  );

  // Merge Account records last
  const sortedStorageItems = ([] as Array<MergeableItemType>).concat(
    ...partition(
      decryptedStorageItems,
      storageRecord => storageRecord.storageRecord.account === undefined
    )
  );

  try {
    window.log.info(
      `storageService.processManifest: Attempting to merge ${sortedStorageItems.length} records`
    );
    const mergedRecords = await pMap(sortedStorageItems, mergeRecord, {
      concurrency: 5,
    });
    window.log.info(
      `storageService.processManifest: Processed ${mergedRecords.length} records`
    );

    const unknownRecords: Map<string, UnknownRecord> = new Map();
    unknownRecordsArray.forEach((record: UnknownRecord) => {
      unknownRecords.set(record.storageID, record);
    });

    const newRecordsWithErrors: Array<UnknownRecord> = [];

    let hasConflict = false;

    mergedRecords.forEach((mergedRecord: MergedRecordType) => {
      if (mergedRecord.isUnsupported) {
        unknownRecords.set(mergedRecord.storageID, {
          itemType: mergedRecord.itemType,
          storageID: mergedRecord.storageID,
        });
      } else if (mergedRecord.hasError) {
        newRecordsWithErrors.push({
          itemType: mergedRecord.itemType,
          storageID: mergedRecord.storageID,
        });
      }

      hasConflict = hasConflict || mergedRecord.hasConflict;
    });

    // Filter out all the unknown records we're already supporting
    const newUnknownRecords = Array.from(unknownRecords.values()).filter(
      (record: UnknownRecord) => !validRecordTypes.has(record.itemType)
    );

    window.log.info(
      'storageService.processManifest: Unknown records found:',
      newUnknownRecords.length
    );
    window.storage.put('storage-service-unknown-records', newUnknownRecords);

    window.log.info(
      'storageService.processManifest: Records with errors:',
      newRecordsWithErrors.length
    );
    // Refresh the list of records that had errors with every push, that way
    // this list doesn't grow unbounded and we keep the list of storage keys
    // fresh.
    window.storage.put('storage-service-error-records', newRecordsWithErrors);

    const now = Date.now();

    // if the remote only keys are larger or equal to our local keys then it
    // was likely a forced push of storage service. We keep track of these
    // merges so that we can detect possible infinite loops
    if (remoteOnly.length >= localKeys.length) {
      window.log.info(
        'storageService.processManifest: remote manifest was likely force pushed',
        now
      );
      forcedPushBucket.push(now);

      // we need to check our conversations because maybe all of them were not
      // updated properly, for those that weren't we'll clear their storage
      // key so that they can be included in the next update
      window.getConversations().forEach((conversation: ConversationModel) => {
        const storageID = conversation.get('storageID');
        if (storageID && !remoteOnlySet.has(storageID)) {
          window.log.info(
            'storageService.processManifest: clearing storageID',
            conversation.debugID()
          );
          conversation.unset('storageID');
        }
      });

      if (forcedPushBucket.length >= 3) {
        const [firstMostRecentForcedPush] = forcedPushBucket;

        if (now - firstMostRecentForcedPush < 5 * MINUTE) {
          window.log.info(
            'storageService.processManifest: thrasing? Backing off'
          );
          const error: Error & { code?: string } = new Error();
          error.code = 'E_BACKOFF';
          throw error;
        }

        window.log.info(
          'storageService.processManifest: thrash timestamp of first -> now',
          firstMostRecentForcedPush,
          now
        );
        forcedPushBucket.shift();
      }
    }

    if (hasConflict) {
      window.log.info(
        'storageService.processManifest: Conflict found, uploading changes'
      );

      return true;
    }

    consecutiveConflicts = 0;
  } catch (err) {
    window.log.error(
      'storageService.processManifest: failed!',
      err && err.stack ? err.stack : String(err)
    );

    // The thrash back-off request must reach the caller; swallowing it here
    // would turn it into a plain "no conflict" result. Every other merge
    // failure stays best-effort and is only logged.
    if (err && err.code === 'E_BACKOFF') {
      throw err;
    }
  }

  return false;
}
/**
 * Runs one storage service sync: fetches the remote manifest newer than the
 * local 'manifestVersion', merges it, and uploads when the merge reported
 * conflicts.
 *
 * Does nothing when the storage write feature is disabled. Errors raised
 * while fetching or processing are logged and not rethrown; an error with
 * `code === 'E_BACKOFF'` additionally makes this function wait for the
 * maximum back-off delay before returning.
 *
 * @throws Error when there is no 'storageKey' in local storage
 */
async function sync(): Promise<void> {
  if (!isStorageWriteFeatureEnabled()) {
    window.log.info(
      'storageService.sync: Not starting desktop.storage is falsey'
    );

    return;
  }

  if (!window.storage.get('storageKey')) {
    throw new Error('storageService.sync: Cannot start; no storage key!');
  }

  window.log.info('storageService.sync: starting...');

  try {
    // If we've previously interacted with storage service, update 'fetchComplete' record
    const previousFetchComplete = window.storage.get('storageFetchComplete');
    const manifestFromStorage = window.storage.get('manifestVersion');
    if (!previousFetchComplete && isNumber(manifestFromStorage)) {
      window.storage.put('storageFetchComplete', true);
    }

    const localManifestVersion = manifestFromStorage || 0;
    const manifest = await fetchManifest(localManifestVersion);

    // Guarding against no manifests being returned, everything should be ok
    if (!manifest) {
      window.log.info('storageService.sync: no new manifest');
      return;
    }

    const version = manifest.version.toNumber();

    window.log.info(
      `storageService.sync: manifest versions - previous: ${localManifestVersion}, current: ${version}`
    );

    // The new version is recorded before the manifest is processed.
    window.storage.put('manifestVersion', version);

    const hasConflicts = await processManifest(manifest);
    if (hasConflicts) {
      await upload();
    }

    // We now know that we've successfully completed a storage service fetch
    window.storage.put('storageFetchComplete', true);
  } catch (err) {
    window.log.error(
      'storageService.sync: error processing manifest',
      err && err.stack ? err.stack : String(err)
    );

    // When we're told to backoff, backoff to the max which should be
    // ~5 minutes. If this job was running inside a queue it'll probably time
    // out.
    if (err.code === 'E_BACKOFF') {
      // 9001 has no entry in BACKOFF, so this sleeps for BACKOFF.max.
      await backOff(9001);
    }
  }

  window.log.info('storageService.sync: complete');
}
/**
 * Generates and uploads a manifest at the next version
 * (local 'manifestVersion' + 1).
 *
 * Does nothing when the storage write feature is disabled. Without a
 * 'storageKey' it only requests new keys. A 409 from the upload waits for the
 * conflict back-off and then schedules the sync job; any other error is
 * logged and not rethrown.
 *
 * @throws Error when `window.textsecure.messaging` is unavailable
 */
async function upload(): Promise<void> {
  if (!isStorageWriteFeatureEnabled()) {
    window.log.info(
      'storageService.upload: Not starting because the feature is not enabled'
    );

    return;
  }
  if (!window.textsecure.messaging) {
    throw new Error('storageService.upload: We are offline!');
  }

  if (!window.storage.get('storageKey')) {
    // requesting new keys runs the sync job which will detect the conflict
    // and re-run the upload job once we're merged and up-to-date.
    window.log.info(
      'storageService.upload: no storageKey, requesting new keys'
    );
    consecutiveStops = 0;
    await window.textsecure.messaging.sendRequestKeySyncMessage();
    return;
  }

  const localManifestVersion = window.storage.get('manifestVersion') || 0;
  const version = Number(localManifestVersion) + 1;

  window.log.info(
    'storageService.upload: will update to manifest version',
    version
  );

  try {
    const generatedManifest = await generateManifest(version);
    await uploadManifest(version, generatedManifest);
  } catch (err) {
    if (err.code === 409) {
      await backOff(consecutiveConflicts);
      window.log.info('storageService.upload: pushing sync on the queue');
      // The sync job will check for conflicts and as part of that conflict
      // check if an item needs sync and doesn't match with the remote record
      // it'll kick off another upload.
      setTimeout(runStorageServiceSyncJob);
      return;
    }
    window.log.error(
      'storageService.upload',
      err && err.stack ? err.stack : String(err)
    );
  }
}
// Gate checked by the debounced upload and sync jobs; they only log and
// return until this has been switched on.
let storageServiceEnabled = false;

/** Switches the storage service jobs on. There is no way to switch them off here. */
export function enableStorageService(): void {
  storageServiceEnabled = true;
}
// Note: this function is meant to be called before ConversationController is hydrated.
//   It goes directly to the database, so in-memory conversations will be out of date.
/**
 * Removes the stored manifest version, unknown-records list and storage
 * credentials, then erases storage service state from conversations through
 * the database helper.
 */
export async function eraseAllStorageServiceState(): Promise<void> {
  window.log.info('storageService.eraseAllStorageServiceState: starting...');

  const keysToRemove = [
    'manifestVersion',
    'storage-service-unknown-records',
    'storageCredentials',
  ] as const;
  await Promise.all(keysToRemove.map(key => window.storage.remove(key)));

  await eraseStorageServiceStateFromConversations();

  window.log.info('storageService.eraseAllStorageServiceState: complete');
}
/**
 * Debounced (500 ms) trigger that queues an upload on the storage job queue.
 * Only logs when the storage service has not been enabled yet.
 */
export const storageServiceUploadJob = debounce(() => {
  if (storageServiceEnabled) {
    const jobLabel = `upload v${window.storage.get('manifestVersion')}`;
    storageJobQueue(upload, jobLabel);
    return;
  }

  window.log.info(
    'storageService.storageServiceUploadJob: called before enabled'
  );
}, 500);
/**
 * Debounced (500 ms) trigger that queues a sync on the storage job queue.
 * Only logs when the storage service has not been enabled yet.
 */
export const runStorageServiceSyncJob = debounce(() => {
  if (storageServiceEnabled) {
    const jobLabel = `sync v${window.storage.get('manifestVersion')}`;
    storageJobQueue(sync, jobLabel);
    return;
  }

  window.log.info(
    'storageService.runStorageServiceSyncJob: called before enabled'
  );
}, 500);