Pass abortSignal to sendToGroup
This commit is contained in:
parent
7afe3fcca2
commit
3be95e821e
6 changed files with 49 additions and 26 deletions
|
@ -92,7 +92,7 @@ export async function sendDeleteForEveryone(
|
||||||
|
|
||||||
await conversation.queueJob(
|
await conversation.queueJob(
|
||||||
'conversationQueue/sendDeleteForEveryone',
|
'conversationQueue/sendDeleteForEveryone',
|
||||||
async () => {
|
async abortSignal => {
|
||||||
log.info(
|
log.info(
|
||||||
`Sending deleteForEveryone to conversation ${logId}`,
|
`Sending deleteForEveryone to conversation ${logId}`,
|
||||||
`with timestamp ${timestamp}`,
|
`with timestamp ${timestamp}`,
|
||||||
|
@ -209,6 +209,7 @@ export async function sendDeleteForEveryone(
|
||||||
messageIds,
|
messageIds,
|
||||||
send: async () =>
|
send: async () =>
|
||||||
window.Signal.Util.sendToGroup({
|
window.Signal.Util.sendToGroup({
|
||||||
|
abortSignal,
|
||||||
contentHint,
|
contentHint,
|
||||||
groupSendOptions: {
|
groupSendOptions: {
|
||||||
groupV1: conversation.getGroupV1Info(recipients),
|
groupV1: conversation.getGroupV1Info(recipients),
|
||||||
|
|
|
@ -90,27 +90,30 @@ export async function sendGroupUpdate(
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await conversation.queueJob('conversationQueue/sendGroupUpdate', async () =>
|
await conversation.queueJob(
|
||||||
wrapWithSyncMessageSend({
|
'conversationQueue/sendGroupUpdate',
|
||||||
conversation,
|
async abortSignal =>
|
||||||
logId,
|
wrapWithSyncMessageSend({
|
||||||
messageIds: [],
|
conversation,
|
||||||
send: async () =>
|
logId,
|
||||||
window.Signal.Util.sendToGroup({
|
messageIds: [],
|
||||||
groupSendOptions: {
|
send: async () =>
|
||||||
groupV2,
|
window.Signal.Util.sendToGroup({
|
||||||
timestamp,
|
abortSignal,
|
||||||
profileKey,
|
groupSendOptions: {
|
||||||
},
|
groupV2,
|
||||||
contentHint,
|
timestamp,
|
||||||
messageId: undefined,
|
profileKey,
|
||||||
sendOptions,
|
},
|
||||||
sendTarget: conversation.toSenderKeyTarget(),
|
contentHint,
|
||||||
sendType,
|
messageId: undefined,
|
||||||
}),
|
sendOptions,
|
||||||
sendType,
|
sendTarget: conversation.toSenderKeyTarget(),
|
||||||
timestamp,
|
sendType,
|
||||||
})
|
}),
|
||||||
|
sendType,
|
||||||
|
timestamp,
|
||||||
|
})
|
||||||
);
|
);
|
||||||
} catch (error: unknown) {
|
} catch (error: unknown) {
|
||||||
await handleMultipleSendErrors({
|
await handleMultipleSendErrors({
|
||||||
|
|
|
@ -199,8 +199,9 @@ export async function sendNormalMessage(
|
||||||
log.info('sending group message');
|
log.info('sending group message');
|
||||||
innerPromise = conversation.queueJob(
|
innerPromise = conversation.queueJob(
|
||||||
'conversationQueue/sendNormalMessage',
|
'conversationQueue/sendNormalMessage',
|
||||||
() =>
|
abortSignal =>
|
||||||
window.Signal.Util.sendToGroup({
|
window.Signal.Util.sendToGroup({
|
||||||
|
abortSignal,
|
||||||
contentHint: ContentHint.RESENDABLE,
|
contentHint: ContentHint.RESENDABLE,
|
||||||
groupSendOptions: {
|
groupSendOptions: {
|
||||||
attachments,
|
attachments,
|
||||||
|
|
|
@ -242,7 +242,7 @@ export async function sendReaction(
|
||||||
log.info('sending group reaction message');
|
log.info('sending group reaction message');
|
||||||
promise = conversation.queueJob(
|
promise = conversation.queueJob(
|
||||||
'conversationQueue/sendReaction',
|
'conversationQueue/sendReaction',
|
||||||
() => {
|
abortSignal => {
|
||||||
// Note: this will happen for all old jobs queued before 5.32.x
|
// Note: this will happen for all old jobs queued before 5.32.x
|
||||||
if (isGroupV2(conversation.attributes) && !isNumber(revision)) {
|
if (isGroupV2(conversation.attributes) && !isNumber(revision)) {
|
||||||
log.error('No revision provided, but conversation is GroupV2');
|
log.error('No revision provided, but conversation is GroupV2');
|
||||||
|
@ -256,6 +256,7 @@ export async function sendReaction(
|
||||||
}
|
}
|
||||||
|
|
||||||
return window.Signal.Util.sendToGroup({
|
return window.Signal.Util.sendToGroup({
|
||||||
|
abortSignal,
|
||||||
contentHint: ContentHint.RESENDABLE,
|
contentHint: ContentHint.RESENDABLE,
|
||||||
groupSendOptions: {
|
groupSendOptions: {
|
||||||
groupV1: conversation.getGroupV1Info(
|
groupV1: conversation.getGroupV1Info(
|
||||||
|
|
|
@ -3440,7 +3440,10 @@ export class ConversationModel extends window.Backbone
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
queueJob<T>(name: string, callback: () => Promise<T>): Promise<T> {
|
queueJob<T>(
|
||||||
|
name: string,
|
||||||
|
callback: (abortSignal: AbortSignal) => Promise<T>
|
||||||
|
): Promise<T> {
|
||||||
this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 });
|
this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 });
|
||||||
|
|
||||||
const taskWithTimeout = createTaskWithTimeout(
|
const taskWithTimeout = createTaskWithTimeout(
|
||||||
|
@ -3448,6 +3451,9 @@ export class ConversationModel extends window.Backbone
|
||||||
`conversation ${this.idForLogging()}`
|
`conversation ${this.idForLogging()}`
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const abortController = new AbortController();
|
||||||
|
const { signal: abortSignal } = abortController;
|
||||||
|
|
||||||
const queuedAt = Date.now();
|
const queuedAt = Date.now();
|
||||||
return this.jobQueue.add(async () => {
|
return this.jobQueue.add(async () => {
|
||||||
const startedAt = Date.now();
|
const startedAt = Date.now();
|
||||||
|
@ -3458,7 +3464,10 @@ export class ConversationModel extends window.Backbone
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await taskWithTimeout();
|
return await taskWithTimeout(abortSignal);
|
||||||
|
} catch (error) {
|
||||||
|
abortController.abort();
|
||||||
|
throw error;
|
||||||
} finally {
|
} finally {
|
||||||
const duration = Date.now() - startedAt;
|
const duration = Date.now() - startedAt;
|
||||||
|
|
||||||
|
|
|
@ -89,6 +89,7 @@ export type SenderKeyTargetType = {
|
||||||
};
|
};
|
||||||
|
|
||||||
export async function sendToGroup({
|
export async function sendToGroup({
|
||||||
|
abortSignal,
|
||||||
contentHint,
|
contentHint,
|
||||||
groupSendOptions,
|
groupSendOptions,
|
||||||
isPartialSend,
|
isPartialSend,
|
||||||
|
@ -97,6 +98,7 @@ export async function sendToGroup({
|
||||||
sendTarget,
|
sendTarget,
|
||||||
sendType,
|
sendType,
|
||||||
}: {
|
}: {
|
||||||
|
abortSignal?: AbortSignal;
|
||||||
contentHint: number;
|
contentHint: number;
|
||||||
groupSendOptions: GroupSendOptionsType;
|
groupSendOptions: GroupSendOptionsType;
|
||||||
isPartialSend?: boolean;
|
isPartialSend?: boolean;
|
||||||
|
@ -120,6 +122,12 @@ export async function sendToGroup({
|
||||||
protoAttributes
|
protoAttributes
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Attachment upload might take too long to succeed - we don't want to proceed
|
||||||
|
// with the send if the caller aborted this call.
|
||||||
|
if (abortSignal?.aborted) {
|
||||||
|
throw new Error('sendToGroup was aborted');
|
||||||
|
}
|
||||||
|
|
||||||
return sendContentMessageToGroup({
|
return sendContentMessageToGroup({
|
||||||
contentHint,
|
contentHint,
|
||||||
contentMessage,
|
contentMessage,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue