Wrap eventHandlerQueue jobs with task with timeout
This commit is contained in:
parent
5cee260063
commit
d7a2669b49
7 changed files with 112 additions and 116 deletions
|
@ -291,23 +291,25 @@ export async function startApp(): Promise<void> {
|
||||||
serverTrustRoot: window.getServerTrustRoot(),
|
serverTrustRoot: window.getServerTrustRoot(),
|
||||||
});
|
});
|
||||||
|
|
||||||
function queuedEventListener<Args extends Array<unknown>>(
|
function queuedEventListener<E extends Event>(
|
||||||
handler: (...args: Args) => Promise<void> | void,
|
handler: (event: E) => Promise<void> | void,
|
||||||
track = true
|
track = true
|
||||||
): (...args: Args) => void {
|
): (event: E) => void {
|
||||||
return (...args: Args): void => {
|
return (event: E): void => {
|
||||||
eventHandlerQueue.add(async () => {
|
eventHandlerQueue.add(
|
||||||
try {
|
createTaskWithTimeout(async () => {
|
||||||
await handler(...args);
|
try {
|
||||||
} finally {
|
await handler(event);
|
||||||
// message/sent: Message.handleDataMessage has its own queue and will
|
} finally {
|
||||||
// trigger this event itself when complete.
|
// message/sent: Message.handleDataMessage has its own queue and will
|
||||||
// error: Error processing (below) also has its own queue and self-trigger.
|
// trigger this event itself when complete.
|
||||||
if (track) {
|
// error: Error processing (below) also has its own queue and self-trigger.
|
||||||
window.Whisper.events.trigger('incrementProgress');
|
if (track) {
|
||||||
|
window.Whisper.events.trigger('incrementProgress');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}, `queuedEventListener(${event.type}, ${event.timeStamp})`)
|
||||||
});
|
);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,13 +363,13 @@ export async function startApp(): Promise<void> {
|
||||||
);
|
);
|
||||||
messageReceiver.addEventListener(
|
messageReceiver.addEventListener(
|
||||||
'decryption-error',
|
'decryption-error',
|
||||||
queuedEventListener((event: DecryptionErrorEvent) => {
|
queuedEventListener((event: DecryptionErrorEvent): void => {
|
||||||
onDecryptionErrorQueue.add(() => onDecryptionError(event));
|
onDecryptionErrorQueue.add(() => onDecryptionError(event));
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
messageReceiver.addEventListener(
|
messageReceiver.addEventListener(
|
||||||
'retry-request',
|
'retry-request',
|
||||||
queuedEventListener((event: RetryRequestEvent) => {
|
queuedEventListener((event: RetryRequestEvent): void => {
|
||||||
onRetryRequestQueue.add(() => onRetryRequest(event));
|
onRetryRequestQueue.add(() => onRetryRequest(event));
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
@ -437,7 +439,6 @@ export async function startApp(): Promise<void> {
|
||||||
|
|
||||||
const eventHandlerQueue = new PQueue({
|
const eventHandlerQueue = new PQueue({
|
||||||
concurrency: 1,
|
concurrency: 1,
|
||||||
timeout: durations.MINUTE * 30,
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Note: this queue is meant to allow for stop/start of tasks, not limit parallelism.
|
// Note: this queue is meant to allow for stop/start of tasks, not limit parallelism.
|
||||||
|
@ -2449,7 +2450,7 @@ export async function startApp(): Promise<void> {
|
||||||
|
|
||||||
window.waitForEmptyEventQueue = waitForEmptyEventQueue;
|
window.waitForEmptyEventQueue = waitForEmptyEventQueue;
|
||||||
|
|
||||||
async function onEmpty() {
|
async function onEmpty(): Promise<void> {
|
||||||
const { storage } = window.textsecure;
|
const { storage } = window.textsecure;
|
||||||
|
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
|
@ -2586,7 +2587,7 @@ export async function startApp(): Promise<void> {
|
||||||
connect();
|
connect();
|
||||||
}
|
}
|
||||||
|
|
||||||
function onConfiguration(ev: ConfigurationEvent) {
|
function onConfiguration(ev: ConfigurationEvent): void {
|
||||||
ev.confirm();
|
ev.confirm();
|
||||||
|
|
||||||
const { configuration } = ev;
|
const { configuration } = ev;
|
||||||
|
@ -2618,7 +2619,7 @@ export async function startApp(): Promise<void> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function onTyping(ev: TypingEvent) {
|
function onTyping(ev: TypingEvent): void {
|
||||||
// Note: this type of message is automatically removed from cache in MessageReceiver
|
// Note: this type of message is automatically removed from cache in MessageReceiver
|
||||||
|
|
||||||
const { typing, sender, senderUuid, senderDevice } = ev;
|
const { typing, sender, senderUuid, senderDevice } = ev;
|
||||||
|
@ -2707,7 +2708,7 @@ export async function startApp(): Promise<void> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async function onStickerPack(ev: StickerPackEvent) {
|
function onStickerPack(ev: StickerPackEvent): void {
|
||||||
ev.confirm();
|
ev.confirm();
|
||||||
|
|
||||||
const packs = ev.stickerPacks;
|
const packs = ev.stickerPacks;
|
||||||
|
@ -2741,13 +2742,13 @@ export async function startApp(): Promise<void> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async function onGroupSyncComplete() {
|
async function onGroupSyncComplete(): Promise<void> {
|
||||||
log.info('onGroupSyncComplete');
|
log.info('onGroupSyncComplete');
|
||||||
await window.storage.put('synced_at', Date.now());
|
await window.storage.put('synced_at', Date.now());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: this handler is only for v1 groups received via 'group sync' messages
|
// Note: this handler is only for v1 groups received via 'group sync' messages
|
||||||
async function onGroupReceived(ev: GroupEvent) {
|
async function onGroupReceived(ev: GroupEvent): Promise<void> {
|
||||||
const details = ev.groupDetails;
|
const details = ev.groupDetails;
|
||||||
const { id } = details;
|
const { id } = details;
|
||||||
|
|
||||||
|
@ -2868,7 +2869,7 @@ export async function startApp(): Promise<void> {
|
||||||
maxSize: Infinity,
|
maxSize: Infinity,
|
||||||
});
|
});
|
||||||
|
|
||||||
function onEnvelopeReceived({ envelope }: EnvelopeEvent) {
|
function onEnvelopeReceived({ envelope }: EnvelopeEvent): void {
|
||||||
const ourUuid = window.textsecure.storage.user.getUuid()?.toString();
|
const ourUuid = window.textsecure.storage.user.getUuid()?.toString();
|
||||||
if (envelope.sourceUuid && envelope.sourceUuid !== ourUuid) {
|
if (envelope.sourceUuid && envelope.sourceUuid !== ourUuid) {
|
||||||
window.ConversationController.maybeMergeContacts({
|
window.ConversationController.maybeMergeContacts({
|
||||||
|
@ -2882,7 +2883,7 @@ export async function startApp(): Promise<void> {
|
||||||
// Note: We do very little in this function, since everything in handleDataMessage is
|
// Note: We do very little in this function, since everything in handleDataMessage is
|
||||||
// inside a conversation-specific queue(). Any code here might run before an earlier
|
// inside a conversation-specific queue(). Any code here might run before an earlier
|
||||||
// message is processed in handleDataMessage().
|
// message is processed in handleDataMessage().
|
||||||
function onMessageReceived(event: MessageEvent) {
|
async function onMessageReceived(event: MessageEvent): Promise<void> {
|
||||||
const { data, confirm } = event;
|
const { data, confirm } = event;
|
||||||
|
|
||||||
const messageDescriptor = getMessageDescriptor({
|
const messageDescriptor = getMessageDescriptor({
|
||||||
|
@ -2947,7 +2948,7 @@ export async function startApp(): Promise<void> {
|
||||||
if (!isValidReactionEmoji(reaction.emoji)) {
|
if (!isValidReactionEmoji(reaction.emoji)) {
|
||||||
log.warn('Received an invalid reaction emoji. Dropping it');
|
log.warn('Received an invalid reaction emoji. Dropping it');
|
||||||
confirm();
|
confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
strictAssert(
|
strictAssert(
|
||||||
|
@ -2975,7 +2976,7 @@ export async function startApp(): Promise<void> {
|
||||||
// Note: We do not wait for completion here
|
// Note: We do not wait for completion here
|
||||||
Reactions.getSingleton().onReaction(reactionModel, message);
|
Reactions.getSingleton().onReaction(reactionModel, message);
|
||||||
confirm();
|
confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data.message.delete) {
|
if (data.message.delete) {
|
||||||
|
@ -3004,21 +3005,22 @@ export async function startApp(): Promise<void> {
|
||||||
Deletes.getSingleton().onDelete(deleteModel);
|
Deletes.getSingleton().onDelete(deleteModel);
|
||||||
|
|
||||||
confirm();
|
confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) {
|
if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) {
|
||||||
confirm();
|
confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't wait for handleDataMessage, as it has its own per-conversation queueing
|
// Don't wait for handleDataMessage, as it has its own per-conversation queueing
|
||||||
message.handleDataMessage(data.message, event.confirm);
|
message.handleDataMessage(data.message, event.confirm);
|
||||||
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function onProfileKeyUpdate({ data, confirm }: ProfileKeyUpdateEvent) {
|
async function onProfileKeyUpdate({
|
||||||
|
data,
|
||||||
|
confirm,
|
||||||
|
}: ProfileKeyUpdateEvent): Promise<void> {
|
||||||
const conversation = window.ConversationController.maybeMergeContacts({
|
const conversation = window.ConversationController.maybeMergeContacts({
|
||||||
aci: data.sourceUuid,
|
aci: data.sourceUuid,
|
||||||
e164: data.source,
|
e164: data.source,
|
||||||
|
@ -3279,7 +3281,7 @@ export async function startApp(): Promise<void> {
|
||||||
// Note: We do very little in this function, since everything in handleDataMessage is
|
// Note: We do very little in this function, since everything in handleDataMessage is
|
||||||
// inside a conversation-specific queue(). Any code here might run before an earlier
|
// inside a conversation-specific queue(). Any code here might run before an earlier
|
||||||
// message is processed in handleDataMessage().
|
// message is processed in handleDataMessage().
|
||||||
function onSentMessage(event: SentEvent) {
|
async function onSentMessage(event: SentEvent): Promise<void> {
|
||||||
const { data, confirm } = event;
|
const { data, confirm } = event;
|
||||||
|
|
||||||
const source = window.textsecure.storage.user.getNumber();
|
const source = window.textsecure.storage.user.getNumber();
|
||||||
|
@ -3327,7 +3329,7 @@ export async function startApp(): Promise<void> {
|
||||||
if (!isValidReactionEmoji(reaction.emoji)) {
|
if (!isValidReactionEmoji(reaction.emoji)) {
|
||||||
log.warn('Received an invalid reaction emoji. Dropping it');
|
log.warn('Received an invalid reaction emoji. Dropping it');
|
||||||
event.confirm();
|
event.confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info('Queuing sent reaction for', reaction.targetTimestamp);
|
log.info('Queuing sent reaction for', reaction.targetTimestamp);
|
||||||
|
@ -3345,7 +3347,7 @@ export async function startApp(): Promise<void> {
|
||||||
Reactions.getSingleton().onReaction(reactionModel, message);
|
Reactions.getSingleton().onReaction(reactionModel, message);
|
||||||
|
|
||||||
event.confirm();
|
event.confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data.message.delete) {
|
if (data.message.delete) {
|
||||||
|
@ -3367,20 +3369,18 @@ export async function startApp(): Promise<void> {
|
||||||
// Note: We do not wait for completion here
|
// Note: We do not wait for completion here
|
||||||
Deletes.getSingleton().onDelete(deleteModel);
|
Deletes.getSingleton().onDelete(deleteModel);
|
||||||
confirm();
|
confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) {
|
if (handleGroupCallUpdateMessage(data.message, messageDescriptor)) {
|
||||||
event.confirm();
|
event.confirm();
|
||||||
return Promise.resolve();
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't wait for handleDataMessage, as it has its own per-conversation queueing
|
// Don't wait for handleDataMessage, as it has its own per-conversation queueing
|
||||||
message.handleDataMessage(data.message, event.confirm, {
|
message.handleDataMessage(data.message, event.confirm, {
|
||||||
data,
|
data,
|
||||||
});
|
});
|
||||||
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type MessageDescriptor = {
|
type MessageDescriptor = {
|
||||||
|
@ -3525,7 +3525,7 @@ export async function startApp(): Promise<void> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function onError(ev: ErrorEvent) {
|
function onError(ev: ErrorEvent): void {
|
||||||
const { error } = ev;
|
const { error } = ev;
|
||||||
log.error('background onError:', Errors.toLogFormat(error));
|
log.error('background onError:', Errors.toLogFormat(error));
|
||||||
|
|
||||||
|
@ -3540,7 +3540,7 @@ export async function startApp(): Promise<void> {
|
||||||
log.warn('background onError: Doing nothing with incoming error');
|
log.warn('background onError: Doing nothing with incoming error');
|
||||||
}
|
}
|
||||||
|
|
||||||
async function onViewOnceOpenSync(ev: ViewOnceOpenSyncEvent) {
|
function onViewOnceOpenSync(ev: ViewOnceOpenSyncEvent): void {
|
||||||
ev.confirm();
|
ev.confirm();
|
||||||
|
|
||||||
const { source, sourceUuid, timestamp } = ev;
|
const { source, sourceUuid, timestamp } = ev;
|
||||||
|
@ -3558,7 +3558,7 @@ export async function startApp(): Promise<void> {
|
||||||
ViewOnceOpenSyncs.getSingleton().onSync(sync);
|
ViewOnceOpenSyncs.getSingleton().onSync(sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function onFetchLatestSync(ev: FetchLatestEvent) {
|
async function onFetchLatestSync(ev: FetchLatestEvent): Promise<void> {
|
||||||
ev.confirm();
|
ev.confirm();
|
||||||
|
|
||||||
const { eventType } = ev;
|
const { eventType } = ev;
|
||||||
|
@ -3567,6 +3567,7 @@ export async function startApp(): Promise<void> {
|
||||||
|
|
||||||
switch (eventType) {
|
switch (eventType) {
|
||||||
case FETCH_LATEST_ENUM.LOCAL_PROFILE: {
|
case FETCH_LATEST_ENUM.LOCAL_PROFILE: {
|
||||||
|
log.info('onFetchLatestSync: fetching latest local profile');
|
||||||
const ourUuid = window.textsecure.storage.user.getUuid()?.toString();
|
const ourUuid = window.textsecure.storage.user.getUuid()?.toString();
|
||||||
const ourE164 = window.textsecure.storage.user.getNumber();
|
const ourE164 = window.textsecure.storage.user.getNumber();
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
|
@ -3620,7 +3621,7 @@ export async function startApp(): Promise<void> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function onMessageRequestResponse(ev: MessageRequestResponseEvent) {
|
function onMessageRequestResponse(ev: MessageRequestResponseEvent): void {
|
||||||
ev.confirm();
|
ev.confirm();
|
||||||
|
|
||||||
const {
|
const {
|
||||||
|
@ -3656,7 +3657,7 @@ export async function startApp(): Promise<void> {
|
||||||
MessageRequests.getSingleton().onResponse(sync);
|
MessageRequests.getSingleton().onResponse(sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
function onReadReceipt(event: Readonly<ReadEvent>) {
|
function onReadReceipt(event: Readonly<ReadEvent>): void {
|
||||||
onReadOrViewReceipt({
|
onReadOrViewReceipt({
|
||||||
logTitle: 'read receipt',
|
logTitle: 'read receipt',
|
||||||
event,
|
event,
|
||||||
|
@ -3731,7 +3732,7 @@ export async function startApp(): Promise<void> {
|
||||||
MessageReceipts.getSingleton().onReceipt(receipt);
|
MessageReceipts.getSingleton().onReceipt(receipt);
|
||||||
}
|
}
|
||||||
|
|
||||||
function onReadSync(ev: ReadSyncEvent) {
|
function onReadSync(ev: ReadSyncEvent): Promise<void> {
|
||||||
const { envelopeTimestamp, sender, senderUuid, timestamp } = ev.read;
|
const { envelopeTimestamp, sender, senderUuid, timestamp } = ev.read;
|
||||||
const readAt = envelopeTimestamp;
|
const readAt = envelopeTimestamp;
|
||||||
const senderConversation = window.ConversationController.lookupOrCreate({
|
const senderConversation = window.ConversationController.lookupOrCreate({
|
||||||
|
@ -3770,7 +3771,7 @@ export async function startApp(): Promise<void> {
|
||||||
return ReadSyncs.getSingleton().onSync(receipt);
|
return ReadSyncs.getSingleton().onSync(receipt);
|
||||||
}
|
}
|
||||||
|
|
||||||
function onViewSync(ev: ViewSyncEvent) {
|
function onViewSync(ev: ViewSyncEvent): Promise<void> {
|
||||||
const { envelopeTimestamp, senderE164, senderUuid, timestamp } = ev.view;
|
const { envelopeTimestamp, senderE164, senderUuid, timestamp } = ev.view;
|
||||||
const senderConversation = window.ConversationController.lookupOrCreate({
|
const senderConversation = window.ConversationController.lookupOrCreate({
|
||||||
e164: senderE164,
|
e164: senderE164,
|
||||||
|
@ -3808,7 +3809,7 @@ export async function startApp(): Promise<void> {
|
||||||
return ViewSyncs.getSingleton().onSync(receipt);
|
return ViewSyncs.getSingleton().onSync(receipt);
|
||||||
}
|
}
|
||||||
|
|
||||||
function onDeliveryReceipt(ev: DeliveryEvent) {
|
function onDeliveryReceipt(ev: DeliveryEvent): void {
|
||||||
const { deliveryReceipt } = ev;
|
const { deliveryReceipt } = ev;
|
||||||
const {
|
const {
|
||||||
envelopeTimestamp,
|
envelopeTimestamp,
|
||||||
|
|
|
@ -3498,10 +3498,9 @@ export class ConversationModel extends window.Backbone
|
||||||
): Promise<T> {
|
): Promise<T> {
|
||||||
this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 });
|
this.jobQueue = this.jobQueue || new PQueue({ concurrency: 1 });
|
||||||
|
|
||||||
const taskWithTimeout = createTaskWithTimeout(
|
const logId = `conversation.queueJob(${this.idForLogging()}, ${name})`;
|
||||||
callback,
|
|
||||||
`conversation ${this.idForLogging()}`
|
const taskWithTimeout = createTaskWithTimeout(callback, logId);
|
||||||
);
|
|
||||||
|
|
||||||
const abortController = new AbortController();
|
const abortController = new AbortController();
|
||||||
const { signal: abortSignal } = abortController;
|
const { signal: abortSignal } = abortController;
|
||||||
|
@ -3512,7 +3511,7 @@ export class ConversationModel extends window.Backbone
|
||||||
const waitTime = startedAt - queuedAt;
|
const waitTime = startedAt - queuedAt;
|
||||||
|
|
||||||
if (waitTime > JOB_REPORTING_THRESHOLD_MS) {
|
if (waitTime > JOB_REPORTING_THRESHOLD_MS) {
|
||||||
log.info(`Conversation job ${name} was blocked for ${waitTime}ms`);
|
log.info(`${logId}: was blocked for ${waitTime}ms`);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -3524,7 +3523,7 @@ export class ConversationModel extends window.Backbone
|
||||||
const duration = Date.now() - startedAt;
|
const duration = Date.now() - startedAt;
|
||||||
|
|
||||||
if (duration > JOB_REPORTING_THRESHOLD_MS) {
|
if (duration > JOB_REPORTING_THRESHOLD_MS) {
|
||||||
log.info(`Conversation job ${name} took ${duration}ms`);
|
log.info(`${logId}: took ${duration}ms`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -2203,7 +2203,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
||||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||||
const conversation = window.ConversationController.get(conversationId)!;
|
const conversation = window.ConversationController.get(conversationId)!;
|
||||||
const idLog = `handleDataMessage/${conversation.idForLogging()} ${message.idForLogging()}`;
|
const idLog = `handleDataMessage/${conversation.idForLogging()} ${message.idForLogging()}`;
|
||||||
await conversation.queueJob('handleDataMessage', async () => {
|
await conversation.queueJob(idLog, async () => {
|
||||||
log.info(`${idLog}: starting processing in queue`);
|
log.info(`${idLog}: starting processing in queue`);
|
||||||
|
|
||||||
// First, check for duplicates. If we find one, stop processing here.
|
// First, check for duplicates. If we find one, stop processing here.
|
||||||
|
|
|
@ -187,6 +187,8 @@ async function doContactSync({
|
||||||
|
|
||||||
await window.storage.put('synced_at', Date.now());
|
await window.storage.put('synced_at', Date.now());
|
||||||
window.Whisper.events.trigger('contactSync:complete');
|
window.Whisper.events.trigger('contactSync:complete');
|
||||||
|
|
||||||
|
log.info(`${logId}: done`);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function onContactSync(ev: ContactSyncEvent): Promise<void> {
|
export async function onContactSync(ev: ContactSyncEvent): Promise<void> {
|
||||||
|
|
|
@ -46,16 +46,16 @@ describe('processDataMessage', () => {
|
||||||
TIMESTAMP
|
TIMESTAMP
|
||||||
);
|
);
|
||||||
|
|
||||||
it('should process attachments', async () => {
|
it('should process attachments', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
attachments: [UNPROCESSED_ATTACHMENT],
|
attachments: [UNPROCESSED_ATTACHMENT],
|
||||||
});
|
});
|
||||||
|
|
||||||
assert.deepStrictEqual(out.attachments, [PROCESSED_ATTACHMENT]);
|
assert.deepStrictEqual(out.attachments, [PROCESSED_ATTACHMENT]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process attachments with 0 cdnId', async () => {
|
it('should process attachments with 0 cdnId', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
attachments: [
|
attachments: [
|
||||||
{
|
{
|
||||||
...UNPROCESSED_ATTACHMENT,
|
...UNPROCESSED_ATTACHMENT,
|
||||||
|
@ -72,25 +72,24 @@ describe('processDataMessage', () => {
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should throw on too many attachments', async () => {
|
it('should throw on too many attachments', () => {
|
||||||
const attachments: Array<Proto.IAttachmentPointer> = [];
|
const attachments: Array<Proto.IAttachmentPointer> = [];
|
||||||
for (let i = 0; i < ATTACHMENT_MAX + 1; i += 1) {
|
for (let i = 0; i < ATTACHMENT_MAX + 1; i += 1) {
|
||||||
attachments.push(UNPROCESSED_ATTACHMENT);
|
attachments.push(UNPROCESSED_ATTACHMENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
await assert.isRejected(
|
assert.throws(
|
||||||
check({ attachments }),
|
() => check({ attachments }),
|
||||||
`Too many attachments: ${ATTACHMENT_MAX + 1} included in one message` +
|
`Too many attachments: ${ATTACHMENT_MAX + 1} included in one message` +
|
||||||
`, max is ${ATTACHMENT_MAX}`
|
`, max is ${ATTACHMENT_MAX}`
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process group context UPDATE/QUIT message', async () => {
|
it('should process group context UPDATE/QUIT message', () => {
|
||||||
const { UPDATE, QUIT } = Proto.GroupContext.Type;
|
const { UPDATE, QUIT } = Proto.GroupContext.Type;
|
||||||
|
|
||||||
for (const type of [UPDATE, QUIT]) {
|
for (const type of [UPDATE, QUIT]) {
|
||||||
// eslint-disable-next-line no-await-in-loop
|
const out = check({
|
||||||
const out = await check({
|
|
||||||
body: 'should be deleted',
|
body: 'should be deleted',
|
||||||
attachments: [UNPROCESSED_ATTACHMENT],
|
attachments: [UNPROCESSED_ATTACHMENT],
|
||||||
group: {
|
group: {
|
||||||
|
@ -115,8 +114,8 @@ describe('processDataMessage', () => {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process group context DELIVER message', async () => {
|
it('should process group context DELIVER message', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
body: 'should not be deleted',
|
body: 'should not be deleted',
|
||||||
attachments: [UNPROCESSED_ATTACHMENT],
|
attachments: [UNPROCESSED_ATTACHMENT],
|
||||||
group: {
|
group: {
|
||||||
|
@ -139,8 +138,8 @@ describe('processDataMessage', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process groupv2 context', async () => {
|
it('should process groupv2 context', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
groupV2: {
|
groupV2: {
|
||||||
masterKey: new Uint8Array(32),
|
masterKey: new Uint8Array(32),
|
||||||
revision: 1,
|
revision: 1,
|
||||||
|
@ -168,16 +167,16 @@ describe('processDataMessage', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should base64 profileKey', async () => {
|
it('should base64 profileKey', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
profileKey: new Uint8Array([42, 23, 55]),
|
profileKey: new Uint8Array([42, 23, 55]),
|
||||||
});
|
});
|
||||||
|
|
||||||
assert.strictEqual(out.profileKey, 'Khc3');
|
assert.strictEqual(out.profileKey, 'Khc3');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process quote', async () => {
|
it('should process quote', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
quote: {
|
quote: {
|
||||||
id: Long.fromNumber(1),
|
id: Long.fromNumber(1),
|
||||||
authorUuid: 'author',
|
authorUuid: 'author',
|
||||||
|
@ -208,8 +207,8 @@ describe('processDataMessage', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process contact', async () => {
|
it('should process contact', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
contact: [
|
contact: [
|
||||||
{
|
{
|
||||||
avatar: {
|
avatar: {
|
||||||
|
@ -235,16 +234,14 @@ describe('processDataMessage', () => {
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process reaction', async () => {
|
it('should process reaction', () => {
|
||||||
assert.deepStrictEqual(
|
assert.deepStrictEqual(
|
||||||
(
|
check({
|
||||||
await check({
|
reaction: {
|
||||||
reaction: {
|
emoji: '😎',
|
||||||
emoji: '😎',
|
targetTimestamp: Long.fromNumber(TIMESTAMP),
|
||||||
targetTimestamp: Long.fromNumber(TIMESTAMP),
|
},
|
||||||
},
|
}).reaction,
|
||||||
})
|
|
||||||
).reaction,
|
|
||||||
{
|
{
|
||||||
emoji: '😎',
|
emoji: '😎',
|
||||||
remove: false,
|
remove: false,
|
||||||
|
@ -254,15 +251,13 @@ describe('processDataMessage', () => {
|
||||||
);
|
);
|
||||||
|
|
||||||
assert.deepStrictEqual(
|
assert.deepStrictEqual(
|
||||||
(
|
check({
|
||||||
await check({
|
reaction: {
|
||||||
reaction: {
|
emoji: '😎',
|
||||||
emoji: '😎',
|
remove: true,
|
||||||
remove: true,
|
targetTimestamp: Long.fromNumber(TIMESTAMP),
|
||||||
targetTimestamp: Long.fromNumber(TIMESTAMP),
|
},
|
||||||
},
|
}).reaction,
|
||||||
})
|
|
||||||
).reaction,
|
|
||||||
{
|
{
|
||||||
emoji: '😎',
|
emoji: '😎',
|
||||||
remove: true,
|
remove: true,
|
||||||
|
@ -272,8 +267,8 @@ describe('processDataMessage', () => {
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process preview', async () => {
|
it('should process preview', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
preview: [
|
preview: [
|
||||||
{
|
{
|
||||||
date: Long.fromNumber(TIMESTAMP),
|
date: Long.fromNumber(TIMESTAMP),
|
||||||
|
@ -293,8 +288,8 @@ describe('processDataMessage', () => {
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process sticker', async () => {
|
it('should process sticker', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
sticker: {
|
sticker: {
|
||||||
packId: new Uint8Array([1, 2, 3]),
|
packId: new Uint8Array([1, 2, 3]),
|
||||||
packKey: new Uint8Array([4, 5, 6]),
|
packKey: new Uint8Array([4, 5, 6]),
|
||||||
|
@ -313,8 +308,8 @@ describe('processDataMessage', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process FLAGS=END_SESSION', async () => {
|
it('should process FLAGS=END_SESSION', () => {
|
||||||
const out = await check({
|
const out = check({
|
||||||
flags: FLAGS.END_SESSION,
|
flags: FLAGS.END_SESSION,
|
||||||
body: 'should be deleted',
|
body: 'should be deleted',
|
||||||
group: {
|
group: {
|
||||||
|
@ -329,11 +324,10 @@ describe('processDataMessage', () => {
|
||||||
assert.deepStrictEqual(out.attachments, []);
|
assert.deepStrictEqual(out.attachments, []);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should process FLAGS=EXPIRATION_TIMER_UPDATE,PROFILE_KEY_UPDATE', async () => {
|
it('should process FLAGS=EXPIRATION_TIMER_UPDATE,PROFILE_KEY_UPDATE', () => {
|
||||||
const values = [FLAGS.EXPIRATION_TIMER_UPDATE, FLAGS.PROFILE_KEY_UPDATE];
|
const values = [FLAGS.EXPIRATION_TIMER_UPDATE, FLAGS.PROFILE_KEY_UPDATE];
|
||||||
for (const flags of values) {
|
for (const flags of values) {
|
||||||
// eslint-disable-next-line no-await-in-loop
|
const out = check({
|
||||||
const out = await check({
|
|
||||||
flags,
|
flags,
|
||||||
body: 'should be deleted',
|
body: 'should be deleted',
|
||||||
attachments: [UNPROCESSED_ATTACHMENT],
|
attachments: [UNPROCESSED_ATTACHMENT],
|
||||||
|
@ -344,15 +338,15 @@ describe('processDataMessage', () => {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
it('processes trivial fields', async () => {
|
it('processes trivial fields', () => {
|
||||||
assert.strictEqual((await check({ flags: null })).flags, 0);
|
assert.strictEqual(check({ flags: null }).flags, 0);
|
||||||
assert.strictEqual((await check({ flags: 1 })).flags, 1);
|
assert.strictEqual(check({ flags: 1 }).flags, 1);
|
||||||
|
|
||||||
assert.strictEqual((await check({ expireTimer: null })).expireTimer, 0);
|
assert.strictEqual(check({ expireTimer: null }).expireTimer, 0);
|
||||||
assert.strictEqual((await check({ expireTimer: 123 })).expireTimer, 123);
|
assert.strictEqual(check({ expireTimer: 123 }).expireTimer, 123);
|
||||||
|
|
||||||
assert.isFalse((await check({ isViewOnce: null })).isViewOnce);
|
assert.isFalse(check({ isViewOnce: null }).isViewOnce);
|
||||||
assert.isFalse((await check({ isViewOnce: false })).isViewOnce);
|
assert.isFalse(check({ isViewOnce: false }).isViewOnce);
|
||||||
assert.isTrue((await check({ isViewOnce: true })).isViewOnce);
|
assert.isTrue(check({ isViewOnce: true }).isViewOnce);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -1935,7 +1935,7 @@ export default class MessageReceiver
|
||||||
}
|
}
|
||||||
await p;
|
await p;
|
||||||
|
|
||||||
const message = await this.processDecrypted(envelope, msg);
|
const message = this.processDecrypted(envelope, msg);
|
||||||
const groupId = this.getProcessedGroupId(message);
|
const groupId = this.getProcessedGroupId(message);
|
||||||
const isBlocked = groupId ? this.isGroupBlocked(groupId) : false;
|
const isBlocked = groupId ? this.isGroupBlocked(groupId) : false;
|
||||||
const { source, sourceUuid } = envelope;
|
const { source, sourceUuid } = envelope;
|
||||||
|
@ -2234,7 +2234,7 @@ export default class MessageReceiver
|
||||||
|
|
||||||
logUnexpectedUrgentValue(envelope, type);
|
logUnexpectedUrgentValue(envelope, type);
|
||||||
|
|
||||||
const message = await this.processDecrypted(envelope, msg);
|
const message = this.processDecrypted(envelope, msg);
|
||||||
const groupId = this.getProcessedGroupId(message);
|
const groupId = this.getProcessedGroupId(message);
|
||||||
const isBlocked = groupId ? this.isGroupBlocked(groupId) : false;
|
const isBlocked = groupId ? this.isGroupBlocked(groupId) : false;
|
||||||
const { source, sourceUuid } = envelope;
|
const { source, sourceUuid } = envelope;
|
||||||
|
@ -3324,10 +3324,10 @@ export default class MessageReceiver
|
||||||
await this.storage.protocol.archiveAllSessions(theirUuid);
|
await this.storage.protocol.archiveAllSessions(theirUuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async processDecrypted(
|
private processDecrypted(
|
||||||
envelope: ProcessedEnvelope,
|
envelope: ProcessedEnvelope,
|
||||||
decrypted: Proto.IDataMessage
|
decrypted: Proto.IDataMessage
|
||||||
): Promise<ProcessedDataMessage> {
|
): ProcessedDataMessage {
|
||||||
return processDataMessage(decrypted, envelope.timestamp);
|
return processDataMessage(decrypted, envelope.timestamp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -267,10 +267,10 @@ export function processGiftBadge(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function processDataMessage(
|
export function processDataMessage(
|
||||||
message: Proto.IDataMessage,
|
message: Proto.IDataMessage,
|
||||||
envelopeTimestamp: number
|
envelopeTimestamp: number
|
||||||
): Promise<ProcessedDataMessage> {
|
): ProcessedDataMessage {
|
||||||
/* eslint-disable no-bitwise */
|
/* eslint-disable no-bitwise */
|
||||||
|
|
||||||
// Now that its decrypted, validate the message and clean it up for consumer
|
// Now that its decrypted, validate the message and clean it up for consumer
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue