Storage Service Write: Improved conflict handling

This commit is contained in:
Josh Perez 2020-09-29 19:29:11 -04:00 committed by Josh Perez
parent 866217a724
commit 27759233e4
10 changed files with 297 additions and 134 deletions

View file

@ -122,8 +122,8 @@ const {
const { notify } = require('../../ts/services/notify'); const { notify } = require('../../ts/services/notify');
const { calling } = require('../../ts/services/calling'); const { calling } = require('../../ts/services/calling');
const { const {
enableStorageService,
eraseAllStorageServiceState, eraseAllStorageServiceState,
handleUnknownRecords,
runStorageServiceSyncJob, runStorageServiceSyncJob,
storageServiceUploadJob, storageServiceUploadJob,
} = require('../../ts/services/storage'); } = require('../../ts/services/storage');
@ -336,8 +336,8 @@ exports.setup = (options = {}) => {
const Services = { const Services = {
calling, calling,
enableStorageService,
eraseAllStorageServiceState, eraseAllStorageServiceState,
handleUnknownRecords,
initializeGroupCredentialFetcher, initializeGroupCredentialFetcher,
initializeNetworkObserver, initializeNetworkObserver,
initializeUpdateListener, initializeUpdateListener,

View file

@ -1468,6 +1468,11 @@ type WhatIsThis = typeof window.WhatIsThis;
} }
}); });
function runStorageService() {
window.Signal.Services.enableStorageService();
window.textsecure.messaging.sendRequestKeySyncMessage();
}
async function start() { async function start() {
window.dispatchEvent(new Event('storage_ready')); window.dispatchEvent(new Event('storage_ready'));
@ -1632,11 +1637,7 @@ type WhatIsThis = typeof window.WhatIsThis;
await window.storage.put('gv2-enabled', true); await window.storage.put('gv2-enabled', true);
window.Signal.Services.handleUnknownRecords( // Erase current manifest version so we re-process storage service data
window.textsecure.protobuf.ManifestRecord.Identifier.Type.GROUPV2
);
// Erase current manifest version so we re-process window.storage service data
await window.storage.remove('manifestVersion'); await window.storage.remove('manifestVersion');
// Kick off window.storage service fetch to grab GroupV2 information // Kick off window.storage service fetch to grab GroupV2 information
@ -1890,7 +1891,9 @@ type WhatIsThis = typeof window.WhatIsThis;
if (connectCount === 1) { if (connectCount === 1) {
window.Signal.Stickers.downloadQueuedPacks(); window.Signal.Stickers.downloadQueuedPacks();
await window.textsecure.messaging.sendRequestKeySyncMessage(); if (!newVersion) {
runStorageService();
}
} }
// On startup after upgrading to a new version, request a contact sync // On startup after upgrading to a new version, request a contact sync
@ -1905,6 +1908,8 @@ type WhatIsThis = typeof window.WhatIsThis;
window.log.info('Boot after upgrading. Requesting contact sync'); window.log.info('Boot after upgrading. Requesting contact sync');
window.getSyncRequest(); window.getSyncRequest();
runStorageService();
try { try {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const manager = window.getAccountManager()!; const manager = window.getAccountManager()!;
@ -2003,10 +2008,12 @@ type WhatIsThis = typeof window.WhatIsThis;
window.log.info('sync successful'); window.log.info('sync successful');
window.storage.put('synced_at', Date.now()); window.storage.put('synced_at', Date.now());
window.Whisper.events.trigger('contactsync'); window.Whisper.events.trigger('contactsync');
runStorageService();
}); });
syncRequest.addEventListener('timeout', () => { syncRequest.addEventListener('timeout', () => {
window.log.error('sync timed out'); window.log.error('sync timed out');
window.Whisper.events.trigger('contactsync'); window.Whisper.events.trigger('contactsync');
runStorageService();
}); });
const ourId = window.ConversationController.getOurConversationId(); const ourId = window.ConversationController.getOurConversationId();

View file

