GroupV2: Don't apply last state from the logs if skipped

This commit is contained in:
Fedor Indutny 2022-03-31 11:22:40 -07:00 committed by GitHub
parent 99687a4b5b
commit e4b5b75988
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 111 additions and 122 deletions

View file

@ -7,7 +7,6 @@ import {
flatten, flatten,
fromPairs, fromPairs,
isNumber, isNumber,
last,
values, values,
} from 'lodash'; } from 'lodash';
import Long from 'long'; import Long from 'long';
@ -3585,11 +3584,12 @@ async function getGroupDelta({
}); });
const currentRevision = group.revision; const currentRevision = group.revision;
let latestRevision = newRevision; let includeFirstState = true;
const isFirstFetch = !isNumber(currentRevision);
let revisionToFetch = isNumber(currentRevision) // The range is inclusive so make sure that we always request the revision
? currentRevision + 1 // that we are currently at since we might want the latest full state in
: undefined; // `integrateGroupChanges`.
let revisionToFetch = isNumber(currentRevision) ? currentRevision : undefined;
let response; let response;
const changes: Array<Proto.IGroupChanges> = []; const changes: Array<Proto.IGroupChanges> = [];
@ -3598,7 +3598,7 @@ async function getGroupDelta({
response = await sender.getGroupLog( response = await sender.getGroupLog(
{ {
startVersion: revisionToFetch, startVersion: revisionToFetch,
includeFirstState: isFirstFetch, includeFirstState,
includeLastState: true, includeLastState: true,
maxSupportedChangeEpoch: SUPPORTED_CHANGE_EPOCH, maxSupportedChangeEpoch: SUPPORTED_CHANGE_EPOCH,
}, },
@ -3609,13 +3609,10 @@ async function getGroupDelta({
revisionToFetch = response.end + 1; revisionToFetch = response.end + 1;
} }
if (latestRevision === undefined) { includeFirstState = false;
latestRevision = response.currentRevision ?? response.end;
}
} while ( } while (
response.end && response.end &&
latestRevision !== undefined && (newRevision === undefined || response.end < newRevision)
response.end < latestRevision
); );
// Would be nice to cache the unused groupChanges here, to reduce server roundtrips // Would be nice to cache the unused groupChanges here, to reduce server roundtrips
@ -3623,8 +3620,7 @@ async function getGroupDelta({
return integrateGroupChanges({ return integrateGroupChanges({
changes, changes,
group, group,
newRevision: latestRevision, newRevision,
serverPublicParamsBase64,
}); });
} }
@ -3632,17 +3628,15 @@ async function integrateGroupChanges({
group, group,
newRevision, newRevision,
changes, changes,
serverPublicParamsBase64,
}: { }: {
group: ConversationAttributesType; group: ConversationAttributesType;
newRevision: number | undefined; newRevision: number | undefined;
changes: Array<Proto.IGroupChanges>; changes: Array<Proto.IGroupChanges>;
serverPublicParamsBase64: string;
}): Promise<UpdatesResultType> { }): Promise<UpdatesResultType> {
const logId = idForLogging(group.groupId); const logId = idForLogging(group.groupId);
let attributes = group; let attributes = group;
const finalMessages: Array<Array<GroupChangeMessageType>> = []; const finalMessages: Array<Array<GroupChangeMessageType>> = [];
let finalMembers: Array<Array<MemberType>> = []; const finalMembers: Array<Array<MemberType>> = [];
const imax = changes.length; const imax = changes.length;
for (let i = 0; i < imax; i += 1) { for (let i = 0; i < imax; i += 1) {
@ -3690,62 +3684,6 @@ async function integrateGroupChanges({
} }
} }
const lastState = last(last(changes)?.groupChanges)?.groupState;
// Apply the last included state if present to make sure that we didn't miss
// anything in the log processing above.
if (lastState) {
try {
const { newAttributes, groupChangeMessages, members } =
await integrateGroupChange({
group: attributes,
newRevision,
groupState: lastState,
});
if (groupChangeMessages.length !== 0 || finalMembers.length !== 0) {
assert(
groupChangeMessages.length === 0,
'Fallback group state processing should not kick in'
);
log.warn(
`integrateGroupChanges/${logId}: local state was different from ` +
'the remote final state. ' +
`Got ${groupChangeMessages.length} change messages, and ` +
`${finalMembers.length} updated members`
);
}
attributes = newAttributes;
finalMessages.push(groupChangeMessages);
finalMembers.push(members);
} catch (error) {
log.error(
`integrateGroupChanges/${logId}: Failed to apply final state`,
Errors.toLogFormat(error)
);
}
} else if (attributes.lastFetchedEpoch !== SUPPORTED_CHANGE_EPOCH) {
log.info(
`integrateGroupChanges(${logId}): last fetched epoch ` +
`${group.lastFetchedEpoch ?? '?'} is stale. ` +
'Refreshing group state'
);
const {
newAttributes: updatedAttributes,
groupChangeMessages: extraChanges,
members: updatedMembers,
} = await updateGroupViaState({
group: attributes,
serverPublicParamsBase64,
});
attributes = updatedAttributes;
finalMessages.push(extraChanges);
finalMembers = [updatedMembers];
}
// If this is our first fetch, we will collapse this down to one set of messages // If this is our first fetch, we will collapse this down to one set of messages
const isFirstFetch = !isNumber(group.revision); const isFirstFetch = !isNumber(group.revision);
if (isFirstFetch) { if (isFirstFetch) {
@ -3812,6 +3750,7 @@ async function integrateGroupChange({
// These need to be populated from the groupChange. But we might not get one! // These need to be populated from the groupChange. But we might not get one!
let isChangeSupported = false; let isChangeSupported = false;
let isSameVersion = false;
let isMoreThanOneVersionUp = false; let isMoreThanOneVersionUp = false;
let groupChangeActions: undefined | Proto.GroupChange.IActions; let groupChangeActions: undefined | Proto.GroupChange.IActions;
let decryptedChangeActions: undefined | DecryptedGroupChangeActions; let decryptedChangeActions: undefined | DecryptedGroupChangeActions;
@ -3822,11 +3761,16 @@ async function integrateGroupChange({
groupChange.actions || new Uint8Array(0) groupChange.actions || new Uint8Array(0)
); );
// Version is higher that what we have in the incoming message
if ( if (
groupChangeActions.version && groupChangeActions.version &&
newRevision !== undefined && newRevision !== undefined &&
groupChangeActions.version > newRevision groupChangeActions.version > newRevision
) { ) {
log.info(
`integrateGroupChange/${logId}: Skipping ` +
`${groupChangeActions.version}, newRevision is ${newRevision}`
);
return { return {
newAttributes: group, newAttributes: group,
groupChangeMessages: [], groupChangeMessages: [],
@ -3851,32 +3795,79 @@ async function integrateGroupChange({
!isNumber(groupChange.changeEpoch) || !isNumber(groupChange.changeEpoch) ||
groupChange.changeEpoch <= SUPPORTED_CHANGE_EPOCH; groupChange.changeEpoch <= SUPPORTED_CHANGE_EPOCH;
isMoreThanOneVersionUp = Boolean( // Version is lower or the same as what we currently have
groupChangeActions.version && if (group.revision !== undefined && groupChangeActions.version) {
isNumber(group.revision) && if (groupChangeActions.version < group.revision) {
groupChangeActions.version > group.revision + 1 log.info(
); `integrateGroupChange/${logId}: Skipping stale version` +
`${groupChangeActions.version}, current ` +
`revision is ${group.revision}`
);
return {
newAttributes: group,
groupChangeMessages: [],
members: [],
};
}
if (groupChangeActions.version === group.revision) {
isSameVersion = true;
} else if (groupChangeActions.version > group.revision + 1) {
isMoreThanOneVersionUp = true;
}
}
} }
if ( let attributes = group;
!groupChange || const aggregatedChangeMessages = [];
!isChangeSupported || const aggregatedMembers = [];
isFirstFetch ||
(groupState && group.lastFetchedEpoch !== SUPPORTED_CHANGE_EPOCH) || const canApplyChange =
(isMoreThanOneVersionUp && !weAreAwaitingApproval) groupChange &&
) { isChangeSupported &&
if (!groupState) { !isSameVersion &&
!isFirstFetch &&
(!isMoreThanOneVersionUp || weAreAwaitingApproval);
// Apply the change first
if (canApplyChange) {
if (!sourceUuid || !groupChangeActions || !decryptedChangeActions) {
throw new Error( throw new Error(
`integrateGroupChange/${logId}: No group state, but we can't apply changes!` `integrateGroupChange/${logId}: Missing necessary information that should have come from group actions`
); );
} }
log.info( log.info(
`integrateGroupChange/${logId}: Applying full group state, from version ${group.revision} to ${groupState.version}`, `integrateGroupChange/${logId}: Applying group change actions, ` +
`from version ${group.revision} to ${groupChangeActions.version}`
);
const { newAttributes, newProfileKeys } = await applyGroupChange({
group,
actions: decryptedChangeActions,
sourceUuid,
});
const groupChangeMessages = extractDiffs({
old: attributes,
current: newAttributes,
sourceUuid,
});
attributes = newAttributes;
aggregatedChangeMessages.push(groupChangeMessages);
aggregatedMembers.push(profileKeysToMembers(newProfileKeys));
}
// Apply the group state afterwards to verify that we didn't miss anything
if (groupState) {
log.info(
`integrateGroupChange/${logId}: Applying full group state, ` +
`from version ${group.revision} to ${groupState.version}`,
{ {
isChangePresent: Boolean(groupChange), isChangePresent: Boolean(groupChange),
isChangeSupported, isChangeSupported,
isFirstFetch, isFirstFetch,
isSameVersion,
isMoreThanOneVersionUp, isMoreThanOneVersionUp,
weAreAwaitingApproval, weAreAwaitingApproval,
} }
@ -3889,47 +3880,47 @@ async function integrateGroupChange({
); );
const { newAttributes, newProfileKeys } = await applyGroupState({ const { newAttributes, newProfileKeys } = await applyGroupState({
group, group: attributes,
groupState: decryptedGroupState, groupState: decryptedGroupState,
sourceUuid: isFirstFetch ? sourceUuid : undefined, sourceUuid: isFirstFetch ? sourceUuid : undefined,
}); });
return { const groupChangeMessages = extractDiffs({
newAttributes, old: attributes,
groupChangeMessages: extractDiffs({ current: newAttributes,
old: group, sourceUuid: isFirstFetch ? sourceUuid : undefined,
current: newAttributes, });
sourceUuid: isFirstFetch ? sourceUuid : undefined,
}),
members: profileKeysToMembers(newProfileKeys),
};
}
if (!sourceUuid || !groupChangeActions || !decryptedChangeActions) { const newMembers = profileKeysToMembers(newProfileKeys);
throw new Error(
`integrateGroupChange/${logId}: Missing necessary information that should have come from group actions` if (groupChangeMessages.length !== 0 || newMembers.length !== 0) {
assert(
groupChangeMessages.length === 0,
'Fallback group state processing should not kick in'
);
log.warn(
`integrateGroupChange/${logId}: local state was different from ` +
'the remote final state. ' +
`Got ${groupChangeMessages.length} change messages, and ` +
`${newMembers.length} updated members`
);
}
attributes = newAttributes;
aggregatedChangeMessages.push(groupChangeMessages);
aggregatedMembers.push(newMembers);
} else {
strictAssert(
canApplyChange,
`integrateGroupChange/${logId}: No group state, but we can't apply changes!`
); );
} }
log.info(
`integrateGroupChange/${logId}: Applying group change actions, from version ${group.revision} to ${groupChangeActions.version}`
);
const { newAttributes, newProfileKeys } = await applyGroupChange({
group,
actions: decryptedChangeActions,
sourceUuid,
});
const groupChangeMessages = extractDiffs({
old: group,
current: newAttributes,
sourceUuid,
});
return { return {
newAttributes, newAttributes: attributes,
groupChangeMessages, groupChangeMessages: aggregatedChangeMessages.flat(),
members: profileKeysToMembers(newProfileKeys), members: aggregatedMembers.flat(),
}; };
} }
@ -5017,7 +5008,6 @@ async function applyGroupState({
// version // version
result.revision = version; result.revision = version;
result.lastFetchedEpoch = SUPPORTED_CHANGE_EPOCH;
// title // title
// Note: During decryption, title becomes a GroupAttributeBlob // Note: During decryption, title becomes a GroupAttributeBlob

1
ts/model-types.d.ts vendored
View file

@ -332,7 +332,6 @@ export type ConversationAttributesType = {
secretParams?: string; secretParams?: string;
publicParams?: string; publicParams?: string;
revision?: number; revision?: number;
lastFetchedEpoch?: number;
senderKeyInfo?: SenderKeyInfoType; senderKeyInfo?: SenderKeyInfoType;
// GroupV2 other fields // GroupV2 other fields