Handle messages with the same received_at

This commit is contained in:
Scott Nonnenberg 2020-07-06 10:06:44 -07:00
parent 82bf517a69
commit e536929e35
5 changed files with 60 additions and 19 deletions

View file

@ -567,6 +567,7 @@
const receivedAt = message.get('received_at'); const receivedAt = message.get('received_at');
const models = await getOlderMessagesByConversation(conversationId, { const models = await getOlderMessagesByConversation(conversationId, {
receivedAt, receivedAt,
messageId: oldestMessageId,
limit: 500, limit: 500,
MessageCollection: Whisper.MessageCollection, MessageCollection: Whisper.MessageCollection,
}); });
@ -796,6 +797,7 @@
const older = await getOlderMessagesByConversation(conversationId, { const older = await getOlderMessagesByConversation(conversationId, {
limit: 250, limit: 250,
receivedAt, receivedAt,
messageId,
MessageCollection: Whisper.MessageCollection, MessageCollection: Whisper.MessageCollection,
}); });
const newer = await getNewerMessagesByConversation(conversationId, { const newer = await getNewerMessagesByConversation(conversationId, {
@ -874,11 +876,18 @@
const scrollToMessageId = const scrollToMessageId =
setFocus && metrics.newest ? metrics.newest.id : undefined; setFocus && metrics.newest ? metrics.newest.id : undefined;
// Because our `getOlderMessages` fetch above didn't specify a receivedAt, we got
// the most recent 500 messages in the conversation. If it has a conflict with
// metrics, fetched a bit before, that's likely a race condition. So we tell our
// reducer to trust the message set we just fetched for determining if we have
// the newest message loaded.
const unboundedFetch = true;
messagesReset( messagesReset(
conversationId, conversationId,
cleaned.map(model => model.getReduxData()), cleaned.map(model => model.getReduxData()),
metrics, metrics,
scrollToMessageId scrollToMessageId,
unboundedFetch
); );
} catch (error) { } catch (error) {
setMessagesLoading(conversationId, false); setMessagesLoading(conversationId, false);
@ -1735,12 +1744,8 @@
window.log.warn(`onOpened: Did not find message ${messageId}`); window.log.warn(`onOpened: Did not find message ${messageId}`);
} }
// Incoming messages may still be processing, so we wait until those are
// complete to pull the 500 most-recent messages in this conversation.
this.model.queueJob(() => {
this.loadNewestMessages(); this.loadNewestMessages();
this.model.updateLastMessage(); this.model.updateLastMessage();
});
this.focusMessageField(); this.focusMessageField();

View file

@ -969,10 +969,12 @@ async function getOlderMessagesByConversation(
{ {
limit = 100, limit = 100,
receivedAt = Number.MAX_VALUE, receivedAt = Number.MAX_VALUE,
messageId,
MessageCollection, MessageCollection,
}: { }: {
limit?: number; limit?: number;
receivedAt?: number; receivedAt?: number;
messageId?: string;
MessageCollection: BackboneMessageCollectionType; MessageCollection: BackboneMessageCollectionType;
} }
) { ) {
@ -981,6 +983,7 @@ async function getOlderMessagesByConversation(
{ {
limit, limit,
receivedAt, receivedAt,
messageId,
} }
); );

View file

@ -197,7 +197,7 @@ export type ServerInterface = DataInterface & {
getMessagesBySentAt: (sentAt: number) => Promise<Array<MessageType>>; getMessagesBySentAt: (sentAt: number) => Promise<Array<MessageType>>;
getOlderMessagesByConversation: ( getOlderMessagesByConversation: (
conversationId: string, conversationId: string,
options?: { limit?: number; receivedAt?: number } options?: { limit?: number; receivedAt?: number; messageId?: string }
) => Promise<Array<MessageTypeUnhydrated>>; ) => Promise<Array<MessageTypeUnhydrated>>;
getNewerMessagesByConversation: ( getNewerMessagesByConversation: (
conversationId: string, conversationId: string,

View file

@ -2598,10 +2598,33 @@ async function getOlderMessagesByConversation(
{ {
limit = 100, limit = 100,
receivedAt = Number.MAX_VALUE, receivedAt = Number.MAX_VALUE,
}: { limit?: number; receivedAt?: number } = {} messageId,
}: { limit?: number; receivedAt?: number; messageId?: string } = {}
) { ) {
if (receivedAt !== Number.MAX_VALUE && !messageId) {
throw new Error('If receivedAt is supplied, messageId should be as well');
}
const db = getInstance(); const db = getInstance();
const rows = await db.all( let rows;
if (messageId) {
rows = await db.all(
`SELECT json FROM messages WHERE
conversationId = $conversationId AND
received_at <= $received_at AND
id != $messageId
ORDER BY received_at DESC
LIMIT $limit;`,
{
$conversationId: conversationId,
$received_at: receivedAt,
$limit: limit,
$messageId: messageId,
}
);
} else {
rows = await db.all(
`SELECT json FROM messages WHERE `SELECT json FROM messages WHERE
conversationId = $conversationId AND conversationId = $conversationId AND
received_at < $received_at received_at < $received_at
@ -2613,6 +2636,7 @@ async function getOlderMessagesByConversation(
$limit: limit, $limit: limit,
} }
); );
}
return rows.reverse(); return rows.reverse();
} }

View file

@ -209,6 +209,9 @@ export type MessagesResetActionType = {
messages: Array<MessageType>; messages: Array<MessageType>;
metrics: MessageMetricsType; metrics: MessageMetricsType;
scrollToMessageId?: string; scrollToMessageId?: string;
// The set of provided messages should be trusted, even if it conflicts with metrics,
// because we weren't looking for a specific time window of messages with our query.
unboundedFetch: boolean;
}; };
}; };
export type SetMessagesLoadingActionType = { export type SetMessagesLoadingActionType = {
@ -424,11 +427,13 @@ function messagesReset(
conversationId: string, conversationId: string,
messages: Array<MessageType>, messages: Array<MessageType>,
metrics: MessageMetricsType, metrics: MessageMetricsType,
scrollToMessageId?: string scrollToMessageId?: string,
unboundedFetch?: boolean
): MessagesResetActionType { ): MessagesResetActionType {
return { return {
type: 'MESSAGES_RESET', type: 'MESSAGES_RESET',
payload: { payload: {
unboundedFetch: Boolean(unboundedFetch),
conversationId, conversationId,
messages, messages,
metrics, metrics,
@ -784,6 +789,7 @@ export function reducer(
messages, messages,
metrics, metrics,
scrollToMessageId, scrollToMessageId,
unboundedFetch,
} = action.payload; } = action.payload;
const { messagesByConversation, messagesLookup } = state; const { messagesByConversation, messagesLookup } = state;
@ -807,7 +813,10 @@ export function reducer(
} }
const last = messages[messages.length - 1]; const last = messages[messages.length - 1];
if (last && (!newest || last.received_at >= newest.received_at)) { if (
last &&
(!newest || unboundedFetch || last.received_at >= newest.received_at)
) {
newest = pick(last, ['id', 'received_at']); newest = pick(last, ['id', 'received_at']);
} }
} }