904b175707
Added an additional constraint that persistent needs. This also builds with persistent-2.9.2 without needing any cpp.
294 lines
9.3 KiB
Haskell
294 lines
9.3 KiB
Haskell
{- Persistent sqlite database handles.
|
|
-
|
|
- Copyright 2015-2019 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE TypeFamilies, FlexibleContexts #-}
|
|
|
|
module Database.Handle (
|
|
DbHandle,
|
|
DbConcurrency(..),
|
|
openDb,
|
|
TableName,
|
|
queryDb,
|
|
closeDb,
|
|
commitDb,
|
|
commitDb',
|
|
) where
|
|
|
|
import Utility.Exception
|
|
import Utility.FileSystemEncoding
|
|
|
|
import Database.Persist.Sqlite
|
|
import qualified Database.Sqlite as Sqlite
|
|
import Control.Monad
|
|
import Control.Monad.IO.Class (liftIO)
|
|
import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO)
|
|
import Control.Monad.Logger (MonadLogger)
|
|
import Control.Concurrent
|
|
import Control.Concurrent.Async
|
|
import Control.Exception (throwIO, BlockedIndefinitelyOnMVar(..))
|
|
import qualified Data.Text as T
|
|
import Control.Monad.Trans.Resource (runResourceT)
|
|
import Control.Monad.Logger (runNoLoggingT)
|
|
import System.IO
|
|
|
|
{- A DbHandle is a reference to a worker thread that communicates with
|
|
- the database. It has a MVar which Jobs are submitted to. -}
|
|
data DbHandle = DbHandle DbConcurrency (Async ()) (MVar Job)
|
|
|
|
{- Name of a table that should exist once the database is initialized. -}
|
|
type TableName = String
|
|
|
|
{- Sqlite only allows a single write to a database at a time; a concurrent
|
|
- write will crash.
|
|
-
|
|
- MultiWrter works around this limitation.
|
|
- The downside of using MultiWriter is that after writing a change to the
|
|
- database, the a query using the same DbHandle will not immediately see
|
|
- the change! This is because the change is actually written using a
|
|
- separate database connection, and caching can prevent seeing the change.
|
|
- Also, consider that if multiple processes are writing to a database,
|
|
- you can't rely on seeing values you've just written anyway, as another
|
|
- process may change them.
|
|
-
|
|
- When a database can only be written to by a single process (enforced by
|
|
- a lock file), use SingleWriter. Changes written to the database will
|
|
- always be immediately visible then. Multiple threads can write; their
|
|
- writes will be serialized.
|
|
-}
|
|
data DbConcurrency = SingleWriter | MultiWriter
|
|
|
|
{- Opens the database, but does not perform any migrations. Only use
|
|
- once the database is known to exist and have the right tables. -}
|
|
openDb :: DbConcurrency -> FilePath -> TableName -> IO DbHandle
|
|
openDb dbconcurrency db tablename = do
|
|
jobs <- newEmptyMVar
|
|
worker <- async (workerThread (T.pack db) tablename jobs)
|
|
|
|
-- work around https://github.com/yesodweb/persistent/issues/474
|
|
liftIO $ fileEncoding stderr
|
|
|
|
return $ DbHandle dbconcurrency worker jobs
|
|
|
|
{- This is optional; when the DbHandle gets garbage collected it will
|
|
- auto-close. -}
|
|
closeDb :: DbHandle -> IO ()
|
|
closeDb (DbHandle _ worker jobs) = do
|
|
putMVar jobs CloseJob
|
|
wait worker
|
|
|
|
{- Makes a query using the DbHandle. This should not be used to make
|
|
- changes to the database!
|
|
-
|
|
- Note that the action is not run by the calling thread, but by a
|
|
- worker thread. Exceptions are propigated to the calling thread.
|
|
-
|
|
- Only one action can be run at a time against a given DbHandle.
|
|
- If called concurrently in the same process, this will block until
|
|
- it is able to run.
|
|
-
|
|
- Note that when the DbHandle was opened in MultiWriter mode, recent
|
|
- writes may not be seen by queryDb.
|
|
-}
|
|
queryDb :: DbHandle -> SqlPersistM a -> IO a
|
|
queryDb (DbHandle _ _ jobs) a = do
|
|
res <- newEmptyMVar
|
|
putMVar jobs $ QueryJob $
|
|
liftIO . putMVar res =<< tryNonAsync a
|
|
(either throwIO return =<< takeMVar res)
|
|
`catchNonAsync` (const $ error "sqlite query crashed")
|
|
|
|
{- Writes a change to the database.
|
|
-
|
|
- In MultiWriter mode, writes can fail if another write is happening
|
|
- concurrently. So write failures are caught and retried repeatedly
|
|
- for up to 10 seconds, which should avoid all but the most exceptional
|
|
- problems.
|
|
-}
|
|
commitDb :: DbHandle -> SqlPersistM () -> IO ()
|
|
commitDb h wa = robustly Nothing 100 (commitDb' h wa)
|
|
where
|
|
robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
|
|
robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
|
|
robustly _ n a = do
|
|
r <- a
|
|
case r of
|
|
Right _ -> return ()
|
|
Left e -> do
|
|
threadDelay 100000 -- 1/10th second
|
|
robustly (Just e) (n-1) a
|
|
|
|
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
|
|
commitDb' (DbHandle MultiWriter _ jobs) a = do
|
|
res <- newEmptyMVar
|
|
putMVar jobs $ RobustChangeJob $ \runner ->
|
|
liftIO $ putMVar res =<< tryNonAsync (runner a)
|
|
takeMVar res
|
|
commitDb' (DbHandle SingleWriter _ jobs) a = do
|
|
res <- newEmptyMVar
|
|
putMVar jobs $ ChangeJob $
|
|
liftIO . putMVar res =<< tryNonAsync a
|
|
takeMVar res
|
|
`catchNonAsync` (const $ error "sqlite commit crashed")
|
|
|
|
data Job
|
|
= QueryJob (SqlPersistM ())
|
|
| ChangeJob (SqlPersistM ())
|
|
| RobustChangeJob ((SqlPersistM () -> IO ()) -> IO ())
|
|
| CloseJob
|
|
|
|
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
|
|
workerThread db tablename jobs = go
|
|
where
|
|
go = do
|
|
v <- tryNonAsync (runSqliteRobustly tablename db loop)
|
|
case v of
|
|
Left e -> hPutStrLn stderr $
|
|
"sqlite worker thread crashed: " ++ show e
|
|
Right True -> go
|
|
Right False -> return ()
|
|
|
|
getjob :: IO (Either BlockedIndefinitelyOnMVar Job)
|
|
getjob = try $ takeMVar jobs
|
|
|
|
loop = do
|
|
job <- liftIO getjob
|
|
case job of
|
|
-- Exception is thrown when the MVar is garbage
|
|
-- collected, which means the whole DbHandle
|
|
-- is not used any longer. Shutdown cleanly.
|
|
Left BlockedIndefinitelyOnMVar -> return False
|
|
Right CloseJob -> return False
|
|
Right (QueryJob a) -> a >> loop
|
|
Right (ChangeJob a) -> do
|
|
a
|
|
-- Exit this sqlite transaction so the
|
|
-- database gets updated on disk.
|
|
return True
|
|
-- Change is run in a separate database connection
|
|
-- since sqlite only supports a single writer at a
|
|
-- time, and it may crash the database connection
|
|
-- that the write is made to.
|
|
Right (RobustChangeJob a) -> do
|
|
liftIO (a (runSqliteRobustly tablename db))
|
|
loop
|
|
|
|
-- Like runSqlite, but more robust.
|
|
--
|
|
-- New database connections can sometimes take a while to become usable.
|
|
-- This may be due to WAL mode recovering after a crash, or perhaps a bug
|
|
-- like described in blob 500f777a6ab6c45ca5f9790e0a63575f8e3cb88f.
|
|
-- So, loop until a select succeeds; once one succeeds the connection will
|
|
-- stay usable.
|
|
--
|
|
-- And sqlite sometimes throws ErrorIO when there's not really an IO problem,
|
|
-- but perhaps just a short read(). That's caught and retried several times.
|
|
runSqliteRobustly :: TableName -> T.Text -> (SqlPersistM a) -> IO a
|
|
runSqliteRobustly tablename db a = do
|
|
conn <- opensettle maxretries
|
|
go conn maxretries
|
|
where
|
|
maxretries = 100 :: Int
|
|
|
|
rethrow msg e = throwIO $ userError $ show e ++ "(" ++ msg ++ ")"
|
|
|
|
go conn retries = do
|
|
r <- try $ runResourceT $ runNoLoggingT $
|
|
withSqlConnRobustly (wrapConnection conn) $
|
|
runSqlConn a
|
|
case r of
|
|
Right v -> return v
|
|
Left ex@(Sqlite.SqliteException { Sqlite.seError = e })
|
|
| e == Sqlite.ErrorIO ->
|
|
let retries' = retries - 1
|
|
in if retries' < 1
|
|
then rethrow "after successful open" ex
|
|
else go conn retries'
|
|
| otherwise -> rethrow "after successful open" ex
|
|
|
|
opensettle retries = do
|
|
conn <- Sqlite.open db
|
|
settle conn retries
|
|
|
|
settle conn retries = do
|
|
r <- try $ do
|
|
stmt <- Sqlite.prepare conn nullselect
|
|
void $ Sqlite.step stmt
|
|
void $ Sqlite.finalize stmt
|
|
case r of
|
|
Right _ -> return conn
|
|
Left ex@(Sqlite.SqliteException { Sqlite.seError = e })
|
|
| e == Sqlite.ErrorBusy -> do
|
|
-- Wait and retry any number of times; it
|
|
-- will stop being busy eventually.
|
|
briefdelay
|
|
settle conn retries
|
|
| e == Sqlite.ErrorIO -> do
|
|
-- Could be a real IO error,
|
|
-- so don't retry indefinitely.
|
|
Sqlite.close conn
|
|
briefdelay
|
|
let retries' = retries - 1
|
|
if retries' < 1
|
|
then rethrow "while opening database connection" ex
|
|
else opensettle retries'
|
|
| otherwise -> rethrow "while opening database connection" ex
|
|
|
|
-- This should succeed for any table.
|
|
nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"
|
|
|
|
briefdelay = threadDelay 1000 -- 1/1000th second
|
|
|
|
-- Like withSqlConn, but more robust.
|
|
withSqlConnRobustly
|
|
:: (MonadUnliftIO m
|
|
, MonadLogger m
|
|
, IsPersistBackend backend
|
|
, BaseBackend backend ~ SqlBackend
|
|
, BackendCompatible SqlBackend backend
|
|
)
|
|
=> (LogFunc -> IO backend)
|
|
-> (backend -> m a)
|
|
-> m a
|
|
withSqlConnRobustly open f = do
|
|
logFunc <- askLogFunc
|
|
withRunInIO $ \run -> bracket
|
|
(open logFunc)
|
|
closeRobustly
|
|
(run . f)
|
|
|
|
-- Sqlite can throw ErrorBusy while closing a database; this catches
|
|
-- the exception and retries.
|
|
closeRobustly
|
|
:: (IsPersistBackend backend
|
|
, BaseBackend backend ~ SqlBackend
|
|
, BackendCompatible SqlBackend backend
|
|
)
|
|
=> backend
|
|
-> IO ()
|
|
closeRobustly conn = go maxretries briefdelay
|
|
where
|
|
briefdelay = 1000 -- 1/1000th second
|
|
|
|
-- Try up to 14 times; with the delay doubling each time,
|
|
-- the maximum delay before giving up is 16 seconds.
|
|
maxretries = 14 :: Int
|
|
|
|
go retries delay = do
|
|
r <- try $ close' conn
|
|
case r of
|
|
Right () -> return ()
|
|
Left ex@(Sqlite.SqliteException { Sqlite.seError = e })
|
|
| e == Sqlite.ErrorBusy -> do
|
|
threadDelay delay
|
|
let delay' = delay * 2
|
|
let retries' = retries - 1
|
|
if retries' < 1
|
|
then rethrow "while closing database connection" ex
|
|
else go retries' delay'
|
|
| otherwise -> rethrow "while closing database connection" ex
|
|
|
|
rethrow msg e = throwIO $ userError $ show e ++ "(" ++ msg ++ ")"
|