Enable more specific AttachmentDownload prioritization

Author: trevor-signal, 2024-04-15 20:11:48 -04:00 (committed by GitHub)
parent 87ea909ae9
commit fc02762588
26 changed files with 2245 additions and 817 deletions

ts/sql/Client.ts

@@ -35,7 +35,6 @@ import { ipcInvoke, doShutdown } from './channels';
import type {
AdjacentMessagesByConversationOptionsType,
AllItemsType,
AttachmentDownloadJobType,
ClientInterface,
ClientExclusiveInterface,
ClientSearchResultMessageType,
@@ -66,6 +65,7 @@ import { getMessageIdForLogging } from '../util/idForLogging';
import type { MessageAttributesType } from '../model-types';
import { incrementMessageCounter } from '../util/incrementMessageCounter';
import { generateSnippetAroundMention } from '../util/search';
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
const ERASE_SQL_KEY = 'erase-sql-key';
const ERASE_ATTACHMENTS_KEY = 'erase-attachments';

ts/sql/Interface.ts

@@ -10,7 +10,6 @@ import type { StoredJob } from '../jobs/types';
import type { ReactionType, ReactionReadStatus } from '../types/Reactions';
import type { ConversationColorType, CustomColorType } from '../types/Colors';
import type { StorageAccessType } from '../types/Storage.d';
import type { AttachmentType } from '../types/Attachment';
import type { BytesToStrings } from '../types/Util';
import type { QualifiedAddressStringType } from '../types/QualifiedAddress';
import type { StoryDistributionIdString } from '../types/StoryDistributionId';
@@ -31,6 +30,7 @@ import type {
CallHistoryPagination,
} from '../types/CallDisposition';
import type { CallLinkType, CallLinkRestrictions } from '../types/CallLink';
import type { AttachmentDownloadJobType } from '../types/AttachmentDownload';
export type AdjacentMessagesByConversationOptionsType = Readonly<{
conversationId: string;
@@ -51,24 +51,6 @@ export type GetNearbyMessageFromDeletedSetOptionsType = Readonly<{
includeStoryReplies: boolean;
}>;
export type AttachmentDownloadJobTypeType =
| 'long-message'
| 'attachment'
| 'preview'
| 'contact'
| 'quote'
| 'sticker';
export type AttachmentDownloadJobType = {
attachment: AttachmentType;
attempts: number;
id: string;
index: number;
messageId: string;
pending: number;
timestamp: number;
type: AttachmentDownloadJobTypeType;
};
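
The job type removed here is re-homed, in reworked form, in ts/types/AttachmentDownload.ts, which this diff imports but does not show. Below is a hedged sketch of the new shape, inferred from the columns of the rebuilt attachment_downloads table in the 1040 migration further down; anything beyond those columns is an assumption, not the actual file:

```ts
// Sketch only: inferred from the new attachment_downloads columns; the real
// definitions (and the zod-style attachmentDownloadJobSchema) live in
// ts/types/AttachmentDownload.ts.
import type { AttachmentType } from './Attachment';

export type AttachmentDownloadJobTypeType =
  | 'long-message'
  | 'attachment'
  | 'preview'
  | 'contact'
  | 'quote'
  | 'sticker';

export type AttachmentDownloadJobType = {
  messageId: string;
  attachmentType: AttachmentDownloadJobTypeType;
  digest: string;
  receivedAt: number;
  sentAt: number;
  contentType: string; // stored as TEXT; the in-memory type may be richer
  size: number;
  attachment: AttachmentType; // serialized into the attachmentJson column
  active: boolean;
  attempts: number;
  retryAfter: number | null;
  lastAttemptTimestamp: number | null;
};
```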
export type MessageMetricsType = {
id: string;
received_at: number;
@@ -741,21 +723,22 @@ export type DataInterface = {
/** only for testing */
removeAllUnprocessed: () => Promise<void>;
getAttachmentDownloadJobById: (
id: string
) => Promise<AttachmentDownloadJobType | undefined>;
getNextAttachmentDownloadJobs: (
limit?: number,
options?: { timestamp?: number }
) => Promise<Array<AttachmentDownloadJobType>>;
getAttachmentDownloadJob(
job: Pick<
AttachmentDownloadJobType,
'messageId' | 'attachmentType' | 'digest'
>
): AttachmentDownloadJobType;
getNextAttachmentDownloadJobs: (options: {
limit: number;
prioritizeMessageIds?: Array<string>;
timestamp?: number;
}) => Promise<Array<AttachmentDownloadJobType>>;
saveAttachmentDownloadJob: (job: AttachmentDownloadJobType) => Promise<void>;
resetAttachmentDownloadPending: () => Promise<void>;
setAttachmentDownloadJobPending: (
id: string,
pending: boolean
resetAttachmentDownloadActive: () => Promise<void>;
removeAttachmentDownloadJob: (
job: AttachmentDownloadJobType
) => Promise<void>;
removeAttachmentDownloadJob: (id: string) => Promise<number>;
removeAllAttachmentDownloadJobs: () => Promise<number>;
createOrUpdateStickerPack: (pack: StickerPackType) => Promise<void>;
updateStickerPackStatus: (

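Taken together, the interface change replaces id-keyed lookups with the composite key (messageId, attachmentType, digest) and moves getNextAttachmentDownloadJobs to an options object so callers can pass prioritized message ids. A hypothetical caller sketch; the dataInterface and visibleMessageIds names are illustrative, not from this commit:

```ts
// Illustrative only: fetch a batch, preferring jobs for on-screen messages.
const jobs = await dataInterface.getNextAttachmentDownloadJobs({
  limit: 3,
  prioritizeMessageIds: visibleMessageIds, // e.g. ids currently rendered
});

for (const job of jobs) {
  // ...download job.attachment, then delete the job by its composite key...
  await dataInterface.removeAttachmentDownloadJob(job);
}
```
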
ts/sql/Server.ts

@@ -88,7 +88,6 @@ import { updateSchema } from './migrations';
import type {
AdjacentMessagesByConversationOptionsType,
StoredAllItemsType,
AttachmentDownloadJobType,
ConversationMetricsType,
ConversationType,
DeleteSentProtoRecipientOptionsType,
@@ -173,6 +172,10 @@ import {
updateCallLinkState,
} from './server/callLinks';
import { CallMode } from '../types/Calling';
import {
attachmentDownloadJobSchema,
type AttachmentDownloadJobType,
} from '../types/AttachmentDownload';
type ConversationRow = Readonly<{
json: string;
@@ -353,13 +356,11 @@ const dataInterface: ServerInterface = {
removeUnprocessed,
removeAllUnprocessed,
getAttachmentDownloadJobById,
getAttachmentDownloadJob,
getNextAttachmentDownloadJobs,
saveAttachmentDownloadJob,
resetAttachmentDownloadPending,
setAttachmentDownloadJobPending,
resetAttachmentDownloadActive,
removeAttachmentDownloadJob,
removeAllAttachmentDownloadJobs,
createOrUpdateStickerPack,
updateStickerPackStatus,
@@ -4403,127 +4404,184 @@ async function removeAllUnprocessed(): Promise<void> {
// Attachment Downloads
const ATTACHMENT_DOWNLOADS_TABLE = 'attachment_downloads';
async function getAttachmentDownloadJobById(
id: string
): Promise<AttachmentDownloadJobType | undefined> {
return getById(getReadonlyInstance(), ATTACHMENT_DOWNLOADS_TABLE, id);
function getAttachmentDownloadJob(
job: Pick<
AttachmentDownloadJobType,
'messageId' | 'attachmentType' | 'digest'
>
): AttachmentDownloadJobType {
const db = getReadonlyInstance();
const [query, params] = sql`
SELECT * FROM attachment_downloads
WHERE
messageId = ${job.messageId}
AND
attachmentType = ${job.attachmentType}
AND
digest = ${job.digest};
`;
return db.prepare(query).get(params);
}
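
The sql tagged template used here (imported from ts/sql/util, not shown in this diff) pairs the query text with its bound parameters. A minimal sketch of that pattern, assuming positional better-sqlite3 placeholders and array expansion for sqlJoin-style IN lists; the real helpers are richer than this:

```ts
// Sketch of the [query, params] tagged-template pattern, not Signal's
// actual implementation in ts/sql/util.
type QueryPair = [string, Array<unknown>];

function sql(
  strings: TemplateStringsArray,
  ...values: Array<unknown>
): QueryPair {
  const params: Array<unknown> = [];
  let query = strings[0];
  values.forEach((value, i) => {
    if (Array.isArray(value)) {
      // expand arrays into `?, ?, ?` (what sqlJoin enables for IN (...))
      query += value.map(() => '?').join(', ');
      params.push(...value);
    } else {
      query += '?';
      params.push(value);
    }
    query += strings[i + 1];
  });
  return [query, params];
}

// Usage mirrors the code above:
//   const [query, params] = sql`SELECT * FROM t WHERE id = ${id}`;
//   db.prepare(query).get(params);
```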
async function getNextAttachmentDownloadJobs(
limit?: number,
options: { timestamp?: number } = {}
): Promise<Array<AttachmentDownloadJobType>> {
async function getNextAttachmentDownloadJobs({
limit = 3,
prioritizeMessageIds,
timestamp = Date.now(),
maxLastAttemptForPrioritizedMessages,
}: {
limit: number;
prioritizeMessageIds?: Array<string>;
timestamp?: number;
maxLastAttemptForPrioritizedMessages?: number;
}): Promise<Array<AttachmentDownloadJobType>> {
const db = await getWritableInstance();
const timestamp =
options && options.timestamp ? options.timestamp : Date.now();
const rows: Array<{ json: string; id: string }> = db
.prepare<Query>(
`
SELECT id, json
FROM attachment_downloads
WHERE pending = 0 AND timestamp <= $timestamp
ORDER BY timestamp DESC
LIMIT $limit;
`
)
.all({
limit: limit || 3,
timestamp,
});
let priorityJobs = [];
const INNER_ERROR = 'jsonToObject error';
// First, try to get jobs for prioritized messages (e.g. those currently user-visible)
if (prioritizeMessageIds?.length) {
const [priorityQuery, priorityParams] = sql`
SELECT * FROM attachment_downloads
-- very few rows will match messageIds, so in this case we want to optimize
-- the WHERE clause rather than the ORDER BY
INDEXED BY attachment_downloads_active_messageId
WHERE
active = 0
AND
-- for priority messages, we want to retry based on the last attempt, rather than retryAfter
(lastAttemptTimestamp is NULL OR lastAttemptTimestamp <= ${
maxLastAttemptForPrioritizedMessages ?? timestamp - durations.HOUR
})
AND
messageId IN (${sqlJoin(prioritizeMessageIds)})
-- for priority messages, let's load them oldest first; this helps, e.g. for stories where we
-- want the oldest one first
ORDER BY receivedAt ASC
LIMIT ${limit}
`;
priorityJobs = db.prepare(priorityQuery).all(priorityParams);
}
// Next, get any other jobs, sorted by receivedAt
const numJobsRemaining = limit - priorityJobs.length;
let standardJobs = [];
if (numJobsRemaining > 0) {
const [query, params] = sql`
SELECT * FROM attachment_downloads
WHERE
active = 0
AND
(retryAfter is NULL OR retryAfter <= ${timestamp})
ORDER BY receivedAt DESC
LIMIT ${numJobsRemaining}
`;
standardJobs = db.prepare(query).all(params);
}
const allJobs = priorityJobs.concat(standardJobs);
const INNER_ERROR = 'jsonToObject or SchemaParse error';
try {
return rows.map(row => {
return allJobs.map(row => {
try {
return jsonToObject(row.json);
return attachmentDownloadJobSchema.parse({
...row,
active: Boolean(row.active),
attachment: jsonToObject(row.attachmentJson),
});
} catch (error) {
logger.error(
`getNextAttachmentDownloadJobs: Error with job '${row.id}', deleting. ` +
`JSON: '${row.json}' ` +
`Error: ${Errors.toLogFormat(error)}`
`getNextAttachmentDownloadJobs: Error with job for message ${row.messageId}, deleting.`
);
removeAttachmentDownloadJobSync(db, row.id);
throw new Error(INNER_ERROR);
removeAttachmentDownloadJobSync(db, row);
throw new Error(INNER_ERROR);
}
});
} catch (error) {
if ('message' in error && error.message === INNER_ERROR) {
return getNextAttachmentDownloadJobs(limit, { timestamp });
return getNextAttachmentDownloadJobs({
limit,
prioritizeMessageIds,
timestamp,
maxLastAttemptForPrioritizedMessages,
});
}
throw error;
}
}
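Two details worth noting above. First, the self-healing retry: a row that fails schema validation is deleted and the whole query re-runs, so the returned batch still honors the limit. Second, the INDEXED BY hint forces SQLite onto attachment_downloads_active_messageId (created in the 1040 migration below) because, as the comment explains, few rows match the messageId filter and the planner's default receivedAt index would be the wrong choice. One hedged way to sanity-check index usage, assuming a better-sqlite3 Database handle; this snippet is illustrative, not part of the commit:

```ts
// Illustrative: inspect SQLite's plan for the priority query.
const plan = db
  .prepare(
    `EXPLAIN QUERY PLAN
     SELECT * FROM attachment_downloads
     INDEXED BY attachment_downloads_active_messageId
     WHERE active = 0 AND messageId IN (?, ?)`
  )
  .all('id-1', 'id-2');
// Expect a detail like:
// "SEARCH attachment_downloads USING INDEX
//  attachment_downloads_active_messageId (active=? AND messageId=?)"
console.log(plan.map((row: { detail: string }) => row.detail));
```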
async function saveAttachmentDownloadJob(
job: AttachmentDownloadJobType
): Promise<void> {
const db = await getWritableInstance();
const { id, pending, timestamp } = job;
if (!id) {
throw new Error(
'saveAttachmentDownloadJob: Provided job did not have a truthy id'
);
}
db.prepare<Query>(
`
const [query, params] = sql`
INSERT OR REPLACE INTO attachment_downloads (
id,
pending,
timestamp,
json
) values (
$id,
$pending,
$timestamp,
$json
)
`
).run({
id,
pending,
timestamp,
json: objectToJSON(job),
});
messageId,
attachmentType,
digest,
receivedAt,
sentAt,
contentType,
size,
active,
attempts,
retryAfter,
lastAttemptTimestamp,
attachmentJson
) VALUES (
${job.messageId},
${job.attachmentType},
${job.digest},
${job.receivedAt},
${job.sentAt},
${job.contentType},
${job.size},
${job.active ? 1 : 0},
${job.attempts},
${job.retryAfter},
${job.lastAttemptTimestamp},
${objectToJSON(job.attachment)}
);
`;
db.prepare(query).run(params);
}
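Because the table's primary key is (messageId, attachmentType, digest), the INSERT OR REPLACE doubles as an upsert, so a job runner can re-save a job to record a failed attempt via the new retryAfter and lastAttemptTimestamp columns. A hypothetical sketch of that use; the backoff policy here is invented for illustration, and the actual job-runner logic is not part of this file:

```ts
import * as durations from '../util/durations';

// Hypothetical: record a failure and schedule a retry via the new columns.
async function markJobFailed(job: AttachmentDownloadJobType): Promise<void> {
  const now = Date.now();
  await saveAttachmentDownloadJob({
    ...job,
    active: false,
    attempts: job.attempts + 1,
    lastAttemptTimestamp: now,
    // invented policy: exponential backoff, capped at one hour
    retryAfter:
      now + Math.min(durations.HOUR, durations.MINUTE * 2 ** job.attempts),
  });
}
```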
async function setAttachmentDownloadJobPending(
id: string,
pending: boolean
): Promise<void> {
const db = await getWritableInstance();
db.prepare<Query>(
`
UPDATE attachment_downloads
SET pending = $pending
WHERE id = $id;
`
).run({
id,
pending: pending ? 1 : 0,
});
}
async function resetAttachmentDownloadPending(): Promise<void> {
async function resetAttachmentDownloadActive(): Promise<void> {
const db = await getWritableInstance();
db.prepare<EmptyQuery>(
`
UPDATE attachment_downloads
SET pending = 0
WHERE pending != 0;
SET active = 0
WHERE active != 0;
`
).run();
}
function removeAttachmentDownloadJobSync(db: Database, id: string): number {
return removeById(db, ATTACHMENT_DOWNLOADS_TABLE, id);
function removeAttachmentDownloadJobSync(
db: Database,
job: AttachmentDownloadJobType
): void {
const [query, params] = sql`
DELETE FROM attachment_downloads
WHERE
messageId = ${job.messageId}
AND
attachmentType = ${job.attachmentType}
AND
digest = ${job.digest};
`;
db.prepare(query).run(params);
}
async function removeAttachmentDownloadJob(id: string): Promise<number> {
async function removeAttachmentDownloadJob(
job: AttachmentDownloadJobType
): Promise<void> {
const db = await getWritableInstance();
return removeAttachmentDownloadJobSync(db, id);
}
async function removeAllAttachmentDownloadJobs(): Promise<number> {
return removeAllFromTable(
await getWritableInstance(),
ATTACHMENT_DOWNLOADS_TABLE
);
return removeAttachmentDownloadJobSync(db, job);
}
// Stickers

ts/sql/migrations/1040-undownloaded-backed-up-media.ts (new file)

@@ -0,0 +1,208 @@
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import type { Database } from '@signalapp/better-sqlite3';
import type { LoggerType } from '../../types/Logging';
import {
attachmentDownloadJobSchema,
type AttachmentDownloadJobType,
type AttachmentDownloadJobTypeType,
} from '../../types/AttachmentDownload';
import type { AttachmentType } from '../../types/Attachment';
import { jsonToObject, objectToJSON, sql } from '../util';
export const version = 1040;
export type LegacyAttachmentDownloadJobType = {
attachment: AttachmentType;
attempts: number;
id: string;
index: number;
messageId: string;
pending: number;
timestamp: number;
type: AttachmentDownloadJobTypeType;
};
export function updateToSchemaVersion1040(
currentVersion: number,
db: Database,
logger: LoggerType
): void {
if (currentVersion >= 1040) {
return;
}
db.transaction(() => {
// 1. Load all existing rows into memory (shouldn't be many)
const existingJobs: Array<{
id: string | null;
timestamp: number | null;
pending: number | null;
json: string | null;
}> = db
.prepare(
`
SELECT id, timestamp, pending, json from attachment_downloads
`
)
.all();
logger.info(
`updateToSchemaVersion1040: loaded ${existingJobs.length} existing jobs`
);
// 2. Create new temp table, with a couple new columns and stricter typing
db.exec(`
CREATE TABLE tmp_attachment_downloads (
messageId TEXT NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
attachmentType TEXT NOT NULL,
digest TEXT NOT NULL,
receivedAt INTEGER NOT NULL,
sentAt INTEGER NOT NULL,
contentType TEXT NOT NULL,
size INTEGER NOT NULL,
attachmentJson TEXT NOT NULL,
active INTEGER NOT NULL,
attempts INTEGER NOT NULL,
retryAfter INTEGER,
lastAttemptTimestamp INTEGER,
PRIMARY KEY (messageId, attachmentType, digest)
) STRICT;
`);
// 3. Drop existing table
db.exec('DROP TABLE attachment_downloads;');
// 4. Rename temp table
db.exec(
'ALTER TABLE tmp_attachment_downloads RENAME TO attachment_downloads;'
);
// 5. Add new index on active & receivedAt. For most queries when there are lots of
// jobs (like during backup restore), many jobs will match the WHERE clause, so
// the ORDER BY on receivedAt is probably the most expensive part.
db.exec(`
CREATE INDEX attachment_downloads_active_receivedAt
ON attachment_downloads (
active, receivedAt
);
`);
// 6. Add new index on active & messageId. In order to prioritize visible messages,
// we'll also query for rows with a matching messageId. For these, the messageId
// matching is likely going to be the most expensive part.
db.exec(`
CREATE INDEX attachment_downloads_active_messageId
ON attachment_downloads (
active, messageId
);
`);
// 7. Add new index just on messageId, for the ON DELETE CASCADE foreign key
// constraint
db.exec(`
CREATE INDEX attachment_downloads_messageId
ON attachment_downloads (
messageId
);
`);
// 8. Rewrite old rows to match new schema
const rowsToTransfer: Array<AttachmentDownloadJobType> = [];
for (const existingJob of existingJobs) {
try {
// Type this as partial in case there is missing data
const existingJobData: Partial<LegacyAttachmentDownloadJobType> =
jsonToObject(existingJob.json ?? '');
const updatedJob: Partial<AttachmentDownloadJobType> = {
messageId: existingJobData.messageId,
attachmentType: existingJobData.type,
attachment: existingJobData.attachment,
// The existing timestamp column works reasonably well in place of
// actually retrieving the message's receivedAt
receivedAt: existingJobData.timestamp ?? Date.now(),
sentAt: existingJobData.timestamp ?? Date.now(),
digest: existingJobData.attachment?.digest,
contentType: existingJobData.attachment?.contentType,
size: existingJobData.attachment?.size,
active: false, // all jobs are inactive on app start
attempts: existingJobData.attempts ?? 0,
retryAfter: null,
lastAttemptTimestamp: null,
};
const parsed = attachmentDownloadJobSchema.parse(updatedJob);
rowsToTransfer.push(parsed as AttachmentDownloadJobType);
} catch {
logger.warn(
`updateToSchemaVersion1040: unable to transfer job ${existingJob.id} to new table; invalid data`
);
}
}
let numTransferred = 0;
if (rowsToTransfer.length) {
logger.info(
`updateToSchemaVersion1040: transferring ${rowsToTransfer.length} rows`
);
for (const row of rowsToTransfer) {
const [insertQuery, insertParams] = sql`
INSERT INTO attachment_downloads
(
messageId,
attachmentType,
receivedAt,
sentAt,
digest,
contentType,
size,
attachmentJson,
active,
attempts,
retryAfter,
lastAttemptTimestamp
)
VALUES
(
${row.messageId},
${row.attachmentType},
${row.receivedAt},
${row.sentAt},
${row.digest},
${row.contentType},
${row.size},
${objectToJSON(row.attachment)},
${row.active ? 1 : 0},
${row.attempts},
${row.retryAfter},
${row.lastAttemptTimestamp}
);
`;
try {
db.prepare(insertQuery).run(insertParams);
numTransferred += 1;
} catch (error) {
logger.error(
'updateToSchemaVersion1040: error when transferring row',
error
);
}
}
}
logger.info(
`updateToSchemaVersion1040: transferred ${numTransferred} rows, removed ${
existingJobs.length - numTransferred
}`
);
})();
db.pragma('user_version = 1040');
logger.info('updateToSchemaVersion1040: success!');
}
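
The migration follows SQLite's standard table-rebuild recipe: copy rows out, create the stricter replacement table, swap it in, then re-validate each row on the way back so malformed legacy jobs are dropped rather than transferred. A hypothetical smoke test of the exported function against an in-memory database; the logger shim and the elided pre-1040 schema setup are assumptions, not from this commit:

```ts
import SQL from '@signalapp/better-sqlite3';
import type { LoggerType } from '../../types/Logging';
import { updateToSchemaVersion1040 } from './1040-undownloaded-backed-up-media';

// Minimal logger standing in for Signal's LoggerType (cast is illustrative).
const logger = {
  info: console.info,
  warn: console.warn,
  error: console.error,
} as LoggerType;

const db = new SQL(':memory:');
// ...bring the database to schema version 1030 first (elided here)...
updateToSchemaVersion1040(1030, db, logger);
// db.pragma('user_version', { simple: true }) now reports 1040
```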

ts/sql/migrations/index.ts

@@ -78,10 +78,11 @@ import { updateToSchemaVersion990 } from './990-phone-number-sharing';
import { updateToSchemaVersion1000 } from './1000-mark-unread-call-history-messages-as-unseen';
import { updateToSchemaVersion1010 } from './1010-call-links-table';
import { updateToSchemaVersion1020 } from './1020-self-merges';
import { updateToSchemaVersion1030 } from './1030-unblock-event';
import {
updateToSchemaVersion1040,
version as MAX_VERSION,
updateToSchemaVersion1030,
} from './1030-unblock-event';
} from './1040-undownloaded-backed-up-media';
function updateToSchemaVersion1(
currentVersion: number,
@@ -2027,6 +2028,7 @@ export const SCHEMA_VERSIONS = [
updateToSchemaVersion1010,
updateToSchemaVersion1020,
updateToSchemaVersion1030,
updateToSchemaVersion1040,
];
export class DBVersionFromFutureError extends Error {