Storage Service fixes to prevent crashes on Android

* Merge report v1 group settings into local v2 group

The invariants of Storage Service mandate that the remote data always
takes precendence over the local data. We have to updated
blocked/whitelisted/... of the v2 group even if the record is for the v2
group. After doing such update - sync the manifest back to the Storage
Service with correct v2 record for the group.

* Repair errored records before uploading manifest

Fetch and re-attempt to merge errored records before uploading the
manifest. This is useful in the cases where we were not aware of the V1
group when the remote manifest was fetched, and became aware of it
before the new manifest is generated. In such scenario, we should fetch
the records for things we have failed on last time and attempt to merge
them with our data. If they are merged - we should not let their
storageIDs hang in the new manifest, which would cause group duplicates
and crashes on other clients.

* Create v1 group for storage service record

If we receive storage service record with v1 group that we didn't sync
yet (or just don't have for any other reason) - create it instead of
pushing it to `storage-service-error-records`.
This commit is contained in:
Fedor Indutny 2021-03-17 15:10:31 -07:00 committed by Josh Perez
parent 7ffc01f0b0
commit 11bcbded64
2 changed files with 179 additions and 53 deletions

View file

@ -70,11 +70,13 @@ function backOff(count: number) {
return sleep(ms); return sleep(ms);
} }
type UnknownRecord = { type RemoteRecord = {
itemType: number; itemType: number;
storageID: string; storageID: string;
}; };
type UnknownRecord = RemoteRecord;
async function encryptRecord( async function encryptRecord(
storageID: string | undefined, storageID: string | undefined,
storageRecord: StorageRecordClass storageRecord: StorageRecordClass
@ -129,6 +131,8 @@ async function generateManifest(
await window.ConversationController.checkForConflicts(); await window.ConversationController.checkForConflicts();
await repairUnknownAndErroredRecords();
const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type; const ITEM_TYPE = window.textsecure.protobuf.ManifestRecord.Identifier.Type;
const conversationsToUpdate = []; const conversationsToUpdate = [];
@ -214,7 +218,7 @@ async function generateManifest(
} }
} }
const unknownRecordsArray = ( const unknownRecordsArray: ReadonlyArray<UnknownRecord> = (
window.storage.get('storage-service-unknown-records') || [] window.storage.get('storage-service-unknown-records') || []
).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType)); ).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType));
@ -233,7 +237,7 @@ async function generateManifest(
manifestRecordKeys.add(identifier); manifestRecordKeys.add(identifier);
}); });
const recordsWithErrors = const recordsWithErrors: ReadonlyArray<UnknownRecord> =
window.storage.get('storage-service-error-records') || []; window.storage.get('storage-service-error-records') || [];
window.log.info( window.log.info(
@ -347,6 +351,54 @@ async function generateManifest(
}; };
} }
async function repairUnknownAndErroredRecords() {
const unknownRecordsArray: ReadonlyArray<UnknownRecord> =
window.storage.get('storage-service-unknown-records') || [];
const recordsWithErrors: ReadonlyArray<UnknownRecord> =
window.storage.get('storage-service-error-records') || [];
const remoteRecords = unknownRecordsArray.concat(recordsWithErrors);
// No repair necessary
if (remoteRecords.length === 0) {
return;
}
// Process unknown and records with records from the past sync to see
// if they can be merged
const remoteRecordsMap: Map<string, RemoteRecord> = new Map();
remoteRecords.forEach(record => {
remoteRecordsMap.set(record.storageID, record);
});
window.log.info(
'storageService.repairUnknownAndErroredRecords: found ' +
`${unknownRecordsArray.length} unknown records and ` +
`${recordsWithErrors.length} errored records, attempting repair`
);
const conflictCount = await processRemoteRecords(remoteRecordsMap);
if (conflictCount !== 0) {
window.log.info(
'storageService.repairUnknownAndErroredRecords: fixed ' +
`${conflictCount} conflicts`
);
}
const newUnknownCount = (
window.storage.get('storage-service-unknown-records') || []
).length;
const newErroredCount = (
window.storage.get('storage-service-error-records') || []
).length;
window.log.info(
'storageService.repairUnknownAndErroredRecords: ' +
`${newUnknownCount} unknown records and ` +
`${newErroredCount} errored records after repair`
);
}
async function uploadManifest( async function uploadManifest(
version: number, version: number,
{ {
@ -612,9 +664,6 @@ async function mergeRecord(
async function processManifest( async function processManifest(
manifest: ManifestRecordClass manifest: ManifestRecordClass
): Promise<boolean> { ): Promise<boolean> {
const storageKeyBase64 = window.storage.get('storageKey');
const storageKey = base64ToArrayBuffer(storageKeyBase64);
if (!window.textsecure.messaging) { if (!window.textsecure.messaging) {
throw new Error('storageService.processManifest: We are offline!'); throw new Error('storageService.processManifest: We are offline!');
} }
@ -632,7 +681,7 @@ async function processManifest(
.map((conversation: ConversationModel) => conversation.get('storageID')) .map((conversation: ConversationModel) => conversation.get('storageID'))
.filter(Boolean); .filter(Boolean);
const unknownRecordsArray = const unknownRecordsArray: ReadonlyArray<UnknownRecord> =
window.storage.get('storage-service-unknown-records') || []; window.storage.get('storage-service-unknown-records') || [];
unknownRecordsArray.forEach((record: UnknownRecord) => { unknownRecordsArray.forEach((record: UnknownRecord) => {
@ -642,7 +691,7 @@ async function processManifest(
} }
}); });
const recordsWithErrors = const recordsWithErrors: ReadonlyArray<UnknownRecord> =
window.storage.get('storage-service-error-records') || []; window.storage.get('storage-service-error-records') || [];
// Do not fetch any records that we failed to merge in the previous fetch // Do not fetch any records that we failed to merge in the previous fetch
@ -673,15 +722,44 @@ async function processManifest(
} }
}); });
const remoteOnly = Array.from(remoteOnlySet); const remoteOnlyRecords = new Map<string, RemoteRecord>();
remoteOnlySet.forEach(storageID => {
remoteOnlyRecords.set(storageID, {
storageID,
itemType: remoteKeysTypeMap.get(storageID),
});
});
// if the remote only keys are larger or equal to our local keys then it
// was likely a forced push of storage service. We keep track of these
// merges so that we can detect possible infinite loops
const isForcePushed = remoteOnlyRecords.size >= localKeys.length;
const conflictCount = await processRemoteRecords(
remoteOnlyRecords,
isForcePushed
);
const hasConflicts = conflictCount !== 0;
return hasConflicts;
}
async function processRemoteRecords(
remoteOnlyRecords: Map<string, RemoteRecord>,
isForcePushed = false
): Promise<number> {
const storageKeyBase64 = window.storage.get('storageKey');
const storageKey = base64ToArrayBuffer(storageKeyBase64);
window.log.info( window.log.info(
'storageService.processManifest: remote keys', 'storageService.processRemoteRecords: remote keys',
remoteOnly.length remoteOnlyRecords.size
); );
const readOperation = new window.textsecure.protobuf.ReadOperation(); const readOperation = new window.textsecure.protobuf.ReadOperation();
readOperation.readKey = remoteOnly.map(base64ToArrayBuffer); readOperation.readKey = Array.from(remoteOnlyRecords.keys()).map(
base64ToArrayBuffer
);
const credentials = window.storage.get('storageCredentials'); const credentials = window.storage.get('storageCredentials');
const storageItemsBuffer = await window.textsecure.messaging.getStorageRecords( const storageItemsBuffer = await window.textsecure.messaging.getStorageRecords(
@ -697,9 +775,9 @@ async function processManifest(
if (!storageItems.items) { if (!storageItems.items) {
window.log.info( window.log.info(
'storageService.processManifest: No storage items retrieved' 'storageService.processRemoteRecords: No storage items retrieved'
); );
return false; return 0;
} }
const decryptedStorageItems = await pMap( const decryptedStorageItems = await pMap(
@ -711,11 +789,11 @@ async function processManifest(
if (!key || !storageItemCiphertext) { if (!key || !storageItemCiphertext) {
window.log.error( window.log.error(
'storageService.processManifest: No key or Ciphertext available' 'storageService.processRemoteRecords: No key or Ciphertext available'
); );
await stopStorageServiceSync(); await stopStorageServiceSync();
throw new Error( throw new Error(
'storageService.processManifest: Missing key and/or Ciphertext' 'storageService.processRemoteRecords: Missing key and/or Ciphertext'
); );
} }
@ -734,7 +812,7 @@ async function processManifest(
); );
} catch (err) { } catch (err) {
window.log.error( window.log.error(
'storageService.processManifest: Error decrypting storage item' 'storageService.processRemoteRecords: Error decrypting storage item'
); );
await stopStorageServiceSync(); await stopStorageServiceSync();
throw err; throw err;
@ -744,8 +822,16 @@ async function processManifest(
storageItemPlaintext storageItemPlaintext
); );
const remoteRecord = remoteOnlyRecords.get(base64ItemID);
if (!remoteRecord) {
throw new Error(
"Got a remote record that wasn't requested with " +
`storageID: ${base64ItemID}`
);
}
return { return {
itemType: remoteKeysTypeMap.get(base64ItemID), itemType: remoteRecord.itemType,
storageID: base64ItemID, storageID: base64ItemID,
storageRecord, storageRecord,
}; };
@ -763,23 +849,27 @@ async function processManifest(
try { try {
window.log.info( window.log.info(
`storageService.processManifest: Attempting to merge ${sortedStorageItems.length} records` `storageService.processRemoteRecords: Attempting to merge ${sortedStorageItems.length} records`
); );
const mergedRecords = await pMap(sortedStorageItems, mergeRecord, { const mergedRecords = await pMap(sortedStorageItems, mergeRecord, {
concurrency: 5, concurrency: 5,
}); });
window.log.info( window.log.info(
`storageService.processManifest: Processed ${mergedRecords.length} records` `storageService.processRemoteRecords: Processed ${mergedRecords.length} records`
); );
// Collect full map of previously and currently unknown records
const unknownRecords: Map<string, UnknownRecord> = new Map(); const unknownRecords: Map<string, UnknownRecord> = new Map();
const unknownRecordsArray: ReadonlyArray<UnknownRecord> =
window.storage.get('storage-service-unknown-records') || [];
unknownRecordsArray.forEach((record: UnknownRecord) => { unknownRecordsArray.forEach((record: UnknownRecord) => {
unknownRecords.set(record.storageID, record); unknownRecords.set(record.storageID, record);
}); });
const newRecordsWithErrors: Array<UnknownRecord> = []; const newRecordsWithErrors: Array<UnknownRecord> = [];
let hasConflict = false; let conflictCount = 0;
mergedRecords.forEach((mergedRecord: MergedRecordType) => { mergedRecords.forEach((mergedRecord: MergedRecordType) => {
if (mergedRecord.isUnsupported) { if (mergedRecord.isUnsupported) {
@ -794,7 +884,9 @@ async function processManifest(
}); });
} }
hasConflict = hasConflict || mergedRecord.hasConflict; if (mergedRecord.hasConflict) {
conflictCount += 1;
}
}); });
// Filter out all the unknown records we're already supporting // Filter out all the unknown records we're already supporting
@ -803,13 +895,13 @@ async function processManifest(
); );
window.log.info( window.log.info(
'storageService.processManifest: Unknown records found:', 'storageService.processRemoteRecords: Unknown records found:',
newUnknownRecords.length newUnknownRecords.length
); );
window.storage.put('storage-service-unknown-records', newUnknownRecords); window.storage.put('storage-service-unknown-records', newUnknownRecords);
window.log.info( window.log.info(
'storageService.processManifest: Records with errors:', 'storageService.processRemoteRecords: Records with errors:',
newRecordsWithErrors.length newRecordsWithErrors.length
); );
// Refresh the list of records that had errors with every push, that way // Refresh the list of records that had errors with every push, that way
@ -819,12 +911,9 @@ async function processManifest(
const now = Date.now(); const now = Date.now();
// if the remote only keys are larger or equal to our local keys then it if (isForcePushed) {
// was likely a forced push of storage service. We keep track of these
// merges so that we can detect possible infinite loops
if (remoteOnly.length >= localKeys.length) {
window.log.info( window.log.info(
'storageService.processManifest: remote manifest was likely force pushed', 'storageService.processRemoteRecords: remote manifest was likely force pushed',
now now
); );
forcedPushBucket.push(now); forcedPushBucket.push(now);
@ -834,9 +923,9 @@ async function processManifest(
// key so that they can be included in the next update // key so that they can be included in the next update
window.getConversations().forEach((conversation: ConversationModel) => { window.getConversations().forEach((conversation: ConversationModel) => {
const storageID = conversation.get('storageID'); const storageID = conversation.get('storageID');
if (storageID && !remoteOnlySet.has(storageID)) { if (storageID && !remoteOnlyRecords.has(storageID)) {
window.log.info( window.log.info(
'storageService.processManifest: clearing storageID', 'storageService.processRemoteRecords: clearing storageID',
conversation.debugID() conversation.debugID()
); );
conversation.unset('storageID'); conversation.unset('storageID');
@ -848,7 +937,7 @@ async function processManifest(
if (now - firstMostRecentForcedPush < 5 * MINUTE) { if (now - firstMostRecentForcedPush < 5 * MINUTE) {
window.log.info( window.log.info(
'storageService.processManifest: thrasing? Backing off' 'storageService.processRemoteRecords: thrasing? Backing off'
); );
const error = new Error(); const error = new Error();
error.code = 'E_BACKOFF'; error.code = 'E_BACKOFF';
@ -856,7 +945,7 @@ async function processManifest(
} }
window.log.info( window.log.info(
'storageService.processManifest: thrash timestamp of first -> now', 'storageService.processRemoteRecords: thrash timestamp of first -> now',
firstMostRecentForcedPush, firstMostRecentForcedPush,
now now
); );
@ -864,23 +953,24 @@ async function processManifest(
} }
} }
if (hasConflict) { if (conflictCount !== 0) {
window.log.info( window.log.info(
'storageService.processManifest: Conflict found, uploading changes' 'storageService.processRemoteRecords: ' +
`${conflictCount} conflicts found, uploading changes`
); );
return true; return conflictCount;
} }
consecutiveConflicts = 0; consecutiveConflicts = 0;
} catch (err) { } catch (err) {
window.log.error( window.log.error(
'storageService.processManifest: failed!', 'storageService.processRemoteRecords: failed!',
err && err.stack ? err.stack : String(err) err && err.stack ? err.stack : String(err)
); );
} }
return false; return 0;
} }
async function sync(): Promise<void> { async function sync(): Promise<void> {

View file

@ -6,6 +6,7 @@ import { isNumber } from 'lodash';
import { import {
arrayBufferToBase64, arrayBufferToBase64,
base64ToArrayBuffer, base64ToArrayBuffer,
deriveMasterKeyFromGroupV1,
fromEncodedBinaryToArrayBuffer, fromEncodedBinaryToArrayBuffer,
} from '../Crypto'; } from '../Crypto';
import dataInterface from '../sql/Client'; import dataInterface from '../sql/Client';
@ -385,19 +386,38 @@ export async function mergeGroupV1Record(
const groupId = groupV1Record.id.toBinary(); const groupId = groupV1Record.id.toBinary();
// We do a get here because we don't get enough information from just this source to // Attempt to fetch an existing group pertaining to the `groupId` or create
// be able to do the right thing with this group. So we'll update the local group // a new group and populate it with the attributes from the record.
// record if we have one; otherwise we'll just drop this update. let conversation = window.ConversationController.get(groupId);
const conversation = window.ConversationController.get(groupId);
if (!conversation) { if (!conversation) {
throw new Error(`No conversation for group(${groupId})`); const masterKeyBuffer = await deriveMasterKeyFromGroupV1(groupId);
} const fields = deriveGroupFields(masterKeyBuffer);
window.log.info('storageService.mergeGroupV1Record:', conversation.debugID()); const derivedGroupV2Id = arrayBufferToBase64(fields.id);
if (!conversation.isGroupV1()) { window.log.info(
throw new Error(`Record has group type mismatch ${conversation.debugID()}`); 'storageService.mergeGroupV1Record: failed to find group by v1 id ' +
`attempting lookup by v2 groupv2(${derivedGroupV2Id})`
);
conversation = window.ConversationController.get(derivedGroupV2Id);
}
if (conversation) {
window.log.info(
'storageService.mergeGroupV1Record: found existing group',
conversation.debugID()
);
} else {
conversation = await window.ConversationController.getOrCreateAndWait(
groupId,
'group'
);
window.log.info(
'storageService.mergeGroupV1Record: created a new group locally',
conversation.debugID()
);
} }
// If we receive a group V1 record, remote data should take precendence
// even if the group is actually V2 on our end.
conversation.set({ conversation.set({
isArchived: Boolean(groupV1Record.archived), isArchived: Boolean(groupV1Record.archived),
markedUnread: Boolean(groupV1Record.markedUnread), markedUnread: Boolean(groupV1Record.markedUnread),
@ -406,13 +426,29 @@ export async function mergeGroupV1Record(
applyMessageRequestState(groupV1Record, conversation); applyMessageRequestState(groupV1Record, conversation);
let hasPendingChanges: boolean;
if (conversation.isGroupV1()) {
addUnknownFields(groupV1Record, conversation); addUnknownFields(groupV1Record, conversation);
const hasPendingChanges = doesRecordHavePendingChanges( hasPendingChanges = doesRecordHavePendingChanges(
await toGroupV1Record(conversation), await toGroupV1Record(conversation),
groupV1Record, groupV1Record,
conversation conversation
); );
} else {
// We cannot preserve unknown fields if local group is V2 and the remote is
// still V1, because the storageItem that we'll put into manifest will have
// a different record type.
window.log.info(
'storageService.mergeGroupV1Record marking v1 ' +
' group for an update to v2',
conversation.debugID()
);
// We want to upgrade group in the storage after merging it.
hasPendingChanges = true;
}
updateConversation(conversation.attributes); updateConversation(conversation.attributes);