Send viewed receipt when you start listening to an audio message
This commit is contained in:
parent
caf544b3a1
commit
75f0cd50be
19 changed files with 483 additions and 109 deletions
|
@ -350,8 +350,8 @@ export async function startApp(): Promise<void> {
|
|||
continue;
|
||||
}
|
||||
|
||||
const uuid = c.get('uuid');
|
||||
const e164 = c.get('e164');
|
||||
const senderUuid = c.get('uuid');
|
||||
const senderE164 = c.get('e164');
|
||||
|
||||
c.queueJob('sendDeliveryReceipt', async () => {
|
||||
try {
|
||||
|
@ -360,8 +360,8 @@ export async function startApp(): Promise<void> {
|
|||
// eslint-disable-next-line no-await-in-loop
|
||||
await handleMessageSend(
|
||||
window.textsecure.messaging.sendDeliveryReceipt({
|
||||
e164,
|
||||
uuid,
|
||||
senderE164,
|
||||
senderUuid,
|
||||
timestamps,
|
||||
options: sendOptions,
|
||||
}),
|
||||
|
@ -369,7 +369,7 @@ export async function startApp(): Promise<void> {
|
|||
);
|
||||
} catch (error) {
|
||||
window.log.error(
|
||||
`Failed to send delivery receipt to ${e164}/${uuid} for timestamps ${timestamps}:`,
|
||||
`Failed to send delivery receipt to ${senderE164}/${senderUuid} for timestamps ${timestamps}:`,
|
||||
error && error.stack ? error.stack : error
|
||||
);
|
||||
}
|
||||
|
|
49
ts/jobs/JobLogger.ts
Normal file
49
ts/jobs/JobLogger.ts
Normal file
|
@ -0,0 +1,49 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { LoggerType } from '../logging/log';
|
||||
import type { ParsedJob } from './types';
|
||||
|
||||
export class JobLogger implements LoggerType {
|
||||
private id: string;
|
||||
|
||||
private queueType: string;
|
||||
|
||||
public attempt = -1;
|
||||
|
||||
constructor(
|
||||
job: Readonly<Pick<ParsedJob<unknown>, 'id' | 'queueType'>>,
|
||||
private logger: LoggerType
|
||||
) {
|
||||
this.id = job.id;
|
||||
this.queueType = job.queueType;
|
||||
}
|
||||
|
||||
fatal(...args: ReadonlyArray<unknown>): void {
|
||||
this.logger.fatal(this.prefix(), ...args);
|
||||
}
|
||||
|
||||
error(...args: ReadonlyArray<unknown>): void {
|
||||
this.logger.error(this.prefix(), ...args);
|
||||
}
|
||||
|
||||
warn(...args: ReadonlyArray<unknown>): void {
|
||||
this.logger.warn(this.prefix(), ...args);
|
||||
}
|
||||
|
||||
info(...args: ReadonlyArray<unknown>): void {
|
||||
this.logger.info(this.prefix(), ...args);
|
||||
}
|
||||
|
||||
debug(...args: ReadonlyArray<unknown>): void {
|
||||
this.logger.debug(this.prefix(), ...args);
|
||||
}
|
||||
|
||||
trace(...args: ReadonlyArray<unknown>): void {
|
||||
this.logger.trace(this.prefix(), ...args);
|
||||
}
|
||||
|
||||
private prefix(): string {
|
||||
return `${this.queueType} job queue, job ID ${this.id}, attempt ${this.attempt}:`;
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ import { JobError } from './JobError';
|
|||
import { ParsedJob, StoredJob, JobQueueStore } from './types';
|
||||
import { assert } from '../util/assert';
|
||||
import * as log from '../logging/log';
|
||||
import { JobLogger } from './JobLogger';
|
||||
|
||||
const noopOnCompleteCallbacks = {
|
||||
resolve: noop,
|
||||
|
@ -32,6 +33,11 @@ type JobQueueOptions = {
|
|||
* the job to fail; a value of 2 will allow the job to fail once; etc.
|
||||
*/
|
||||
maxAttempts: number;
|
||||
|
||||
/**
|
||||
* A custom logger. Might be overwritten in test.
|
||||
*/
|
||||
logger?: log.LoggerType;
|
||||
};
|
||||
|
||||
export abstract class JobQueue<T> {
|
||||
|
@ -41,6 +47,8 @@ export abstract class JobQueue<T> {
|
|||
|
||||
private readonly store: JobQueueStore;
|
||||
|
||||
private readonly logger: log.LoggerType;
|
||||
|
||||
private readonly logPrefix: string;
|
||||
|
||||
private readonly onCompleteCallbacks = new Map<
|
||||
|
@ -70,6 +78,7 @@ export abstract class JobQueue<T> {
|
|||
this.maxAttempts = options.maxAttempts;
|
||||
this.queueType = options.queueType;
|
||||
this.store = options.store;
|
||||
this.logger = options.logger ?? log;
|
||||
|
||||
this.logPrefix = `${this.queueType} job queue:`;
|
||||
}
|
||||
|
@ -92,10 +101,13 @@ export abstract class JobQueue<T> {
|
|||
*
|
||||
* If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it
|
||||
* will be deleted from the store.
|
||||
*
|
||||
* If your job logs things, you're encouraged to use the logger provided, as it
|
||||
* automatically includes debugging information.
|
||||
*/
|
||||
protected abstract run(
|
||||
job: Readonly<ParsedJob<T>>,
|
||||
extra?: Readonly<{ attempt: number }>
|
||||
extra?: Readonly<{ attempt?: number; log?: log.LoggerType }>
|
||||
): Promise<void>;
|
||||
|
||||
/**
|
||||
|
@ -188,12 +200,16 @@ export abstract class JobQueue<T> {
|
|||
data: parsedData,
|
||||
};
|
||||
|
||||
const logger = new JobLogger(parsedJob, this.logger);
|
||||
|
||||
let result:
|
||||
| undefined
|
||||
| { success: true }
|
||||
| { success: false; err: unknown };
|
||||
|
||||
for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) {
|
||||
logger.attempt = attempt;
|
||||
|
||||
log.info(
|
||||
`${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}`
|
||||
);
|
||||
|
@ -201,7 +217,7 @@ export abstract class JobQueue<T> {
|
|||
// We want an `await` in the loop, as we don't want a single job running more
|
||||
// than once at a time. Ideally, the job will succeed on the first attempt.
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
await this.run(parsedJob, { attempt });
|
||||
await this.run(parsedJob, { attempt, log: logger });
|
||||
result = { success: true };
|
||||
log.info(
|
||||
`${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}`
|
||||
|
|
50
ts/jobs/helpers/commonShouldJobContinue.ts
Normal file
50
ts/jobs/helpers/commonShouldJobContinue.ts
Normal file
|
@ -0,0 +1,50 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { LoggerType } from '../../logging/log';
|
||||
import { waitForOnline } from '../../util/waitForOnline';
|
||||
import { sleep } from '../../util/sleep';
|
||||
import { exponentialBackoffSleepTime } from '../../util/exponentialBackoff';
|
||||
import { isDone as isDeviceLinked } from '../../util/registration';
|
||||
|
||||
export async function commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
maxRetryTime,
|
||||
timestamp,
|
||||
}: Readonly<{
|
||||
attempt: number;
|
||||
log: LoggerType;
|
||||
maxRetryTime: number;
|
||||
timestamp: number;
|
||||
}>): Promise<boolean> {
|
||||
const maxJobAge = timestamp + maxRetryTime;
|
||||
const timeRemaining = maxJobAge - Date.now();
|
||||
|
||||
if (timeRemaining <= 0) {
|
||||
log.info("giving up because it's been too long");
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
await waitForOnline(window.navigator, window, { timeout: timeRemaining });
|
||||
} catch (err: unknown) {
|
||||
log.info("didn't come online in time, giving up");
|
||||
return false;
|
||||
}
|
||||
|
||||
await new Promise<void>(resolve => {
|
||||
window.storage.onready(resolve);
|
||||
});
|
||||
|
||||
if (!isDeviceLinked()) {
|
||||
log.info("skipping this job because we're unlinked");
|
||||
return false;
|
||||
}
|
||||
|
||||
const sleepTime = exponentialBackoffSleepTime(attempt);
|
||||
log.info(`sleeping for ${sleepTime}`);
|
||||
await sleep(sleepTime);
|
||||
|
||||
return true;
|
||||
}
|
22
ts/jobs/helpers/handleCommonJobRequestError.ts
Normal file
22
ts/jobs/helpers/handleCommonJobRequestError.ts
Normal file
|
@ -0,0 +1,22 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { LoggerType } from '../../logging/log';
|
||||
import { parseIntWithFallback } from '../../util/parseIntWithFallback';
|
||||
|
||||
export function handleCommonJobRequestError(
|
||||
err: unknown,
|
||||
log: LoggerType
|
||||
): void {
|
||||
if (!(err instanceof Error)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
const code = parseIntWithFallback(err.code, -1);
|
||||
if (code === 508) {
|
||||
log.info('server responded with 508. Giving up on this job');
|
||||
return;
|
||||
}
|
||||
|
||||
throw err;
|
||||
}
|
|
@ -2,27 +2,27 @@
|
|||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import { chunk } from 'lodash';
|
||||
import * as log from '../../logging/log';
|
||||
import { waitForOnline } from '../../util/waitForOnline';
|
||||
import type { LoggerType } from '../../logging/log';
|
||||
import { getSendOptions } from '../../util/getSendOptions';
|
||||
import { handleMessageSend, SendTypesType } from '../../util/handleMessageSend';
|
||||
import { isNotNil } from '../../util/isNotNil';
|
||||
import { sleep } from '../../util/sleep';
|
||||
import { exponentialBackoffSleepTime } from '../../util/exponentialBackoff';
|
||||
import { isDone as isDeviceLinked } from '../../util/registration';
|
||||
import { parseIntWithFallback } from '../../util/parseIntWithFallback';
|
||||
|
||||
import { commonShouldJobContinue } from './commonShouldJobContinue';
|
||||
import { handleCommonJobRequestError } from './handleCommonJobRequestError';
|
||||
|
||||
const CHUNK_SIZE = 100;
|
||||
|
||||
export async function runReadOrViewSyncJob({
|
||||
attempt,
|
||||
isView,
|
||||
log,
|
||||
maxRetryTime,
|
||||
syncs,
|
||||
timestamp,
|
||||
}: Readonly<{
|
||||
attempt: number;
|
||||
isView: boolean;
|
||||
log: LoggerType;
|
||||
maxRetryTime: number;
|
||||
syncs: ReadonlyArray<{
|
||||
messageId?: string;
|
||||
|
@ -33,59 +33,36 @@ export async function runReadOrViewSyncJob({
|
|||
timestamp: number;
|
||||
}>): Promise<void> {
|
||||
let sendType: SendTypesType;
|
||||
let nameForLogging: string;
|
||||
let doSync:
|
||||
| typeof window.textsecure.messaging.syncReadMessages
|
||||
| typeof window.textsecure.messaging.syncView;
|
||||
if (isView) {
|
||||
sendType = 'viewSync';
|
||||
nameForLogging = 'viewSyncJobQueue';
|
||||
doSync = window.textsecure.messaging.syncView.bind(
|
||||
window.textsecure.messaging
|
||||
);
|
||||
} else {
|
||||
sendType = 'readSync';
|
||||
nameForLogging = 'readSyncJobQueue';
|
||||
doSync = window.textsecure.messaging.syncReadMessages.bind(
|
||||
window.textsecure.messaging
|
||||
);
|
||||
}
|
||||
|
||||
const logInfo = (message: string): void => {
|
||||
log.info(`${nameForLogging}: ${message}`);
|
||||
};
|
||||
|
||||
if (!syncs.length) {
|
||||
logInfo("skipping this job because there's nothing to sync");
|
||||
log.info("skipping this job because there's nothing to sync");
|
||||
return;
|
||||
}
|
||||
|
||||
const maxJobAge = timestamp + maxRetryTime;
|
||||
const timeRemaining = maxJobAge - Date.now();
|
||||
|
||||
if (timeRemaining <= 0) {
|
||||
logInfo("giving up because it's been too long");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await waitForOnline(window.navigator, window, { timeout: timeRemaining });
|
||||
} catch (err) {
|
||||
logInfo("didn't come online in time, giving up");
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>(resolve => {
|
||||
window.storage.onready(resolve);
|
||||
const shouldContinue = await commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
maxRetryTime,
|
||||
timestamp,
|
||||
});
|
||||
|
||||
if (!isDeviceLinked()) {
|
||||
logInfo("skipping this job because we're unlinked");
|
||||
if (!shouldContinue) {
|
||||
return;
|
||||
}
|
||||
|
||||
await sleep(exponentialBackoffSleepTime(attempt));
|
||||
|
||||
const ourConversation = window.ConversationController.getOurConversationOrThrow();
|
||||
const sendOptions = await getSendOptions(ourConversation.attributes, {
|
||||
syncMessage: true,
|
||||
|
@ -103,16 +80,6 @@ export async function runReadOrViewSyncJob({
|
|||
})
|
||||
);
|
||||
} catch (err: unknown) {
|
||||
if (!(err instanceof Error)) {
|
||||
throw err;
|
||||
}
|
||||
|
||||
const code = parseIntWithFallback(err.code, -1);
|
||||
if (code === 508) {
|
||||
logInfo('server responded with 508. Giving up on this job');
|
||||
return;
|
||||
}
|
||||
|
||||
throw err;
|
||||
handleCommonJobRequestError(err, log);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import { readSyncJobQueue } from './readSyncJobQueue';
|
|||
import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue';
|
||||
import { reportSpamJobQueue } from './reportSpamJobQueue';
|
||||
import { viewSyncJobQueue } from './viewSyncJobQueue';
|
||||
import { viewedReceiptsJobQueue } from './viewedReceiptsJobQueue';
|
||||
|
||||
/**
|
||||
* Start all of the job queues. Should be called when the database is ready.
|
||||
|
@ -22,4 +23,5 @@ export function initializeAllJobQueues({
|
|||
removeStorageKeyJobQueue.streamJobs();
|
||||
reportSpamJobQueue.streamJobs();
|
||||
viewSyncJobQueue.streamJobs();
|
||||
viewedReceiptsJobQueue.streamJobs();
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
import * as z from 'zod';
|
||||
import * as moment from 'moment';
|
||||
import type { LoggerType } from '../logging/log';
|
||||
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
||||
import { runReadOrViewSyncJob } from './helpers/runReadOrViewSyncJob';
|
||||
|
||||
|
@ -33,11 +34,12 @@ export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
|
|||
|
||||
protected async run(
|
||||
{ data, timestamp }: Readonly<{ data: ReadSyncJobData; timestamp: number }>,
|
||||
{ attempt }: Readonly<{ attempt: number }>
|
||||
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||||
): Promise<void> {
|
||||
await runReadOrViewSyncJob({
|
||||
attempt,
|
||||
isView: false,
|
||||
log,
|
||||
maxRetryTime: MAX_RETRY_TIME,
|
||||
syncs: data.readSyncs,
|
||||
timestamp,
|
||||
|
@ -47,8 +49,6 @@ export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
|
|||
|
||||
export const readSyncJobQueue = new ReadSyncJobQueue({
|
||||
store: jobQueueDatabaseStore,
|
||||
|
||||
queueType: 'read sync',
|
||||
|
||||
maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME),
|
||||
});
|
||||
|
|
|
@ -31,8 +31,6 @@ export class RemoveStorageKeyJobQueue extends JobQueue<RemoveStorageKeyJobData>
|
|||
|
||||
export const removeStorageKeyJobQueue = new RemoveStorageKeyJobQueue({
|
||||
store: jobQueueDatabaseStore,
|
||||
|
||||
queueType: 'remove storage key',
|
||||
|
||||
maxAttempts: 100,
|
||||
});
|
||||
|
|
|
@ -7,7 +7,7 @@ import * as moment from 'moment';
|
|||
import { strictAssert } from '../util/assert';
|
||||
import { waitForOnline } from '../util/waitForOnline';
|
||||
import { isDone as isDeviceLinked } from '../util/registration';
|
||||
import * as log from '../logging/log';
|
||||
import type { LoggerType } from '../logging/log';
|
||||
import { map } from '../util/iterables';
|
||||
import { sleep } from '../util/sleep';
|
||||
|
||||
|
@ -58,9 +58,10 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
|
|||
return reportSpamJobDataSchema.parse(data);
|
||||
}
|
||||
|
||||
protected async run({
|
||||
data,
|
||||
}: Readonly<{ data: ReportSpamJobData }>): Promise<void> {
|
||||
protected async run(
|
||||
{ data }: Readonly<{ data: ReportSpamJobData }>,
|
||||
{ log }: Readonly<{ log: LoggerType }>
|
||||
): Promise<void> {
|
||||
const { e164, serverGuids } = data;
|
||||
|
||||
await new Promise<void>(resolve => {
|
||||
|
@ -122,8 +123,6 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
|
|||
|
||||
export const reportSpamJobQueue = new ReportSpamJobQueue({
|
||||
store: jobQueueDatabaseStore,
|
||||
|
||||
queueType: 'report spam',
|
||||
|
||||
maxAttempts: 25,
|
||||
});
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
import * as z from 'zod';
|
||||
import * as moment from 'moment';
|
||||
import type { LoggerType } from '../logging/log';
|
||||
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
||||
import { runReadOrViewSyncJob } from './helpers/runReadOrViewSyncJob';
|
||||
|
||||
|
@ -33,11 +34,12 @@ export class ViewSyncJobQueue extends JobQueue<ViewSyncJobData> {
|
|||
|
||||
protected async run(
|
||||
{ data, timestamp }: Readonly<{ data: ViewSyncJobData; timestamp: number }>,
|
||||
{ attempt }: Readonly<{ attempt: number }>
|
||||
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||||
): Promise<void> {
|
||||
await runReadOrViewSyncJob({
|
||||
attempt,
|
||||
isView: true,
|
||||
log,
|
||||
maxRetryTime: MAX_RETRY_TIME,
|
||||
syncs: data.viewSyncs,
|
||||
timestamp,
|
||||
|
@ -47,8 +49,6 @@ export class ViewSyncJobQueue extends JobQueue<ViewSyncJobData> {
|
|||
|
||||
export const viewSyncJobQueue = new ViewSyncJobQueue({
|
||||
store: jobQueueDatabaseStore,
|
||||
|
||||
queueType: 'view sync',
|
||||
|
||||
maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME),
|
||||
});
|
||||
|
|
64
ts/jobs/viewedReceiptsJobQueue.ts
Normal file
64
ts/jobs/viewedReceiptsJobQueue.ts
Normal file
|
@ -0,0 +1,64 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
/* eslint-disable class-methods-use-this */
|
||||
|
||||
import { z } from 'zod';
|
||||
import * as moment from 'moment';
|
||||
import type { LoggerType } from '../logging/log';
|
||||
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
|
||||
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
|
||||
import { sendViewedReceipt } from '../util/sendViewedReceipt';
|
||||
|
||||
import { JobQueue } from './JobQueue';
|
||||
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
|
||||
import { handleCommonJobRequestError } from './helpers/handleCommonJobRequestError';
|
||||
|
||||
const MAX_RETRY_TIME = moment.duration(1, 'day').asMilliseconds();
|
||||
|
||||
const viewedReceiptsJobDataSchema = z.object({
|
||||
viewedReceipt: z.object({
|
||||
messageId: z.string(),
|
||||
senderE164: z.string().optional(),
|
||||
senderUuid: z.string().optional(),
|
||||
timestamp: z.number(),
|
||||
}),
|
||||
});
|
||||
|
||||
type ViewedReceiptsJobData = z.infer<typeof viewedReceiptsJobDataSchema>;
|
||||
|
||||
export class ViewedReceiptsJobQueue extends JobQueue<ViewedReceiptsJobData> {
|
||||
protected parseData(data: unknown): ViewedReceiptsJobData {
|
||||
return viewedReceiptsJobDataSchema.parse(data);
|
||||
}
|
||||
|
||||
protected async run(
|
||||
{
|
||||
data,
|
||||
timestamp,
|
||||
}: Readonly<{ data: ViewedReceiptsJobData; timestamp: number }>,
|
||||
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
|
||||
): Promise<void> {
|
||||
const shouldContinue = await commonShouldJobContinue({
|
||||
attempt,
|
||||
log,
|
||||
maxRetryTime: MAX_RETRY_TIME,
|
||||
timestamp,
|
||||
});
|
||||
if (!shouldContinue) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await sendViewedReceipt(data.viewedReceipt);
|
||||
} catch (err: unknown) {
|
||||
handleCommonJobRequestError(err, log);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const viewedReceiptsJobQueue = new ViewedReceiptsJobQueue({
|
||||
store: jobQueueDatabaseStore,
|
||||
queueType: 'viewed receipts',
|
||||
maxAttempts: exponentialBackoffMaxAttempts(MAX_RETRY_TIME),
|
||||
});
|
|
@ -13,6 +13,16 @@ let logAtLevel: LogAtLevelFnType = noop;
|
|||
let hasInitialized = false;
|
||||
|
||||
type LogFn = (...args: ReadonlyArray<unknown>) => void;
|
||||
|
||||
export type LoggerType = {
|
||||
fatal: LogFn;
|
||||
error: LogFn;
|
||||
warn: LogFn;
|
||||
info: LogFn;
|
||||
debug: LogFn;
|
||||
trace: LogFn;
|
||||
};
|
||||
|
||||
export const fatal: LogFn = (...args) => logAtLevel(LogLevel.Fatal, ...args);
|
||||
export const error: LogFn = (...args) => logAtLevel(LogLevel.Error, ...args);
|
||||
export const warn: LogFn = (...args) => logAtLevel(LogLevel.Warn, ...args);
|
||||
|
|
54
ts/test-node/jobs/JobLogger_test.ts
Normal file
54
ts/test-node/jobs/JobLogger_test.ts
Normal file
|
@ -0,0 +1,54 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import * as sinon from 'sinon';
|
||||
|
||||
import { JobLogger } from '../../jobs/JobLogger';
|
||||
|
||||
describe('JobLogger', () => {
|
||||
const LEVELS = ['fatal', 'error', 'warn', 'info', 'debug', 'trace'] as const;
|
||||
|
||||
const createFakeLogger = () => ({
|
||||
fatal: sinon.fake(),
|
||||
error: sinon.fake(),
|
||||
warn: sinon.fake(),
|
||||
info: sinon.fake(),
|
||||
debug: sinon.fake(),
|
||||
trace: sinon.fake(),
|
||||
});
|
||||
|
||||
LEVELS.forEach(level => {
|
||||
describe(level, () => {
|
||||
it('logs its arguments with a prefix', () => {
|
||||
const fakeLogger = createFakeLogger();
|
||||
|
||||
const logger = new JobLogger(
|
||||
{ id: 'abc', queueType: 'test queue' },
|
||||
fakeLogger
|
||||
);
|
||||
|
||||
logger.attempt = 123;
|
||||
logger[level]('foo', 456);
|
||||
|
||||
sinon.assert.calledOnce(fakeLogger[level]);
|
||||
|
||||
sinon.assert.calledWith(
|
||||
fakeLogger[level],
|
||||
sinon.match(
|
||||
(arg: unknown) =>
|
||||
typeof arg === 'string' &&
|
||||
arg.includes('test queue') &&
|
||||
arg.includes('abc') &&
|
||||
arg.includes('123')
|
||||
),
|
||||
'foo',
|
||||
456
|
||||
);
|
||||
|
||||
LEVELS.filter(l => l !== level).forEach(otherLevel => {
|
||||
sinon.assert.notCalled(fakeLogger[otherLevel]);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -12,6 +12,7 @@ import { JobError } from '../../jobs/JobError';
|
|||
import { TestJobQueueStore } from './TestJobQueueStore';
|
||||
import { missingCaseError } from '../../util/missingCaseError';
|
||||
import { assertRejects } from '../helpers';
|
||||
import type { LoggerType } from '../../logging/log';
|
||||
|
||||
import { JobQueue } from '../../jobs/JobQueue';
|
||||
import { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types';
|
||||
|
@ -240,6 +241,62 @@ describe('JobQueue', () => {
|
|||
assert.deepStrictEqual(attempts, [1, 2, 3, 4, 5, 6]);
|
||||
});
|
||||
|
||||
it('passes a logger to the run function', async () => {
|
||||
const uniqueString = uuid();
|
||||
|
||||
const fakeLogger = {
|
||||
fatal: sinon.fake(),
|
||||
error: sinon.fake(),
|
||||
warn: sinon.fake(),
|
||||
info: sinon.fake(),
|
||||
debug: sinon.fake(),
|
||||
trace: sinon.fake(),
|
||||
};
|
||||
|
||||
class TestQueue extends JobQueue<number> {
|
||||
parseData(data: unknown): number {
|
||||
return z.number().parse(data);
|
||||
}
|
||||
|
||||
async run(
|
||||
_: unknown,
|
||||
{ log }: Readonly<{ log: LoggerType }>
|
||||
): Promise<void> {
|
||||
log.info(uniqueString);
|
||||
log.warn(uniqueString);
|
||||
log.error(uniqueString);
|
||||
}
|
||||
}
|
||||
|
||||
const queue = new TestQueue({
|
||||
store: new TestJobQueueStore(),
|
||||
queueType: 'test queue 123',
|
||||
maxAttempts: 6,
|
||||
logger: fakeLogger,
|
||||
});
|
||||
|
||||
queue.streamJobs();
|
||||
|
||||
const job = await queue.add(1);
|
||||
await job.completion;
|
||||
|
||||
[fakeLogger.info, fakeLogger.warn, fakeLogger.error].forEach(logFn => {
|
||||
sinon.assert.calledWith(
|
||||
logFn,
|
||||
sinon.match(
|
||||
(arg: unknown) =>
|
||||
typeof arg === 'string' &&
|
||||
arg.includes(job.id) &&
|
||||
arg.includes('test queue 123')
|
||||
),
|
||||
sinon.match(
|
||||
(arg: unknown) =>
|
||||
typeof arg === 'string' && arg.includes(uniqueString)
|
||||
)
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it('makes job.completion reject if parseData throws', async () => {
|
||||
class TestQueue extends JobQueue<string> {
|
||||
parseData(data: unknown): string {
|
||||
|
|
|
@ -1550,54 +1550,69 @@ export default class MessageSender {
|
|||
});
|
||||
}
|
||||
|
||||
async sendDeliveryReceipt({
|
||||
e164,
|
||||
uuid,
|
||||
async sendDeliveryReceipt(
|
||||
options: Readonly<{
|
||||
senderE164?: string;
|
||||
senderUuid?: string;
|
||||
timestamps: Array<number>;
|
||||
options?: Readonly<SendOptionsType>;
|
||||
}>
|
||||
): Promise<CallbackResultType> {
|
||||
return this.sendReceiptMessage({
|
||||
...options,
|
||||
type: Proto.ReceiptMessage.Type.DELIVERY,
|
||||
});
|
||||
}
|
||||
|
||||
async sendReadReceipts(
|
||||
options: Readonly<{
|
||||
senderE164?: string;
|
||||
senderUuid?: string;
|
||||
timestamps: Array<number>;
|
||||
options?: Readonly<SendOptionsType>;
|
||||
}>
|
||||
): Promise<CallbackResultType> {
|
||||
return this.sendReceiptMessage({
|
||||
...options,
|
||||
type: Proto.ReceiptMessage.Type.READ,
|
||||
});
|
||||
}
|
||||
|
||||
async sendViewedReceipts(
|
||||
options: Readonly<{
|
||||
senderE164?: string;
|
||||
senderUuid?: string;
|
||||
timestamps: Array<number>;
|
||||
options?: Readonly<SendOptionsType>;
|
||||
}>
|
||||
): Promise<CallbackResultType> {
|
||||
return this.sendReceiptMessage({
|
||||
...options,
|
||||
type: Proto.ReceiptMessage.Type.VIEWED,
|
||||
});
|
||||
}
|
||||
|
||||
private async sendReceiptMessage({
|
||||
senderE164,
|
||||
senderUuid,
|
||||
timestamps,
|
||||
type,
|
||||
options,
|
||||
}: Readonly<{
|
||||
e164?: string;
|
||||
uuid?: string;
|
||||
senderE164?: string;
|
||||
senderUuid?: string;
|
||||
timestamps: Array<number>;
|
||||
type: Proto.ReceiptMessage.Type;
|
||||
options?: Readonly<SendOptionsType>;
|
||||
}>): Promise<CallbackResultType> {
|
||||
if (!uuid && !e164) {
|
||||
if (!senderUuid && !senderE164) {
|
||||
throw new Error(
|
||||
'sendDeliveryReceipt: Neither uuid nor e164 was provided!'
|
||||
'sendReceiptMessage: Neither uuid nor e164 was provided!'
|
||||
);
|
||||
}
|
||||
|
||||
const receiptMessage = new Proto.ReceiptMessage();
|
||||
receiptMessage.type = Proto.ReceiptMessage.Type.DELIVERY;
|
||||
receiptMessage.timestamp = timestamps;
|
||||
|
||||
const contentMessage = new Proto.Content();
|
||||
contentMessage.receiptMessage = receiptMessage;
|
||||
|
||||
const { ContentHint } = Proto.UnidentifiedSenderMessage.Message;
|
||||
|
||||
return this.sendIndividualProto({
|
||||
identifier: uuid || e164,
|
||||
proto: contentMessage,
|
||||
timestamp: Date.now(),
|
||||
contentHint: ContentHint.RESENDABLE,
|
||||
options,
|
||||
});
|
||||
}
|
||||
|
||||
async sendReadReceipts({
|
||||
senderE164,
|
||||
senderUuid,
|
||||
timestamps,
|
||||
options,
|
||||
}: Readonly<{
|
||||
senderE164: string;
|
||||
senderUuid: string;
|
||||
timestamps: Array<number>;
|
||||
options?: Readonly<SendOptionsType>;
|
||||
}>): Promise<CallbackResultType> {
|
||||
const receiptMessage = new Proto.ReceiptMessage();
|
||||
receiptMessage.type = Proto.ReceiptMessage.Type.READ;
|
||||
receiptMessage.type = type;
|
||||
receiptMessage.timestamp = timestamps;
|
||||
|
||||
const contentMessage = new Proto.Content();
|
||||
|
|
|
@ -37,7 +37,8 @@ export type SendTypesType =
|
|||
| 'typing' // excluded from send log
|
||||
| 'verificationSync'
|
||||
| 'viewOnceSync'
|
||||
| 'viewSync';
|
||||
| 'viewSync'
|
||||
| 'viewedReceipt';
|
||||
|
||||
export function shouldSaveProto(sendType: SendTypesType): boolean {
|
||||
if (sendType === 'callingMessage') {
|
||||
|
|
55
ts/util/sendViewedReceipt.ts
Normal file
55
ts/util/sendViewedReceipt.ts
Normal file
|
@ -0,0 +1,55 @@
|
|||
// Copyright 2021 Signal Messenger, LLC
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
import type { ConversationAttributesType } from '../model-types.d';
|
||||
import { getSendOptions } from './getSendOptions';
|
||||
import { handleMessageSend } from './handleMessageSend';
|
||||
import { isConversationAccepted } from './isConversationAccepted';
|
||||
|
||||
export async function sendViewedReceipt({
|
||||
messageId,
|
||||
senderE164,
|
||||
senderUuid,
|
||||
timestamp,
|
||||
}: Readonly<{
|
||||
messageId: string;
|
||||
senderE164?: string;
|
||||
senderUuid?: string;
|
||||
timestamp: number;
|
||||
}>): Promise<void> {
|
||||
if (!window.storage.get('read-receipt-setting')) {
|
||||
return;
|
||||
}
|
||||
|
||||
const conversationId = window.ConversationController.ensureContactIds({
|
||||
e164: senderE164,
|
||||
uuid: senderUuid,
|
||||
});
|
||||
if (!conversationId) {
|
||||
throw new Error(
|
||||
'sendViewedReceipt: no conversation found with that E164/UUID'
|
||||
);
|
||||
}
|
||||
|
||||
const conversation = window.ConversationController.get(conversationId);
|
||||
if (!conversation) {
|
||||
throw new Error(
|
||||
'sendViewedReceipt: no conversation found with that conversation ID, even though we found the ID with E164/UUID?'
|
||||
);
|
||||
}
|
||||
|
||||
const conversationAttrs: ConversationAttributesType = conversation.attributes;
|
||||
if (!isConversationAccepted(conversationAttrs)) {
|
||||
return;
|
||||
}
|
||||
|
||||
await handleMessageSend(
|
||||
window.textsecure.messaging.sendViewedReceipts({
|
||||
senderE164,
|
||||
senderUuid,
|
||||
timestamps: [timestamp],
|
||||
options: await getSendOptions(conversationAttrs),
|
||||
}),
|
||||
{ messageIds: [messageId], sendType: 'viewedReceipt' }
|
||||
);
|
||||
}
|
|
@ -63,6 +63,7 @@ import {
|
|||
} from '../util/handleImageAttachment';
|
||||
import { ReadStatus } from '../messages/MessageReadStatus';
|
||||
import { markViewed } from '../services/MessageUpdater';
|
||||
import { viewedReceiptsJobQueue } from '../jobs/viewedReceiptsJobQueue';
|
||||
import { viewSyncJobQueue } from '../jobs/viewSyncJobQueue';
|
||||
import type { ContactType } from '../types/Contact';
|
||||
import type { WhatIsThis } from '../window.d';
|
||||
|
@ -879,14 +880,28 @@ Whisper.ConversationView = Whisper.View.extend({
|
|||
return;
|
||||
}
|
||||
|
||||
const senderE164 = message.get('source');
|
||||
const senderUuid = message.get('sourceUuid');
|
||||
const timestamp = message.get('sent_at');
|
||||
|
||||
message.set(markViewed(message.attributes, Date.now()));
|
||||
|
||||
viewedReceiptsJobQueue.add({
|
||||
viewedReceipt: {
|
||||
messageId,
|
||||
senderE164,
|
||||
senderUuid,
|
||||
timestamp,
|
||||
},
|
||||
});
|
||||
|
||||
viewSyncJobQueue.add({
|
||||
viewSyncs: [
|
||||
{
|
||||
messageId,
|
||||
senderE164: message.get('source'),
|
||||
senderUuid: message.get('sourceUuid'),
|
||||
timestamp: message.get('sent_at'),
|
||||
senderE164,
|
||||
senderUuid,
|
||||
timestamp,
|
||||
},
|
||||
],
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue