From e4b5b759882a62ce5842723526ee5f839d07bb9f Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Thu, 31 Mar 2022 11:22:40 -0700 Subject: [PATCH] GroupV2: Don't apply last state from the logs if skipped --- ts/groups.ts | 232 +++++++++++++++++++++----------------------- ts/model-types.d.ts | 1 - 2 files changed, 111 insertions(+), 122 deletions(-) diff --git a/ts/groups.ts b/ts/groups.ts index c91eddcace..3ba4e68cc8 100644 --- a/ts/groups.ts +++ b/ts/groups.ts @@ -7,7 +7,6 @@ import { flatten, fromPairs, isNumber, - last, values, } from 'lodash'; import Long from 'long'; @@ -3585,11 +3584,12 @@ async function getGroupDelta({ }); const currentRevision = group.revision; - let latestRevision = newRevision; - const isFirstFetch = !isNumber(currentRevision); - let revisionToFetch = isNumber(currentRevision) - ? currentRevision + 1 - : undefined; + let includeFirstState = true; + + // The range is inclusive so make sure that we always request the revision + // that we are currently at since we might want the latest full state in + // `integrateGroupChanges`. + let revisionToFetch = isNumber(currentRevision) ? currentRevision : undefined; let response; const changes: Array = []; @@ -3598,7 +3598,7 @@ async function getGroupDelta({ response = await sender.getGroupLog( { startVersion: revisionToFetch, - includeFirstState: isFirstFetch, + includeFirstState, includeLastState: true, maxSupportedChangeEpoch: SUPPORTED_CHANGE_EPOCH, }, @@ -3609,13 +3609,10 @@ async function getGroupDelta({ revisionToFetch = response.end + 1; } - if (latestRevision === undefined) { - latestRevision = response.currentRevision ?? response.end; - } + includeFirstState = false; } while ( response.end && - latestRevision !== undefined && - response.end < latestRevision + (newRevision === undefined || response.end < newRevision) ); // Would be nice to cache the unused groupChanges here, to reduce server roundtrips @@ -3623,8 +3620,7 @@ async function getGroupDelta({ return integrateGroupChanges({ changes, group, - newRevision: latestRevision, - serverPublicParamsBase64, + newRevision, }); } @@ -3632,17 +3628,15 @@ async function integrateGroupChanges({ group, newRevision, changes, - serverPublicParamsBase64, }: { group: ConversationAttributesType; newRevision: number | undefined; changes: Array; - serverPublicParamsBase64: string; }): Promise { const logId = idForLogging(group.groupId); let attributes = group; const finalMessages: Array> = []; - let finalMembers: Array> = []; + const finalMembers: Array> = []; const imax = changes.length; for (let i = 0; i < imax; i += 1) { @@ -3690,62 +3684,6 @@ async function integrateGroupChanges({ } } - const lastState = last(last(changes)?.groupChanges)?.groupState; - - // Apply the last included state if present to make sure that we didn't miss - // anything in the log processing above. - if (lastState) { - try { - const { newAttributes, groupChangeMessages, members } = - await integrateGroupChange({ - group: attributes, - newRevision, - groupState: lastState, - }); - - if (groupChangeMessages.length !== 0 || finalMembers.length !== 0) { - assert( - groupChangeMessages.length === 0, - 'Fallback group state processing should not kick in' - ); - - log.warn( - `integrateGroupChanges/${logId}: local state was different from ` + - 'the remote final state. ' + - `Got ${groupChangeMessages.length} change messages, and ` + - `${finalMembers.length} updated members` - ); - } - - attributes = newAttributes; - finalMessages.push(groupChangeMessages); - finalMembers.push(members); - } catch (error) { - log.error( - `integrateGroupChanges/${logId}: Failed to apply final state`, - Errors.toLogFormat(error) - ); - } - } else if (attributes.lastFetchedEpoch !== SUPPORTED_CHANGE_EPOCH) { - log.info( - `integrateGroupChanges(${logId}): last fetched epoch ` + - `${group.lastFetchedEpoch ?? '?'} is stale. ` + - 'Refreshing group state' - ); - const { - newAttributes: updatedAttributes, - groupChangeMessages: extraChanges, - members: updatedMembers, - } = await updateGroupViaState({ - group: attributes, - serverPublicParamsBase64, - }); - - attributes = updatedAttributes; - finalMessages.push(extraChanges); - finalMembers = [updatedMembers]; - } - // If this is our first fetch, we will collapse this down to one set of messages const isFirstFetch = !isNumber(group.revision); if (isFirstFetch) { @@ -3812,6 +3750,7 @@ async function integrateGroupChange({ // These need to be populated from the groupChange. But we might not get one! let isChangeSupported = false; + let isSameVersion = false; let isMoreThanOneVersionUp = false; let groupChangeActions: undefined | Proto.GroupChange.IActions; let decryptedChangeActions: undefined | DecryptedGroupChangeActions; @@ -3822,11 +3761,16 @@ async function integrateGroupChange({ groupChange.actions || new Uint8Array(0) ); + // Version is higher that what we have in the incoming message if ( groupChangeActions.version && newRevision !== undefined && groupChangeActions.version > newRevision ) { + log.info( + `integrateGroupChange/${logId}: Skipping ` + + `${groupChangeActions.version}, newRevision is ${newRevision}` + ); return { newAttributes: group, groupChangeMessages: [], @@ -3851,32 +3795,79 @@ async function integrateGroupChange({ !isNumber(groupChange.changeEpoch) || groupChange.changeEpoch <= SUPPORTED_CHANGE_EPOCH; - isMoreThanOneVersionUp = Boolean( - groupChangeActions.version && - isNumber(group.revision) && - groupChangeActions.version > group.revision + 1 - ); + // Version is lower or the same as what we currently have + if (group.revision !== undefined && groupChangeActions.version) { + if (groupChangeActions.version < group.revision) { + log.info( + `integrateGroupChange/${logId}: Skipping stale version` + + `${groupChangeActions.version}, current ` + + `revision is ${group.revision}` + ); + return { + newAttributes: group, + groupChangeMessages: [], + members: [], + }; + } + if (groupChangeActions.version === group.revision) { + isSameVersion = true; + } else if (groupChangeActions.version > group.revision + 1) { + isMoreThanOneVersionUp = true; + } + } } - if ( - !groupChange || - !isChangeSupported || - isFirstFetch || - (groupState && group.lastFetchedEpoch !== SUPPORTED_CHANGE_EPOCH) || - (isMoreThanOneVersionUp && !weAreAwaitingApproval) - ) { - if (!groupState) { + let attributes = group; + const aggregatedChangeMessages = []; + const aggregatedMembers = []; + + const canApplyChange = + groupChange && + isChangeSupported && + !isSameVersion && + !isFirstFetch && + (!isMoreThanOneVersionUp || weAreAwaitingApproval); + + // Apply the change first + if (canApplyChange) { + if (!sourceUuid || !groupChangeActions || !decryptedChangeActions) { throw new Error( - `integrateGroupChange/${logId}: No group state, but we can't apply changes!` + `integrateGroupChange/${logId}: Missing necessary information that should have come from group actions` ); } log.info( - `integrateGroupChange/${logId}: Applying full group state, from version ${group.revision} to ${groupState.version}`, + `integrateGroupChange/${logId}: Applying group change actions, ` + + `from version ${group.revision} to ${groupChangeActions.version}` + ); + + const { newAttributes, newProfileKeys } = await applyGroupChange({ + group, + actions: decryptedChangeActions, + sourceUuid, + }); + + const groupChangeMessages = extractDiffs({ + old: attributes, + current: newAttributes, + sourceUuid, + }); + + attributes = newAttributes; + aggregatedChangeMessages.push(groupChangeMessages); + aggregatedMembers.push(profileKeysToMembers(newProfileKeys)); + } + + // Apply the group state afterwards to verify that we didn't miss anything + if (groupState) { + log.info( + `integrateGroupChange/${logId}: Applying full group state, ` + + `from version ${group.revision} to ${groupState.version}`, { isChangePresent: Boolean(groupChange), isChangeSupported, isFirstFetch, + isSameVersion, isMoreThanOneVersionUp, weAreAwaitingApproval, } @@ -3889,47 +3880,47 @@ async function integrateGroupChange({ ); const { newAttributes, newProfileKeys } = await applyGroupState({ - group, + group: attributes, groupState: decryptedGroupState, sourceUuid: isFirstFetch ? sourceUuid : undefined, }); - return { - newAttributes, - groupChangeMessages: extractDiffs({ - old: group, - current: newAttributes, - sourceUuid: isFirstFetch ? sourceUuid : undefined, - }), - members: profileKeysToMembers(newProfileKeys), - }; - } + const groupChangeMessages = extractDiffs({ + old: attributes, + current: newAttributes, + sourceUuid: isFirstFetch ? sourceUuid : undefined, + }); - if (!sourceUuid || !groupChangeActions || !decryptedChangeActions) { - throw new Error( - `integrateGroupChange/${logId}: Missing necessary information that should have come from group actions` + const newMembers = profileKeysToMembers(newProfileKeys); + + if (groupChangeMessages.length !== 0 || newMembers.length !== 0) { + assert( + groupChangeMessages.length === 0, + 'Fallback group state processing should not kick in' + ); + + log.warn( + `integrateGroupChange/${logId}: local state was different from ` + + 'the remote final state. ' + + `Got ${groupChangeMessages.length} change messages, and ` + + `${newMembers.length} updated members` + ); + } + + attributes = newAttributes; + aggregatedChangeMessages.push(groupChangeMessages); + aggregatedMembers.push(newMembers); + } else { + strictAssert( + canApplyChange, + `integrateGroupChange/${logId}: No group state, but we can't apply changes!` ); } - log.info( - `integrateGroupChange/${logId}: Applying group change actions, from version ${group.revision} to ${groupChangeActions.version}` - ); - - const { newAttributes, newProfileKeys } = await applyGroupChange({ - group, - actions: decryptedChangeActions, - sourceUuid, - }); - const groupChangeMessages = extractDiffs({ - old: group, - current: newAttributes, - sourceUuid, - }); - return { - newAttributes, - groupChangeMessages, - members: profileKeysToMembers(newProfileKeys), + newAttributes: attributes, + groupChangeMessages: aggregatedChangeMessages.flat(), + members: aggregatedMembers.flat(), }; } @@ -5017,7 +5008,6 @@ async function applyGroupState({ // version result.revision = version; - result.lastFetchedEpoch = SUPPORTED_CHANGE_EPOCH; // title // Note: During decryption, title becomes a GroupAttributeBlob diff --git a/ts/model-types.d.ts b/ts/model-types.d.ts index 72187fc427..c0e8874605 100644 --- a/ts/model-types.d.ts +++ b/ts/model-types.d.ts @@ -332,7 +332,6 @@ export type ConversationAttributesType = { secretParams?: string; publicParams?: string; revision?: number; - lastFetchedEpoch?: number; senderKeyInfo?: SenderKeyInfoType; // GroupV2 other fields