Delay storage service sync until empty

Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
Fedor Indutny 2025-05-30 07:39:39 -07:00 committed by GitHub
commit 8be2e8e527
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 104 additions and 42 deletions

View file

@ -425,7 +425,7 @@ export async function startApp(): Promise<void> {
accountManager = new window.textsecure.AccountManager(server); accountManager = new window.textsecure.AccountManager(server);
accountManager.addEventListener('startRegistration', () => { accountManager.addEventListener('startRegistration', () => {
pauseProcessing(); pauseProcessing('startRegistration');
// We should already be logged out, but this ensures that the next time we connect // We should already be logged out, but this ensures that the next time we connect
// to the auth socket it is from newly-registered credentials // to the auth socket it is from newly-registered credentials
drop(server?.logout()); drop(server?.logout());
@ -771,7 +771,7 @@ export async function startApp(): Promise<void> {
'WebAPI should be initialized together with MessageReceiver' 'WebAPI should be initialized together with MessageReceiver'
); );
log.info('background/shutdown: shutting down messageReceiver'); log.info('background/shutdown: shutting down messageReceiver');
pauseProcessing(); pauseProcessing('shutdown');
await window.waitForAllBatchers(); await window.waitForAllBatchers();
} }
@ -1182,14 +1182,14 @@ export async function startApp(): Promise<void> {
log.info('Storage fetch'); log.info('Storage fetch');
drop(window.storage.fetch()); drop(window.storage.fetch());
function pauseProcessing() { function pauseProcessing(reason: string) {
strictAssert(server != null, 'WebAPI not initialized'); strictAssert(server != null, 'WebAPI not initialized');
strictAssert( strictAssert(
messageReceiver != null, messageReceiver != null,
'messageReceiver must be initialized' 'messageReceiver must be initialized'
); );
StorageService.disableStorageService(); StorageService.disableStorageService(reason);
server.unregisterRequestHandler(messageReceiver); server.unregisterRequestHandler(messageReceiver);
messageReceiver.stopProcessing(); messageReceiver.stopProcessing();
} }
@ -1447,14 +1447,19 @@ export async function startApp(): Promise<void> {
remotelyExpired = true; remotelyExpired = true;
}); });
async function runStorageService({ reason }: { reason: string }) { async function enableStorageService({ andSync }: { andSync?: string } = {}) {
log.info('enableStorageService: waiting for backupReady');
await backupReady.promise; await backupReady.promise;
log.info('enableStorageService: enabling and running');
StorageService.enableStorageService(); StorageService.enableStorageService();
StorageService.runStorageServiceSyncJob({
reason: `runStorageService/${reason}`, if (andSync != null) {
await StorageService.runStorageServiceSyncJob({
reason: andSync,
}); });
} }
}
async function start() { async function start() {
// Storage is ready because `start()` is called from `storage.onready()` // Storage is ready because `start()` is called from `storage.onready()`
@ -1795,21 +1800,28 @@ export async function startApp(): Promise<void> {
wasBackupImported, wasBackupImported,
}); });
// 5. Kickoff storage service sync // 5. Start processing messages from websocket and clear
// `messageReceiver.#isEmptied`.
log.info(`${logId}: enabling message processing`);
messageReceiver.startProcessingQueue();
server.registerRequestHandler(messageReceiver);
// 6. Kickoff storage service sync
if (isFirstAuthSocketConnect || !postRegistrationSyncsComplete) { if (isFirstAuthSocketConnect || !postRegistrationSyncsComplete) {
log.info(`${logId}: triggering storage service sync`); log.info(`${logId}: triggering storage service sync`);
storageServiceSyncComplete = waitForEvent( storageServiceSyncComplete = waitForEvent(
'storageService:syncComplete' 'storageService:syncComplete'
); );
drop(runStorageService({ reason: 'afterFirstAuthSocketConnect' })); drop(
enableStorageService({
andSync: 'afterFirstAuthSocketConnect',
})
);
} else {
drop(enableStorageService());
} }
// 6. Start processing messages from websocket
log.info(`${logId}: enabling message processing`);
messageReceiver.startProcessingQueue();
server.registerRequestHandler(messageReceiver);
// 7. Wait for critical post-registration syncs before showing inbox // 7. Wait for critical post-registration syncs before showing inbox
if (!postRegistrationSyncsComplete) { if (!postRegistrationSyncsComplete) {
const syncsToAwaitBeforeShowingInbox = [contactSyncComplete]; const syncsToAwaitBeforeShowingInbox = [contactSyncComplete];
@ -2077,34 +2089,64 @@ export async function startApp(): Promise<void> {
return; return;
} }
const waitStart = Date.now();
if (!messageReceiver.hasEmptied()) { if (!messageReceiver.hasEmptied()) {
log.info( log.info(
'waitForEmptyEventQueue: Waiting for MessageReceiver empty event...' 'waitForEmptyEventQueue: Waiting for MessageReceiver empty event...'
); );
const { resolve, reject, promise } = explodePromise<void>(); const { resolve, reject, promise } = explodePromise<void>();
const timeout = Timers.setTimeout(() => { const cleanup = () => {
reject(new Error('Empty queue never fired')); messageReceiver?.removeEventListener('empty', onEmptyOnce);
}, FIVE_MINUTES); messageReceiver?.removeEventListener('envelopeQueued', onResetTimer);
const onEmptyOnce = () => { if (timeout !== undefined) {
if (messageReceiver) {
messageReceiver.removeEventListener('empty', onEmptyOnce);
}
Timers.clearTimeout(timeout); Timers.clearTimeout(timeout);
if (resolve) { timeout = undefined;
resolve();
} }
}; };
// Reject after 1 minutes of inactivity.
const onTimeout = () => {
cleanup();
reject(new Error('Empty queue never fired'));
};
let timeout: Timers.Timeout | undefined = Timers.setTimeout(
onTimeout,
durations.MINUTE
);
const onEmptyOnce = () => {
cleanup();
resolve();
};
messageReceiver.addEventListener('empty', onEmptyOnce); messageReceiver.addEventListener('empty', onEmptyOnce);
const onResetTimer = () => {
if (timeout !== undefined) {
Timers.clearTimeout(timeout);
}
timeout = Timers.setTimeout(onTimeout, durations.MINUTE);
};
messageReceiver.addEventListener('envelopeQueued', onResetTimer);
await promise; await promise;
} }
log.info('waitForEmptyEventQueue: Waiting for event handler queue idle...'); if (eventHandlerQueue.pending !== 0 || eventHandlerQueue.size !== 0) {
log.info(
'waitForEmptyEventQueue: Waiting for event handler queue idle...'
);
await eventHandlerQueue.onIdle(); await eventHandlerQueue.onIdle();
} }
const duration = Date.now() - waitStart;
if (duration > SECOND) {
log.info(`waitForEmptyEventQueue: resolving after ${duration}ms`);
}
}
window.waitForEmptyEventQueue = waitForEmptyEventQueue; window.waitForEmptyEventQueue = waitForEmptyEventQueue;
async function onEmpty({ async function onEmpty({
@ -3114,7 +3156,7 @@ export async function startApp(): Promise<void> {
log.info('unlinkAndDisconnect: logging out'); log.info('unlinkAndDisconnect: logging out');
strictAssert(server !== undefined, 'WebAPI not initialized'); strictAssert(server !== undefined, 'WebAPI not initialized');
pauseProcessing(); pauseProcessing('unlinkAndDisconnect');
backupReady.reject(new Error('Aborted')); backupReady.reject(new Error('Aborted'));
backupReady = explodePromise(); backupReady = explodePromise();

View file

@ -2062,6 +2062,8 @@ async function sync({
strictAssert(manifest.version != null, 'Manifest without version'); strictAssert(manifest.version != null, 'Manifest without version');
const version = manifest.version?.toNumber() ?? 0; const version = manifest.version?.toNumber() ?? 0;
await window.waitForEmptyEventQueue();
log.info( log.info(
`storageService.sync: updating to remoteVersion=${version} ` + `storageService.sync: updating to remoteVersion=${version} ` +
`sourceDevice=${manifest.sourceDevice ?? '?'} from ` + `sourceDevice=${manifest.sourceDevice ?? '?'} from ` +
@ -2193,10 +2195,20 @@ async function upload({
let storageServiceEnabled = false; let storageServiceEnabled = false;
export function enableStorageService(): void { export function enableStorageService(): void {
storageServiceEnabled = true; if (storageServiceEnabled) {
return;
} }
export function disableStorageService(): void { storageServiceEnabled = true;
log.info('storageService.enableStorageService');
}
export function disableStorageService(reason: string): void {
if (!storageServiceEnabled) {
return;
}
log.info(`storageService.disableStorageService: ${reason}`);
storageServiceEnabled = false; storageServiceEnabled = false;
} }
@ -2316,7 +2328,8 @@ export const runStorageServiceSyncJob = debounce(
({ reason }: { reason: string }) => { ({ reason }: { reason: string }) => {
if (!storageServiceEnabled) { if (!storageServiceEnabled) {
log.info( log.info(
'storageService.runStorageServiceSyncJob: called before enabled' `storageService.runStorageServiceSyncJob(${reason}): ` +
'called before enabled'
); );
return; return;
} }

View file

@ -292,7 +292,6 @@ export default class MessageReceiver
#appQueue: PQueue; #appQueue: PQueue;
#decryptAndCacheBatcher: BatcherType<CacheAddItemType>; #decryptAndCacheBatcher: BatcherType<CacheAddItemType>;
#cacheRemoveBatcher: BatcherType<string>; #cacheRemoveBatcher: BatcherType<string>;
#count: number;
#processedCount: number; #processedCount: number;
#incomingQueue: PQueue; #incomingQueue: PQueue;
#isEmptied?: boolean; #isEmptied?: boolean;
@ -308,7 +307,6 @@ export default class MessageReceiver
this.#storage = storage; this.#storage = storage;
this.#count = 0;
this.#processedCount = 0; this.#processedCount = 0;
if (!serverTrustRoot) { if (!serverTrustRoot) {
@ -471,10 +469,12 @@ export default class MessageReceiver
); );
} }
public handleDisconnect(): void {
this.#isEmptied = false;
}
public startProcessingQueue(): void { public startProcessingQueue(): void {
log.info('MessageReceiver.startProcessingQueue'); log.info('MessageReceiver.startProcessingQueue');
this.#count = 0;
this.#isEmptied = false;
this.#stoppingProcessing = false; this.#stoppingProcessing = false;
drop(this.#addCachedMessagesToQueue()); drop(this.#addCachedMessagesToQueue());
@ -731,10 +731,6 @@ export default class MessageReceiver
id: string, id: string,
taskType: TaskType taskType: TaskType
): Promise<T> { ): Promise<T> {
if (taskType === TaskType.Encrypted) {
this.#count += 1;
}
const queue = const queue =
taskType === TaskType.Encrypted taskType === TaskType.Encrypted
? this.#encryptedQueue ? this.#encryptedQueue
@ -796,10 +792,6 @@ export default class MessageReceiver
}; };
const waitForIncomingQueue = async () => { const waitForIncomingQueue = async () => {
// Note: this.count is used in addToQueue
// Resetting count so everything from the websocket after this starts at zero
this.#count = 0;
drop( drop(
this.#addToQueue( this.#addToQueue(
waitForEncryptedQueue, waitForEncryptedQueue,

View file

@ -396,6 +396,9 @@ export class Provisioner {
resource.close(); resource.close();
} }
}, },
handleDisconnect() {
// No-op
},
}, },
timeout timeout
); );

View file

@ -879,6 +879,17 @@ export class SocketManager extends EventListener {
this.#incomingRequestQueue = []; this.#incomingRequestQueue = [];
this.#authenticated = undefined; this.#authenticated = undefined;
this.#setAuthenticatedStatus({ status: SocketStatus.CLOSED }); this.#setAuthenticatedStatus({ status: SocketStatus.CLOSED });
for (const handlers of this.#requestHandlers) {
try {
handlers.handleDisconnect();
} catch (error) {
log.warn(
'SocketManager: got exception while handling disconnect, ' +
`error: ${Errors.toLogFormat(error)}`
);
}
}
} }
#dropUnauthenticated(process: AbortableProcess<IWebSocketResource>): void { #dropUnauthenticated(process: AbortableProcess<IWebSocketResource>): void {

View file

@ -289,6 +289,7 @@ export type CallbackResultType = {
export type IRequestHandler = { export type IRequestHandler = {
handleRequest(request: IncomingWebSocketRequest): void; handleRequest(request: IncomingWebSocketRequest): void;
handleDisconnect(): void;
}; };
export type PniKeyMaterialType = Readonly<{ export type PniKeyMaterialType = Readonly<{