// Copyright 2025 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only import type { AciString } from '../types/ServiceId.js'; import type { MessageAttributesType, ReadonlyMessageAttributesType, } from '../model-types.d.ts'; import type { MessagePollVoteType } from '../types/Polls.js'; import { MessageModel } from '../models/messages.js'; import { DataReader } from '../sql/Client.js'; import * as Errors from '../types/errors.js'; import { createLogger } from '../logging/log.js'; import { getAuthor, isIncoming, isOutgoing } from '../messages/helpers.js'; import { isSent } from '../messages/MessageSendState.js'; import { getPropForTimestamp } from '../util/editHelpers.js'; import { isMe } from '../util/whatTypeOfConversation.js'; import { strictAssert } from '../util/assert.js'; import { getMessageIdForLogging } from '../util/idForLogging.js'; const log = createLogger('Polls'); export enum PollSource { FromThisDevice = 'FromThisDevice', FromSync = 'FromSync', FromSomeoneElse = 'FromSomeoneElse', } export type PollVoteAttributesType = { envelopeId: string; fromConversationId: string; removeFromMessageReceiverCache: () => unknown; source: PollSource; targetAuthorAci: AciString; targetTimestamp: number; optionIndexes: ReadonlyArray; voteCount: number; timestamp: number; receivedAtDate: number; }; export type PollTerminateAttributesType = { envelopeId: string; fromConversationId: string; removeFromMessageReceiverCache: () => unknown; source: PollSource; targetTimestamp: number; timestamp: number; receivedAtDate: number; }; const pollVoteCache = new Map(); const pollTerminateCache = new Map(); function removeVote(vote: PollVoteAttributesType): void { pollVoteCache.delete(vote.envelopeId); vote.removeFromMessageReceiverCache(); } function removeTerminate(terminate: PollTerminateAttributesType): void { pollTerminateCache.delete(terminate.envelopeId); terminate.removeFromMessageReceiverCache(); } function doesVoteModifierMatchMessage({ message, targetTimestamp, targetAuthorAci, targetAuthorId, voteSenderConversationId, }: { message: ReadonlyMessageAttributesType; targetTimestamp: number; targetAuthorAci?: string; targetAuthorId?: string; voteSenderConversationId: string; }): boolean { if (message.sent_at !== targetTimestamp) { return false; } const author = getAuthor(message); if (!author) { return false; } const targetAuthorConversation = window.ConversationController.get( targetAuthorAci ?? targetAuthorId ); if (!targetAuthorConversation) { return false; } if (author.id !== targetAuthorConversation.id) { return false; } const voteSenderConversation = window.ConversationController.get( voteSenderConversationId ); if (!voteSenderConversation) { return false; } if (isMe(voteSenderConversation.attributes)) { return true; } if (isOutgoing(message)) { const sendStateByConversationId = getPropForTimestamp({ log, message, prop: 'sendStateByConversationId', targetTimestamp, }); const sendState = sendStateByConversationId?.[voteSenderConversationId]; return !!sendState && isSent(sendState.status); } if (isIncoming(message)) { const messageConversation = window.ConversationController.get( message.conversationId ); if (!messageConversation) { return false; } const voteSenderServiceId = voteSenderConversation.getServiceId(); return ( voteSenderServiceId != null && messageConversation.hasMember(voteSenderServiceId) ); } return false; } async function findPollMessage({ targetTimestamp, targetAuthorAci, targetAuthorId, voteSenderConversationId, logId, }: { targetTimestamp: number; targetAuthorAci?: string; targetAuthorId?: string; voteSenderConversationId: string; logId: string; }): Promise { const messages = await DataReader.getMessagesBySentAt(targetTimestamp); const matchingMessages = messages.filter(message => { if (!message.poll) { return false; } return doesVoteModifierMatchMessage({ message, targetTimestamp, targetAuthorAci, targetAuthorId, voteSenderConversationId, }); }); if (!matchingMessages.length) { return undefined; } if (matchingMessages.length > 1) { log.warn( `${logId}/findPollMessage: found ${matchingMessages.length} matching messages for the poll!` ); } return matchingMessages[0]; } export async function onPollVote(vote: PollVoteAttributesType): Promise { pollVoteCache.set(vote.envelopeId, vote); const logId = `Polls.onPollVote(timestamp=${vote.timestamp};target=${vote.targetTimestamp})`; try { const matchingMessage = await findPollMessage({ targetTimestamp: vote.targetTimestamp, targetAuthorAci: vote.targetAuthorAci, voteSenderConversationId: vote.fromConversationId, logId, }); if (!matchingMessage) { log.info( `${logId}: No poll message for vote`, 'targeting', vote.targetAuthorAci ); return; } const matchingMessageConversation = window.ConversationController.get( matchingMessage.conversationId ); if (!matchingMessageConversation) { log.info( `${logId}: No target conversation for poll vote`, vote.targetAuthorAci, vote.targetTimestamp ); removeVote(vote); return undefined; } // awaiting is safe since `onPollVote` is never called from inside the queue await matchingMessageConversation.queueJob('Polls.onPollVote', async () => { log.info(`${logId}: handling`); // Message is fetched inside the conversation queue so we have the // most recent data const targetMessage = await findPollMessage({ targetTimestamp: vote.targetTimestamp, targetAuthorAci: vote.targetAuthorAci, voteSenderConversationId: vote.fromConversationId, logId: `${logId}/conversationQueue`, }); if (!targetMessage || targetMessage.id !== matchingMessage.id) { log.warn( `${logId}: message no longer a match for vote! Maybe it's been deleted?` ); removeVote(vote); return; } const targetMessageModel = window.MessageCache.register( new MessageModel(targetMessage) ); await handlePollVote(targetMessageModel, vote); removeVote(vote); }); } catch (error) { removeVote(vote); log.error(`${logId} error:`, Errors.toLogFormat(error)); } } export async function onPollTerminate( terminate: PollTerminateAttributesType ): Promise { pollTerminateCache.set(terminate.envelopeId, terminate); const logId = `Polls.onPollTerminate(timestamp=${terminate.timestamp};target=${terminate.targetTimestamp})`; try { // For termination, we need to find the poll by timestamp only // The fromConversationId must be the poll creator const matchingMessage = await findPollMessage({ targetTimestamp: terminate.targetTimestamp, targetAuthorId: terminate.fromConversationId, voteSenderConversationId: terminate.fromConversationId, logId, }); if (!matchingMessage) { log.info( `${logId}: No poll message for termination`, 'targeting timestamp', terminate.targetTimestamp ); return; } const matchingMessageConversation = window.ConversationController.get( matchingMessage.conversationId ); if (!matchingMessageConversation) { log.info( `${logId}: No target conversation for poll termination`, terminate.targetTimestamp ); removeTerminate(terminate); return undefined; } // awaiting is safe since `onPollTerminate` is never called from inside the queue await matchingMessageConversation.queueJob( 'Polls.onPollTerminate', async () => { log.info(`${logId}: handling`); // Re-fetch to ensure we have the most recent data const targetMessages = await DataReader.getMessagesBySentAt( terminate.targetTimestamp ); const targetMessage = targetMessages.find( msg => msg.id === matchingMessage.id ); if (!targetMessage) { log.warn( `${logId}: message no longer exists! Maybe it's been deleted?` ); removeTerminate(terminate); return; } const targetMessageModel = window.MessageCache.register( new MessageModel(targetMessage) ); await handlePollTerminate(targetMessageModel, terminate); removeTerminate(terminate); } ); } catch (error) { removeTerminate(terminate); log.error(`${logId} error:`, Errors.toLogFormat(error)); } } export async function handlePollVote( message: MessageModel, vote: PollVoteAttributesType, { shouldPersist = true, }: { shouldPersist?: boolean; } = {} ): Promise { if (message.get('deletedForEveryone')) { return; } const poll = message.get('poll'); if (!poll) { log.warn('handlePollVote: Message is not a poll'); return; } if (poll.terminatedAt) { log.info('handlePollVote: Poll is already terminated, ignoring vote'); return; } // Validate option indexes const maxOptionIndex = poll.options.length - 1; const invalidIndexes = vote.optionIndexes.filter( index => index < 0 || index > maxOptionIndex ); if (invalidIndexes.length > 0) { log.warn('handlePollVote: Invalid option indexes found, dropping'); return; } // Check multiple choice constraint if (!poll.allowMultiple && vote.optionIndexes.length > 1) { log.warn( 'handlePollVote: Multiple votes not allowed for this poll, dropping' ); return; } const conversation = window.ConversationController.get( message.attributes.conversationId ); if (!conversation) { return; } const isFromThisDevice = vote.source === PollSource.FromThisDevice; const isFromSync = vote.source === PollSource.FromSync; const isFromSomeoneElse = vote.source === PollSource.FromSomeoneElse; strictAssert( isFromThisDevice || isFromSync || isFromSomeoneElse, 'Vote can only be from this device, from sync, or from someone else' ); const newVote: MessagePollVoteType = { fromConversationId: vote.fromConversationId, optionIndexes: vote.optionIndexes, voteCount: vote.voteCount, timestamp: vote.timestamp, }; // Update or add vote with conflict resolution const currentVotes: Array = poll.votes ? [...poll.votes] : []; let updatedVotes: Array; const existingVoteIndex = currentVotes.findIndex( v => v.fromConversationId === vote.fromConversationId ); if (existingVoteIndex !== -1) { const existingVote = currentVotes[existingVoteIndex]; if (newVote.voteCount > existingVote.voteCount) { updatedVotes = [...currentVotes]; updatedVotes[existingVoteIndex] = newVote; } else { log.info( 'handlePollVote: Keeping existing vote with higher or same voteCount' ); updatedVotes = currentVotes; } } else { updatedVotes = [...currentVotes, newVote]; } message.set({ poll: { ...poll, votes: updatedVotes, }, }); log.info( 'handlePollVote:', `Done processing vote for poll ${getMessageIdForLogging(message.attributes)}.` ); if (shouldPersist) { await window.MessageCache.saveMessage(message.attributes); window.reduxActions.conversations.markOpenConversationRead(conversation.id); } } export async function handlePollTerminate( message: MessageModel, terminate: PollTerminateAttributesType, { shouldPersist = true, }: { shouldPersist?: boolean; } = {} ): Promise { const { attributes } = message; if (message.get('deletedForEveryone')) { return; } const poll = message.get('poll'); if (!poll) { log.warn('handlePollTerminate: Message is not a poll'); return; } if (poll.terminatedAt) { log.info('handlePollTerminate: Poll is already terminated'); return; } const conversation = window.ConversationController.get( message.attributes.conversationId ); if (!conversation) { return; } // Verify the terminator is the poll creator const author = getAuthor(attributes); const terminatorConversation = window.ConversationController.get( terminate.fromConversationId ); if ( !author || !terminatorConversation || author.id !== terminatorConversation.id ) { log.warn( 'handlePollTerminate: Termination rejected - not from poll creator' ); return; } message.set({ poll: { ...poll, terminatedAt: terminate.timestamp, }, }); log.info( 'handlePollTerminate:', `Poll ${getMessageIdForLogging(message.attributes)} terminated at ${terminate.timestamp}` ); if (shouldPersist) { await window.MessageCache.saveMessage(message.attributes); window.reduxActions.conversations.markOpenConversationRead(conversation.id); } } export function drainCachedVotesForMessage( message: ReadonlyMessageAttributesType ): Array { const matching = Array.from(pollVoteCache.values()).filter(vote => { if (!message.poll) { return false; } return doesVoteModifierMatchMessage({ message, targetTimestamp: vote.targetTimestamp, targetAuthorAci: vote.targetAuthorAci, voteSenderConversationId: vote.fromConversationId, }); }); matching.forEach(vote => removeVote(vote)); return matching; } export function drainCachedTerminatesForMessage( message: ReadonlyMessageAttributesType ): Array { const matching = Array.from(pollTerminateCache.values()).filter(term => { return message.poll && message.sent_at === term.targetTimestamp; }); matching.forEach(term => removeTerminate(term)); return matching; }