Include sender keys in SignalProtocolStore zones
This commit is contained in:
parent
a17e157e7b
commit
06165cb742
10 changed files with 231 additions and 108 deletions
|
@ -191,6 +191,7 @@ const EventsMixin = function EventsMixin(this: unknown) {
|
|||
} as any as typeof window.Backbone.EventsMixin;
|
||||
|
||||
type SessionCacheEntry = CacheEntryType<SessionType, SessionRecord>;
|
||||
type SenderKeyCacheEntry = CacheEntryType<SenderKeyType, SenderKeyRecord>;
|
||||
|
||||
type ZoneQueueEntryType = Readonly<{
|
||||
zone: Zone;
|
||||
|
@ -213,10 +214,7 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
CacheEntryType<IdentityKeyType, PublicKey>
|
||||
>;
|
||||
|
||||
senderKeys?: Map<
|
||||
SenderKeyIdType,
|
||||
CacheEntryType<SenderKeyType, SenderKeyRecord>
|
||||
>;
|
||||
senderKeys?: Map<SenderKeyIdType, SenderKeyCacheEntry>;
|
||||
|
||||
sessions?: Map<SessionIdType, SessionCacheEntry>;
|
||||
|
||||
|
@ -239,6 +237,8 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
|
||||
private pendingSessions = new Map<SessionIdType, SessionCacheEntry>();
|
||||
|
||||
private pendingSenderKeys = new Map<SenderKeyIdType, SenderKeyCacheEntry>();
|
||||
|
||||
private pendingUnprocessed = new Map<string, UnprocessedType>();
|
||||
|
||||
async hydrateCaches(): Promise<void> {
|
||||
|
@ -501,7 +501,21 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
await window.Signal.Data.removeAllSignedPreKeys();
|
||||
}
|
||||
|
||||
// Sender Key Queue
|
||||
// Sender Key
|
||||
|
||||
// Re-entrant sender key transaction routine. Only one sender key transaction could
|
||||
// be running at the same time.
|
||||
//
|
||||
// While in transaction:
|
||||
//
|
||||
// - `saveSenderKey()` adds the updated session to the `pendingSenderKeys`
|
||||
// - `getSenderKey()` looks up the session first in `pendingSenderKeys` and only
|
||||
// then in the main `senderKeys` store
|
||||
//
|
||||
// When transaction ends:
|
||||
//
|
||||
// - successfully: pending sender key stores are batched into the database
|
||||
// - with an error: pending sender key stores are reverted
|
||||
|
||||
async enqueueSenderKeyJob<T>(
|
||||
qualifiedAddress: QualifiedAddress,
|
||||
|
@ -534,8 +548,6 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
return freshQueue;
|
||||
}
|
||||
|
||||
// Sender Keys
|
||||
|
||||
private getSenderKeyId(
|
||||
senderKeyId: QualifiedAddress,
|
||||
distributionId: string
|
||||
|
@ -546,79 +558,94 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
async saveSenderKey(
|
||||
qualifiedAddress: QualifiedAddress,
|
||||
distributionId: string,
|
||||
record: SenderKeyRecord
|
||||
record: SenderKeyRecord,
|
||||
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
|
||||
): Promise<void> {
|
||||
if (!this.senderKeys) {
|
||||
throw new Error('saveSenderKey: this.senderKeys not yet cached!');
|
||||
}
|
||||
await this.withZone(zone, 'saveSenderKey', async () => {
|
||||
if (!this.senderKeys) {
|
||||
throw new Error('saveSenderKey: this.senderKeys not yet cached!');
|
||||
}
|
||||
|
||||
const senderId = qualifiedAddress.toString();
|
||||
const senderId = qualifiedAddress.toString();
|
||||
|
||||
try {
|
||||
const id = this.getSenderKeyId(qualifiedAddress, distributionId);
|
||||
try {
|
||||
const id = this.getSenderKeyId(qualifiedAddress, distributionId);
|
||||
|
||||
const fromDB: SenderKeyType = {
|
||||
id,
|
||||
senderId,
|
||||
distributionId,
|
||||
data: record.serialize(),
|
||||
lastUpdatedDate: Date.now(),
|
||||
};
|
||||
const fromDB: SenderKeyType = {
|
||||
id,
|
||||
senderId,
|
||||
distributionId,
|
||||
data: record.serialize(),
|
||||
lastUpdatedDate: Date.now(),
|
||||
};
|
||||
|
||||
await window.Signal.Data.createOrUpdateSenderKey(fromDB);
|
||||
this.pendingSenderKeys.set(id, {
|
||||
hydrated: true,
|
||||
fromDB,
|
||||
item: record,
|
||||
});
|
||||
|
||||
this.senderKeys.set(id, {
|
||||
hydrated: true,
|
||||
fromDB,
|
||||
item: record,
|
||||
});
|
||||
} catch (error) {
|
||||
const errorString = error && error.stack ? error.stack : error;
|
||||
log.error(
|
||||
`saveSenderKey: failed to save senderKey ${senderId}/${distributionId}: ${errorString}`
|
||||
);
|
||||
}
|
||||
// Current zone doesn't support pending sessions - commit immediately
|
||||
if (!zone.supportsPendingSenderKeys()) {
|
||||
await this.commitZoneChanges('saveSenderKey');
|
||||
}
|
||||
} catch (error) {
|
||||
const errorString = error && error.stack ? error.stack : error;
|
||||
log.error(
|
||||
`saveSenderKey: failed to save senderKey ${senderId}/${distributionId}: ${errorString}`
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async getSenderKey(
|
||||
qualifiedAddress: QualifiedAddress,
|
||||
distributionId: string
|
||||
distributionId: string,
|
||||
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
|
||||
): Promise<SenderKeyRecord | undefined> {
|
||||
if (!this.senderKeys) {
|
||||
throw new Error('getSenderKey: this.senderKeys not yet cached!');
|
||||
}
|
||||
return this.withZone(zone, 'getSenderKey', async () => {
|
||||
if (!this.senderKeys) {
|
||||
throw new Error('getSenderKey: this.senderKeys not yet cached!');
|
||||
}
|
||||
|
||||
const senderId = qualifiedAddress.toString();
|
||||
const senderId = qualifiedAddress.toString();
|
||||
|
||||
try {
|
||||
const id = this.getSenderKeyId(qualifiedAddress, distributionId);
|
||||
try {
|
||||
const id = this.getSenderKeyId(qualifiedAddress, distributionId);
|
||||
|
||||
const entry = this.senderKeys.get(id);
|
||||
if (!entry) {
|
||||
log.error('Failed to fetch sender key:', id);
|
||||
const map = this.pendingSenderKeys.has(id)
|
||||
? this.pendingSenderKeys
|
||||
: this.senderKeys;
|
||||
const entry = map.get(id);
|
||||
|
||||
if (!entry) {
|
||||
log.error('Failed to fetch sender key:', id);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (entry.hydrated) {
|
||||
log.info('Successfully fetched sender key (cache hit):', id);
|
||||
return entry.item;
|
||||
}
|
||||
|
||||
const item = SenderKeyRecord.deserialize(
|
||||
Buffer.from(entry.fromDB.data)
|
||||
);
|
||||
this.senderKeys.set(id, {
|
||||
hydrated: true,
|
||||
item,
|
||||
fromDB: entry.fromDB,
|
||||
});
|
||||
log.info('Successfully fetched sender key(cache miss):', id);
|
||||
return item;
|
||||
} catch (error) {
|
||||
const errorString = error && error.stack ? error.stack : error;
|
||||
log.error(
|
||||
`getSenderKey: failed to load sender key ${senderId}/${distributionId}: ${errorString}`
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (entry.hydrated) {
|
||||
log.info('Successfully fetched sender key (cache hit):', id);
|
||||
return entry.item;
|
||||
}
|
||||
|
||||
const item = SenderKeyRecord.deserialize(Buffer.from(entry.fromDB.data));
|
||||
this.senderKeys.set(id, {
|
||||
hydrated: true,
|
||||
item,
|
||||
fromDB: entry.fromDB,
|
||||
});
|
||||
log.info('Successfully fetched sender key(cache miss):', id);
|
||||
return item;
|
||||
} catch (error) {
|
||||
const errorString = error && error.stack ? error.stack : error;
|
||||
log.error(
|
||||
`getSenderKey: failed to load sender key ${senderId}/${distributionId}: ${errorString}`
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async removeSenderKey(
|
||||
|
@ -645,11 +672,16 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
}
|
||||
}
|
||||
|
||||
async clearSenderKeyStore(): Promise<void> {
|
||||
if (this.senderKeys) {
|
||||
this.senderKeys.clear();
|
||||
}
|
||||
await window.Signal.Data.removeAllSenderKeys();
|
||||
async removeAllSenderKeys(): Promise<void> {
|
||||
return this.withZone(GLOBAL_ZONE, 'removeAllSenderKeys', async () => {
|
||||
if (this.senderKeys) {
|
||||
this.senderKeys.clear();
|
||||
}
|
||||
if (this.pendingSenderKeys) {
|
||||
this.pendingSenderKeys.clear();
|
||||
}
|
||||
await window.Signal.Data.removeAllSenderKeys();
|
||||
});
|
||||
}
|
||||
|
||||
// Session Queue
|
||||
|
@ -700,6 +732,7 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
//
|
||||
// - successfully: pending session stores are batched into the database
|
||||
// - with an error: pending session stores are reverted
|
||||
|
||||
public async withZone<T>(
|
||||
zone: Zone,
|
||||
name: string,
|
||||
|
@ -753,45 +786,66 @@ export class SignalProtocolStore extends EventsMixin {
|
|||
}
|
||||
|
||||
private async commitZoneChanges(name: string): Promise<void> {
|
||||
const { pendingSessions, pendingUnprocessed } = this;
|
||||
const { pendingSenderKeys, pendingSessions, pendingUnprocessed } = this;
|
||||
|
||||
if (pendingSessions.size === 0 && pendingUnprocessed.size === 0) {
|
||||
if (
|
||||
pendingSenderKeys.size === 0 &&
|
||||
pendingSessions.size === 0 &&
|
||||
pendingUnprocessed.size === 0
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`commitZoneChanges(${name}): pending sessions ${pendingSessions.size} ` +
|
||||
`commitZoneChanges(${name}): ` +
|
||||
`pending sender keys ${pendingSenderKeys.size}, ` +
|
||||
`pending sessions ${pendingSessions.size}, ` +
|
||||
`pending unprocessed ${pendingUnprocessed.size}`
|
||||
);
|
||||
|
||||
this.pendingSenderKeys = new Map();
|
||||
this.pendingSessions = new Map();
|
||||
this.pendingUnprocessed = new Map();
|
||||
|
||||
// Commit both unprocessed and sessions in the same database transaction
|
||||
// to unroll both on error.
|
||||
await window.Signal.Data.commitSessionsAndUnprocessed({
|
||||
// Commit both sender keys, sessions and unprocessed in the same database transaction
|
||||
// to unroll both on error.
|
||||
await window.Signal.Data.commitDecryptResult({
|
||||
senderKeys: Array.from(pendingSenderKeys.values()).map(
|
||||
({ fromDB }) => fromDB
|
||||
),
|
||||
sessions: Array.from(pendingSessions.values()).map(
|
||||
({ fromDB }) => fromDB
|
||||
),
|
||||
unprocessed: Array.from(pendingUnprocessed.values()),
|
||||
});
|
||||
|
||||
const { sessions } = this;
|
||||
assert(sessions !== undefined, "Can't commit unhydrated storage");
|
||||
|
||||
// Apply changes to in-memory storage after successful DB write.
|
||||
|
||||
const { sessions } = this;
|
||||
assert(sessions !== undefined, "Can't commit unhydrated session storage");
|
||||
pendingSessions.forEach((value, key) => {
|
||||
sessions.set(key, value);
|
||||
});
|
||||
|
||||
const { senderKeys } = this;
|
||||
assert(
|
||||
senderKeys !== undefined,
|
||||
"Can't commit unhydrated sender key storage"
|
||||
);
|
||||
pendingSenderKeys.forEach((value, key) => {
|
||||
senderKeys.set(key, value);
|
||||
});
|
||||
}
|
||||
|
||||
private async revertZoneChanges(name: string, error: Error): Promise<void> {
|
||||
log.info(
|
||||
`revertZoneChanges(${name}): ` +
|
||||
`pending sessions size ${this.pendingSessions.size} ` +
|
||||
`pending sender keys size ${this.pendingSenderKeys.size}, ` +
|
||||
`pending sessions size ${this.pendingSessions.size}, ` +
|
||||
`pending unprocessed size ${this.pendingUnprocessed.size}`,
|
||||
error && error.stack
|
||||
);
|
||||
this.pendingSenderKeys.clear();
|
||||
this.pendingSessions.clear();
|
||||
this.pendingUnprocessed.clear();
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue