From 75f0cd50beff73885ebae92e4ac977de9f56d6c9 Mon Sep 17 00:00:00 2001 From: Evan Hahn <69474926+EvanHahn-Signal@users.noreply.github.com> Date: Tue, 17 Aug 2021 10:43:26 -0500 Subject: [PATCH] Send viewed receipt when you start listening to an audio message --- ts/background.ts | 10 +-- ts/jobs/JobLogger.ts | 49 ++++++++++ ts/jobs/JobQueue.ts | 20 ++++- ts/jobs/helpers/commonShouldJobContinue.ts | 50 +++++++++++ .../helpers/handleCommonJobRequestError.ts | 22 +++++ ts/jobs/helpers/runReadOrViewSyncJob.ts | 61 +++---------- ts/jobs/initializeAllJobQueues.ts | 2 + ts/jobs/readSyncJobQueue.ts | 6 +- ts/jobs/removeStorageKeyJobQueue.ts | 2 - ts/jobs/reportSpamJobQueue.ts | 11 ++- ts/jobs/viewSyncJobQueue.ts | 6 +- ts/jobs/viewedReceiptsJobQueue.ts | 64 +++++++++++++ ts/logging/log.ts | 10 +++ ts/test-node/jobs/JobLogger_test.ts | 54 +++++++++++ ts/test-node/jobs/JobQueue_test.ts | 57 ++++++++++++ ts/textsecure/SendMessage.ts | 89 +++++++++++-------- ts/util/handleMessageSend.ts | 3 +- ts/util/sendViewedReceipt.ts | 55 ++++++++++++ ts/views/conversation_view.ts | 21 ++++- 19 files changed, 483 insertions(+), 109 deletions(-) create mode 100644 ts/jobs/JobLogger.ts create mode 100644 ts/jobs/helpers/commonShouldJobContinue.ts create mode 100644 ts/jobs/helpers/handleCommonJobRequestError.ts create mode 100644 ts/jobs/viewedReceiptsJobQueue.ts create mode 100644 ts/test-node/jobs/JobLogger_test.ts create mode 100644 ts/util/sendViewedReceipt.ts diff --git a/ts/background.ts b/ts/background.ts index ec34971e4b60..bfa63d2ab42e 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -350,8 +350,8 @@ export async function startApp(): Promise { continue; } - const uuid = c.get('uuid'); - const e164 = c.get('e164'); + const senderUuid = c.get('uuid'); + const senderE164 = c.get('e164'); c.queueJob('sendDeliveryReceipt', async () => { try { @@ -360,8 +360,8 @@ export async function startApp(): Promise { // eslint-disable-next-line no-await-in-loop await handleMessageSend( window.textsecure.messaging.sendDeliveryReceipt({ - e164, - uuid, + senderE164, + senderUuid, timestamps, options: sendOptions, }), @@ -369,7 +369,7 @@ export async function startApp(): Promise { ); } catch (error) { window.log.error( - `Failed to send delivery receipt to ${e164}/${uuid} for timestamps ${timestamps}:`, + `Failed to send delivery receipt to ${senderE164}/${senderUuid} for timestamps ${timestamps}:`, error && error.stack ? error.stack : error ); } diff --git a/ts/jobs/JobLogger.ts b/ts/jobs/JobLogger.ts new file mode 100644 index 000000000000..b8ba586e1498 --- /dev/null +++ b/ts/jobs/JobLogger.ts @@ -0,0 +1,49 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { LoggerType } from '../logging/log'; +import type { ParsedJob } from './types'; + +export class JobLogger implements LoggerType { + private id: string; + + private queueType: string; + + public attempt = -1; + + constructor( + job: Readonly, 'id' | 'queueType'>>, + private logger: LoggerType + ) { + this.id = job.id; + this.queueType = job.queueType; + } + + fatal(...args: ReadonlyArray): void { + this.logger.fatal(this.prefix(), ...args); + } + + error(...args: ReadonlyArray): void { + this.logger.error(this.prefix(), ...args); + } + + warn(...args: ReadonlyArray): void { + this.logger.warn(this.prefix(), ...args); + } + + info(...args: ReadonlyArray): void { + this.logger.info(this.prefix(), ...args); + } + + debug(...args: ReadonlyArray): void { + this.logger.debug(this.prefix(), ...args); + } + + trace(...args: ReadonlyArray): void { + this.logger.trace(this.prefix(), ...args); + } + + private prefix(): string { + return `${this.queueType} job queue, job ID ${this.id}, attempt ${this.attempt}:`; + } +} diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts index 2facb669f0e1..4b1e160fbb29 100644 --- a/ts/jobs/JobQueue.ts +++ b/ts/jobs/JobQueue.ts @@ -9,6 +9,7 @@ import { JobError } from './JobError'; import { ParsedJob, StoredJob, JobQueueStore } from './types'; import { assert } from '../util/assert'; import * as log from '../logging/log'; +import { JobLogger } from './JobLogger'; const noopOnCompleteCallbacks = { resolve: noop, @@ -32,6 +33,11 @@ type JobQueueOptions = { * the job to fail; a value of 2 will allow the job to fail once; etc. */ maxAttempts: number; + + /** + * A custom logger. Might be overwritten in test. + */ + logger?: log.LoggerType; }; export abstract class JobQueue { @@ -41,6 +47,8 @@ export abstract class JobQueue { private readonly store: JobQueueStore; + private readonly logger: log.LoggerType; + private readonly logPrefix: string; private readonly onCompleteCallbacks = new Map< @@ -70,6 +78,7 @@ export abstract class JobQueue { this.maxAttempts = options.maxAttempts; this.queueType = options.queueType; this.store = options.store; + this.logger = options.logger ?? log; this.logPrefix = `${this.queueType} job queue:`; } @@ -92,10 +101,13 @@ export abstract class JobQueue { * * If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it * will be deleted from the store. + * + * If your job logs things, you're encouraged to use the logger provided, as it + * automatically includes debugging information. */ protected abstract run( job: Readonly>, - extra?: Readonly<{ attempt: number }> + extra?: Readonly<{ attempt?: number; log?: log.LoggerType }> ): Promise; /** @@ -188,12 +200,16 @@ export abstract class JobQueue { data: parsedData, }; + const logger = new JobLogger(parsedJob, this.logger); + let result: | undefined | { success: true } | { success: false; err: unknown }; for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) { + logger.attempt = attempt; + log.info( `${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}` ); @@ -201,7 +217,7 @@ export abstract class JobQueue { // We want an `await` in the loop, as we don't want a single job running more // than once at a time. Ideally, the job will succeed on the first attempt. // eslint-disable-next-line no-await-in-loop - await this.run(parsedJob, { attempt }); + await this.run(parsedJob, { attempt, log: logger }); result = { success: true }; log.info( `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` diff --git a/ts/jobs/helpers/commonShouldJobContinue.ts b/ts/jobs/helpers/commonShouldJobContinue.ts new file mode 100644 index 000000000000..3dd5dd9a0492 --- /dev/null +++ b/ts/jobs/helpers/commonShouldJobContinue.ts @@ -0,0 +1,50 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { LoggerType } from '../../logging/log'; +import { waitForOnline } from '../../util/waitForOnline'; +import { sleep } from '../../util/sleep'; +import { exponentialBackoffSleepTime } from '../../util/exponentialBackoff'; +import { isDone as isDeviceLinked } from '../../util/registration'; + +export async function commonShouldJobContinue({ + attempt, + log, + maxRetryTime, + timestamp, +}: Readonly<{ + attempt: number; + log: LoggerType; + maxRetryTime: number; + timestamp: number; +}>): Promise { + const maxJobAge = timestamp + maxRetryTime; + const timeRemaining = maxJobAge - Date.now(); + + if (timeRemaining <= 0) { + log.info("giving up because it's been too long"); + return false; + } + + try { + await waitForOnline(window.navigator, window, { timeout: timeRemaining }); + } catch (err: unknown) { + log.info("didn't come online in time, giving up"); + return false; + } + + await new Promise(resolve => { + window.storage.onready(resolve); + }); + + if (!isDeviceLinked()) { + log.info("skipping this job because we're unlinked"); + return false; + } + + const sleepTime = exponentialBackoffSleepTime(attempt); + log.info(`sleeping for ${sleepTime}`); + await sleep(sleepTime); + + return true; +} diff --git a/ts/jobs/helpers/handleCommonJobRequestError.ts b/ts/jobs/helpers/handleCommonJobRequestError.ts new file mode 100644 index 000000000000..ba805c03a771 --- /dev/null +++ b/ts/jobs/helpers/handleCommonJobRequestError.ts @@ -0,0 +1,22 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { LoggerType } from '../../logging/log'; +import { parseIntWithFallback } from '../../util/parseIntWithFallback'; + +export function handleCommonJobRequestError( + err: unknown, + log: LoggerType +): void { + if (!(err instanceof Error)) { + throw err; + } + + const code = parseIntWithFallback(err.code, -1); + if (code === 508) { + log.info('server responded with 508. Giving up on this job'); + return; + } + + throw err; +} diff --git a/ts/jobs/helpers/runReadOrViewSyncJob.ts b/ts/jobs/helpers/runReadOrViewSyncJob.ts index 26a3899950e7..7336893d943a 100644 --- a/ts/jobs/helpers/runReadOrViewSyncJob.ts +++ b/ts/jobs/helpers/runReadOrViewSyncJob.ts @@ -2,27 +2,27 @@ // SPDX-License-Identifier: AGPL-3.0-only import { chunk } from 'lodash'; -import * as log from '../../logging/log'; -import { waitForOnline } from '../../util/waitForOnline'; +import type { LoggerType } from '../../logging/log'; import { getSendOptions } from '../../util/getSendOptions'; import { handleMessageSend, SendTypesType } from '../../util/handleMessageSend'; import { isNotNil } from '../../util/isNotNil'; -import { sleep } from '../../util/sleep'; -import { exponentialBackoffSleepTime } from '../../util/exponentialBackoff'; -import { isDone as isDeviceLinked } from '../../util/registration'; -import { parseIntWithFallback } from '../../util/parseIntWithFallback'; + +import { commonShouldJobContinue } from './commonShouldJobContinue'; +import { handleCommonJobRequestError } from './handleCommonJobRequestError'; const CHUNK_SIZE = 100; export async function runReadOrViewSyncJob({ attempt, isView, + log, maxRetryTime, syncs, timestamp, }: Readonly<{ attempt: number; isView: boolean; + log: LoggerType; maxRetryTime: number; syncs: ReadonlyArray<{ messageId?: string; @@ -33,59 +33,36 @@ export async function runReadOrViewSyncJob({ timestamp: number; }>): Promise { let sendType: SendTypesType; - let nameForLogging: string; let doSync: | typeof window.textsecure.messaging.syncReadMessages | typeof window.textsecure.messaging.syncView; if (isView) { sendType = 'viewSync'; - nameForLogging = 'viewSyncJobQueue'; doSync = window.textsecure.messaging.syncView.bind( window.textsecure.messaging ); } else { sendType = 'readSync'; - nameForLogging = 'readSyncJobQueue'; doSync = window.textsecure.messaging.syncReadMessages.bind( window.textsecure.messaging ); } - const logInfo = (message: string): void => { - log.info(`${nameForLogging}: ${message}`); - }; - if (!syncs.length) { - logInfo("skipping this job because there's nothing to sync"); + log.info("skipping this job because there's nothing to sync"); return; } - const maxJobAge = timestamp + maxRetryTime; - const timeRemaining = maxJobAge - Date.now(); - - if (timeRemaining <= 0) { - logInfo("giving up because it's been too long"); - return; - } - - try { - await waitForOnline(window.navigator, window, { timeout: timeRemaining }); - } catch (err) { - logInfo("didn't come online in time, giving up"); - return; - } - - await new Promise(resolve => { - window.storage.onready(resolve); + const shouldContinue = await commonShouldJobContinue({ + attempt, + log, + maxRetryTime, + timestamp, }); - - if (!isDeviceLinked()) { - logInfo("skipping this job because we're unlinked"); + if (!shouldContinue) { return; } - await sleep(exponentialBackoffSleepTime(attempt)); - const ourConversation = window.ConversationController.getOurConversationOrThrow(); const sendOptions = await getSendOptions(ourConversation.attributes, { syncMessage: true, @@ -103,16 +80,6 @@ export async function runReadOrViewSyncJob({ }) ); } catch (err: unknown) { - if (!(err instanceof Error)) { - throw err; - } - - const code = parseIntWithFallback(err.code, -1); - if (code === 508) { - logInfo('server responded with 508. Giving up on this job'); - return; - } - - throw err; + handleCommonJobRequestError(err, log); } } diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts index 532e8c4bbffb..b7a503a9faab 100644 --- a/ts/jobs/initializeAllJobQueues.ts +++ b/ts/jobs/initializeAllJobQueues.ts @@ -7,6 +7,7 @@ import { readSyncJobQueue } from './readSyncJobQueue'; import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue'; import { reportSpamJobQueue } from './reportSpamJobQueue'; import { viewSyncJobQueue } from './viewSyncJobQueue'; +import { viewedReceiptsJobQueue } from './viewedReceiptsJobQueue'; /** * Start all of the job queues. Should be called when the database is ready. @@ -22,4 +23,5 @@ export function initializeAllJobQueues({ removeStorageKeyJobQueue.streamJobs(); reportSpamJobQueue.streamJobs(); viewSyncJobQueue.streamJobs(); + viewedReceiptsJobQueue.streamJobs(); } diff --git a/ts/jobs/readSyncJobQueue.ts b/ts/jobs/readSyncJobQueue.ts index 95d09b0d3072..1a685275c446 100644 --- a/ts/jobs/readSyncJobQueue.ts +++ b/ts/jobs/readSyncJobQueue.ts @@ -5,6 +5,7 @@ import * as z from 'zod'; import * as moment from 'moment'; +import type { LoggerType } from '../logging/log'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; import { runReadOrViewSyncJob } from './helpers/runReadOrViewSyncJob'; @@ -33,11 +34,12 @@ export class ReadSyncJobQueue extends JobQueue { protected async run( { data, timestamp }: Readonly<{ data: ReadSyncJobData; timestamp: number }>, - { attempt }: Readonly<{ attempt: number }> + { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> ): Promise { await runReadOrViewSyncJob({ attempt, isView: false, + log, maxRetryTime: MAX_RETRY_TIME, syncs: data.readSyncs, timestamp, @@ -47,8 +49,6 @@ export class ReadSyncJobQueue extends JobQueue { export const readSyncJobQueue = new ReadSyncJobQueue({ store: jobQueueDatabaseStore, - queueType: 'read sync', - maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME), }); diff --git a/ts/jobs/removeStorageKeyJobQueue.ts b/ts/jobs/removeStorageKeyJobQueue.ts index 71d88d92bbc4..72889a7ac7a8 100644 --- a/ts/jobs/removeStorageKeyJobQueue.ts +++ b/ts/jobs/removeStorageKeyJobQueue.ts @@ -31,8 +31,6 @@ export class RemoveStorageKeyJobQueue extends JobQueue export const removeStorageKeyJobQueue = new RemoveStorageKeyJobQueue({ store: jobQueueDatabaseStore, - queueType: 'remove storage key', - maxAttempts: 100, }); diff --git a/ts/jobs/reportSpamJobQueue.ts b/ts/jobs/reportSpamJobQueue.ts index 9972b909f31a..31387c0bc798 100644 --- a/ts/jobs/reportSpamJobQueue.ts +++ b/ts/jobs/reportSpamJobQueue.ts @@ -7,7 +7,7 @@ import * as moment from 'moment'; import { strictAssert } from '../util/assert'; import { waitForOnline } from '../util/waitForOnline'; import { isDone as isDeviceLinked } from '../util/registration'; -import * as log from '../logging/log'; +import type { LoggerType } from '../logging/log'; import { map } from '../util/iterables'; import { sleep } from '../util/sleep'; @@ -58,9 +58,10 @@ export class ReportSpamJobQueue extends JobQueue { return reportSpamJobDataSchema.parse(data); } - protected async run({ - data, - }: Readonly<{ data: ReportSpamJobData }>): Promise { + protected async run( + { data }: Readonly<{ data: ReportSpamJobData }>, + { log }: Readonly<{ log: LoggerType }> + ): Promise { const { e164, serverGuids } = data; await new Promise(resolve => { @@ -122,8 +123,6 @@ export class ReportSpamJobQueue extends JobQueue { export const reportSpamJobQueue = new ReportSpamJobQueue({ store: jobQueueDatabaseStore, - queueType: 'report spam', - maxAttempts: 25, }); diff --git a/ts/jobs/viewSyncJobQueue.ts b/ts/jobs/viewSyncJobQueue.ts index b2985bd55159..773210d41081 100644 --- a/ts/jobs/viewSyncJobQueue.ts +++ b/ts/jobs/viewSyncJobQueue.ts @@ -5,6 +5,7 @@ import * as z from 'zod'; import * as moment from 'moment'; +import type { LoggerType } from '../logging/log'; import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; import { runReadOrViewSyncJob } from './helpers/runReadOrViewSyncJob'; @@ -33,11 +34,12 @@ export class ViewSyncJobQueue extends JobQueue { protected async run( { data, timestamp }: Readonly<{ data: ViewSyncJobData; timestamp: number }>, - { attempt }: Readonly<{ attempt: number }> + { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> ): Promise { await runReadOrViewSyncJob({ attempt, isView: true, + log, maxRetryTime: MAX_RETRY_TIME, syncs: data.viewSyncs, timestamp, @@ -47,8 +49,6 @@ export class ViewSyncJobQueue extends JobQueue { export const viewSyncJobQueue = new ViewSyncJobQueue({ store: jobQueueDatabaseStore, - queueType: 'view sync', - maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME), }); diff --git a/ts/jobs/viewedReceiptsJobQueue.ts b/ts/jobs/viewedReceiptsJobQueue.ts new file mode 100644 index 000000000000..297d285aabe6 --- /dev/null +++ b/ts/jobs/viewedReceiptsJobQueue.ts @@ -0,0 +1,64 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable class-methods-use-this */ + +import { z } from 'zod'; +import * as moment from 'moment'; +import type { LoggerType } from '../logging/log'; +import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff'; +import { commonShouldJobContinue } from './helpers/commonShouldJobContinue'; +import { sendViewedReceipt } from '../util/sendViewedReceipt'; + +import { JobQueue } from './JobQueue'; +import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; +import { handleCommonJobRequestError } from './helpers/handleCommonJobRequestError'; + +const MAX_RETRY_TIME = moment.duration(1, 'day').asMilliseconds(); + +const viewedReceiptsJobDataSchema = z.object({ + viewedReceipt: z.object({ + messageId: z.string(), + senderE164: z.string().optional(), + senderUuid: z.string().optional(), + timestamp: z.number(), + }), +}); + +type ViewedReceiptsJobData = z.infer; + +export class ViewedReceiptsJobQueue extends JobQueue { + protected parseData(data: unknown): ViewedReceiptsJobData { + return viewedReceiptsJobDataSchema.parse(data); + } + + protected async run( + { + data, + timestamp, + }: Readonly<{ data: ViewedReceiptsJobData; timestamp: number }>, + { attempt, log }: Readonly<{ attempt: number; log: LoggerType }> + ): Promise { + const shouldContinue = await commonShouldJobContinue({ + attempt, + log, + maxRetryTime: MAX_RETRY_TIME, + timestamp, + }); + if (!shouldContinue) { + return; + } + + try { + await sendViewedReceipt(data.viewedReceipt); + } catch (err: unknown) { + handleCommonJobRequestError(err, log); + } + } +} + +export const viewedReceiptsJobQueue = new ViewedReceiptsJobQueue({ + store: jobQueueDatabaseStore, + queueType: 'viewed receipts', + maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME), +}); diff --git a/ts/logging/log.ts b/ts/logging/log.ts index 6e5e7f0bcdd7..50ae59ea67f0 100644 --- a/ts/logging/log.ts +++ b/ts/logging/log.ts @@ -13,6 +13,16 @@ let logAtLevel: LogAtLevelFnType = noop; let hasInitialized = false; type LogFn = (...args: ReadonlyArray) => void; + +export type LoggerType = { + fatal: LogFn; + error: LogFn; + warn: LogFn; + info: LogFn; + debug: LogFn; + trace: LogFn; +}; + export const fatal: LogFn = (...args) => logAtLevel(LogLevel.Fatal, ...args); export const error: LogFn = (...args) => logAtLevel(LogLevel.Error, ...args); export const warn: LogFn = (...args) => logAtLevel(LogLevel.Warn, ...args); diff --git a/ts/test-node/jobs/JobLogger_test.ts b/ts/test-node/jobs/JobLogger_test.ts new file mode 100644 index 000000000000..c723e8d608b0 --- /dev/null +++ b/ts/test-node/jobs/JobLogger_test.ts @@ -0,0 +1,54 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import * as sinon from 'sinon'; + +import { JobLogger } from '../../jobs/JobLogger'; + +describe('JobLogger', () => { + const LEVELS = ['fatal', 'error', 'warn', 'info', 'debug', 'trace'] as const; + + const createFakeLogger = () => ({ + fatal: sinon.fake(), + error: sinon.fake(), + warn: sinon.fake(), + info: sinon.fake(), + debug: sinon.fake(), + trace: sinon.fake(), + }); + + LEVELS.forEach(level => { + describe(level, () => { + it('logs its arguments with a prefix', () => { + const fakeLogger = createFakeLogger(); + + const logger = new JobLogger( + { id: 'abc', queueType: 'test queue' }, + fakeLogger + ); + + logger.attempt = 123; + logger[level]('foo', 456); + + sinon.assert.calledOnce(fakeLogger[level]); + + sinon.assert.calledWith( + fakeLogger[level], + sinon.match( + (arg: unknown) => + typeof arg === 'string' && + arg.includes('test queue') && + arg.includes('abc') && + arg.includes('123') + ), + 'foo', + 456 + ); + + LEVELS.filter(l => l !== level).forEach(otherLevel => { + sinon.assert.notCalled(fakeLogger[otherLevel]); + }); + }); + }); + }); +}); diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts index a7532f6437b8..8b6569fed17a 100644 --- a/ts/test-node/jobs/JobQueue_test.ts +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -12,6 +12,7 @@ import { JobError } from '../../jobs/JobError'; import { TestJobQueueStore } from './TestJobQueueStore'; import { missingCaseError } from '../../util/missingCaseError'; import { assertRejects } from '../helpers'; +import type { LoggerType } from '../../logging/log'; import { JobQueue } from '../../jobs/JobQueue'; import { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types'; @@ -240,6 +241,62 @@ describe('JobQueue', () => { assert.deepStrictEqual(attempts, [1, 2, 3, 4, 5, 6]); }); + it('passes a logger to the run function', async () => { + const uniqueString = uuid(); + + const fakeLogger = { + fatal: sinon.fake(), + error: sinon.fake(), + warn: sinon.fake(), + info: sinon.fake(), + debug: sinon.fake(), + trace: sinon.fake(), + }; + + class TestQueue extends JobQueue { + parseData(data: unknown): number { + return z.number().parse(data); + } + + async run( + _: unknown, + { log }: Readonly<{ log: LoggerType }> + ): Promise { + log.info(uniqueString); + log.warn(uniqueString); + log.error(uniqueString); + } + } + + const queue = new TestQueue({ + store: new TestJobQueueStore(), + queueType: 'test queue 123', + maxAttempts: 6, + logger: fakeLogger, + }); + + queue.streamJobs(); + + const job = await queue.add(1); + await job.completion; + + [fakeLogger.info, fakeLogger.warn, fakeLogger.error].forEach(logFn => { + sinon.assert.calledWith( + logFn, + sinon.match( + (arg: unknown) => + typeof arg === 'string' && + arg.includes(job.id) && + arg.includes('test queue 123') + ), + sinon.match( + (arg: unknown) => + typeof arg === 'string' && arg.includes(uniqueString) + ) + ); + }); + }); + it('makes job.completion reject if parseData throws', async () => { class TestQueue extends JobQueue { parseData(data: unknown): string { diff --git a/ts/textsecure/SendMessage.ts b/ts/textsecure/SendMessage.ts index 57f67a0d525e..69c6b4279fd4 100644 --- a/ts/textsecure/SendMessage.ts +++ b/ts/textsecure/SendMessage.ts @@ -1550,54 +1550,69 @@ export default class MessageSender { }); } - async sendDeliveryReceipt({ - e164, - uuid, + async sendDeliveryReceipt( + options: Readonly<{ + senderE164?: string; + senderUuid?: string; + timestamps: Array; + options?: Readonly; + }> + ): Promise { + return this.sendReceiptMessage({ + ...options, + type: Proto.ReceiptMessage.Type.DELIVERY, + }); + } + + async sendReadReceipts( + options: Readonly<{ + senderE164?: string; + senderUuid?: string; + timestamps: Array; + options?: Readonly; + }> + ): Promise { + return this.sendReceiptMessage({ + ...options, + type: Proto.ReceiptMessage.Type.READ, + }); + } + + async sendViewedReceipts( + options: Readonly<{ + senderE164?: string; + senderUuid?: string; + timestamps: Array; + options?: Readonly; + }> + ): Promise { + return this.sendReceiptMessage({ + ...options, + type: Proto.ReceiptMessage.Type.VIEWED, + }); + } + + private async sendReceiptMessage({ + senderE164, + senderUuid, timestamps, + type, options, }: Readonly<{ - e164?: string; - uuid?: string; + senderE164?: string; + senderUuid?: string; timestamps: Array; + type: Proto.ReceiptMessage.Type; options?: Readonly; }>): Promise { - if (!uuid && !e164) { + if (!senderUuid && !senderE164) { throw new Error( - 'sendDeliveryReceipt: Neither uuid nor e164 was provided!' + 'sendReceiptMessage: Neither uuid nor e164 was provided!' ); } const receiptMessage = new Proto.ReceiptMessage(); - receiptMessage.type = Proto.ReceiptMessage.Type.DELIVERY; - receiptMessage.timestamp = timestamps; - - const contentMessage = new Proto.Content(); - contentMessage.receiptMessage = receiptMessage; - - const { ContentHint } = Proto.UnidentifiedSenderMessage.Message; - - return this.sendIndividualProto({ - identifier: uuid || e164, - proto: contentMessage, - timestamp: Date.now(), - contentHint: ContentHint.RESENDABLE, - options, - }); - } - - async sendReadReceipts({ - senderE164, - senderUuid, - timestamps, - options, - }: Readonly<{ - senderE164: string; - senderUuid: string; - timestamps: Array; - options?: Readonly; - }>): Promise { - const receiptMessage = new Proto.ReceiptMessage(); - receiptMessage.type = Proto.ReceiptMessage.Type.READ; + receiptMessage.type = type; receiptMessage.timestamp = timestamps; const contentMessage = new Proto.Content(); diff --git a/ts/util/handleMessageSend.ts b/ts/util/handleMessageSend.ts index 3bd8a2a199a6..6f8531808fe6 100644 --- a/ts/util/handleMessageSend.ts +++ b/ts/util/handleMessageSend.ts @@ -37,7 +37,8 @@ export type SendTypesType = | 'typing' // excluded from send log | 'verificationSync' | 'viewOnceSync' - | 'viewSync'; + | 'viewSync' + | 'viewedReceipt'; export function shouldSaveProto(sendType: SendTypesType): boolean { if (sendType === 'callingMessage') { diff --git a/ts/util/sendViewedReceipt.ts b/ts/util/sendViewedReceipt.ts new file mode 100644 index 000000000000..9c749b300b1c --- /dev/null +++ b/ts/util/sendViewedReceipt.ts @@ -0,0 +1,55 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { ConversationAttributesType } from '../model-types.d'; +import { getSendOptions } from './getSendOptions'; +import { handleMessageSend } from './handleMessageSend'; +import { isConversationAccepted } from './isConversationAccepted'; + +export async function sendViewedReceipt({ + messageId, + senderE164, + senderUuid, + timestamp, +}: Readonly<{ + messageId: string; + senderE164?: string; + senderUuid?: string; + timestamp: number; +}>): Promise { + if (!window.storage.get('read-receipt-setting')) { + return; + } + + const conversationId = window.ConversationController.ensureContactIds({ + e164: senderE164, + uuid: senderUuid, + }); + if (!conversationId) { + throw new Error( + 'sendViewedReceipt: no conversation found with that E164/UUID' + ); + } + + const conversation = window.ConversationController.get(conversationId); + if (!conversation) { + throw new Error( + 'sendViewedReceipt: no conversation found with that conversation ID, even though we found the ID with E164/UUID?' + ); + } + + const conversationAttrs: ConversationAttributesType = conversation.attributes; + if (!isConversationAccepted(conversationAttrs)) { + return; + } + + await handleMessageSend( + window.textsecure.messaging.sendViewedReceipts({ + senderE164, + senderUuid, + timestamps: [timestamp], + options: await getSendOptions(conversationAttrs), + }), + { messageIds: [messageId], sendType: 'viewedReceipt' } + ); +} diff --git a/ts/views/conversation_view.ts b/ts/views/conversation_view.ts index 625e9d18bea0..bf556d51486c 100644 --- a/ts/views/conversation_view.ts +++ b/ts/views/conversation_view.ts @@ -63,6 +63,7 @@ import { } from '../util/handleImageAttachment'; import { ReadStatus } from '../messages/MessageReadStatus'; import { markViewed } from '../services/MessageUpdater'; +import { viewedReceiptsJobQueue } from '../jobs/viewedReceiptsJobQueue'; import { viewSyncJobQueue } from '../jobs/viewSyncJobQueue'; import type { ContactType } from '../types/Contact'; import type { WhatIsThis } from '../window.d'; @@ -879,14 +880,28 @@ Whisper.ConversationView = Whisper.View.extend({ return; } + const senderE164 = message.get('source'); + const senderUuid = message.get('sourceUuid'); + const timestamp = message.get('sent_at'); + message.set(markViewed(message.attributes, Date.now())); + + viewedReceiptsJobQueue.add({ + viewedReceipt: { + messageId, + senderE164, + senderUuid, + timestamp, + }, + }); + viewSyncJobQueue.add({ viewSyncs: [ { messageId, - senderE164: message.get('source'), - senderUuid: message.get('sourceUuid'), - timestamp: message.get('sent_at'), + senderE164, + senderUuid, + timestamp, }, ], });