Migrate sessions table to BLOB column

Co-authored-by: Scott Nonnenberg <scott@signal.org>
This commit is contained in:
Fedor Indutny 2024-10-02 17:23:00 -07:00 committed by GitHub
parent a4d8ba4899
commit 091580825a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 582 additions and 275 deletions

View file

@ -2,7 +2,7 @@
// SPDX-License-Identifier: AGPL-3.0-only
import PQueue from 'p-queue';
import { isNumber, omit } from 'lodash';
import { omit } from 'lodash';
import { z } from 'zod';
import { EventEmitter } from 'events';
@ -27,10 +27,6 @@ import { isNotNil } from './util/isNotNil';
import { drop } from './util/drop';
import { Zone } from './util/Zone';
import { isMoreRecentThan } from './util/timestamp';
import {
sessionRecordToProtobuf,
sessionStructureToBytes,
} from './util/sessionTranslation';
import type {
DeviceType,
IdentityKeyType,
@ -182,7 +178,7 @@ async function _fillCaches<ID, T extends HasIdType<ID>, HydratedType>(
}
export function hydrateSession(session: SessionType): SessionRecord {
return SessionRecord.deserialize(Buffer.from(session.record, 'base64'));
return SessionRecord.deserialize(Buffer.from(session.record));
}
export function hydratePublicKey(identityKey: IdentityKeyType): PublicKey {
return PublicKey.deserialize(Buffer.from(identityKey.publicKey));
@ -209,9 +205,6 @@ export function hydrateSignedPreKey(
);
}
export function freezeSession(session: SessionRecord): string {
return session.serialize().toString('base64');
}
export function freezePublicKey(publicKey: PublicKey): Uint8Array {
return publicKey.serialize();
}
@ -1333,9 +1326,14 @@ export class SignalProtocolStore extends EventEmitter {
return entry.item;
}
// We'll either just hydrate the item or we'll fully migrate the session
// and save it to the database.
return await this._maybeMigrateSession(entry.fromDB, { zone });
const newItem = {
hydrated: true,
item: hydrateSession(entry.fromDB),
fromDB: entry.fromDB,
};
map.set(id, newItem);
return newItem.item;
} catch (error) {
const errorString = Errors.toLogFormat(error);
log.error(`loadSession: failed to load session ${id}: ${errorString}`);
@ -1359,68 +1357,6 @@ export class SignalProtocolStore extends EventEmitter {
});
}
private async _maybeMigrateSession(
session: SessionType,
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
): Promise<SessionRecord> {
if (!this.sessions) {
throw new Error('_maybeMigrateSession: this.sessions not yet cached!');
}
// Already migrated, hydrate and update cache
if (session.version === 2) {
const item = hydrateSession(session);
const map = this.pendingSessions.has(session.id)
? this.pendingSessions
: this.sessions;
map.set(session.id, {
hydrated: true,
item,
fromDB: session,
});
return item;
}
// Not yet converted, need to translate to new format and save
if (session.version !== undefined) {
throw new Error('_maybeMigrateSession: Unknown session version type!');
}
const { ourServiceId } = session;
const keyPair = this.getIdentityKeyPair(ourServiceId);
if (!keyPair) {
throw new Error('_maybeMigrateSession: No identity key for ourself!');
}
const localRegistrationId = await this.getLocalRegistrationId(ourServiceId);
if (!isNumber(localRegistrationId)) {
throw new Error('_maybeMigrateSession: No registration id for ourself!');
}
const localUserData = {
identityKeyPublic: keyPair.pubKey,
registrationId: localRegistrationId,
};
log.info(`_maybeMigrateSession: Migrating session with id ${session.id}`);
const sessionProto = sessionRecordToProtobuf(
JSON.parse(session.record),
localUserData
);
const record = SessionRecord.deserialize(
Buffer.from(sessionStructureToBytes(sessionProto))
);
await this.storeSession(QualifiedAddress.parse(session.id), record, {
zone,
});
return record;
}
async storeSession(
qualifiedAddress: QualifiedAddress,
record: SessionRecord,
@ -1454,7 +1390,7 @@ export class SignalProtocolStore extends EventEmitter {
conversationId: conversation.id,
serviceId,
deviceId,
record: record.serialize().toString('base64'),
record: record.serialize(),
};
const newSession = {
@ -1533,9 +1469,7 @@ export class SignalProtocolStore extends EventEmitter {
return undefined;
}
const record = await this._maybeMigrateSession(entry.fromDB, {
zone,
});
const record = hydrateSession(entry.fromDB);
if (record.hasCurrentState()) {
return { record, entry };
}
@ -1688,9 +1622,7 @@ export class SignalProtocolStore extends EventEmitter {
addr,
`_archiveSession(${addr.toString()})`,
async () => {
const item = entry.hydrated
? entry.item
: await this._maybeMigrateSession(entry.fromDB, { zone });
const item = entry.hydrated ? entry.item : hydrateSession(entry.fromDB);
if (!item.hasCurrentState()) {
return;