1ae77a32db
Co-authored-by: trevor-signal <131492920+trevor-signal@users.noreply.github.com>
484 lines
12 KiB
TypeScript
484 lines
12 KiB
TypeScript
// Copyright 2024 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
import { omit } from 'lodash';
|
|
import { assert } from 'chai';
|
|
|
|
import type { ReadableDB, WritableDB } from '../../sql/Interface';
|
|
import { jsonToObject, objectToJSON, sql, sqlJoin } from '../../sql/util';
|
|
import { createDB, updateToVersion } from './helpers';
|
|
import type { LegacyAttachmentDownloadJobType } from '../../sql/migrations/1040-undownloaded-backed-up-media';
|
|
import type { AttachmentType } from '../../types/Attachment';
|
|
import type { AttachmentDownloadJobType } from '../../types/AttachmentDownload';
|
|
import { IMAGE_JPEG } from '../../types/MIME';
|
|
|
|
// Returns every row of `attachment_downloads`, ordered by `receivedAt`
// descending. On each row the raw `attachmentJson` column is removed and the
// result of passing it through `jsonToObject` is exposed as `attachment`.
function getAttachmentDownloadJobs(db: ReadableDB) {
  const [selectAllJobs] = sql`
    SELECT * FROM attachment_downloads ORDER BY receivedAt DESC;
  `;

  const rows = db.prepare(selectAllJobs).all();

  return rows.map(row => {
    // Every column except the serialized attachment is passed through as-is.
    const columns = omit(row, 'attachmentJson');
    return { ...columns, attachment: jsonToObject(row.attachmentJson) };
  });
}
|
|
|
|
// Job shape accepted by insertNewJob: AttachmentDownloadJobType without its
// top-level ciphertextSize / contentType / digest / size / source properties.
// insertNewJob takes digest, contentType and size from `job.attachment`.
type UnflattenedAttachmentDownloadJobType = Omit<
  AttachmentDownloadJobType,
  'ciphertextSize' | 'contentType' | 'digest' | 'size' | 'source'
>;
|
|
// Inserts `job` into the `attachment_downloads` table.
//
// db:              database to write to.
// job:             job to store. The digest, contentType and size columns are
//                  taken from `job.attachment`; `active` is stored as 1 or 0.
// addMessageFirst: when true (the default), a `messages` row with the job's
//                  messageId is inserted before the job itself. Pass false to
//                  attempt the job insert without creating that row.
//
// Throws whatever the underlying statements throw.
function insertNewJob(
  db: WritableDB,
  job: UnflattenedAttachmentDownloadJobType,
  addMessageFirst: boolean = true
): void {
  if (addMessageFirst) {
    // OR IGNORE turns an already-inserted message id into a no-op while still
    // surfacing any unrelated failure, which a blanket try/catch would hide.
    db.prepare('INSERT OR IGNORE INTO messages (id) VALUES ($id)').run({
      id: job.messageId,
    });
  }

  const [query, params] = sql`
    INSERT INTO attachment_downloads
      (
        messageId,
        attachmentType,
        attachmentJson,
        digest,
        contentType,
        size,
        receivedAt,
        sentAt,
        active,
        attempts,
        retryAfter,
        lastAttemptTimestamp
      )
    VALUES
      (
        ${job.messageId},
        ${job.attachmentType},
        ${objectToJSON(job.attachment)},
        ${job.attachment.digest},
        ${job.attachment.contentType},
        ${job.attachment.size},
        ${job.receivedAt},
        ${job.sentAt},
        ${job.active ? 1 : 0},
        ${job.attempts},
        ${job.retryAfter},
        ${job.lastAttemptTimestamp}
      );
  `;

  db.prepare(query).run(params);
}
|
|
|
|
describe('SQL/updateToSchemaVersion1040', () => {
|
|
describe('Storing of new attachment jobs', () => {
|
|
// Database handle shared by the tests in this suite; reassigned before each
// test.
let db: WritableDB;

beforeEach(() => {
  // Fresh database for every test, brought to version 1040 via the helper.
  db = createDB();
  updateToVersion(db, 1040);
});

afterEach(() => {
  // Release the per-test database.
  db.close();
});
|
|
|
|
it('allows storing of new backup attachment jobs', () => {
  // Job with the lower receivedAt; no retry/attempt timestamps set.
  const olderJob: UnflattenedAttachmentDownloadJobType = {
    messageId: 'message1',
    attachmentType: 'attachment',
    attachment: {
      digest: 'digest1',
      contentType: IMAGE_JPEG,
      size: 128,
    },
    receivedAt: 1970,
    sentAt: 2070,
    active: false,
    retryAfter: null,
    attempts: 0,
    lastAttemptTimestamp: null,
  };

  // Job with the higher receivedAt; carries retry/attempt timestamps.
  const newerJob: UnflattenedAttachmentDownloadJobType = {
    messageId: 'message2',
    attachmentType: 'attachment',
    attachment: {
      digest: 'digest2',
      contentType: IMAGE_JPEG,
      size: 128,
    },
    receivedAt: 1971,
    sentAt: 2071,
    active: false,
    retryAfter: 1204,
    attempts: 0,
    lastAttemptTimestamp: 1004,
  };

  insertNewJob(db, olderJob);
  insertNewJob(db, newerJob);

  const storedJobs = getAttachmentDownloadJobs(db);
  assert.strictEqual(storedJobs.length, 2);

  // Rows come back ordered by receivedAt descending, so message2 is first.
  // `active` is read back as the stored integer, and digest/contentType/size
  // appear both as columns and inside the attachment object.
  const expectedNewer = {
    messageId: 'message2',
    attachmentType: 'attachment',
    digest: 'digest2',
    contentType: IMAGE_JPEG,
    size: 128,
    receivedAt: 1971,
    sentAt: 2071,
    active: 0,
    retryAfter: 1204,
    attempts: 0,
    lastAttemptTimestamp: 1004,
    attachment: {
      digest: 'digest2',
      contentType: IMAGE_JPEG,
      size: 128,
    },
  };
  const expectedOlder = {
    messageId: 'message1',
    attachmentType: 'attachment',
    digest: 'digest1',
    contentType: IMAGE_JPEG,
    size: 128,
    receivedAt: 1970,
    sentAt: 2070,
    active: 0,
    retryAfter: null,
    attempts: 0,
    lastAttemptTimestamp: null,
    attachment: {
      digest: 'digest1',
      contentType: IMAGE_JPEG,
      size: 128,
    },
  };

  assert.deepEqual(storedJobs, [expectedNewer, expectedOlder]);
});
|
|
|
|
it('Respects primary key constraint', () => {
  const firstJob: UnflattenedAttachmentDownloadJobType = {
    messageId: 'message1',
    attachmentType: 'attachment',
    attachment: {
      digest: 'digest1',
      contentType: IMAGE_JPEG,
      size: 128,
    },
    receivedAt: 1970,
    sentAt: 2070,
    active: false,
    retryAfter: null,
    attempts: 0,
    lastAttemptTimestamp: null,
  };
  insertNewJob(db, firstJob);

  // Re-inserting the same job with only `attempts` changed must be rejected.
  const conflictingJob: UnflattenedAttachmentDownloadJobType = {
    ...firstJob,
    attempts: 1,
  };
  assert.throws(() => insertNewJob(db, conflictingJob));

  // The original row is still the only one, with its original attempts count.
  const storedJobs = getAttachmentDownloadJobs(db);
  assert.strictEqual(storedJobs.length, 1);
  assert.strictEqual(storedJobs[0].attempts, 0);
});
|
|
|
|
it('uses indices searching for next job', () => {
  const now = Date.now();

  const baseJob: UnflattenedAttachmentDownloadJobType = {
    messageId: 'message1',
    attachmentType: 'attachment',
    attachment: {
      digest: 'digest1',
      contentType: IMAGE_JPEG,
      size: 128,
    },
    receivedAt: 101,
    sentAt: 101,
    attempts: 0,
    active: false,
    retryAfter: null,
    lastAttemptTimestamp: null,
  };

  // Fixture rows, inserted in this order:
  //   message1 - inactive, no retryAfter / lastAttemptTimestamp
  //   message2 - inactive, retryAfter just after `now`, attempted 10ms ago
  //   message3 - active
  //   message4 - inactive contact job, retryAfter === now, attempted 1s ago
  const fixtureJobs: ReadonlyArray<UnflattenedAttachmentDownloadJobType> = [
    baseJob,
    {
      ...baseJob,
      messageId: 'message2',
      receivedAt: 102,
      sentAt: 102,
      retryAfter: now + 1,
      lastAttemptTimestamp: now - 10,
    },
    {
      ...baseJob,
      messageId: 'message3',
      active: true,
      receivedAt: 103,
      sentAt: 103,
    },
    {
      ...baseJob,
      messageId: 'message4',
      attachmentType: 'contact',
      receivedAt: 104,
      sentAt: 104,
      retryAfter: now,
      lastAttemptTimestamp: now - 1000,
    },
  ];
  fixtureJobs.forEach(fixture => insertNewJob(db, fixture));

  // Query 1: inactive jobs whose retryAfter is unset or already reached,
  // newest first.
  {
    const [nextJobsQuery, nextJobsParams] = sql`
      SELECT * FROM attachment_downloads
      WHERE
        active = 0
      AND
        (retryAfter is NULL OR retryAfter <= ${now})
      ORDER BY receivedAt DESC
      LIMIT 5
    `;

    const rows = db.prepare(nextJobsQuery).all(nextJobsParams);
    assert.strictEqual(rows.length, 2);
    assert.deepStrictEqual(
      rows.map(row => row.messageId),
      ['message4', 'message1']
    );

    const planSteps = db
      .prepare(`EXPLAIN QUERY PLAN ${nextJobsQuery}`)
      .all(nextJobsParams);
    const plan = planSteps.map(step => step.detail).join(', ');

    // The plan must go through the index, with no temp sort and no scan.
    assert.include(plan, 'USING INDEX attachment_downloads_active_receivedAt');
    assert.notInclude(plan, 'TEMP B-TREE');
    assert.notInclude(plan, 'SCAN');
  }

  // Query 2: inactive jobs for a given set of messages whose last attempt is
  // unset or at least 100ms old, oldest first.
  {
    const messageIds = ['message1', 'message2', 'message4'];
    const [byMessageQuery, byMessageParams] = sql`
      SELECT * FROM attachment_downloads
      INDEXED BY attachment_downloads_active_messageId
      WHERE
        active = 0
      AND
        (lastAttemptTimestamp is NULL OR lastAttemptTimestamp <= ${now - 100})
      AND
        messageId IN (${sqlJoin(messageIds)})
      ORDER BY receivedAt ASC
      LIMIT 5
    `;

    const rows = db.prepare(byMessageQuery).all(byMessageParams);
    assert.strictEqual(rows.length, 2);
    assert.deepStrictEqual(
      rows.map(row => row.messageId),
      ['message1', 'message4']
    );

    const planSteps = db
      .prepare(`EXPLAIN QUERY PLAN ${byMessageQuery}`)
      .all(byMessageParams);
    const plan = planSteps.map(step => step.detail).join(', ');

    // This query _will_ use a temp b-tree for ordering, but the number of rows
    // should be quite low.
    assert.include(plan, 'USING INDEX attachment_downloads_active_messageId');
  }
});
|
|
|
|
it('respects foreign key constraint on messageId', () => {
  // Same fixture shape as the other tests in this suite. insertNewJob reads
  // digest/contentType/size from `attachment`, so no top-level copies of
  // those fields are needed here.
  const job: UnflattenedAttachmentDownloadJobType = {
    messageId: 'message1',
    attachmentType: 'attachment',
    attachment: {
      digest: 'digest1',
      contentType: IMAGE_JPEG,
      size: 128,
    },
    receivedAt: 1970,
    sentAt: 2070,
    active: false,
    retryAfter: null,
    attempts: 0,
    lastAttemptTimestamp: null,
  };

  // throws if we don't add the message first
  assert.throws(() => insertNewJob(db, job, false));
  insertNewJob(db, job, true);

  assert.strictEqual(getAttachmentDownloadJobs(db).length, 1);

  // Deletes the job when the message is deleted
  db.prepare('DELETE FROM messages WHERE id = $id').run({
    id: job.messageId,
  });
  assert.strictEqual(getAttachmentDownloadJobs(db).length, 0);
});
|
|
});
|
|
|
|
describe('existing jobs are transferred', () => {
|
|
// Database handle shared by the tests in this suite; reassigned before each
// test.
let db: WritableDB;

beforeEach(() => {
  // Stop at version 1030 so each test can insert legacy rows before it
  // calls updateToVersion(db, 1040) itself.
  db = createDB();
  updateToVersion(db, 1030);
});

afterEach(() => {
  // Release the per-test database.
  db.close();
});
|
|
|
|
it('existing rows are retained; invalid existing rows are removed', () => {
  // Six legacy rows, inserted in this order. Only 'id-1' and 'id-2' are
  // expected to be present after the update to version 1040.
  const legacyFixtures: ReadonlyArray<Partial<LegacyAttachmentDownloadJobType>> =
    [
      {
        id: 'id-1',
        messageId: 'message-1',
        timestamp: 1000,
        attachment: {
          size: 100,
          contentType: 'image/png',
          digest: 'digest1',
          cdnKey: 'key1',
        } as AttachmentType,
        pending: 0,
        index: 0,
        type: 'attachment',
      },
      // Nothing but an id.
      {
        id: 'invalid-1',
      },
      {
        id: 'id-2',
        messageId: 'message-2',
        timestamp: 1001,
        attachment: {
          size: 100,
          contentType: 'image/jpeg',
          digest: 'digest2',
          cdnKey: 'key2',
        } as AttachmentType,
        pending: 1,
        index: 2,
        type: 'attachment',
        attempts: 1,
      },
      // No messageId; attachment has no digest.
      {
        id: 'invalid-2',
        timestamp: 1000,
        attachment: { size: 100, contentType: 'image/jpeg' } as AttachmentType,
        pending: 0,
        index: 0,
        type: 'attachment',
      },
      // No messageId; attachment has only a size.
      {
        id: 'invalid-3-no-content-type',
        timestamp: 1000,
        attachment: { size: 100 } as AttachmentType,
        pending: 0,
        index: 0,
        type: 'attachment',
      },
      // Same messageId, type and digest as 'id-1'.
      {
        id: 'duplicate-1',
        messageId: 'message-1',
        timestamp: 1000,
        attachment: {
          size: 100,
          contentType: 'image/jpeg',
          digest: 'digest1',
        } as AttachmentType,
        pending: 0,
        index: 0,
        type: 'attachment',
      },
    ];
  legacyFixtures.forEach(fixture => insertLegacyJob(db, fixture));

  const rowsBeforeUpdate = db
    .prepare('SELECT * FROM attachment_downloads')
    .all();
  assert.strictEqual(rowsBeforeUpdate.length, 6);

  updateToVersion(db, 1040);

  const migratedJobs = getAttachmentDownloadJobs(db);
  assert.strictEqual(migratedJobs.length, 2);

  // Jobs are returned by receivedAt descending: message-2 (1001) comes
  // before message-1 (1000).
  const [message2Job, message1Job] = migratedJobs;

  assert.deepEqual(message1Job, {
    messageId: 'message-1',
    receivedAt: 1000,
    sentAt: 1000,
    attachment: {
      size: 100,
      contentType: 'image/png',
      digest: 'digest1',
      cdnKey: 'key1',
    },
    size: 100,
    contentType: 'image/png',
    digest: 'digest1',
    active: 0,
    attempts: 0,
    attachmentType: 'attachment',
    lastAttemptTimestamp: null,
    retryAfter: null,
  });
  assert.deepEqual(message2Job, {
    messageId: 'message-2',
    receivedAt: 1001,
    sentAt: 1001,
    attachment: {
      size: 100,
      contentType: 'image/jpeg',
      digest: 'digest2',
      cdnKey: 'key2',
    },
    size: 100,
    contentType: 'image/jpeg',
    digest: 'digest2',
    active: 0,
    attempts: 1,
    attachmentType: 'attachment',
    lastAttemptTimestamp: null,
    retryAfter: null,
  });
});
|
|
});
|
|
});
|
|
|
|
// Inserts a job row using the legacy `attachment_downloads` columns
// (id, timestamp, pending, json). The whole `job` object is serialized with
// objectToJSON into the `json` column. Before that, a `messages` row keyed by
// `job.messageId` is written with INSERT OR REPLACE. Every field of `job` is
// optional, so any of the bound values may be undefined.
function insertLegacyJob(
  db: WritableDB,
  job: Partial<LegacyAttachmentDownloadJobType>
): void {
  const { id, messageId, pending, timestamp } = job;

  const upsertMessage = db.prepare(
    'INSERT OR REPLACE INTO messages (id) VALUES ($id)'
  );
  upsertMessage.run({ id: messageId });

  const [insertJobQuery, insertJobParams] = sql`
    INSERT INTO attachment_downloads
      (id, timestamp, pending, json)
    VALUES
      (
        ${id},
        ${timestamp},
        ${pending},
        ${objectToJSON(job)}
      );
  `;

  db.prepare(insertJobQuery).run(insertJobParams);
}
|