@ -1117,7 +1117,7 @@ function getMembers(groupState: GroupClass) {
} }
return groupState.members.map((member: MemberClass) => ({ return groupState.members.map((member: MemberClass) => ({
profileKey: member.profileKey, profileKey: arrayBufferToBase64(member.profileKey),
uuid: member.userId, uuid: member.userId,
})); }));
} }

1
ts/model-types.d.ts vendored
View file

@ -188,6 +188,7 @@ export type ConversationAttributesType = {
profileFamilyName?: string; profileFamilyName?: string;
profileKey?: string; profileKey?: string;
profileName?: string; profileName?: string;
storageProfileKey?: string;
verified?: number; verified?: number;
// Group-only // Group-only

View file

@ -16,6 +16,15 @@ import { ColorType } from '../types/Colors';
import { MessageModel } from './messages'; import { MessageModel } from './messages';
import { sniffImageMimeType } from '../util/sniffImageMimeType'; import { sniffImageMimeType } from '../util/sniffImageMimeType';
import { MIMEType, IMAGE_WEBP } from '../types/MIME'; import { MIMEType, IMAGE_WEBP } from '../types/MIME';
import {
arrayBufferToBase64,
base64ToArrayBuffer,
deriveAccessKey,
fromEncodedBinaryToArrayBuffer,
getRandomBytes,
stringFromBytes,
verifyAccessKey,
} from '../Crypto';
/* eslint-disable more/no-then */ /* eslint-disable more/no-then */
window.Whisper = window.Whisper || {}; window.Whisper = window.Whisper || {};
@ -39,14 +48,6 @@ const {
writeNewAttachmentData, writeNewAttachmentData,
} = window.Signal.Migrations; } = window.Signal.Migrations;
const { addStickerPackReference } = window.Signal.Data; const { addStickerPackReference } = window.Signal.Data;
const {
arrayBufferToBase64,
base64ToArrayBuffer,
deriveAccessKey,
getRandomBytes,
stringFromBytes,
verifyAccessKey,
} = window.Signal.Crypto;
const COLORS = [ const COLORS = [
'red', 'red',
@ -257,6 +258,15 @@ export class ConversationModel extends window.Backbone.Model<
(uuid && uuid === this.ourUuid)) as boolean; (uuid && uuid === this.ourUuid)) as boolean;
} }
isGroupV1(): boolean {
const groupID = this.get('groupId');
if (!groupID) {
return false;
}
return fromEncodedBinaryToArrayBuffer(groupID).byteLength === 16;
}
isEverUnregistered(): boolean { isEverUnregistered(): boolean {
return Boolean(this.get('discoveredUnregisteredAt')); return Boolean(this.get('discoveredUnregisteredAt'));
} }
@ -3228,7 +3238,10 @@ export class ConversationModel extends window.Backbone.Model<
sealedSender: SEALED_SENDER.UNKNOWN, sealedSender: SEALED_SENDER.UNKNOWN,
}); });
if (!viaStorageServiceSync) { if (
!viaStorageServiceSync &&
profileKey !== this.get('storageProfileKey')
) {
this.captureChange(); this.captureChange();
} }
@ -3241,6 +3254,12 @@ export class ConversationModel extends window.Backbone.Model<
Conversation: window.Whisper.Conversation, Conversation: window.Whisper.Conversation,
}); });
} }
if (viaStorageServiceSync) {
this.set({
storageProfileKey: profileKey,
});
}
} }
async dropProfileKey(): Promise<void> { async dropProfileKey(): Promise<void> {

View file

@ -8,7 +8,6 @@ import {
base64ToArrayBuffer, base64ToArrayBuffer,
deriveStorageItemKey, deriveStorageItemKey,
deriveStorageManifestKey, deriveStorageManifestKey,
fromEncodedBinaryToArrayBuffer,
} from '../Crypto'; } from '../Crypto';
import { import {
ManifestRecordClass, ManifestRecordClass,
@ -29,6 +28,7 @@ import {
toGroupV2Record, toGroupV2Record,
} from './storageRecordOps'; } from './storageRecordOps';
import { ConversationModel } from '../models/conversations'; import { ConversationModel } from '../models/conversations';
import { storageJobQueue } from '../util/JobQueue';
const { const {
eraseStorageServiceStateFromConversations, eraseStorageServiceStateFromConversations,
@ -37,6 +37,15 @@ const {
let consecutiveStops = 0; let consecutiveStops = 0;
let consecutiveConflicts = 0; let consecutiveConflicts = 0;
const forcedPushBucket: Array<number> = [];
const validRecordTypes = new Set([
0, // UNKNOWN
1, // CONTACT
2, // GROUPV1
3, // GROUPV2
4, // ACCOUNT
]);
type BackoffType = { type BackoffType = {
[key: number]: number | undefined; [key: number]: number | undefined;
@ -98,15 +107,6 @@ function generateStorageID(): ArrayBuffer {
return Crypto.getRandomBytes(16); return Crypto.getRandomBytes(16);
} }
function isGroupV1(conversation: ConversationModel): boolean {
const groupID = conversation.get('groupId');
if (!groupID) {
return false;
}
return fromEncodedBinaryToArrayBuffer(groupID).byteLength === 16;
}
type GeneratedManifestType = { type GeneratedManifestType = {
conversationsToUpdate: Array<{ conversationsToUpdate: Array<{
conversation: ConversationModel; conversation: ConversationModel;
@ -153,7 +153,7 @@ async function generateManifest(
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
storageRecord.groupV2 = await toGroupV2Record(conversation); storageRecord.groupV2 = await toGroupV2Record(conversation);
identifier.type = ITEM_TYPE.GROUPV2; identifier.type = ITEM_TYPE.GROUPV2;
} else if (isGroupV1(conversation)) { } else if (conversation.isGroupV1()) {
storageRecord = new window.textsecure.protobuf.StorageRecord(); storageRecord = new window.textsecure.protobuf.StorageRecord();
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
storageRecord.groupV1 = await toGroupV1Record(conversation); storageRecord.groupV1 = await toGroupV1Record(conversation);
@ -166,12 +166,16 @@ async function generateManifest(
} }
if (storageRecord) { if (storageRecord) {
const currentStorageID = conversation.get('storageID');
const isNewItem = const isNewItem =
isNewManifest || Boolean(conversation.get('needsStorageServiceSync')); isNewManifest ||
Boolean(conversation.get('needsStorageServiceSync')) ||
!currentStorageID;
const storageID = isNewItem const storageID = isNewItem
? arrayBufferToBase64(generateStorageID()) ? arrayBufferToBase64(generateStorageID())
: conversation.get('storageID'); : currentStorageID;
let storageItem; let storageItem;
try { try {
@ -217,6 +221,27 @@ async function generateManifest(
// When updating the manifest, ensure all "unknown" keys are added to the // When updating the manifest, ensure all "unknown" keys are added to the
// new manifest, so we don't inadvertently delete something we don't understand // new manifest, so we don't inadvertently delete something we don't understand
unknownRecordsArray.forEach((record: UnknownRecord) => { unknownRecordsArray.forEach((record: UnknownRecord) => {
if (validRecordTypes.has(record.itemType)) {
return;
}
const identifier = new window.textsecure.protobuf.ManifestRecord.Identifier();
identifier.type = record.itemType;
identifier.raw = base64ToArrayBuffer(record.storageID);
manifestRecordKeys.add(identifier);
});
const recordsWithErrors =
window.storage.get('storage-service-error-records') || [];
window.log.info(
`storageService.generateManifest: adding ${recordsWithErrors.length} records that had errors in the previous merge`
);
// These records failed to merge in the previous fetchManifest, but we still
// need to include them so that the manifest is complete
recordsWithErrors.forEach((record: UnknownRecord) => {
const identifier = new window.textsecure.protobuf.ManifestRecord.Identifier(); const identifier = new window.textsecure.protobuf.ManifestRecord.Identifier();
identifier.type = record.itemType; identifier.type = record.itemType;
identifier.raw = base64ToArrayBuffer(record.storageID); identifier.raw = base64ToArrayBuffer(record.storageID);
@ -334,6 +359,11 @@ async function uploadManifest(
throw new Error('storageService.uploadManifest: We are offline!'); throw new Error('storageService.uploadManifest: We are offline!');
} }
if (newItems.size === 0 && deleteKeys.length === 0) {
window.log.info('storageService.uploadManifest: nothing to upload');
return;
}
const credentials = window.storage.get('storageCredentials'); const credentials = window.storage.get('storageCredentials');
try { try {
window.log.info( window.log.info(
@ -345,7 +375,7 @@ async function uploadManifest(
writeOperation.insertItem = Array.from(newItems); writeOperation.insertItem = Array.from(newItems);
writeOperation.deleteKey = deleteKeys; writeOperation.deleteKey = deleteKeys;
window.log.info('storageService.uploadManifest: uploading...'); window.log.info('storageService.uploadManifest: uploading...', version);
await window.textsecure.messaging.modifyStorageRecords( await window.textsecure.messaging.modifyStorageRecords(
writeOperation.toArrayBuffer(), writeOperation.toArrayBuffer(),
{ {
@ -382,7 +412,7 @@ async function uploadManifest(
consecutiveConflicts += 1; consecutiveConflicts += 1;
window.log.info( window.log.info(
`storageService.uploadManifest: Conflict found, running sync job times(${consecutiveConflicts})` `storageService.uploadManifest: Conflict found with v${version}, running sync job times(${consecutiveConflicts})`
); );
throw err; throw err;
@ -527,6 +557,7 @@ type MergeableItemType = {
type MergedRecordType = UnknownRecord & { type MergedRecordType = UnknownRecord & {
hasConflict: boolean; hasConflict: boolean;
hasError: boolean;
isUnsupported: boolean; isUnsupported: boolean;
}; };
@ -539,6 +570,7 @@ async function mergeRecord(
let hasConflict = false; let hasConflict = false;
let isUnsupported = false; let isUnsupported = false;
let hasError = false;
try { try {
if (itemType === ITEM_TYPE.UNKNOWN) { if (itemType === ITEM_TYPE.UNKNOWN) {
@ -565,6 +597,7 @@ async function mergeRecord(
); );
} }
} catch (err) { } catch (err) {
hasError = true;
window.log.error( window.log.error(
'storageService.mergeRecord: merging record failed', 'storageService.mergeRecord: merging record failed',
storageID, storageID,
@ -574,6 +607,7 @@ async function mergeRecord(
return { return {
hasConflict, hasConflict,
hasError,
isUnsupported, isUnsupported,
itemType, itemType,
storageID, storageID,
@ -607,7 +641,10 @@ async function processManifest(
window.storage.get('storage-service-unknown-records') || []; window.storage.get('storage-service-unknown-records') || [];
unknownRecordsArray.forEach((record: UnknownRecord) => { unknownRecordsArray.forEach((record: UnknownRecord) => {
localKeys.push(record.storageID); // Do not include any unknown records that we already support
if (!validRecordTypes.has(record.itemType)) {
localKeys.push(record.storageID);
}
}); });
window.log.info( window.log.info(
@ -616,9 +653,14 @@ async function processManifest(
const remoteKeys = Array.from(remoteKeysTypeMap.keys()); const remoteKeys = Array.from(remoteKeysTypeMap.keys());
const remoteOnly = remoteKeys.filter( const remoteOnlySet: Set<string> = new Set();
(key: string) => !localKeys.includes(key) remoteKeys.forEach((key: string) => {
); if (!localKeys.includes(key)) {
remoteOnlySet.add(key);
}
});
const remoteOnly = Array.from(remoteOnlySet);
window.log.info( window.log.info(
`storageService.processManifest: remoteOnly.length ${remoteOnly.length}` `storageService.processManifest: remoteOnly.length ${remoteOnly.length}`
@ -694,7 +736,7 @@ async function processManifest(
storageRecord, storageRecord,
}; };
}, },
{ concurrency: 50 } { concurrency: 5 }
); );
// Merge Account records last // Merge Account records last
@ -713,7 +755,7 @@ async function processManifest(
concurrency: 5, concurrency: 5,
}); });
window.log.info( window.log.info(
`storageService.processManifest: Merged ${mergedRecords.length} records` `storageService.processManifest: Processed ${mergedRecords.length} records`
); );
const unknownRecords: Map<string, UnknownRecord> = new Map(); const unknownRecords: Map<string, UnknownRecord> = new Map();
@ -721,21 +763,89 @@ async function processManifest(
unknownRecords.set(record.storageID, record); unknownRecords.set(record.storageID, record);
}); });
const hasConflict = mergedRecords.some((mergedRecord: MergedRecordType) => { const recordsWithErrors: Array<UnknownRecord> = [];
let hasConflict = false;
mergedRecords.forEach((mergedRecord: MergedRecordType) => {
if (mergedRecord.isUnsupported) { if (mergedRecord.isUnsupported) {
unknownRecords.set(mergedRecord.storageID, { unknownRecords.set(mergedRecord.storageID, {
itemType: mergedRecord.itemType, itemType: mergedRecord.itemType,
storageID: mergedRecord.storageID, storageID: mergedRecord.storageID,
}); });
} else if (mergedRecord.hasError) {
recordsWithErrors.push({
itemType: mergedRecord.itemType,
storageID: mergedRecord.storageID,
});
} }
return mergedRecord.hasConflict;
hasConflict = hasConflict || mergedRecord.hasConflict;
}); });
window.storage.put( // Filter out all the unknown records we're already supporting
'storage-service-unknown-records', const newUnknownRecords = Array.from(unknownRecords.values()).filter(
Array.from(unknownRecords.values()) (record: UnknownRecord) => !validRecordTypes.has(record.itemType)
); );
window.log.info(
`storageService.processManifest: Found ${newUnknownRecords.length} unknown records`
);
window.storage.put('storage-service-unknown-records', newUnknownRecords);
window.log.info(
`storageService.processManifest: Found ${recordsWithErrors.length} records with errors`
);
window.storage.put('storage-service-error-records', recordsWithErrors);
const now = Date.now();
// if the remote only keys are larger or equal to our local keys then it
// was likely a forced push of storage service. We keep track of these
// merges so that we can detect possible infinite loops
if (remoteOnly.length >= localKeys.length) {
window.log.info(
'storageService.processManifest: remote manifest was likely force pushed',
now
);
forcedPushBucket.push(now);
// we need to check our conversations because maybe all of them were not
// updated properly, for those that weren't we'll clear their storage
// key so that they can be included in the next update
window.getConversations().forEach((conversation: ConversationModel) => {
const storageID = conversation.get('storageID');
if (storageID && !remoteOnlySet.has(storageID)) {
window.log.info(
'storageService.processManifest: conversation was not included in remote force push, clearing storageID',
conversation.debugID(),
storageID
);
conversation.unset('storageID');
}
});
if (forcedPushBucket.length >= 3) {
const [firstMostRecentForcedPush] = forcedPushBucket;
if (now - firstMostRecentForcedPush < 5 * MINUTE) {
window.log.info(
'storageService.processManifest: thrasing? Backing off'
);
const error = new Error();
error.code = 'E_BACKOFF';
throw error;
}
window.log.info(
'storageService.processManifest: thrash timestamp of first -> now',
firstMostRecentForcedPush,
now
);
forcedPushBucket.shift();
}
}
if (hasConflict) { if (hasConflict) {
window.log.info( window.log.info(
'storageService.processManifest: Conflict found, uploading changes' 'storageService.processManifest: Conflict found, uploading changes'
@ -756,24 +866,20 @@ async function processManifest(
return false; return false;
} }
// Exported functions async function sync(): Promise<void> {
export async function runStorageServiceSyncJob(): Promise<void> {
if (!isEnabled('desktop.storage')) { if (!isEnabled('desktop.storage')) {
window.log.info( window.log.info(
'storageService.runStorageServiceSyncJob: Not starting desktop.storage is falsey' 'storageService.sync: Not starting desktop.storage is falsey'
); );
return; return;
} }
if (!window.storage.get('storageKey')) { if (!window.storage.get('storageKey')) {
throw new Error( throw new Error('storageService.sync: Cannot start; no storage key!');
'storageService.runStorageServiceSyncJob: Cannot start; no storage key!'
);
} }
window.log.info('storageService.runStorageServiceSyncJob: starting...'); window.log.info('storageService.sync: starting...');
try { try {
const localManifestVersion = window.storage.get('manifestVersion') || 0; const localManifestVersion = window.storage.get('manifestVersion') || 0;
@ -781,45 +887,103 @@ export async function runStorageServiceSyncJob(): Promise<void> {
// Guarding against no manifests being returned, everything should be ok // Guarding against no manifests being returned, everything should be ok
if (!manifest) { if (!manifest) {
window.log.info( window.log.info('storageService.sync: no new manifest');
'storageService.runStorageServiceSyncJob: no new manifest'
);
return; return;
} }
const version = manifest.version.toNumber(); const version = manifest.version.toNumber();
window.log.info( window.log.info(
`storageService.runStorageServiceSyncJob: manifest versions - previous: ${localManifestVersion}, current: ${version}` `storageService.sync: manifest versions - previous: ${localManifestVersion}, current: ${version}`
); );
window.storage.put('manifestVersion', version);
const hasConflicts = await processManifest(manifest); const hasConflicts = await processManifest(manifest);
if (hasConflicts) { if (hasConflicts) {
await storageServiceUploadJob(); await upload();
} }
window.storage.put('manifestVersion', version);
} catch (err) { } catch (err) {
window.log.error( window.log.error(
`storageService.runStorageServiceSyncJob: error processing manifest ${ `storageService.sync: error processing manifest ${
err && err.stack ? err.stack : String(err) err && err.stack ? err.stack : String(err)
}` }`
); );
// When we're told to backoff, backoff to the max which should be
// ~5 minutes. If this job was running inside a queue it'll probably time
// out.
if (err.code === 'E_BACKOFF') {
await backOff(9001);
}
} }
window.log.info('storageService.runStorageServiceSyncJob: complete'); window.log.info('storageService.sync: complete');
} }
// Note: this function must be called at startup once we handle unknown records async function upload(): Promise<void> {
// of a certain type. This way once the runStorageServiceSyncJob function runs if (!isEnabled('desktop.storage')) {
// it'll pick up the new storage IDs and process them accordingly. window.log.info(
export function handleUnknownRecords(itemType: number): void { 'storageService.upload: Not starting desktop.storage is falsey'
const unknownRecordsArray = );
window.storage.get('storage-service-unknown-records') || [];
const newUnknownRecords = unknownRecordsArray.filter( return;
(record: UnknownRecord) => record.itemType !== itemType }
if (!isEnabled('desktop.storageWrite')) {
window.log.info(
'storageService.upload: Not starting desktop.storageWrite is falsey'
);
return;
}
if (!window.textsecure.messaging) {
throw new Error('storageService.upload: We are offline!');
}
if (!window.storage.get('storageKey')) {
// requesting new keys runs the sync job which will detect the conflict
// and re-run the upload job once we're merged and up-to-date.
window.log.info(
'storageService.upload: no storageKey, requesting new keys'
);
consecutiveStops = 0;
await window.textsecure.messaging.sendRequestKeySyncMessage();
return;
}
const localManifestVersion = window.storage.get('manifestVersion') || 0;
const version = Number(localManifestVersion) + 1;
window.log.info(
'storageService.upload: will update to manifest version',
version
); );
window.storage.put('storage-service-unknown-records', newUnknownRecords);
try {
const generatedManifest = await generateManifest(version);
await uploadManifest(version, generatedManifest);
} catch (err) {
if (err.code === 409) {
await backOff(consecutiveConflicts);
window.log.info('storageService.upload: pushing sync on the queue');
// The sync job will check for conflicts and as part of that conflict
// check if an item needs sync and doesn't match with the remote record
// it'll kick off another upload.
setTimeout(runStorageServiceSyncJob);
return;
}
window.log.error(
'storageService.upload',
err && err.stack ? err.stack : String(err)
);
}
}
let storageServiceEnabled = false;
export function enableStorageService(): void {
storageServiceEnabled = true;
} }
// Note: this function is meant to be called before ConversationController is hydrated. // Note: this function is meant to be called before ConversationController is hydrated.
@ -835,56 +999,24 @@ export async function eraseAllStorageServiceState(): Promise<void> {
window.log.info('storageService.eraseAllStorageServiceState: complete'); window.log.info('storageService.eraseAllStorageServiceState: complete');
} }
async function nondebouncedStorageServiceUploadJob(): Promise<void> { export const storageServiceUploadJob = _.debounce(() => {
if (!isEnabled('desktop.storage')) { if (!storageServiceEnabled) {
window.log.info( window.log.info(
'storageService.storageServiceUploadJob: Not starting desktop.storage is falsey' 'storageService.storageServiceUploadJob: called before enabled'
); );
return;
}
if (!isEnabled('desktop.storageWrite')) {
window.log.info(
'storageService.storageServiceUploadJob: Not starting desktop.storageWrite is falsey'
);
return; return;
} }
if (!window.textsecure.messaging) { storageJobQueue(upload, `upload v${window.storage.get('manifestVersion')}`);
throw new Error('storageService.storageServiceUploadJob: We are offline!'); }, 500);
}
if (!window.storage.get('storageKey')) { export const runStorageServiceSyncJob = _.debounce(() => {
// requesting new keys runs the sync job which will detect the conflict if (!storageServiceEnabled) {
// and re-run the upload job once we're merged and up-to-date.
window.log.info( window.log.info(
'storageService.storageServiceUploadJob: no storageKey, requesting new keys' 'storageService.runStorageServiceSyncJob: called before enabled'
); );
consecutiveStops = 0;
await window.textsecure.messaging.sendRequestKeySyncMessage();
return; return;
} }
const localManifestVersion = window.storage.get('manifestVersion') || 0; storageJobQueue(sync, `sync v${window.storage.get('manifestVersion')}`);
const version = Number(localManifestVersion) + 1; }, 500);
window.log.info(
'storageService.storageServiceUploadJob: will update to manifest version',
version
);
try {
await uploadManifest(version, await generateManifest(version));
} catch (err) {
if (err.code === 409) {
await backOff(consecutiveConflicts);
await runStorageServiceSyncJob();
}
}
}
export const storageServiceUploadJob = _.debounce(
nondebouncedStorageServiceUploadJob,
500
);

View file

@ -303,10 +303,7 @@ export async function mergeGroupV1Record(
groupV1Record: GroupV1RecordClass groupV1Record: GroupV1RecordClass
): Promise<boolean> { ): Promise<boolean> {
if (!groupV1Record.id) { if (!groupV1Record.id) {
window.log.info( throw new Error(`No ID for ${storageID}`);
`storageService.mergeGroupV1Record: no ID for ${storageID}`
);
return false;
} }
const groupId = groupV1Record.id.toBinary(); const groupId = groupV1Record.id.toBinary();
@ -316,10 +313,11 @@ export async function mergeGroupV1Record(
// record if we have one; otherwise we'll just drop this update. // record if we have one; otherwise we'll just drop this update.
const conversation = window.ConversationController.get(groupId); const conversation = window.ConversationController.get(groupId);
if (!conversation) { if (!conversation) {
window.log.warn( throw new Error(`No conversation for group(${groupId})`);
`storageService.mergeGroupV1Record: No conversation for group(${groupId})` }
);
return false; if (!conversation.isGroupV1()) {
throw new Error(`Record has group type mismatch ${conversation.debugID()}`);
} }
conversation.set({ conversation.set({
@ -347,10 +345,7 @@ export async function mergeGroupV2Record(
groupV2Record: GroupV2RecordClass groupV2Record: GroupV2RecordClass
): Promise<boolean> { ): Promise<boolean> {
if (!groupV2Record.masterKey) { if (!groupV2Record.masterKey) {
window.log.info( throw new Error(`No master key for ${storageID}`);
`storageService.mergeGroupV2Record: no master key for ${storageID}`
);
return false;
} }
const masterKeyBuffer = groupV2Record.masterKey.toArrayBuffer(); const masterKeyBuffer = groupV2Record.masterKey.toArrayBuffer();
@ -375,9 +370,7 @@ export async function mergeGroupV2Record(
const conversation = window.ConversationController.get(conversationId); const conversation = window.ConversationController.get(conversationId);
if (!conversation) { if (!conversation) {
throw new Error( throw new Error(`No conversation for groupv2(${groupId})`);
`storageService.mergeGroupV2Record: No conversation for groupv2(${groupId})`
);
} }
conversation.maybeRepairGroupV2({ conversation.maybeRepairGroupV2({
@ -433,10 +426,7 @@ export async function mergeContactRecord(
}); });
if (!id) { if (!id) {
window.log.info( throw new Error(`No ID for ${storageID}`);
`storageService.mergeContactRecord: no ID for ${storageID}`
);
return false;
} }
const conversation = await window.ConversationController.getOrCreateAndWait( const conversation = await window.ConversationController.getOrCreateAndWait(
@ -623,7 +613,7 @@ export async function mergeAccountRecord(
const ourID = window.ConversationController.getOurConversationId(); const ourID = window.ConversationController.getOurConversationId();
if (!ourID) { if (!ourID) {
return false; throw new Error('Could not find ourID');
} }
const conversation = await window.ConversationController.getOrCreateAndWait( const conversation = await window.ConversationController.getOrCreateAndWait(

View file

@ -2271,7 +2271,7 @@ async function eraseStorageServiceStateFromConversations() {
await db.run( await db.run(
`UPDATE conversations SET `UPDATE conversations SET
json = json_remove(json, '$.storageID', '$.needsStorageServiceSync', '$.unknownFields'); json = json_remove(json, '$.storageID', '$.needsStorageServiceSync', '$.unknownFields', '$.storageProfileKey');
` `
); );
} }

14
ts/util/JobQueue.ts Normal file
View file

@ -0,0 +1,14 @@
import PQueue from 'p-queue';
import createTaskWithTimeout from '../textsecure/TaskWithTimeout';
function createJobQueue(label: string) {
const jobQueue = new PQueue({ concurrency: 1 });
return (job: () => Promise<void>, id = '') => {
const taskWithTimeout = createTaskWithTimeout(job, `${label} ${id}`);
return jobQueue.add(taskWithTimeout);
};
}
export const storageJobQueue = createJobQueue('storageService');

2
ts/window.d.ts vendored
View file

@ -182,8 +182,8 @@ declare global {
RemoteConfig: typeof RemoteConfig; RemoteConfig: typeof RemoteConfig;
Services: { Services: {
calling: CallingClass; calling: CallingClass;
enableStorageService: () => boolean;
eraseAllStorageServiceState: () => Promise<void>; eraseAllStorageServiceState: () => Promise<void>;
handleUnknownRecords: (param: WhatIsThis) => void;
initializeGroupCredentialFetcher: () => void; initializeGroupCredentialFetcher: () => void;
initializeNetworkObserver: (network: WhatIsThis) => void; initializeNetworkObserver: (network: WhatIsThis) => void;
initializeUpdateListener: ( initializeUpdateListener: (