Rename locks to zones

This commit is contained in:
Fedor Indutny 2021-05-19 14:25:56 -07:00 committed by Scott Nonnenberg
parent 8f0731d498
commit 7418a5c663
6 changed files with 317 additions and 321 deletions

View file

@ -24,12 +24,9 @@ import {
} from '@signalapp/signal-client';
import { freezePreKey, freezeSignedPreKey } from './SignalProtocolStore';
import { UnprocessedType } from './textsecure/Types.d';
import { typedArrayToArrayBuffer } from './Crypto';
import { assert } from './util/assert';
import { Lock } from './util/Lock';
import { Zone } from './util/Zone';
function encodedNameFromAddress(address: ProtocolAddress): string {
const name = address.name();
@ -39,91 +36,49 @@ function encodedNameFromAddress(address: ProtocolAddress): string {
}
export type SessionsOptions = {
readonly lock?: Lock;
readonly transactionOnly?: boolean;
readonly zone?: Zone;
};
export class Sessions extends SessionStore {
private readonly lock: Lock | undefined;
private readonly zone: Zone | undefined;
private inTransaction = false;
constructor(private readonly options: SessionsOptions = {}) {
constructor(options: SessionsOptions = {}) {
super();
this.lock = options.lock;
this.zone = options.zone;
}
public async transaction<T>(fn: () => Promise<T>): Promise<T> {
assert(!this.inTransaction, 'Already in transaction');
this.inTransaction = true;
assert(this.lock, "Can't start transaction without lock");
try {
return await window.textsecure.storage.protocol.sessionTransaction(
'Sessions.transaction',
fn,
this.lock
);
} finally {
this.inTransaction = false;
}
}
public async addUnprocessed(array: Array<UnprocessedType>): Promise<void> {
await window.textsecure.storage.protocol.addMultipleUnprocessed(array, {
lock: this.lock,
});
}
// SessionStore overrides
async saveSession(
address: ProtocolAddress,
record: SessionRecord
): Promise<void> {
this.checkInTransaction();
await window.textsecure.storage.protocol.storeSession(
encodedNameFromAddress(address),
record,
{ lock: this.lock }
{ zone: this.zone }
);
}
async getSession(name: ProtocolAddress): Promise<SessionRecord | null> {
this.checkInTransaction();
const encodedName = encodedNameFromAddress(name);
const record = await window.textsecure.storage.protocol.loadSession(
encodedName,
{ lock: this.lock }
{ zone: this.zone }
);
return record || null;
}
// Private
private checkInTransaction(): void {
assert(
this.inTransaction || !this.options.transactionOnly,
'Accessing session store outside of transaction'
);
}
}
export type IdentityKeysOptions = {
readonly lock?: Lock;
readonly zone?: Zone;
};
export class IdentityKeys extends IdentityKeyStore {
private readonly lock: Lock | undefined;
private readonly zone: Zone | undefined;
constructor({ lock }: IdentityKeysOptions = {}) {
constructor({ zone }: IdentityKeysOptions = {}) {
super();
this.lock = lock;
this.zone = zone;
}
async getIdentityKey(): Promise<PrivateKey> {
@ -161,13 +116,13 @@ export class IdentityKeys extends IdentityKeyStore {
const encodedName = encodedNameFromAddress(name);
const publicKey = typedArrayToArrayBuffer(key.serialize());
// Pass `lock` to let `saveIdentity` archive sibling sessions when identity
// Pass `zone` to let `saveIdentity` archive sibling sessions when identity
// key changes.
return window.textsecure.storage.protocol.saveIdentity(
encodedName,
publicKey,
false,
{ lock: this.lock }
{ zone: this.zone }
);
}

View file

@ -25,7 +25,7 @@ import {
} from './Crypto';
import { assert } from './util/assert';
import { isNotNil } from './util/isNotNil';
import { Lock } from './util/Lock';
import { Zone } from './util/Zone';
import { isMoreRecentThan } from './util/timestamp';
import {
sessionRecordToProtobuf,
@ -115,10 +115,10 @@ type MapFields =
type SessionResetsType = Record<string, number>;
export type SessionTransactionOptions = {
readonly lock?: Lock;
readonly zone?: Zone;
};
const GLOBAL_LOCK = new Lock('GLOBAL_LOCK');
export const GLOBAL_ZONE = new Zone('GLOBAL_ZONE');
async function _fillCaches<ID, T extends HasIdType<ID>, HydratedType>(
object: SignalProtocolStore,
@ -219,14 +219,6 @@ export class SignalProtocolStore extends EventsMixin {
sessions?: Map<string, SessionCacheEntry>;
sessionLock?: Lock;
sessionLockQueue: Array<() => void> = [];
pendingSessions = new Map<string, SessionCacheEntry>();
pendingUnprocessed = new Map<string, UnprocessedType>();
preKeys?: Map<number, CacheEntryType<PreKeyType, PreKeyRecord>>;
signedPreKeys?: Map<
@ -238,6 +230,16 @@ export class SignalProtocolStore extends EventsMixin {
sessionQueues: Map<string, PQueue> = new Map<string, PQueue>();
private currentZone?: Zone;
private currentZoneDepth = 0;
private readonly zoneQueue: Array<() => void> = [];
private pendingSessions = new Map<string, SessionCacheEntry>();
private pendingUnprocessed = new Map<string, UnprocessedType>();
async hydrateCaches(): Promise<void> {
await Promise.all([
(async () => {
@ -603,54 +605,48 @@ export class SignalProtocolStore extends EventsMixin {
//
// - successfully: pending session stores are batched into the database
// - with an error: pending session stores are reverted
async sessionTransaction<T>(
public async withZone<T>(
zone: Zone,
name: string,
body: () => Promise<T>,
lock: Lock = GLOBAL_LOCK
body: () => Promise<T>
): Promise<T> {
const debugName = `sessionTransaction(${lock.name}:${name})`;
const debugName = `withZone(${zone.name}:${name})`;
// Allow re-entering from LibSignalStores
const isNested = this.sessionLock === lock;
if (this.sessionLock && !isNested) {
if (this.currentZone && this.currentZone !== zone) {
const start = Date.now();
window.log.info(
`${debugName}: locked by ${this.sessionLock.name}, waiting`
`${debugName}: locked by ${this.currentZone.name}, waiting`
);
await new Promise<void>(resolve => this.sessionLockQueue.push(resolve));
await new Promise<void>(resolve => this.zoneQueue.push(resolve));
const duration = Date.now() - start;
window.log.info(`${debugName}: unlocked after ${duration}ms`);
}
if (!isNested) {
if (lock !== GLOBAL_LOCK) {
window.log.info(`${debugName}: enter`);
}
this.sessionLock = lock;
}
this.enterZone(zone, name);
let result: T;
try {
result = await body();
} catch (error) {
if (!isNested) {
await this.revertSessions(name, error);
this.releaseSessionLock();
if (this.isInTopLevelZone()) {
await this.revertZoneChanges(name, error);
}
this.leaveZone(zone);
throw error;
}
if (!isNested) {
await this.commitSessions(name);
this.releaseSessionLock();
if (this.isInTopLevelZone()) {
await this.commitZoneChanges(name);
}
this.leaveZone(zone);
return result;
}
private async commitSessions(name: string): Promise<void> {
private async commitZoneChanges(name: string): Promise<void> {
const { pendingSessions, pendingUnprocessed } = this;
if (pendingSessions.size === 0 && pendingUnprocessed.size === 0) {
@ -658,7 +654,7 @@ export class SignalProtocolStore extends EventsMixin {
}
window.log.info(
`commitSessions(${name}): pending sessions ${pendingSessions.size} ` +
`commitZoneChanges(${name}): pending sessions ${pendingSessions.size} ` +
`pending unprocessed ${pendingUnprocessed.size}`
);
@ -683,18 +679,52 @@ export class SignalProtocolStore extends EventsMixin {
});
}
private async revertSessions(name: string, error: Error): Promise<void> {
private async revertZoneChanges(name: string, error: Error): Promise<void> {
window.log.info(
`revertSessions(${name}): pending size ${this.pendingSessions.size}`,
`revertZoneChanges(${name}): ` +
`pending sessions size ${this.pendingSessions.size} ` +
`pending unprocessed size ${this.pendingUnprocessed.size}`,
error && error.stack
);
this.pendingSessions.clear();
this.pendingUnprocessed.clear();
}
private releaseSessionLock(): void {
this.sessionLock = undefined;
const next = this.sessionLockQueue.shift();
private isInTopLevelZone(): boolean {
return this.currentZoneDepth === 1;
}
private enterZone(zone: Zone, name: string): void {
this.currentZoneDepth += 1;
if (this.currentZoneDepth === 1) {
assert(this.currentZone === undefined, 'Should not be in the zone');
this.currentZone = zone;
if (zone !== GLOBAL_ZONE) {
window.log.info(`enterZone(${zone.name}:${name})`);
}
}
}
private leaveZone(zone: Zone): void {
assert(this.currentZone === zone, 'Should be in the correct zone');
this.currentZoneDepth -= 1;
assert(this.currentZoneDepth >= 0, 'Unmatched number of leaveZone calls');
// Since we allow re-entering zones we might actually be in two overlapping
// async calls. Leave the zone and yield to another one only if there are
// no active zone users anymore.
if (this.currentZoneDepth !== 0) {
return;
}
if (zone !== GLOBAL_ZONE) {
window.log.info(`leaveZone(${zone.name})`);
}
this.currentZone = undefined;
const next = this.zoneQueue.shift();
if (next) {
next();
}
@ -702,51 +732,47 @@ export class SignalProtocolStore extends EventsMixin {
async loadSession(
encodedAddress: string,
{ lock }: SessionTransactionOptions = {}
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
): Promise<SessionRecord | undefined> {
return this.sessionTransaction(
'loadSession',
async () => {
if (!this.sessions) {
throw new Error('loadSession: this.sessions not yet cached!');
}
return this.withZone(zone, 'loadSession', async () => {
if (!this.sessions) {
throw new Error('loadSession: this.sessions not yet cached!');
}
if (encodedAddress === null || encodedAddress === undefined) {
throw new Error('loadSession: encodedAddress was undefined/null');
}
if (encodedAddress === null || encodedAddress === undefined) {
throw new Error('loadSession: encodedAddress was undefined/null');
}
try {
const id = await normalizeEncodedAddress(encodedAddress);
const map = this.pendingSessions.has(id)
? this.pendingSessions
: this.sessions;
const entry = map.get(id);
try {
const id = await normalizeEncodedAddress(encodedAddress);
const map = this.pendingSessions.has(id)
? this.pendingSessions
: this.sessions;
const entry = map.get(id);
if (!entry) {
return undefined;
}
if (entry.hydrated) {
return entry.item;
}
const item = await this._maybeMigrateSession(entry.fromDB);
map.set(id, {
hydrated: true,
item,
fromDB: entry.fromDB,
});
return item;
} catch (error) {
const errorString = error && error.stack ? error.stack : error;
window.log.error(
`loadSession: failed to load session ${encodedAddress}: ${errorString}`
);
if (!entry) {
return undefined;
}
},
lock
);
if (entry.hydrated) {
return entry.item;
}
const item = await this._maybeMigrateSession(entry.fromDB);
map.set(id, {
hydrated: true,
item,
fromDB: entry.fromDB,
});
return item;
} catch (error) {
const errorString = error && error.stack ? error.stack : error;
window.log.error(
`loadSession: failed to load session ${encodedAddress}: ${errorString}`
);
return undefined;
}
});
}
private async _maybeMigrateSession(
@ -792,54 +818,55 @@ export class SignalProtocolStore extends EventsMixin {
async storeSession(
encodedAddress: string,
record: SessionRecord,
{ lock }: SessionTransactionOptions = {}
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
): Promise<void> {
await this.sessionTransaction(
'storeSession',
async () => {
if (!this.sessions) {
throw new Error('storeSession: this.sessions not yet cached!');
}
await this.withZone(zone, 'storeSession', async () => {
if (!this.sessions) {
throw new Error('storeSession: this.sessions not yet cached!');
}
if (encodedAddress === null || encodedAddress === undefined) {
throw new Error('storeSession: encodedAddress was undefined/null');
if (encodedAddress === null || encodedAddress === undefined) {
throw new Error('storeSession: encodedAddress was undefined/null');
}
const unencoded = window.textsecure.utils.unencodeNumber(encodedAddress);
const deviceId = parseInt(unencoded[1], 10);
try {
const id = await normalizeEncodedAddress(encodedAddress);
const fromDB = {
id,
version: 2,
conversationId: window.textsecure.utils.unencodeNumber(id)[0],
deviceId,
record: record.serialize().toString('base64'),
};
const newSession = {
hydrated: true,
fromDB,
item: record,
};
assert(this.currentZone, 'Must run in the zone');
this.pendingSessions.set(id, newSession);
// Current zone doesn't support pending sessions - commit immediately
if (!zone.supportsPendingSessions()) {
await this.commitZoneChanges('storeSession');
}
const unencoded = window.textsecure.utils.unencodeNumber(
encodedAddress
} catch (error) {
const errorString = error && error.stack ? error.stack : error;
window.log.error(
`storeSession: Save failed fo ${encodedAddress}: ${errorString}`
);
const deviceId = parseInt(unencoded[1], 10);
try {
const id = await normalizeEncodedAddress(encodedAddress);
const fromDB = {
id,
version: 2,
conversationId: window.textsecure.utils.unencodeNumber(id)[0],
deviceId,
record: record.serialize().toString('base64'),
};
const newSession = {
hydrated: true,
fromDB,
item: record,
};
this.pendingSessions.set(id, newSession);
} catch (error) {
const errorString = error && error.stack ? error.stack : error;
window.log.error(
`storeSession: Save failed fo ${encodedAddress}: ${errorString}`
);
throw error;
}
},
lock
);
throw error;
}
});
}
async getDeviceIds(identifier: string): Promise<Array<number>> {
return this.sessionTransaction('getDeviceIds', async () => {
return this.withZone(GLOBAL_ZONE, 'getDeviceIds', async () => {
if (!this.sessions) {
throw new Error('getDeviceIds: this.sessions not yet cached!');
}
@ -892,7 +919,7 @@ export class SignalProtocolStore extends EventsMixin {
}
async removeSession(encodedAddress: string): Promise<void> {
return this.sessionTransaction('removeSession', async () => {
return this.withZone(GLOBAL_ZONE, 'removeSession', async () => {
if (!this.sessions) {
throw new Error('removeSession: this.sessions not yet cached!');
}
@ -912,7 +939,7 @@ export class SignalProtocolStore extends EventsMixin {
}
async removeAllSessions(identifier: string): Promise<void> {
return this.sessionTransaction('removeAllSessions', async () => {
return this.withZone(GLOBAL_ZONE, 'removeAllSessions', async () => {
if (!this.sessions) {
throw new Error('removeAllSessions: this.sessions not yet cached!');
}
@ -960,7 +987,7 @@ export class SignalProtocolStore extends EventsMixin {
}
async archiveSession(encodedAddress: string): Promise<void> {
return this.sessionTransaction('archiveSession', async () => {
return this.withZone(GLOBAL_ZONE, 'archiveSession', async () => {
if (!this.sessions) {
throw new Error('archiveSession: this.sessions not yet cached!');
}
@ -977,47 +1004,41 @@ export class SignalProtocolStore extends EventsMixin {
async archiveSiblingSessions(
encodedAddress: string,
{ lock }: SessionTransactionOptions = {}
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
): Promise<void> {
return this.sessionTransaction(
'archiveSiblingSessions',
async () => {
if (!this.sessions) {
throw new Error(
'archiveSiblingSessions: this.sessions not yet cached!'
);
}
window.log.info(
'archiveSiblingSessions: archiving sibling sessions for',
encodedAddress
return this.withZone(zone, 'archiveSiblingSessions', async () => {
if (!this.sessions) {
throw new Error(
'archiveSiblingSessions: this.sessions not yet cached!'
);
}
const id = await normalizeEncodedAddress(encodedAddress);
const [identifier, deviceId] = window.textsecure.utils.unencodeNumber(
id
);
const deviceIdNumber = parseInt(deviceId, 10);
window.log.info(
'archiveSiblingSessions: archiving sibling sessions for',
encodedAddress
);
const allEntries = this._getAllSessions();
const entries = allEntries.filter(
entry =>
entry.fromDB.conversationId === identifier &&
entry.fromDB.deviceId !== deviceIdNumber
);
const id = await normalizeEncodedAddress(encodedAddress);
const [identifier, deviceId] = window.textsecure.utils.unencodeNumber(id);
const deviceIdNumber = parseInt(deviceId, 10);
await Promise.all(
entries.map(async entry => {
await this._archiveSession(entry);
})
);
},
lock
);
const allEntries = this._getAllSessions();
const entries = allEntries.filter(
entry =>
entry.fromDB.conversationId === identifier &&
entry.fromDB.deviceId !== deviceIdNumber
);
await Promise.all(
entries.map(async entry => {
await this._archiveSession(entry);
})
);
});
}
async archiveAllSessions(identifier: string): Promise<void> {
return this.sessionTransaction('archiveAllSessions', async () => {
return this.withZone(GLOBAL_ZONE, 'archiveAllSessions', async () => {
if (!this.sessions) {
throw new Error('archiveAllSessions: this.sessions not yet cached!');
}
@ -1043,7 +1064,7 @@ export class SignalProtocolStore extends EventsMixin {
}
async clearSessionStore(): Promise<void> {
return this.sessionTransaction('clearSessionStore', async () => {
return this.withZone(GLOBAL_ZONE, 'clearSessionStore', async () => {
if (this.sessions) {
this.sessions.clear();
}
@ -1242,7 +1263,7 @@ export class SignalProtocolStore extends EventsMixin {
encodedAddress: string,
publicKey: ArrayBuffer,
nonblockingApproval = false,
{ lock }: SessionTransactionOptions = {}
{ zone }: SessionTransactionOptions = {}
): Promise<boolean> {
if (!this.identityKeys) {
throw new Error('saveIdentity: this.identityKeys not yet cached!');
@ -1316,9 +1337,9 @@ export class SignalProtocolStore extends EventsMixin {
);
}
// Pass the lock to facilitate transactional session use in
// Pass the zone to facilitate transactional session use in
// MessageReceiver.ts
await this.archiveSiblingSessions(encodedAddress, { lock });
await this.archiveSiblingSessions(encodedAddress, { zone });
return true;
}
@ -1646,57 +1667,54 @@ export class SignalProtocolStore extends EventsMixin {
// Not yet processed messages - for resiliency
getUnprocessedCount(): Promise<number> {
return this.sessionTransaction('getUnprocessedCount', async () => {
this._checkNoPendingUnprocessed();
return this.withZone(GLOBAL_ZONE, 'getUnprocessedCount', async () => {
return window.Signal.Data.getUnprocessedCount();
});
}
getAllUnprocessed(): Promise<Array<UnprocessedType>> {
return this.sessionTransaction('getAllUnprocessed', async () => {
this._checkNoPendingUnprocessed();
return this.withZone(GLOBAL_ZONE, 'getAllUnprocessed', async () => {
return window.Signal.Data.getAllUnprocessed();
});
}
getUnprocessedById(id: string): Promise<UnprocessedType | undefined> {
return this.sessionTransaction('getUnprocessedById', async () => {
this._checkNoPendingUnprocessed();
return this.withZone(GLOBAL_ZONE, 'getUnprocessedById', async () => {
return window.Signal.Data.getUnprocessedById(id);
});
}
addUnprocessed(
data: UnprocessedType,
{ lock }: SessionTransactionOptions = {}
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
): Promise<void> {
return this.sessionTransaction(
'addUnprocessed',
async () => {
this.pendingUnprocessed.set(data.id, data);
},
lock
);
return this.withZone(zone, 'addUnprocessed', async () => {
this.pendingUnprocessed.set(data.id, data);
// Current zone doesn't support pending unprocessed - commit immediately
if (!zone.supportsPendingUnprocessed()) {
await this.commitZoneChanges('addUnprocessed');
}
});
}
addMultipleUnprocessed(
array: Array<UnprocessedType>,
{ lock }: SessionTransactionOptions = {}
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
): Promise<void> {
return this.sessionTransaction(
'addMultipleUnprocessed',
async () => {
for (const elem of array) {
this.pendingUnprocessed.set(elem.id, elem);
}
},
lock
);
return this.withZone(zone, 'addMultipleUnprocessed', async () => {
for (const elem of array) {
this.pendingUnprocessed.set(elem.id, elem);
}
// Current zone doesn't support pending unprocessed - commit immediately
if (!zone.supportsPendingUnprocessed()) {
await this.commitZoneChanges('addMultipleUnprocessed');
}
});
}
updateUnprocessedAttempts(id: string, attempts: number): Promise<void> {
return this.sessionTransaction('updateUnprocessedAttempts', async () => {
this._checkNoPendingUnprocessed();
return this.withZone(GLOBAL_ZONE, 'updateUnprocessedAttempts', async () => {
await window.Signal.Data.updateUnprocessedAttempts(id, attempts);
});
}
@ -1705,8 +1723,7 @@ export class SignalProtocolStore extends EventsMixin {
id: string,
data: UnprocessedUpdateType
): Promise<void> {
return this.sessionTransaction('updateUnprocessedWithData', async () => {
this._checkNoPendingUnprocessed();
return this.withZone(GLOBAL_ZONE, 'updateUnprocessedWithData', async () => {
await window.Signal.Data.updateUnprocessedWithData(id, data);
});
}
@ -1714,22 +1731,23 @@ export class SignalProtocolStore extends EventsMixin {
updateUnprocessedsWithData(
items: Array<{ id: string; data: UnprocessedUpdateType }>
): Promise<void> {
return this.sessionTransaction('updateUnprocessedsWithData', async () => {
this._checkNoPendingUnprocessed();
await window.Signal.Data.updateUnprocessedsWithData(items);
});
return this.withZone(
GLOBAL_ZONE,
'updateUnprocessedsWithData',
async () => {
await window.Signal.Data.updateUnprocessedsWithData(items);
}
);
}
removeUnprocessed(idOrArray: string | Array<string>): Promise<void> {
return this.sessionTransaction('removeUnprocessed', async () => {
this._checkNoPendingUnprocessed();
return this.withZone(GLOBAL_ZONE, 'removeUnprocessed', async () => {
await window.Signal.Data.removeUnprocessed(idOrArray);
});
}
removeAllUnprocessed(): Promise<void> {
return this.sessionTransaction('removeAllUnprocessed', async () => {
this._checkNoPendingUnprocessed();
return this.withZone(GLOBAL_ZONE, 'removeAllUnprocessed', async () => {
await window.Signal.Data.removeAllUnprocessed();
});
}
@ -1765,17 +1783,6 @@ export class SignalProtocolStore extends EventsMixin {
return Array.from(union.values());
}
private _checkNoPendingUnprocessed(): void {
assert(
!this.sessionLock || this.sessionLock === GLOBAL_LOCK,
"Can't use this function with a global lock"
);
assert(
this.pendingUnprocessed.size === 0,
'Missing support for pending unprocessed'
);
}
}
window.SignalProtocolStore = SignalProtocolStore;

View file

@ -13,11 +13,11 @@ import {
import { signal } from '../protobuf/compiled';
import { sessionStructureToArrayBuffer } from '../util/sessionTranslation';
import { Lock } from '../util/Lock';
import { Zone } from '../util/Zone';
import { getRandomBytes, constantTimeEqual } from '../Crypto';
import { clampPrivateKey, setPublicKeyTypeByte } from '../Curve';
import { SignalProtocolStore } from '../SignalProtocolStore';
import { SignalProtocolStore, GLOBAL_ZONE } from '../SignalProtocolStore';
import { IdentityKeyType, KeyPairType } from '../textsecure/Types.d';
chai.use(chaiAsPromised);
@ -1280,27 +1280,50 @@ describe('SignalProtocolStore', () => {
});
});
describe('sessionTransaction', () => {
describe('zones', () => {
const zone = new Zone('zone', {
pendingSessions: true,
pendingUnprocessed: true,
});
beforeEach(async () => {
await store.removeAllUnprocessed();
await store.removeAllSessions(number);
});
it('should not store pending sessions in global zone', async () => {
const id = `${number}.1`;
const testRecord = getSessionRecord();
await assert.isRejected(
store.withZone(GLOBAL_ZONE, 'test', async () => {
await store.storeSession(id, testRecord);
throw new Error('Failure');
}),
'Failure'
);
assert.equal(await store.loadSession(id), testRecord);
});
it('commits session stores and unprocessed on success', async () => {
const id = `${number}.1`;
const testRecord = getSessionRecord();
await store.sessionTransaction('test', async () => {
await store.storeSession(id, testRecord);
await store.withZone(zone, 'test', async () => {
await store.storeSession(id, testRecord, { zone });
await store.addUnprocessed({
id: '2-two',
envelope: 'second',
timestamp: 2,
version: 2,
attempts: 0,
});
assert.equal(await store.loadSession(id), testRecord);
await store.addUnprocessed(
{
id: '2-two',
envelope: 'second',
timestamp: 2,
version: 2,
attempts: 0,
},
{ zone }
);
assert.equal(await store.loadSession(id, { zone }), testRecord);
});
assert.equal(await store.loadSession(id), testRecord);
@ -1321,17 +1344,20 @@ describe('SignalProtocolStore', () => {
assert.equal(await store.loadSession(id), testRecord);
await assert.isRejected(
store.sessionTransaction('test', async () => {
await store.storeSession(id, failedRecord);
assert.equal(await store.loadSession(id), failedRecord);
store.withZone(zone, 'test', async () => {
await store.storeSession(id, failedRecord, { zone });
assert.equal(await store.loadSession(id, { zone }), failedRecord);
await store.addUnprocessed({
id: '2-two',
envelope: 'second',
timestamp: 2,
version: 2,
attempts: 0,
});
await store.addUnprocessed(
{
id: '2-two',
envelope: 'second',
timestamp: 2,
version: 2,
attempts: 0,
},
{ zone }
);
throw new Error('Failure');
}),
@ -1346,25 +1372,15 @@ describe('SignalProtocolStore', () => {
const id = `${number}.1`;
const testRecord = getSessionRecord();
const lock = new Lock('lock');
await store.withZone(zone, 'test', async () => {
await store.withZone(zone, 'nested', async () => {
await store.storeSession(id, testRecord, { zone });
await store.sessionTransaction(
'test',
async () => {
await store.sessionTransaction(
'nested',
async () => {
await store.storeSession(id, testRecord, { lock });
assert.equal(await store.loadSession(id, { zone }), testRecord);
});
assert.equal(await store.loadSession(id, { lock }), testRecord);
},
lock
);
assert.equal(await store.loadSession(id, { lock }), testRecord);
},
lock
);
assert.equal(await store.loadSession(id, { zone }), testRecord);
});
assert.equal(await store.loadSession(id), testRecord);
});

View file

@ -39,7 +39,7 @@ import {
} from '../LibSignalStores';
import { BatcherType, createBatcher } from '../util/batcher';
import { parseIntOrThrow } from '../util/parseIntOrThrow';
import { Lock } from '../util/Lock';
import { Zone } from '../util/Zone';
import EventTarget from './EventTarget';
import { WebAPIType } from './WebAPI';
import utils from './Helpers';
@ -765,28 +765,27 @@ class MessageReceiverInner extends EventTarget {
window.log.info('MessageReceiver.cacheAndQueueBatch', items.length);
const decrypted: Array<DecryptedEnvelope> = [];
const storageProtocol = window.textsecure.storage.protocol;
try {
const lock = new Lock('cacheAndQueueBatch');
const sessionStore = new Sessions({
transactionOnly: true,
lock,
});
const identityKeyStore = new IdentityKeys({
lock,
const zone = new Zone('cacheAndQueueBatch', {
pendingSessions: true,
pendingUnprocessed: true,
});
const sessionStore = new Sessions({ zone });
const identityKeyStore = new IdentityKeys({ zone });
const failed: Array<UnprocessedType> = [];
// Below we:
//
// 1. Enter session transaction
// 1. Enter zone
// 2. Decrypt all batched envelopes
// 3. Persist both decrypted envelopes and envelopes that we failed to
// decrypt (for future retries, see `attempts` field)
// 4. Leave session transaction and commit all pending session updates
// 4. Leave zone and commit all pending sessions and unprocesseds
// 5. Acknowledge envelopes (can't fail)
// 6. Finally process decrypted envelopes
await sessionStore.transaction(async () => {
await storageProtocol.withZone(zone, 'MessageReceiver', async () => {
await Promise.all<void>(
items.map(async ({ data, envelope }) => {
try {
@ -833,7 +832,10 @@ class MessageReceiverInner extends EventTarget {
}
);
await sessionStore.addUnprocessed(unprocesseds.concat(failed));
await storageProtocol.addMultipleUnprocessed(
unprocesseds.concat(failed),
{ zone }
);
});
window.log.info(

View file

@ -1,6 +0,0 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
export class Lock {
constructor(public readonly name: string) {}
}

22
ts/util/Zone.ts Normal file
View file

@ -0,0 +1,22 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
export type ZoneOptions = {
readonly pendingSessions?: boolean;
readonly pendingUnprocessed?: boolean;
};
export class Zone {
constructor(
public readonly name: string,
private readonly options: ZoneOptions = {}
) {}
public supportsPendingSessions(): boolean {
return this.options.pendingSessions === true;
}
public supportsPendingUnprocessed(): boolean {
return this.options.pendingUnprocessed === true;
}
}