Use smaller batches for receipts and syncs

This commit is contained in:
Fedor Indutny 2021-07-29 18:08:04 -07:00 committed by GitHub
parent 8775c711ae
commit 03874a788f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 21 deletions

View file

@ -320,7 +320,7 @@ export async function startApp(): Promise<void> {
{ {
name: 'Whisper.deliveryReceiptBatcher', name: 'Whisper.deliveryReceiptBatcher',
wait: 500, wait: 500,
maxSize: 500, maxSize: 100,
processBatch: async items => { processBatch: async items => {
const byConversationId = window._.groupBy(items, item => const byConversationId = window._.groupBy(items, item =>
window.ConversationController.ensureContactIds({ window.ConversationController.ensureContactIds({

View file

@ -5,6 +5,7 @@
import * as z from 'zod'; import * as z from 'zod';
import * as moment from 'moment'; import * as moment from 'moment';
import { chunk } from 'lodash';
import { getSendOptions } from '../util/getSendOptions'; import { getSendOptions } from '../util/getSendOptions';
import { handleMessageSend } from '../util/handleMessageSend'; import { handleMessageSend } from '../util/handleMessageSend';
import { isNotNil } from '../util/isNotNil'; import { isNotNil } from '../util/isNotNil';
@ -21,6 +22,8 @@ import { parseIntWithFallback } from '../util/parseIntWithFallback';
import { JobQueue } from './JobQueue'; import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
const CHUNK_SIZE = 100;
const MAX_RETRY_TIME = moment.duration(1, 'day').asMilliseconds(); const MAX_RETRY_TIME = moment.duration(1, 'day').asMilliseconds();
const readSyncJobDataSchema = z.object({ const readSyncJobDataSchema = z.object({
@ -79,18 +82,22 @@ export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
await sleep(exponentialBackoffSleepTime(attempt)); await sleep(exponentialBackoffSleepTime(attempt));
const messageIds = readSyncs.map(item => item.messageId).filter(isNotNil);
const ourConversation = window.ConversationController.getOurConversationOrThrow(); const ourConversation = window.ConversationController.getOurConversationOrThrow();
const sendOptions = await getSendOptions(ourConversation.attributes, { const sendOptions = await getSendOptions(ourConversation.attributes, {
syncMessage: true, syncMessage: true,
}); });
try { try {
await handleMessageSend( await Promise.all(
window.textsecure.messaging.syncReadMessages(readSyncs, sendOptions), chunk(readSyncs, CHUNK_SIZE).map(batch => {
const messageIds = batch.map(item => item.messageId).filter(isNotNil);
return handleMessageSend(
window.textsecure.messaging.syncReadMessages(batch, sendOptions),
{ messageIds, sendType: 'readSync' } { messageIds, sendType: 'readSync' }
); );
})
);
} catch (err: unknown) { } catch (err: unknown) {
if (!(err instanceof Error)) { if (!(err instanceof Error)) {
throw err; throw err;

View file

@ -36,6 +36,8 @@ import { SignalService as Proto } from '../protobuf';
const THIRTY_SECONDS = 30 * 1000; const THIRTY_SECONDS = 30 * 1000;
const MAX_MESSAGE_SIZE = 64 * 1024;
export class IncomingWebSocketRequest { export class IncomingWebSocketRequest {
private readonly id: Long | number; private readonly id: Long | number;
@ -207,6 +209,10 @@ export default class WebSocketResource extends EventTarget {
}); });
}); });
strictAssert(
bytes.length <= MAX_MESSAGE_SIZE,
'WebSocket request byte size exceeded'
);
this.socket.sendBytes(Buffer.from(bytes)); this.socket.sendBytes(Buffer.from(bytes));
return promise; return promise;
@ -291,6 +297,10 @@ export default class WebSocketResource extends EventTarget {
(bytes: Buffer): void => { (bytes: Buffer): void => {
this.removeActive(incomingRequest); this.removeActive(incomingRequest);
strictAssert(
bytes.length <= MAX_MESSAGE_SIZE,
'WebSocket response byte size exceeded'
);
this.socket.sendBytes(bytes); this.socket.sendBytes(bytes);
} }
); );

View file

@ -1,7 +1,7 @@
// Copyright 2021 Signal Messenger, LLC // Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only // SPDX-License-Identifier: AGPL-3.0-only
import { groupBy, map } from 'lodash'; import { chunk, groupBy, map } from 'lodash';
import { ConversationAttributesType } from '../model-types.d'; import { ConversationAttributesType } from '../model-types.d';
import { getSendOptions } from './getSendOptions'; import { getSendOptions } from './getSendOptions';
import { handleMessageSend } from './handleMessageSend'; import { handleMessageSend } from './handleMessageSend';
@ -16,6 +16,8 @@ type ReceiptSpecType = {
hasErrors: boolean; hasErrors: boolean;
}; };
const CHUNK_SIZE = 100;
export async function sendReadReceiptsFor( export async function sendReadReceiptsFor(
conversationAttrs: ConversationAttributesType, conversationAttrs: ConversationAttributesType,
items: Array<ReceiptSpecType> items: Array<ReceiptSpecType>
@ -31,12 +33,19 @@ export async function sendReadReceiptsFor(
await Promise.all( await Promise.all(
map(receiptsBySender, async (receipts, senderId) => { map(receiptsBySender, async (receipts, senderId) => {
const timestamps = map(receipts, item => item.timestamp);
const messageIds = map(receipts, item => item.messageId);
const conversation = window.ConversationController.get(senderId); const conversation = window.ConversationController.get(senderId);
if (conversation) { if (!conversation) {
await handleMessageSend( return;
}
const batches = chunk(receipts, CHUNK_SIZE);
await Promise.all(
batches.map(batch => {
const timestamps = map(batch, item => item.timestamp);
const messageIds = map(batch, item => item.messageId);
return handleMessageSend(
window.textsecure.messaging.sendReadReceipts({ window.textsecure.messaging.sendReadReceipts({
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
senderE164: conversation.get('e164')!, senderE164: conversation.get('e164')!,
@ -47,7 +56,8 @@ export async function sendReadReceiptsFor(
}), }),
{ messageIds, sendType: 'readReceipt' } { messageIds, sendType: 'readReceipt' }
); );
} })
);
}) })
); );
} }