Retry outbound "normal" messages for up to a day

This commit is contained in:
Evan Hahn 2021-08-31 15:58:39 -05:00 committed by GitHub
parent 62cf51c060
commit a85dd1be36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 1414 additions and 603 deletions

View file

@ -196,6 +196,7 @@ const dataInterface: ServerInterface = {
removeReactionFromConversation,
getMessageBySender,
getMessageById,
getMessagesById,
_getAllMessages,
getAllMessageIds,
getMessagesBySentAt,
@ -2363,22 +2364,37 @@ function getInstance(): Database {
return globalInstance;
}
function batchMultiVarQuery<T>(
values: Array<T>,
query: (batch: Array<T>) => void
): void {
function batchMultiVarQuery<ValueT>(
values: Array<ValueT>,
query: (batch: Array<ValueT>) => void
): [];
function batchMultiVarQuery<ValueT, ResultT>(
values: Array<ValueT>,
query: (batch: Array<ValueT>) => Array<ResultT>
): Array<ResultT>;
function batchMultiVarQuery<ValueT, ResultT>(
values: Array<ValueT>,
query:
| ((batch: Array<ValueT>) => void)
| ((batch: Array<ValueT>) => Array<ResultT>)
): Array<ResultT> {
const db = getInstance();
if (values.length > MAX_VARIABLE_COUNT) {
const result: Array<ResultT> = [];
db.transaction(() => {
for (let i = 0; i < values.length; i += MAX_VARIABLE_COUNT) {
const batch = values.slice(i, i + MAX_VARIABLE_COUNT);
query(batch);
const batchResult = query(batch);
if (Array.isArray(batchResult)) {
result.push(...batchResult);
}
}
})();
return;
return result;
}
query(values);
const result = query(values);
return Array.isArray(result) ? result : [];
}
const IDENTITY_KEYS_TABLE = 'identityKeys';
@ -3577,11 +3593,15 @@ function hasUserInitiatedMessages(conversationId: string): boolean {
function saveMessageSync(
data: MessageType,
options?: { forceSave?: boolean; alreadyInTransaction?: boolean }
options?: {
jobToInsert?: StoredJob;
forceSave?: boolean;
alreadyInTransaction?: boolean;
}
): string {
const db = getInstance();
const { forceSave, alreadyInTransaction } = options || {};
const { jobToInsert, forceSave, alreadyInTransaction } = options || {};
if (!alreadyInTransaction) {
return db.transaction(() => {
@ -3670,6 +3690,10 @@ function saveMessageSync(
`
).run(payload);
if (jobToInsert) {
insertJobSync(db, jobToInsert);
}
return id;
}
@ -3733,12 +3757,20 @@ function saveMessageSync(
json: objectToJSON(toCreate),
});
if (jobToInsert) {
insertJobSync(db, jobToInsert);
}
return toCreate.id;
}
async function saveMessage(
data: MessageType,
options?: { forceSave?: boolean; alreadyInTransaction?: boolean }
options?: {
jobToInsert?: StoredJob;
forceSave?: boolean;
alreadyInTransaction?: boolean;
}
): Promise<string> {
return saveMessageSync(data, options);
}
@ -3795,6 +3827,25 @@ async function getMessageById(id: string): Promise<MessageType | undefined> {
return jsonToObject(row.json);
}
async function getMessagesById(
messageIds: Array<string>
): Promise<Array<MessageType>> {
const db = getInstance();
return batchMultiVarQuery(
messageIds,
(batch: Array<string>): Array<MessageType> => {
const query = db.prepare<ArrayQuery>(
`SELECT json FROM messages WHERE id IN (${Array(batch.length)
.fill('?')
.join(',')});`
);
const rows: JSONRows = query.all(batch);
return rows.map(row => jsonToObject(row.json));
}
);
}
async function _getAllMessages(): Promise<Array<MessageType>> {
const db = getInstance();
const rows: JSONRows = db
@ -5902,9 +5953,7 @@ async function getJobsInQueue(queueType: string): Promise<Array<StoredJob>> {
}));
}
async function insertJob(job: Readonly<StoredJob>): Promise<void> {
const db = getInstance();
function insertJobSync(db: Database, job: Readonly<StoredJob>): void {
db.prepare<Query>(
`
INSERT INTO jobs
@ -5920,6 +5969,11 @@ async function insertJob(job: Readonly<StoredJob>): Promise<void> {
});
}
async function insertJob(job: Readonly<StoredJob>): Promise<void> {
const db = getInstance();
return insertJobSync(db, job);
}
async function deleteJob(id: string): Promise<void> {
const db = getInstance();