Also use sent_at whenever we query database with received_at

This commit is contained in:
Scott Nonnenberg 2021-01-13 08:32:18 -08:00
parent f32a0b537d
commit 9f81b4157b
8 changed files with 181 additions and 134 deletions

View file

@ -673,6 +673,7 @@ async function exportConversation(conversation, options = {}) {
// We're looping from the most recent to the oldest
let lastReceivedAt = Number.MAX_VALUE;
let lastSentAt = Number.MAX_VALUE;
while (!complete) {
// eslint-disable-next-line no-await-in-loop
@ -681,6 +682,7 @@ async function exportConversation(conversation, options = {}) {
{
limit: CHUNK_SIZE,
receivedAt: lastReceivedAt,
sentAt: lastSentAt,
MessageCollection: Whisper.MessageCollection,
}
);
@ -771,6 +773,7 @@ async function exportConversation(conversation, options = {}) {
const last = messages.length > 0 ? messages[messages.length - 1] : null;
if (last) {
lastReceivedAt = last.received_at;
lastSentAt = last.sent_at;
}
if (messages.length < CHUNK_SIZE) {

View file

@ -457,6 +457,7 @@ describe('Backup', () => {
body: 'Totally!',
source: OUR_NUMBER,
received_at: 1524185933350,
sent_at: 1524185933350,
timestamp: 1524185933350,
errors: [],
attachments: [

View file

@ -1314,6 +1314,7 @@ export class ConversationModel extends window.Backbone.Model<
MessageCollection: window.Whisper.MessageCollection,
limit: 100,
receivedAt: first ? first.get('received_at') : undefined,
sentAt: first ? first.get('sent_at') : undefined,
messageId: first ? first.id : undefined,
}
);

View file

@ -983,11 +983,13 @@ async function getOlderMessagesByConversation(
{
limit = 100,
receivedAt = Number.MAX_VALUE,
sentAt = Number.MAX_VALUE,
messageId,
MessageCollection,
}: {
limit?: number;
receivedAt?: number;
sentAt?: number;
messageId?: string;
MessageCollection: typeof MessageModelCollectionType;
}
@ -997,6 +999,7 @@ async function getOlderMessagesByConversation(
{
limit,
receivedAt,
sentAt,
messageId,
}
);
@ -1008,10 +1011,12 @@ async function getNewerMessagesByConversation(
{
limit = 100,
receivedAt = 0,
sentAt = 0,
MessageCollection,
}: {
limit?: number;
receivedAt?: number;
sentAt?: number;
MessageCollection: typeof MessageModelCollectionType;
}
) {
@ -1020,6 +1025,7 @@ async function getNewerMessagesByConversation(
{
limit,
receivedAt,
sentAt,
}
);

View file

@ -217,12 +217,13 @@ export type ServerInterface = DataInterface & {
options?: {
limit?: number;
receivedAt?: number;
sentAt?: number;
messageId?: string;
}
) => Promise<Array<MessageTypeUnhydrated>>;
getNewerMessagesByConversation: (
conversationId: string,
options?: { limit?: number; receivedAt?: number }
options?: { limit?: number; receivedAt?: number; sentAt?: number }
) => Promise<Array<MessageTypeUnhydrated>>;
getLastConversationActivity: (
conversationId: string
@ -309,6 +310,7 @@ export type ClientInterface = DataInterface & {
limit?: number;
messageId?: string;
receivedAt?: number;
sentAt?: number;
MessageCollection: typeof MessageModelCollectionType;
}
) => Promise<MessageModelCollectionType>;
@ -317,6 +319,7 @@ export type ClientInterface = DataInterface & {
options: {
limit?: number;
receivedAt?: number;
sentAt?: number;
MessageCollection: typeof MessageModelCollectionType;
}
) => Promise<MessageModelCollectionType>;

View file

@ -454,16 +454,16 @@ async function updateToSchemaVersion1(
);`);
await instance.run(`CREATE TABLE unprocessed(
id STRING,
timestamp INTEGER,
json TEXT
);`);
id STRING,
timestamp INTEGER,
json TEXT
);`);
await instance.run(`CREATE INDEX unprocessed_id ON unprocessed (
id
);`);
id
);`);
await instance.run(`CREATE INDEX unprocessed_timestamp ON unprocessed (
timestamp
);`);
timestamp
);`);
await instance.run('PRAGMA user_version = 1;');
await instance.run('COMMIT TRANSACTION;');
@ -694,19 +694,19 @@ async function updateToSchemaVersion7(
await instance.run(
`CREATE TABLE sessions(
id TEXT PRIMARY KEY,
number TEXT,
json TEXT
);`
id TEXT PRIMARY KEY,
number TEXT,
json TEXT
);`
);
await instance.run(`CREATE INDEX sessions_number ON sessions (
number
) WHERE number IS NOT NULL;`);
number
) WHERE number IS NOT NULL;`);
await instance.run(`INSERT INTO sessions(id, number, json)
SELECT "+" || id, number, json FROM sessions_old;
`);
SELECT "+" || id, number, json FROM sessions_old;
`);
await instance.run('DROP TABLE sessions_old;');
@ -741,43 +741,43 @@ async function updateToSchemaVersion8(
// Then we create our full-text search table and populate it
await instance.run(`
CREATE VIRTUAL TABLE messages_fts
USING fts5(id UNINDEXED, body);
`);
CREATE VIRTUAL TABLE messages_fts
USING fts5(id UNINDEXED, body);
`);
await instance.run(`
INSERT INTO messages_fts(id, body)
SELECT id, body FROM messages;
`);
INSERT INTO messages_fts(id, body)
SELECT id, body FROM messages;
`);
// Then we set up triggers to keep the full-text search table up to date
await instance.run(`
CREATE TRIGGER messages_on_insert AFTER INSERT ON messages BEGIN
INSERT INTO messages_fts (
id,
body
) VALUES (
new.id,
new.body
);
END;
`);
CREATE TRIGGER messages_on_insert AFTER INSERT ON messages BEGIN
INSERT INTO messages_fts (
id,
body
) VALUES (
new.id,
new.body
);
END;
`);
await instance.run(`
CREATE TRIGGER messages_on_delete AFTER DELETE ON messages BEGIN
DELETE FROM messages_fts WHERE id = old.id;
END;
`);
CREATE TRIGGER messages_on_delete AFTER DELETE ON messages BEGIN
DELETE FROM messages_fts WHERE id = old.id;
END;
`);
await instance.run(`
CREATE TRIGGER messages_on_update AFTER UPDATE ON messages BEGIN
DELETE FROM messages_fts WHERE id = old.id;
INSERT INTO messages_fts(
id,
body
) VALUES (
new.id,
new.body
);
END;
`);
CREATE TRIGGER messages_on_update AFTER UPDATE ON messages BEGIN
DELETE FROM messages_fts WHERE id = old.id;
INSERT INTO messages_fts(
id,
body
) VALUES (
new.id,
new.body
);
END;
`);
// For formatting search results:
// https://sqlite.org/fts5.html#the_highlight_function
@ -804,20 +804,20 @@ async function updateToSchemaVersion9(
try {
await instance.run(`CREATE TABLE attachment_downloads(
id STRING primary key,
timestamp INTEGER,
pending INTEGER,
json TEXT
);`);
id STRING primary key,
timestamp INTEGER,
pending INTEGER,
json TEXT
);`);
await instance.run(`CREATE INDEX attachment_downloads_timestamp
ON attachment_downloads (
timestamp
) WHERE pending = 0;`);
ON attachment_downloads (
timestamp
) WHERE pending = 0;`);
await instance.run(`CREATE INDEX attachment_downloads_pending
ON attachment_downloads (
pending
) WHERE pending != 0;`);
ON attachment_downloads (
pending
) WHERE pending != 0;`);
await instance.run('PRAGMA user_version = 9;');
await instance.run('COMMIT TRANSACTION;');
@ -844,46 +844,46 @@ async function updateToSchemaVersion10(
await instance.run('ALTER TABLE unprocessed RENAME TO unprocessed_old;');
await instance.run(`CREATE TABLE unprocessed(
id STRING,
timestamp INTEGER,
version INTEGER,
attempts INTEGER,
envelope TEXT,
decrypted TEXT,
source TEXT,
sourceDevice TEXT,
serverTimestamp INTEGER
);`);
id STRING,
timestamp INTEGER,
version INTEGER,
attempts INTEGER,
envelope TEXT,
decrypted TEXT,
source TEXT,
sourceDevice TEXT,
serverTimestamp INTEGER
);`);
await instance.run(`CREATE INDEX unprocessed_id ON unprocessed (
id
);`);
id
);`);
await instance.run(`CREATE INDEX unprocessed_timestamp ON unprocessed (
timestamp
);`);
timestamp
);`);
await instance.run(`INSERT INTO unprocessed (
id,
timestamp,
version,
attempts,
envelope,
decrypted,
source,
sourceDevice,
serverTimestamp
) SELECT
id,
timestamp,
json_extract(json, '$.version'),
json_extract(json, '$.attempts'),
json_extract(json, '$.envelope'),
json_extract(json, '$.decrypted'),
json_extract(json, '$.source'),
json_extract(json, '$.sourceDevice'),
json_extract(json, '$.serverTimestamp')
FROM unprocessed_old;
`);
id,
timestamp,
version,
attempts,
envelope,
decrypted,
source,
sourceDevice,
serverTimestamp
) SELECT
id,
timestamp,
json_extract(json, '$.version'),
json_extract(json, '$.attempts'),
json_extract(json, '$.envelope'),
json_extract(json, '$.decrypted'),
json_extract(json, '$.source'),
json_extract(json, '$.sourceDevice'),
json_extract(json, '$.serverTimestamp')
FROM unprocessed_old;
`);
await instance.run('DROP TABLE unprocessed_old;');
@ -1024,14 +1024,14 @@ async function updateToSchemaVersion14(
try {
await instance.run(`CREATE TABLE emojis(
shortName STRING PRIMARY KEY,
lastUsage INTEGER
);`);
shortName STRING PRIMARY KEY,
lastUsage INTEGER
);`);
await instance.run(`CREATE INDEX emojis_lastUsage
ON emojis (
lastUsage
);`);
ON emojis (
lastUsage
);`);
await instance.run('PRAGMA user_version = 14;');
await instance.run('COMMIT TRANSACTION;');
@ -2363,7 +2363,7 @@ async function searchMessages(
INNER JOIN messages on messages_fts.id = messages.id
WHERE
messages_fts match $query
ORDER BY messages.received_at DESC
ORDER BY messages.received_at DESC, messages.sent_at DESC
LIMIT $limit;`,
{
$query: query,
@ -2392,7 +2392,7 @@ async function searchMessagesInConversation(
WHERE
messages_fts match $query AND
messages.conversationId = $conversationId
ORDER BY messages.received_at DESC
ORDER BY messages.received_at DESC, messages.sent_at DESC
LIMIT $limit;`,
{
$query: query,
@ -2662,7 +2662,7 @@ async function getUnreadByConversation(conversationId: string) {
`SELECT json FROM messages WHERE
unread = $unread AND
conversationId = $conversationId
ORDER BY received_at DESC;`,
ORDER BY received_at DESC, sent_at DESC;`,
{
$unread: 1,
$conversationId: conversationId,
@ -2677,10 +2677,12 @@ async function getOlderMessagesByConversation(
{
limit = 100,
receivedAt = Number.MAX_VALUE,
sentAt = Number.MAX_VALUE,
messageId,
}: {
limit?: number;
receivedAt?: number;
sentAt?: number;
messageId?: string;
} = {}
) {
@ -2691,13 +2693,17 @@ async function getOlderMessagesByConversation(
rows = await db.all(
`SELECT json FROM messages WHERE
conversationId = $conversationId AND
received_at <= $received_at AND
id != $messageId
ORDER BY received_at DESC
id != $messageId AND
(
(received_at = $received_at AND sent_at < $sent_at) OR
received_at < $received_at
)
ORDER BY received_at DESC, sent_at DESC
LIMIT $limit;`,
{
$conversationId: conversationId,
$received_at: receivedAt,
$sent_at: sentAt,
$limit: limit,
$messageId: messageId,
}
@ -2706,12 +2712,16 @@ async function getOlderMessagesByConversation(
rows = await db.all(
`SELECT json FROM messages WHERE
conversationId = $conversationId AND
received_at < $received_at
ORDER BY received_at DESC
(
(received_at = $received_at AND sent_at < $sent_at) OR
received_at < $received_at
)
ORDER BY received_at DESC, sent_at DESC
LIMIT $limit;`,
{
$conversationId: conversationId,
$received_at: receivedAt,
$sent_at: sentAt,
$limit: limit,
}
);
@ -2722,18 +2732,26 @@ async function getOlderMessagesByConversation(
async function getNewerMessagesByConversation(
conversationId: string,
{ limit = 100, receivedAt = 0 }: { limit?: number; receivedAt?: number } = {}
{
limit = 100,
receivedAt = 0,
sentAt = 0,
}: { limit?: number; receivedAt?: number; sentAt?: number } = {}
) {
const db = getInstance();
const rows = await db.all(
`SELECT json FROM messages WHERE
conversationId = $conversationId AND
received_at > $received_at
ORDER BY received_at ASC
(
(received_at = $received_at AND sent_at > $sent_at) OR
received_at > $received_at
)
ORDER BY received_at ASC, sent_at ASC
LIMIT $limit;`,
{
$conversationId: conversationId,
$received_at: receivedAt,
$sent_at: sentAt,
$limit: limit,
}
);
@ -2745,7 +2763,7 @@ async function getOldestMessageForConversation(conversationId: string) {
const row = await db.get(
`SELECT * FROM messages WHERE
conversationId = $conversationId
ORDER BY received_at ASC
ORDER BY received_at ASC, sent_at ASC
LIMIT 1;`,
{
$conversationId: conversationId,
@ -2763,7 +2781,7 @@ async function getNewestMessageForConversation(conversationId: string) {
const row = await db.get(
`SELECT * FROM messages WHERE
conversationId = $conversationId
ORDER BY received_at DESC
ORDER BY received_at DESC, sent_at DESC
LIMIT 1;`,
{
$conversationId: conversationId,
@ -2786,7 +2804,7 @@ async function getLastConversationActivity(
conversationId = $conversationId AND
(type IS NULL OR type NOT IN ('profile-change', 'verified-change', 'message-history-unsynced', 'keychange', 'group-v1-migration')) AND
(json_extract(json, '$.expirationTimerUpdate.fromSync') IS NULL OR json_extract(json, '$.expirationTimerUpdate.fromSync') != 1)
ORDER BY received_at DESC
ORDER BY received_at DESC, sent_at DESC
LIMIT 1;`,
{
$conversationId: conversationId,
@ -2807,7 +2825,7 @@ async function getLastConversationPreview(
`SELECT * FROM messages WHERE
conversationId = $conversationId AND
(type IS NULL OR type NOT IN ('profile-change', 'verified-change', 'message-history-unsynced', 'group-v1-migration'))
ORDER BY received_at DESC
ORDER BY received_at DESC, sent_at DESC
LIMIT 1;`,
{
$conversationId: conversationId,
@ -2826,7 +2844,7 @@ async function getOldestUnreadMessageForConversation(conversationId: string) {
`SELECT * FROM messages WHERE
conversationId = $conversationId AND
unread = 1
ORDER BY received_at ASC
ORDER BY received_at ASC, sent_at ASC
LIMIT 1;`,
{
$conversationId: conversationId,
@ -2870,10 +2888,10 @@ async function getMessageMetricsForConversation(conversationId: string) {
const [oldest, newest, oldestUnread, totalUnread] = results;
return {
oldest: oldest ? pick(oldest, ['received_at', 'id']) : null,
newest: newest ? pick(newest, ['received_at', 'id']) : null,
oldest: oldest ? pick(oldest, ['received_at', 'sent_at', 'id']) : null,
newest: newest ? pick(newest, ['received_at', 'sent_at', 'id']) : null,
oldestUnread: oldestUnread
? pick(oldestUnread, ['received_at', 'id'])
? pick(oldestUnread, ['received_at', 'sent_at', 'id'])
: null,
totalUnread,
};
@ -2932,7 +2950,7 @@ async function getMessagesBySentAt(sentAt: number) {
const rows = await db.all(
`SELECT * FROM messages
WHERE sent_at = $sent_at
ORDER BY received_at DESC;`,
ORDER BY received_at DESC, sent_at DESC;`,
{
$sent_at: sentAt,
}
@ -2998,7 +3016,7 @@ async function getNextTapToViewMessageToAgeOut() {
WHERE
isViewOnce = 1
AND (isErased IS NULL OR isErased != 1)
ORDER BY received_at ASC
ORDER BY received_at ASC, sent_at ASC
LIMIT 1;
`);
@ -3019,7 +3037,7 @@ async function getTapToViewMessagesNeedingErase() {
isViewOnce = 1
AND (isErased IS NULL OR isErased != 1)
AND received_at <= $THIRTY_DAYS_AGO
ORDER BY received_at ASC;`,
ORDER BY received_at ASC, sent_at ASC;`,
{
$THIRTY_DAYS_AGO: THIRTY_DAYS_AGO,
}
@ -3824,7 +3842,7 @@ async function getMessagesWithVisualMediaAttachments(
`SELECT json FROM messages WHERE
conversationId = $conversationId AND
hasVisualMediaAttachments = 1
ORDER BY received_at DESC
ORDER BY received_at DESC, sent_at DESC
LIMIT $limit;`,
{
$conversationId: conversationId,
@ -3844,7 +3862,7 @@ async function getMessagesWithFileAttachments(
`SELECT json FROM messages WHERE
conversationId = $conversationId AND
hasFileAttachments = 1
ORDER BY received_at DESC
ORDER BY received_at DESC, sent_at DESC
LIMIT $limit;`,
{
$conversationId: conversationId,

View file

@ -160,6 +160,7 @@ export type MessageType = {
type MessagePointerType = {
id: string;
received_at: number;
sent_at?: number;
};
type MessageMetricsType = {
newest?: MessagePointerType;
@ -1011,7 +1012,7 @@ export function reducer(
if (messages.length > 0) {
const first = messages[0];
if (first && (!oldest || first.received_at <= oldest.received_at)) {
oldest = pick(first, ['id', 'received_at']);
oldest = pick(first, ['id', 'received_at', 'sent_at']);
}
const last = messages[messages.length - 1];
@ -1019,7 +1020,7 @@ export function reducer(
last &&
(!newest || unboundedFetch || last.received_at >= newest.received_at)
) {
newest = pick(last, ['id', 'received_at']);
newest = pick(last, ['id', 'received_at', 'sent_at']);
}
}
@ -1172,12 +1173,14 @@ export function reducer(
if (oldest && oldest.id === firstId && firstId === id) {
const second = messagesLookup[oldIds[1]];
oldest = second ? pick(second, ['id', 'received_at']) : undefined;
oldest = second
? pick(second, ['id', 'received_at', 'sent_at'])
: undefined;
}
if (newest && newest.id === lastId && lastId === id) {
const penultimate = messagesLookup[oldIds[oldIds.length - 2]];
newest = penultimate
? pick(penultimate, ['id', 'received_at'])
? pick(penultimate, ['id', 'received_at', 'sent_at'])
: undefined;
}
}
@ -1231,7 +1234,9 @@ export function reducer(
? messageIds[messageIds.length - 1]
: undefined;
const last = lastId ? getOwn(messagesLookup, lastId) : undefined;
const newest = last ? pick(last, ['id', 'received_at']) : undefined;
const newest = last
? pick(last, ['id', 'received_at', 'sent_at'])
: undefined;
return {
...state,
@ -1260,7 +1265,9 @@ export function reducer(
const { messageIds } = existingConversation;
const firstId = messageIds && messageIds.length ? messageIds[0] : undefined;
const first = firstId ? getOwn(messagesLookup, firstId) : undefined;
const oldest = first ? pick(first, ['id', 'received_at']) : undefined;
const oldest = first
? pick(first, ['id', 'received_at', 'sent_at'])
: undefined;
return {
...state,
@ -1315,10 +1322,10 @@ export function reducer(
const last = sorted[sorted.length - 1];
if (!newest) {
newest = pick(first, ['id', 'received_at']);
newest = pick(first, ['id', 'received_at', 'sent_at']);
}
if (!oldest) {
oldest = pick(last, ['id', 'received_at']);
oldest = pick(last, ['id', 'received_at', 'sent_at']);
}
const existingTotal = existingConversation.messageIds.length;
@ -1336,10 +1343,10 @@ export function reducer(
// Update oldest and newest if we receive older/newer
// messages (or duplicated timestamps!)
if (first && oldest && first.received_at <= oldest.received_at) {
oldest = pick(first, ['id', 'received_at']);
oldest = pick(first, ['id', 'received_at', 'sent_at']);
}
if (last && newest && last.received_at >= newest.received_at) {
newest = pick(last, ['id', 'received_at']);
newest = pick(last, ['id', 'received_at', 'sent_at']);
}
const newIds = messages.map(message => message.id);
@ -1357,6 +1364,7 @@ export function reducer(
oldestUnread = pick(lookup[oldestId], [
'id',
'received_at',
'sent_at',
]) as MessagePointerType;
}
}

View file

@ -840,8 +840,10 @@ Whisper.ConversationView = Whisper.View.extend({
}
const receivedAt = message.get('received_at');
const sentAt = message.get('sent_at');
const models = await getOlderMessagesByConversation(conversationId, {
receivedAt,
sentAt,
messageId: oldestMessageId,
limit: 500,
MessageCollection: Whisper.MessageCollection,
@ -894,8 +896,10 @@ Whisper.ConversationView = Whisper.View.extend({
}
const receivedAt = message.get('received_at');
const sentAt = message.get('sent_at');
const models = await getNewerMessagesByConversation(this.model.id, {
receivedAt,
sentAt,
limit: 500,
MessageCollection: Whisper.MessageCollection,
});
@ -1075,15 +1079,18 @@ Whisper.ConversationView = Whisper.View.extend({
}
const receivedAt = message.get('received_at');
const sentAt = message.get('sent_at');
const older = await getOlderMessagesByConversation(conversationId, {
limit: 250,
receivedAt,
sentAt,
messageId,
MessageCollection: Whisper.MessageCollection,
});
const newer = await getNewerMessagesByConversation(conversationId, {
limit: 250,
receivedAt,
sentAt,
MessageCollection: Whisper.MessageCollection,
});
const metrics = await getMessageMetricsForConversation(conversationId);