Clean up transaction handling in sql.js
This commit is contained in:
parent
3feb0037e5
commit
48691a2558
1 changed files with 302 additions and 220 deletions
176
app/sql.js
176
app/sql.js
|
@ -216,6 +216,7 @@ async function updateToSchemaVersion1(currentVersion, instance) {
|
|||
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run(
|
||||
`CREATE TABLE messages(
|
||||
id STRING PRIMARY KEY ASC,
|
||||
|
@ -290,6 +291,10 @@ async function updateToSchemaVersion1(currentVersion, instance) {
|
|||
await instance.run('COMMIT TRANSACTION;');
|
||||
|
||||
console.log('updateToSchemaVersion1: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion2(currentVersion, instance) {
|
||||
|
@ -301,6 +306,7 @@ async function updateToSchemaVersion2(currentVersion, instance) {
|
|||
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run(
|
||||
`ALTER TABLE messages
|
||||
ADD COLUMN expireTimer INTEGER;`
|
||||
|
@ -333,6 +339,10 @@ async function updateToSchemaVersion2(currentVersion, instance) {
|
|||
await instance.run('COMMIT TRANSACTION;');
|
||||
|
||||
console.log('updateToSchemaVersion2: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion3(currentVersion, instance) {
|
||||
|
@ -344,6 +354,7 @@ async function updateToSchemaVersion3(currentVersion, instance) {
|
|||
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run('DROP INDEX messages_expiring;');
|
||||
await instance.run('DROP INDEX messages_unread;');
|
||||
|
||||
|
@ -363,6 +374,10 @@ async function updateToSchemaVersion3(currentVersion, instance) {
|
|||
await instance.run('COMMIT TRANSACTION;');
|
||||
|
||||
console.log('updateToSchemaVersion3: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion4(currentVersion, instance) {
|
||||
|
@ -374,6 +389,7 @@ async function updateToSchemaVersion4(currentVersion, instance) {
|
|||
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run(
|
||||
`CREATE TABLE conversations(
|
||||
id STRING PRIMARY KEY ASC,
|
||||
|
@ -399,6 +415,10 @@ async function updateToSchemaVersion4(currentVersion, instance) {
|
|||
await instance.run('COMMIT TRANSACTION;');
|
||||
|
||||
console.log('updateToSchemaVersion4: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion6(currentVersion, instance) {
|
||||
|
@ -408,6 +428,7 @@ async function updateToSchemaVersion6(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion6: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
// key-value, ids are strings, one extra column
|
||||
await instance.run(
|
||||
`CREATE TABLE sessions(
|
||||
|
@ -458,6 +479,10 @@ async function updateToSchemaVersion6(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 6;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion6: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion7(currentVersion, instance) {
|
||||
|
@ -467,6 +492,7 @@ async function updateToSchemaVersion7(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion7: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
// SQLite has been coercing our STRINGs into numbers, so we force it with TEXT
|
||||
// We create a new table then copy the data into it, since we can't modify columns
|
||||
|
||||
|
@ -494,6 +520,10 @@ async function updateToSchemaVersion7(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 7;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion7: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion8(currentVersion, instance) {
|
||||
|
@ -503,12 +533,15 @@ async function updateToSchemaVersion8(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion8: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
// First, we pull a new body field out of the message table's json blob
|
||||
await instance.run(
|
||||
`ALTER TABLE messages
|
||||
ADD COLUMN body TEXT;`
|
||||
);
|
||||
await instance.run("UPDATE messages SET body = json_extract(json, '$.body')");
|
||||
await instance.run(
|
||||
"UPDATE messages SET body = json_extract(json, '$.body')"
|
||||
);
|
||||
|
||||
// Then we create our full-text search table and populate it
|
||||
await instance.run(`
|
||||
|
@ -557,6 +590,10 @@ async function updateToSchemaVersion8(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 8;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion8: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion9(currentVersion, instance) {
|
||||
|
@ -566,6 +603,7 @@ async function updateToSchemaVersion9(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion9: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run(`CREATE TABLE attachment_downloads(
|
||||
id STRING primary key,
|
||||
timestamp INTEGER,
|
||||
|
@ -585,6 +623,10 @@ async function updateToSchemaVersion9(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 9;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion9: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion10(currentVersion, instance) {
|
||||
|
@ -594,6 +636,7 @@ async function updateToSchemaVersion10(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion10: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run('DROP INDEX unprocessed_id;');
|
||||
await instance.run('DROP INDEX unprocessed_timestamp;');
|
||||
await instance.run('ALTER TABLE unprocessed RENAME TO unprocessed_old;');
|
||||
|
@ -645,6 +688,10 @@ async function updateToSchemaVersion10(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 10;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion10: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion11(currentVersion, instance) {
|
||||
|
@ -654,11 +701,16 @@ async function updateToSchemaVersion11(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion11: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run('DROP TABLE groups;');
|
||||
|
||||
await instance.run('PRAGMA schema_version = 11;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion11: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion12(currentVersion, instance) {
|
||||
|
@ -669,6 +721,7 @@ async function updateToSchemaVersion12(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion12: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run(`CREATE TABLE sticker_packs(
|
||||
id TEXT PRIMARY KEY,
|
||||
key TEXT NOT NULL,
|
||||
|
@ -719,6 +772,10 @@ async function updateToSchemaVersion12(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 12;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion12: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion13(currentVersion, instance) {
|
||||
|
@ -729,6 +786,7 @@ async function updateToSchemaVersion13(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion13: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run(
|
||||
'ALTER TABLE sticker_packs ADD COLUMN attemptedStatus STRING;'
|
||||
);
|
||||
|
@ -736,6 +794,10 @@ async function updateToSchemaVersion13(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 13;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion13: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion14(currentVersion, instance) {
|
||||
|
@ -746,6 +808,7 @@ async function updateToSchemaVersion14(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion14: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
await instance.run(`CREATE TABLE emojis(
|
||||
shortName STRING PRIMARY KEY,
|
||||
lastUsage INTEGER
|
||||
|
@ -759,6 +822,10 @@ async function updateToSchemaVersion14(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 14;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion14: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateToSchemaVersion15(currentVersion, instance) {
|
||||
|
@ -769,6 +836,7 @@ async function updateToSchemaVersion15(currentVersion, instance) {
|
|||
console.log('updateToSchemaVersion15: starting...');
|
||||
await instance.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
// SQLite has again coerced our STRINGs into numbers, so we force it with TEXT
|
||||
// We create a new table then copy the data into it, since we can't modify columns
|
||||
|
||||
|
@ -794,6 +862,10 @@ async function updateToSchemaVersion15(currentVersion, instance) {
|
|||
await instance.run('PRAGMA schema_version = 15;');
|
||||
await instance.run('COMMIT TRANSACTION;');
|
||||
console.log('updateToSchemaVersion15: success!');
|
||||
} catch (error) {
|
||||
await instance.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
const SCHEMA_VERSIONS = [
|
||||
|
@ -1105,17 +1177,16 @@ async function createOrUpdate(table, data) {
|
|||
}
|
||||
|
||||
async function bulkAdd(table, array) {
|
||||
let promise;
|
||||
await db.run('BEGIN TRANSACTION;');
|
||||
|
||||
db.serialize(() => {
|
||||
promise = Promise.all([
|
||||
db.run('BEGIN TRANSACTION;'),
|
||||
...map(array, data => createOrUpdate(table, data)),
|
||||
db.run('COMMIT TRANSACTION;'),
|
||||
]);
|
||||
});
|
||||
try {
|
||||
await Promise.all([...map(array, data => createOrUpdate(table, data))]);
|
||||
|
||||
await promise;
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
} catch (error) {
|
||||
await db.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function getById(table, id) {
|
||||
|
@ -1208,19 +1279,20 @@ async function saveConversation(data) {
|
|||
}
|
||||
|
||||
async function saveConversations(arrayOfConversations) {
|
||||
let promise;
|
||||
await db.run('BEGIN TRANSACTION;');
|
||||
|
||||
db.serialize(() => {
|
||||
promise = Promise.all([
|
||||
db.run('BEGIN TRANSACTION;'),
|
||||
try {
|
||||
await Promise.all([
|
||||
...map(arrayOfConversations, conversation =>
|
||||
saveConversation(conversation)
|
||||
),
|
||||
db.run('COMMIT TRANSACTION;'),
|
||||
]);
|
||||
});
|
||||
|
||||
await promise;
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
} catch (error) {
|
||||
await db.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateConversation(data) {
|
||||
|
@ -1525,17 +1597,18 @@ async function saveMessage(data, { forceSave } = {}) {
|
|||
}
|
||||
|
||||
async function saveMessages(arrayOfMessages, { forceSave } = {}) {
|
||||
let promise;
|
||||
await db.run('BEGIN TRANSACTION;');
|
||||
|
||||
db.serialize(() => {
|
||||
promise = Promise.all([
|
||||
db.run('BEGIN TRANSACTION;'),
|
||||
try {
|
||||
await Promise.all([
|
||||
...map(arrayOfMessages, message => saveMessage(message, { forceSave })),
|
||||
db.run('COMMIT TRANSACTION;'),
|
||||
]);
|
||||
});
|
||||
|
||||
await promise;
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
} catch (error) {
|
||||
await db.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function removeMessage(id) {
|
||||
|
@ -1736,19 +1809,20 @@ async function saveUnprocessed(data, { forceSave } = {}) {
|
|||
}
|
||||
|
||||
async function saveUnprocesseds(arrayOfUnprocessed, { forceSave } = {}) {
|
||||
let promise;
|
||||
await db.run('BEGIN TRANSACTION;');
|
||||
|
||||
db.serialize(() => {
|
||||
promise = Promise.all([
|
||||
db.run('BEGIN TRANSACTION;'),
|
||||
try {
|
||||
await Promise.all([
|
||||
...map(arrayOfUnprocessed, unprocessed =>
|
||||
saveUnprocessed(unprocessed, { forceSave })
|
||||
),
|
||||
db.run('COMMIT TRANSACTION;'),
|
||||
]);
|
||||
});
|
||||
|
||||
await promise;
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
} catch (error) {
|
||||
await db.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function updateUnprocessedAttempts(id, attempts) {
|
||||
|
@ -2149,7 +2223,7 @@ async function deleteStickerPackReference(messageId, packId) {
|
|||
}
|
||||
const count = countRow['count(*)'];
|
||||
if (count > 0) {
|
||||
await db.run('COMMIT TRANSACTION');
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -2160,13 +2234,13 @@ async function deleteStickerPackReference(messageId, packId) {
|
|||
);
|
||||
if (!packRow) {
|
||||
console.log('deleteStickerPackReference: did not find referenced pack');
|
||||
await db.run('COMMIT TRANSACTION');
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
return null;
|
||||
}
|
||||
const { status } = packRow;
|
||||
|
||||
if (status === 'installed') {
|
||||
await db.run('COMMIT TRANSACTION');
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -2274,6 +2348,7 @@ async function getRecentStickers({ limit } = {}) {
|
|||
async function updateEmojiUsage(shortName, timeUsed = Date.now()) {
|
||||
await db.run('BEGIN TRANSACTION;');
|
||||
|
||||
try {
|
||||
const rows = await db.get(
|
||||
'SELECT * FROM emojis WHERE shortName = $shortName;',
|
||||
{
|
||||
|
@ -2294,6 +2369,10 @@ async function updateEmojiUsage(shortName, timeUsed = Date.now()) {
|
|||
}
|
||||
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
} catch (error) {
|
||||
await db.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function getRecentEmojis(limit = 32) {
|
||||
|
@ -2309,11 +2388,10 @@ async function getRecentEmojis(limit = 32) {
|
|||
|
||||
// All data in database
|
||||
async function removeAll() {
|
||||
let promise;
|
||||
await db.run('BEGIN TRANSACTION;');
|
||||
|
||||
db.serialize(() => {
|
||||
promise = Promise.all([
|
||||
db.run('BEGIN TRANSACTION;'),
|
||||
try {
|
||||
await Promise.all([
|
||||
db.run('DELETE FROM conversations;'),
|
||||
db.run('DELETE FROM identityKeys;'),
|
||||
db.run('DELETE FROM items;'),
|
||||
|
@ -2327,19 +2405,21 @@ async function removeAll() {
|
|||
db.run('DELETE FROM stickers;'),
|
||||
db.run('DELETE FROM sticker_packs;'),
|
||||
db.run('DELETE FROM sticker_references;'),
|
||||
db.run('COMMIT TRANSACTION;'),
|
||||
]);
|
||||
});
|
||||
|
||||
await promise;
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
} catch (error) {
|
||||
await db.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// Anything that isn't user-visible data
|
||||
async function removeAllConfiguration() {
|
||||
let promise;
|
||||
await db.run('BEGIN TRANSACTION;');
|
||||
|
||||
db.serialize(() => {
|
||||
promise = Promise.all([
|
||||
try {
|
||||
await Promise.all([
|
||||
db.run('BEGIN TRANSACTION;'),
|
||||
db.run('DELETE FROM identityKeys;'),
|
||||
db.run('DELETE FROM items;'),
|
||||
|
@ -2347,11 +2427,13 @@ async function removeAllConfiguration() {
|
|||
db.run('DELETE FROM sessions;'),
|
||||
db.run('DELETE FROM signedPreKeys;'),
|
||||
db.run('DELETE FROM unprocessed;'),
|
||||
db.run('COMMIT TRANSACTION;'),
|
||||
]);
|
||||
});
|
||||
|
||||
await promise;
|
||||
await db.run('COMMIT TRANSACTION;');
|
||||
} catch (error) {
|
||||
await db.run('ROLLBACK;');
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function getMessagesNeedingUpgrade(limit, { maxVersion }) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue