2015-02-16 19:08:29 +00:00
|
|
|
{- Persistent sqlite database handles.
|
|
|
|
-
|
2019-09-26 16:24:03 +00:00
|
|
|
- Copyright 2015-2019 Joey Hess <id@joeyh.name>
|
2015-02-16 19:08:29 +00:00
|
|
|
-
|
2019-03-13 19:48:14 +00:00
|
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
2015-02-16 19:08:29 +00:00
|
|
|
-}
|
|
|
|
|
2019-09-26 16:24:03 +00:00
|
|
|
{-# LANGUAGE TypeFamilies, FlexibleContexts #-}
|
|
|
|
|
2015-02-16 19:08:29 +00:00
|
|
|
module Database.Handle (
|
|
|
|
DbHandle,
|
2017-09-06 21:07:49 +00:00
|
|
|
DbConcurrency(..),
|
2015-02-16 19:08:29 +00:00
|
|
|
openDb,
|
2015-12-23 18:59:58 +00:00
|
|
|
TableName,
|
2015-02-18 18:11:27 +00:00
|
|
|
queryDb,
|
2015-02-16 20:04:23 +00:00
|
|
|
closeDb,
|
2015-02-18 18:11:27 +00:00
|
|
|
commitDb,
|
2015-12-23 18:59:58 +00:00
|
|
|
commitDb',
|
2015-02-16 19:08:29 +00:00
|
|
|
) where
|
|
|
|
|
|
|
|
import Utility.Exception
|
2015-12-23 22:34:51 +00:00
|
|
|
import Utility.FileSystemEncoding
|
2015-02-16 19:08:29 +00:00
|
|
|
|
allow for concurrent incremental fsck processes again (sorta)
Sqlite doesn't support multiple concurrent writers
at all. One of them will fail to write. It's not even possible to have two
processes building up separate transactions at the same time. Before using
sqlite, incremental fsck could work perfectly well with multiple fsck
processes running concurrently. I'd like to keep that working.
My partial solution, so far, is to make git-annex buffer writes, and every
so often send them all to sqlite at once, in a transaction. So most of the
time, nothing is writing to the database. (And if it gets unlucky and
a write fails due to a collision with another writer, it can just wait and
retry the write later.) This lets multiple processes write to the database
successfully.
But, for the purposes of concurrent, incremental fsck, it's not ideal.
Each process doesn't immediately learn of files that another process has
checked. So they'll tend to do redundant work.
Only way I can see to improve this is to use some other mechanism for
short-term IPC between the fsck processes. Not yet done.
----
Also, make addDb check if an item is in the database already, and not try
to re-add it. That fixes an intermittent crash with
"SQLite3 returned ErrorConstraint while attempting to perform step."
I am not 100% sure why; it only started happening when I moved write
buffering into the queue. It seemed to generally happen on the same file
each time, so could just be due to multiple files having the same key.
However, I doubt my sound repo has many duplicate keys, and I suspect
something else is going on.
----
Updated benchmark, with the 1000 item queue: 6m33.808s
2015-02-17 20:39:35 +00:00
|
|
|
import Database.Persist.Sqlite
|
2015-02-18 19:54:24 +00:00
|
|
|
import qualified Database.Sqlite as Sqlite
|
2015-02-16 20:48:19 +00:00
|
|
|
import Control.Monad
|
2015-02-16 19:08:29 +00:00
|
|
|
import Control.Monad.IO.Class (liftIO)
|
2019-09-26 16:24:03 +00:00
|
|
|
import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO)
|
|
|
|
import Control.Monad.Logger (MonadLogger)
|
2015-02-16 19:08:29 +00:00
|
|
|
import Control.Concurrent
|
|
|
|
import Control.Concurrent.Async
|
2015-12-23 20:11:36 +00:00
|
|
|
import Control.Exception (throwIO, BlockedIndefinitelyOnMVar(..))
|
2015-02-16 19:08:29 +00:00
|
|
|
import qualified Data.Text as T
|
2015-02-22 18:08:26 +00:00
|
|
|
import Control.Monad.Trans.Resource (runResourceT)
|
|
|
|
import Control.Monad.Logger (runNoLoggingT)
|
2015-12-09 18:55:47 +00:00
|
|
|
import System.IO
|
2015-02-16 19:08:29 +00:00
|
|
|
|
|
|
|
{- A DbHandle is a reference to a worker thread that communicates with
|
|
|
|
- the database. It has a MVar which Jobs are submitted to. -}
|
2017-09-06 21:07:49 +00:00
|
|
|
-- Fields, in order: the concurrency mode the handle was opened with,
-- the Async running the worker thread, and the MVar used to hand Jobs
-- to that worker.
data DbHandle = DbHandle DbConcurrency (Async ()) (MVar Job)
|
2015-02-16 19:08:29 +00:00
|
|
|
|
2015-12-23 18:59:58 +00:00
|
|
|
{- Name of a table that should exist once the database is initialized. -}
|
|
|
|
-- The name is spliced verbatim into the probe SELECT that
-- runSqliteRobustly issues, so it must be a trusted identifier.
type TableName = String
|
|
|
|
|
2017-09-06 21:07:49 +00:00
|
|
|
{- Sqlite only allows a single write to a database at a time; a concurrent
|
|
|
|
- write will crash.
|
|
|
|
-
|
2018-10-30 15:40:11 +00:00
|
|
|
- MultiWriter works around this limitation.
|
2017-09-06 21:07:49 +00:00
|
|
|
- The downside of using MultiWriter is that after writing a change to the
|
|
|
|
- database, a query using the same DbHandle will not immediately see
|
|
|
|
- the change! This is because the change is actually written using a
|
|
|
|
- separate database connection, and caching can prevent seeing the change.
|
|
|
|
- Also, consider that if multiple processes are writing to a database,
|
|
|
|
- you can't rely on seeing values you've just written anyway, as another
|
|
|
|
- process may change them.
|
|
|
|
-
|
2018-10-30 15:40:11 +00:00
|
|
|
- When a database can only be written to by a single process (enforced by
|
|
|
|
- a lock file), use SingleWriter. Changes written to the database will
|
|
|
|
- always be immediately visible then. Multiple threads can write; their
|
|
|
|
- writes will be serialized.
|
2017-09-06 21:07:49 +00:00
|
|
|
-}
|
|
|
|
-- With SingleWriter, commitDb' submits a ChangeJob, which the worker
-- runs on its own connection. With MultiWriter, it submits a
-- RobustChangeJob, which is run on a separate connection.
data DbConcurrency = SingleWriter | MultiWriter
|
|
|
|
|
2015-02-18 19:54:24 +00:00
|
|
|
{- Opens a handle to the sqlite database at the given path, starting
 - the worker thread that services the jobs submitted to the handle.
 -
 - No migrations are performed, so only use this once the database is
 - known to exist and to have the right tables. -}
openDb :: DbConcurrency -> FilePath -> TableName -> IO DbHandle
openDb dbconcurrency db tablename = do
  jobqueue <- newEmptyMVar
  workerthread <- async $ workerThread (T.pack db) tablename jobqueue

  -- work around https://github.com/yesodweb/persistent/issues/474
  fileEncoding stderr

  pure (DbHandle dbconcurrency workerthread jobqueue)
|
2015-12-23 18:59:58 +00:00
|
|
|
|
2015-12-23 20:11:36 +00:00
|
|
|
{- Asks the worker thread to shut down, and waits for it to finish.
 -
 - Calling this is optional; once the DbHandle has been garbage
 - collected, the worker notices and closes on its own. -}
closeDb :: DbHandle -> IO ()
closeDb (DbHandle _ workerthread jobqueue) =
  putMVar jobqueue CloseJob >> wait workerthread
|
|
|
|
|
|
|
|
{- Makes a query using the DbHandle. This should not be used to make
 - changes to the database!
 -
 - Note that the action is not run by the calling thread, but by a
 - worker thread. When the action throws a non-async exception, or its
 - result cannot be collected from the worker, the calling thread gets
 - an error whose message starts with "sqlite query crashed" and
 - includes a description of the underlying exception.
 -
 - Only one action can be run at a time against a given DbHandle.
 - If called concurrently in the same process, this will block until
 - it is able to run.
 -
 - Note that when the DbHandle was opened in MultiWriter mode, recent
 - writes may not be seen by queryDb.
 -}
queryDb :: DbHandle -> SqlPersistM a -> IO a
queryDb (DbHandle _ _ jobs) a = do
  res <- newEmptyMVar
  -- The worker runs the action and hands back either its result or the
  -- exception it threw.
  putMVar jobs $ QueryJob $
    liftIO . putMVar res =<< tryNonAsync a
  -- Keep the underlying exception in the error message rather than
  -- discarding it, so a failed query can be diagnosed.
  (either throwIO return =<< takeMVar res)
    `catchNonAsync` (\e -> error $ "sqlite query crashed: " ++ show e)
|
|
|
|
|
|
|
|
{- Writes a change to the database.
 -
 - In MultiWriter mode, a write can fail when another write is happening
 - at the same time. So a failed write is retried, pausing 1/10th second
 - after each failure, for up to 100 attempts (about 10 seconds), which
 - should avoid all but the most exceptional problems. Once the attempts
 - are used up, an error reporting the last failure is thrown.
 -}
commitDb :: DbHandle -> SqlPersistM () -> IO ()
commitDb h wa = attempt Nothing (100 :: Int)
  where
    commit = commitDb' h wa

    attempt :: Maybe SomeException -> Int -> IO ()
    attempt lasterr 0 = error $
      "failed to commit changes to sqlite database: " ++ show lasterr
    attempt _ remaining = commit >>= either retry return
      where
        retry err = do
          threadDelay 100000 -- 1/10th second
          attempt (Just err) (remaining - 1)
|
|
|
|
|
|
|
|
{- Makes a single attempt to write a change to the database, without
 - retrying. A non-async exception thrown by the write is returned as
 - a Left value.
 -
 - In MultiWriter mode, the worker thread is handed a function that
 - runs the write using the runner the worker supplies. In SingleWriter
 - mode, the write is submitted as a ChangeJob; if its result cannot be
 - collected from the worker, an error whose message starts with
 - "sqlite commit crashed" is thrown, including a description of the
 - underlying exception.
 -}
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
commitDb' (DbHandle MultiWriter _ jobs) a = do
  res <- newEmptyMVar
  putMVar jobs $ RobustChangeJob $ \runner ->
    liftIO $ putMVar res =<< tryNonAsync (runner a)
  takeMVar res
commitDb' (DbHandle SingleWriter _ jobs) a = do
  res <- newEmptyMVar
  putMVar jobs $ ChangeJob $
    liftIO . putMVar res =<< tryNonAsync a
  -- Keep the underlying exception in the error message rather than
  -- discarding it.
  takeMVar res
    `catchNonAsync` (\e -> error $ "sqlite commit crashed: " ++ show e)
|
2015-02-16 19:08:29 +00:00
|
|
|
|
2015-02-18 18:11:27 +00:00
|
|
|
{- A unit of work submitted to the worker thread. -}
data Job
  -- An action the worker runs before going on to the next job.
  = QueryJob (SqlPersistM ())
  -- An action the worker runs, after which it leaves the current
  -- sqlite transaction so the change reaches the disk.
  | ChangeJob (SqlPersistM ())
  -- A function that is handed a runner, which runs an action using
  -- a separate database connection.
  | RobustChangeJob ((SqlPersistM () -> IO ()) -> IO ())
  -- Tells the worker to stop.
  | CloseJob
|
|
|
|
|
2015-02-22 18:08:26 +00:00
|
|
|
{- The body of the worker thread: serves the Jobs that arrive in the
 - MVar, running them against the database, until told to close or
 - until the MVar can no longer be written to. -}
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
workerThread db tablename jobs = serve
  where
    -- Runs one database session. A session that ends with True is
    -- followed by a fresh one; a non-async exception is reported to
    -- stderr and ends the worker.
    serve = do
      outcome <- tryNonAsync (runSqliteRobustly tablename db session)
      case outcome of
        Right restart
          | restart -> serve
          | otherwise -> return ()
        Left err -> hPutStrLn stderr $
          "sqlite worker thread crashed: " ++ show err

    nextjob :: IO (Either BlockedIndefinitelyOnMVar Job)
    nextjob = try $ takeMVar jobs

    session = liftIO nextjob >>= dispatch

    -- Exception is thrown when the MVar is garbage
    -- collected, which means the whole DbHandle
    -- is not used any longer. Shutdown cleanly.
    dispatch (Left BlockedIndefinitelyOnMVar) = return False
    dispatch (Right CloseJob) = return False
    dispatch (Right (QueryJob query)) = query >> session
    -- Exit this sqlite transaction after the change, so the
    -- database gets updated on disk.
    dispatch (Right (ChangeJob change)) = change >> return True
    -- Change is run in a separate database connection
    -- since sqlite only supports a single writer at a
    -- time, and it may crash the database connection
    -- that the write is made to.
    dispatch (Right (RobustChangeJob change)) = do
      liftIO $ change (runSqliteRobustly tablename db)
      session
|
2015-02-22 18:08:26 +00:00
|
|
|
|
2018-10-30 22:03:03 +00:00
|
|
|
-- Like runSqlite, but more robust.
--
-- A newly opened database connection can take a while to become usable.
-- This may be due to WAL mode recovering after a crash, or perhaps a bug
-- like described in blob 500f777a6ab6c45ca5f9790e0a63575f8e3cb88f.
-- So the connection is probed with a select until one succeeds; once one
-- succeeds the connection will stay usable.
--
-- Sqlite also sometimes throws ErrorIO when there's not really an IO
-- problem, but perhaps just a short read(). That is caught and retried
-- a limited number of times, both while probing and while running the
-- action.
runSqliteRobustly :: TableName -> T.Text -> (SqlPersistM a) -> IO a
runSqliteRobustly tablename db a = opensettle maxretries >>= run maxretries
  where
    maxretries = 100 :: Int

    rethrow msg e = throwIO $ userError $ show e ++ "(" ++ msg ++ ")"

    -- Runs the action on the settled connection, retrying on ErrorIO
    -- while attempts remain.
    run attemptsleft conn = do
      outcome <- try $ runResourceT $ runNoLoggingT $
        withSqlConnRobustly (wrapConnection conn) (runSqlConn a)
      case outcome of
        Right v -> return v
        Left ex@(Sqlite.SqliteException { Sqlite.seError = code })
          | code == Sqlite.ErrorIO && attemptsleft > 1 ->
              run (attemptsleft - 1) conn
          | otherwise -> rethrow "after successful open" ex

    opensettle attemptsleft = Sqlite.open db >>= settle attemptsleft

    -- Probes the connection with a trivial select.
    settle attemptsleft conn = do
      probed <- try $ do
        stmt <- Sqlite.prepare conn nullselect
        void $ Sqlite.step stmt
        void $ Sqlite.finalize stmt
      case probed of
        Right _ -> return conn
        Left ex@(Sqlite.SqliteException { Sqlite.seError = code })
          -- Wait and retry any number of times; it
          -- will stop being busy eventually.
          | code == Sqlite.ErrorBusy ->
              briefdelay >> settle attemptsleft conn
          -- Could be a real IO error,
          -- so don't retry indefinitely.
          | code == Sqlite.ErrorIO -> do
              Sqlite.close conn
              briefdelay
              if attemptsleft > 1
                then opensettle (attemptsleft - 1)
                else rethrow "while opening database connection" ex
          | otherwise -> rethrow "while opening database connection" ex

    -- This should succeed for any table.
    nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"

    briefdelay = threadDelay 1000 -- 1/1000th second
|
2019-09-26 16:24:03 +00:00
|
|
|
|
|
|
|
-- Like withSqlConn, but more robust: the backend produced by the open
-- function is released with closeRobustly once the action has finished.
withSqlConnRobustly
  :: (MonadUnliftIO m, MonadLogger m, IsPersistBackend backend, BaseBackend backend ~ SqlBackend)
  => (LogFunc -> IO backend)
  -> (backend -> m a)
  -> m a
withSqlConnRobustly open f = askLogFunc >>= \logger ->
  withRunInIO $ \runinio ->
    bracket
      (open logger)
      closeRobustly
      (\backend -> runinio (f backend))
|
|
|
|
|
|
|
|
-- Closes a database connection. Sqlite can throw ErrorBusy while
-- closing a database; that exception is caught and the close is
-- retried. Any other sqlite exception is rethrown with a note saying
-- it happened while closing.
closeRobustly
  :: (IsPersistBackend backend, BaseBackend backend ~ SqlBackend)
  => backend
  -> IO ()
closeRobustly conn = attempt maxretries briefdelay
  where
    briefdelay = 1000 -- 1/1000th second

    -- Try up to 14 times; with the pause doubling each time,
    -- roughly 16 seconds are spent in total before giving up.
    maxretries = 14 :: Int

    rethrow msg e = throwIO $ userError $ show e ++ "(" ++ msg ++ ")"

    attempt triesleft pause = try (close' conn) >>= either failed return
      where
        failed ex@(Sqlite.SqliteException { Sqlite.seError = code })
          | code == Sqlite.ErrorBusy = do
              threadDelay pause
              if triesleft > 1
                then attempt (triesleft - 1) (pause * 2)
                else rethrow "while closing database connection" ex
          | otherwise = rethrow "while closing database connection" ex
|