Simplify messageReceiver initialization & reset

Co-authored-by: Scott Nonnenberg <scott@signal.org>
This commit is contained in:
trevor-signal 2024-04-02 16:43:20 -04:00 committed by GitHub
parent f057bc560f
commit dfd564e67f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 318 additions and 289 deletions

View file

@ -254,216 +254,8 @@ export async function startApp(): Promise<void> {
let challengeHandler: ChallengeHandler | undefined;
let routineProfileRefresher: RoutineProfileRefresher | undefined;
window.storage.onready(() => {
server = window.WebAPI.connect({
...window.textsecure.storage.user.getWebAPICredentials(),
hasStoriesDisabled: window.storage.get('hasStoriesDisabled', false),
});
window.textsecure.server = server;
window.textsecure.messaging = new window.textsecure.MessageSender(server);
challengeHandler = new ChallengeHandler({
storage: window.storage,
startQueue(conversationId: string) {
conversationJobQueue.resolveVerificationWaiter(conversationId);
},
requestChallenge(request) {
if (window.SignalCI) {
window.SignalCI.handleEvent('challenge', request);
return;
}
window.sendChallengeRequest(request);
},
async sendChallengeResponse(data) {
const { messaging } = window.textsecure;
if (!messaging) {
throw new Error('sendChallengeResponse: messaging is not available!');
}
await messaging.sendChallengeResponse(data);
},
onChallengeFailed() {
// TODO: DESKTOP-1530
// Display humanized `retryAfter`
window.reduxActions.toast.showToast({
toastType: ToastType.CaptchaFailed,
});
},
onChallengeSolved() {
window.reduxActions.toast.showToast({
toastType: ToastType.CaptchaSolved,
});
},
setChallengeStatus(challengeStatus) {
window.reduxActions.network.setChallengeStatus(challengeStatus);
},
});
window.Whisper.events.on('challengeResponse', response => {
if (!challengeHandler) {
throw new Error('Expected challenge handler to be there');
}
challengeHandler.onResponse(response);
});
window.Signal.challengeHandler = challengeHandler;
log.info('Initializing MessageReceiver');
messageReceiver = new MessageReceiver({
server,
storage: window.storage,
serverTrustRoot: window.getServerTrustRoot(),
});
function queuedEventListener<E extends Event>(
handler: (event: E) => Promise<void> | void,
track = true
): (event: E) => void {
return (event: E): void => {
drop(
eventHandlerQueue.add(
createTaskWithTimeout(async () => {
try {
await handler(event);
} finally {
// message/sent: Message.handleDataMessage has its own queue and will
// trigger this event itself when complete.
// error: Error processing (below) also has its own queue and
// self-trigger.
if (track) {
window.Whisper.events.trigger('incrementProgress');
}
}
}, `queuedEventListener(${event.type}, ${event.timeStamp})`)
)
);
};
}
messageReceiver.addEventListener(
'envelopeUnsealed',
queuedEventListener(onEnvelopeUnsealed, false)
);
messageReceiver.addEventListener(
'envelopeQueued',
queuedEventListener(onEnvelopeQueued, false)
);
messageReceiver.addEventListener(
'message',
queuedEventListener(onMessageReceived, false)
);
messageReceiver.addEventListener(
'delivery',
queuedEventListener(onDeliveryReceipt)
);
messageReceiver.addEventListener(
'contactSync',
queuedEventListener(onContactSync)
);
messageReceiver.addEventListener(
'sent',
queuedEventListener(onSentMessage, false)
);
messageReceiver.addEventListener(
'readSync',
queuedEventListener(onReadSync)
);
messageReceiver.addEventListener(
'viewSync',
queuedEventListener(onViewSync)
);
messageReceiver.addEventListener(
'read',
queuedEventListener(onReadReceipt)
);
messageReceiver.addEventListener(
'view',
queuedEventListener(onViewReceipt)
);
messageReceiver.addEventListener(
'error',
queuedEventListener(onError, false)
);
messageReceiver.addEventListener(
'decryption-error',
queuedEventListener((event: DecryptionErrorEvent): void => {
drop(onDecryptionErrorQueue.add(() => onDecryptionError(event)));
})
);
messageReceiver.addEventListener(
'invalid-plaintext',
queuedEventListener((event: InvalidPlaintextEvent): void => {
drop(
onDecryptionErrorQueue.add(() => onInvalidPlaintextMessage(event))
);
})
);
messageReceiver.addEventListener(
'retry-request',
queuedEventListener((event: RetryRequestEvent): void => {
drop(onRetryRequestQueue.add(() => onRetryRequest(event)));
})
);
messageReceiver.addEventListener('empty', queuedEventListener(onEmpty));
messageReceiver.addEventListener(
'configuration',
queuedEventListener(onConfiguration)
);
messageReceiver.addEventListener('typing', queuedEventListener(onTyping));
messageReceiver.addEventListener(
'sticker-pack',
queuedEventListener(onStickerPack)
);
messageReceiver.addEventListener(
'viewOnceOpenSync',
queuedEventListener(onViewOnceOpenSync)
);
messageReceiver.addEventListener(
'messageRequestResponse',
queuedEventListener(onMessageRequestResponse)
);
messageReceiver.addEventListener(
'profileKeyUpdate',
queuedEventListener(onProfileKeyUpdate)
);
messageReceiver.addEventListener(
'fetchLatest',
queuedEventListener(onFetchLatestSync)
);
messageReceiver.addEventListener('keys', queuedEventListener(onKeysSync));
messageReceiver.addEventListener(
'storyRecipientUpdate',
queuedEventListener(onStoryRecipientUpdate, false)
);
messageReceiver.addEventListener(
'callEventSync',
queuedEventListener(onCallEventSync, false)
);
messageReceiver.addEventListener(
'callLogEventSync',
queuedEventListener(onCallLogEventSync, false)
);
});
ourProfileKeyService.initialize(window.storage);
window.storage.onready(() => {
if (!window.storage.get('defaultConversationColor')) {
drop(
window.storage.put(
'defaultConversationColor',
DEFAULT_CONVERSATION_COLOR
)
);
}
});
window.SignalContext.activeWindowService.registerForChange(isActive => {
if (!isActive) {
window.reduxActions?.stories.setHasAllStoriesUnmuted(false);
@ -477,18 +269,6 @@ export async function startApp(): Promise<void> {
const reconnectBackOff = new BackOff(FIBONACCI_TIMEOUTS);
window.storage.onready(() => {
strictAssert(server, 'WebAPI not ready');
senderCertificateService.initialize({
server,
events: window.Whisper.events,
storage: window.storage,
});
areWeASubscriberService.update(window.storage, server);
});
const eventHandlerQueue = new PQueue({
concurrency: 1,
});
@ -700,9 +480,6 @@ export async function startApp(): Promise<void> {
}
}
log.info('Storage fetch');
drop(window.storage.fetch());
// We need this 'first' check because we don't want to start the app up any other time
// than the first time. And window.storage.fetch() will cause onready() to fire.
let first = true;
@ -712,7 +489,217 @@ export async function startApp(): Promise<void> {
}
first = false;
strictAssert(server !== undefined, 'WebAPI not ready');
server = window.WebAPI.connect({
...window.textsecure.storage.user.getWebAPICredentials(),
hasStoriesDisabled: window.storage.get('hasStoriesDisabled', false),
});
window.textsecure.server = server;
window.textsecure.messaging = new window.textsecure.MessageSender(server);
challengeHandler = new ChallengeHandler({
storage: window.storage,
startQueue(conversationId: string) {
conversationJobQueue.resolveVerificationWaiter(conversationId);
},
requestChallenge(request) {
if (window.SignalCI) {
window.SignalCI.handleEvent('challenge', request);
return;
}
window.sendChallengeRequest(request);
},
async sendChallengeResponse(data) {
const { messaging } = window.textsecure;
if (!messaging) {
throw new Error('sendChallengeResponse: messaging is not available!');
}
await messaging.sendChallengeResponse(data);
},
onChallengeFailed() {
// TODO: DESKTOP-1530
// Display humanized `retryAfter`
window.reduxActions.toast.showToast({
toastType: ToastType.CaptchaFailed,
});
},
onChallengeSolved() {
window.reduxActions.toast.showToast({
toastType: ToastType.CaptchaSolved,
});
},
setChallengeStatus(challengeStatus) {
window.reduxActions.network.setChallengeStatus(challengeStatus);
},
});
window.Whisper.events.on('challengeResponse', response => {
if (!challengeHandler) {
throw new Error('Expected challenge handler to be there');
}
challengeHandler.onResponse(response);
});
window.Signal.challengeHandler = challengeHandler;
log.info('Initializing MessageReceiver');
messageReceiver = new MessageReceiver({
server,
storage: window.storage,
serverTrustRoot: window.getServerTrustRoot(),
});
function queuedEventListener<E extends Event>(
handler: (event: E) => Promise<void> | void,
track = true
): (event: E) => void {
return (event: E): void => {
drop(
eventHandlerQueue.add(
createTaskWithTimeout(async () => {
try {
await handler(event);
} finally {
// message/sent: Message.handleDataMessage has its own queue and will
// trigger this event itself when complete.
// error: Error processing (below) also has its own queue and
// self-trigger.
if (track) {
window.Whisper.events.trigger('incrementProgress');
}
}
}, `queuedEventListener(${event.type}, ${event.timeStamp})`)
)
);
};
}
messageReceiver.addEventListener(
'envelopeUnsealed',
queuedEventListener(onEnvelopeUnsealed, false)
);
messageReceiver.addEventListener(
'envelopeQueued',
queuedEventListener(onEnvelopeQueued, false)
);
messageReceiver.addEventListener(
'message',
queuedEventListener(onMessageReceived, false)
);
messageReceiver.addEventListener(
'delivery',
queuedEventListener(onDeliveryReceipt)
);
messageReceiver.addEventListener(
'contactSync',
queuedEventListener(onContactSync)
);
messageReceiver.addEventListener(
'sent',
queuedEventListener(onSentMessage, false)
);
messageReceiver.addEventListener(
'readSync',
queuedEventListener(onReadSync)
);
messageReceiver.addEventListener(
'viewSync',
queuedEventListener(onViewSync)
);
messageReceiver.addEventListener(
'read',
queuedEventListener(onReadReceipt)
);
messageReceiver.addEventListener(
'view',
queuedEventListener(onViewReceipt)
);
messageReceiver.addEventListener(
'error',
queuedEventListener(onError, false)
);
messageReceiver.addEventListener(
'decryption-error',
queuedEventListener((event: DecryptionErrorEvent): void => {
drop(onDecryptionErrorQueue.add(() => onDecryptionError(event)));
})
);
messageReceiver.addEventListener(
'invalid-plaintext',
queuedEventListener((event: InvalidPlaintextEvent): void => {
drop(
onDecryptionErrorQueue.add(() => onInvalidPlaintextMessage(event))
);
})
);
messageReceiver.addEventListener(
'retry-request',
queuedEventListener((event: RetryRequestEvent): void => {
drop(onRetryRequestQueue.add(() => onRetryRequest(event)));
})
);
messageReceiver.addEventListener('empty', queuedEventListener(onEmpty));
messageReceiver.addEventListener(
'configuration',
queuedEventListener(onConfiguration)
);
messageReceiver.addEventListener('typing', queuedEventListener(onTyping));
messageReceiver.addEventListener(
'sticker-pack',
queuedEventListener(onStickerPack)
);
messageReceiver.addEventListener(
'viewOnceOpenSync',
queuedEventListener(onViewOnceOpenSync)
);
messageReceiver.addEventListener(
'messageRequestResponse',
queuedEventListener(onMessageRequestResponse)
);
messageReceiver.addEventListener(
'profileKeyUpdate',
queuedEventListener(onProfileKeyUpdate)
);
messageReceiver.addEventListener(
'fetchLatest',
queuedEventListener(onFetchLatestSync)
);
messageReceiver.addEventListener('keys', queuedEventListener(onKeysSync));
messageReceiver.addEventListener(
'storyRecipientUpdate',
queuedEventListener(onStoryRecipientUpdate, false)
);
messageReceiver.addEventListener(
'callEventSync',
queuedEventListener(onCallEventSync, false)
);
messageReceiver.addEventListener(
'callLogEventSync',
queuedEventListener(onCallLogEventSync, false)
);
if (!window.storage.get('defaultConversationColor')) {
drop(
window.storage.put(
'defaultConversationColor',
DEFAULT_CONVERSATION_COLOR
)
);
}
senderCertificateService.initialize({
server,
events: window.Whisper.events,
storage: window.storage,
});
areWeASubscriberService.update(window.storage, server);
void cleanupSessionResets();
@ -1149,6 +1136,10 @@ export async function startApp(): Promise<void> {
);
}
});
// end of window.storage.onready() callback
log.info('Storage fetch');
drop(window.storage.fetch());
function setupAppState({
mainWindowStats,
@ -1442,6 +1433,9 @@ export async function startApp(): Promise<void> {
drop(connect(true));
// Connect messageReceiver back to websocket
afterStart();
// Run storage service after linking
drop(runStorageService());
});
@ -1472,7 +1466,7 @@ export async function startApp(): Promise<void> {
);
if (isCoreDataValid && Registration.everDone()) {
void connect();
drop(connect());
window.reduxActions.app.openInbox();
} else {
window.IPC.readyForUpdates();
@ -1523,6 +1517,58 @@ export async function startApp(): Promise<void> {
resolveOnAppView();
resolveOnAppView = undefined;
}
afterStart();
}
function afterStart() {
strictAssert(messageReceiver, 'messageReceiver must be initialized');
strictAssert(server, 'server must be initialized');
log.info('afterStart(): emitting app-ready-for-processing');
window.Whisper.events.trigger('app-ready-for-processing');
const onOnline = () => {
log.info('background: online');
if (!remotelyExpired) {
drop(connect());
}
};
window.Whisper.events.on('online', onOnline);
const onOffline = () => {
log.info('background: offline');
drop(challengeHandler?.onOffline());
drop(AttachmentDownloads.stop());
drop(messageReceiver?.drain());
if (connectCount === 0) {
log.info('background: offline, never connected, showing inbox');
drop(onEmpty()); // this ensures that the loading screen is dismissed
// Switch to inbox view even if contact sync is still running
if (
window.reduxStore.getState().app.appView === AppViewType.Installer
) {
log.info('background: offline, opening inbox');
window.reduxActions.app.openInbox();
}
}
};
window.Whisper.events.on('offline', onOffline);
// Because these events may have already fired, we manually call their handlers.
// isOnline() will return undefined if neither of these events have been emitted.
if (server.isOnline() === true) {
onOnline();
} else if (server.isOnline() === false) {
onOffline();
}
server.registerRequestHandler(messageReceiver);
}
window.getSyncRequest = (timeoutMillis?: number) => {
@ -1558,41 +1604,6 @@ export async function startApp(): Promise<void> {
);
}
window.Whisper.events.on('online', () => {
log.info('background: online');
strictAssert(
messageReceiver !== undefined,
'MessageReceiver not initialized'
);
messageReceiver.reset();
// The first call to connect should be done via start(), ensuring that the app is
// ready first
if (!remotelyExpired && connectCount > 0) {
drop(connect());
}
});
window.Whisper.events.on('offline', () => {
log.info('background: offline');
drop(challengeHandler?.onOffline());
drop(AttachmentDownloads.stop());
drop(messageReceiver?.drain());
if (connectCount === 0) {
log.info('background: offline, never connected, showing inbox');
drop(onEmpty()); // this ensures that the loading screen is dismissed
// Switch to inbox view even if contact sync is still running
if (window.reduxStore.getState().app.appView === AppViewType.Installer) {
log.info('background: offline, opening inbox');
window.reduxActions.app.openInbox();
}
}
});
let connectCount = 0;
let connecting = false;
let remotelyExpired = false;
@ -1675,12 +1686,6 @@ export async function startApp(): Promise<void> {
void window.Signal.Services.initializeGroupCredentialFetcher();
strictAssert(
messageReceiver !== undefined,
'MessageReceiver not initialized'
);
server.registerRequestHandler(messageReceiver);
drop(
AttachmentDownloads.start({
logger: log,
@ -1690,7 +1695,7 @@ export async function startApp(): Promise<void> {
if (connectCount === 1) {
Stickers.downloadQueuedPacks();
if (!newVersion) {
void runStorageService();
drop(runStorageService());
}
}

View file

@ -305,6 +305,8 @@ export default class MessageReceiver
private pniIdentityKeyCheckRequired?: boolean;
private isAppReadyForProcessing: boolean = false;
constructor({ server, storage, serverTrustRoot }: MessageReceiverOptions) {
super();
@ -352,6 +354,15 @@ export default class MessageReceiver
maxSize: 30,
processBatch: this.cacheRemoveBatch.bind(this),
});
window.Whisper.events.on('app-ready-for-processing', () => {
this.isAppReadyForProcessing = true;
this.reset();
});
window.Whisper.events.on('online', () => {
this.reset();
});
}
public getAndResetProcessedCount(): number {
@ -467,9 +478,22 @@ export default class MessageReceiver
}
public reset(): void {
// We always process our cache before processing a new websocket message
drop(
this.incomingQueue.add(
log.info('MessageReceiver.reset');
this.count = 0;
this.isEmptied = false;
this.stoppingProcessing = false;
if (!this.isAppReadyForProcessing) {
log.info('MessageReceiver.reset: not ready yet, returning early');
return;
}
drop(this.addCachedMessagesToQueue());
}
private addCachedMessagesToQueue(): Promise<void> {
log.info('MessageReceiver.addCachedMessagesToQueue');
return this.incomingQueue.add(
createTaskWithTimeout(
async () => this.queueAllCached(),
'incomingQueue/queueAllCached',
@ -477,17 +501,13 @@ export default class MessageReceiver
timeout: 10 * durations.MINUTE,
}
)
)
);
this.count = 0;
this.isEmptied = false;
this.stoppingProcessing = false;
}
public stopProcessing(): void {
log.info('MessageReceiver.stopProcessing');
this.stoppingProcessing = true;
this.isAppReadyForProcessing = false;
}
public hasEmptied(): boolean {

View file

@ -499,8 +499,8 @@ export class SocketManager extends EventListener {
this.credentials = undefined;
}
public get isOnline(): boolean {
return this.privIsOnline !== false;
public get isOnline(): boolean | undefined {
return this.privIsOnline;
}
//

View file

@ -1157,7 +1157,7 @@ export type WebAPIType = {
unregisterRequestHandler: (handler: IRequestHandler) => void;
onHasStoriesDisabledChange: (newValue: boolean) => void;
checkSockets: () => void;
isOnline: () => boolean;
isOnline: () => boolean | undefined;
onNavigatorOnline: () => Promise<void>;
onNavigatorOffline: () => Promise<void>;
onRemoteExpiration: () => Promise<void>;
@ -1637,7 +1637,7 @@ export function initialize({
void socketManager.check();
}
function isOnline(): boolean {
function isOnline(): boolean | undefined {
return socketManager.isOnline;
}

View file

@ -26,6 +26,7 @@ import { SignalService as Proto } from '../protobuf';
import * as log from '../logging/log';
import type { StickersStateType } from '../state/ducks/stickers';
import { MINUTE } from '../util/durations';
import { drop } from '../util/drop';
export type StickerType = {
packId: string;
@ -173,6 +174,7 @@ export function getInstalledStickerPacks(): Array<StickerPackType> {
}
export function downloadQueuedPacks(): void {
log.info('downloadQueuedPacks');
strictAssert(packsToDownload, 'Stickers not initialized');
const ids = Object.keys(packsToDownload);
@ -180,10 +182,12 @@ export function downloadQueuedPacks(): void {
const { key, status } = packsToDownload[id];
// The queuing is done inside this function, no need to await here
void downloadStickerPack(id, key, {
drop(
downloadStickerPack(id, key, {
finalStatus: status,
suppressError: true,
});
})
);
}
packsToDownload = {};

View file

@ -4,7 +4,7 @@
import { clearTimeoutIfNecessary } from './clearTimeoutIfNecessary';
export type WaitForOnlineOptionsType = Readonly<{
server?: Readonly<{ isOnline: () => boolean }>;
server?: Readonly<{ isOnline: () => boolean | undefined }>;
events?: {
on: (event: 'online', fn: () => void) => void;
off: (event: 'online', fn: () => void) => void;