Fix in-memory transactions in MessageReceiver
This commit is contained in:
parent
ceffc2380c
commit
7c07fdd589
2 changed files with 27 additions and 15 deletions
|
@ -458,12 +458,15 @@ export class SignalProtocolStore extends EventsMixin {
|
||||||
|
|
||||||
async enqueueSenderKeyJob<T>(
|
async enqueueSenderKeyJob<T>(
|
||||||
encodedAddress: string,
|
encodedAddress: string,
|
||||||
task: () => Promise<T>
|
task: () => Promise<T>,
|
||||||
|
zone = GLOBAL_ZONE
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
const senderId = await normalizeEncodedAddress(encodedAddress);
|
return this.withZone(zone, 'enqueueSenderKeyJob', async () => {
|
||||||
const queue = this._getSenderKeyQueue(senderId);
|
const senderId = await normalizeEncodedAddress(encodedAddress);
|
||||||
|
const queue = this._getSenderKeyQueue(senderId);
|
||||||
|
|
||||||
return queue.add<T>(task);
|
return queue.add<T>(task);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private _createSenderKeyQueue(): PQueue {
|
private _createSenderKeyQueue(): PQueue {
|
||||||
|
@ -567,12 +570,15 @@ export class SignalProtocolStore extends EventsMixin {
|
||||||
|
|
||||||
async enqueueSessionJob<T>(
|
async enqueueSessionJob<T>(
|
||||||
encodedAddress: string,
|
encodedAddress: string,
|
||||||
task: () => Promise<T>
|
task: () => Promise<T>,
|
||||||
|
zone: Zone = GLOBAL_ZONE
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
const id = await normalizeEncodedAddress(encodedAddress);
|
return this.withZone(zone, 'enqueueSessionJob', async () => {
|
||||||
const queue = this._getSessionQueue(id);
|
const id = await normalizeEncodedAddress(encodedAddress);
|
||||||
|
const queue = this._getSessionQueue(id);
|
||||||
|
|
||||||
return queue.add<T>(task);
|
return queue.add<T>(task);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private _createSessionQueue(): PQueue {
|
private _createSessionQueue(): PQueue {
|
||||||
|
|
|
@ -132,6 +132,7 @@ type DecryptedEnvelope = {
|
||||||
type LockedStores = {
|
type LockedStores = {
|
||||||
readonly sessionStore: Sessions;
|
readonly sessionStore: Sessions;
|
||||||
readonly identityKeyStore: IdentityKeys;
|
readonly identityKeyStore: IdentityKeys;
|
||||||
|
readonly zone?: Zone;
|
||||||
};
|
};
|
||||||
|
|
||||||
class MessageReceiverInner extends EventTarget {
|
class MessageReceiverInner extends EventTarget {
|
||||||
|
@ -790,7 +791,7 @@ class MessageReceiverInner extends EventTarget {
|
||||||
items.map(async ({ data, envelope }) => {
|
items.map(async ({ data, envelope }) => {
|
||||||
try {
|
try {
|
||||||
const plaintext = await this.queueEnvelope(
|
const plaintext = await this.queueEnvelope(
|
||||||
{ sessionStore, identityKeyStore },
|
{ sessionStore, identityKeyStore, zone },
|
||||||
envelope
|
envelope
|
||||||
);
|
);
|
||||||
if (plaintext) {
|
if (plaintext) {
|
||||||
|
@ -1059,7 +1060,7 @@ class MessageReceiverInner extends EventTarget {
|
||||||
}
|
}
|
||||||
|
|
||||||
async decrypt(
|
async decrypt(
|
||||||
{ sessionStore, identityKeyStore }: LockedStores,
|
{ sessionStore, identityKeyStore, zone }: LockedStores,
|
||||||
envelope: EnvelopeClass,
|
envelope: EnvelopeClass,
|
||||||
ciphertext: ByteBufferClass
|
ciphertext: ByteBufferClass
|
||||||
): Promise<ArrayBuffer | null> {
|
): Promise<ArrayBuffer | null> {
|
||||||
|
@ -1112,7 +1113,8 @@ class MessageReceiverInner extends EventTarget {
|
||||||
ProtocolAddress.new(identifier, sourceDevice),
|
ProtocolAddress.new(identifier, sourceDevice),
|
||||||
senderKeyStore,
|
senderKeyStore,
|
||||||
messageBuffer
|
messageBuffer
|
||||||
).then(plaintext => this.unpad(typedArrayToArrayBuffer(plaintext)))
|
).then(plaintext => this.unpad(typedArrayToArrayBuffer(plaintext))),
|
||||||
|
zone
|
||||||
);
|
);
|
||||||
} else if (envelope.type === envelopeTypeEnum.CIPHERTEXT) {
|
} else if (envelope.type === envelopeTypeEnum.CIPHERTEXT) {
|
||||||
window.log.info('message from', this.getEnvelopeId(envelope));
|
window.log.info('message from', this.getEnvelopeId(envelope));
|
||||||
|
@ -1139,7 +1141,8 @@ class MessageReceiverInner extends EventTarget {
|
||||||
ProtocolAddress.new(identifier, sourceDevice),
|
ProtocolAddress.new(identifier, sourceDevice),
|
||||||
sessionStore,
|
sessionStore,
|
||||||
identityKeyStore
|
identityKeyStore
|
||||||
).then(plaintext => this.unpad(typedArrayToArrayBuffer(plaintext)))
|
).then(plaintext => this.unpad(typedArrayToArrayBuffer(plaintext))),
|
||||||
|
zone
|
||||||
);
|
);
|
||||||
} else if (envelope.type === envelopeTypeEnum.PREKEY_BUNDLE) {
|
} else if (envelope.type === envelopeTypeEnum.PREKEY_BUNDLE) {
|
||||||
window.log.info('prekey message from', this.getEnvelopeId(envelope));
|
window.log.info('prekey message from', this.getEnvelopeId(envelope));
|
||||||
|
@ -1168,7 +1171,8 @@ class MessageReceiverInner extends EventTarget {
|
||||||
identityKeyStore,
|
identityKeyStore,
|
||||||
preKeyStore,
|
preKeyStore,
|
||||||
signedPreKeyStore
|
signedPreKeyStore
|
||||||
).then(plaintext => this.unpad(typedArrayToArrayBuffer(plaintext)))
|
).then(plaintext => this.unpad(typedArrayToArrayBuffer(plaintext))),
|
||||||
|
zone
|
||||||
);
|
);
|
||||||
} else if (envelope.type === envelopeTypeEnum.UNIDENTIFIED_SENDER) {
|
} else if (envelope.type === envelopeTypeEnum.UNIDENTIFIED_SENDER) {
|
||||||
window.log.info('received unidentified sender message');
|
window.log.info('received unidentified sender message');
|
||||||
|
@ -1241,7 +1245,8 @@ class MessageReceiverInner extends EventTarget {
|
||||||
),
|
),
|
||||||
senderKeyStore,
|
senderKeyStore,
|
||||||
buffer
|
buffer
|
||||||
)
|
),
|
||||||
|
zone
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1261,7 +1266,8 @@ class MessageReceiverInner extends EventTarget {
|
||||||
identityKeyStore,
|
identityKeyStore,
|
||||||
preKeyStore,
|
preKeyStore,
|
||||||
signedPreKeyStore
|
signedPreKeyStore
|
||||||
)
|
),
|
||||||
|
zone
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue