diff --git a/ts/background.ts b/ts/background.ts index ab65348d9663..f8df656b6ec5 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -2848,7 +2848,7 @@ export async function startApp(): Promise { 'DataMessage.Reaction.targetAuthorUuid' ); - const { reaction } = data.message; + const { reaction, timestamp } = data.message; if (!isValidReactionEmoji(reaction.emoji)) { log.warn('Received an invalid reaction emoji. Dropping it'); @@ -2862,7 +2862,7 @@ export async function startApp(): Promise { remove: reaction.remove, targetAuthorUuid, targetTimestamp: reaction.targetTimestamp, - timestamp: Date.now(), + timestamp, fromId: window.ConversationController.ensureContactIds({ e164: data.source, uuid: data.sourceUuid, @@ -3190,7 +3190,7 @@ export async function startApp(): Promise { 'DataMessage.Reaction.targetAuthorUuid' ); - const { reaction } = data.message; + const { reaction, timestamp } = data.message; if (!isValidReactionEmoji(reaction.emoji)) { log.warn('Received an invalid reaction emoji. Dropping it'); @@ -3204,7 +3204,7 @@ export async function startApp(): Promise { remove: reaction.remove, targetAuthorUuid, targetTimestamp: reaction.targetTimestamp, - timestamp: Date.now(), + timestamp, fromId: window.ConversationController.getOurConversationId(), fromSync: true, }); diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index fe1354bcc8bc..a80381ff0e3d 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -138,22 +138,29 @@ export abstract class JobQueue { * Add a job, which should cause it to be enqueued and run. * * If `streamJobs` has not been called yet, this will throw an error. + * + * You can override `insert` to change the way the job is added to the database. This is + * useful if you're trying to save a message and a job in the same database transaction. */ - async add(data: Readonly): Promise> { - this.throwIfNotStarted(); - - const job = this.createJob(data); - await this.store.insert(job); - log.info(`${this.logPrefix} added new job ${job.id}`); - return job; - } - - protected throwIfNotStarted(): void { + async add( + data: Readonly, + insert?: (job: ParsedJob) => Promise + ): Promise> { if (!this.started) { throw new Error( `${this.logPrefix} has not started streaming. Make sure to call streamJobs().` ); } + + const job = this.createJob(data); + + if (insert) { + await insert(job); + } + await this.store.insert(job, { shouldPersist: !insert }); + + log.info(`${this.logPrefix} added new job ${job.id}`); + return job; } protected createJob(data: Readonly): Job { diff --git a/ts/jobs/JobQueueDatabaseStore.ts b/ts/jobs/JobQueueDatabaseStore.ts index c9f3ca115043..0dc8aafdd40d 100644 --- a/ts/jobs/JobQueueDatabaseStore.ts +++ b/ts/jobs/JobQueueDatabaseStore.ts @@ -26,9 +26,7 @@ export class JobQueueDatabaseStore implements JobQueueStore { async insert( job: Readonly, - { - shouldInsertIntoDatabase = true, - }: Readonly<{ shouldInsertIntoDatabase?: boolean }> = {} + { shouldPersist = true }: Readonly<{ shouldPersist?: boolean }> = {} ): Promise { log.info( `JobQueueDatabaseStore adding job ${job.id} to queue ${JSON.stringify( @@ -46,7 +44,7 @@ export class JobQueueDatabaseStore implements JobQueueStore { } await initialFetchPromise; - if (shouldInsertIntoDatabase) { + if (shouldPersist) { await this.db.insertJob(formatJobForInsert(job)); } diff --git a/ts/jobs/helpers/InMemoryQueues.ts b/ts/jobs/helpers/InMemoryQueues.ts new file mode 100644 index 000000000000..e324bf9f25ec --- /dev/null +++ b/ts/jobs/helpers/InMemoryQueues.ts @@ -0,0 +1,23 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import PQueue from 'p-queue'; + +export class InMemoryQueues { + private readonly queues = new Map(); + + get(key: string): PQueue { + const existingQueue = this.queues.get(key); + if (existingQueue) { + return existingQueue; + } + + const newQueue = new PQueue({ concurrency: 1 }); + newQueue.once('idle', () => { + this.queues.delete(key); + }); + + this.queues.set(key, newQueue); + return newQueue; + } +} diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index 6355e05e6036..863fcb5eec7b 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -4,6 +4,7 @@ import type { WebAPIType } from '../textsecure/WebAPI'; import { normalMessageSendJobQueue } from './normalMessageSendJobQueue'; +import { reactionJobQueue } from './reactionJobQueue'; import { readSyncJobQueue } from './readSyncJobQueue'; import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue'; import { reportSpamJobQueue } from './reportSpamJobQueue'; @@ -21,6 +22,7 @@ export function initializeAllJobQueues({ reportSpamJobQueue.initialize({ server }); normalMessageSendJobQueue.streamJobs(); + reactionJobQueue.streamJobs(); readSyncJobQueue.streamJobs(); removeStorageKeyJobQueue.streamJobs(); reportSpamJobQueue.streamJobs(); diff --git a/ts/jobs/normalMessageSendJobQueue.ts b/ts/jobs/normalMessageSendJobQueue.ts index 2bc8486e71a7..6bd900c78a4c 100644 --- a/ts/jobs/normalMessageSendJobQueue.ts +++ b/ts/jobs/normalMessageSendJobQueue.ts @@ -3,11 +3,12 @@ /* eslint-disable class-methods-use-this */ -import PQueue from 'p-queue'; +import type PQueue from 'p-queue'; import type { LoggerType } from '../types/Logging'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; import { sleepFor413RetryAfterTime } from './helpers/sleepFor413RetryAfterTime'; +import { InMemoryQueues } from './helpers/InMemoryQueues'; import type { MessageModel } from '../models/messages'; import { getMessageById } from '../messages/getMessageById'; import type { ConversationModel } from '../models/conversations'; @@ -23,19 +24,13 @@ import type { CallbackResultType } from '../textsecure/Types.d'; import { isSent } from '../messages/MessageSendState'; import { getLastChallengeError, isOutgoing } from '../state/selectors/message'; import * as Errors from '../types/errors'; -import type { - AttachmentType, - GroupV1InfoType, - GroupV2InfoType, -} from '../textsecure/SendMessage'; +import type { AttachmentType } from '../textsecure/SendMessage'; import type { LinkPreviewType } from '../types/message/LinkPreviews'; import type { BodyRangesType } from '../types/Util'; import type { WhatIsThis } from '../window.d'; -import type { ParsedJob } from './types'; import { JobQueue } from './JobQueue'; import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; -import type { Job } from './Job'; import { getHttpErrorCode } from './helpers/getHttpErrorCode'; const { @@ -55,31 +50,7 @@ type NormalMessageSendJobData = { }; export class NormalMessageSendJobQueue extends JobQueue { - private readonly queues = new Map(); - - /** - * Add a job (see `JobQueue.prototype.add`). - * - * You can override `insert` to change the way the job is added to the database. This is - * useful if you're trying to save a message and a job in the same database transaction. - */ - async add( - data: Readonly, - insert?: (job: ParsedJob) => Promise - ): Promise> { - if (!insert) { - return super.add(data); - } - - this.throwIfNotStarted(); - - const job = this.createJob(data); - await insert(job); - await jobQueueDatabaseStore.insert(job, { - shouldInsertIntoDatabase: false, - }); - return job; - } + private readonly inMemoryQueues = new InMemoryQueues(); protected parseData(data: unknown): NormalMessageSendJobData { // Because we do this so often and Zod is a bit slower, we do "manual" parsing here. @@ -99,20 +70,7 @@ export class NormalMessageSendJobQueue extends JobQueue): PQueue { - const { conversationId } = data; - - const existingQueue = this.queues.get(conversationId); - if (existingQueue) { - return existingQueue; - } - - const newQueue = new PQueue({ concurrency: 1 }); - newQueue.once('idle', () => { - this.queues.delete(conversationId); - }); - - this.queues.set(conversationId, newQueue); - return newQueue; + return this.inMemoryQueues.get(data.conversationId); } protected async run( @@ -234,10 +192,9 @@ export class NormalMessageSendJobQueue extends JobQueue): boolean { isSent(sendState.status) ); } - -function updateRecipients( - groupInfo: undefined | GroupV1InfoType, - recipients: Array -): undefined | GroupV1InfoType; -function updateRecipients( - groupInfo: undefined | GroupV2InfoType, - recipients: Array -): undefined | GroupV2InfoType; -function updateRecipients( - groupInfo: undefined | GroupV1InfoType | GroupV2InfoType, - recipients: Array -): undefined | GroupV1InfoType | GroupV2InfoType { - return ( - groupInfo && { - ...groupInfo, - members: recipients, - } - ); -} diff --git a/ts/jobs/reactionJobQueue.ts b/ts/jobs/reactionJobQueue.ts new file mode 100644 index 000000000000..10029013f6a7 --- /dev/null +++ b/ts/jobs/reactionJobQueue.ts @@ -0,0 +1,337 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import * as z from 'zod'; +import type PQueue from 'p-queue'; +import { repeat, zipObject } from '../util/iterables'; +import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; +import * as durations from '../util/durations'; + +import type { LoggerType } from '../types/Logging'; +import type { CallbackResultType } from '../textsecure/Types.d'; +import type { MessageModel } from '../models/messages'; +import type { MessageReactionType } from '../model-types.d'; +import type { ConversationModel } from '../models/conversations'; + +import * as reactionUtil from '../reactions/util'; +import { isSent, SendStatus } from '../messages/MessageSendState'; +import { getMessageById } from '../messages/getMessageById'; +import { isMe, isDirectConversation } from '../util/whatTypeOfConversation'; +import { getSendOptions } from '../util/getSendOptions'; +import { SignalService as Proto } from '../protobuf'; +import { handleMessageSend } from '../util/handleMessageSend'; +import { ourProfileKeyService } from '../services/ourProfileKey'; +import { canReact } from '../state/selectors/message'; +import { findAndFormatContact } from '../util/findAndFormatContact'; +import { UUID } from '../types/UUID'; + +import { JobQueue } from './JobQueue'; +import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; +import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; +import { handleCommonJobRequestError } from './helpers/handleCommonJobRequestError'; +import { InMemoryQueues } from './helpers/InMemoryQueues'; + +const MAX_RETRY_TIME = durations.DAY; +const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME); + +const reactionJobData = z.object({ + messageId: z.string(), +}); + +export type ReactionJobData = z.infer; + +/* eslint-disable class-methods-use-this */ + +export class ReactionJobQueue extends JobQueue { + private readonly inMemoryQueues = new InMemoryQueues(); + + protected parseData(data: unknown): ReactionJobData { + return reactionJobData.parse(data); + } + + protected getInMemoryQueue({ + data, + }: Readonly<{ data: Pick }>): PQueue { + return this.inMemoryQueues.get(data.messageId); + } + + protected async run( + { data, timestamp }: Readonly<{ data: ReactionJobData; timestamp: number }>, + { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> + ): Promise { + const { messageId } = data; + + const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now(); + const isFinalAttempt = attempt >= MAX_ATTEMPTS; + + // We don't immediately use this value because we may want to mark the reaction + // failed before doing so. + const shouldContinue = await commonShouldJobContinue({ + attempt, + log, + timeRemaining, + }); + + await window.ConversationController.loadPromise(); + + const ourConversationId = window.ConversationController.getOurConversationIdOrThrow(); + + const message = await getMessageById(messageId); + if (!message) { + log.info( + `message ${messageId} was not found, maybe because it was deleted. Giving up on sending its reactions` + ); + return; + } + + const { + pendingReaction, + emojiToRemove, + } = reactionUtil.getNewestPendingOutgoingReaction( + getReactions(message), + ourConversationId + ); + if (!pendingReaction) { + log.info(`no pending reaction for ${messageId}. Doing nothing`); + return; + } + + if ( + !canReact(message.attributes, ourConversationId, findAndFormatContact) + ) { + log.info( + `could not react to ${messageId}. Removing this pending reaction` + ); + markReactionFailed(message, pendingReaction); + await window.Signal.Data.saveMessage(message.attributes); + return; + } + + if (!shouldContinue) { + log.info( + `reacting to message ${messageId} ran out of time. Giving up on sending it` + ); + markReactionFailed(message, pendingReaction); + await window.Signal.Data.saveMessage(message.attributes); + return; + } + + try { + const conversation = message.getConversation(); + if (!conversation) { + throw new Error( + `could not find conversation for message with ID ${messageId}` + ); + } + + const { + allRecipientIdentifiers, + recipientIdentifiersWithoutMe, + } = getRecipients(pendingReaction, conversation); + + const expireTimer = message.get('expireTimer'); + const profileKey = conversation.get('profileSharing') + ? await ourProfileKeyService.get() + : undefined; + + const reactionForSend = pendingReaction.emoji + ? pendingReaction + : { + ...pendingReaction, + emoji: emojiToRemove, + remove: true, + }; + + const ephemeralMessageForReactionSend = new window.Whisper.Message({ + id: UUID.generate.toString(), + type: 'outgoing', + conversationId: conversation.get('id'), + sent_at: pendingReaction.timestamp, + received_at: window.Signal.Util.incrementMessageCounter(), + received_at_ms: pendingReaction.timestamp, + reaction: reactionForSend, + timestamp: pendingReaction.timestamp, + sendStateByConversationId: zipObject( + Object.keys(pendingReaction.isSentByConversationId || {}), + repeat({ + status: SendStatus.Pending, + updatedAt: Date.now(), + }) + ), + }); + ephemeralMessageForReactionSend.doNotSave = true; + + const successfulConversationIds = new Set(); + + if (recipientIdentifiersWithoutMe.length === 0) { + log.info('sending sync reaction message only'); + const dataMessage = await window.textsecure.messaging.getDataMessage({ + attachments: [], + expireTimer, + groupV2: conversation.getGroupV2Info({ + members: recipientIdentifiersWithoutMe, + }), + preview: [], + profileKey, + reaction: reactionForSend, + recipients: allRecipientIdentifiers, + timestamp: pendingReaction.timestamp, + }); + await ephemeralMessageForReactionSend.sendSyncMessageOnly(dataMessage); + + successfulConversationIds.add(ourConversationId); + } else { + const sendOptions = await getSendOptions(conversation.attributes); + const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; + + let promise: Promise; + if (isDirectConversation(conversation.attributes)) { + log.info('sending direct reaction message'); + promise = window.textsecure.messaging.sendMessageToIdentifier({ + identifier: recipientIdentifiersWithoutMe[0], + messageText: undefined, + attachments: [], + quote: undefined, + preview: [], + sticker: undefined, + reaction: reactionForSend, + deletedForEveryoneTimestamp: undefined, + timestamp: pendingReaction.timestamp, + expireTimer, + contentHint: ContentHint.RESENDABLE, + groupId: undefined, + profileKey, + options: sendOptions, + }); + } else { + log.info('sending group reaction message'); + promise = window.Signal.Util.sendToGroup({ + groupSendOptions: { + groupV1: conversation.getGroupV1Info( + recipientIdentifiersWithoutMe + ), + groupV2: conversation.getGroupV2Info({ + members: recipientIdentifiersWithoutMe, + }), + reaction: reactionForSend, + timestamp: pendingReaction.timestamp, + expireTimer, + profileKey, + }, + conversation, + contentHint: ContentHint.RESENDABLE, + messageId, + sendOptions, + sendType: 'reaction', + }); + } + + await ephemeralMessageForReactionSend.send( + handleMessageSend(promise, { + messageIds: [messageId], + sendType: 'reaction', + }) + ); + + const reactionSendStateByConversationId = + ephemeralMessageForReactionSend.get('sendStateByConversationId') || + {}; + for (const [conversationId, sendState] of Object.entries( + reactionSendStateByConversationId + )) { + if (isSent(sendState.status)) { + successfulConversationIds.add(conversationId); + } + } + } + + const newReactions = reactionUtil.markOutgoingReactionSent( + getReactions(message), + pendingReaction, + successfulConversationIds + ); + setReactions(message, newReactions); + + const didFullySend = true; + if (!didFullySend) { + throw new Error('reaction did not fully send'); + } + } catch (err: unknown) { + if (isFinalAttempt) { + markReactionFailed(message, pendingReaction); + } + await handleCommonJobRequestError({ err, log, timeRemaining }); + } finally { + await window.Signal.Data.saveMessage(message.attributes); + } + } +} + +export const reactionJobQueue = new ReactionJobQueue({ + store: jobQueueDatabaseStore, + queueType: 'reactions', + maxAttempts: MAX_ATTEMPTS, +}); + +const getReactions = (message: MessageModel): Array => + message.get('reactions') || []; + +const setReactions = ( + message: MessageModel, + reactions: Array +): void => { + if (reactions.length) { + message.set('reactions', reactions); + } else { + message.unset('reactions'); + } +}; + +function getRecipients( + reaction: Readonly, + conversation: ConversationModel +): { + allRecipientIdentifiers: Array; + recipientIdentifiersWithoutMe: Array; +} { + const allRecipientIdentifiers: Array = []; + const recipientIdentifiersWithoutMe: Array = []; + + const currentConversationRecipients = conversation.getRecipientConversationIds(); + + for (const id of reactionUtil.getUnsentConversationIds(reaction)) { + const recipient = window.ConversationController.get(id); + if (!recipient) { + continue; + } + + const recipientIdentifier = recipient.getSendTarget(); + const isRecipientMe = isMe(recipient.attributes); + + if ( + !recipientIdentifier || + recipient.isUntrusted() || + (!currentConversationRecipients.has(id) && !isRecipientMe) + ) { + continue; + } + + allRecipientIdentifiers.push(recipientIdentifier); + if (!isRecipientMe) { + recipientIdentifiersWithoutMe.push(recipientIdentifier); + } + } + + return { allRecipientIdentifiers, recipientIdentifiersWithoutMe }; +} + +function markReactionFailed( + message: MessageModel, + pendingReaction: MessageReactionType +): void { + const newReactions = reactionUtil.markOutgoingReactionFailed( + getReactions(message), + pendingReaction + ); + setReactions(message, newReactions); +} diff --git a/ts/jobs/types.ts b/ts/jobs/types.ts index 2d3dfb47ac3f..7240ad93a4f5 100644 --- a/ts/jobs/types.ts +++ b/ts/jobs/types.ts @@ -5,7 +5,10 @@ export type JobQueueStore = { /** * Add a job to the database. Doing this should enqueue it in the stream. */ - insert(job: Readonly): Promise; + insert( + job: Readonly, + options?: Readonly<{ shouldPersist?: boolean }> + ): Promise; /** * Remove a job. This should be called when a job finishes successfully or diff --git a/ts/messageModifiers/Reactions.ts b/ts/messageModifiers/Reactions.ts index 5886c09291fc..815cbdc2239d 100644 --- a/ts/messageModifiers/Reactions.ts +++ b/ts/messageModifiers/Reactions.ts @@ -54,9 +54,7 @@ export class Reactions extends Collection { return []; } - async onReaction( - reaction: ReactionModel - ): Promise { + async onReaction(reaction: ReactionModel): Promise { try { // The conversation the target message was in; we have to find it in the database // to to figure that out. @@ -85,73 +83,67 @@ export class Reactions extends Collection { } // awaiting is safe since `onReaction` is never called from inside the queue - return await targetConversation.queueJob( - 'Reactions.onReaction', - async () => { - log.info('Handling reaction for', reaction.get('targetTimestamp')); + await targetConversation.queueJob('Reactions.onReaction', async () => { + log.info('Handling reaction for', reaction.get('targetTimestamp')); - const messages = await window.Signal.Data.getMessagesBySentAt( - reaction.get('targetTimestamp'), - { - MessageCollection: window.Whisper.MessageCollection, - } - ); - // Message is fetched inside the conversation queue so we have the - // most recent data - const targetMessage = messages.find(m => { - const contact = m.getContact(); + const messages = await window.Signal.Data.getMessagesBySentAt( + reaction.get('targetTimestamp'), + { + MessageCollection: window.Whisper.MessageCollection, + } + ); + // Message is fetched inside the conversation queue so we have the + // most recent data + const targetMessage = messages.find(m => { + const contact = m.getContact(); - if (!contact) { - return false; - } - - const mcid = contact.get('id'); - const recid = window.ConversationController.ensureContactIds({ - uuid: reaction.get('targetAuthorUuid'), - }); - return mcid === recid; - }); - - if (!targetMessage) { - log.info( - 'No message for reaction', - reaction.get('targetAuthorUuid'), - reaction.get('targetTimestamp') - ); - - // Since we haven't received the message for which we are removing a - // reaction, we can just remove those pending reactions - if (reaction.get('remove')) { - this.remove(reaction); - const oldReaction = this.where({ - targetAuthorUuid: reaction.get('targetAuthorUuid'), - targetTimestamp: reaction.get('targetTimestamp'), - emoji: reaction.get('emoji'), - }); - oldReaction.forEach(r => this.remove(r)); - } - - return undefined; + if (!contact) { + return false; } - const message = window.MessageController.register( - targetMessage.id, - targetMessage + const mcid = contact.get('id'); + const recid = window.ConversationController.ensureContactIds({ + uuid: reaction.get('targetAuthorUuid'), + }); + return mcid === recid; + }); + + if (!targetMessage) { + log.info( + 'No message for reaction', + reaction.get('targetAuthorUuid'), + reaction.get('targetTimestamp') ); - const oldReaction = await message.handleReaction(reaction); + // Since we haven't received the message for which we are removing a + // reaction, we can just remove those pending reactions + if (reaction.get('remove')) { + this.remove(reaction); + const oldReaction = this.where({ + targetAuthorUuid: reaction.get('targetAuthorUuid'), + targetTimestamp: reaction.get('targetTimestamp'), + emoji: reaction.get('emoji'), + }); + oldReaction.forEach(r => this.remove(r)); + } - this.remove(reaction); - - return oldReaction; + return; } - ); + + const message = window.MessageController.register( + targetMessage.id, + targetMessage + ); + + await message.handleReaction(reaction); + + this.remove(reaction); + }); } catch (error) { log.error( 'Reactions.onReaction error:', error && error.stack ? error.stack : error ); - return undefined; } } } diff --git a/ts/model-types.d.ts b/ts/model-types.d.ts index 6f2bedc6ee4f..5a3491365f19 100644 --- a/ts/model-types.d.ts +++ b/ts/model-types.d.ts @@ -27,6 +27,7 @@ import { EmbeddedContactType } from './types/EmbeddedContact'; import { SignalService as Proto } from './protobuf'; import { AvatarDataType } from './types/Avatar'; import { UUIDStringType } from './types/UUID'; +import { ReactionSource } from './reactions/ReactionSource'; import AccessRequiredEnum = Proto.AccessControl.AccessRequired; import MemberRoleEnum = Proto.Member.Role; @@ -86,6 +87,15 @@ export type GroupV1Update = { name?: string; }; +export type MessageReactionType = { + emoji: undefined | string; + fromId: string; + targetAuthorUuid: string; + targetTimestamp: number; + timestamp: number; + isSentByConversationId?: Record; +}; + export type MessageAttributesType = { bodyPending?: boolean; bodyRanges?: BodyRangesType; @@ -113,13 +123,7 @@ export type MessageAttributesType = { messageTimer?: unknown; profileChange?: ProfileNameChangeType; quote?: QuotedMessageType; - reactions?: Array<{ - emoji: string; - fromId: string; - targetAuthorUuid: string; - targetTimestamp: number; - timestamp: number; - }>; + reactions?: Array; requiredProtocolVersion?: number; retryOptions?: RetryOptions; sourceDevice?: number; @@ -376,5 +380,5 @@ export type ReactionAttributesType = { targetTimestamp: number; fromId: string; timestamp: number; - fromSync?: boolean; + source: ReactionSource; }; diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 51935f1e311c..16a5c6bb122e 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -94,7 +94,6 @@ import { import { normalMessageSendJobQueue } from '../jobs/normalMessageSendJobQueue'; import { Deletes } from '../messageModifiers/Deletes'; import type { ReactionModel } from '../messageModifiers/Reactions'; -import { Reactions } from '../messageModifiers/Reactions'; import { isAnnouncementGroupReady } from '../util/isAnnouncementGroupReady'; import { getProfile } from '../util/getProfile'; import { SEALED_SENDER } from '../types/SealedSender'; @@ -1123,15 +1122,17 @@ export class ConversationModel extends window.Backbone window.Signal.Data.updateConversation(this.attributes); } - getGroupV2Info({ - groupChange, - includePendingMembers, - extraConversationsForSend, - }: { - groupChange?: Uint8Array; - includePendingMembers?: boolean; - extraConversationsForSend?: Array; - } = {}): GroupV2InfoType | undefined { + getGroupV2Info( + options: Readonly< + { groupChange?: Uint8Array } & ( + | { + includePendingMembers?: boolean; + extraConversationsForSend?: Array; + } + | { members: Array } + ) + > = {} + ): GroupV2InfoType | undefined { if (isDirectConversation(this.attributes) || !isGroupV2(this.attributes)) { return undefined; } @@ -1142,15 +1143,13 @@ export class ConversationModel extends window.Backbone ), // eslint-disable-next-line @typescript-eslint/no-non-null-assertion revision: this.get('revision')!, - members: this.getRecipients({ - includePendingMembers, - extraConversationsForSend, - }), - groupChange, + members: + 'members' in options ? options.members : this.getRecipients(options), + groupChange: options.groupChange, }; } - getGroupV1Info(): GroupV1InfoType | undefined { + getGroupV1Info(members?: Array): GroupV1InfoType | undefined { const groupId = this.get('groupId'); const groupVersion = this.get('groupVersion'); @@ -1164,7 +1163,7 @@ export class ConversationModel extends window.Backbone return { id: groupId, - members: this.getRecipients(), + members: members || this.getRecipients(), }; } @@ -3478,166 +3477,6 @@ export class ConversationModel extends window.Backbone }); } - async sendReactionMessage( - reaction: { emoji: string; remove: boolean }, - target: { - messageId: string; - targetAuthorUuid: string; - targetTimestamp: number; - } - ): Promise { - const { messageId } = target; - const timestamp = Date.now(); - const outgoingReaction = { ...reaction, ...target }; - - const reactionModel = Reactions.getSingleton().add({ - ...outgoingReaction, - fromId: window.ConversationController.getOurConversationId(), - timestamp, - fromSync: true, - }); - - // Apply reaction optimistically - const oldReaction = await Reactions.getSingleton().onReaction( - reactionModel - ); - - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const destination = this.getSendTarget()!; - - return this.queueJob('sendReactionMessage', async () => { - log.info( - 'Sending reaction to conversation', - this.idForLogging(), - 'with timestamp', - timestamp - ); - - await this.maybeApplyUniversalTimer(false); - - const expireTimer = this.get('expireTimer'); - - // We are only creating this model so we can use its sync message - // sending functionality. It will not be saved to the database. - const message = new window.Whisper.Message({ - id: UUID.generate.toString(), - type: 'outgoing', - conversationId: this.get('id'), - sent_at: timestamp, - received_at: window.Signal.Util.incrementMessageCounter(), - received_at_ms: timestamp, - reaction: outgoingReaction, - timestamp, - }); - - // This is to ensure that the functions in send() and sendSyncMessage() don't save - // anything to the database. - message.doNotSave = true; - - // We're offline! - if (!window.textsecure.messaging) { - throw new Error('Cannot send reaction while offline!'); - } - - let profileKey: Uint8Array | undefined; - if (this.get('profileSharing')) { - profileKey = await ourProfileKeyService.get(); - } - // Special-case the self-send case - we send only a sync message - if (isMe(this.attributes)) { - const dataMessage = await window.textsecure.messaging.getDataMessage({ - attachments: [], - // body - // deletedForEveryoneTimestamp - expireTimer, - preview: [], - profileKey, - // quote - reaction: outgoingReaction, - recipients: [destination], - // sticker - timestamp, - }); - const result = await message.sendSyncMessageOnly(dataMessage); - Reactions.getSingleton().onReaction(reactionModel); - return result; - } - - const options = await getSendOptions(this.attributes); - const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - - const promise = (() => { - if (isDirectConversation(this.attributes)) { - return window.textsecure.messaging.sendMessageToIdentifier({ - identifier: destination, - messageText: undefined, - attachments: [], - quote: undefined, - preview: [], - sticker: undefined, - reaction: outgoingReaction, - deletedForEveryoneTimestamp: undefined, - timestamp, - expireTimer, - contentHint: ContentHint.RESENDABLE, - groupId: undefined, - profileKey, - options, - }); - } - - return window.Signal.Util.sendToGroup({ - groupSendOptions: { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - groupV1: this.getGroupV1Info()!, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - groupV2: this.getGroupV2Info()!, - reaction: outgoingReaction, - timestamp, - expireTimer, - profileKey, - }, - conversation: this, - contentHint: ContentHint.RESENDABLE, - messageId, - sendOptions: options, - sendType: 'reaction', - }); - })(); - - const result = await message.send( - handleMessageSend(promise, { - messageIds: [messageId], - sendType: 'reaction', - }) - ); - - if (!message.hasSuccessfulDelivery()) { - // This is handled by `conversation_view` which displays a toast on - // send error. - throw new Error('No successful delivery for reaction'); - } - - return result; - }).catch(() => { - let reverseReaction: ReactionModel; - if (oldReaction) { - // Either restore old reaction - reverseReaction = Reactions.getSingleton().add({ - ...oldReaction, - fromId: window.ConversationController.getOurConversationId(), - timestamp, - }); - } else { - // Or remove a new one on failure - reverseReaction = reactionModel.clone(); - reverseReaction.set('remove', !reverseReaction.get('remove')); - } - - Reactions.getSingleton().onReaction(reverseReaction); - }); - } - async sendProfileKeyUpdate(): Promise { const id = this.get('id'); const recipients = this.getRecipients(); diff --git a/ts/models/messages.ts b/ts/models/messages.ts index 4daa262c0d3e..1b73b905028f 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -1,17 +1,24 @@ // Copyright 2020-2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import { isEmpty, isEqual, mapValues, noop, omit, union } from 'lodash'; +import { isEmpty, isEqual, mapValues, maxBy, noop, omit, union } from 'lodash'; import type { CustomError, GroupV1Update, MessageAttributesType, - ReactionAttributesType, + MessageReactionType, ShallowChallengeError, QuotedMessageType, WhatIsThis, } from '../model-types.d'; -import { filter, find, map, reduce } from '../util/iterables'; +import { + filter, + find, + map, + reduce, + repeat, + zipObject, +} from '../util/iterables'; import { isNotNil } from '../util/isNotNil'; import { isNormalNumber } from '../util/isNormalNumber'; import { strictAssert } from '../util/assert'; @@ -35,6 +42,7 @@ import * as expirationTimer from '../util/expirationTimer'; import type { ReactionType } from '../types/Reactions'; import { UUID } from '../types/UUID'; import type { UUIDStringType } from '../types/UUID'; +import * as reactionUtil from '../reactions/util'; import { copyStickerToAttachments, deletePackReference, @@ -112,6 +120,7 @@ import { import { Deletes } from '../messageModifiers/Deletes'; import type { ReactionModel } from '../messageModifiers/Reactions'; import { Reactions } from '../messageModifiers/Reactions'; +import { ReactionSource } from '../reactions/ReactionSource'; import { ReadSyncs } from '../messageModifiers/ReadSyncs'; import { ViewSyncs } from '../messageModifiers/ViewSyncs'; import { ViewOnceOpenSyncs } from '../messageModifiers/ViewOnceOpenSyncs'; @@ -119,6 +128,7 @@ import * as AttachmentDownloads from '../messageModifiers/AttachmentDownloads'; import * as LinkPreview from '../types/LinkPreview'; import { SignalService as Proto } from '../protobuf'; import { normalMessageSendJobQueue } from '../jobs/normalMessageSendJobQueue'; +import { reactionJobQueue } from '../jobs/reactionJobQueue'; import { notificationService } from '../services/notifications'; import type { LinkPreviewType } from '../types/message/LinkPreviews'; import * as log from '../logging/log'; @@ -3121,11 +3131,11 @@ export class MessageModel extends window.Backbone.Model { async handleReaction( reaction: ReactionModel, shouldPersist = true - ): Promise { + ): Promise { const { attributes } = this; if (this.get('deletedForEveryone')) { - return undefined; + return; } // We allow you to react to messages with outgoing errors only if it has sent @@ -3138,75 +3148,138 @@ export class MessageModel extends window.Backbone.Model { window.ConversationController.getOurConversationIdOrThrow() ) !== 'partial-sent') ) { - return undefined; + return; } - const reactions = this.get('reactions') || []; - const messageId = this.idForLogging(); - const count = reactions.length; - - const conversation = window.ConversationController.get( - this.get('conversationId') - ); - - const oldReaction = reactions.find( - re => re.fromId === reaction.get('fromId') - ); - if (oldReaction) { - this.clearNotifications(oldReaction); + const conversation = this.getConversation(); + if (!conversation) { + return; } - if (reaction.get('remove')) { - log.info('Removing reaction for message', messageId); - const newReactions = reactions.filter( - re => re.fromId !== reaction.get('fromId') + const oldReactions = this.get('reactions') || []; + + let newReactions: typeof oldReactions; + if (reaction.get('source') === ReactionSource.FromThisDevice) { + log.info( + `handleReaction: sending reaction to ${this.idForLogging()} from this device` ); - this.set({ reactions: newReactions }); - await window.Signal.Data.removeReactionFromConversation({ - emoji: reaction.get('emoji'), + const newReaction = { + emoji: reaction.get('remove') ? undefined : reaction.get('emoji'), fromId: reaction.get('fromId'), targetAuthorUuid: reaction.get('targetAuthorUuid'), targetTimestamp: reaction.get('targetTimestamp'), - }); + timestamp: reaction.get('timestamp'), + isSentByConversationId: zipObject( + conversation.getRecipientConversationIds(), + repeat(false) + ), + }; + + newReactions = reactionUtil.addOutgoingReaction( + oldReactions, + newReaction + ); } else { - log.info('Adding reaction for message', messageId); - const newReactions = reactions.filter( - re => re.fromId !== reaction.get('fromId') + const oldReaction = oldReactions.find( + re => re.fromId === reaction.get('fromId') ); - newReactions.push(reaction.toJSON()); - this.set({ reactions: newReactions }); + if (oldReaction) { + this.clearNotifications(oldReaction); + } - await window.Signal.Data.addReaction({ - conversationId: this.get('conversationId'), - emoji: reaction.get('emoji'), - fromId: reaction.get('fromId'), - messageId: this.id, - messageReceivedAt: this.get('received_at'), - targetAuthorUuid: reaction.get('targetAuthorUuid'), - targetTimestamp: reaction.get('targetTimestamp'), - }); + if (reaction.get('remove')) { + log.info( + 'handleReaction: removing reaction for message', + this.idForLogging() + ); - // Only notify for reactions to our own messages - if ( - conversation && - isOutgoing(this.attributes) && - !reaction.get('fromSync') - ) { - conversation.notify(this, reaction); + if (reaction.get('source') === ReactionSource.FromSync) { + newReactions = oldReactions.filter( + re => + re.fromId !== reaction.get('fromId') || + re.timestamp > reaction.get('timestamp') + ); + } else { + newReactions = oldReactions.filter( + re => re.fromId !== reaction.get('fromId') + ); + } + + await window.Signal.Data.removeReactionFromConversation({ + emoji: reaction.get('emoji'), + fromId: reaction.get('fromId'), + targetAuthorUuid: reaction.get('targetAuthorUuid'), + targetTimestamp: reaction.get('targetTimestamp'), + }); + } else { + log.info( + 'handleReaction: adding reaction for message', + this.idForLogging() + ); + + let reactionToAdd: MessageReactionType; + if (reaction.get('source') === ReactionSource.FromSync) { + const ourReactions = [ + reaction.toJSON(), + ...oldReactions.filter(re => re.fromId === reaction.get('fromId')), + ]; + reactionToAdd = maxBy(ourReactions, 'timestamp'); + } else { + reactionToAdd = reaction.toJSON(); + } + + newReactions = oldReactions.filter( + re => re.fromId !== reaction.get('fromId') + ); + newReactions.push(reactionToAdd); + + if ( + isOutgoing(this.attributes) && + reaction.get('source') === ReactionSource.FromSomeoneElse + ) { + conversation.notify(this, reaction); + } + + await window.Signal.Data.addReaction({ + conversationId: this.get('conversationId'), + emoji: reaction.get('emoji'), + fromId: reaction.get('fromId'), + messageId: this.id, + messageReceivedAt: this.get('received_at'), + targetAuthorUuid: reaction.get('targetAuthorUuid'), + targetTimestamp: reaction.get('targetTimestamp'), + }); } } - const newCount = (this.get('reactions') || []).length; + this.set({ reactions: newReactions }); + log.info( - `Done processing reaction for message ${messageId}. Went from ${count} to ${newCount} reactions.` + 'handleReaction:', + `Done processing reaction for message ${this.idForLogging()}.`, + `Went from ${oldReactions.length} to ${newReactions.length} reactions.` ); - if (shouldPersist) { + if (reaction.get('source') === ReactionSource.FromThisDevice) { + const jobData = { messageId: this.id }; + if (shouldPersist) { + await reactionJobQueue.add(jobData, async jobToInsert => { + log.info( + `enqueueReactionForSend: saving message ${this.idForLogging()} and job ${ + jobToInsert.id + }` + ); + await window.Signal.Data.saveMessage(this.attributes, { + jobToInsert, + }); + }); + } else { + await reactionJobQueue.add(jobData); + } + } else if (shouldPersist) { await window.Signal.Data.saveMessage(this.attributes); } - - return oldReaction; } async handleDeleteForEveryone( diff --git a/ts/reactions/ReactionSource.ts b/ts/reactions/ReactionSource.ts new file mode 100644 index 000000000000..c92266b9af86 --- /dev/null +++ b/ts/reactions/ReactionSource.ts @@ -0,0 +1,8 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export enum ReactionSource { + FromSomeoneElse, + FromSync, + FromThisDevice, +} diff --git a/ts/reactions/enqueueReactionForSend.ts b/ts/reactions/enqueueReactionForSend.ts new file mode 100644 index 000000000000..ce993e938f38 --- /dev/null +++ b/ts/reactions/enqueueReactionForSend.ts @@ -0,0 +1,46 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { ReactionModel } from '../messageModifiers/Reactions'; +import { ReactionSource } from './ReactionSource'; +import { getMessageById } from '../messages/getMessageById'; +import { strictAssert } from '../util/assert'; + +export async function enqueueReactionForSend({ + emoji, + messageId, + remove, +}: Readonly<{ + emoji: string; + messageId: string; + remove: boolean; +}>): Promise { + const message = await getMessageById(messageId); + strictAssert(message, 'enqueueReactionForSend: no message found'); + + const targetAuthorUuid = message.getSourceUuid(); + strictAssert( + targetAuthorUuid, + `enqueueReactionForSend: message ${message.idForLogging()} had no source UUID` + ); + + const targetTimestamp = message.get('sent_at') || message.get('timestamp'); + strictAssert( + targetTimestamp, + `enqueueReactionForSend: message ${message.idForLogging()} had no timestamp` + ); + + const reaction = new ReactionModel({ + emoji, + remove, + targetAuthorUuid, + targetTimestamp, + fromId: window.ConversationController.getOurConversationIdOrThrow(), + timestamp: Date.now(), + source: ReactionSource.FromThisDevice, + }); + + await message.getConversation()?.maybeApplyUniversalTimer(false); + + await message.handleReaction(reaction); +} diff --git a/ts/reactions/util.ts b/ts/reactions/util.ts new file mode 100644 index 000000000000..d741238c2a4c --- /dev/null +++ b/ts/reactions/util.ts @@ -0,0 +1,158 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { findLastIndex, has, identity, omit, negate } from 'lodash'; +import type { MessageReactionType } from '../model-types.d'; +import { areObjectEntriesEqual } from '../util/areObjectEntriesEqual'; + +const isReactionEqual = ( + a: undefined | Readonly, + b: undefined | Readonly +): boolean => + a === b || + Boolean( + a && b && areObjectEntriesEqual(a, b, ['emoji', 'fromId', 'timestamp']) + ); + +const isOutgoingReactionFullySent = ({ + isSentByConversationId = {}, +}: Readonly>): boolean => + !isSentByConversationId || + Object.values(isSentByConversationId).every(identity); + +const isOutgoingReactionPending = negate(isOutgoingReactionFullySent); + +const isOutgoingReactionCompletelyUnsent = ({ + isSentByConversationId = {}, +}: Readonly>): boolean => { + const sendStates = Object.values(isSentByConversationId); + return sendStates.length > 0 && sendStates.every(state => state === false); +}; + +export function addOutgoingReaction( + oldReactions: ReadonlyArray, + newReaction: Readonly +): Array { + const pendingOutgoingReactions = new Set( + oldReactions.filter(isOutgoingReactionPending) + ); + return [ + ...oldReactions.filter(re => !pendingOutgoingReactions.has(re)), + newReaction, + ]; +} + +export function getNewestPendingOutgoingReaction( + reactions: ReadonlyArray, + ourConversationId: string +): + | { pendingReaction?: undefined; emojiToRemove?: undefined } + | { + pendingReaction: MessageReactionType; + emojiToRemove?: string; + } { + const ourReactions = reactions + .filter(({ fromId }) => fromId === ourConversationId) + .sort((a, b) => a.timestamp - b.timestamp); + + const newestFinishedReactionIndex = findLastIndex( + ourReactions, + re => re.emoji && isOutgoingReactionFullySent(re) + ); + const newestFinishedReaction = ourReactions[newestFinishedReactionIndex]; + + const newestPendingReactionIndex = findLastIndex( + ourReactions, + isOutgoingReactionPending + ); + const pendingReaction: undefined | MessageReactionType = + newestPendingReactionIndex > newestFinishedReactionIndex + ? ourReactions[newestPendingReactionIndex] + : undefined; + + return pendingReaction + ? { + pendingReaction, + // This might not be right in some cases. For example, imagine the following + // sequence: + // + // 1. I send reaction A to Alice and Bob, but it was only delivered to Alice. + // 2. I send reaction B to Alice and Bob, but it was only delivered to Bob. + // 3. I remove the reaction. + // + // Android and iOS don't care what your previous reaction is. Old Desktop versions + // *do* care, so we make our best guess. We should be able to remove this after + // Desktop has ignored this field for awhile. See commit + // `1dc353f08910389ad8cc5487949e6998e90038e2`. + emojiToRemove: newestFinishedReaction?.emoji, + } + : {}; +} + +export function* getUnsentConversationIds({ + isSentByConversationId = {}, +}: Readonly< + Pick +>): Iterable { + for (const [id, isSent] of Object.entries(isSentByConversationId)) { + if (!isSent) { + yield id; + } + } +} + +export const markOutgoingReactionFailed = ( + reactions: Array, + reaction: Readonly +): Array => + isOutgoingReactionCompletelyUnsent(reaction) || !reaction.emoji + ? reactions.filter(re => !isReactionEqual(re, reaction)) + : reactions.map(re => + isReactionEqual(re, reaction) + ? omit(re, ['isSentByConversationId']) + : re + ); + +export const markOutgoingReactionSent = ( + reactions: ReadonlyArray, + reaction: Readonly, + conversationIdsSentTo: Iterable +): Array => { + const result: Array = []; + + const newIsSentByConversationId = { + ...(reaction.isSentByConversationId || {}), + }; + for (const id of conversationIdsSentTo) { + if (has(newIsSentByConversationId, id)) { + newIsSentByConversationId[id] = true; + } + } + + const isFullySent = Object.values(newIsSentByConversationId).every(identity); + + for (const re of reactions) { + if (!isReactionEqual(re, reaction)) { + const shouldKeep = !isFullySent + ? true + : re.fromId !== reaction.fromId || re.timestamp > reaction.timestamp; + if (shouldKeep) { + result.push(re); + } + continue; + } + + if (isFullySent) { + if (re.emoji) { + result.push(omit(re, ['isSentByConversationId'])); + } + } else { + result.push({ + ...re, + isSentByConversationId: newIsSentByConversationId, + }); + } + } + + return result; +}; diff --git a/ts/state/selectors/message.ts b/ts/state/selectors/message.ts index 9ef5583b06d1..2ba79046a131 100644 --- a/ts/state/selectors/message.ts +++ b/ts/state/selectors/message.ts @@ -17,6 +17,7 @@ import filesize from 'filesize'; import type { LastMessageStatus, MessageAttributesType, + MessageReactionType, ShallowChallengeError, } from '../../model-types.d'; @@ -54,6 +55,8 @@ import { memoizeByRoot } from '../../util/memoizeByRoot'; import { missingCaseError } from '../../util/missingCaseError'; import { isNotNil } from '../../util/isNotNil'; import { isMoreRecentThan } from '../../util/timestamp'; +import * as iterables from '../../util/iterables'; +import { strictAssert } from '../../util/assert'; import type { ConversationType } from '../ducks/conversations'; @@ -379,7 +382,23 @@ export const getReactionsForMessage = createSelectorCreator( { reactions = [] }: MessageAttributesType, { conversationSelector }: { conversationSelector: GetConversationByIdType } ) => { - return reactions.map(re => { + const reactionBySender = new Map(); + for (const reaction of reactions) { + const existingReaction = reactionBySender.get(reaction.fromId); + if ( + !existingReaction || + reaction.timestamp > existingReaction.timestamp + ) { + reactionBySender.set(reaction.fromId, reaction); + } + } + + const reactionsWithEmpties = reactionBySender.values(); + const reactionsWithEmoji = iterables.filter( + reactionsWithEmpties, + re => re.emoji + ); + const formattedReactions = iterables.map(reactionsWithEmoji, re => { const c = conversationSelector(re.fromId); type From = NonNullable[0]['from']; @@ -399,12 +418,16 @@ export const getReactionsForMessage = createSelectorCreator( const from: AssertProps = unsafe; + strictAssert(re.emoji, 'Expected all reactions to have an emoji'); + return { emoji: re.emoji, timestamp: re.timestamp, from, }; }); + + return [...formattedReactions]; }, (_: MessageAttributesType, reactions: PropsData['reactions']) => reactions @@ -1373,6 +1396,51 @@ function processQuoteAttachment( }; } +function canReplyOrReact( + message: Pick< + MessageAttributesType, + 'deletedForEveryone' | 'sendStateByConversationId' | 'type' + >, + ourConversationId: string, + conversation: undefined | Readonly +): boolean { + const { deletedForEveryone, sendStateByConversationId } = message; + + if (!conversation) { + return false; + } + + if (conversation.isGroupV1AndDisabled) { + return false; + } + + if (isMissingRequiredProfileSharing(conversation)) { + return false; + } + + if (!conversation.acceptedMessageRequest) { + return false; + } + + if (deletedForEveryone) { + return false; + } + + if (isOutgoing(message)) { + return ( + isMessageJustForMe(sendStateByConversationId, ourConversationId) || + someSendStatus(omit(sendStateByConversationId, ourConversationId), isSent) + ); + } + + if (isIncoming(message)) { + return true; + } + + // Fail safe. + return false; +} + export function canReply( message: Pick< MessageAttributesType, @@ -1385,53 +1453,28 @@ export function canReply( conversationSelector: GetConversationByIdType ): boolean { const conversation = getConversation(message, conversationSelector); - const { deletedForEveryone, sendStateByConversationId } = message; - - if (!conversation) { + if ( + !conversation || + (conversation.announcementsOnly && !conversation.areWeAdmin) + ) { return false; } + return canReplyOrReact(message, ourConversationId, conversation); +} - // If GroupV1 groups have been disabled, we can't reply. - if (conversation.isGroupV1AndDisabled) { - return false; - } - - // If mandatory profile sharing is enabled, and we haven't shared yet, then - // we can't reply. - if (isMissingRequiredProfileSharing(conversation)) { - return false; - } - - // We cannot reply if we haven't accepted the message request - if (!conversation.acceptedMessageRequest) { - return false; - } - - // We cannot reply if this message is deleted for everyone - if (deletedForEveryone) { - return false; - } - - // Groups where only admins can send messages - if (conversation.announcementsOnly && !conversation.areWeAdmin) { - return false; - } - - // We can reply if this is outgoing and sent to at least one recipient - if (isOutgoing(message)) { - return ( - isMessageJustForMe(sendStateByConversationId, ourConversationId) || - someSendStatus(omit(sendStateByConversationId, ourConversationId), isSent) - ); - } - - // We can reply to incoming messages - if (isIncoming(message)) { - return true; - } - - // Fail safe. - return false; +export function canReact( + message: Pick< + MessageAttributesType, + | 'conversationId' + | 'deletedForEveryone' + | 'sendStateByConversationId' + | 'type' + >, + ourConversationId: string, + conversationSelector: GetConversationByIdType +): boolean { + const conversation = getConversation(message, conversationSelector); + return canReplyOrReact(message, ourConversationId, conversation); } export function canDeleteForEveryone( diff --git a/ts/test-both/reactions/util_test.ts b/ts/test-both/reactions/util_test.ts new file mode 100644 index 000000000000..093ce49993d9 --- /dev/null +++ b/ts/test-both/reactions/util_test.ts @@ -0,0 +1,270 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import { v4 as uuid } from 'uuid'; +import { omit } from 'lodash'; +import type { MessageReactionType } from '../../model-types.d'; +import { isEmpty } from '../../util/iterables'; + +import { + addOutgoingReaction, + getNewestPendingOutgoingReaction, + getUnsentConversationIds, + markOutgoingReactionFailed, + markOutgoingReactionSent, +} from '../../reactions/util'; + +describe('reaction utilities', () => { + const OUR_CONVO_ID = uuid(); + + const rxn = ( + emoji: undefined | string, + { isPending = false }: Readonly<{ isPending?: boolean }> = {} + ): MessageReactionType => ({ + emoji, + fromId: OUR_CONVO_ID, + targetAuthorUuid: uuid(), + targetTimestamp: Date.now(), + timestamp: Date.now(), + ...(isPending ? { isSentByConversationId: { [uuid()]: false } } : {}), + }); + + describe('addOutgoingReaction', () => { + it('adds the reaction to the end of an empty list', () => { + const reaction = rxn('💅'); + const result = addOutgoingReaction([], reaction); + assert.deepStrictEqual(result, [reaction]); + }); + + it('removes all pending reactions', () => { + const oldReactions = [ + { ...rxn('😭', { isPending: true }), timestamp: 3 }, + { ...rxn('💬'), fromId: uuid() }, + { ...rxn('🥀', { isPending: true }), timestamp: 1 }, + { ...rxn('🌹', { isPending: true }), timestamp: 2 }, + ]; + const reaction = rxn('😀'); + const newReactions = addOutgoingReaction(oldReactions, reaction); + assert.deepStrictEqual(newReactions, [oldReactions[1], reaction]); + }); + }); + + describe('getNewestPendingOutgoingReaction', () => { + it('returns undefined if there are no pending outgoing reactions', () => { + [[], [rxn('🔔')], [rxn('😭'), { ...rxn('💬'), fromId: uuid() }]].forEach( + oldReactions => { + assert.deepStrictEqual( + getNewestPendingOutgoingReaction(oldReactions, OUR_CONVO_ID), + {} + ); + } + ); + }); + + it("returns undefined if there's a pending reaction before a fully sent one", () => { + const oldReactions = [ + { ...rxn('⭐️'), timestamp: 2 }, + { ...rxn('🔥', { isPending: true }), timestamp: 1 }, + ]; + const { + pendingReaction, + emojiToRemove, + } = getNewestPendingOutgoingReaction(oldReactions, OUR_CONVO_ID); + + assert.isUndefined(pendingReaction); + assert.isUndefined(emojiToRemove); + }); + + it('returns the newest pending reaction', () => { + [ + [rxn('⭐️', { isPending: true })], + [ + { ...rxn('🥀', { isPending: true }), timestamp: 1 }, + { ...rxn('⭐️', { isPending: true }), timestamp: 2 }, + ], + ].forEach(oldReactions => { + const { + pendingReaction, + emojiToRemove, + } = getNewestPendingOutgoingReaction(oldReactions, OUR_CONVO_ID); + + assert.strictEqual(pendingReaction?.emoji, '⭐️'); + assert.isUndefined(emojiToRemove); + }); + }); + + it('makes its best guess of an emoji to remove, if applicable', () => { + const oldReactions = [ + { ...rxn('⭐️'), timestamp: 1 }, + { ...rxn(undefined, { isPending: true }), timestamp: 3 }, + { ...rxn('🔥', { isPending: true }), timestamp: 2 }, + ]; + const { + pendingReaction, + emojiToRemove, + } = getNewestPendingOutgoingReaction(oldReactions, OUR_CONVO_ID); + + assert.isDefined(pendingReaction); + assert.isUndefined(pendingReaction?.emoji); + assert.strictEqual(emojiToRemove, '⭐️'); + }); + }); + + describe('getUnsentConversationIds', () => { + it("returns an empty iterable if there's nothing to send", () => { + assert(isEmpty(getUnsentConversationIds({}))); + assert( + isEmpty( + getUnsentConversationIds({ + isSentByConversationId: { [uuid()]: true }, + }) + ) + ); + }); + + it('returns an iterable of all unsent conversation IDs', () => { + const unsent1 = uuid(); + const unsent2 = uuid(); + const fakeReaction = { + isSentByConversationId: { + [unsent1]: false, + [unsent2]: false, + [uuid()]: true, + [uuid()]: true, + }, + }; + + assert.sameMembers( + [...getUnsentConversationIds(fakeReaction)], + [unsent1, unsent2] + ); + }); + }); + + describe('markReactionFailed', () => { + const fullySent = rxn('⭐️'); + const partiallySent = { + ...rxn('🔥'), + isSentByConversationId: { [uuid()]: true, [uuid()]: false }, + }; + const unsent = rxn('🤫', { isPending: true }); + + const reactions = [fullySent, partiallySent, unsent]; + + it('removes the pending state if the reaction, with emoji, was partially sent', () => { + assert.deepStrictEqual( + markOutgoingReactionFailed(reactions, partiallySent), + [fullySent, omit(partiallySent, 'isSentByConversationId'), unsent] + ); + }); + + it('removes the removal reaction', () => { + const none = rxn(undefined, { isPending: true }); + assert.isEmpty(markOutgoingReactionFailed([none], none)); + }); + + it('does nothing if the reaction is not in the list', () => { + assert.deepStrictEqual( + markOutgoingReactionFailed(reactions, rxn('🥀', { isPending: true })), + reactions + ); + }); + + it('removes the completely-unsent emoji reaction', () => { + assert.deepStrictEqual(markOutgoingReactionFailed(reactions, unsent), [ + fullySent, + partiallySent, + ]); + }); + }); + + describe('markOutgoingReactionSent', () => { + const uuid1 = uuid(); + const uuid2 = uuid(); + const uuid3 = uuid(); + + const star = { + ...rxn('⭐️'), + timestamp: 2, + isSentByConversationId: { + [uuid1]: false, + [uuid2]: false, + [uuid3]: false, + }, + }; + const none = { + ...rxn(undefined), + timestamp: 3, + isSentByConversationId: { + [uuid1]: false, + [uuid2]: false, + [uuid3]: false, + }, + }; + + const reactions = [star, none, { ...rxn('🔕'), timestamp: 1 }]; + + it("does nothing if the reaction isn't in the list", () => { + const result = markOutgoingReactionSent( + reactions, + rxn('🥀', { isPending: true }), + [uuid()] + ); + assert.deepStrictEqual(result, reactions); + }); + + it('updates reactions to be partially sent', () => { + [star, none].forEach(reaction => { + const result = markOutgoingReactionSent(reactions, reaction, [ + uuid1, + uuid2, + ]); + assert.deepStrictEqual( + result.find(re => re.emoji === reaction.emoji) + ?.isSentByConversationId, + { + [uuid1]: true, + [uuid2]: true, + [uuid3]: false, + } + ); + }); + }); + + it('removes sent state if a reaction with emoji is fully sent', () => { + const result = markOutgoingReactionSent(reactions, star, [ + uuid1, + uuid2, + uuid3, + ]); + + const newReaction = result.find(re => re.emoji === '⭐️'); + assert.isDefined(newReaction); + assert.isUndefined(newReaction?.isSentByConversationId); + }); + + it('removes a fully-sent reaction removal', () => { + const result = markOutgoingReactionSent(reactions, none, [ + uuid1, + uuid2, + uuid3, + ]); + + assert( + result.every(({ emoji }) => typeof emoji === 'string'), + 'Expected the emoji removal to be gone' + ); + }); + + it('removes older reactions of mine', () => { + const result = markOutgoingReactionSent(reactions, star, [ + uuid1, + uuid2, + uuid3, + ]); + + assert.isUndefined(result.find(re => re.emoji === '🔕')); + }); + }); +}); diff --git a/ts/test-both/util/areObjectEntriesEqual_test.ts b/ts/test-both/util/areObjectEntriesEqual_test.ts new file mode 100644 index 000000000000..e6f6f760c42b --- /dev/null +++ b/ts/test-both/util/areObjectEntriesEqual_test.ts @@ -0,0 +1,46 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { areObjectEntriesEqual } from '../../util/areObjectEntriesEqual'; + +describe('areObjectEntriesEqual', () => { + type TestObject = { foo?: number; bar?: number }; + + const empty: TestObject = {}; + const foo: TestObject = { foo: 1 }; + const bar: TestObject = { bar: 2 }; + const undefinedEntries: TestObject = { foo: undefined, bar: undefined }; + + it('returns true for an empty list of keys', () => { + assert.isTrue(areObjectEntriesEqual({}, {}, [])); + assert.isTrue(areObjectEntriesEqual(foo, foo, [])); + assert.isTrue(areObjectEntriesEqual(foo, bar, [])); + }); + + it('returns true for empty objects', () => { + assert.isTrue(areObjectEntriesEqual(empty, empty, ['foo'])); + }); + + it('considers missing keys equal to undefined keys', () => { + assert.isTrue( + areObjectEntriesEqual(empty, undefinedEntries, ['foo', 'bar']) + ); + }); + + it('ignores unspecified properties', () => { + assert.isTrue(areObjectEntriesEqual({ x: 1, y: 2 }, { x: 1, y: 3 }, ['x'])); + }); + + it('returns false for different objects', () => { + assert.isFalse(areObjectEntriesEqual({ x: 1 }, { x: 2 }, ['x'])); + assert.isFalse( + areObjectEntriesEqual({ x: 1, y: 2 }, { x: 1, y: 3 }, ['x', 'y']) + ); + }); + + it('only performs a shallow check', () => { + assert.isFalse(areObjectEntriesEqual({ x: [1, 2] }, { x: [1, 2] }, ['x'])); + }); +}); diff --git a/ts/test-electron/state/selectors/messages_test.ts b/ts/test-electron/state/selectors/messages_test.ts index 4a4603bcd7c6..5c289762ac5e 100644 --- a/ts/test-electron/state/selectors/messages_test.ts +++ b/ts/test-electron/state/selectors/messages_test.ts @@ -13,6 +13,7 @@ import type { ConversationType } from '../../../state/ducks/conversations'; import { canDeleteForEveryone, + canReact, canReply, getMessagePropStatus, isEndSession, @@ -120,6 +121,117 @@ describe('state/selectors/messages', () => { }); }); + describe('canReact', () => { + const defaultConversation: ConversationType = { + id: uuid(), + type: 'direct', + title: 'Test conversation', + isMe: false, + sharedGroupNames: [], + acceptedMessageRequest: true, + }; + + it('returns false for disabled v1 groups', () => { + const message = { + conversationId: 'fake-conversation-id', + type: 'incoming' as const, + }; + const getConversationById = () => ({ + ...defaultConversation, + type: 'group' as const, + isGroupV1AndDisabled: true, + }); + + assert.isFalse(canReact(message, ourConversationId, getConversationById)); + }); + + // NOTE: This is missing a test for mandatory profile sharing. + + it('returns false if the message was deleted for everyone', () => { + const message = { + conversationId: 'fake-conversation-id', + type: 'incoming' as const, + deletedForEveryone: true, + }; + const getConversationById = () => defaultConversation; + + assert.isFalse(canReact(message, ourConversationId, getConversationById)); + }); + + it('returns false for outgoing messages that have not been sent', () => { + const message = { + conversationId: 'fake-conversation-id', + type: 'outgoing' as const, + sendStateByConversationId: { + [ourConversationId]: { + status: SendStatus.Sent, + updatedAt: Date.now(), + }, + [uuid()]: { + status: SendStatus.Pending, + updatedAt: Date.now(), + }, + }, + }; + const getConversationById = () => defaultConversation; + + assert.isFalse(canReact(message, ourConversationId, getConversationById)); + }); + + it('returns true for outgoing messages that are only sent to yourself', () => { + const message = { + conversationId: 'fake-conversation-id', + type: 'outgoing' as const, + sendStateByConversationId: { + [ourConversationId]: { + status: SendStatus.Pending, + updatedAt: Date.now(), + }, + }, + }; + const getConversationById = () => defaultConversation; + + assert.isTrue(canReact(message, ourConversationId, getConversationById)); + }); + + it('returns true for outgoing messages that have been sent to at least one person', () => { + const message = { + conversationId: 'fake-conversation-id', + type: 'outgoing' as const, + sendStateByConversationId: { + [ourConversationId]: { + status: SendStatus.Sent, + updatedAt: Date.now(), + }, + [uuid()]: { + status: SendStatus.Pending, + updatedAt: Date.now(), + }, + [uuid()]: { + status: SendStatus.Sent, + updatedAt: Date.now(), + }, + }, + }; + const getConversationById = () => ({ + ...defaultConversation, + type: 'group' as const, + }); + + assert.isTrue(canReact(message, ourConversationId, getConversationById)); + }); + + it('returns true for incoming messages', () => { + const message = { + conversationId: 'fake-conversation-id', + type: 'incoming' as const, + }; + const getConversationById = () => defaultConversation; + + assert.isTrue(canReact(message, ourConversationId, getConversationById)); + }); + }); + describe('canReply', () => { const defaultConversation: ConversationType = { id: uuid(), diff --git a/ts/test-node/jobs/JobQueueDatabaseStore_test.ts b/ts/test-node/jobs/JobQueueDatabaseStore_test.ts index 87c477f7a898..a8f98902d028 100644 --- a/ts/test-node/jobs/JobQueueDatabaseStore_test.ts +++ b/ts/test-node/jobs/JobQueueDatabaseStore_test.ts @@ -110,7 +110,7 @@ describe('JobQueueDatabaseStore', () => { queueType: 'test queue', data: { hi: 5 }, }, - { shouldInsertIntoDatabase: false } + { shouldPersist: false } ); await streamPromise; diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index 166737024a42..e0d5e39ac9a2 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -228,6 +228,45 @@ describe('JobQueue', () => { assert.lengthOf(queueTypes['test 2'], 2); }); + it('can override the insertion logic, skipping storage persistence', async () => { + const store = new TestJobQueueStore(); + + class TestQueue extends JobQueue { + parseData(data: unknown): string { + return z.string().parse(data); + } + + async run(): Promise { + return Promise.resolve(); + } + } + + const queue = new TestQueue({ + store, + queueType: 'test queue', + maxAttempts: 1, + }); + + queue.streamJobs(); + + const insert = sinon.stub().resolves(); + + await queue.add('foo bar', insert); + + assert.lengthOf(store.storedJobs, 0); + + sinon.assert.calledOnce(insert); + sinon.assert.calledWith( + insert, + sinon.match({ + id: sinon.match.string, + timestamp: sinon.match.number, + queueType: 'test queue', + data: 'foo bar', + }) + ); + }); + it('retries jobs, running them up to maxAttempts times', async () => { type TestJobData = 'foo' | 'bar'; diff --git a/ts/test-node/jobs/TestJobQueueStore.ts b/ts/test-node/jobs/TestJobQueueStore.ts index d52f3623e423..c680bdfec10e 100644 --- a/ts/test-node/jobs/TestJobQueueStore.ts +++ b/ts/test-node/jobs/TestJobQueueStore.ts @@ -24,7 +24,10 @@ export class TestJobQueueStore implements JobQueueStore { }); } - async insert(job: Readonly): Promise { + async insert( + job: Readonly, + { shouldPersist = true }: Readonly<{ shouldPersist?: boolean }> = {} + ): Promise { await fakeDelay(); this.storedJobs.forEach(storedJob => { @@ -33,7 +36,9 @@ export class TestJobQueueStore implements JobQueueStore { } }); - this.storedJobs.push(job); + if (shouldPersist) { + this.storedJobs.push(job); + } this.getPipe(job.queueType).add(job); diff --git a/ts/test-node/jobs/helpers/InMemoryQueues_test.ts b/ts/test-node/jobs/helpers/InMemoryQueues_test.ts new file mode 100644 index 000000000000..c29cd4a9ef1d --- /dev/null +++ b/ts/test-node/jobs/helpers/InMemoryQueues_test.ts @@ -0,0 +1,44 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { InMemoryQueues } from '../../../jobs/helpers/InMemoryQueues'; + +describe('InMemoryQueues', () => { + describe('get', () => { + it('returns a new PQueue for each key', () => { + const queues = new InMemoryQueues(); + + assert.strictEqual(queues.get('a'), queues.get('a')); + assert.notStrictEqual(queues.get('a'), queues.get('b')); + assert.notStrictEqual(queues.get('b'), queues.get('c')); + }); + + it('returns a queue that only executes one thing at a time', () => { + const queue = new InMemoryQueues().get('foo'); + + assert.strictEqual(queue.concurrency, 1); + }); + + it('cleans up the queues when all tasks have run', async () => { + const queues = new InMemoryQueues(); + + const originalQueue = queues.get('foo'); + + originalQueue.pause(); + const tasksPromise = originalQueue.addAll([ + async () => { + assert.strictEqual(queues.get('foo'), originalQueue); + }, + async () => { + assert.strictEqual(queues.get('foo'), originalQueue); + }, + ]); + originalQueue.start(); + await tasksPromise; + + assert.notStrictEqual(queues.get('foo'), originalQueue); + }); + }); +}); diff --git a/ts/util/areObjectEntriesEqual.ts b/ts/util/areObjectEntriesEqual.ts new file mode 100644 index 000000000000..c1838b857b97 --- /dev/null +++ b/ts/util/areObjectEntriesEqual.ts @@ -0,0 +1,8 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export const areObjectEntriesEqual = ( + a: Readonly, + b: Readonly, + keys: ReadonlyArray +): boolean => a === b || keys.every(key => a[key] === b[key]); diff --git a/ts/views/conversation_view.ts b/ts/views/conversation_view.ts index 178a2f823a2e..16421bc5fc94 100644 --- a/ts/views/conversation_view.ts +++ b/ts/views/conversation_view.ts @@ -32,6 +32,7 @@ import type { import type { MessageModel } from '../models/messages'; import { strictAssert } from '../util/assert'; import { maybeParseUrl } from '../util/url'; +import { enqueueReactionForSend } from '../reactions/enqueueReactionForSend'; import { addReportSpamJob } from '../jobs/helpers/addReportSpamJob'; import { reportSpamJobQueue } from '../jobs/reportSpamJobQueue'; import type { GroupNameCollisionsWithIdsByTitle } from '../util/groupMemberNameCollisions'; @@ -845,11 +846,21 @@ export class ConversationView extends window.Backbone.View { } getMessageActions(): MessageActionsType { - const reactToMessage = ( + const reactToMessage = async ( messageId: string, reaction: { emoji: string; remove: boolean } ) => { - this.sendReactionMessage(messageId, reaction); + const { emoji, remove } = reaction; + try { + await enqueueReactionForSend({ + messageId, + emoji, + remove, + }); + } catch (error) { + log.error('Error sending reaction', error, messageId, reaction); + showToast(ToastReactionFailed); + } }; const replyToMessage = (messageId: string) => { this.setQuoteMessage(messageId); @@ -2997,38 +3008,6 @@ export class ConversationView extends window.Backbone.View { }); } - async sendReactionMessage( - messageId: string, - reaction: { emoji: string; remove: boolean } - ): Promise { - const messageModel = messageId - ? await getMessageById(messageId, { - Message: Whisper.Message, - }) - : undefined; - - try { - if (!messageModel) { - throw new Error('sendReactionMessage: Message not found'); - } - const targetAuthorUuid = messageModel.getSourceUuid(); - if (!targetAuthorUuid) { - throw new Error( - `sendReactionMessage: Message ${messageModel.idForLogging()} had no source uuid! Cannot send reaction.` - ); - } - - await this.model.sendReactionMessage(reaction, { - messageId, - targetAuthorUuid, - targetTimestamp: messageModel.get('sent_at'), - }); - } catch (error) { - log.error('Error sending reaction', error, messageId, reaction); - showToast(ToastReactionFailed); - } - } - async sendStickerMessage(options: { packId: string; stickerId: number;