Drop invalid storage service keys
parent 138580fedb
commit e0205ffff8
2 changed files with 115 additions and 42 deletions
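The gist of the change, sketched below: processRemoteRecords now reports which remote storage keys turned out to be invalid, dropped, or missing; sync() treats those pending deletes like conflicts; and upload()/generateManifest() fold them into the next manifest's delete set. This is a minimal, self-contained TypeScript sketch; ProcessRemoteRecordsResult, buildManifestDeletes, and maybeReupload are simplified stand-ins for the functions in the diff, not the actual Signal-Desktop API.

    type ProcessRemoteRecordsResult = Readonly<{
      conflictCount: number;
      deleteKeys: Array<string>; // base64 storage IDs that should be deleted remotely
    }>;

    // Stand-in for generateManifest(version, previousManifest, isNewManifest, extraDeleteKeys):
    // keys handed down from a previous sync become explicit deletes in the next manifest.
    function buildManifestDeletes(
      version: number,
      extraDeleteKeys: ReadonlyArray<string> = []
    ): { version: number; deleteKeys: Array<Uint8Array> } {
      // The real code converts with Bytes.fromBase64; Buffer keeps this sketch self-contained in Node.
      const deleteKeys = extraDeleteKeys.map(key =>
        Uint8Array.from(Buffer.from(key, 'base64'))
      );
      return { version, deleteKeys };
    }

    // Stand-in for the sync() side: conflicts OR pending deletes both trigger a follow-up upload.
    function maybeReupload(result: ProcessRemoteRecordsResult, version: number): void {
      const hasConflicts =
        result.conflictCount !== 0 || result.deleteKeys.length !== 0;
      if (hasConflicts) {
        const manifest = buildManifestDeletes(version + 1, result.deleteKeys);
        console.log(
          `upload(${manifest.version}): deleteKeys=${manifest.deleteKeys.length}`
        );
      }
    }

    // Example: processing found no merge conflicts but two invalid records to drop.
    maybeReupload({ conflictCount: 0, deleteKeys: ['AAAA', 'BBBB'] }, 42);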
@@ -88,6 +88,11 @@ type RemoteRecord = {
 type UnknownRecord = RemoteRecord;

+type ProcessRemoteRecordsResultType = Readonly<{
+  conflictCount: number;
+  deleteKeys: Array<string>;
+}>;
+
 function unknownRecordToRedactedID({
   storageID,
   storageVersion,
@@ -143,11 +148,12 @@ type GeneratedManifestType = {
 async function generateManifest(
   version: number,
   previousManifest?: Proto.IManifestRecord,
-  isNewManifest = false
+  isNewManifest = false,
+  extraDeleteKeys = new Array<string>()
 ): Promise<GeneratedManifestType> {
   log.info(
     `storageService.upload(${version}): generating manifest ` +
-      `new=${isNewManifest}`
+      `new=${isNewManifest} extraDeleteKeys=${extraDeleteKeys.length}`
   );

   await window.ConversationController.checkForConflicts();
@@ -160,6 +166,10 @@ async function generateManifest(
   const manifestRecordKeys: Set<IManifestRecordIdentifier> = new Set();
   const newItems: Set<Proto.IStorageItem> = new Set();

+  for (const key of extraDeleteKeys) {
+    deleteKeys.push(Bytes.fromBase64(key));
+  }
+
   const conversations = window.getConversations();
   for (let i = 0; i < conversations.length; i += 1) {
     const conversation = conversations.models[i];
@@ -181,15 +191,25 @@ async function generateManifest(

     const validationError = conversation.validate();
     if (validationError) {
-      if (conversation.get('storageID')) {
-        log.warn(
-          'storageService.generateManifest: skipping contact',
-          conversation.idForLogging(),
-          'due to local validation error',
-          validationError
-        );
-        conversation.unset('storageID');
-      }
+      const droppedID = conversation.get('storageID');
+      const droppedVersion = conversation.get('storageVersion');
+      if (!droppedID) {
+        continue;
+      }
+
+      const recordID = redactStorageID(
+        droppedID,
+        droppedVersion,
+        conversation
+      );
+
+      log.warn(
+        `storageService.generateManifest(${version}): ` +
+          `skipping contact=${recordID} ` +
+          `due to local validation error=${validationError}`
+      );
+      conversation.unset('storageID');
+      deleteKeys.push(Bytes.fromBase64(droppedID));
       continue;
     }

@@ -715,6 +735,7 @@ type MergeableItemType = {

 type MergedRecordType = UnknownRecord & {
   hasConflict: boolean;
+  shouldDrop: boolean;
   hasError: boolean;
   isUnsupported: boolean;
 };
@@ -780,6 +801,7 @@ async function mergeRecord(
       `storageService.merge(${redactedID}): merged item type=${itemType} ` +
         `oldID=${oldID} ` +
         `conflict=${mergeResult.hasConflict} ` +
+        `shouldDrop=${Boolean(mergeResult.shouldDrop)} ` +
         `details=${JSON.stringify(mergeResult.details)}`
     );
   } catch (err) {
@@ -794,6 +816,7 @@ async function mergeRecord(

   return {
     hasConflict: mergeResult.hasConflict,
+    shouldDrop: Boolean(mergeResult.shouldDrop),
     hasError,
     isUnsupported,
     itemType,
@@ -804,7 +827,7 @@ async function mergeRecord(
 async function processManifest(
   manifest: Proto.IManifestRecord,
   version: number
-): Promise<number> {
+): Promise<ProcessRemoteRecordsResultType> {
   if (!window.textsecure.messaging) {
     throw new Error('storageService.processManifest: We are offline!');
   }
@@ -884,8 +907,12 @@ async function processManifest(
   });

   let conflictCount = 0;
+  let deleteKeys = new Array<string>();
   if (remoteOnlyRecords.size) {
-    conflictCount = await processRemoteRecords(version, remoteOnlyRecords);
+    ({ conflictCount, deleteKeys } = await processRemoteRecords(
+      version,
+      remoteOnlyRecords
+    ));
   }

   // Post-merge, if our local records contain any storage IDs that were not
@@ -913,16 +940,17 @@ async function processManifest(
   });

   log.info(
-    `storageService.process(${version}): conflictCount=${conflictCount}`
+    `storageService.process(${version}): conflictCount=${conflictCount} ` +
+      `deleteKeys=${deleteKeys.length}`
   );

-  return conflictCount;
+  return { conflictCount, deleteKeys };
 }

 async function processRemoteRecords(
   storageVersion: number,
   remoteOnlyRecords: Map<string, RemoteRecord>
-): Promise<number> {
+): Promise<ProcessRemoteRecordsResultType> {
   const storageKeyBase64 = window.storage.get('storageKey');
   if (!storageKeyBase64) {
     throw new Error('No storage key');
@@ -948,6 +976,8 @@ async function processRemoteRecords(
     }
   );

+  const missingKeys = new Set<string>(remoteOnlyRecords.keys());
+
   const storageItems = Proto.StorageItems.decode(storageItemsBuffer);

   const decryptedStorageItems = await pMap(
@@ -967,6 +997,7 @@ async function processRemoteRecords(
       }

       const base64ItemID = Bytes.toBase64(key);
+      missingKeys.delete(base64ItemID);

       const storageItemKey = deriveStorageItemKey(storageKey, base64ItemID);

@@ -1005,6 +1036,18 @@ async function processRemoteRecords(
     { concurrency: 5 }
   );

+  const redactedMissingKeys = Array.from(missingKeys).map(id =>
+    redactStorageID(id, storageVersion)
+  );
+
+  log.info(
+    `storageService.process(${storageVersion}): missing remote ` +
+      `keys=${JSON.stringify(redactedMissingKeys)} ` +
+      `count=${missingKeys.size}`
+  );
+
+  const droppedKeys = new Set<string>();
+
   // Merge Account records last since it contains the pinned conversations
   // and we need all other records merged first before we can find the pinned
   // records in our db
@@ -1059,11 +1102,24 @@ async function processRemoteRecords(
       });
    }

-    if (mergedRecord.hasConflict) {
+    if (mergedRecord.hasConflict || mergedRecord.shouldDrop) {
      conflictCount += 1;
    }
+
+    if (mergedRecord.shouldDrop) {
+      droppedKeys.add(mergedRecord.storageID);
+    }
  });

+  const redactedDroppedKeys = Array.from(droppedKeys.values()).map(key =>
+    redactStorageID(key, storageVersion)
+  );
+  log.info(
+    `storageService.process(${storageVersion}): ` +
+      `droppedKeys=${JSON.stringify(redactedDroppedKeys)} ` +
+      `count=${redactedDroppedKeys.length}`
+  );
+
  // Filter out all the unknown records we're already supporting
  const newUnknownRecords = Array.from(unknownRecords.values()).filter(
    (record: UnknownRecord) => !validRecordTypes.has(record.itemType)
@@ -1102,7 +1158,10 @@ async function processRemoteRecords(
      conflictBackOff.reset();
    }

-    return conflictCount;
+    return {
+      conflictCount,
+      deleteKeys: [...missingKeys, ...droppedKeys],
+    };
  } catch (err) {
    log.error(
      `storageService.process(${storageVersion}): ` +
@@ -1111,7 +1170,7 @@ async function processRemoteRecords(
    );
  }

-  return 0;
+  return { conflictCount: 0, deleteKeys: [] };
 }

 async function sync(
@@ -1159,18 +1218,21 @@ async function sync(
        `version=${localManifestVersion}`
    );

-    const conflictCount = await processManifest(manifest, version);
+    const { conflictCount, deleteKeys } = await processManifest(
+      manifest,
+      version
+    );

    log.info(
      `storageService.sync: updated to version=${version} ` +
-        `conflicts=${conflictCount}`
+        `conflicts=${conflictCount} deleteKeys=${deleteKeys.length}`
    );

    await window.storage.put('manifestVersion', version);

-    const hasConflicts = conflictCount !== 0;
+    const hasConflicts = conflictCount !== 0 || deleteKeys.length !== 0;
    if (hasConflicts && !ignoreConflicts) {
-      await upload(true);
+      await upload(true, deleteKeys);
    }

    // We now know that we've successfully completed a storage service fetch
@@ -1187,7 +1249,10 @@ async function sync(
   return manifest;
 }

-async function upload(fromSync = false): Promise<void> {
+async function upload(
+  fromSync = false,
+  extraDeleteKeys = new Array<string>()
+): Promise<void> {
   if (!window.textsecure.messaging) {
     throw new Error('storageService.upload: We are offline!');
   }
@@ -1249,10 +1314,17 @@ async function upload(fromSync = false): Promise<void> {
   const localManifestVersion = window.storage.get('manifestVersion', 0);
   const version = Number(localManifestVersion) + 1;

-  log.info('storageService.upload: will update to manifest version', version);
+  log.info(
+    `storageService.upload(${version}): will update to manifest version`
+  );

   try {
-    const generatedManifest = await generateManifest(version, previousManifest);
+    const generatedManifest = await generateManifest(
+      version,
+      previousManifest,
+      false,
+      extraDeleteKeys
+    );
     await uploadManifest(version, generatedManifest);
   } catch (err) {
     if (err.code === 409) {
@@ -1264,7 +1336,10 @@ async function upload(fromSync = false): Promise<void> {
       setTimeout(runStorageServiceSyncJob);
       return;
     }
-    log.error('storageService.upload', Errors.toLogFormat(err));
+    log.error(
+      `storageService.upload(${version}): error`,
+      Errors.toLogFormat(err)
+    );
   }
 }

@@ -36,11 +36,9 @@ import {
 import { ourProfileKeyService } from './ourProfileKey';
 import { isGroupV1, isGroupV2 } from '../util/whatTypeOfConversation';
 import { isValidUuid } from '../types/UUID';
-import type { ConversationAttributesType } from '../model-types.d';
 import * as preferredReactionEmoji from '../reactions/preferredReactionEmoji';
 import { SignalService as Proto } from '../protobuf';
 import * as log from '../logging/log';
-import type { WhatIsThis } from '../window.d';

 const { updateConversation } = dataInterface;

@@ -52,6 +50,7 @@ type RecordClass =

 export type MergeResultType = Readonly<{
   hasConflict: boolean;
+  shouldDrop?: boolean;
   conversation?: ConversationModel;
   oldStorageID?: string;
   oldStorageVersion?: number;
@@ -555,6 +554,19 @@ export async function mergeGroupV1Record(
   const oldStorageID = conversation.get('storageID');
   const oldStorageVersion = conversation.get('storageVersion');

+  if (!isGroupV1(conversation.attributes)) {
+    details.push('GV1 record for GV2 group, dropping');
+
+    return {
+      hasConflict: true,
+      shouldDrop: true,
+      conversation,
+      oldStorageID,
+      oldStorageVersion,
+      details,
+    };
+  }
+
   conversation.set({
     isArchived: Boolean(groupV1Record.archived),
     markedUnread: Boolean(groupV1Record.markedUnread),
@@ -766,20 +778,6 @@ export async function mergeContactRecord(
     return { hasConflict: false, details: ['invalid uuid'] };
   }

-  const c = new window.Whisper.Conversation({
-    e164,
-    uuid,
-    type: 'private',
-  } as Partial<ConversationAttributesType> as WhatIsThis);
-
-  const validationError = c.validate();
-  if (validationError) {
-    return {
-      hasConflict: false,
-      details: [`validation error=${validationError}`],
-    };
-  }
-
   const id = window.ConversationController.ensureContactIds({
     e164,
     uuid,
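The shouldDrop flag added to MergeResultType above is what feeds the dropped-key set on the processing side (see the hasConflict || shouldDrop hunk earlier in the diff). A rough sketch of that consumer, with MergedRecord and collectDroppedKeys as simplified stand-ins rather than the upstream shapes:

    type MergedRecord = {
      storageID: string; // base64 key of the remote record
      hasConflict: boolean;
      shouldDrop: boolean; // e.g. a GV1 record that now points at a GV2 group
    };

    function collectDroppedKeys(mergedRecords: ReadonlyArray<MergedRecord>): {
      conflictCount: number;
      droppedKeys: Set<string>;
    } {
      let conflictCount = 0;
      const droppedKeys = new Set<string>();
      for (const record of mergedRecords) {
        // A record slated for dropping also counts as a conflict, so a re-upload is scheduled.
        if (record.hasConflict || record.shouldDrop) {
          conflictCount += 1;
        }
        if (record.shouldDrop) {
          droppedKeys.add(record.storageID);
        }
      }
      return { conflictCount, droppedKeys };
    }

    // Example: one clean record, one GV1-for-GV2 record that gets dropped.
    const { conflictCount, droppedKeys } = collectDroppedKeys([
      { storageID: 'AAAA', hasConflict: false, shouldDrop: false },
      { storageID: 'BBBB', hasConflict: false, shouldDrop: true },
    ]);
    console.log(conflictCount, [...droppedKeys]); // 1 [ 'BBBB' ]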