Refactor send story job

This commit is contained in:
Fedor Indutny 2022-11-28 17:02:01 -08:00 committed by GitHub
parent 5b6c624803
commit 11deabe959
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -27,7 +27,7 @@ import type { UUIDStringType } from '../../types/UUID';
import * as Errors from '../../types/errors';
import dataInterface from '../../sql/Client';
import { SignalService as Proto } from '../../protobuf';
import { getMessageById } from '../../messages/getMessageById';
import { getMessagesById } from '../../messages/getMessagesById';
import {
getSendOptions,
getSendOptionsForRecipients,
@ -70,36 +70,68 @@ export async function sendStory(
return;
}
// We want to generate the StoryMessage proto once at the top level so we
// can reuse it but first we'll need textAttachment | fileAttachment.
// This function pulls off the attachment and generates the proto from the
// first message on the list prior to continuing.
const originalStoryMessage = await (async (): Promise<
Proto.StoryMessage | undefined
> => {
const [messageId] = messageIds;
const message = await getMessageById(messageId);
if (!message) {
log.info(
`stories.sendStory(${messageId}): message was not found, maybe because it was deleted. Giving up on sending it`
);
return;
}
const notFound = new Set(messageIds);
const messages = (await getMessagesById(messageIds)).filter(message => {
notFound.delete(message.id);
const distributionId = message.get('storyDistributionListId');
const logId = `stories.sendStory(${timestamp}/${distributionId})`;
const messageConversation = message.getConversation();
if (messageConversation !== conversation) {
log.error(
`stories.sendStory(${timestamp}): Message conversation '${messageConversation?.idForLogging()}' does not match job conversation ${conversation.idForLogging()}`
`${logId}: Message conversation ` +
`'${messageConversation?.idForLogging()}' does not match job ` +
`conversation ${conversation.idForLogging()}`
);
return false;
}
if (message.get('timestamp') !== timestamp) {
log.error(
`${logId}: Message timestamp ${message.get(
'timestamp'
)} does not match job timestamp`
);
return false;
}
if (message.isErased() || message.get('deletedForEveryone')) {
log.info(`${logId}: message was erased. Giving up on sending it`);
return false;
}
return true;
});
for (const messageId of notFound) {
log.info(
`stories.sendStory(${messageId}): message was not found, ` +
'maybe because it was deleted. Giving up on sending it'
);
}
// We want to generate the StoryMessage proto once at the top level so we
// can reuse it but first we'll need textAttachment | fileAttachment.
// This function pulls off the attachment and generates the proto from the
// first message on the list prior to continuing.
let originalStoryMessage: Proto.StoryMessage;
{
const [originalMessageId] = messageIds;
const originalMessage = messages.find(
message => message.id === originalMessageId
);
if (!originalMessage) {
return;
}
const attachments = message.get('attachments') || [];
const attachments = originalMessage.get('attachments') || [];
const [attachment] = attachments;
if (!attachment) {
log.info(
`stories.sendStory(${timestamp}): message does not have any attachments to send. Giving up on sending it`
`stories.sendStory(${timestamp}): original story message does not ` +
'have any attachments to send. Giving up on sending it'
);
return;
}
@ -122,17 +154,13 @@ export async function sendStory(
// Some distribution lists need allowsReplies false, some need it set to true
// we create this proto (for the sync message) and also to re-use some of the
// attributes inside it.
return messaging.getStoryMessage({
originalStoryMessage = await messaging.getStoryMessage({
allowsReplies: true,
fileAttachment,
groupV2,
textAttachment,
profileKey,
});
})();
if (!originalStoryMessage) {
return;
}
const canReplyUuids = new Set<string>();
@ -171,43 +199,13 @@ export async function sendStory(
// complete, and so we can send a sync message afterwards if we sent the story
// successfully to at least one recipient.
const sendResults = await Promise.allSettled(
messageIds.map(async (messageId: string): Promise<void> => {
const message = await getMessageById(messageId);
if (!message) {
log.info(
`stories.sendStory(${messageId}): message was not found, maybe because it was deleted. Giving up on sending it`
);
return;
}
messages.map(async (message: MessageModel): Promise<void> => {
const distributionId = message.get('storyDistributionListId');
const logId = `stories.sendStory(${timestamp}/${distributionId})`;
if (message.get('timestamp') !== timestamp) {
log.error(
`${logId}: Message timestamp ${message.get(
'timestamp'
)} does not match job timestamp`
);
return;
}
const messageConversation = message.getConversation();
if (messageConversation !== conversation) {
log.error(
`${logId}: Message conversation '${messageConversation?.idForLogging()}' does not match job conversation ${conversation.idForLogging()}`
);
return;
}
if (message.isErased() || message.get('deletedForEveryone')) {
log.info(`${logId}: message was erased. Giving up on sending it`);
return;
}
const listId = message.get('storyDistributionListId');
const receiverId = isGroupV2(messageConversation.attributes)
? messageConversation.id
const receiverId = isGroupV2(conversation.attributes)
? conversation.id
: listId;
if (!receiverId) {
@ -217,7 +215,7 @@ export async function sendStory(
return;
}
const distributionList = isGroupV2(messageConversation.attributes)
const distributionList = isGroupV2(conversation.attributes)
? undefined
: await dataInterface.getStoryDistributionWithMembers(receiverId);
@ -302,7 +300,7 @@ export async function sendStory(
storyMessage.textAttachment = originalStoryMessage.textAttachment;
storyMessage.group = originalStoryMessage.group;
storyMessage.allowsReplies =
isGroupV2(messageConversation.attributes) ||
isGroupV2(conversation.attributes) ||
Boolean(distributionList?.allowsReplies);
let inMemorySenderKeyInfo = distributionList?.senderKeyInfo;
@ -347,11 +345,12 @@ export async function sendStory(
});
// Don't send normal sync messages; a story sync is sent at the end of the process
// eslint-disable-next-line no-param-reassign
message.doNotSendSyncMessage = true;
const messageSendPromise = message.send(
handleMessageSend(innerPromise, {
messageIds: [messageId],
messageIds: [message.id],
sendType: 'story',
}),
saveErrors
@ -456,12 +455,7 @@ export async function sendStory(
// messages but we still want to make sure that the sendStateByConversationId
// is kept in sync across all messages.
await Promise.all(
messageIds.map(async messageId => {
const message = await getMessageById(messageId);
if (!message) {
return;
}
messages.map(async message => {
const oldSendStateByConversationId =
message.get('sendStateByConversationId') || {};