conversationJobQueue: Only show captcha for bubble messages

This commit is contained in:
Scott Nonnenberg 2023-10-27 17:14:35 -07:00 committed by GitHub
parent e69e8f3c9d
commit 2da49456c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 721 additions and 126 deletions

View file

@ -22,6 +22,7 @@ import * as Errors from './types/errors';
import { HTTPError } from './textsecure/Errors';
import type { SendMessageChallengeData } from './textsecure/Errors';
import * as log from './logging/log';
import { drop } from './util/drop';
export type ChallengeResponse = Readonly<{
captcha: string;
@ -75,6 +76,7 @@ export type RegisteredChallengeType = Readonly<{
reason: string;
retryAt?: number;
token?: string;
silent: boolean;
}>;
type SolveOptionsType = Readonly<{
@ -210,7 +212,7 @@ export class ChallengeHandler {
}
if (challenge.token) {
void this.solve({ reason, token: challenge.token });
drop(this.solve({ reason, token: challenge.token }));
}
}
@ -247,7 +249,7 @@ export class ChallengeHandler {
setTimeout(() => {
this.startTimers.delete(conversationId);
void this.startQueue(conversationId);
drop(this.startQueue(conversationId));
}, waitTime)
);
log.info(
@ -269,7 +271,9 @@ export class ChallengeHandler {
return;
}
void this.solve({ token: challenge.token, reason });
if (!challenge.silent) {
drop(this.solve({ token: challenge.token, reason }));
}
}
public onResponse(response: IPCResponse): void {
@ -282,8 +286,13 @@ export class ChallengeHandler {
handler.resolve(response.data);
}
public async unregister(conversationId: string): Promise<void> {
log.info(`challenge: unregistered conversation ${conversationId}`);
public async unregister(
conversationId: string,
source: string
): Promise<void> {
log.info(
`challenge: unregistered conversation ${conversationId} via ${source}`
);
this.registeredConversations.delete(conversationId);
this.pendingStarts.delete(conversationId);
@ -343,7 +352,7 @@ export class ChallengeHandler {
return;
}
await this.unregister(conversationId);
await this.unregister(conversationId, 'startQueue');
if (this.registeredConversations.size === 0) {
this.options.setChallengeStatus('idle');

View file

@ -13,6 +13,7 @@ import * as log from '../logging/log';
import { JobLogger } from './JobLogger';
import * as Errors from '../types/errors';
import type { LoggerType } from '../types/Logging';
import { drop } from '../util/drop';
const noopOnCompleteCallbacks = {
resolve: noop,
@ -43,6 +44,12 @@ type JobQueueOptions = {
logger?: LoggerType;
};
export enum JOB_STATUS {
SUCCESS = 'SUCCESS',
NEEDS_RETRY = 'NEEDS_RETRY',
ERROR = 'ERROR',
}
export abstract class JobQueue<T> {
private readonly maxAttempts: number;
@ -119,7 +126,7 @@ export abstract class JobQueue<T> {
protected abstract run(
job: Readonly<ParsedJob<T>>,
extra?: Readonly<{ attempt?: number; log?: LoggerType }>
): Promise<void>;
): Promise<JOB_STATUS.NEEDS_RETRY | undefined>;
protected getQueues(): ReadonlySet<PQueue> {
return new Set([this.defaultInMemoryQueue]);
@ -144,7 +151,7 @@ export abstract class JobQueue<T> {
log.info(`${this.logPrefix} is shutting down. Can't accept more work.`);
break;
}
void this.enqueueStoredJob(storedJob);
drop(this.enqueueStoredJob(storedJob));
}
}
@ -201,7 +208,9 @@ export abstract class JobQueue<T> {
return this.defaultInMemoryQueue;
}
private async enqueueStoredJob(storedJob: Readonly<StoredJob>) {
protected async enqueueStoredJob(
storedJob: Readonly<StoredJob>
): Promise<void> {
assertDev(
storedJob.queueType === this.queueType,
'Received a mis-matched queue type'
@ -242,8 +251,10 @@ export abstract class JobQueue<T> {
const result:
| undefined
| { success: true }
| { success: false; err: unknown } = await queue.add(async () => {
| { status: JOB_STATUS.SUCCESS }
| { status: JOB_STATUS.NEEDS_RETRY }
| { status: JOB_STATUS.ERROR; err: unknown } = await queue.add(
async () => {
for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) {
const isFinalAttempt = attempt === this.maxAttempts;
@ -257,18 +268,30 @@ export abstract class JobQueue<T> {
log.warn(
`${this.logPrefix} returning early for job ${storedJob.id}; shutting down`
);
return { success: false, err: new Error('Shutting down') };
return {
status: JOB_STATUS.ERROR,
err: new Error('Shutting down'),
};
}
try {
// We want an `await` in the loop, as we don't want a single job running more
// than once at a time. Ideally, the job will succeed on the first attempt.
// eslint-disable-next-line no-await-in-loop
await this.run(parsedJob, { attempt, log: logger });
const jobStatus = await this.run(parsedJob, {
attempt,
log: logger,
});
if (!jobStatus) {
log.info(
`${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}`
);
return { success: true };
return { status: JOB_STATUS.SUCCESS };
}
log.info(
`${this.logPrefix} job ${storedJob.id} returned status ${jobStatus} on attempt ${attempt}`
);
return { status: jobStatus };
} catch (err: unknown) {
log.error(
`${this.logPrefix} job ${
@ -276,16 +299,30 @@ export abstract class JobQueue<T> {
} failed on attempt ${attempt}. ${Errors.toLogFormat(err)}`
);
if (isFinalAttempt) {
return { success: false, err };
return { status: JOB_STATUS.ERROR, err };
}
}
}
// This should never happen. See the assertion below.
return undefined;
});
}
);
if (result?.success || !this.isShuttingDown) {
if (result?.status === JOB_STATUS.NEEDS_RETRY) {
const addJobSuccess = await this.retryJobOnQueueIdle({
storedJob,
job: parsedJob,
logger,
});
if (!addJobSuccess) {
await this.store.delete(storedJob.id);
}
}
if (
result?.status === JOB_STATUS.SUCCESS ||
(result?.status === JOB_STATUS.ERROR && !this.isShuttingDown)
) {
await this.store.delete(storedJob.id);
}
@ -293,13 +330,26 @@ export abstract class JobQueue<T> {
result,
'The job never ran. This indicates a developer error in the job queue'
);
if (result.success) {
resolve();
} else {
if (result.status === JOB_STATUS.ERROR) {
reject(result.err);
} else {
resolve();
}
}
async retryJobOnQueueIdle({
logger,
}: {
job: Readonly<ParsedJob<T>>;
storedJob: Readonly<StoredJob>;
logger: LoggerType;
}): Promise<boolean> {
logger.error(
`retryJobOnQueueIdle: not implemented for queue ${this.queueType}; dropping job`
);
return false;
}
async shutdown(): Promise<void> {
const queues = this.getQueues();
log.info(

View file

@ -9,7 +9,7 @@ import * as durations from '../util/durations';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import { InMemoryQueues } from './helpers/InMemoryQueues';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
import { JobQueue } from './JobQueue';
import { JOB_STATUS, JobQueue } from './JobQueue';
import { sendNormalMessage } from './helpers/sendNormalMessage';
import { sendDirectExpirationTimerUpdate } from './helpers/sendDirectExpirationTimerUpdate';
@ -33,7 +33,7 @@ import { strictAssert } from '../util/assert';
import { missingCaseError } from '../util/missingCaseError';
import { explodePromise } from '../util/explodePromise';
import type { Job } from './Job';
import type { ParsedJob } from './types';
import type { ParsedJob, StoredJob } from './types';
import type SendMessage from '../textsecure/SendMessage';
import type { ServiceIdString } from '../types/ServiceId';
import { commonShouldJobContinue } from './helpers/commonShouldJobContinue';
@ -44,6 +44,9 @@ import { sendResendRequest } from './helpers/sendResendRequest';
import { sendNullMessage } from './helpers/sendNullMessage';
import { sendSenderKeyDistribution } from './helpers/sendSenderKeyDistribution';
import { sendSavedProto } from './helpers/sendSavedProto';
import { drop } from '../util/drop';
import { isInPast } from '../util/timestamp';
import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary';
// Note: generally, we only want to add to this list. If you do need to change one of
// these values, you'll likely need to write a database migration.
@ -62,6 +65,7 @@ export const conversationQueueJobEnum = z.enum([
'Story',
'Receipts',
]);
type ConversationQueueJobEnum = z.infer<typeof conversationQueueJobEnum>;
const deleteForEveryoneJobDataSchema = z.object({
type: z.literal(conversationQueueJobEnum.enum.DeleteForEveryone),
@ -234,7 +238,92 @@ export type ConversationQueueJobBundle = {
const MAX_RETRY_TIME = durations.DAY;
const MAX_ATTEMPTS = exponentialBackoffMaxAttempts(MAX_RETRY_TIME);
function shouldSendShowCaptcha(type: ConversationQueueJobEnum): boolean {
if (type === 'DeleteForEveryone') {
return true;
}
if (type === 'DeleteStoryForEveryone') {
return true;
}
if (type === 'DirectExpirationTimerUpdate') {
return true;
}
if (type === 'GroupUpdate') {
return false;
}
if (type === 'NormalMessage') {
return true;
}
if (type === 'NullMessage') {
return false;
}
if (type === 'ProfileKey') {
return false;
}
if (type === 'Reaction') {
return false;
}
if (type === 'ResendRequest') {
return false;
}
if (type === 'SavedProto') {
return false;
}
if (type === 'SenderKeyDistribution') {
return false;
}
if (type === 'Story') {
return true;
}
if (type === 'Receipts') {
return false;
}
throw missingCaseError(type);
}
enum RETRY_STATUS {
BLOCKED = 'BLOCKED',
BLOCKED_WITH_JOBS = 'BLOCKED_WITH_JOBS',
UNBLOCKED = 'UNBLOCKED',
}
type ConversationData = Readonly<
| {
// When we get a retryAt from a 428 error, we immediately record it, but we don't
// yet have a job to retry. We should, very soon, when the job returns
// JOB_STATUS.NEEDS_RETRY. This should be a very short-lived state.
status: RETRY_STATUS.BLOCKED;
callback: undefined;
jobsNeedingRetry: undefined;
retryAt: number;
}
| {
// This is the next stage, when we've added at least one job needing retry, and we
// have a callback registered to run on queue idle (or be called directly).
status: RETRY_STATUS.BLOCKED_WITH_JOBS;
callback: () => void;
jobsNeedingRetry: Array<Readonly<StoredJob>>;
retryAt: number;
retryAtTimeout?: NodeJS.Timeout;
}
| {
// When we discover that we can now run these deferred jobs, we flip into this
// state, which should be short-lived. We very quickly re-enqueue all
// jobsNeedingRetry, and erase perConversationData for this conversation.
status: RETRY_STATUS.UNBLOCKED;
callback: () => void;
jobsNeedingRetry: Array<Readonly<StoredJob>>;
retryAt: undefined;
retryAtTimeout?: NodeJS.Timeout;
}
>;
export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
private readonly perConversationData = new Map<
string,
ConversationData | undefined
>();
private readonly inMemoryQueues = new InMemoryQueues();
private readonly verificationWaitMap = new Map<
string,
@ -244,6 +333,7 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
promise: Promise<unknown>;
}
>();
private callbackCount = 0;
override getQueues(): ReadonlySet<PQueue> {
return this.inMemoryQueues.allQueues;
@ -254,6 +344,8 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
insert?: (job: ParsedJob<ConversationQueueJobData>) => Promise<void>
): Promise<Job<ConversationQueueJobData>> {
const { conversationId, type } = data;
if (shouldSendShowCaptcha(data.type)) {
strictAssert(
window.Signal.challengeHandler,
'conversationJobQueue.add: Missing challengeHandler!'
@ -262,6 +354,7 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
conversationId,
reason: `conversationJobQueue.add(${conversationId}, ${type})`,
});
}
return super.add(data, insert);
}
@ -310,18 +403,239 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
globalLogger.warn(
`resolveVerificationWaiter: Missing waiter for conversation ${conversationId}.`
);
this.unblockConversationRetries(conversationId);
}
}
private unblockConversationRetries(conversationId: string) {
const logId = `unblockConversationRetries/${conversationId}`;
const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) {
return;
}
const { status, callback } = perConversationData;
if (status === RETRY_STATUS.BLOCKED) {
globalLogger.info(
`${logId}: Deleting previous BLOCKED state; had no jobs`
);
this.perConversationData.delete(conversationId);
} else if (status === RETRY_STATUS.BLOCKED_WITH_JOBS) {
globalLogger.info(
`${logId}: Moving previous WAITING state to UNBLOCKED, calling callback directly`
);
this.perConversationData.set(conversationId, {
...perConversationData,
status: RETRY_STATUS.UNBLOCKED,
retryAt: undefined,
});
callback();
} else if (status === RETRY_STATUS.UNBLOCKED) {
globalLogger.warn(
`${logId}: We're still in UNBLOCKED state; calling callback directly`
);
callback();
} else {
throw missingCaseError(status);
}
}
private captureRetryAt(conversationId: string, retryAt: number | undefined) {
const logId = `captureRetryAt/${conversationId}`;
const newRetryAt = retryAt || Date.now() + MINUTE;
const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) {
if (!retryAt) {
globalLogger.warn(
`${logId}: No existing data, using retryAt of ${newRetryAt}`
);
}
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED,
retryAt: newRetryAt,
callback: undefined,
jobsNeedingRetry: undefined,
});
return;
}
const { status, retryAt: existingRetryAt } = perConversationData;
if (existingRetryAt && existingRetryAt >= newRetryAt) {
globalLogger.warn(
`${logId}: New newRetryAt ${newRetryAt} isn't after existing retryAt ${existingRetryAt}, dropping`
);
return;
}
if (
status === RETRY_STATUS.BLOCKED ||
status === RETRY_STATUS.BLOCKED_WITH_JOBS
) {
globalLogger.info(
`${logId}: Updating to newRetryAt ${newRetryAt} from existing retryAt ${existingRetryAt}, status ${status}`
);
this.perConversationData.set(conversationId, {
...perConversationData,
retryAt: newRetryAt,
});
} else if (status === RETRY_STATUS.UNBLOCKED) {
globalLogger.info(
`${logId}: Updating to newRetryAt ${newRetryAt} from previous UNBLOCKED status`
);
this.perConversationData.set(conversationId, {
...perConversationData,
status: RETRY_STATUS.BLOCKED_WITH_JOBS,
retryAt: newRetryAt,
});
} else {
throw missingCaseError(status);
}
}
override async retryJobOnQueueIdle({
job,
storedJob,
logger,
}: {
job: Readonly<ParsedJob<ConversationQueueJobData>>;
storedJob: Readonly<StoredJob>;
logger: LoggerType;
}): Promise<boolean> {
const { conversationId } = job.data;
const logId = `retryJobOnQueueIdle/${conversationId}/${job.id}`;
const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) {
logger.warn(`${logId}: no data for conversation; using default retryAt`);
} else {
logger.warn(
`${logId}: adding to existing data with status ${perConversationData.status}`
);
}
const { status, retryAt, jobsNeedingRetry, callback } =
perConversationData || {
status: RETRY_STATUS.BLOCKED,
retryAt: Date.now() + MINUTE,
};
const newJobsNeedingRetry = (jobsNeedingRetry || []).concat([storedJob]);
logger.info(
`${logId}: job added to retry queue with status ${status}; ${newJobsNeedingRetry.length} items now in queue`
);
const newCallback =
callback || this.createRetryCallback(conversationId, job.id);
if (
status === RETRY_STATUS.BLOCKED ||
status === RETRY_STATUS.BLOCKED_WITH_JOBS
) {
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.BLOCKED_WITH_JOBS,
retryAt,
jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback,
});
} else {
this.perConversationData.set(conversationId, {
status: RETRY_STATUS.UNBLOCKED,
retryAt,
jobsNeedingRetry: newJobsNeedingRetry,
callback: newCallback,
});
}
if (newCallback !== callback) {
const queue = this.getInMemoryQueue(job);
drop(
// eslint-disable-next-line more/no-then
queue.onIdle().then(() => {
globalLogger.info(`${logId}: Running callback due to queue.onIdle`);
newCallback();
})
);
}
return true;
}
private createRetryCallback(conversationId: string, jobId: string) {
this.callbackCount += 1;
const id = this.callbackCount;
globalLogger.info(
`createRetryCallback/${conversationId}/${id}: callback created for job ${jobId}`
);
return () => {
const logId = `retryCallback/${conversationId}/${id}`;
const perConversationData = this.perConversationData.get(conversationId);
if (!perConversationData) {
globalLogger.warn(`${logId}: no perConversationData, returning early.`);
return;
}
const { status, retryAt } = perConversationData;
if (status === RETRY_STATUS.BLOCKED) {
globalLogger.warn(
`${logId}: Still in blocked state, no jobs to retry. Clearing perConversationData.`
);
this.perConversationData.delete(conversationId);
return;
}
const { callback, jobsNeedingRetry, retryAtTimeout } =
perConversationData;
if (retryAtTimeout) {
clearTimeoutIfNecessary(retryAtTimeout);
}
if (!retryAt || isInPast(retryAt)) {
globalLogger.info(
`${logId}: retryAt is ${retryAt}; queueing ${jobsNeedingRetry?.length} jobs needing retry`
);
// We're starting to retry jobs; remove the challenge handler
drop(window.Signal.challengeHandler?.unregister(conversationId, logId));
jobsNeedingRetry?.forEach(job => {
drop(this.enqueueStoredJob(job));
});
this.perConversationData.delete(conversationId);
return;
}
const timeLeft = retryAt - Date.now();
globalLogger.info(
`${logId}: retryAt ${retryAt} is in the future, scheduling timeout for ${timeLeft}ms`
);
this.perConversationData.set(conversationId, {
...perConversationData,
retryAtTimeout: setTimeout(() => {
globalLogger.info(`${logId}: Running callback due to timeout`);
callback();
}, timeLeft),
});
};
}
protected async run(
{
data,
timestamp,
}: Readonly<{ data: ConversationQueueJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
const { type, conversationId } = data;
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
const perConversationData = this.perConversationData.get(conversationId);
await window.ConversationController.load();
@ -330,6 +644,11 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
throw new Error(`Failed to find conversation ${conversationId}`);
}
if (perConversationData?.retryAt && !shouldSendShowCaptcha(type)) {
// If we return this value, JobQueue will call retryJobOnQueueIdle for this job
return JOB_STATUS.NEEDS_RETRY;
}
let timeRemaining: number;
let shouldContinue: boolean;
let count = 0;
@ -350,10 +669,24 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
break;
}
if (window.Signal.challengeHandler?.isRegistered(conversationId)) {
const isChallengeRegistered =
window.Signal.challengeHandler?.isRegistered(conversationId);
if (!isChallengeRegistered) {
this.unblockConversationRetries(conversationId);
}
if (isChallengeRegistered && shouldSendShowCaptcha(type)) {
if (this.isShuttingDown) {
throw new Error("Shutting down, can't wait for captcha challenge.");
}
window.Signal.challengeHandler?.maybeSolve({
conversationId,
reason:
'conversationJobQueue.run/addWaiter(' +
`${conversation.idForLogging()}, ${type}, ${timestamp})`,
});
log.info(
'captcha challenge is pending for this conversation; waiting at most 5m...'
);
@ -386,7 +719,7 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
log.warn(
"Cancelling profile share, we don't want to wait for pending verification."
);
return;
return undefined;
}
if (this.isShuttingDown) {
@ -498,10 +831,14 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
);
}
}
return undefined;
} catch (error: unknown) {
const untrustedServiceIds: Array<ServiceIdString> = [];
const processError = (toProcess: unknown) => {
const processError = (
toProcess: unknown
): undefined | typeof JOB_STATUS.NEEDS_RETRY => {
if (toProcess instanceof OutgoingIdentityKeyError) {
const failedConversation = window.ConversationController.getOrCreate(
toProcess.identifier,
@ -513,11 +850,14 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
log.error(
`failedConversation: Conversation ${failedConversation.idForLogging()} missing serviceId!`
);
return;
return undefined;
}
untrustedServiceIds.push(serviceId);
} else if (toProcess instanceof SendMessageChallengeError) {
void window.Signal.challengeHandler?.register(
const silent = !shouldSendShowCaptcha(type);
drop(
window.Signal.challengeHandler?.register(
{
conversationId,
createdAt: Date.now(),
@ -526,15 +866,31 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
reason:
'conversationJobQueue.run(' +
`${conversation.idForLogging()}, ${type}, ${timestamp})`,
silent,
},
toProcess.data
)
);
if (silent) {
this.captureRetryAt(conversationId, toProcess.retryAt);
return JOB_STATUS.NEEDS_RETRY;
}
}
return undefined;
};
processError(error);
const value = processError(error);
if (value) {
return value;
}
if (error instanceof SendMessageProtoError) {
(error.errors || []).forEach(processError);
const values = (error.errors || []).map(processError);
const innerValue = values.find(item => Boolean(item));
if (innerValue) {
return innerValue;
}
}
if (untrustedServiceIds.length) {
@ -542,14 +898,14 @@ export class ConversationJobQueue extends JobQueue<ConversationQueueJobData> {
log.warn(
`Cancelling profile share, since there were ${untrustedServiceIds.length} untrusted send targets.`
);
return;
return undefined;
}
if (type === jobSet.Receipts) {
log.warn(
`Cancelling receipt send, since there were ${untrustedServiceIds.length} untrusted send targets.`
);
return;
return undefined;
}
log.error(

View file

@ -444,6 +444,7 @@ export async function sendStory(
reason:
'conversationJobQueue.run(' +
`${conversation.idForLogging()}, story, ${timestamp}/${distributionId})`,
silent: false,
},
error.data
);

View file

@ -13,6 +13,7 @@ import {
import { strictAssert } from '../util/assert';
import { isRecord } from '../util/isRecord';
import type { JOB_STATUS } from './JobQueue';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
@ -31,7 +32,7 @@ export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
protected async run(
{ data, timestamp }: Readonly<{ data: ReadSyncJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
await runSyncJob({
attempt,
log,
@ -40,6 +41,8 @@ export class ReadSyncJobQueue extends JobQueue<ReadSyncJobData> {
timestamp,
type: SyncTypeList.Read,
});
return undefined;
}
}

View file

@ -3,7 +3,9 @@
import { z } from 'zod';
import type { JOB_STATUS } from './JobQueue';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
const removeStorageKeyJobDataSchema = z.object({
@ -24,12 +26,16 @@ export class RemoveStorageKeyJobQueue extends JobQueue<RemoveStorageKeyJobData>
protected async run({
data,
}: Readonly<{ data: RemoveStorageKeyJobData }>): Promise<void> {
}: Readonly<{ data: RemoveStorageKeyJobData }>): Promise<
typeof JOB_STATUS.NEEDS_RETRY | undefined
> {
await new Promise<void>(resolve => {
window.storage.onready(resolve);
});
await window.storage.remove(data.key);
return undefined;
}
}

View file

@ -10,6 +10,7 @@ import type { LoggerType } from '../types/Logging';
import { aciSchema } from '../types/ServiceId';
import { map } from '../util/iterables';
import type { JOB_STATUS } from './JobQueue';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
import { parseIntWithFallback } from '../util/parseIntWithFallback';
@ -49,7 +50,7 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
protected async run(
{ data }: Readonly<{ data: ReportSpamJobData }>,
{ log }: Readonly<{ log: LoggerType }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
const { aci: senderAci, token, serverGuids } = data;
await new Promise<void>(resolve => {
@ -58,7 +59,7 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
if (!isDeviceLinked()) {
log.info("reportSpamJobQueue: skipping this job because we're unlinked");
return;
return undefined;
}
await waitForOnline(window.navigator, window);
@ -72,6 +73,8 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
server.reportMessage({ senderAci, serverGuid, token })
)
);
return undefined;
} catch (err: unknown) {
if (!(err instanceof HTTPError)) {
throw err;
@ -88,7 +91,7 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
log.info(
'reportSpamJobQueue: server responded with 508. Giving up on this job'
);
return;
return undefined;
}
if (isRetriable4xxStatus(code) || is5xxStatus(code)) {
@ -106,7 +109,7 @@ export class ReportSpamJobQueue extends JobQueue<ReportSpamJobData> {
log.error(
`reportSpamJobQueue: server responded with ${code} status code. Giving up on this job`
);
return;
return undefined;
}
throw err;

View file

@ -8,6 +8,7 @@ import * as Bytes from '../Bytes';
import type { LoggerType } from '../types/Logging';
import { exponentialBackoffMaxAttempts } from '../util/exponentialBackoff';
import type { ParsedJob } from './types';
import type { JOB_STATUS } from './JobQueue';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
import { DAY } from '../util/durations';
@ -51,7 +52,7 @@ export class SingleProtoJobQueue extends JobQueue<SingleProtoJobData> {
timestamp,
}: Readonly<{ data: SingleProtoJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
const timeRemaining = timestamp + MAX_RETRY_TIME - Date.now();
const isFinalAttempt = attempt >= MAX_ATTEMPTS;
@ -62,7 +63,7 @@ export class SingleProtoJobQueue extends JobQueue<SingleProtoJobData> {
skipWait: false,
});
if (!shouldContinue) {
return;
return undefined;
}
const {
@ -87,19 +88,19 @@ export class SingleProtoJobQueue extends JobQueue<SingleProtoJobData> {
log.info(
`conversation ${conversation.idForLogging()} is not accepted; refusing to send`
);
return;
return undefined;
}
if (isConversationUnregistered(conversation.attributes)) {
log.info(
`conversation ${conversation.idForLogging()} is unregistered; refusing to send`
);
return;
return undefined;
}
if (conversation.isBlocked()) {
log.info(
`conversation ${conversation.idForLogging()} is blocked; refusing to send`
);
return;
return undefined;
}
const proto = Proto.Content.decode(Bytes.fromBase64(protoBase64));
@ -133,6 +134,8 @@ export class SingleProtoJobQueue extends JobQueue<SingleProtoJobData> {
toThrow: error,
});
}
return undefined;
}
}

View file

@ -13,6 +13,7 @@ import {
import { strictAssert } from '../util/assert';
import { isRecord } from '../util/isRecord';
import type { JOB_STATUS } from './JobQueue';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
@ -34,7 +35,7 @@ export class ViewOnceOpenJobQueue extends JobQueue<ViewOnceOpenJobData> {
timestamp,
}: Readonly<{ data: ViewOnceOpenJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
await runSyncJob({
attempt,
log,
@ -43,6 +44,8 @@ export class ViewOnceOpenJobQueue extends JobQueue<ViewOnceOpenJobData> {
timestamp,
type: SyncTypeList.ViewOnceOpen,
});
return undefined;
}
}

View file

@ -13,6 +13,7 @@ import {
import { strictAssert } from '../util/assert';
import { isRecord } from '../util/isRecord';
import type { JOB_STATUS } from './JobQueue';
import { JobQueue } from './JobQueue';
import { jobQueueDatabaseStore } from './JobQueueDatabaseStore';
@ -31,7 +32,7 @@ export class ViewSyncJobQueue extends JobQueue<ViewSyncJobData> {
protected async run(
{ data, timestamp }: Readonly<{ data: ViewSyncJobData; timestamp: number }>,
{ attempt, log }: Readonly<{ attempt: number; log: LoggerType }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
await runSyncJob({
attempt,
log,
@ -40,6 +41,8 @@ export class ViewSyncJobQueue extends JobQueue<ViewSyncJobData> {
timestamp,
type: SyncTypeList.View,
});
return undefined;
}
}

View file

@ -57,6 +57,7 @@ describe('ChallengeHandler', () => {
retryAt: NOW + DEFAULT_RETRY_AFTER,
createdAt: NOW - SECOND,
reason: 'test',
silent: false,
...options,
};
};
@ -185,7 +186,7 @@ describe('ChallengeHandler', () => {
await createHandler();
for (const challenge of challenges) {
await handler.unregister(challenge.conversationId);
await handler.unregister(challenge.conversationId, 'test');
}
for (const challenge of challenges) {
@ -223,7 +224,7 @@ describe('ChallengeHandler', () => {
autoSolve: true,
expireAfter: -1,
});
await handler.unregister(one.conversationId);
await handler.unregister(one.conversationId, 'test');
challengeStatus = 'idle';
await newHandler.load();

View file

@ -147,7 +147,7 @@ describe('editing', function (this: Mocha.Suite) {
.locator(`.module-message__text >> "${initialMessageBody}"`)
.waitFor();
debug('waiting for receipts for original message');
debug('waiting for outgoing receipts for original message');
const receipts = await app.waitForReceipts();
assert.strictEqual(receipts.type, ReceiptType.Read);
assert.strictEqual(receipts.timestamps.length, 1);

View file

@ -20,6 +20,7 @@ describe('challenge/receipts', function (this: Mocha.Suite) {
let bootstrap: Bootstrap;
let app: App;
let contact: PrimaryDevice;
let contactB: PrimaryDevice;
beforeEach(async () => {
bootstrap = new Bootstrap({
@ -34,6 +35,9 @@ describe('challenge/receipts', function (this: Mocha.Suite) {
contact = await server.createPrimaryDevice({
profileName: 'Jamie',
});
contactB = await server.createPrimaryDevice({
profileName: 'Kim',
});
let state = StorageState.getEmpty();
@ -55,13 +59,28 @@ describe('challenge/receipts', function (this: Mocha.Suite) {
},
ServiceIdKind.PNI
);
state = state.addContact(
contactB,
{
whitelisted: true,
serviceE164: contactB.device.number,
identityKey: contactB.getPublicKey(ServiceIdKind.PNI).serialize(),
pni: toUntaggedPni(contactB.device.pni),
givenName: 'Kim',
},
ServiceIdKind.PNI
);
// Just to make PNI Contact visible in the left pane
state = state.pin(contact, ServiceIdKind.PNI);
state = state.pin(contactB, ServiceIdKind.PNI);
const ourKey = await desktop.popSingleUseKey();
await contact.addSingleUseKey(desktop, ourKey);
const ourKeyB = await desktop.popSingleUseKey();
await contactB.addSingleUseKey(desktop, ourKeyB);
await phone.setStorageState(state);
});
@ -95,11 +114,18 @@ describe('challenge/receipts', function (this: Mocha.Suite) {
.locator(`[data-testid="${contact.toContact().aci}"]`)
.click();
debug('Accept conversation from contact');
debug('Accept conversation from contact - does not trigger captcha!');
await conversationStack
.locator('.module-message-request-actions button >> "Accept"')
.click();
debug('Sending a message back to user - will trigger captcha!');
{
const input = await app.waitForEnabledComposer();
await input.type('Hi, good to hear from you!');
await input.press('Enter');
}
debug('Waiting for challenge');
const request = await app.waitForChallenge();
@ -114,14 +140,122 @@ describe('challenge/receipts', function (this: Mocha.Suite) {
target: contact.device.aci,
});
debug(`rate limited requests: ${requests}`);
assert.strictEqual(requests, 1);
debug(`Rate-limited requests: ${requests}`);
assert.strictEqual(requests, 1, 'rate limit requests');
debug('Waiting for receipts');
debug('Waiting for outgoing read receipt');
const receipts = await app.waitForReceipts();
assert.strictEqual(receipts.type, ReceiptType.Read);
assert.strictEqual(receipts.timestamps.length, 1);
assert.strictEqual(receipts.timestamps.length, 1, 'receipts');
assert.strictEqual(receipts.timestamps[0], timestamp);
});
it('should send non-bubble in ConvoA when ConvoB completes challenge', async () => {
const { server, desktop } = bootstrap;
debug(
`Rate limiting (desktop: ${desktop.aci}) -> (ContactA: ${contact.device.aci})`
);
server.rateLimit({ source: desktop.aci, target: contact.device.aci });
debug(
`Rate limiting (desktop: ${desktop.aci}) -> (ContactB: ${contactB.device.aci})`
);
server.rateLimit({ source: desktop.aci, target: contactB.device.aci });
const window = await app.getWindow();
const leftPane = window.locator('#LeftPane');
const conversationStack = window.locator('.Inbox__conversation-stack');
debug('Sending a message from ContactA');
const timestampA = bootstrap.getTimestamp();
await contact.sendText(desktop, 'Hello there!', {
timestamp: timestampA,
});
debug(`Opening conversation with ContactA (${contact.toContact().aci})`);
await leftPane
.locator(`[data-testid="${contact.toContact().aci}"]`)
.click();
debug('Accept conversation from ContactA - does not trigger captcha!');
await conversationStack
.locator('.module-message-request-actions button >> "Accept"')
.click();
debug('Sending a message from ContactB');
const timestampB = bootstrap.getTimestamp();
await contactB.sendText(desktop, 'Hey there!', {
timestamp: timestampB,
});
debug(`Opening conversation with ContactB (${contact.toContact().aci})`);
await leftPane
.locator(`[data-testid="${contactB.toContact().aci}"]`)
.click();
debug('Accept conversation from ContactB - does not trigger captcha!');
await conversationStack
.locator('.module-message-request-actions button >> "Accept"')
.click();
debug('Sending a message back to ContactB - will trigger captcha!');
{
const input = await app.waitForEnabledComposer();
await input.type('Hi, good to hear from you!');
await input.press('Enter');
}
debug('Waiting for challenge');
const request = await app.waitForChallenge();
debug('Solving challenge');
await app.solveChallenge({
seq: request.seq,
data: { captcha: 'anything' },
});
const requestsA = server.stopRateLimiting({
source: desktop.aci,
target: contact.device.aci,
});
const requestsB = server.stopRateLimiting({
source: desktop.aci,
target: contactB.device.aci,
});
debug(`Rate-limited requests to A: ${requestsA}`);
assert.strictEqual(requestsA, 1, 'rate limit requests');
debug(`Rate-limited requests to B: ${requestsA}`);
assert.strictEqual(requestsB, 1, 'rate limit requests');
debug('Waiting for outgoing read receipt #1');
const receipts1 = await app.waitForReceipts();
assert.strictEqual(receipts1.type, ReceiptType.Read);
assert.strictEqual(receipts1.timestamps.length, 1, 'receipts');
if (
!receipts1.timestamps.includes(timestampA) &&
!receipts1.timestamps.includes(timestampB)
) {
throw new Error(
'receipts1: Failed to find both timestampA and timestampB'
);
}
debug('Waiting for outgoing read receipt #2');
const receipts2 = await app.waitForReceipts();
assert.strictEqual(receipts2.type, ReceiptType.Read);
assert.strictEqual(receipts2.timestamps.length, 1, 'receipts');
if (
!receipts2.timestamps.includes(timestampA) &&
!receipts2.timestamps.includes(timestampB)
) {
throw new Error(
'receipts2: Failed to find both timestampA and timestampB'
);
}
});
});

View file

@ -16,6 +16,7 @@ import { missingCaseError } from '../../util/missingCaseError';
import { drop } from '../../util/drop';
import type { LoggerType } from '../../types/Logging';
import type { JOB_STATUS } from '../../jobs/JobQueue';
import { JobQueue } from '../../jobs/JobQueue';
import type { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types';
import { sleep } from '../../util/sleep';
@ -38,8 +39,13 @@ describe('JobQueue', () => {
return testJobSchema.parse(data);
}
async run({ data }: ParsedJob<TestJobData>): Promise<void> {
async run({
data,
}: ParsedJob<TestJobData>): Promise<
typeof JOB_STATUS.NEEDS_RETRY | undefined
> {
results.add(data.a + data.b);
return undefined;
}
}
@ -83,13 +89,15 @@ describe('JobQueue', () => {
return z.number().parse(data);
}
async run(): Promise<void> {
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
try {
updateActiveJobCount(1);
await sleep(1);
} finally {
updateActiveJobCount(-1);
}
return undefined;
}
}
@ -142,8 +150,8 @@ describe('JobQueue', () => {
return testQueue;
}
run(): Promise<void> {
return Promise.resolve();
run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -175,8 +183,8 @@ describe('JobQueue', () => {
return z.string().parse(data);
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -243,8 +251,8 @@ describe('JobQueue', () => {
return z.string().parse(data);
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -291,7 +299,11 @@ describe('JobQueue', () => {
return data;
}
async run({ data }: ParsedJob<TestJobData>): Promise<void> {
async run({
data,
}: ParsedJob<TestJobData>): Promise<
typeof JOB_STATUS.NEEDS_RETRY | undefined
> {
switch (data) {
case 'foo':
fooAttempts += 1;
@ -308,6 +320,7 @@ describe('JobQueue', () => {
default:
throw missingCaseError(data);
}
return undefined;
}
}
@ -360,7 +373,7 @@ describe('JobQueue', () => {
async run(
_: unknown,
{ attempt }: Readonly<{ attempt: number }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
attempts.push(attempt);
throw new Error('this job always fails');
}
@ -405,10 +418,12 @@ describe('JobQueue', () => {
async run(
_: unknown,
{ log }: Readonly<{ log: LoggerType }>
): Promise<void> {
): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
log.info(uniqueString);
log.warn(uniqueString);
log.error(uniqueString);
return undefined;
}
}
@ -450,8 +465,8 @@ describe('JobQueue', () => {
throw new Error('uh oh');
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -494,7 +509,9 @@ describe('JobQueue', () => {
throw new Error('invalid data!');
}
run(job: { data: string }): Promise<void> {
run(job: {
data: string;
}): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return run(job);
}
}
@ -528,8 +545,8 @@ describe('JobQueue', () => {
throw new Error('invalid data!');
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -562,8 +579,8 @@ describe('JobQueue', () => {
return undefined;
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -596,8 +613,9 @@ describe('JobQueue', () => {
return data;
}
async run(): Promise<void> {
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
events.push('running');
return undefined;
}
}
@ -629,8 +647,8 @@ describe('JobQueue', () => {
return undefined;
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -670,7 +688,7 @@ describe('JobQueue', () => {
return undefined;
}
async run(): Promise<void> {
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
events.push('running');
throw new Error('uh oh');
}
@ -751,8 +769,13 @@ describe('JobQueue', () => {
return z.number().parse(data);
}
async run({ data }: Readonly<{ data: number }>): Promise<void> {
async run({
data,
}: Readonly<{ data: number }>): Promise<
typeof JOB_STATUS.NEEDS_RETRY | undefined
> {
eventEmitter.emit('run', data);
return undefined;
}
}
@ -794,8 +817,8 @@ describe('JobQueue', () => {
return data;
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}
@ -827,8 +850,8 @@ describe('JobQueue', () => {
return undefined;
}
async run(): Promise<void> {
return Promise.resolve();
async run(): Promise<typeof JOB_STATUS.NEEDS_RETRY | undefined> {
return Promise.resolve(undefined);
}
}