Fix deadlock in saveIdentity
This commit is contained in:
parent
68185606e7
commit
8172840535
2 changed files with 229 additions and 160 deletions
|
@ -1044,15 +1044,27 @@ export class SignalProtocolStore extends EventEmitter {
|
|||
});
|
||||
}
|
||||
|
||||
private _getIdentityQueue(serviceId: ServiceIdString): PQueue {
|
||||
private _runOnIdentityQueue<T>(
|
||||
serviceId: ServiceIdString,
|
||||
zone: Zone,
|
||||
name: string,
|
||||
body: () => Promise<T>
|
||||
): Promise<T> {
|
||||
let queue: PQueue;
|
||||
|
||||
const cachedQueue = this.identityQueues.get(serviceId);
|
||||
if (cachedQueue) {
|
||||
return cachedQueue;
|
||||
queue = cachedQueue;
|
||||
} else {
|
||||
queue = this._createIdentityQueue();
|
||||
this.identityQueues.set(serviceId, queue);
|
||||
}
|
||||
|
||||
const freshQueue = this._createIdentityQueue();
|
||||
this.identityQueues.set(serviceId, freshQueue);
|
||||
return freshQueue;
|
||||
// We run the identity queue task in zone because `saveIdentity` needs to
|
||||
// be able to archive sibling sessions on keychange. Not entering the zone
|
||||
// now would mean that we can take locks in different order here and in
|
||||
// MessageReceiver which will lead to a deadlock.
|
||||
return this.withZone(zone, name, () => queue.add(body));
|
||||
}
|
||||
|
||||
// Sessions
|
||||
|
@ -1991,7 +2003,7 @@ export class SignalProtocolStore extends EventEmitter {
|
|||
encodedAddress: Address,
|
||||
publicKey: Uint8Array,
|
||||
nonblockingApproval = false,
|
||||
{ zone }: SessionTransactionOptions = {}
|
||||
{ zone = GLOBAL_ZONE }: SessionTransactionOptions = {}
|
||||
): Promise<boolean> {
|
||||
if (!this.identityKeys) {
|
||||
throw new Error('saveIdentity: this.identityKeys not yet cached!');
|
||||
|
@ -2009,105 +2021,110 @@ export class SignalProtocolStore extends EventEmitter {
|
|||
nonblockingApproval = false;
|
||||
}
|
||||
|
||||
return this._getIdentityQueue(encodedAddress.serviceId).add(async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(
|
||||
encodedAddress.serviceId
|
||||
);
|
||||
|
||||
const id = encodedAddress.serviceId;
|
||||
const logId = `saveIdentity(${id})`;
|
||||
|
||||
if (!identityRecord || !identityRecord.publicKey) {
|
||||
// Lookup failed, or the current key was removed, so save this one.
|
||||
log.info(`${logId}: Saving new identity...`);
|
||||
await this._saveIdentityKey({
|
||||
id,
|
||||
publicKey,
|
||||
firstUse: true,
|
||||
timestamp: Date.now(),
|
||||
verified: VerifiedStatus.DEFAULT,
|
||||
nonblockingApproval,
|
||||
});
|
||||
|
||||
this.checkPreviousKey(
|
||||
encodedAddress.serviceId,
|
||||
publicKey,
|
||||
'saveIdentity'
|
||||
);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
const identityKeyChanged = !constantTimeEqual(
|
||||
identityRecord.publicKey,
|
||||
publicKey
|
||||
);
|
||||
|
||||
if (identityKeyChanged) {
|
||||
const isOurIdentifier = window.textsecure.storage.user.isOurServiceId(
|
||||
return this._runOnIdentityQueue(
|
||||
encodedAddress.serviceId,
|
||||
zone,
|
||||
'saveIdentity',
|
||||
async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(
|
||||
encodedAddress.serviceId
|
||||
);
|
||||
|
||||
if (isOurIdentifier && identityKeyChanged) {
|
||||
log.warn(`${logId}: ignoring identity for ourselves`);
|
||||
const id = encodedAddress.serviceId;
|
||||
const logId = `saveIdentity(${id})`;
|
||||
|
||||
if (!identityRecord || !identityRecord.publicKey) {
|
||||
// Lookup failed, or the current key was removed, so save this one.
|
||||
log.info(`${logId}: Saving new identity...`);
|
||||
await this._saveIdentityKey({
|
||||
id,
|
||||
publicKey,
|
||||
firstUse: true,
|
||||
timestamp: Date.now(),
|
||||
verified: VerifiedStatus.DEFAULT,
|
||||
nonblockingApproval,
|
||||
});
|
||||
|
||||
this.checkPreviousKey(
|
||||
encodedAddress.serviceId,
|
||||
publicKey,
|
||||
'saveIdentity'
|
||||
);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
log.info(`${logId}: Replacing existing identity...`);
|
||||
const previousStatus = identityRecord.verified;
|
||||
let verifiedStatus;
|
||||
if (
|
||||
previousStatus === VerifiedStatus.VERIFIED ||
|
||||
previousStatus === VerifiedStatus.UNVERIFIED
|
||||
) {
|
||||
verifiedStatus = VerifiedStatus.UNVERIFIED;
|
||||
} else {
|
||||
verifiedStatus = VerifiedStatus.DEFAULT;
|
||||
}
|
||||
const identityKeyChanged = !constantTimeEqual(
|
||||
identityRecord.publicKey,
|
||||
publicKey
|
||||
);
|
||||
|
||||
await this._saveIdentityKey({
|
||||
id,
|
||||
publicKey,
|
||||
firstUse: false,
|
||||
timestamp: Date.now(),
|
||||
verified: verifiedStatus,
|
||||
nonblockingApproval,
|
||||
});
|
||||
|
||||
// See `addKeyChange` in `ts/models/conversations.ts` for sender key info
|
||||
// update caused by this.
|
||||
try {
|
||||
this.emit(
|
||||
'keychange',
|
||||
encodedAddress.serviceId,
|
||||
'saveIdentity - change'
|
||||
);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`${logId}: error triggering keychange:`,
|
||||
Errors.toLogFormat(error)
|
||||
if (identityKeyChanged) {
|
||||
const isOurIdentifier = window.textsecure.storage.user.isOurServiceId(
|
||||
encodedAddress.serviceId
|
||||
);
|
||||
|
||||
if (isOurIdentifier && identityKeyChanged) {
|
||||
log.warn(`${logId}: ignoring identity for ourselves`);
|
||||
return false;
|
||||
}
|
||||
|
||||
log.info(`${logId}: Replacing existing identity...`);
|
||||
const previousStatus = identityRecord.verified;
|
||||
let verifiedStatus;
|
||||
if (
|
||||
previousStatus === VerifiedStatus.VERIFIED ||
|
||||
previousStatus === VerifiedStatus.UNVERIFIED
|
||||
) {
|
||||
verifiedStatus = VerifiedStatus.UNVERIFIED;
|
||||
} else {
|
||||
verifiedStatus = VerifiedStatus.DEFAULT;
|
||||
}
|
||||
|
||||
await this._saveIdentityKey({
|
||||
id,
|
||||
publicKey,
|
||||
firstUse: false,
|
||||
timestamp: Date.now(),
|
||||
verified: verifiedStatus,
|
||||
nonblockingApproval,
|
||||
});
|
||||
|
||||
// See `addKeyChange` in `ts/models/conversations.ts` for sender key info
|
||||
// update caused by this.
|
||||
try {
|
||||
this.emit(
|
||||
'keychange',
|
||||
encodedAddress.serviceId,
|
||||
'saveIdentity - change'
|
||||
);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`${logId}: error triggering keychange:`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
}
|
||||
|
||||
// Pass the zone to facilitate transactional session use in
|
||||
// MessageReceiver.ts
|
||||
await this.archiveSiblingSessions(encodedAddress, {
|
||||
zone,
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
if (this.isNonBlockingApprovalRequired(identityRecord)) {
|
||||
log.info(`${logId}: Setting approval status...`);
|
||||
|
||||
// Pass the zone to facilitate transactional session use in
|
||||
// MessageReceiver.ts
|
||||
await this.archiveSiblingSessions(encodedAddress, {
|
||||
zone,
|
||||
});
|
||||
identityRecord.nonblockingApproval = nonblockingApproval;
|
||||
await this._saveIdentityKey(identityRecord);
|
||||
|
||||
return true;
|
||||
}
|
||||
if (this.isNonBlockingApprovalRequired(identityRecord)) {
|
||||
log.info(`${logId}: Setting approval status...`);
|
||||
|
||||
identityRecord.nonblockingApproval = nonblockingApproval;
|
||||
await this._saveIdentityKey(identityRecord);
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
);
|
||||
}
|
||||
|
||||
// https://github.com/signalapp/Signal-Android/blob/fc3db538bcaa38dc149712a483d3032c9c1f3998/app/src/main/java/org/thoughtcrime/securesms/crypto/storage/SignalBaseIdentityKeyStore.java#L257
|
||||
|
@ -2125,9 +2142,14 @@ export class SignalProtocolStore extends EventEmitter {
|
|||
serviceId: ServiceIdString,
|
||||
attributes: Partial<IdentityKeyType>
|
||||
): Promise<void> {
|
||||
return this._getIdentityQueue(serviceId).add(async () => {
|
||||
return this.saveIdentityWithAttributesOnQueue(serviceId, attributes);
|
||||
});
|
||||
return this._runOnIdentityQueue(
|
||||
serviceId,
|
||||
GLOBAL_ZONE,
|
||||
'saveIdentityWithAttributes',
|
||||
async () => {
|
||||
return this.saveIdentityWithAttributesOnQueue(serviceId, attributes);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private async saveIdentityWithAttributesOnQueue(
|
||||
|
@ -2172,16 +2194,21 @@ export class SignalProtocolStore extends EventEmitter {
|
|||
throw new Error('setApproval: Invalid approval status');
|
||||
}
|
||||
|
||||
return this._getIdentityQueue(serviceId).add(async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(serviceId);
|
||||
return this._runOnIdentityQueue(
|
||||
serviceId,
|
||||
GLOBAL_ZONE,
|
||||
'setApproval',
|
||||
async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(serviceId);
|
||||
|
||||
if (!identityRecord) {
|
||||
throw new Error(`setApproval: No identity record for ${serviceId}`);
|
||||
if (!identityRecord) {
|
||||
throw new Error(`setApproval: No identity record for ${serviceId}`);
|
||||
}
|
||||
|
||||
identityRecord.nonblockingApproval = nonblockingApproval;
|
||||
await this._saveIdentityKey(identityRecord);
|
||||
}
|
||||
|
||||
identityRecord.nonblockingApproval = nonblockingApproval;
|
||||
await this._saveIdentityKey(identityRecord);
|
||||
});
|
||||
);
|
||||
}
|
||||
|
||||
// https://github.com/signalapp/Signal-Android/blob/fc3db538bcaa38dc149712a483d3032c9c1f3998/app/src/main/java/org/thoughtcrime/securesms/crypto/storage/SignalBaseIdentityKeyStore.java#L215
|
||||
|
@ -2198,21 +2225,26 @@ export class SignalProtocolStore extends EventEmitter {
|
|||
throw new Error('setVerified: Invalid verified status');
|
||||
}
|
||||
|
||||
return this._getIdentityQueue(serviceId).add(async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(serviceId);
|
||||
return this._runOnIdentityQueue(
|
||||
serviceId,
|
||||
GLOBAL_ZONE,
|
||||
'setVerified',
|
||||
async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(serviceId);
|
||||
|
||||
if (!identityRecord) {
|
||||
throw new Error(`setVerified: No identity record for ${serviceId}`);
|
||||
}
|
||||
if (!identityRecord) {
|
||||
throw new Error(`setVerified: No identity record for ${serviceId}`);
|
||||
}
|
||||
|
||||
if (validateIdentityKey(identityRecord)) {
|
||||
await this._saveIdentityKey({
|
||||
...identityRecord,
|
||||
...extra,
|
||||
verified: verifiedStatus,
|
||||
});
|
||||
if (validateIdentityKey(identityRecord)) {
|
||||
await this._saveIdentityKey({
|
||||
...identityRecord,
|
||||
...extra,
|
||||
verified: verifiedStatus,
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
);
|
||||
}
|
||||
|
||||
async getVerified(serviceId: ServiceIdString): Promise<number> {
|
||||
|
@ -2279,56 +2311,69 @@ export class SignalProtocolStore extends EventEmitter {
|
|||
`Invalid verified status: ${verifiedStatus}`
|
||||
);
|
||||
|
||||
return this._getIdentityQueue(serviceId).add(async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(serviceId);
|
||||
const hadEntry = identityRecord !== undefined;
|
||||
const keyMatches = Boolean(
|
||||
identityRecord?.publicKey &&
|
||||
constantTimeEqual(publicKey, identityRecord.publicKey)
|
||||
);
|
||||
const statusMatches =
|
||||
keyMatches && verifiedStatus === identityRecord?.verified;
|
||||
return this._runOnIdentityQueue(
|
||||
serviceId,
|
||||
GLOBAL_ZONE,
|
||||
'updateIdentityAfterSync',
|
||||
async () => {
|
||||
const identityRecord = await this.getOrMigrateIdentityRecord(serviceId);
|
||||
const hadEntry = identityRecord !== undefined;
|
||||
const keyMatches = Boolean(
|
||||
identityRecord?.publicKey &&
|
||||
constantTimeEqual(publicKey, identityRecord.publicKey)
|
||||
);
|
||||
const statusMatches =
|
||||
keyMatches && verifiedStatus === identityRecord?.verified;
|
||||
|
||||
if (!keyMatches || !statusMatches) {
|
||||
await this.saveIdentityWithAttributesOnQueue(serviceId, {
|
||||
publicKey,
|
||||
verified: verifiedStatus,
|
||||
firstUse: !hadEntry,
|
||||
timestamp: Date.now(),
|
||||
nonblockingApproval: true,
|
||||
});
|
||||
}
|
||||
if (!hadEntry) {
|
||||
this.checkPreviousKey(serviceId, publicKey, 'updateIdentityAfterSync');
|
||||
} else if (hadEntry && !keyMatches) {
|
||||
try {
|
||||
this.emit('keychange', serviceId, 'updateIdentityAfterSync - change');
|
||||
} catch (error) {
|
||||
log.error(
|
||||
'updateIdentityAfterSync: error triggering keychange:',
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
if (!keyMatches || !statusMatches) {
|
||||
await this.saveIdentityWithAttributesOnQueue(serviceId, {
|
||||
publicKey,
|
||||
verified: verifiedStatus,
|
||||
firstUse: !hadEntry,
|
||||
timestamp: Date.now(),
|
||||
nonblockingApproval: true,
|
||||
});
|
||||
}
|
||||
if (!hadEntry) {
|
||||
this.checkPreviousKey(
|
||||
serviceId,
|
||||
publicKey,
|
||||
'updateIdentityAfterSync'
|
||||
);
|
||||
} else if (hadEntry && !keyMatches) {
|
||||
try {
|
||||
this.emit(
|
||||
'keychange',
|
||||
serviceId,
|
||||
'updateIdentityAfterSync - change'
|
||||
);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
'updateIdentityAfterSync: error triggering keychange:',
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// See: https://github.com/signalapp/Signal-Android/blob/fc3db538bcaa38dc149712a483d3032c9c1f3998/app/src/main/java/org/thoughtcrime/securesms/database/RecipientDatabase.kt#L921-L936
|
||||
if (
|
||||
verifiedStatus === VerifiedStatus.VERIFIED &&
|
||||
(!hadEntry || identityRecord?.verified !== VerifiedStatus.VERIFIED)
|
||||
) {
|
||||
// Needs a notification.
|
||||
return true;
|
||||
// See: https://github.com/signalapp/Signal-Android/blob/fc3db538bcaa38dc149712a483d3032c9c1f3998/app/src/main/java/org/thoughtcrime/securesms/database/RecipientDatabase.kt#L921-L936
|
||||
if (
|
||||
verifiedStatus === VerifiedStatus.VERIFIED &&
|
||||
(!hadEntry || identityRecord?.verified !== VerifiedStatus.VERIFIED)
|
||||
) {
|
||||
// Needs a notification.
|
||||
return true;
|
||||
}
|
||||
if (
|
||||
verifiedStatus !== VerifiedStatus.VERIFIED &&
|
||||
hadEntry &&
|
||||
identityRecord?.verified === VerifiedStatus.VERIFIED
|
||||
) {
|
||||
// Needs a notification.
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
if (
|
||||
verifiedStatus !== VerifiedStatus.VERIFIED &&
|
||||
hadEntry &&
|
||||
identityRecord?.verified === VerifiedStatus.VERIFIED
|
||||
) {
|
||||
// Needs a notification.
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
);
|
||||
}
|
||||
|
||||
isUntrusted(
|
||||
|
|
|
@ -20,6 +20,7 @@ import { v4 as generateUuid } from 'uuid';
|
|||
import { signal } from '../protobuf/compiled';
|
||||
import { sessionStructureToBytes } from '../util/sessionTranslation';
|
||||
import * as durations from '../util/durations';
|
||||
import { explodePromise } from '../util/explodePromise';
|
||||
import { Zone } from '../util/Zone';
|
||||
|
||||
import * as Bytes from '../Bytes';
|
||||
|
@ -262,6 +263,29 @@ describe('SignalProtocolStore', () => {
|
|||
await store.saveIdentity(identifier, testKey.pubKey);
|
||||
await store.saveIdentity(identifier, newIdentity);
|
||||
});
|
||||
it('should not deadlock', async () => {
|
||||
const newIdentity = getPublicKey();
|
||||
const zone = new Zone('zone', {
|
||||
pendingSenderKeys: true,
|
||||
pendingSessions: true,
|
||||
pendingUnprocessed: true,
|
||||
});
|
||||
|
||||
await store.saveIdentity(identifier, testKey.pubKey);
|
||||
|
||||
const { promise, resolve } = explodePromise<void>();
|
||||
|
||||
await Promise.all([
|
||||
store.withZone(zone, 'test', async () => {
|
||||
await promise;
|
||||
return store.saveIdentity(identifier, newIdentity, false, { zone });
|
||||
}),
|
||||
store.saveIdentity(identifier, newIdentity, false, {
|
||||
zone: GLOBAL_ZONE,
|
||||
}),
|
||||
resolve(),
|
||||
]);
|
||||
});
|
||||
|
||||
describe('When there is no existing key (first use)', () => {
|
||||
before(async () => {
|
||||
|
|
Loading…
Reference in a new issue