Faster backup import for large conversation count

This commit is contained in:
Fedor Indutny 2024-10-29 13:18:32 -07:00 committed by GitHub
parent ce090a8a3c
commit 6ca3719625
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@@ -114,7 +114,6 @@ import { hasAttachmentDownloads } from '../../util/hasAttachmentDownloads';
const MAX_CONCURRENCY = 10; const MAX_CONCURRENCY = 10;
const CONVERSATION_OP_BATCH_SIZE = 10000;
const SAVE_MESSAGE_BATCH_SIZE = 10000; const SAVE_MESSAGE_BATCH_SIZE = 10000;
// Keep 1000 recent messages in memory to speed up quote lookup. // Keep 1000 recent messages in memory to speed up quote lookup.
@@ -204,9 +203,9 @@ export class BackupImportStream extends Writable {
number, number,
ConversationAttributesType ConversationAttributesType
>(); >();
private readonly conversationOpBatch = new Map< private readonly conversations = new Map<
ConversationAttributesType, string,
'save' | 'update' ConversationAttributesType
>(); >();
private readonly saveMessageBatch = new Set<MessageAttributesType>(); private readonly saveMessageBatch = new Set<MessageAttributesType>();
private readonly stickerPacks = new Array<StickerPackPointerType>(); private readonly stickerPacks = new Array<StickerPackPointerType>();
@@ -439,22 +438,13 @@ export class BackupImportStream extends Writable {
private async saveConversation( private async saveConversation(
attributes: ConversationAttributesType attributes: ConversationAttributesType
): Promise<void> { ): Promise<void> {
this.conversationOpBatch.set(attributes, 'save'); this.conversations.set(attributes.id, attributes);
if (this.conversationOpBatch.size >= CONVERSATION_OP_BATCH_SIZE) {
return this.flushConversations();
}
} }
private async updateConversation( private async updateConversation(
attributes: ConversationAttributesType attributes: ConversationAttributesType
): Promise<void> { ): Promise<void> {
if (!this.conversationOpBatch.has(attributes)) { this.conversations.set(attributes.id, attributes);
this.conversationOpBatch.set(attributes, 'update');
}
if (this.conversationOpBatch.size >= CONVERSATION_OP_BATCH_SIZE) {
return this.flushConversations();
}
} }
private async saveMessage(attributes: MessageAttributesType): Promise<void> { private async saveMessage(attributes: MessageAttributesType): Promise<void> {
@@ -466,25 +456,23 @@ export class BackupImportStream extends Writable {
} }
private async flushConversations(): Promise<void> { private async flushConversations(): Promise<void> {
const saves = new Array<ConversationAttributesType>();
const updates = new Array<ConversationAttributesType>(); const updates = new Array<ConversationAttributesType>();
for (const [conversation, op] of this.conversationOpBatch) {
if (op === 'save') { if (this.ourConversation) {
saves.push(conversation); const us = this.conversations.get(this.ourConversation.id);
} else { if (us) {
updates.push(conversation); updates.push(us);
this.conversations.delete(us.id);
} }
} }
this.conversationOpBatch.clear();
const saves = Array.from(this.conversations.values());
this.conversations.clear();
// Queue writes at the same time to prevent races. // Queue writes at the same time to prevent races.
await Promise.all([ await Promise.all([
saves.length > 0 DataWriter.saveConversations(saves),
? DataWriter.saveConversations(saves) DataWriter.updateConversations(updates),
: Promise.resolve(),
updates.length > 0
? DataWriter.updateConversations(updates)
: Promise.resolve(),
]); ]);
} }