Update storageService logging
This commit is contained in:
parent
0a18cc50bd
commit
cb5131420f
4 changed files with 444 additions and 339 deletions
1
ts/model-types.d.ts
vendored
1
ts/model-types.d.ts
vendored
|
@ -273,6 +273,7 @@ export type ConversationAttributesType = {
|
|||
needsVerification?: boolean;
|
||||
profileSharing: boolean;
|
||||
storageID?: string;
|
||||
storageVersion?: number;
|
||||
storageUnknownFields?: string;
|
||||
unreadCount?: number;
|
||||
version: number;
|
||||
|
|
|
@ -144,6 +144,7 @@ const ATTRIBUTES_THAT_DONT_INVALIDATE_PROPS_CACHE = new Set([
|
|||
'profileLastFetchedAt',
|
||||
'needsStorageServiceSync',
|
||||
'storageID',
|
||||
'storageVersion',
|
||||
'storageUnknownFields',
|
||||
]);
|
||||
|
||||
|
@ -5047,7 +5048,7 @@ export class ConversationModel extends window.Backbone
|
|||
});
|
||||
}
|
||||
|
||||
startMuteTimer(): void {
|
||||
startMuteTimer({ viaStorageServiceSync = false } = {}): void {
|
||||
if (this.muteTimer !== undefined) {
|
||||
clearTimeout(this.muteTimer);
|
||||
this.muteTimer = undefined;
|
||||
|
@ -5057,7 +5058,7 @@ export class ConversationModel extends window.Backbone
|
|||
if (isNumber(muteExpiresAt) && muteExpiresAt < Number.MAX_SAFE_INTEGER) {
|
||||
const delay = muteExpiresAt - Date.now();
|
||||
if (delay <= 0) {
|
||||
this.setMuteExpiration(0);
|
||||
this.setMuteExpiration(0, { viaStorageServiceSync });
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -5076,7 +5077,10 @@ export class ConversationModel extends window.Backbone
|
|||
}
|
||||
|
||||
this.set({ muteExpiresAt });
|
||||
this.startMuteTimer();
|
||||
|
||||
// Don't cause duplicate captureChange
|
||||
this.startMuteTimer({ viaStorageServiceSync: true });
|
||||
|
||||
if (!viaStorageServiceSync) {
|
||||
this.captureChange('mutedUntilTimestamp');
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import {
|
|||
toGroupV1Record,
|
||||
toGroupV2Record,
|
||||
} from './storageRecordOps';
|
||||
import type { MergeResultType } from './storageRecordOps';
|
||||
import type { ConversationModel } from '../models/conversations';
|
||||
import { strictAssert } from '../util/assert';
|
||||
import * as durations from '../util/durations';
|
||||
|
@ -70,17 +71,30 @@ const conflictBackOff = new BackOff([
|
|||
30 * durations.SECOND,
|
||||
]);
|
||||
|
||||
function redactStorageID(storageID: string): string {
|
||||
return storageID.substring(0, 3);
|
||||
function redactStorageID(
|
||||
storageID: string,
|
||||
version?: number,
|
||||
conversation?: ConversationModel
|
||||
): string {
|
||||
const convoId = conversation ? ` ${conversation?.idForLogging()}` : '';
|
||||
return `${version ?? '?'}:${storageID.substring(0, 3)}${convoId}`;
|
||||
}
|
||||
|
||||
type RemoteRecord = {
|
||||
itemType: number;
|
||||
storageID: string;
|
||||
storageVersion?: number;
|
||||
};
|
||||
|
||||
type UnknownRecord = RemoteRecord;
|
||||
|
||||
function unknownRecordToRedactedID({
|
||||
storageID,
|
||||
storageVersion,
|
||||
}: UnknownRecord): string {
|
||||
return redactStorageID(storageID, storageVersion);
|
||||
}
|
||||
|
||||
async function encryptRecord(
|
||||
storageID: string | undefined,
|
||||
storageRecord: Proto.IStorageRecord
|
||||
|
@ -132,9 +146,8 @@ async function generateManifest(
|
|||
isNewManifest = false
|
||||
): Promise<GeneratedManifestType> {
|
||||
log.info(
|
||||
'storageService.generateManifest: generating manifest',
|
||||
version,
|
||||
isNewManifest
|
||||
`storageService.upload(${version}): generating manifest ` +
|
||||
`new=${isNewManifest}`
|
||||
);
|
||||
|
||||
await window.ConversationController.checkForConflicts();
|
||||
|
@ -195,82 +208,84 @@ async function generateManifest(
|
|||
storageRecord.groupV1 = await toGroupV1Record(conversation);
|
||||
identifier.type = ITEM_TYPE.GROUPV1;
|
||||
} else {
|
||||
log.info(
|
||||
'storageService.generateManifest: unknown conversation',
|
||||
conversation.idForLogging()
|
||||
log.warn(
|
||||
`storageService.upload(${version}): ` +
|
||||
`unknown conversation=${conversation.idForLogging()}`
|
||||
);
|
||||
}
|
||||
|
||||
if (storageRecord) {
|
||||
const currentStorageID = conversation.get('storageID');
|
||||
|
||||
const isNewItem =
|
||||
isNewManifest ||
|
||||
Boolean(conversation.get('needsStorageServiceSync')) ||
|
||||
!currentStorageID;
|
||||
|
||||
const storageID = isNewItem
|
||||
? Bytes.toBase64(generateStorageID())
|
||||
: currentStorageID;
|
||||
|
||||
let storageItem;
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
storageItem = await encryptRecord(storageID, storageRecord);
|
||||
} catch (err) {
|
||||
log.error(
|
||||
'storageService.generateManifest: encrypt record failed:',
|
||||
err && err.stack ? err.stack : String(err)
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
identifier.raw = storageItem.key;
|
||||
|
||||
// When a client needs to update a given record it should create it
|
||||
// under a new key and delete the existing key.
|
||||
if (isNewItem) {
|
||||
newItems.add(storageItem);
|
||||
|
||||
if (storageID) {
|
||||
insertKeys.push(storageID);
|
||||
log.info(
|
||||
'storageService.generateManifest: new key',
|
||||
conversation.idForLogging(),
|
||||
redactStorageID(storageID)
|
||||
);
|
||||
} else {
|
||||
log.info(
|
||||
'storageService.generateManifest: no storage id',
|
||||
conversation.idForLogging()
|
||||
);
|
||||
}
|
||||
|
||||
const oldStorageID = conversation.get('storageID');
|
||||
if (oldStorageID) {
|
||||
log.info(
|
||||
'storageService.generateManifest: deleting key',
|
||||
redactStorageID(oldStorageID)
|
||||
);
|
||||
deleteKeys.push(Bytes.fromBase64(oldStorageID));
|
||||
}
|
||||
|
||||
conversationsToUpdate.push({
|
||||
conversation,
|
||||
storageID,
|
||||
});
|
||||
}
|
||||
|
||||
manifestRecordKeys.add(identifier);
|
||||
if (!storageRecord) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const currentStorageID = conversation.get('storageID');
|
||||
const currentStorageVersion = conversation.get('storageVersion');
|
||||
|
||||
const currentRedactedID = currentStorageID
|
||||
? redactStorageID(currentStorageID, currentStorageVersion)
|
||||
: undefined;
|
||||
|
||||
const isNewItem =
|
||||
isNewManifest ||
|
||||
Boolean(conversation.get('needsStorageServiceSync')) ||
|
||||
!currentStorageID;
|
||||
|
||||
const storageID = isNewItem
|
||||
? Bytes.toBase64(generateStorageID())
|
||||
: currentStorageID;
|
||||
|
||||
let storageItem;
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
storageItem = await encryptRecord(storageID, storageRecord);
|
||||
} catch (err) {
|
||||
log.error(
|
||||
`storageService.upload(${version}): encrypt record failed:`,
|
||||
Errors.toLogFormat(err)
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
identifier.raw = storageItem.key;
|
||||
|
||||
// When a client needs to update a given record it should create it
|
||||
// under a new key and delete the existing key.
|
||||
if (isNewItem) {
|
||||
newItems.add(storageItem);
|
||||
|
||||
insertKeys.push(storageID);
|
||||
const newRedactedID = redactStorageID(storageID, version, conversation);
|
||||
if (currentStorageID) {
|
||||
log.info(
|
||||
`storageService.upload(${version}): ` +
|
||||
`updating from=${currentRedactedID} ` +
|
||||
`to=${newRedactedID}`
|
||||
);
|
||||
deleteKeys.push(Bytes.fromBase64(currentStorageID));
|
||||
} else {
|
||||
log.info(
|
||||
`storageService.upload(${version}): adding key=${newRedactedID}`
|
||||
);
|
||||
}
|
||||
|
||||
conversationsToUpdate.push({
|
||||
conversation,
|
||||
storageID,
|
||||
});
|
||||
}
|
||||
|
||||
manifestRecordKeys.add(identifier);
|
||||
}
|
||||
|
||||
const unknownRecordsArray: ReadonlyArray<UnknownRecord> = (
|
||||
window.storage.get('storage-service-unknown-records') || []
|
||||
).filter((record: UnknownRecord) => !validRecordTypes.has(record.itemType));
|
||||
|
||||
const redactedUnknowns = unknownRecordsArray.map(unknownRecordToRedactedID);
|
||||
|
||||
log.info(
|
||||
'storageService.generateManifest: adding unknown records:',
|
||||
unknownRecordsArray.length
|
||||
`storageService.upload(${version}): adding unknown ` +
|
||||
`records=${JSON.stringify(redactedUnknowns)} ` +
|
||||
`count=${redactedUnknowns.length}`
|
||||
);
|
||||
|
||||
// When updating the manifest, ensure all "unknown" keys are added to the
|
||||
|
@ -287,10 +302,11 @@ async function generateManifest(
|
|||
'storage-service-error-records',
|
||||
new Array<UnknownRecord>()
|
||||
);
|
||||
const redactedErrors = recordsWithErrors.map(unknownRecordToRedactedID);
|
||||
|
||||
log.info(
|
||||
'storageService.generateManifest: adding records that had errors in the previous merge',
|
||||
recordsWithErrors.length
|
||||
`storageService.upload(${version}): adding error ` +
|
||||
`records=${JSON.stringify(redactedErrors)} count=${redactedErrors.length}`
|
||||
);
|
||||
|
||||
// These records failed to merge in the previous fetchManifest, but we still
|
||||
|
@ -320,8 +336,10 @@ async function generateManifest(
|
|||
rawDuplicates.has(identifier.raw) ||
|
||||
typeRawDuplicates.has(typeAndRaw)
|
||||
) {
|
||||
log.info(
|
||||
'storageService.generateManifest: removing duplicate identifier from manifest',
|
||||
log.warn(
|
||||
`storageService.upload(${version}): removing from duplicate item ` +
|
||||
'from the manifest',
|
||||
redactStorageID(storageID),
|
||||
identifier.type
|
||||
);
|
||||
manifestRecordKeys.delete(identifier);
|
||||
|
@ -334,8 +352,9 @@ async function generateManifest(
|
|||
key => Bytes.toBase64(key) === storageID
|
||||
);
|
||||
if (hasDeleteKey) {
|
||||
log.info(
|
||||
'storageService.generateManifest: removing key which has been deleted',
|
||||
log.warn(
|
||||
`storageService.upload(${version}): removing key which has been deleted`,
|
||||
redactStorageID(storageID),
|
||||
identifier.type
|
||||
);
|
||||
manifestRecordKeys.delete(identifier);
|
||||
|
@ -344,7 +363,10 @@ async function generateManifest(
|
|||
// Ensure that there is *exactly* one Account type in the manifest
|
||||
if (identifier.type === ITEM_TYPE.ACCOUNT) {
|
||||
if (hasAccountType) {
|
||||
log.info('storageService.generateManifest: removing duplicate account');
|
||||
log.warn(
|
||||
`storageService.upload(${version}): removing duplicate account`,
|
||||
redactStorageID(storageID)
|
||||
);
|
||||
manifestRecordKeys.delete(identifier);
|
||||
}
|
||||
hasAccountType = true;
|
||||
|
@ -362,8 +384,9 @@ async function generateManifest(
|
|||
|
||||
const storageID = Bytes.toBase64(storageItem.key);
|
||||
if (storageKeyDuplicates.has(storageID)) {
|
||||
log.info(
|
||||
'storageService.generateManifest: removing duplicate identifier from inserts',
|
||||
log.warn(
|
||||
`storageService.upload(${version}): ` +
|
||||
'removing duplicate identifier from inserts',
|
||||
redactStorageID(storageID)
|
||||
);
|
||||
newItems.delete(storageItem);
|
||||
|
@ -413,7 +436,7 @@ async function generateManifest(
|
|||
const remoteDeletes: Array<string> = [];
|
||||
pendingDeletes.forEach(id => remoteDeletes.push(redactStorageID(id)));
|
||||
log.error(
|
||||
'Delete key sizes do not match',
|
||||
`storageService.upload(${version}): delete key sizes do not match`,
|
||||
'local',
|
||||
localDeletes.join(','),
|
||||
'remote',
|
||||
|
@ -482,16 +505,15 @@ async function uploadManifest(
|
|||
}
|
||||
|
||||
if (newItems.size === 0 && deleteKeys.length === 0) {
|
||||
log.info('storageService.uploadManifest: nothing to upload');
|
||||
log.info(`storageService.upload(${version}): nothing to upload`);
|
||||
return;
|
||||
}
|
||||
|
||||
const credentials = window.storage.get('storageCredentials');
|
||||
try {
|
||||
log.info(
|
||||
'storageService.uploadManifest: keys inserting, deleting:',
|
||||
newItems.size,
|
||||
deleteKeys.length
|
||||
`storageService.upload(${version}): inserting=${newItems.size} ` +
|
||||
`deleting=${deleteKeys.length}`
|
||||
);
|
||||
|
||||
const writeOperation = new Proto.WriteOperation();
|
||||
|
@ -499,7 +521,6 @@ async function uploadManifest(
|
|||
writeOperation.insertItem = Array.from(newItems);
|
||||
writeOperation.deleteKey = deleteKeys;
|
||||
|
||||
log.info('storageService.uploadManifest: uploading...', version);
|
||||
await window.textsecure.messaging.modifyStorageRecords(
|
||||
Proto.WriteOperation.encode(writeOperation).finish(),
|
||||
{
|
||||
|
@ -508,34 +529,38 @@ async function uploadManifest(
|
|||
);
|
||||
|
||||
log.info(
|
||||
'storageService.uploadManifest: upload done, updating conversation(s) with new storageIDs:',
|
||||
conversationsToUpdate.length
|
||||
`storageService.upload(${version}): upload complete, updating ` +
|
||||
`conversations=${conversationsToUpdate.length}`
|
||||
);
|
||||
|
||||
// update conversations with the new storageID
|
||||
conversationsToUpdate.forEach(({ conversation, storageID }) => {
|
||||
conversation.set({
|
||||
needsStorageServiceSync: false,
|
||||
storageVersion: version,
|
||||
storageID,
|
||||
});
|
||||
updateConversation(conversation.attributes);
|
||||
});
|
||||
} catch (err) {
|
||||
log.error(
|
||||
'storageService.uploadManifest: failed!',
|
||||
err && err.stack ? err.stack : String(err)
|
||||
`storageService.upload(${version}): failed!`,
|
||||
Errors.toLogFormat(err)
|
||||
);
|
||||
|
||||
if (err.code === 409) {
|
||||
if (conflictBackOff.isFull()) {
|
||||
log.error(
|
||||
'storageService.uploadManifest: Exceeded maximum consecutive conflicts'
|
||||
`storageService.upload(${version}): exceeded maximum consecutive ` +
|
||||
'conflicts'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`storageService.uploadManifest: Conflict found with v${version}, running sync job times(${conflictBackOff.getIndex()})`
|
||||
`storageService.upload(${version}): conflict found with ` +
|
||||
`version=${version}, running sync job ` +
|
||||
`times=${conflictBackOff.getIndex()}`
|
||||
);
|
||||
|
||||
throw err;
|
||||
|
@ -544,40 +569,30 @@ async function uploadManifest(
|
|||
throw err;
|
||||
}
|
||||
|
||||
log.info(
|
||||
'storageService.uploadManifest: setting new manifestVersion',
|
||||
version
|
||||
);
|
||||
log.info(`storageService.upload(${version}): setting new manifestVersion`);
|
||||
window.storage.put('manifestVersion', version);
|
||||
conflictBackOff.reset();
|
||||
backOff.reset();
|
||||
|
||||
if (window.ConversationController.areWePrimaryDevice()) {
|
||||
log.warn(
|
||||
'storageService.uploadManifest: We are primary device; not sending sync manifest'
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await singleProtoJobQueue.add(
|
||||
window.textsecure.messaging.getFetchManifestSyncMessage()
|
||||
);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
'storageService.uploadManifest: Failed to queue sync message',
|
||||
`storageService.upload(${version}): Failed to queue sync message`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async function stopStorageServiceSync() {
|
||||
log.info('storageService.stopStorageServiceSync');
|
||||
async function stopStorageServiceSync(reason: Error) {
|
||||
log.warn('storageService.stopStorageServiceSync', Errors.toLogFormat(reason));
|
||||
|
||||
await window.storage.remove('storageKey');
|
||||
|
||||
if (backOff.isFull()) {
|
||||
log.info(
|
||||
log.warn(
|
||||
'storageService.stopStorageServiceSync: too many consecutive stops'
|
||||
);
|
||||
return;
|
||||
|
@ -650,10 +665,10 @@ async function decryptManifest(
|
|||
async function fetchManifest(
|
||||
manifestVersion: number
|
||||
): Promise<Proto.ManifestRecord | undefined> {
|
||||
log.info('storageService.fetchManifest');
|
||||
log.info('storageService.sync: fetch start');
|
||||
|
||||
if (!window.textsecure.messaging) {
|
||||
throw new Error('storageService.fetchManifest: We are offline!');
|
||||
throw new Error('storageService.sync: we are offline!');
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -669,28 +684,19 @@ async function fetchManifest(
|
|||
);
|
||||
const encryptedManifest = Proto.StorageManifest.decode(manifestBinary);
|
||||
|
||||
// if we don't get a value we're assuming that there's no newer manifest
|
||||
if (!encryptedManifest.value || !encryptedManifest.version) {
|
||||
log.info('storageService.fetchManifest: nothing changed');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
return decryptManifest(encryptedManifest);
|
||||
} catch (err) {
|
||||
await stopStorageServiceSync();
|
||||
await stopStorageServiceSync(err);
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
if (err.code === 204) {
|
||||
log.info('storageService.fetchManifest: no newer manifest, ok');
|
||||
log.info('storageService.sync: no newer manifest, ok');
|
||||
return;
|
||||
}
|
||||
|
||||
log.error(
|
||||
'storageService.fetchManifest: failed!',
|
||||
err && err.stack ? err.stack : String(err)
|
||||
);
|
||||
log.error('storageService.sync: failed!', Errors.toLogFormat(err));
|
||||
|
||||
if (err.code === 404) {
|
||||
await createNewManifest();
|
||||
|
@ -714,49 +720,80 @@ type MergedRecordType = UnknownRecord & {
|
|||
};
|
||||
|
||||
async function mergeRecord(
|
||||
storageVersion: number,
|
||||
itemToMerge: MergeableItemType
|
||||
): Promise<MergedRecordType> {
|
||||
const { itemType, storageID, storageRecord } = itemToMerge;
|
||||
|
||||
const ITEM_TYPE = Proto.ManifestRecord.Identifier.Type;
|
||||
|
||||
let hasConflict = false;
|
||||
let mergeResult: MergeResultType = { hasConflict: false, details: [] };
|
||||
let isUnsupported = false;
|
||||
let hasError = false;
|
||||
|
||||
try {
|
||||
if (itemType === ITEM_TYPE.UNKNOWN) {
|
||||
log.info('storageService.mergeRecord: Unknown item type', storageID);
|
||||
log.warn('storageService.mergeRecord: Unknown item type', storageID);
|
||||
} else if (itemType === ITEM_TYPE.CONTACT && storageRecord.contact) {
|
||||
hasConflict = await mergeContactRecord(storageID, storageRecord.contact);
|
||||
mergeResult = await mergeContactRecord(
|
||||
storageID,
|
||||
storageVersion,
|
||||
storageRecord.contact
|
||||
);
|
||||
} else if (itemType === ITEM_TYPE.GROUPV1 && storageRecord.groupV1) {
|
||||
hasConflict = await mergeGroupV1Record(storageID, storageRecord.groupV1);
|
||||
mergeResult = await mergeGroupV1Record(
|
||||
storageID,
|
||||
storageVersion,
|
||||
storageRecord.groupV1
|
||||
);
|
||||
} else if (itemType === ITEM_TYPE.GROUPV2 && storageRecord.groupV2) {
|
||||
hasConflict = await mergeGroupV2Record(storageID, storageRecord.groupV2);
|
||||
mergeResult = await mergeGroupV2Record(
|
||||
storageID,
|
||||
storageVersion,
|
||||
storageRecord.groupV2
|
||||
);
|
||||
} else if (itemType === ITEM_TYPE.ACCOUNT && storageRecord.account) {
|
||||
hasConflict = await mergeAccountRecord(storageID, storageRecord.account);
|
||||
mergeResult = await mergeAccountRecord(
|
||||
storageID,
|
||||
storageVersion,
|
||||
storageRecord.account
|
||||
);
|
||||
} else {
|
||||
isUnsupported = true;
|
||||
log.info('storageService.mergeRecord: Unknown record:', itemType);
|
||||
log.warn(
|
||||
`storageService.merge(${redactStorageID(
|
||||
storageID,
|
||||
storageVersion
|
||||
)}): unknown item type=${itemType}`
|
||||
);
|
||||
}
|
||||
|
||||
const redactedID = redactStorageID(
|
||||
storageID,
|
||||
storageVersion,
|
||||
mergeResult.conversation
|
||||
);
|
||||
const oldID = mergeResult.oldStorageID
|
||||
? redactStorageID(mergeResult.oldStorageID, mergeResult.oldStorageVersion)
|
||||
: '?';
|
||||
log.info(
|
||||
'storageService.mergeRecord: merged',
|
||||
redactStorageID(storageID),
|
||||
itemType,
|
||||
hasConflict
|
||||
`storageService.merge(${redactedID}): merged item type=${itemType} ` +
|
||||
`oldID=${oldID} ` +
|
||||
`conflict=${mergeResult.hasConflict} ` +
|
||||
`details=${JSON.stringify(mergeResult.details)}`
|
||||
);
|
||||
} catch (err) {
|
||||
hasError = true;
|
||||
const redactedID = redactStorageID(storageID, storageVersion);
|
||||
log.error(
|
||||
'storageService.mergeRecord: Error with',
|
||||
redactStorageID(storageID),
|
||||
itemType,
|
||||
String(err)
|
||||
`storageService.merge(${redactedID}): error with ` +
|
||||
`item type=${itemType} ` +
|
||||
`details=${Errors.toLogFormat(err)}`
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
hasConflict,
|
||||
hasConflict: mergeResult.hasConflict,
|
||||
hasError,
|
||||
isUnsupported,
|
||||
itemType,
|
||||
|
@ -765,8 +802,9 @@ async function mergeRecord(
|
|||
}
|
||||
|
||||
async function processManifest(
|
||||
manifest: Proto.IManifestRecord
|
||||
): Promise<boolean> {
|
||||
manifest: Proto.IManifestRecord,
|
||||
version: number
|
||||
): Promise<number> {
|
||||
if (!window.textsecure.messaging) {
|
||||
throw new Error('storageService.processManifest: We are offline!');
|
||||
}
|
||||
|
@ -778,13 +816,13 @@ async function processManifest(
|
|||
});
|
||||
|
||||
const remoteKeys = new Set(remoteKeysTypeMap.keys());
|
||||
const localKeys: Set<string> = new Set();
|
||||
const localVersions = new Map<string, number | undefined>();
|
||||
|
||||
const conversations = window.getConversations();
|
||||
conversations.forEach((conversation: ConversationModel) => {
|
||||
const storageID = conversation.get('storageID');
|
||||
if (storageID) {
|
||||
localKeys.add(storageID);
|
||||
localVersions.set(storageID, conversation.get('storageVersion'));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -794,33 +832,47 @@ async function processManifest(
|
|||
const stillUnknown = unknownRecordsArray.filter((record: UnknownRecord) => {
|
||||
// Do not include any unknown records that we already support
|
||||
if (!validRecordTypes.has(record.itemType)) {
|
||||
localKeys.add(record.storageID);
|
||||
localVersions.set(record.storageID, record.storageVersion);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
log.info(
|
||||
'storageService.processManifest: local records:',
|
||||
conversations.length
|
||||
);
|
||||
log.info('storageService.processManifest: local keys:', localKeys.size);
|
||||
log.info(
|
||||
'storageService.processManifest: unknown records:',
|
||||
stillUnknown.length
|
||||
);
|
||||
log.info('storageService.processManifest: remote keys:', remoteKeys.size);
|
||||
|
||||
const remoteOnlySet: Set<string> = new Set();
|
||||
remoteKeys.forEach((key: string) => {
|
||||
if (!localKeys.has(key)) {
|
||||
const remoteOnlySet = new Set<string>();
|
||||
for (const key of remoteKeys) {
|
||||
if (!localVersions.has(key)) {
|
||||
remoteOnlySet.add(key);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const localOnlySet = new Set<string>();
|
||||
for (const key of localVersions.keys()) {
|
||||
if (!remoteKeys.has(key)) {
|
||||
localOnlySet.add(key);
|
||||
}
|
||||
}
|
||||
|
||||
const redactedRemoteOnly = Array.from(remoteOnlySet).map(id =>
|
||||
redactStorageID(id, version)
|
||||
);
|
||||
const redactedLocalOnly = Array.from(localOnlySet).map(id =>
|
||||
redactStorageID(id, localVersions.get(id))
|
||||
);
|
||||
|
||||
log.info(
|
||||
'storageService.processManifest: remote ids:',
|
||||
Array.from(remoteOnlySet).map(redactStorageID).join(',')
|
||||
`storageService.process(${version}): localRecords=${conversations.length} ` +
|
||||
`localKeys=${localVersions.size} unknownKeys=${stillUnknown.length} ` +
|
||||
`remoteKeys=${remoteKeys.size}`
|
||||
);
|
||||
log.info(
|
||||
`storageService.process(${version}): ` +
|
||||
`remoteOnlyCount=${remoteOnlySet.size} ` +
|
||||
`remoteOnlyKeys=${JSON.stringify(redactedRemoteOnly)}`
|
||||
);
|
||||
log.info(
|
||||
`storageService.process(${version}): ` +
|
||||
`localOnlyCount=${localOnlySet.size} ` +
|
||||
`localOnlyKeys=${JSON.stringify(redactedLocalOnly)}`
|
||||
);
|
||||
|
||||
const remoteOnlyRecords = new Map<string, RemoteRecord>();
|
||||
|
@ -831,12 +883,11 @@ async function processManifest(
|
|||
});
|
||||
});
|
||||
|
||||
if (!remoteOnlyRecords.size) {
|
||||
return false;
|
||||
let conflictCount = 0;
|
||||
if (remoteOnlyRecords.size) {
|
||||
conflictCount = await processRemoteRecords(version, remoteOnlyRecords);
|
||||
}
|
||||
|
||||
const conflictCount = await processRemoteRecords(remoteOnlyRecords);
|
||||
|
||||
// Post-merge, if our local records contain any storage IDs that were not
|
||||
// present in the remote manifest then we'll need to clear it, generate a
|
||||
// new storageID for that record, and upload.
|
||||
|
@ -845,20 +896,31 @@ async function processManifest(
|
|||
window.getConversations().forEach((conversation: ConversationModel) => {
|
||||
const storageID = conversation.get('storageID');
|
||||
if (storageID && !remoteKeys.has(storageID)) {
|
||||
const storageVersion = conversation.get('storageVersion');
|
||||
const missingKey = redactStorageID(
|
||||
storageID,
|
||||
storageVersion,
|
||||
conversation
|
||||
);
|
||||
log.info(
|
||||
'storageService.processManifest: local key was not in remote manifest',
|
||||
redactStorageID(storageID),
|
||||
conversation.idForLogging()
|
||||
`storageService.process(${version}): localKey=${missingKey} was not ` +
|
||||
'in remote manifest'
|
||||
);
|
||||
conversation.unset('storageID');
|
||||
conversation.unset('storageVersion');
|
||||
updateConversation(conversation.attributes);
|
||||
}
|
||||
});
|
||||
|
||||
return conflictCount !== 0;
|
||||
log.info(
|
||||
`storageService.process(${version}): conflictCount=${conflictCount}`
|
||||
);
|
||||
|
||||
return conflictCount;
|
||||
}
|
||||
|
||||
async function processRemoteRecords(
|
||||
storageVersion: number,
|
||||
remoteOnlyRecords: Map<string, RemoteRecord>
|
||||
): Promise<number> {
|
||||
const storageKeyBase64 = window.storage.get('storageKey');
|
||||
|
@ -868,8 +930,8 @@ async function processRemoteRecords(
|
|||
const storageKey = Bytes.fromBase64(storageKeyBase64);
|
||||
|
||||
log.info(
|
||||
'storageService.processRemoteRecords: remote only keys',
|
||||
remoteOnlyRecords.size
|
||||
`storageService.process(${storageVersion}): fetching remote keys ` +
|
||||
`count=${remoteOnlyRecords.size}`
|
||||
);
|
||||
|
||||
const readOperation = new Proto.ReadOperation();
|
||||
|
@ -888,11 +950,6 @@ async function processRemoteRecords(
|
|||
|
||||
const storageItems = Proto.StorageItems.decode(storageItemsBuffer);
|
||||
|
||||
if (!storageItems.items) {
|
||||
log.info('storageService.processRemoteRecords: No storage items retrieved');
|
||||
return 0;
|
||||
}
|
||||
|
||||
const decryptedStorageItems = await pMap(
|
||||
storageItems.items,
|
||||
async (
|
||||
|
@ -901,13 +958,12 @@ async function processRemoteRecords(
|
|||
const { key, value: storageItemCiphertext } = storageRecordWrapper;
|
||||
|
||||
if (!key || !storageItemCiphertext) {
|
||||
log.error(
|
||||
'storageService.processRemoteRecords: No key or Ciphertext available'
|
||||
);
|
||||
await stopStorageServiceSync();
|
||||
throw new Error(
|
||||
'storageService.processRemoteRecords: Missing key and/or Ciphertext'
|
||||
const error = new Error(
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
'missing key and/or Ciphertext'
|
||||
);
|
||||
await stopStorageServiceSync(error);
|
||||
throw error;
|
||||
}
|
||||
|
||||
const base64ItemID = Bytes.toBase64(key);
|
||||
|
@ -922,9 +978,11 @@ async function processRemoteRecords(
|
|||
);
|
||||
} catch (err) {
|
||||
log.error(
|
||||
'storageService.processRemoteRecords: Error decrypting storage item'
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
'Error decrypting storage item',
|
||||
Errors.toLogFormat(err)
|
||||
);
|
||||
await stopStorageServiceSync();
|
||||
await stopStorageServiceSync(err);
|
||||
throw err;
|
||||
}
|
||||
|
||||
|
@ -957,24 +1015,28 @@ async function processRemoteRecords(
|
|||
|
||||
try {
|
||||
log.info(
|
||||
`storageService.processRemoteRecords: Attempting to merge ${sortedStorageItems.length} records`
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
`attempting to merge records=${sortedStorageItems.length}`
|
||||
);
|
||||
const mergedRecords = await pMap(
|
||||
sortedStorageItems,
|
||||
(item: MergeableItemType) => mergeRecord(storageVersion, item),
|
||||
{ concurrency: 5 }
|
||||
);
|
||||
const mergedRecords = await pMap(sortedStorageItems, mergeRecord, {
|
||||
concurrency: 5,
|
||||
});
|
||||
log.info(
|
||||
`storageService.processRemoteRecords: Processed ${mergedRecords.length} records`
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
`processed records=${mergedRecords.length}`
|
||||
);
|
||||
|
||||
// Collect full map of previously and currently unknown records
|
||||
const unknownRecords: Map<string, UnknownRecord> = new Map();
|
||||
|
||||
const unknownRecordsArray: ReadonlyArray<UnknownRecord> =
|
||||
const previousUnknownRecords: ReadonlyArray<UnknownRecord> =
|
||||
window.storage.get(
|
||||
'storage-service-unknown-records',
|
||||
new Array<UnknownRecord>()
|
||||
);
|
||||
unknownRecordsArray.forEach((record: UnknownRecord) => {
|
||||
previousUnknownRecords.forEach((record: UnknownRecord) => {
|
||||
unknownRecords.set(record.storageID, record);
|
||||
});
|
||||
|
||||
|
@ -987,11 +1049,13 @@ async function processRemoteRecords(
|
|||
unknownRecords.set(mergedRecord.storageID, {
|
||||
itemType: mergedRecord.itemType,
|
||||
storageID: mergedRecord.storageID,
|
||||
storageVersion,
|
||||
});
|
||||
} else if (mergedRecord.hasError) {
|
||||
newRecordsWithErrors.push({
|
||||
itemType: mergedRecord.itemType,
|
||||
storageID: mergedRecord.storageID,
|
||||
storageVersion,
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1004,36 +1068,46 @@ async function processRemoteRecords(
|
|||
const newUnknownRecords = Array.from(unknownRecords.values()).filter(
|
||||
(record: UnknownRecord) => !validRecordTypes.has(record.itemType)
|
||||
);
|
||||
|
||||
log.info(
|
||||
'storageService.processRemoteRecords: Unknown records found:',
|
||||
newUnknownRecords.length
|
||||
const redactedNewUnknowns = newUnknownRecords.map(
|
||||
unknownRecordToRedactedID
|
||||
);
|
||||
window.storage.put('storage-service-unknown-records', newUnknownRecords);
|
||||
|
||||
log.info(
|
||||
'storageService.processRemoteRecords: Records with errors:',
|
||||
newRecordsWithErrors.length
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
`unknown records=${JSON.stringify(redactedNewUnknowns)} ` +
|
||||
`count=${redactedNewUnknowns.length}`
|
||||
);
|
||||
await window.storage.put(
|
||||
'storage-service-unknown-records',
|
||||
newUnknownRecords
|
||||
);
|
||||
|
||||
const redactedErrorRecords = newRecordsWithErrors.map(
|
||||
unknownRecordToRedactedID
|
||||
);
|
||||
log.info(
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
`error records=${JSON.stringify(redactedErrorRecords)} ` +
|
||||
`count=${redactedErrorRecords.length}`
|
||||
);
|
||||
// Refresh the list of records that had errors with every push, that way
|
||||
// this list doesn't grow unbounded and we keep the list of storage keys
|
||||
// fresh.
|
||||
window.storage.put('storage-service-error-records', newRecordsWithErrors);
|
||||
await window.storage.put(
|
||||
'storage-service-error-records',
|
||||
newRecordsWithErrors
|
||||
);
|
||||
|
||||
if (conflictCount !== 0) {
|
||||
log.info(
|
||||
'storageService.processRemoteRecords: ' +
|
||||
`${conflictCount} conflicts found, uploading changes`
|
||||
);
|
||||
|
||||
return conflictCount;
|
||||
if (conflictCount === 0) {
|
||||
conflictBackOff.reset();
|
||||
}
|
||||
|
||||
conflictBackOff.reset();
|
||||
return conflictCount;
|
||||
} catch (err) {
|
||||
log.error(
|
||||
'storageService.processRemoteRecords: failed!',
|
||||
err && err.stack ? err.stack : String(err)
|
||||
`storageService.process(${storageVersion}): ` +
|
||||
'failed to process remote records',
|
||||
Errors.toLogFormat(err)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -1060,12 +1134,17 @@ async function sync(
|
|||
|
||||
const localManifestVersion = manifestFromStorage || 0;
|
||||
|
||||
log.info(`storageService.sync: fetching ${localManifestVersion}`);
|
||||
log.info(
|
||||
'storageService.sync: fetching latest ' +
|
||||
`after version=${localManifestVersion}`
|
||||
);
|
||||
manifest = await fetchManifest(localManifestVersion);
|
||||
|
||||
// Guarding against no manifests being returned, everything should be ok
|
||||
if (!manifest) {
|
||||
log.info('storageService.sync: no new manifest');
|
||||
log.info(
|
||||
`storageService.sync: no updates, version=${localManifestVersion}`
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
|
@ -1076,25 +1155,30 @@ async function sync(
|
|||
const version = normalizeNumber(manifest.version);
|
||||
|
||||
log.info(
|
||||
`storageService.sync: manifest versions - previous: ${localManifestVersion}, current: ${version}`
|
||||
`storageService.sync: updating to remoteVersion=${version} from ` +
|
||||
`version=${localManifestVersion}`
|
||||
);
|
||||
|
||||
const hasConflicts = await processManifest(manifest);
|
||||
const conflictCount = await processManifest(manifest, version);
|
||||
|
||||
log.info(`storageService.sync: storing new manifest version ${version}`);
|
||||
log.info(
|
||||
`storageService.sync: updated to version=${version} ` +
|
||||
`conflicts=${conflictCount}`
|
||||
);
|
||||
|
||||
window.storage.put('manifestVersion', version);
|
||||
await window.storage.put('manifestVersion', version);
|
||||
|
||||
const hasConflicts = conflictCount !== 0;
|
||||
if (hasConflicts && !ignoreConflicts) {
|
||||
await upload(true);
|
||||
}
|
||||
|
||||
// We now know that we've successfully completed a storage service fetch
|
||||
window.storage.put('storageFetchComplete', true);
|
||||
await window.storage.put('storageFetchComplete', true);
|
||||
} catch (err) {
|
||||
log.error(
|
||||
'storageService.sync: error processing manifest',
|
||||
err && err.stack ? err.stack : String(err)
|
||||
Errors.toLogFormat(err)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -1180,10 +1264,7 @@ async function upload(fromSync = false): Promise<void> {
|
|||
setTimeout(runStorageServiceSyncJob);
|
||||
return;
|
||||
}
|
||||
log.error(
|
||||
'storageService.upload',
|
||||
err && err.stack ? err.stack : String(err)
|
||||
);
|
||||
log.error('storageService.upload', Errors.toLogFormat(err));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,19 @@ type RecordClass =
|
|||
| Proto.IGroupV1Record
|
||||
| Proto.IGroupV2Record;
|
||||
|
||||
export type MergeResultType = Readonly<{
|
||||
hasConflict: boolean;
|
||||
conversation?: ConversationModel;
|
||||
oldStorageID?: string;
|
||||
oldStorageVersion?: number;
|
||||
details: ReadonlyArray<string>;
|
||||
}>;
|
||||
|
||||
type HasConflictResultType = Readonly<{
|
||||
hasConflict: boolean;
|
||||
details: ReadonlyArray<string>;
|
||||
}>;
|
||||
|
||||
function toRecordVerified(verified: number): Proto.ContactRecord.IdentityState {
|
||||
const VERIFIED_ENUM = window.textsecure.storage.protocol.VerifiedStatus;
|
||||
const STATE_ENUM = Proto.ContactRecord.IdentityState;
|
||||
|
@ -66,13 +79,11 @@ function toRecordVerified(verified: number): Proto.ContactRecord.IdentityState {
|
|||
|
||||
function addUnknownFields(
|
||||
record: RecordClass,
|
||||
conversation: ConversationModel
|
||||
conversation: ConversationModel,
|
||||
details: Array<string>
|
||||
): void {
|
||||
if (record.__unknownFields) {
|
||||
log.info(
|
||||
'storageService.addUnknownFields: Unknown fields found for',
|
||||
conversation.idForLogging()
|
||||
);
|
||||
details.push('adding unknown fields');
|
||||
conversation.set({
|
||||
storageUnknownFields: Bytes.toBase64(
|
||||
Bytes.concatenate(record.__unknownFields)
|
||||
|
@ -81,10 +92,7 @@ function addUnknownFields(
|
|||
} else if (conversation.get('storageUnknownFields')) {
|
||||
// If the record doesn't have unknown fields attached but we have them
|
||||
// saved locally then we need to clear it out
|
||||
log.info(
|
||||
'storageService.addUnknownFields: Clearing unknown fields for',
|
||||
conversation.idForLogging()
|
||||
);
|
||||
details.push('clearing unknown fields');
|
||||
conversation.unset('storageUnknownFields');
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +105,7 @@ function applyUnknownFields(
|
|||
if (storageUnknownFields) {
|
||||
log.info(
|
||||
'storageService.applyUnknownFields: Applying unknown fields for',
|
||||
conversation.get('id')
|
||||
conversation.idForLogging()
|
||||
);
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
record.__unknownFields = [Bytes.fromBase64(storageUnknownFields)];
|
||||
|
@ -292,11 +300,6 @@ export async function toAccountRecord(
|
|||
pinnedConversationClass !== undefined
|
||||
);
|
||||
|
||||
log.info(
|
||||
'storageService.toAccountRecord: pinnedConversations',
|
||||
pinnedConversations.length
|
||||
);
|
||||
|
||||
accountRecord.pinnedConversations = pinnedConversations;
|
||||
|
||||
const subscriberId = window.storage.get('subscriberId');
|
||||
|
@ -398,12 +401,11 @@ type RecordClassObject = {
|
|||
|
||||
function doRecordsConflict(
|
||||
localRecord: RecordClassObject,
|
||||
remoteRecord: RecordClassObject,
|
||||
conversation: ConversationModel
|
||||
): boolean {
|
||||
const idForLogging = conversation.idForLogging();
|
||||
remoteRecord: RecordClassObject
|
||||
): HasConflictResultType {
|
||||
const details = new Array<string>();
|
||||
|
||||
return Object.keys(remoteRecord).some((key: string): boolean => {
|
||||
for (const key of Object.keys(remoteRecord)) {
|
||||
const localValue = localRecord[key];
|
||||
const remoteValue = remoteRecord[key];
|
||||
|
||||
|
@ -412,52 +414,37 @@ function doRecordsConflict(
|
|||
if (localValue instanceof Uint8Array) {
|
||||
const areEqual = Bytes.areEqual(localValue, remoteValue);
|
||||
if (!areEqual) {
|
||||
log.info(
|
||||
'storageService.doRecordsConflict: Conflict found for Uint8Array',
|
||||
key,
|
||||
idForLogging
|
||||
);
|
||||
details.push(`key=${key}: different bytes`);
|
||||
}
|
||||
return !areEqual;
|
||||
continue;
|
||||
}
|
||||
|
||||
// If both types are Long we can use Long's equals to compare them
|
||||
if (Long.isLong(localValue) || typeof localValue === 'number') {
|
||||
if (!Long.isLong(remoteValue) && typeof remoteValue !== 'number') {
|
||||
log.info(
|
||||
'storageService.doRecordsConflict: Conflict found, remote value ' +
|
||||
'is not a number',
|
||||
key
|
||||
);
|
||||
return true;
|
||||
details.push(`key=${key}: type mismatch`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const areEqual = Long.fromValue(localValue).equals(
|
||||
Long.fromValue(remoteValue)
|
||||
);
|
||||
if (!areEqual) {
|
||||
log.info(
|
||||
'storageService.doRecordsConflict: Conflict found for Long',
|
||||
key,
|
||||
idForLogging
|
||||
);
|
||||
details.push(`key=${key}: different integers`);
|
||||
}
|
||||
return !areEqual;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (key === 'pinnedConversations') {
|
||||
const areEqual = arePinnedConversationsEqual(localValue, remoteValue);
|
||||
if (!areEqual) {
|
||||
log.info(
|
||||
'storageService.doRecordsConflict: Conflict found for pinnedConversations',
|
||||
idForLogging
|
||||
);
|
||||
details.push('pinnedConversations');
|
||||
}
|
||||
return !areEqual;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (localValue === remoteValue) {
|
||||
return false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Sometimes we get `null` values from Protobuf and they should default to
|
||||
|
@ -470,56 +457,59 @@ function doRecordsConflict(
|
|||
localValue === 0 ||
|
||||
(Long.isLong(localValue) && localValue.toNumber() === 0))
|
||||
) {
|
||||
return false;
|
||||
continue;
|
||||
}
|
||||
|
||||
const areEqual = isEqual(localValue, remoteValue);
|
||||
|
||||
if (!areEqual) {
|
||||
log.info(
|
||||
'storageService.doRecordsConflict: Conflict found for',
|
||||
key,
|
||||
idForLogging
|
||||
);
|
||||
details.push(`key=${key}: different values`);
|
||||
}
|
||||
}
|
||||
|
||||
return !areEqual;
|
||||
});
|
||||
return {
|
||||
hasConflict: details.length > 0,
|
||||
details,
|
||||
};
|
||||
}
|
||||
|
||||
function doesRecordHavePendingChanges(
|
||||
mergedRecord: RecordClass,
|
||||
serviceRecord: RecordClass,
|
||||
conversation: ConversationModel
|
||||
): boolean {
|
||||
): HasConflictResultType {
|
||||
const shouldSync = Boolean(conversation.get('needsStorageServiceSync'));
|
||||
|
||||
if (!shouldSync) {
|
||||
return false;
|
||||
return { hasConflict: false, details: [] };
|
||||
}
|
||||
|
||||
const hasConflict = doRecordsConflict(
|
||||
const { hasConflict, details } = doRecordsConflict(
|
||||
mergedRecord,
|
||||
serviceRecord,
|
||||
conversation
|
||||
serviceRecord
|
||||
);
|
||||
|
||||
if (!hasConflict) {
|
||||
conversation.set({ needsStorageServiceSync: false });
|
||||
}
|
||||
|
||||
return hasConflict;
|
||||
return {
|
||||
hasConflict,
|
||||
details,
|
||||
};
|
||||
}
|
||||
|
||||
export async function mergeGroupV1Record(
|
||||
storageID: string,
|
||||
storageVersion: number,
|
||||
groupV1Record: Proto.IGroupV1Record
|
||||
): Promise<boolean> {
|
||||
): Promise<MergeResultType> {
|
||||
if (!groupV1Record.id) {
|
||||
throw new Error(`No ID for ${storageID}`);
|
||||
}
|
||||
|
||||
const groupId = Bytes.toBinary(groupV1Record.id);
|
||||
let details = new Array<string>();
|
||||
|
||||
// Attempt to fetch an existing group pertaining to the `groupId` or create
|
||||
// a new group and populate it with the attributes from the record.
|
||||
|
@ -544,18 +534,13 @@ export async function mergeGroupV1Record(
|
|||
const fields = deriveGroupFields(masterKeyBuffer);
|
||||
const derivedGroupV2Id = Bytes.toBase64(fields.id);
|
||||
|
||||
log.info(
|
||||
'storageService.mergeGroupV1Record: failed to find group by v1 id ' +
|
||||
details.push(
|
||||
'failed to find group by v1 id ' +
|
||||
`attempting lookup by v2 groupv2(${derivedGroupV2Id})`
|
||||
);
|
||||
conversation = window.ConversationController.get(derivedGroupV2Id);
|
||||
}
|
||||
if (conversation) {
|
||||
log.info(
|
||||
'storageService.mergeGroupV1Record: found existing group',
|
||||
conversation.idForLogging()
|
||||
);
|
||||
} else {
|
||||
if (!conversation) {
|
||||
if (groupV1Record.id.byteLength !== 16) {
|
||||
throw new Error('Not a valid gv1');
|
||||
}
|
||||
|
@ -564,16 +549,17 @@ export async function mergeGroupV1Record(
|
|||
groupId,
|
||||
'group'
|
||||
);
|
||||
log.info(
|
||||
'storageService.mergeGroupV1Record: created a new group locally',
|
||||
conversation.idForLogging()
|
||||
);
|
||||
details.push('created a new group locally');
|
||||
}
|
||||
|
||||
const oldStorageID = conversation.get('storageID');
|
||||
const oldStorageVersion = conversation.get('storageVersion');
|
||||
|
||||
conversation.set({
|
||||
isArchived: Boolean(groupV1Record.archived),
|
||||
markedUnread: Boolean(groupV1Record.markedUnread),
|
||||
storageID,
|
||||
storageVersion,
|
||||
});
|
||||
|
||||
conversation.setMuteExpiration(
|
||||
|
@ -588,30 +574,35 @@ export async function mergeGroupV1Record(
|
|||
let hasPendingChanges: boolean;
|
||||
|
||||
if (isGroupV1(conversation.attributes)) {
|
||||
addUnknownFields(groupV1Record, conversation);
|
||||
addUnknownFields(groupV1Record, conversation, details);
|
||||
|
||||
hasPendingChanges = doesRecordHavePendingChanges(
|
||||
const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges(
|
||||
await toGroupV1Record(conversation),
|
||||
groupV1Record,
|
||||
conversation
|
||||
);
|
||||
|
||||
details = details.concat(extraDetails);
|
||||
hasPendingChanges = hasConflict;
|
||||
} else {
|
||||
// We cannot preserve unknown fields if local group is V2 and the remote is
|
||||
// still V1, because the storageItem that we'll put into manifest will have
|
||||
// a different record type.
|
||||
log.info(
|
||||
'storageService.mergeGroupV1Record marking v1' +
|
||||
' group for an update to v2',
|
||||
conversation.idForLogging()
|
||||
);
|
||||
|
||||
// We want to upgrade group in the storage after merging it.
|
||||
hasPendingChanges = true;
|
||||
details.push('marking v1 group for an update to v2');
|
||||
}
|
||||
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
return hasPendingChanges;
|
||||
return {
|
||||
hasConflict: hasPendingChanges,
|
||||
conversation,
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
};
|
||||
}
|
||||
|
||||
async function getGroupV2Conversation(
|
||||
|
@ -663,8 +654,9 @@ async function getGroupV2Conversation(
|
|||
|
||||
export async function mergeGroupV2Record(
|
||||
storageID: string,
|
||||
storageVersion: number,
|
||||
groupV2Record: Proto.IGroupV2Record
|
||||
): Promise<boolean> {
|
||||
): Promise<MergeResultType> {
|
||||
if (!groupV2Record.masterKey) {
|
||||
throw new Error(`No master key for ${storageID}`);
|
||||
}
|
||||
|
@ -672,7 +664,8 @@ export async function mergeGroupV2Record(
|
|||
const masterKeyBuffer = groupV2Record.masterKey;
|
||||
const conversation = await getGroupV2Conversation(masterKeyBuffer);
|
||||
|
||||
log.info('storageService.mergeGroupV2Record:', conversation.idForLogging());
|
||||
const oldStorageID = conversation.get('storageID');
|
||||
const oldStorageVersion = conversation.get('storageVersion');
|
||||
|
||||
conversation.set({
|
||||
isArchived: Boolean(groupV2Record.archived),
|
||||
|
@ -681,6 +674,7 @@ export async function mergeGroupV2Record(
|
|||
groupV2Record.dontNotifyForMentionsIfMuted
|
||||
),
|
||||
storageID,
|
||||
storageVersion,
|
||||
});
|
||||
|
||||
conversation.setMuteExpiration(
|
||||
|
@ -692,14 +686,18 @@ export async function mergeGroupV2Record(
|
|||
|
||||
applyMessageRequestState(groupV2Record, conversation);
|
||||
|
||||
addUnknownFields(groupV2Record, conversation);
|
||||
let details = new Array<string>();
|
||||
|
||||
const hasPendingChanges = doesRecordHavePendingChanges(
|
||||
addUnknownFields(groupV2Record, conversation, details);
|
||||
|
||||
const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges(
|
||||
await toGroupV2Record(conversation),
|
||||
groupV2Record,
|
||||
conversation
|
||||
);
|
||||
|
||||
details = details.concat(extraDetails);
|
||||
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
const isGroupNewToUs = !isNumber(conversation.get('revision'));
|
||||
|
@ -731,13 +729,20 @@ export async function mergeGroupV2Record(
|
|||
);
|
||||
}
|
||||
|
||||
return hasPendingChanges;
|
||||
return {
|
||||
hasConflict,
|
||||
conversation,
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
};
|
||||
}
|
||||
|
||||
export async function mergeContactRecord(
|
||||
storageID: string,
|
||||
storageVersion: number,
|
||||
originalContactRecord: Proto.IContactRecord
|
||||
): Promise<boolean> {
|
||||
): Promise<MergeResultType> {
|
||||
const contactRecord = {
|
||||
...originalContactRecord,
|
||||
|
||||
|
@ -754,11 +759,11 @@ export async function mergeContactRecord(
|
|||
|
||||
// All contacts must have UUID
|
||||
if (!uuid) {
|
||||
return false;
|
||||
return { hasConflict: false, details: ['no uuid'] };
|
||||
}
|
||||
|
||||
if (!isValidUuid(uuid)) {
|
||||
return false;
|
||||
return { hasConflict: false, details: ['invalid uuid'] };
|
||||
}
|
||||
|
||||
const c = new window.Whisper.Conversation({
|
||||
|
@ -769,11 +774,10 @@ export async function mergeContactRecord(
|
|||
|
||||
const validationError = c.validate();
|
||||
if (validationError) {
|
||||
log.error(
|
||||
'storageService.mergeContactRecord: invalid contact',
|
||||
validationError
|
||||
);
|
||||
return false;
|
||||
return {
|
||||
hasConflict: false,
|
||||
details: [`validation error=${validationError}`],
|
||||
};
|
||||
}
|
||||
|
||||
const id = window.ConversationController.ensureContactIds({
|
||||
|
@ -792,8 +796,6 @@ export async function mergeContactRecord(
|
|||
'private'
|
||||
);
|
||||
|
||||
log.info('storageService.mergeContactRecord:', conversation.idForLogging());
|
||||
|
||||
if (contactRecord.profileKey) {
|
||||
await conversation.setProfileKey(Bytes.toBase64(contactRecord.profileKey), {
|
||||
viaStorageServiceSync: true,
|
||||
|
@ -823,12 +825,17 @@ export async function mergeContactRecord(
|
|||
|
||||
applyMessageRequestState(contactRecord, conversation);
|
||||
|
||||
addUnknownFields(contactRecord, conversation);
|
||||
let details = new Array<string>();
|
||||
addUnknownFields(contactRecord, conversation, details);
|
||||
|
||||
const oldStorageID = conversation.get('storageID');
|
||||
const oldStorageVersion = conversation.get('storageVersion');
|
||||
|
||||
conversation.set({
|
||||
isArchived: Boolean(contactRecord.archived),
|
||||
markedUnread: Boolean(contactRecord.markedUnread),
|
||||
storageID,
|
||||
storageVersion,
|
||||
});
|
||||
|
||||
conversation.setMuteExpiration(
|
||||
|
@ -838,21 +845,30 @@ export async function mergeContactRecord(
|
|||
}
|
||||
);
|
||||
|
||||
const hasPendingChanges = doesRecordHavePendingChanges(
|
||||
const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges(
|
||||
await toContactRecord(conversation),
|
||||
contactRecord,
|
||||
conversation
|
||||
);
|
||||
details = details.concat(extraDetails);
|
||||
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
return hasPendingChanges;
|
||||
return {
|
||||
hasConflict,
|
||||
conversation,
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
};
|
||||
}
|
||||
|
||||
export async function mergeAccountRecord(
|
||||
storageID: string,
|
||||
storageVersion: number,
|
||||
accountRecord: Proto.IAccountRecord
|
||||
): Promise<boolean> {
|
||||
): Promise<MergeResultType> {
|
||||
let details = new Array<string>();
|
||||
const {
|
||||
avatarUrl,
|
||||
linkPreviews,
|
||||
|
@ -911,7 +927,7 @@ export async function mergeAccountRecord(
|
|||
const localPreferredReactionEmoji =
|
||||
window.storage.get('preferredReactionEmoji') || [];
|
||||
if (!isEqual(localPreferredReactionEmoji, rawPreferredReactionEmoji)) {
|
||||
log.info(
|
||||
log.warn(
|
||||
'storageService: remote and local preferredReactionEmoji do not match',
|
||||
localPreferredReactionEmoji.length,
|
||||
rawPreferredReactionEmoji.length
|
||||
|
@ -970,7 +986,7 @@ export async function mergeAccountRecord(
|
|||
.filter(id => !modelPinnedConversationIds.includes(id));
|
||||
|
||||
if (missingStoragePinnedConversationIds.length !== 0) {
|
||||
log.info(
|
||||
log.warn(
|
||||
'mergeAccountRecord: pinnedConversationIds in storage does not match pinned Conversation models'
|
||||
);
|
||||
}
|
||||
|
@ -986,13 +1002,9 @@ export async function mergeAccountRecord(
|
|||
)
|
||||
);
|
||||
|
||||
log.info(
|
||||
'storageService.mergeAccountRecord: Local pinned',
|
||||
locallyPinnedConversations.length
|
||||
);
|
||||
log.info(
|
||||
'storageService.mergeAccountRecord: Remote pinned',
|
||||
pinnedConversations.length
|
||||
details.push(
|
||||
`local pinned=${locallyPinnedConversations.length}`,
|
||||
`remote pinned=${pinnedConversations.length}`
|
||||
);
|
||||
|
||||
const remotelyPinnedConversationPromises = pinnedConversations.map(
|
||||
|
@ -1041,14 +1053,9 @@ export async function mergeAccountRecord(
|
|||
({ id }) => !remotelyPinnedConversationIds.includes(id)
|
||||
);
|
||||
|
||||
log.info(
|
||||
'storageService.mergeAccountRecord: unpinning',
|
||||
conversationsToUnpin.length
|
||||
);
|
||||
|
||||
log.info(
|
||||
'storageService.mergeAccountRecord: pinning',
|
||||
remotelyPinnedConversations.length
|
||||
details.push(
|
||||
`unpinning=${conversationsToUnpin.length}`,
|
||||
`pinning=${remotelyPinnedConversations.length}`
|
||||
);
|
||||
|
||||
conversationsToUnpin.forEach(conversation => {
|
||||
|
@ -1083,12 +1090,16 @@ export async function mergeAccountRecord(
|
|||
'private'
|
||||
);
|
||||
|
||||
addUnknownFields(accountRecord, conversation);
|
||||
addUnknownFields(accountRecord, conversation, details);
|
||||
|
||||
const oldStorageID = conversation.get('storageID');
|
||||
const oldStorageVersion = conversation.get('storageVersion');
|
||||
|
||||
conversation.set({
|
||||
isArchived: Boolean(noteToSelfArchived),
|
||||
markedUnread: Boolean(noteToSelfMarkedUnread),
|
||||
storageID,
|
||||
storageVersion,
|
||||
});
|
||||
|
||||
if (accountRecord.profileKey) {
|
||||
|
@ -1100,7 +1111,7 @@ export async function mergeAccountRecord(
|
|||
window.storage.put('avatarUrl', avatarUrl);
|
||||
}
|
||||
|
||||
const hasPendingChanges = doesRecordHavePendingChanges(
|
||||
const { hasConflict, details: extraDetails } = doesRecordHavePendingChanges(
|
||||
await toAccountRecord(conversation),
|
||||
accountRecord,
|
||||
conversation
|
||||
|
@ -1108,5 +1119,13 @@ export async function mergeAccountRecord(
|
|||
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
return hasPendingChanges;
|
||||
details = details.concat(extraDetails);
|
||||
|
||||
return {
|
||||
hasConflict,
|
||||
conversation,
|
||||
oldStorageID,
|
||||
oldStorageVersion,
|
||||
details,
|
||||
};
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue