Challenge: Save conversationIds and start queues
This commit is contained in:
parent
c369e44d8e
commit
bddd55d574
11 changed files with 316 additions and 476 deletions
305
ts/challenge.ts
305
ts/challenge.ts
|
@ -12,15 +12,14 @@
|
|||
// are not immediately retried, however, until `.onOnline()` is called from
|
||||
// when we are actually online.
|
||||
|
||||
import type { MessageModel } from './models/messages';
|
||||
import { assert } from './util/assert';
|
||||
import { isNotNil } from './util/isNotNil';
|
||||
import { isOlderThan } from './util/timestamp';
|
||||
import { parseRetryAfter } from './util/parseRetryAfter';
|
||||
import { clearTimeoutIfNecessary } from './util/clearTimeoutIfNecessary';
|
||||
import { getEnvironment, Environment } from './environment';
|
||||
import type { StorageInterface } from './types/Storage.d';
|
||||
import { HTTPError } from './textsecure/Errors';
|
||||
import type { SendMessageChallengeData } from './textsecure/Errors';
|
||||
import * as log from './logging/log';
|
||||
|
||||
export type ChallengeResponse = {
|
||||
|
@ -36,11 +35,6 @@ export type IPCResponse = {
|
|||
readonly data: ChallengeResponse;
|
||||
};
|
||||
|
||||
export enum RetryMode {
|
||||
Retry = 'Retry',
|
||||
NoImmediateRetry = 'NoImmediateRetry',
|
||||
}
|
||||
|
||||
type Handler = {
|
||||
readonly token: string | undefined;
|
||||
|
||||
|
@ -54,22 +48,12 @@ export type ChallengeData = {
|
|||
readonly captcha: string;
|
||||
};
|
||||
|
||||
export type MinimalMessage = Pick<
|
||||
MessageModel,
|
||||
'id' | 'idForLogging' | 'getLastChallengeError' | 'retrySend'
|
||||
> & {
|
||||
isNormalBubble(): boolean;
|
||||
get(name: 'sent_at'): number;
|
||||
on(event: 'sent', callback: () => void): void;
|
||||
off(event: 'sent', callback: () => void): void;
|
||||
};
|
||||
|
||||
export type Options = {
|
||||
readonly storage: Pick<StorageInterface, 'get' | 'put'>;
|
||||
|
||||
requestChallenge(request: IPCRequest): void;
|
||||
|
||||
getMessageById(messageId: string): Promise<MinimalMessage | undefined>;
|
||||
startQueue(conversationId: string): void;
|
||||
|
||||
sendChallengeResponse(data: ChallengeData): Promise<void>;
|
||||
|
||||
|
@ -81,25 +65,22 @@ export type Options = {
|
|||
expireAfter?: number;
|
||||
};
|
||||
|
||||
export type StoredEntity = {
|
||||
readonly messageId: string;
|
||||
readonly createdAt: number;
|
||||
};
|
||||
export const STORAGE_KEY = 'challenge:conversations';
|
||||
|
||||
type TrackedEntry = {
|
||||
readonly message: MinimalMessage;
|
||||
readonly createdAt: number;
|
||||
};
|
||||
export type RegisteredChallengeType = Readonly<{
|
||||
conversationId: string;
|
||||
createdAt: number;
|
||||
retryAt: number;
|
||||
token?: string;
|
||||
}>;
|
||||
|
||||
const DEFAULT_EXPIRE_AFTER = 24 * 3600 * 1000; // one day
|
||||
const MAX_RETRIES = 5;
|
||||
const CAPTCHA_URL = 'https://signalcaptchas.org/challenge/generate.html';
|
||||
const CAPTCHA_STAGING_URL =
|
||||
'https://signalcaptchas.org/staging/challenge/generate.html';
|
||||
|
||||
function shouldRetrySend(message: MinimalMessage): boolean {
|
||||
const error = message.getLastChallengeError();
|
||||
if (!error || error.retryAfter <= Date.now()) {
|
||||
function shouldStartQueue(registered: RegisteredChallengeType): boolean {
|
||||
if (!registered.retryAt || registered.retryAt <= Date.now()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -117,6 +98,8 @@ export function getChallengeURL(): string {
|
|||
// `ChallengeHandler` should be in memory at the same time because they could
|
||||
// overwrite each others storage data.
|
||||
export class ChallengeHandler {
|
||||
private solving = 0;
|
||||
|
||||
private isLoaded = false;
|
||||
|
||||
private challengeToken: string | undefined;
|
||||
|
@ -127,13 +110,14 @@ export class ChallengeHandler {
|
|||
|
||||
private readonly responseHandlers = new Map<number, Handler>();
|
||||
|
||||
private readonly trackedMessages = new Map<string, TrackedEntry>();
|
||||
private readonly registeredConversations = new Map<
|
||||
string,
|
||||
RegisteredChallengeType
|
||||
>();
|
||||
|
||||
private readonly retryTimers = new Map<string, NodeJS.Timeout>();
|
||||
private readonly startTimers = new Map<string, NodeJS.Timeout>();
|
||||
|
||||
private readonly pendingRetries = new Set<MinimalMessage>();
|
||||
|
||||
private readonly retryCountById = new Map<string, number>();
|
||||
private readonly pendingStarts = new Set<string>();
|
||||
|
||||
constructor(private readonly options: Options) {}
|
||||
|
||||
|
@ -143,43 +127,18 @@ export class ChallengeHandler {
|
|||
}
|
||||
|
||||
this.isLoaded = true;
|
||||
const stored: ReadonlyArray<StoredEntity> =
|
||||
this.options.storage.get('challenge:retry-message-ids') || [];
|
||||
const challenges: ReadonlyArray<RegisteredChallengeType> =
|
||||
this.options.storage.get(STORAGE_KEY) || [];
|
||||
|
||||
log.info(`challenge: loading ${stored.length} messages`);
|
||||
|
||||
const entityMap = new Map<string, StoredEntity>();
|
||||
for (const entity of stored) {
|
||||
entityMap.set(entity.messageId, entity);
|
||||
}
|
||||
|
||||
const retryIds = new Set<string>(stored.map(({ messageId }) => messageId));
|
||||
|
||||
const maybeMessages: ReadonlyArray<MinimalMessage | undefined> =
|
||||
await Promise.all(
|
||||
Array.from(retryIds).map(async messageId =>
|
||||
this.options.getMessageById(messageId)
|
||||
)
|
||||
);
|
||||
|
||||
const messages: Array<MinimalMessage> = maybeMessages.filter(isNotNil);
|
||||
|
||||
log.info(`challenge: loaded ${messages.length} messages`);
|
||||
log.info(`challenge: loading ${challenges.length} challenges`);
|
||||
|
||||
await Promise.all(
|
||||
messages.map(async message => {
|
||||
const entity = entityMap.get(message.id);
|
||||
if (!entity) {
|
||||
log.error(
|
||||
'challenge: unexpected missing entity ' +
|
||||
`for ${message.idForLogging()}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
challenges.map(async challenge => {
|
||||
const expireAfter = this.options.expireAfter || DEFAULT_EXPIRE_AFTER;
|
||||
if (isOlderThan(entity.createdAt, expireAfter)) {
|
||||
log.info(`challenge: expired entity for ${message.idForLogging()}`);
|
||||
if (isOlderThan(challenge.createdAt, expireAfter)) {
|
||||
log.info(
|
||||
`challenge: expired challenge for conversation ${challenge.conversationId}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -190,7 +149,7 @@ export class ChallengeHandler {
|
|||
//
|
||||
// Wait for `.onOnline()` to trigger the retries instead of triggering
|
||||
// them here immediately (if the message is ready to be retried).
|
||||
await this.register(message, RetryMode.NoImmediateRetry, entity);
|
||||
await this.register(challenge);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
@ -204,89 +163,88 @@ export class ChallengeHandler {
|
|||
public async onOnline(): Promise<void> {
|
||||
this.isOnline = true;
|
||||
|
||||
const pending = Array.from(this.pendingRetries.values());
|
||||
this.pendingRetries.clear();
|
||||
const pending = Array.from(this.pendingStarts.values());
|
||||
this.pendingStarts.clear();
|
||||
|
||||
log.info(`challenge: online, retrying ${pending.length} messages`);
|
||||
log.info(`challenge: online, starting ${pending.length} queues`);
|
||||
|
||||
// Retry messages that matured while we were offline
|
||||
await Promise.all(pending.map(message => this.retryOne(message)));
|
||||
// Start queues for challenges that matured while we were offline
|
||||
await Promise.all(
|
||||
pending.map(conversationId => this.startQueue(conversationId))
|
||||
);
|
||||
|
||||
await this.retrySend();
|
||||
await this.startAllQueues();
|
||||
}
|
||||
|
||||
public maybeSolve(conversationId: string): void {
|
||||
const challenge = this.registeredConversations.get(conversationId);
|
||||
if (!challenge) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.solving > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (challenge.token) {
|
||||
this.solve(challenge.token);
|
||||
}
|
||||
}
|
||||
|
||||
public async register(
|
||||
message: MinimalMessage,
|
||||
retry = RetryMode.Retry,
|
||||
entity?: StoredEntity
|
||||
challenge: RegisteredChallengeType,
|
||||
data?: SendMessageChallengeData
|
||||
): Promise<void> {
|
||||
if (this.isRegistered(message)) {
|
||||
log.info(
|
||||
`challenge: message already registered ${message.idForLogging()}`
|
||||
);
|
||||
const { conversationId } = challenge;
|
||||
|
||||
if (this.isRegistered(conversationId)) {
|
||||
log.info(`challenge: conversation ${conversationId} already registered`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.trackedMessages.set(message.id, {
|
||||
message,
|
||||
createdAt: entity ? entity.createdAt : Date.now(),
|
||||
});
|
||||
this.registeredConversations.set(conversationId, challenge);
|
||||
await this.persist();
|
||||
|
||||
// Message is already retryable - initiate new send
|
||||
if (retry === RetryMode.Retry && shouldRetrySend(message)) {
|
||||
// Challenge is already retryable - start the queue
|
||||
if (shouldStartQueue(challenge)) {
|
||||
log.info(
|
||||
`challenge: sending message immediately ${message.idForLogging()}`
|
||||
`challenge: starting conversation ${conversationId} immediately`
|
||||
);
|
||||
await this.retryOne(message);
|
||||
await this.startQueue(conversationId);
|
||||
return;
|
||||
}
|
||||
|
||||
const error = message.getLastChallengeError();
|
||||
if (!error) {
|
||||
log.error('Unexpected message without challenge error');
|
||||
return;
|
||||
}
|
||||
|
||||
const waitTime = Math.max(0, error.retryAfter - Date.now());
|
||||
const oldTimer = this.retryTimers.get(message.id);
|
||||
const waitTime = Math.max(0, challenge.retryAt - Date.now());
|
||||
const oldTimer = this.startTimers.get(conversationId);
|
||||
if (oldTimer) {
|
||||
clearTimeoutIfNecessary(oldTimer);
|
||||
}
|
||||
this.retryTimers.set(
|
||||
message.id,
|
||||
this.startTimers.set(
|
||||
conversationId,
|
||||
setTimeout(() => {
|
||||
this.retryTimers.delete(message.id);
|
||||
this.startTimers.delete(conversationId);
|
||||
|
||||
this.retryOne(message);
|
||||
this.startQueue(conversationId);
|
||||
}, waitTime)
|
||||
);
|
||||
|
||||
log.info(
|
||||
`challenge: tracking ${message.idForLogging()} ` +
|
||||
`with waitTime=${waitTime}`
|
||||
);
|
||||
log.info(`challenge: tracking ${conversationId} with waitTime=${waitTime}`);
|
||||
|
||||
if (!error.data.options || !error.data.options.includes('recaptcha')) {
|
||||
if (data && !data.options?.includes('recaptcha')) {
|
||||
log.error(
|
||||
`challenge: unexpected options ${JSON.stringify(error.data.options)}`
|
||||
`challenge: unexpected options ${JSON.stringify(data.options)}`
|
||||
);
|
||||
}
|
||||
|
||||
if (!error.data.token) {
|
||||
if (!challenge.token) {
|
||||
const dataString = JSON.stringify(data);
|
||||
log.error(
|
||||
`challenge: no token in challenge error ${JSON.stringify(error.data)}`
|
||||
`challenge: ${conversationId} is waiting; no token in data ${dataString}`
|
||||
);
|
||||
} else if (message.isNormalBubble()) {
|
||||
// Display challenge dialog only for core messages
|
||||
// (e.g. text, attachment, embedded contact, or sticker)
|
||||
//
|
||||
// Note: not waiting on this call intentionally since it waits for
|
||||
// challenge to be fully completed.
|
||||
this.solve(error.data.token);
|
||||
} else {
|
||||
log.info(`challenge: not a bubble message ${message.idForLogging()}`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.solve(challenge.token);
|
||||
}
|
||||
|
||||
public onResponse(response: IPCResponse): void {
|
||||
|
@ -299,13 +257,13 @@ export class ChallengeHandler {
|
|||
handler.resolve(response.data);
|
||||
}
|
||||
|
||||
public async unregister(message: MinimalMessage): Promise<void> {
|
||||
log.info(`challenge: unregistered ${message.idForLogging()}`);
|
||||
this.trackedMessages.delete(message.id);
|
||||
this.pendingRetries.delete(message);
|
||||
public async unregister(conversationId: string): Promise<void> {
|
||||
log.info(`challenge: unregistered conversation ${conversationId}`);
|
||||
this.registeredConversations.delete(conversationId);
|
||||
this.pendingStarts.delete(conversationId);
|
||||
|
||||
const timer = this.retryTimers.get(message.id);
|
||||
this.retryTimers.delete(message.id);
|
||||
const timer = this.startTimers.get(conversationId);
|
||||
this.startTimers.delete(conversationId);
|
||||
clearTimeoutIfNecessary(timer);
|
||||
|
||||
await this.persist();
|
||||
|
@ -330,95 +288,45 @@ export class ChallengeHandler {
|
|||
'ChallengeHandler has to be loaded before persisting new data'
|
||||
);
|
||||
await this.options.storage.put(
|
||||
'challenge:retry-message-ids',
|
||||
Array.from(this.trackedMessages.entries()).map(
|
||||
([messageId, { createdAt }]) => {
|
||||
return { messageId, createdAt };
|
||||
}
|
||||
)
|
||||
STORAGE_KEY,
|
||||
Array.from(this.registeredConversations.values())
|
||||
);
|
||||
}
|
||||
|
||||
private isRegistered(message: MinimalMessage): boolean {
|
||||
return this.trackedMessages.has(message.id);
|
||||
public isRegistered(conversationId: string): boolean {
|
||||
return this.registeredConversations.has(conversationId);
|
||||
}
|
||||
|
||||
private async retrySend(force = false): Promise<void> {
|
||||
log.info(`challenge: retrySend force=${force}`);
|
||||
private startAllQueues({
|
||||
force = false,
|
||||
}: {
|
||||
force?: boolean;
|
||||
} = {}): void {
|
||||
log.info(`challenge: startAllQueues force=${force}`);
|
||||
|
||||
const retries = Array.from(this.trackedMessages.values())
|
||||
.map(({ message }) => message)
|
||||
// Sort messages in `sent_at` order
|
||||
.sort((a, b) => a.get('sent_at') - b.get('sent_at'))
|
||||
.filter(message => force || shouldRetrySend(message))
|
||||
.map(message => this.retryOne(message));
|
||||
|
||||
await Promise.all(retries);
|
||||
Array.from(this.registeredConversations.values())
|
||||
.filter(challenge => force || shouldStartQueue(challenge))
|
||||
.forEach(challenge => this.startQueue(challenge.conversationId));
|
||||
}
|
||||
|
||||
private async retryOne(message: MinimalMessage): Promise<void> {
|
||||
// Send is already pending
|
||||
if (!this.isRegistered(message)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// We are not online
|
||||
private async startQueue(conversationId: string): Promise<void> {
|
||||
if (!this.isOnline) {
|
||||
this.pendingRetries.add(message);
|
||||
this.pendingStarts.add(conversationId);
|
||||
return;
|
||||
}
|
||||
|
||||
const retryCount = this.retryCountById.get(message.id) || 0;
|
||||
log.info(
|
||||
`challenge: retrying sending ${message.idForLogging()}, ` +
|
||||
`retry count: ${retryCount}`
|
||||
);
|
||||
await this.unregister(conversationId);
|
||||
|
||||
if (retryCount === MAX_RETRIES) {
|
||||
log.info(
|
||||
`challenge: dropping message ${message.idForLogging()}, ` +
|
||||
'too many failed retries'
|
||||
);
|
||||
|
||||
// Keep the message registered so that we'll retry sending it on app
|
||||
// restart.
|
||||
return;
|
||||
if (this.registeredConversations.size === 0) {
|
||||
this.options.setChallengeStatus('idle');
|
||||
}
|
||||
|
||||
await this.unregister(message);
|
||||
|
||||
let sent = false;
|
||||
const onSent = () => {
|
||||
sent = true;
|
||||
};
|
||||
message.on('sent', onSent);
|
||||
|
||||
try {
|
||||
await message.retrySend();
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`challenge: failed to send ${message.idForLogging()} due to ` +
|
||||
`error: ${error && error.stack}`
|
||||
);
|
||||
} finally {
|
||||
message.off('sent', onSent);
|
||||
}
|
||||
|
||||
if (sent) {
|
||||
log.info(`challenge: message ${message.idForLogging()} sent`);
|
||||
this.retryCountById.delete(message.id);
|
||||
if (this.trackedMessages.size === 0) {
|
||||
this.options.setChallengeStatus('idle');
|
||||
}
|
||||
} else {
|
||||
log.info(`challenge: message ${message.idForLogging()} not sent`);
|
||||
|
||||
this.retryCountById.set(message.id, retryCount + 1);
|
||||
await this.register(message, RetryMode.NoImmediateRetry);
|
||||
}
|
||||
log.info(`startQueue: starting queue ${conversationId}`);
|
||||
this.options.startQueue(conversationId);
|
||||
}
|
||||
|
||||
private async solve(token: string): Promise<void> {
|
||||
this.solving += 1;
|
||||
this.options.setChallengeStatus('required');
|
||||
this.challengeToken = token;
|
||||
|
||||
|
@ -426,6 +334,7 @@ export class ChallengeHandler {
|
|||
|
||||
// Another `.solve()` has completed earlier than us
|
||||
if (this.challengeToken === undefined) {
|
||||
this.solving -= 1;
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -445,6 +354,7 @@ export class ChallengeHandler {
|
|||
} catch (error) {
|
||||
log.error(`challenge: challenge failure, error: ${error && error.stack}`);
|
||||
this.options.setChallengeStatus('required');
|
||||
this.solving -= 1;
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -452,7 +362,8 @@ export class ChallengeHandler {
|
|||
|
||||
this.options.setChallengeStatus('idle');
|
||||
|
||||
this.retrySend(true);
|
||||
this.startAllQueues({ force: true });
|
||||
this.solving -= 1;
|
||||
}
|
||||
|
||||
private async sendChallengeResponse(data: ChallengeData): Promise<void> {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue