signal-desktop/ts/routineProfileRefresh.ts

234 lines
6.4 KiB
TypeScript
Raw Normal View History

2023-01-03 19:55:46 +00:00
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { isNil, sortBy } from 'lodash';
import PQueue from 'p-queue';
import * as log from './logging/log';
import { assertDev } from './util/assert';
2022-07-08 20:46:25 +00:00
import { sleep } from './util/sleep';
import { isNormalNumber } from './util/isNormalNumber';
import { take } from './util/iterables';
import type { ConversationModel } from './models/conversations';
import type { StorageInterface } from './types/Storage.d';
2022-07-08 20:46:25 +00:00
import * as Errors from './types/errors';
2022-02-11 21:38:52 +00:00
import { getProfile } from './util/getProfile';
import { drop } from './util/drop';
import { MINUTE, HOUR, DAY, WEEK } from './util/durations';
import { isDirectConversation } from './util/whatTypeOfConversation';
const STORAGE_KEY = 'lastAttemptedToRefreshProfilesAt';
const MAX_AGE_TO_BE_CONSIDERED_RECENTLY_REFRESHED = 3 * DAY;
const MAX_CONVERSATIONS_TO_REFRESH = 50;
const MIN_ELAPSED_DURATION_TO_REFRESH_AGAIN = HOUR;
2022-07-08 20:46:25 +00:00
const MIN_REFRESH_DELAY = MINUTE;
let idCounter = 1;
2022-07-08 20:46:25 +00:00
export class RoutineProfileRefresher {
private started = false;
private id: number;
2022-07-08 20:46:25 +00:00
constructor(
private readonly options: {
getAllConversations: () => ReadonlyArray<ConversationModel>;
getOurConversationId: () => string | undefined;
storage: Pick<StorageInterface, 'get' | 'put'>;
}
) {
// We keep track of how many of these classes we create, because we suspect that
// there might be too many...
idCounter += 1;
this.id = idCounter;
log.info(
`Creating new RoutineProfileRefresher instance with id ${this.id}`
);
}
2022-07-08 20:46:25 +00:00
public async start(): Promise<void> {
const logId = `RoutineProfileRefresher.start/${this.id}`;
if (this.started) {
log.warn(`${logId}: already started!`);
return;
2022-07-08 20:46:25 +00:00
}
this.started = true;
2022-07-08 20:46:25 +00:00
const { storage, getAllConversations, getOurConversationId } = this.options;
// eslint-disable-next-line no-constant-condition
while (true) {
const refreshInMs = timeUntilNextRefresh(storage);
log.info(`${logId}: waiting for ${refreshInMs}ms`);
2022-07-08 20:46:25 +00:00
// eslint-disable-next-line no-await-in-loop
await sleep(refreshInMs);
const ourConversationId = getOurConversationId();
if (!ourConversationId) {
log.warn(`${logId}: missing our conversation id`);
2022-07-08 20:46:25 +00:00
// eslint-disable-next-line no-await-in-loop
await sleep(MIN_REFRESH_DELAY);
continue;
}
try {
// eslint-disable-next-line no-await-in-loop
await routineProfileRefresh({
allConversations: getAllConversations(),
ourConversationId,
storage,
id: this.id,
2022-07-08 20:46:25 +00:00
});
} catch (error) {
log.error(`${logId}: failure`, Errors.toLogFormat(error));
} finally {
2022-07-08 20:46:25 +00:00
// eslint-disable-next-line no-await-in-loop
await sleep(MIN_REFRESH_DELAY);
}
}
}
}
export async function routineProfileRefresh({
allConversations,
ourConversationId,
storage,
id,
2022-02-11 21:38:52 +00:00
// Only for tests
getProfileFn = getProfile,
}: {
2022-07-08 20:46:25 +00:00
allConversations: ReadonlyArray<ConversationModel>;
ourConversationId: string;
storage: Pick<StorageInterface, 'get' | 'put'>;
id: number;
2022-02-11 21:38:52 +00:00
getProfileFn?: typeof getProfile;
}): Promise<void> {
const logId = `routineProfileRefresh/${id}`;
log.info(`${logId}: starting`);
2022-07-08 20:46:25 +00:00
const refreshInMs = timeUntilNextRefresh(storage);
if (refreshInMs > 0) {
log.info(`${logId}: too soon to refresh. Doing nothing`);
return;
}
log.info(`${logId}: updating last refresh time`);
await storage.put(STORAGE_KEY, Date.now());
const conversationsToRefresh = getConversationsToRefresh(
allConversations,
ourConversationId
);
log.info(`${logId}: starting to refresh conversations`);
let totalCount = 0;
let successCount = 0;
async function refreshConversation(
conversation: ConversationModel
): Promise<void> {
log.info(`${logId}: refreshing profile for ${conversation.idForLogging()}`);
totalCount += 1;
try {
2022-02-11 21:38:52 +00:00
await getProfileFn(conversation.get('uuid'), conversation.get('e164'));
log.info(
`${logId}: refreshed profile for ${conversation.idForLogging()}`
);
successCount += 1;
} catch (err) {
log.error(
`${logId}: refreshed profile for ${conversation.idForLogging()}`,
Errors.toLogFormat(err)
);
}
}
2021-11-23 22:01:03 +00:00
const refreshQueue = new PQueue({
concurrency: 5,
timeout: MINUTE * 30,
2021-11-23 22:01:03 +00:00
throwOnTimeout: true,
});
for (const conversation of conversationsToRefresh) {
drop(refreshQueue.add(() => refreshConversation(conversation)));
}
await refreshQueue.onIdle();
log.info(
`${logId}: successfully refreshed ${successCount} out of ${totalCount} conversation(s)`
);
}
2022-07-08 20:46:25 +00:00
function timeUntilNextRefresh(storage: Pick<StorageInterface, 'get'>): number {
const storedValue = storage.get(STORAGE_KEY);
if (isNil(storedValue)) {
2022-07-08 20:46:25 +00:00
return 0;
}
if (isNormalNumber(storedValue)) {
2022-07-08 20:46:25 +00:00
const planned = storedValue + MIN_ELAPSED_DURATION_TO_REFRESH_AGAIN;
const now = Date.now();
return Math.min(Math.max(0, planned - now), WEEK);
}
assertDev(
false,
`An invalid value was stored in ${STORAGE_KEY}; treating it as nil`
);
2022-07-08 20:46:25 +00:00
return 0;
}
function getConversationsToRefresh(
conversations: ReadonlyArray<ConversationModel>,
ourConversationId: string
): Iterable<ConversationModel> {
const filteredConversations = getFilteredConversations(
conversations,
ourConversationId
);
return take(filteredConversations, MAX_CONVERSATIONS_TO_REFRESH);
}
function* getFilteredConversations(
conversations: ReadonlyArray<ConversationModel>,
ourConversationId: string
): Iterable<ConversationModel> {
const filtered = conversations.filter(
c =>
isDirectConversation(c.attributes) &&
!c.isUnregisteredAndStale() &&
c.get('uuid')
);
const sorted = sortBy(filtered, c => c.get('profileLastFetchedAt') || 0);
for (const conversation of sorted) {
if (conversation.id === ourConversationId) {
if (conversation.hasProfileKeyCredentialExpired()) {
yield conversation;
}
continue;
}
if (!hasRefreshedProfileRecently(conversation)) {
yield conversation;
}
}
}
function hasRefreshedProfileRecently(
conversation: Readonly<ConversationModel>
): boolean {
const profileLastFetchedAt = conversation.get('profileLastFetchedAt');
return (
isNormalNumber(profileLastFetchedAt) &&
profileLastFetchedAt + MAX_AGE_TO_BE_CONSIDERED_RECENTLY_REFRESHED >
Date.now()
);
}