Introduce new conversationJobQueue
This commit is contained in:
parent
37d4776472
commit
30783c887c
40 changed files with 3111 additions and 1742 deletions
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2020-2021 Signal Messenger, LLC
|
||||
// Copyright 2020-2022 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { isEmpty, isEqual, mapValues, maxBy, noop, omit, union } from 'lodash';
|
||||
|
@ -125,8 +125,10 @@ import { ViewOnceOpenSyncs } from '../messageModifiers/ViewOnceOpenSyncs';
|
|||
import * as AttachmentDownloads from '../messageModifiers/AttachmentDownloads';
|
||||
import * as LinkPreview from '../types/LinkPreview';
|
||||
import { SignalService as Proto } from '../protobuf';
|
||||
import { normalMessageSendJobQueue } from '../jobs/normalMessageSendJobQueue';
|
||||
import { reactionJobQueue } from '../jobs/reactionJobQueue';
|
||||
import {
|
||||
conversationJobQueue,
|
||||
conversationQueueJobEnum,
|
||||
} from '../jobs/conversationJobQueue';
|
||||
import { notificationService } from '../services/notifications';
|
||||
import type { LinkPreviewType } from '../types/message/LinkPreviews';
|
||||
import * as log from '../logging/log';
|
||||
|
@ -144,6 +146,7 @@ import {
|
|||
} from '../messages/helpers';
|
||||
import type { ReplacementValuesType } from '../types/I18N';
|
||||
import { viewOnceOpenJobQueue } from '../jobs/viewOnceOpenJobQueue';
|
||||
import type { ConversationQueueJobData } from '../jobs/conversationJobQueue';
|
||||
|
||||
/* eslint-disable camelcase */
|
||||
/* eslint-disable more/no-then */
|
||||
|
@ -1164,8 +1167,13 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
|
||||
this.set('sendStateByConversationId', newSendStateByConversationId);
|
||||
|
||||
await normalMessageSendJobQueue.add(
|
||||
{ messageId: this.id, conversationId: conversation.id },
|
||||
await conversationJobQueue.add(
|
||||
{
|
||||
type: conversationQueueJobEnum.enum.NormalMessage,
|
||||
conversationId: conversation.id,
|
||||
messageId: this.id,
|
||||
revision: conversation.get('revision'),
|
||||
},
|
||||
async jobToInsert => {
|
||||
await window.Signal.Data.saveMessage(this.attributes, {
|
||||
jobToInsert,
|
||||
|
@ -1441,7 +1449,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
async sendSyncMessageOnly(
|
||||
dataMessage: Uint8Array,
|
||||
saveErrors?: (errors: Array<Error>) => void
|
||||
): Promise<void> {
|
||||
): Promise<CallbackResultType | void> {
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
const conv = this.getConversation()!;
|
||||
this.set({ dataMessage });
|
||||
|
@ -1461,8 +1469,9 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
? result.unidentifiedDeliveries
|
||||
: undefined,
|
||||
});
|
||||
} catch (result) {
|
||||
const resultErrors = result?.errors;
|
||||
return result;
|
||||
} catch (error) {
|
||||
const resultErrors = error?.errors;
|
||||
const errors = Array.isArray(resultErrors)
|
||||
? resultErrors
|
||||
: [new Error('Unknown error')];
|
||||
|
@ -1472,6 +1481,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
// We don't save because we're about to save below.
|
||||
this.saveErrors(errors, { skipSave: true });
|
||||
}
|
||||
throw error;
|
||||
} finally {
|
||||
await window.Signal.Data.saveMessage(this.attributes, {
|
||||
ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(),
|
||||
|
@ -3180,9 +3190,14 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
);
|
||||
|
||||
if (reaction.get('source') === ReactionSource.FromThisDevice) {
|
||||
const jobData = { messageId: this.id };
|
||||
const jobData: ConversationQueueJobData = {
|
||||
type: conversationQueueJobEnum.enum.Reaction,
|
||||
conversationId: conversation.id,
|
||||
messageId: this.id,
|
||||
revision: conversation.get('revision'),
|
||||
};
|
||||
if (shouldPersist) {
|
||||
await reactionJobQueue.add(jobData, async jobToInsert => {
|
||||
await conversationJobQueue.add(jobData, async jobToInsert => {
|
||||
log.info(
|
||||
`enqueueReactionForSend: saving message ${this.idForLogging()} and job ${
|
||||
jobToInsert.id
|
||||
|
@ -3194,7 +3209,7 @@ export class MessageModel extends window.Backbone.Model<MessageAttributesType> {
|
|||
});
|
||||
});
|
||||
} else {
|
||||
await reactionJobQueue.add(jobData);
|
||||
await conversationJobQueue.add(jobData);
|
||||
}
|
||||
} else if (shouldPersist) {
|
||||
await window.Signal.Data.saveMessage(this.attributes, {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue