Use streams to download attachments directly to disk

Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
This commit is contained in:
Scott Nonnenberg 2023-10-30 09:24:28 -07:00 committed by GitHub
parent 2da49456c6
commit 99b2bc304e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
48 changed files with 2297 additions and 356 deletions

View file

@ -6,6 +6,8 @@
import { isBoolean, isNumber, isString, omit } from 'lodash';
import PQueue from 'p-queue';
import { v4 as getGuid } from 'uuid';
import { existsSync } from 'fs';
import { removeSync } from 'fs-extra';
import type {
SealedSenderDecryptionResult,
@ -49,7 +51,7 @@ import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
import { Zone } from '../util/Zone';
import { DurationInSeconds, SECOND } from '../util/durations';
import type { DownloadedAttachmentType } from '../types/Attachment';
import type { AttachmentType } from '../types/Attachment';
import { Address } from '../types/Address';
import { QualifiedAddress } from '../types/QualifiedAddress';
import { normalizeStoryDistributionId } from '../types/StoryDistributionId';
@ -81,9 +83,10 @@ import {
import { processSyncMessage } from './processSyncMessage';
import type { EventHandler } from './EventTarget';
import EventTarget from './EventTarget';
import { downloadAttachment } from './downloadAttachment';
import { downloadAttachmentV2 } from './downloadAttachment';
import type { IncomingWebSocketRequest } from './WebsocketResources';
import { ContactBuffer } from './ContactsParser';
import type { ContactDetailsWithAvatar } from './ContactsParser';
import { parseContactsV2 } from './ContactsParser';
import type { WebAPIType } from './WebAPI';
import type { Storage } from './Storage';
import { WarnOnlyError } from './Errors';
@ -3504,11 +3507,11 @@ export default class MessageReceiver
private async handleContacts(
envelope: ProcessedEnvelope,
contacts: Proto.SyncMessage.IContacts
contactSyncProto: Proto.SyncMessage.IContacts
): Promise<void> {
const logId = getEnvelopeId(envelope);
log.info(`MessageReceiver: handleContacts ${logId}`);
const { blob } = contacts;
const { blob } = contactSyncProto;
if (!blob) {
throw new Error('MessageReceiver.handleContacts: blob field was missing');
}
@ -3517,21 +3520,50 @@ export default class MessageReceiver
this.removeFromCache(envelope);
const attachmentPointer = await this.handleAttachment(blob, {
disableRetries: true,
timeout: 90 * SECOND,
});
const contactBuffer = new ContactBuffer(attachmentPointer.data);
let attachment: AttachmentType | undefined;
try {
attachment = await this.handleAttachmentV2(blob, {
disableRetries: true,
timeout: 90 * SECOND,
});
const contactSync = new ContactSyncEvent(
Array.from(contactBuffer),
Boolean(contacts.complete),
envelope.receivedAtCounter,
envelope.timestamp
);
await this.dispatchAndWait(logId, contactSync);
const { path } = attachment;
if (!path) {
throw new Error('Failed no path field in returned attachment');
}
const absolutePath =
window.Signal.Migrations.getAbsoluteAttachmentPath(path);
if (!existsSync(absolutePath)) {
throw new Error(
'Contact sync attachment had path, but it was not found on disk'
);
}
log.info('handleContacts: finished');
let contacts: ReadonlyArray<ContactDetailsWithAvatar>;
try {
contacts = await parseContactsV2({
absolutePath,
});
} finally {
if (absolutePath) {
removeSync(absolutePath);
}
}
const contactSync = new ContactSyncEvent(
contacts,
Boolean(contactSyncProto.complete),
envelope.receivedAtCounter,
envelope.timestamp
);
await this.dispatchAndWait(logId, contactSync);
log.info('handleContacts: finished');
} finally {
if (attachment?.path) {
await window.Signal.Migrations.deleteAttachmentData(attachment.path);
}
}
}
private async handleBlocked(
@ -3618,12 +3650,12 @@ export default class MessageReceiver
return this.storage.blocked.isGroupBlocked(groupId);
}
private async handleAttachment(
private async handleAttachmentV2(
attachment: Proto.IAttachmentPointer,
options?: { timeout?: number; disableRetries?: boolean }
): Promise<DownloadedAttachmentType> {
): Promise<AttachmentType> {
const cleaned = processAttachment(attachment);
return downloadAttachment(this.server, cleaned, options);
return downloadAttachmentV2(this.server, cleaned, options);
}
private async handleEndSession(