Improve message download performance

This commit is contained in:
Scott Nonnenberg 2019-09-26 12:56:31 -07:00
parent 957f6f6474
commit 0c09f9620f
32 changed files with 906 additions and 633 deletions

View file

@ -493,6 +493,7 @@
const regionCode = libphonenumber.util.getRegionCodeForNumber(number);
await textsecure.storage.put('regionCode', regionCode);
await textsecure.storage.protocol.hydrateCaches();
},
async clearSessionsAndPreKeys() {
const store = textsecure.storage.protocol;

View file

@ -35766,6 +35766,8 @@ Internal.SessionRecord = function() {
return SessionRecord;
}();
libsignal.SessionRecord = Internal.SessionRecord;
function SignalProtocolAddress(name, deviceId) {
this.name = name;
this.deviceId = deviceId;
@ -35844,18 +35846,15 @@ SessionBuilder.prototype = {
});
}.bind(this)).then(function(session) {
var address = this.remoteAddress.toString();
return this.storage.loadSession(address).then(function(serialized) {
var record;
if (serialized !== undefined) {
record = Internal.SessionRecord.deserialize(serialized);
} else {
return this.storage.loadSession(address).then(function(record) {
if (record === undefined) {
record = new Internal.SessionRecord();
}
record.archiveCurrentState();
record.updateSessionState(session);
return Promise.all([
this.storage.storeSession(address, record.serialize()),
this.storage.storeSession(address, record),
this.storage.saveIdentity(this.remoteAddress.toString(), device.identityKey)
]);
}.bind(this));
@ -36039,12 +36038,7 @@ function SessionCipher(storage, remoteAddress) {
SessionCipher.prototype = {
getRecord: function(encodedNumber) {
return this.storage.loadSession(encodedNumber).then(function(serialized) {
if (serialized === undefined) {
return undefined;
}
return Internal.SessionRecord.deserialize(serialized);
});
return this.storage.loadSession(encodedNumber);
},
// encoding is an optional parameter - wrap() will only translate if one is provided
encrypt: function(buffer, encoding) {
@ -36124,7 +36118,7 @@ SessionCipher.prototype = {
return this.storage.saveIdentity(this.remoteAddress.toString(), theirIdentityKey);
}.bind(this)).then(function() {
record.updateSessionState(session);
return this.storage.storeSession(address, record.serialize()).then(function() {
return this.storage.storeSession(address, record).then(function() {
return result;
});
}.bind(this));
@ -36211,7 +36205,7 @@ SessionCipher.prototype = {
return this.storage.saveIdentity(this.remoteAddress.toString(), result.session.indexInfo.remoteIdentityKey);
}.bind(this)).then(function() {
record.updateSessionState(result.session);
return this.storage.storeSession(address, record.serialize()).then(function() {
return this.storage.storeSession(address, record).then(function() {
return result.plaintext;
});
}.bind(this));
@ -36246,7 +36240,7 @@ SessionCipher.prototype = {
preKeyProto.message.toArrayBuffer(), session
).then(function(plaintext) {
record.updateSessionState(session);
return this.storage.storeSession(address, record.serialize()).then(function() {
return this.storage.storeSession(address, record).then(function() {
if (preKeyId !== undefined && preKeyId !== null) {
return this.storage.removePreKey(preKeyId);
}
@ -36444,7 +36438,7 @@ SessionCipher.prototype = {
}
record.archiveCurrentState();
return this.storage.storeSession(address, record.serialize());
return this.storage.storeSession(address, record);
}.bind(this));
}.bind(this));
},
@ -36458,7 +36452,7 @@ SessionCipher.prototype = {
}
record.deleteAllSessions();
return this.storage.storeSession(address, record.serialize());
return this.storage.storeSession(address, record);
}.bind(this));
}.bind(this));
}

View file

@ -9,113 +9,11 @@
/* global _: false */
/* global ContactBuffer: false */
/* global GroupBuffer: false */
/* global Worker: false */
/* eslint-disable more/no-then */
const WORKER_TIMEOUT = 60 * 1000; // one minute
const RETRY_TIMEOUT = 2 * 60 * 1000;
const _utilWorker = new Worker('js/util_worker.js');
const _jobs = Object.create(null);
const _DEBUG = false;
let _jobCounter = 0;
function _makeJob(fnName) {
_jobCounter += 1;
const id = _jobCounter;
if (_DEBUG) {
window.log.info(`Worker job ${id} (${fnName}) started`);
}
_jobs[id] = {
fnName,
start: Date.now(),
};
return id;
}
function _updateJob(id, data) {
const { resolve, reject } = data;
const { fnName, start } = _jobs[id];
_jobs[id] = {
..._jobs[id],
...data,
resolve: value => {
_removeJob(id);
const end = Date.now();
window.log.info(
`Worker job ${id} (${fnName}) succeeded in ${end - start}ms`
);
return resolve(value);
},
reject: error => {
_removeJob(id);
const end = Date.now();
window.log.info(
`Worker job ${id} (${fnName}) failed in ${end - start}ms`
);
return reject(error);
},
};
}
function _removeJob(id) {
if (_DEBUG) {
_jobs[id].complete = true;
} else {
delete _jobs[id];
}
}
function _getJob(id) {
return _jobs[id];
}
async function callWorker(fnName, ...args) {
const jobId = _makeJob(fnName);
return new Promise((resolve, reject) => {
_utilWorker.postMessage([jobId, fnName, ...args]);
_updateJob(jobId, {
resolve,
reject,
args: _DEBUG ? args : null,
});
setTimeout(
() => reject(new Error(`Worker job ${jobId} (${fnName}) timed out`)),
WORKER_TIMEOUT
);
});
}
_utilWorker.onmessage = e => {
const [jobId, errorForDisplay, result] = e.data;
const job = _getJob(jobId);
if (!job) {
throw new Error(
`Received worker reply to job ${jobId}, but did not have it in our registry!`
);
}
const { resolve, reject, fnName } = job;
if (errorForDisplay) {
return reject(
new Error(
`Error received from worker job ${jobId} (${fnName}): ${errorForDisplay}`
)
);
}
return resolve(result);
};
function MessageReceiver(username, password, signalingKey, options = {}) {
this.count = 0;
@ -135,24 +33,39 @@ function MessageReceiver(username, password, signalingKey, options = {}) {
this.number = address.getName();
this.deviceId = address.getDeviceId();
this.pendingQueue = new window.PQueue({ concurrency: 1 });
this.incomingQueue = new window.PQueue({ concurrency: 1 });
this.pendingQueue = new window.PQueue({ concurrency: 1 });
this.appQueue = new window.PQueue({ concurrency: 1 });
this.cacheAddBatcher = window.Signal.Util.createBatcher({
wait: 200,
maxSize: 30,
processBatch: this.cacheAndQueueBatch.bind(this),
});
this.cacheUpdateBatcher = window.Signal.Util.createBatcher({
wait: 500,
maxSize: 30,
processBatch: this.cacheUpdateBatch.bind(this),
});
this.cacheRemoveBatcher = window.Signal.Util.createBatcher({
wait: 500,
maxSize: 30,
processBatch: this.cacheRemoveBatch.bind(this),
});
if (options.retryCached) {
this.pendingQueue.add(() => this.queueAllCached());
}
}
MessageReceiver.stringToArrayBuffer = string =>
Promise.resolve(dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer());
dcodeIO.ByteBuffer.wrap(string, 'binary').toArrayBuffer();
MessageReceiver.arrayBufferToString = arrayBuffer =>
Promise.resolve(dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary'));
dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('binary');
MessageReceiver.stringToArrayBufferBase64 = string =>
callWorker('stringToArrayBufferBase64', string);
dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer();
MessageReceiver.arrayBufferToStringBase64 = arrayBuffer =>
callWorker('arrayBufferToStringBase64', arrayBuffer);
dcodeIO.ByteBuffer.wrap(arrayBuffer).toString('base64');
MessageReceiver.prototype = new textsecure.EventTarget();
MessageReceiver.prototype.extend({
@ -197,6 +110,12 @@ MessageReceiver.prototype.extend({
this.stoppingProcessing = true;
return this.close();
},
unregisterBatchers() {
window.log.info('MessageReceiver: unregister batchers');
this.cacheAddBatcher.unregister();
this.cacheUpdateBatcher.unregister();
this.cacheRemoveBatcher.unregister();
},
shutdown() {
if (this.socket) {
this.socket.onclose = null;
@ -308,20 +227,7 @@ MessageReceiver.prototype.extend({
? envelope.serverTimestamp.toNumber()
: null;
try {
await this.addToCache(envelope, plaintext);
request.respond(200, 'OK');
this.queueEnvelope(envelope);
this.clearRetryTimeout();
this.maybeScheduleRetryTimeout();
} catch (error) {
request.respond(500, 'Failed to cache message');
window.log.error(
'handleRequest error trying to add message to cache:',
error && error.stack ? error.stack : error
);
}
this.cacheAndQueue(envelope, plaintext, request);
} catch (e) {
request.respond(500, 'Bad encrypted websocket message');
window.log.error(
@ -377,7 +283,12 @@ MessageReceiver.prototype.extend({
this.count = 0;
};
this.incomingQueue.add(waitForIncomingQueue);
const waitForCacheAddBatcher = async () => {
await this.cacheAddBatcher.onIdle();
this.incomingQueue.add(waitForIncomingQueue);
};
waitForCacheAddBatcher();
},
drain() {
const waitForIncomingQueue = () =>
@ -408,13 +319,13 @@ MessageReceiver.prototype.extend({
let envelopePlaintext = item.envelope;
if (item.version === 2) {
envelopePlaintext = await MessageReceiver.stringToArrayBufferBase64(
envelopePlaintext = MessageReceiver.stringToArrayBufferBase64(
envelopePlaintext
);
}
if (typeof envelopePlaintext === 'string') {
envelopePlaintext = await MessageReceiver.stringToArrayBuffer(
envelopePlaintext = MessageReceiver.stringToArrayBuffer(
envelopePlaintext
);
}
@ -430,13 +341,13 @@ MessageReceiver.prototype.extend({
let payloadPlaintext = decrypted;
if (item.version === 2) {
payloadPlaintext = await MessageReceiver.stringToArrayBufferBase64(
payloadPlaintext = MessageReceiver.stringToArrayBufferBase64(
payloadPlaintext
);
}
if (typeof payloadPlaintext === 'string') {
payloadPlaintext = await MessageReceiver.stringToArrayBuffer(
payloadPlaintext = MessageReceiver.stringToArrayBuffer(
payloadPlaintext
);
}
@ -530,44 +441,61 @@ MessageReceiver.prototype.extend({
})
);
},
async addToCache(envelope, plaintext) {
async cacheAndQueueBatch(items) {
const dataArray = items.map(item => item.data);
try {
await textsecure.storage.unprocessed.batchAdd(dataArray);
items.forEach(item => {
item.request.respond(200, 'OK');
this.queueEnvelope(item.envelope);
});
this.clearRetryTimeout();
this.maybeScheduleRetryTimeout();
} catch (error) {
items.forEach(item => {
item.request.respond(500, 'Failed to cache message');
});
window.log.error(
'cacheAndQueue error trying to add messages to cache:',
error && error.stack ? error.stack : error
);
}
},
cacheAndQueue(envelope, plaintext, request) {
const { id } = envelope;
const data = {
id,
version: 2,
envelope: await MessageReceiver.arrayBufferToStringBase64(plaintext),
envelope: MessageReceiver.arrayBufferToStringBase64(plaintext),
timestamp: Date.now(),
attempts: 1,
};
return textsecure.storage.unprocessed.add(data);
this.cacheAddBatcher.add({
request,
envelope,
data,
});
},
async updateCache(envelope, plaintext) {
async cacheUpdateBatch(items) {
await textsecure.storage.unprocessed.addDecryptedDataToList(items);
},
updateCache(envelope, plaintext) {
const { id } = envelope;
const item = await textsecure.storage.unprocessed.get(id);
if (!item) {
window.log.error(
`updateCache: Didn't find item ${id} in cache to update`
);
return null;
}
item.source = envelope.source;
item.sourceDevice = envelope.sourceDevice;
item.serverTimestamp = envelope.serverTimestamp;
if (item.version === 2) {
item.decrypted = await MessageReceiver.arrayBufferToStringBase64(
plaintext
);
} else {
item.decrypted = await MessageReceiver.arrayBufferToString(plaintext);
}
return textsecure.storage.unprocessed.addDecryptedData(item.id, item);
const data = {
source: envelope.source,
sourceDevice: envelope.sourceDevice,
serverTimestamp: envelope.serverTimestamp,
decrypted: MessageReceiver.arrayBufferToStringBase64(plaintext),
};
this.cacheUpdateBatcher.add({ id, data });
},
async cacheRemoveBatch(items) {
await textsecure.storage.unprocessed.remove(items);
},
removeFromCache(envelope) {
const { id } = envelope;
return textsecure.storage.unprocessed.remove(id);
this.cacheRemoveBatcher.add(id);
},
queueDecryptedEnvelope(envelope, plaintext) {
const id = this.getEnvelopeId(envelope);
@ -811,12 +739,7 @@ MessageReceiver.prototype.extend({
// Note: this is an out of band update; there are cases where the item in the
// cache has already been deleted by the time this runs. That's okay.
this.updateCache(envelope, plaintext).catch(error => {
window.log.error(
'decrypt failed to save decrypted message contents to cache:',
error && error.stack ? error.stack : error
);
});
this.updateCache(envelope, plaintext);
return plaintext;
})
@ -1497,6 +1420,9 @@ textsecure.MessageReceiver = function MessageReceiverWrapper(
messageReceiver
);
this.stopProcessing = messageReceiver.stopProcessing.bind(messageReceiver);
this.unregisterBatchers = messageReceiver.unregisterBatchers.bind(
messageReceiver
);
messageReceiver.connect();
};

View file

@ -676,7 +676,7 @@ MessageSender.prototype = {
);
},
sendDeliveryReceipt(recipientId, timestamp, options) {
sendDeliveryReceipt(recipientId, timestamps, options) {
const myNumber = textsecure.storage.user.getNumber();
const myDevice = textsecure.storage.user.getDeviceId();
if (myNumber === recipientId && (myDevice === 1 || myDevice === '1')) {
@ -685,7 +685,7 @@ MessageSender.prototype = {
const receiptMessage = new textsecure.protobuf.ReceiptMessage();
receiptMessage.type = textsecure.protobuf.ReceiptMessage.Type.DELIVERY;
receiptMessage.timestamp = [timestamp];
receiptMessage.timestamp = timestamps;
const contentMessage = new textsecure.protobuf.Content();
contentMessage.receiptMessage = receiptMessage;

View file

@ -21,6 +21,9 @@
add(data) {
return textsecure.storage.protocol.addUnprocessed(data);
},
batchAdd(array) {
return textsecure.storage.protocol.addMultipleUnprocessed(array);
},
updateAttempts(id, attempts) {
return textsecure.storage.protocol.updateUnprocessedAttempts(
id,
@ -30,8 +33,11 @@
addDecryptedData(id, data) {
return textsecure.storage.protocol.updateUnprocessedWithData(id, data);
},
remove(id) {
return textsecure.storage.protocol.removeUnprocessed(id);
addDecryptedDataToList(array) {
return textsecure.storage.protocol.updateUnprocessedsWithData(array);
},
remove(idOrArray) {
return textsecure.storage.protocol.removeUnprocessed(idOrArray);
},
removeAll() {
return textsecure.storage.protocol.removeAllUnprocessed();