Batch redux conversation changed / added actions

This commit is contained in:
trevor-signal 2024-11-11 19:37:10 -05:00 committed by GitHub
parent 84b7cb4116
commit 22d4b1d194
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 504 additions and 186 deletions

View file

@ -3,10 +3,10 @@
import { isNumber, groupBy, throttle } from 'lodash'; import { isNumber, groupBy, throttle } from 'lodash';
import { render } from 'react-dom'; import { render } from 'react-dom';
import { batch as batchDispatch } from 'react-redux';
import PQueue from 'p-queue'; import PQueue from 'p-queue';
import pMap from 'p-map'; import pMap from 'p-map';
import { v7 as generateUuid } from 'uuid'; import { v7 as generateUuid } from 'uuid';
import { batch as batchDispatch } from 'react-redux';
import * as Registration from './util/registration'; import * as Registration from './util/registration';
import MessageReceiver from './textsecure/MessageReceiver'; import MessageReceiver from './textsecure/MessageReceiver';
@ -1166,67 +1166,106 @@ export async function startApp(): Promise<void> {
const convoCollection = window.getConversations(); const convoCollection = window.getConversations();
const { const {
conversationAdded, conversationsUpdated,
conversationChanged,
conversationRemoved, conversationRemoved,
removeAllConversations, removeAllConversations,
onConversationClosed, onConversationClosed,
} = window.reduxActions.conversations; } = window.reduxActions.conversations;
convoCollection.on('remove', conversation => { // Conversation add/update/remove actions are batched in this batcher to ensure
const { id } = conversation || {}; // that we retain correct orderings
const convoUpdateBatcher = createBatcher<
onConversationClosed(id, 'removed'); | { type: 'change' | 'add'; conversation: ConversationModel }
conversationRemoved(id); | { type: 'remove'; id: string }
}); >({
convoCollection.on('add', conversation => {
if (!conversation) {
return;
}
conversationAdded(conversation.id, conversation.format());
});
const changedConvoBatcher = createBatcher<ConversationModel>({
name: 'changedConvoBatcher', name: 'changedConvoBatcher',
processBatch(batch) { processBatch(batch) {
const deduped = new Set(batch); let changedOrAddedBatch = new Array<ConversationModel>();
log.info( function flushChangedOrAddedBatch() {
'changedConvoBatcher: deduped ' + if (!changedOrAddedBatch.length) {
`${batch.length} into ${deduped.size}` return;
}
conversationsUpdated(
changedOrAddedBatch.map(conversation => conversation.format())
); );
changedOrAddedBatch = [];
}
batchDispatch(() => { batchDispatch(() => {
deduped.forEach(conversation => { for (const item of batch) {
conversationChanged(conversation.id, conversation.format()); if (item.type === 'add' || item.type === 'change') {
}); changedOrAddedBatch.push(item.conversation);
} else {
strictAssert(item.type === 'remove', 'must be remove');
flushChangedOrAddedBatch();
onConversationClosed(item.id, 'removed');
conversationRemoved(item.id);
}
}
flushChangedOrAddedBatch();
}); });
}, },
wait: () => {
if (backupsService.isImportRunning()) {
return 500;
}
if (messageReceiver && !messageReceiver.hasEmptied()) {
return 250;
}
// This delay ensures that the .format() call isn't synchronous as a // This delay ensures that the .format() call isn't synchronous as a
// Backbone property is changed. Important because our _byUuid/_byE164 // Backbone property is changed. Important because our _byUuid/_byE164
// lookups aren't up-to-date as the change happens; just a little bit // lookups aren't up-to-date as the change happens; just a little bit
// after. // after.
wait: 1, return 1;
},
maxSize: Infinity, maxSize: Infinity,
}); });
convoCollection.on('props-change', (conversation, isBatched) => { convoCollection.on('add', (conversation: ConversationModel | undefined) => {
if (!conversation) {
return;
}
if (
backupsService.isImportRunning() ||
!window.reduxStore.getState().app.hasInitialLoadCompleted
) {
convoUpdateBatcher.add({ type: 'add', conversation });
} else {
// During normal app usage, we require conversations to be added synchronously
conversationsUpdated([conversation.format()]);
}
});
convoCollection.on('remove', conversation => {
const { id } = conversation || {};
convoUpdateBatcher.add({ type: 'remove', id });
});
convoCollection.on(
'props-change',
(conversation: ConversationModel | undefined, isBatched?: boolean) => {
if (!conversation) { if (!conversation) {
return; return;
} }
// `isBatched` is true when the `.set()` call on the conversation model // `isBatched` is true when the `.set()` call on the conversation model already
// already runs from within `react-redux`'s batch. Instead of batching // runs from within `react-redux`'s batch. Instead of batching the redux update
// the redux update for later - clear all queued updates and update // for later, update immediately. To ensure correct update ordering, only do this
// immediately. // optimization if there are no other pending conversation updates
if (isBatched) { if (isBatched && !convoUpdateBatcher.anyPending()) {
changedConvoBatcher.removeAll(conversation); conversationsUpdated([conversation.format()]);
conversationChanged(conversation.id, conversation.format());
return; return;
} }
changedConvoBatcher.add(conversation); convoUpdateBatcher.add({ type: 'change', conversation });
}); }
);
// Called by SignalProtocolStore#removeAllData() // Called by SignalProtocolStore#removeAllData()
convoCollection.on('reset', removeAllConversations); convoCollection.on('reset', removeAllConversations);

View file

@ -79,7 +79,7 @@ export type ImportOptionsType = Readonly<{
export class BackupsService { export class BackupsService {
private isStarted = false; private isStarted = false;
private isRunning = false; private isRunning: 'import' | 'export' | false = false;
private downloadController: AbortController | undefined; private downloadController: AbortController | undefined;
private downloadRetryPromise: private downloadRetryPromise:
| ExplodePromiseResultType<RetryBackupImportValue> | ExplodePromiseResultType<RetryBackupImportValue>
@ -275,7 +275,7 @@ export class BackupsService {
window.IPC.startTrackingQueryStats(); window.IPC.startTrackingQueryStats();
log.info(`importBackup: starting ${backupType}...`); log.info(`importBackup: starting ${backupType}...`);
this.isRunning = true; this.isRunning = 'import';
try { try {
const importStream = await BackupImportStream.create(backupType); const importStream = await BackupImportStream.create(backupType);
@ -531,7 +531,7 @@ export class BackupsService {
strictAssert(!this.isRunning, 'BackupService is already running'); strictAssert(!this.isRunning, 'BackupService is already running');
log.info('exportBackup: starting...'); log.info('exportBackup: starting...');
this.isRunning = true; this.isRunning = 'export';
try { try {
// TODO (DESKTOP-7168): Update mock-server to support this endpoint // TODO (DESKTOP-7168): Update mock-server to support this endpoint
@ -594,6 +594,13 @@ export class BackupsService {
log.error('Backup: periodic refresh failed', Errors.toLogFormat(error)); log.error('Backup: periodic refresh failed', Errors.toLogFormat(error));
} }
} }
public isImportRunning(): boolean {
return this.isRunning === 'import';
}
public isExportRunning(): boolean {
return this.isRunning === 'export';
}
} }
export const backupsService = new BackupsService(); export const backupsService = new BackupsService();

View file

@ -14,8 +14,7 @@ export function reloadSelectedConversation(): void {
return; return;
} }
conversation.cachedProps = undefined; conversation.cachedProps = undefined;
window.reduxActions.conversations.conversationChanged( window.reduxActions.conversations.conversationsUpdated([
conversation.id, conversation.format(),
conversation.format() ]);
);
} }

View file

@ -19,7 +19,7 @@ import type {
MessageDeletedActionType, MessageDeletedActionType,
MessageChangedActionType, MessageChangedActionType,
TargetedConversationChangedActionType, TargetedConversationChangedActionType,
ConversationChangedActionType, ConversationsUpdatedActionType,
} from './conversations'; } from './conversations';
import * as log from '../../logging/log'; import * as log from '../../logging/log';
import { isAudio } from '../../types/Attachment'; import { isAudio } from '../../types/Attachment';
@ -184,7 +184,7 @@ function setPlaybackRate(
void, void,
RootStateType, RootStateType,
unknown, unknown,
SetPlaybackRate | ConversationChangedActionType SetPlaybackRate | ConversationsUpdatedActionType
> { > {
return (dispatch, getState) => { return (dispatch, getState) => {
const { audioPlayer } = getState(); const { audioPlayer } = getState();

View file

@ -67,7 +67,7 @@ import { sleep } from '../../util/sleep';
import { LatestQueue } from '../../util/LatestQueue'; import { LatestQueue } from '../../util/LatestQueue';
import type { AciString, ServiceIdString } from '../../types/ServiceId'; import type { AciString, ServiceIdString } from '../../types/ServiceId';
import type { import type {
ConversationChangedActionType, ConversationsUpdatedActionType,
ConversationRemovedActionType, ConversationRemovedActionType,
} from './conversations'; } from './conversations';
import { getConversationCallMode, updateLastMessage } from './conversations'; import { getConversationCallMode, updateLastMessage } from './conversations';
@ -959,7 +959,7 @@ export type CallingActionType =
| CallStateChangeFulfilledActionType | CallStateChangeFulfilledActionType
| ChangeIODeviceFulfilledActionType | ChangeIODeviceFulfilledActionType
| CloseNeedPermissionScreenActionType | CloseNeedPermissionScreenActionType
| ConversationChangedActionType | ConversationsUpdatedActionType
| ConversationRemovedActionType | ConversationRemovedActionType
| DeclineCallActionType | DeclineCallActionType
| GroupCallAudioLevelsChangeActionType | GroupCallAudioLevelsChangeActionType
@ -3071,16 +3071,28 @@ export function reducer(
}; };
} }
if (action.type === 'CONVERSATION_CHANGED') { if (action.type === 'CONVERSATIONS_UPDATED') {
const activeCall = getActiveCall(state); const activeCall = getActiveCall(state);
const { activeCallState } = state; const { activeCallState } = state;
if ( if (
activeCallState?.state === 'Waiting' || activeCallState?.state === 'Waiting' ||
!activeCallState?.outgoingRing || !activeCallState?.outgoingRing ||
activeCallState.conversationId !== action.payload.id ||
!isGroupOrAdhocCallState(activeCall) || !isGroupOrAdhocCallState(activeCall) ||
activeCall.joinState !== GroupCallJoinState.NotJoined || activeCall.joinState !== GroupCallJoinState.NotJoined
!isConversationTooBigToRing(action.payload.data) ) {
return state;
}
const conversationForActiveCall = action.payload.data
.slice()
// reverse list since last update takes precedence
.reverse()
.find(conversation => conversation.id === activeCall?.conversationId);
if (
!conversationForActiveCall ||
!isConversationTooBigToRing(conversationForActiveCall)
) { ) {
return state; return state;
} }

View file

@ -711,18 +711,10 @@ type SetPreJoinConversationActionType = ReadonlyDeep<{
}; };
}>; }>;
type ConversationAddedActionType = ReadonlyDeep<{ export type ConversationsUpdatedActionType = ReadonlyDeep<{
type: 'CONVERSATION_ADDED'; type: 'CONVERSATIONS_UPDATED';
payload: { payload: {
id: string; data: Array<ConversationType>;
data: ConversationType;
};
}>;
export type ConversationChangedActionType = ReadonlyDeep<{
type: 'CONVERSATION_CHANGED';
payload: {
id: string;
data: ConversationType;
}; };
}>; }>;
export type ConversationRemovedActionType = ReadonlyDeep<{ export type ConversationRemovedActionType = ReadonlyDeep<{
@ -1025,8 +1017,7 @@ export type ConversationActionType =
| ComposeReplaceAvatarsActionType | ComposeReplaceAvatarsActionType
| ComposeSaveAvatarActionType | ComposeSaveAvatarActionType
| ConsumePreloadDataActionType | ConsumePreloadDataActionType
| ConversationAddedActionType | ConversationsUpdatedActionType
| ConversationChangedActionType
| ConversationRemovedActionType | ConversationRemovedActionType
| ConversationStoppedByMissingVerificationActionType | ConversationStoppedByMissingVerificationActionType
| ConversationUnloadedActionType | ConversationUnloadedActionType
@ -1107,8 +1098,7 @@ export const actions = {
composeReplaceAvatar, composeReplaceAvatar,
composeSaveAvatarToDisk, composeSaveAvatarToDisk,
consumePreloadData, consumePreloadData,
conversationAdded, conversationsUpdated,
conversationChanged,
conversationRemoved, conversationRemoved,
conversationStoppedByMissingVerification, conversationStoppedByMissingVerification,
createGroup, createGroup,
@ -2448,7 +2438,7 @@ export function setVoiceNotePlaybackRate({
}: { }: {
conversationId: string; conversationId: string;
rate: number; rate: number;
}): ThunkAction<void, RootStateType, unknown, ConversationChangedActionType> { }): ThunkAction<void, RootStateType, unknown, ConversationsUpdatedActionType> {
return async dispatch => { return async dispatch => {
const conversationModel = window.ConversationController.get(conversationId); const conversationModel = window.ConversationController.get(conversationId);
if (conversationModel) { if (conversationModel) {
@ -2462,13 +2452,14 @@ export function setVoiceNotePlaybackRate({
if (conversation) { if (conversation) {
dispatch({ dispatch({
type: 'CONVERSATION_CHANGED', type: 'CONVERSATIONS_UPDATED',
payload: { payload: {
id: conversationId, data: [
data: { {
...conversation, ...conversation,
voiceNotePlaybackRate: rate, voiceNotePlaybackRate: rate,
}, },
],
}, },
}); });
} }
@ -2688,29 +2679,17 @@ function setPreJoinConversation(
}, },
}; };
} }
function conversationAdded(
id: string,
data: ConversationType
): ConversationAddedActionType {
return {
type: 'CONVERSATION_ADDED',
payload: {
id,
data,
},
};
}
function conversationChanged(
id: string,
data: ConversationType
): ThunkAction<void, RootStateType, unknown, ConversationChangedActionType> {
return dispatch => {
calling.groupMembersChanged(id);
function conversationsUpdated(
data: Array<ConversationType>
): ThunkAction<void, RootStateType, unknown, ConversationsUpdatedActionType> {
return dispatch => {
for (const conversation of data) {
calling.groupMembersChanged(conversation.id);
}
dispatch({ dispatch({
type: 'CONVERSATION_CHANGED', type: 'CONVERSATIONS_UPDATED',
payload: { payload: {
id,
data, data,
}, },
}); });
@ -4684,8 +4663,8 @@ export function getEmptyState(): ConversationsStateType {
} }
export function updateConversationLookups( export function updateConversationLookups(
added: ConversationType | undefined, added: ReadonlyArray<ConversationType> | undefined,
removed: ConversationType | undefined, removed: ReadonlyArray<ConversationType> | undefined,
state: ConversationsStateType state: ConversationsStateType
): Pick< ): Pick<
ConversationsStateType, ConversationsStateType,
@ -4700,69 +4679,137 @@ export function updateConversationLookups(
conversationsByGroupId: state.conversationsByGroupId, conversationsByGroupId: state.conversationsByGroupId,
conversationsByUsername: state.conversationsByUsername, conversationsByUsername: state.conversationsByUsername,
}; };
const removedE164s = removed?.map(convo => convo.e164).filter(isNotNil);
const removedServiceIds = removed
?.map(convo => convo.serviceId)
.filter(isNotNil);
const removedPnis = removed?.map(convo => convo.pni).filter(isNotNil);
const removedGroupIds = removed?.map(convo => convo.groupId).filter(isNotNil);
const removedUsernames = removed
?.map(convo => convo.username)
.filter(isNotNil);
if (removed && removed.e164) { if (removedE164s?.length) {
result.conversationsByE164 = omit(result.conversationsByE164, removed.e164); result.conversationsByE164 = omit(result.conversationsByE164, removedE164s);
} }
if (removed && removed.serviceId) {
if (removedServiceIds?.length) {
result.conversationsByServiceId = omit( result.conversationsByServiceId = omit(
result.conversationsByServiceId, result.conversationsByServiceId,
removed.serviceId removedServiceIds
); );
} }
if (removed && removed.pni) { if (removedPnis?.length) {
result.conversationsByServiceId = omit( result.conversationsByServiceId = omit(
result.conversationsByServiceId, result.conversationsByServiceId,
removed.pni removedPnis
); );
} }
if (removed && removed.groupId) { if (removedGroupIds?.length) {
result.conversationsByGroupId = omit( result.conversationsByGroupId = omit(
result.conversationsByGroupId, result.conversationsByGroupId,
removed.groupId removedGroupIds
); );
} }
if (removed && removed.username) { if (removedUsernames?.length) {
result.conversationsByUsername = omit( result.conversationsByUsername = omit(
result.conversationsByUsername, result.conversationsByUsername,
removed.username removedUsernames
); );
} }
if (added && added.e164) { function isFirstElementNotNil(val: Array<unknown>) {
return val[0] != null;
}
const addedE164s = added
?.map(convo => [convo.e164, convo])
.filter(isFirstElementNotNil);
const addedServiceIds = added
?.map(convo => [convo.serviceId, convo])
.filter(isFirstElementNotNil);
const addedPnis = added
?.map(convo => [convo.pni, convo])
.filter(isFirstElementNotNil);
const addedGroupIds = added
?.map(convo => [convo.groupId, convo])
.filter(isFirstElementNotNil);
const addedUsernames = added
?.map(convo => [convo.username, convo])
.filter(isFirstElementNotNil);
if (addedE164s?.length) {
result.conversationsByE164 = { result.conversationsByE164 = {
...result.conversationsByE164, ...result.conversationsByE164,
[added.e164]: added, ...Object.fromEntries(addedE164s),
}; };
} }
if (added && added.serviceId) { if (addedServiceIds?.length) {
result.conversationsByServiceId = { result.conversationsByServiceId = {
...result.conversationsByServiceId, ...result.conversationsByServiceId,
[added.serviceId]: added, ...Object.fromEntries(addedServiceIds),
}; };
} }
if (added && added.pni) { if (addedPnis?.length) {
result.conversationsByServiceId = { result.conversationsByServiceId = {
...result.conversationsByServiceId, ...result.conversationsByServiceId,
[added.pni]: added, ...Object.fromEntries(addedPnis),
}; };
} }
if (added && added.groupId) { if (addedGroupIds?.length) {
result.conversationsByGroupId = { result.conversationsByGroupId = {
...result.conversationsByGroupId, ...result.conversationsByGroupId,
[added.groupId]: added, ...Object.fromEntries(addedGroupIds),
}; };
} }
if (added && added.username) { if (addedUsernames?.length) {
result.conversationsByUsername = { result.conversationsByUsername = {
...result.conversationsByUsername, ...result.conversationsByUsername,
[added.username]: added, ...Object.fromEntries(addedUsernames),
}; };
} }
return result; return result;
} }
function updateRootStateDueToConversationUpdate(
state: ConversationsStateType,
conversation: ConversationType
): ConversationsStateType {
if (state.selectedConversationId !== conversation.id) {
return state;
}
let { showArchived } = state;
const { selectedConversationId, conversationLookup } = state;
const existing = conversationLookup[conversation.id];
const keysToOmit: Array<keyof ConversationsStateType> = [];
const keyValuesToAdd: { hasContactSpoofingReview?: false } = {};
// Archived -> Inbox: we go back to the normal inbox view
if (existing.isArchived && !conversation.isArchived) {
showArchived = false;
}
// Inbox -> Archived: no conversation is selected
// Note: With today's stacked conversations architecture, this can result in weird
// behavior - no selected conversation in the left pane, but a conversation show
// in the right pane.
if (!existing.isArchived && conversation.isArchived) {
keysToOmit.push('selectedConversationId');
}
if (!existing.isBlocked && conversation.isBlocked) {
keyValuesToAdd.hasContactSpoofingReview = false;
}
return {
...omit(state, keysToOmit),
...keyValuesToAdd,
selectedConversationId,
showArchived,
};
}
function closeComposerModal( function closeComposerModal(
state: Readonly<ConversationsStateType>, state: Readonly<ConversationsStateType>,
modalToClose: 'maximumGroupSizeModalState' | 'recommendedGroupSizeModalState' modalToClose: 'maximumGroupSizeModalState' | 'recommendedGroupSizeModalState'
@ -4992,7 +5039,7 @@ function updateNicknameAndNote(
}); });
await DataWriter.updateConversation(conversationModel.attributes); await DataWriter.updateConversation(conversationModel.attributes);
const conversation = conversationModel.format(); const conversation = conversationModel.format();
dispatch(conversationChanged(conversationId, conversation)); dispatch(conversationsUpdated([conversation]));
conversationModel.captureChange('nicknameAndNote'); conversationModel.captureChange('nicknameAndNote');
}; };
} }
@ -5277,66 +5324,39 @@ export function reducer(
preJoinConversation: data, preJoinConversation: data,
}; };
} }
if (action.type === 'CONVERSATION_ADDED') { if (action.type === 'CONVERSATIONS_UPDATED') {
const { payload } = action; const { payload } = action;
const { id, data } = payload; const { data: conversations } = payload;
const { conversationLookup } = state;
return {
...state,
conversationLookup: {
...conversationLookup,
[id]: data,
},
...updateConversationLookups(data, undefined, state),
};
}
if (action.type === 'CONVERSATION_CHANGED') {
const { payload } = action;
const { id, data } = payload;
const { conversationLookup } = state; const { conversationLookup } = state;
const { selectedConversationId } = state; const { selectedConversationId } = state;
let { showArchived } = state;
const existing = conversationLookup[id]; const selectedConversation = conversations.find(
// We only modify the lookup if we already had that conversation and the conversation convo => convo.id === selectedConversationId
// changed. );
if (!existing || data === existing) {
return state; let updatedState = state;
if (selectedConversation) {
updatedState = updateRootStateDueToConversationUpdate(
state,
selectedConversation
);
} }
const keysToOmit: Array<keyof ConversationsStateType> = []; const existingConversations = conversations
const keyValuesToAdd: { hasContactSpoofingReview?: false } = {}; .map(conversation => conversationLookup[conversation.id])
.filter(isNotNil);
if (selectedConversationId === id) { const newConversationLookup = { ...conversationLookup };
// Archived -> Inbox: we go back to the normal inbox view for (const conversation of conversations) {
if (existing.isArchived && !data.isArchived) { newConversationLookup[conversation.id] = conversation;
showArchived = false;
}
// Inbox -> Archived: no conversation is selected
// Note: With today's stacked conversations architecture, this can result in weird
// behavior - no selected conversation in the left pane, but a conversation show
// in the right pane.
if (!existing.isArchived && data.isArchived) {
keysToOmit.push('selectedConversationId');
}
if (!existing.isBlocked && data.isBlocked) {
keyValuesToAdd.hasContactSpoofingReview = false;
}
} }
return { return {
...omit(state, keysToOmit), ...updatedState,
...keyValuesToAdd, conversationLookup: newConversationLookup,
selectedConversationId, ...updateConversationLookups(conversations, existingConversations, state),
showArchived,
conversationLookup: {
...conversationLookup,
[id]: data,
},
...updateConversationLookups(data, existing, state),
}; };
} }
if (action.type === 'CONVERSATION_REMOVED') { if (action.type === 'CONVERSATION_REMOVED') {
@ -5345,6 +5365,7 @@ export function reducer(
const { conversationLookup } = state; const { conversationLookup } = state;
const existing = getOwn(conversationLookup, id); const existing = getOwn(conversationLookup, id);
onConversationClosed(id, 'removed');
// No need to make a change if we didn't have a record of this conversation! // No need to make a change if we didn't have a record of this conversation!
if (!existing) { if (!existing) {
return state; return state;
@ -5353,7 +5374,7 @@ export function reducer(
return { return {
...state, ...state,
conversationLookup: omit(conversationLookup, [id]), conversationLookup: omit(conversationLookup, [id]),
...updateConversationLookups(undefined, existing, state), ...updateConversationLookups(undefined, [existing], state),
}; };
} }
if (action.type === CONVERSATION_UNLOADED) { if (action.type === CONVERSATION_UNLOADED) {
@ -6383,7 +6404,7 @@ export function reducer(
...conversationLookup, ...conversationLookup,
[id]: data, [id]: data,
}, },
...updateConversationLookups(data, undefined, state), ...updateConversationLookups([data], undefined, state),
}; };
} }
@ -6844,7 +6865,7 @@ export function reducer(
Object.assign( Object.assign(
nextState, nextState,
updateConversationLookups(added, existing, nextState), updateConversationLookups([added], [existing], nextState),
{ {
conversationLookup: { conversationLookup: {
...nextState.conversationLookup, ...nextState.conversationLookup,
@ -6880,7 +6901,7 @@ export function reducer(
...conversationLookup, ...conversationLookup,
[conversationId]: changed, [conversationId]: changed,
}, },
...updateConversationLookups(changed, existing, state), ...updateConversationLookups([changed], [existing], state),
}; };
} }
@ -6908,7 +6929,7 @@ export function reducer(
Object.assign( Object.assign(
nextState, nextState,
updateConversationLookups(changed, existing, nextState), updateConversationLookups([changed], [existing], nextState),
{ {
conversationLookup: { conversationLookup: {
...nextState.conversationLookup, ...nextState.conversationLookup,
@ -6941,7 +6962,7 @@ export function reducer(
...conversationLookup, ...conversationLookup,
[conversationId]: changed, [conversationId]: changed,
}, },
...updateConversationLookups(changed, conversation, state), ...updateConversationLookups([changed], [conversation], state),
}; };
} }

View file

@ -23,6 +23,7 @@ import type {
TargetedConversationChangedActionType, TargetedConversationChangedActionType,
ToggleConversationInChooseMembersActionType, ToggleConversationInChooseMembersActionType,
MessageChangedActionType, MessageChangedActionType,
ConversationsUpdatedActionType,
} from '../../../state/ducks/conversations'; } from '../../../state/ducks/conversations';
import { import {
TARGETED_CONVERSATION_CHANGED, TARGETED_CONVERSATION_CHANGED,
@ -37,7 +38,12 @@ import {
import { ReadStatus } from '../../../messages/MessageReadStatus'; import { ReadStatus } from '../../../messages/MessageReadStatus';
import type { SingleServePromiseIdString } from '../../../services/singleServePromise'; import type { SingleServePromiseIdString } from '../../../services/singleServePromise';
import { CallMode } from '../../../types/CallDisposition'; import { CallMode } from '../../../types/CallDisposition';
import { generateAci, getAciFromPrefix } from '../../../types/ServiceId'; import {
type AciString,
type PniString,
generateAci,
getAciFromPrefix,
} from '../../../types/ServiceId';
import { generateStoryDistributionId } from '../../../types/StoryDistributionId'; import { generateStoryDistributionId } from '../../../types/StoryDistributionId';
import { import {
getDefaultConversation, getDefaultConversation,
@ -62,6 +68,7 @@ import {
} from '../../../state/ducks/storyDistributionLists'; } from '../../../state/ducks/storyDistributionLists';
import { MY_STORY_ID } from '../../../types/Stories'; import { MY_STORY_ID } from '../../../types/Stories';
import type { ReadonlyMessageAttributesType } from '../../../model-types.d'; import type { ReadonlyMessageAttributesType } from '../../../model-types.d';
import { strictAssert } from '../../../util/assert';
const { const {
clearGroupCreationError, clearGroupCreationError,
@ -255,7 +262,7 @@ describe('both/state/ducks/conversations', () => {
'e164-added': added, 'e164-added': added,
}; };
const actual = updateConversationLookups(added, removed, state); const actual = updateConversationLookups([added], [removed], state);
assert.deepEqual(actual.conversationsByE164, expected); assert.deepEqual(actual.conversationsByE164, expected);
assert.strictEqual( assert.strictEqual(
@ -289,7 +296,7 @@ describe('both/state/ducks/conversations', () => {
[added.serviceId]: added, [added.serviceId]: added,
}; };
const actual = updateConversationLookups(added, removed, state); const actual = updateConversationLookups([added], [removed], state);
assert.strictEqual( assert.strictEqual(
state.conversationsByE164, state.conversationsByE164,
@ -310,9 +317,9 @@ describe('both/state/ducks/conversations', () => {
serviceId: undefined, serviceId: undefined,
}); });
const state = { const state: ConversationsStateType = {
...getEmptyState(), ...getEmptyState(),
conversationsBygroupId: { conversationsByGroupId: {
'groupId-removed': removed, 'groupId-removed': removed,
}, },
}; };
@ -327,7 +334,7 @@ describe('both/state/ducks/conversations', () => {
'groupId-added': added, 'groupId-added': added,
}; };
const actual = updateConversationLookups(added, removed, state); const actual = updateConversationLookups([added], [removed], state);
assert.strictEqual( assert.strictEqual(
state.conversationsByE164, state.conversationsByE164,
@ -339,6 +346,93 @@ describe('both/state/ducks/conversations', () => {
); );
assert.deepEqual(actual.conversationsByGroupId, expected); assert.deepEqual(actual.conversationsByGroupId, expected);
}); });
it('adds and removes multiple conversations', () => {
const removed = getDefaultConversation({
id: 'id-removed',
groupId: 'groupId-removed',
e164: 'e164-removed',
serviceId: 'serviceId-removed' as unknown as AciString,
pni: 'pni-removed' as unknown as PniString,
username: 'username-removed',
});
const stable = getDefaultConversation({
id: 'id-stable',
groupId: 'groupId-stable',
e164: 'e164-stable',
serviceId: 'serviceId-stable' as unknown as AciString,
pni: 'pni-stable' as unknown as PniString,
username: 'username-stable',
});
const state: ConversationsStateType = {
...getEmptyState(),
conversationsByServiceId: {
'serviceId-removed': removed,
'serviceId-stable': stable,
'pni-removed': removed,
'pni-stable': stable,
},
conversationsByE164: {
'e164-removed': removed,
'e164-stable': stable,
},
conversationsByGroupId: {
'groupId-removed': removed,
'groupId-stable': stable,
},
conversationsByUsername: {
'username-removed': removed,
'username-stable': stable,
},
};
const added1 = getDefaultConversation({
id: 'id-added1',
groupId: 'groupId-added1',
e164: 'e164-added1',
serviceId: 'serviceId-added1' as unknown as AciString,
pni: 'pni-added1' as unknown as PniString,
username: 'username-added1',
});
const added2 = getDefaultConversation({
id: 'id-added2',
groupId: 'groupId-added2',
e164: undefined,
serviceId: undefined,
pni: undefined,
username: undefined,
});
const actual = {
...state,
...updateConversationLookups([added1, added2], [removed], state),
};
const expected = {
...getEmptyState(),
conversationsByServiceId: {
'serviceId-added1': added1,
'pni-added1': added1,
'serviceId-stable': stable,
'pni-stable': stable,
},
conversationsByE164: {
'e164-added1': added1,
'e164-stable': stable,
},
conversationsByGroupId: {
'groupId-added1': added1,
'groupId-stable': stable,
'groupId-added2': added2,
},
conversationsByUsername: {
'username-added1': added1,
'username-stable': stable,
},
};
assert.deepEqual(actual, expected);
});
}); });
}); });
@ -2498,5 +2592,143 @@ describe('both/state/ducks/conversations', () => {
}); });
}); });
}); });
describe('CONVERSATIONS_UPDATED', () => {
it('adds and updates multiple conversations', () => {
const conversation1 = getDefaultConversation();
const conversation2 = getDefaultConversation();
const newConversation = getDefaultConversation();
strictAssert(conversation1.serviceId, 'must exist');
strictAssert(conversation1.e164, 'must exist');
strictAssert(conversation2.serviceId, 'must exist');
strictAssert(conversation2.e164, 'must exist');
strictAssert(newConversation.serviceId, 'must exist');
strictAssert(newConversation.e164, 'must exist');
const state = {
...getEmptyState(),
conversationLookup: {
[conversation1.id]: conversation1,
[conversation2.id]: conversation2,
},
conversationsByE164: {
[conversation1.e164]: conversation1,
[conversation2.e164]: conversation2,
},
conversationsByServiceId: {
[conversation1.serviceId]: conversation1,
[conversation2.serviceId]: conversation2,
},
};
const updatedConversation1 = {
...conversation1,
e164: undefined,
title: 'new title',
};
const updatedConversation2 = {
...conversation2,
active_at: 12345,
};
const updatedConversation2Again = {
...conversation2,
active_at: 98765,
};
const action: ConversationsUpdatedActionType = {
type: 'CONVERSATIONS_UPDATED',
payload: {
data: [
updatedConversation1,
updatedConversation2,
newConversation,
updatedConversation2Again,
],
},
};
const actual = reducer(state, action);
const expected: ConversationsStateType = {
...state,
conversationLookup: {
[conversation1.id]: updatedConversation1,
[conversation2.id]: updatedConversation2Again,
[newConversation.id]: newConversation,
},
conversationsByE164: {
[conversation2.e164]: updatedConversation2Again,
[newConversation.e164]: newConversation,
},
conversationsByServiceId: {
[conversation1.serviceId]: updatedConversation1,
[conversation2.serviceId]: updatedConversation2Again,
[newConversation.serviceId]: newConversation,
},
};
assert.deepEqual(actual, expected);
});
it('updates root state if conversation is selected', () => {
const conversation1 = getDefaultConversation({ isArchived: true });
const conversation2 = getDefaultConversation();
strictAssert(conversation1.serviceId, 'must exist');
strictAssert(conversation1.e164, 'must exist');
strictAssert(conversation2.serviceId, 'must exist');
strictAssert(conversation2.e164, 'must exist');
const state: ConversationsStateType = {
...getEmptyState(),
selectedConversationId: conversation1.id,
showArchived: true,
conversationLookup: {
[conversation1.id]: conversation1,
[conversation2.id]: conversation2,
},
conversationsByE164: {
[conversation1.e164]: conversation1,
[conversation2.e164]: conversation2,
},
conversationsByServiceId: {
[conversation1.serviceId]: conversation1,
[conversation2.serviceId]: conversation2,
},
};
const updatedConversation1 = {
...conversation1,
isArchived: false,
};
const updatedConversation2 = {
...conversation2,
active_at: 12345,
};
const action: ConversationsUpdatedActionType = {
type: 'CONVERSATIONS_UPDATED',
payload: {
data: [updatedConversation1, updatedConversation2],
},
};
const actual = reducer(state, action);
const expected: ConversationsStateType = {
...state,
showArchived: false,
conversationLookup: {
[conversation1.id]: updatedConversation1,
[conversation2.id]: updatedConversation2,
},
conversationsByE164: {
[conversation1.e164]: updatedConversation1,
[conversation2.e164]: updatedConversation2,
},
conversationsByServiceId: {
[conversation1.serviceId]: updatedConversation1,
[conversation2.serviceId]: updatedConversation2,
},
};
assert.deepEqual(actual, expected);
});
});
}); });
}); });

View file

@ -36,7 +36,7 @@ window.waitForAllBatchers = async () => {
export type BatcherOptionsType<ItemType> = { export type BatcherOptionsType<ItemType> = {
name: string; name: string;
wait: number; wait: number | (() => number);
maxSize: number; maxSize: number;
processBatch: (items: Array<ItemType>) => void | Promise<void>; processBatch: (items: Array<ItemType>) => void | Promise<void>;
}; };
@ -56,12 +56,20 @@ export function createBatcher<ItemType>(
let batcher: BatcherType<ItemType>; let batcher: BatcherType<ItemType>;
let timeout: NodeJS.Timeout | null; let timeout: NodeJS.Timeout | null;
let items: Array<ItemType> = []; let items: Array<ItemType> = [];
const queue = new PQueue({ const queue = new PQueue({
concurrency: 1, concurrency: 1,
timeout: MINUTE * 30, timeout: MINUTE * 30,
throwOnTimeout: true, throwOnTimeout: true,
}); });
function _getWait() {
if (typeof options.wait === 'number') {
return options.wait;
}
return options.wait();
}
function _kickBatchOff() { function _kickBatchOff() {
clearTimeoutIfNecessary(timeout); clearTimeoutIfNecessary(timeout);
timeout = null; timeout = null;
@ -81,7 +89,7 @@ export function createBatcher<ItemType>(
if (items.length === 1) { if (items.length === 1) {
// Set timeout once when we just pushed the first item so that the wait // Set timeout once when we just pushed the first item so that the wait
// time is bounded by `options.wait` and not extended by further pushes. // time is bounded by `options.wait` and not extended by further pushes.
timeout = setTimeout(_kickBatchOff, options.wait); timeout = setTimeout(_kickBatchOff, _getWait());
} else if (items.length >= options.maxSize) { } else if (items.length >= options.maxSize) {
_kickBatchOff(); _kickBatchOff();
} }
@ -104,7 +112,7 @@ export function createBatcher<ItemType>(
if (items.length > 0) { if (items.length > 0) {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
await sleep(options.wait * 2); await sleep(_getWait() * 2);
} }
} }
} }