Fix ConversationController
load race condition
This commit is contained in:
parent
d6ffb08a63
commit
8256170066
8 changed files with 127 additions and 106 deletions
|
@ -114,19 +114,11 @@ export function start(): void {
|
|||
}
|
||||
|
||||
export class ConversationController {
|
||||
private _initialFetchComplete: boolean | undefined;
|
||||
private _initialFetchComplete = false;
|
||||
|
||||
private _initialPromise: Promise<void> = Promise.resolve();
|
||||
private _initialPromise: undefined | Promise<void>;
|
||||
|
||||
private _conversations: ConversationModelCollectionType;
|
||||
|
||||
constructor(conversations?: ConversationModelCollectionType) {
|
||||
if (!conversations) {
|
||||
throw new Error('ConversationController: need conversation collection!');
|
||||
}
|
||||
|
||||
this._conversations = conversations;
|
||||
}
|
||||
constructor(private _conversations: ConversationModelCollectionType) {}
|
||||
|
||||
get(id?: string | null): ConversationModel | undefined {
|
||||
if (!this._initialFetchComplete) {
|
||||
|
@ -255,7 +247,7 @@ export class ConversationController {
|
|||
type: ConversationAttributesTypeType,
|
||||
additionalInitialProps = {}
|
||||
): Promise<ConversationModel> {
|
||||
await this._initialPromise;
|
||||
await this.load();
|
||||
const conversation = this.getOrCreate(id, type, additionalInitialProps);
|
||||
|
||||
if (conversation) {
|
||||
|
@ -756,109 +748,100 @@ export class ConversationController {
|
|||
);
|
||||
}
|
||||
|
||||
async loadPromise(): Promise<void> {
|
||||
return this._initialPromise;
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
this._initialPromise = Promise.resolve();
|
||||
delete this._initialPromise;
|
||||
this._initialFetchComplete = false;
|
||||
this._conversations.reset([]);
|
||||
}
|
||||
|
||||
isFetchComplete(): boolean | undefined {
|
||||
return this._initialFetchComplete;
|
||||
load(): Promise<void> {
|
||||
this._initialPromise ||= this.doLoad();
|
||||
return this._initialPromise;
|
||||
}
|
||||
|
||||
async load(): Promise<void> {
|
||||
private async doLoad(): Promise<void> {
|
||||
log.info('ConversationController: starting initial fetch');
|
||||
|
||||
if (this._conversations.length) {
|
||||
throw new Error('ConversationController: Already loaded!');
|
||||
}
|
||||
|
||||
const load = async () => {
|
||||
try {
|
||||
const collection = await getAllConversations({
|
||||
ConversationCollection: window.Whisper.ConversationCollection,
|
||||
});
|
||||
try {
|
||||
const collection = await getAllConversations({
|
||||
ConversationCollection: window.Whisper.ConversationCollection,
|
||||
});
|
||||
|
||||
// Get rid of temporary conversations
|
||||
const temporaryConversations = collection.filter(conversation =>
|
||||
Boolean(conversation.get('isTemporary'))
|
||||
// Get rid of temporary conversations
|
||||
const temporaryConversations = collection.filter(conversation =>
|
||||
Boolean(conversation.get('isTemporary'))
|
||||
);
|
||||
|
||||
if (temporaryConversations.length) {
|
||||
log.warn(
|
||||
`ConversationController: Removing ${temporaryConversations.length} temporary conversations`
|
||||
);
|
||||
|
||||
if (temporaryConversations.length) {
|
||||
log.warn(
|
||||
`ConversationController: Removing ${temporaryConversations.length} temporary conversations`
|
||||
);
|
||||
}
|
||||
const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 });
|
||||
queue.addAll(
|
||||
temporaryConversations.map(item => async () => {
|
||||
await removeConversation(item.id, {
|
||||
Conversation: window.Whisper.Conversation,
|
||||
});
|
||||
})
|
||||
);
|
||||
await queue.onIdle();
|
||||
|
||||
// Hydrate the final set of conversations
|
||||
this._conversations.add(
|
||||
collection.filter(conversation => !conversation.get('isTemporary'))
|
||||
);
|
||||
|
||||
this._initialFetchComplete = true;
|
||||
|
||||
await Promise.all(
|
||||
this._conversations.map(async conversation => {
|
||||
try {
|
||||
// Hydrate contactCollection, now that initial fetch is complete
|
||||
conversation.fetchContacts();
|
||||
|
||||
const isChanged = maybeDeriveGroupV2Id(conversation);
|
||||
if (isChanged) {
|
||||
updateConversation(conversation.attributes);
|
||||
}
|
||||
|
||||
// In case a too-large draft was saved to the database
|
||||
const draft = conversation.get('draft');
|
||||
if (draft && draft.length > MAX_MESSAGE_BODY_LENGTH) {
|
||||
conversation.set({
|
||||
draft: draft.slice(0, MAX_MESSAGE_BODY_LENGTH),
|
||||
});
|
||||
updateConversation(conversation.attributes);
|
||||
}
|
||||
|
||||
// Clean up the conversations that have UUID as their e164.
|
||||
const e164 = conversation.get('e164');
|
||||
const uuid = conversation.get('uuid');
|
||||
if (isValidUuid(e164) && uuid) {
|
||||
conversation.set({ e164: undefined });
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
log.info(`Cleaning up conversation(${uuid}) with invalid e164`);
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(
|
||||
'ConversationController.load/map: Failed to prepare a conversation',
|
||||
error && error.stack ? error.stack : error
|
||||
);
|
||||
}
|
||||
})
|
||||
);
|
||||
log.info('ConversationController: done with initial fetch');
|
||||
} catch (error) {
|
||||
log.error(
|
||||
'ConversationController: initial fetch failed',
|
||||
error && error.stack ? error.stack : error
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
const queue = new PQueue({ concurrency: 3, timeout: 1000 * 60 * 2 });
|
||||
queue.addAll(
|
||||
temporaryConversations.map(item => async () => {
|
||||
await removeConversation(item.id, {
|
||||
Conversation: window.Whisper.Conversation,
|
||||
});
|
||||
})
|
||||
);
|
||||
await queue.onIdle();
|
||||
|
||||
this._initialPromise = load();
|
||||
// Hydrate the final set of conversations
|
||||
this._conversations.add(
|
||||
collection.filter(conversation => !conversation.get('isTemporary'))
|
||||
);
|
||||
|
||||
return this._initialPromise;
|
||||
this._initialFetchComplete = true;
|
||||
|
||||
await Promise.all(
|
||||
this._conversations.map(async conversation => {
|
||||
try {
|
||||
// Hydrate contactCollection, now that initial fetch is complete
|
||||
conversation.fetchContacts();
|
||||
|
||||
const isChanged = maybeDeriveGroupV2Id(conversation);
|
||||
if (isChanged) {
|
||||
updateConversation(conversation.attributes);
|
||||
}
|
||||
|
||||
// In case a too-large draft was saved to the database
|
||||
const draft = conversation.get('draft');
|
||||
if (draft && draft.length > MAX_MESSAGE_BODY_LENGTH) {
|
||||
conversation.set({
|
||||
draft: draft.slice(0, MAX_MESSAGE_BODY_LENGTH),
|
||||
});
|
||||
updateConversation(conversation.attributes);
|
||||
}
|
||||
|
||||
// Clean up the conversations that have UUID as their e164.
|
||||
const e164 = conversation.get('e164');
|
||||
const uuid = conversation.get('uuid');
|
||||
if (isValidUuid(e164) && uuid) {
|
||||
conversation.set({ e164: undefined });
|
||||
updateConversation(conversation.attributes);
|
||||
|
||||
log.info(`Cleaning up conversation(${uuid}) with invalid e164`);
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(
|
||||
'ConversationController.load/map: Failed to prepare a conversation',
|
||||
error && error.stack ? error.stack : error
|
||||
);
|
||||
}
|
||||
})
|
||||
);
|
||||
log.info('ConversationController: done with initial fetch');
|
||||
} catch (error) {
|
||||
log.error(
|
||||
'ConversationController: initial fetch failed',
|
||||
error && error.stack ? error.stack : error
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue