
If a DbHandle is in use by another thread, it could be queueing changes while shutdown is running. So, wait for the worker to finish before flushing the queue, so that any last-minute writes are included. Before this fix, they would be silently dropped. Of course, if the other thread continues to try to use a DbHandle once it's closed, it will block forever as the worker is no longer reading from the jobs MVar. So, that would crash with "thread blocked indefinitely in an MVar operation".
217 lines
6.3 KiB
Haskell
217 lines
6.3 KiB
Haskell
{- Persistent sqlite database handles.
|
|
-
|
|
- Copyright 2015 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE BangPatterns #-}
|
|
|
|
module Database.Handle (
|
|
DbHandle,
|
|
initDb,
|
|
openDb,
|
|
queryDb,
|
|
closeDb,
|
|
Size,
|
|
queueDb,
|
|
flushQueueDb,
|
|
commitDb,
|
|
) where
|
|
|
|
import Utility.Exception
|
|
import Utility.Monad
|
|
|
|
import Database.Persist.Sqlite
|
|
import qualified Database.Sqlite as Sqlite
|
|
import Control.Monad
|
|
import Control.Monad.IO.Class (liftIO)
|
|
import Control.Concurrent
|
|
import Control.Concurrent.Async
|
|
import Control.Exception (throwIO)
|
|
import qualified Data.Text as T
|
|
import Control.Monad.Trans.Resource (runResourceT)
|
|
import Control.Monad.Logger (runNoLoggingT)
|
|
import Data.List
|
|
import Data.Time.Clock
|
|
import System.IO
|
|
|
|
{- A DbHandle is a reference to a worker thread that communicates with
|
|
- the database. It has a MVar which Jobs are submitted to. -}
|
|
{- Fields, in order: the Async running the worker thread, the MVar
 - through which Jobs are handed to that worker, and the MVar holding
 - the queue of changes that have not been committed yet. -}
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
|
|
|
|
{- Makes sure the database file is set up, by switching it to WAL mode
 - and then running the supplied migration action against it.
 -
 - WAL mode is used so that readers do not block writers, and a writer
 - does not block readers.
 -}
initDb :: FilePath -> SqlPersistM () -> IO ()
initDb f migration = enableWAL dbname >> runSqlite dbname migration
  where
    dbname = T.pack f
|
|
|
|
{- Opens a raw connection to the database, runs the pragma that selects
 - the WAL journal mode, and closes the connection again.
 -
 - NOTE(review): if preparing or stepping the statement throws, the
 - statement and connection are not released here. -}
enableWAL :: T.Text -> IO ()
enableWAL db = do
    conn <- Sqlite.open db
    stmt <- Sqlite.prepare conn pragma
    _ <- Sqlite.step stmt
    _ <- Sqlite.finalize stmt
    Sqlite.close conn
  where
    pragma = T.pack "PRAGMA journal_mode=WAL;"
|
|
|
|
{- Opens the database without running any migrations, so it must
 - already exist and contain the expected tables.
 -
 - A worker thread is started to talk to the database, and the returned
 - DbHandle starts out with an empty queue of changes. -}
openDb :: FilePath -> TableName -> IO DbHandle
openDb db tablename = do
    jobs <- newEmptyMVar
    worker <- async $ workerThread (T.pack db) tablename jobs
    qvar <- emptyDbQueue >>= newMVar
    return (DbHandle worker jobs qvar)
|
|
|
|
{- Work items handed to the worker thread through the jobs MVar. -}
data Job
    -- An action the worker runs using its own database connection.
    = QueryJob (SqlPersistM ())
    -- An action that is given a runner; the worker passes in a function
    -- that runs a SqlPersistM action in a newly opened connection.
    | ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
    -- Ends the worker's job loop.
    | CloseJob
|
|
|
|
{- Name of a table in the database. The worker thread selects from it
 - to check that a newly opened connection has become usable. -}
type TableName = String
|
|
|
|
{- Body of the worker thread behind a DbHandle.
 -
 - Opens a connection to the database, then serves Jobs taken from the
 - MVar one at a time until a CloseJob arrives. A non-async exception
 - that escapes is reported on stderr, and the thread then ends.
 -}
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
workerThread db tablename jobs = run loop `catchNonAsync` showerr
  where
    showerr e = hPutStrLn stderr ("sqlite worker thread crashed: " ++ show e)

    -- Block until a job arrives, then dispatch on it.
    loop = liftIO (takeMVar jobs) >>= dispatch

    dispatch (QueryJob query) = query >> loop
    -- A change is run in a separate database connection, since sqlite
    -- only supports a single writer at a time, and a failed write may
    -- crash the database connection.
    dispatch (ChangeJob change) = liftIO (change run) >> loop
    dispatch CloseJob = return ()

    -- Like runSqlite, but calls settle on the raw sql Connection first.
    run a = do
        conn <- Sqlite.open db
        settle conn
        runResourceT $ runNoLoggingT $
            withSqlConn (wrapConnection conn) (runSqlConn a)

    -- Work around a bug in sqlite: New database connections can
    -- sometimes take a while to become usable; select statements will
    -- fail with ErrorBusy for some time. So, keep retrying until a
    -- select succeeds; once one succeeds the connection stays usable.
    -- <http://thread.gmane.org/gmane.comp.db.sqlite.general/93116>
    settle conn = tryNonAsync probe >>= either retryIfBusy (const (return ()))
      where
        probe = do
            stmt <- Sqlite.prepare conn nullselect
            void $ Sqlite.step stmt
            void $ Sqlite.finalize stmt
        retryIfBusy e
            | "ErrorBusy" `isInfixOf` show e = do
                threadDelay 1000 -- 1/1000th second
                settle conn
            | otherwise = throwIO e

    -- This should succeed for any table.
    nullselect = T.pack ("SELECT null from " ++ tablename ++ " limit 1")
|
|
|
|
{- Makes a query using the DbHandle. This should not be used to make
 - changes to the database!
 -
 - Note that the action is not run by the calling thread, but by a
 - worker thread. Exceptions are propagated to the calling thread: a
 - non-async exception raised by the query, or while waiting for its
 - result, is rethrown as an error whose message includes the original
 - exception, so the cause of the failure is not lost.
 -
 - Only one action can be run at a time against a given DbHandle.
 - If called concurrently in the same process, this will block until
 - it is able to run.
 -}
queryDb :: DbHandle -> SqlPersistM a -> IO a
queryDb (DbHandle _ jobs _) a = do
    res <- newEmptyMVar
    -- The worker runs the query and hands back either its result or
    -- the exception it raised.
    putMVar jobs $ QueryJob $
        liftIO . putMVar res =<< tryNonAsync a
    (either throwIO return =<< takeMVar res)
        `catchNonAsync` (\e -> error $ "sqlite query crashed: " ++ show e)
|
|
|
|
{- Commits any queued changes, then shuts down the worker thread and
 - waits for it to exit.
 -
 - The flush has to come first: flushQueueDb commits by submitting a
 - ChangeJob to the worker and then waiting for its result, so once the
 - worker has processed CloseJob there is nothing left to run that job
 - and the caller would block forever.
 -
 - The DbHandle must not be used by any thread once closeDb has been
 - called; changes queued after the flush are not committed.
 -}
closeDb :: DbHandle -> IO ()
closeDb h@(DbHandle worker jobs _) = do
    flushQueueDb h
    putMVar jobs CloseJob
    wait worker
|
|
|
|
{- Number of actions currently held in a DbQueue. -}
type Size = Int
|
|
|
|
{- Time recorded when a DbQueue was created, or last emptied by a
 - successful commit. -}
type LastCommitTime = UTCTime
|
|
|
|
{- A queue of actions to perform, with a count of the number of actions
|
|
- queued, and a last commit time. -}
|
|
{- Fields, in order: how many actions are queued, the last commit time,
 - and the queued actions sequenced into a single SqlPersistM action. -}
data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ())
|
|
|
|
{- A queue holding no actions, whose last commit time is the current
 - time. -}
emptyDbQueue :: IO DbQueue
emptyDbQueue = fresh `fmap` getCurrentTime
  where
    fresh now = DbQueue 0 now (return ())
|
|
|
|
{- Adds a change to the queue of changes to be made to the database.
 - The change stays buffered until a later commit, unless the
 - commitchecker action (given the new queue size and the last commit
 - time) returns true, in which case the whole queue is committed now.
 -
 - (Be sure to call closeDb or flushQueueDb to ensure the change
 - gets committed.)
 -
 - Transactions built up by queueDb are sent to sqlite all at once.
 - If sqlite fails due to another change being made concurrently by
 - another process, the transaction is put back in the queue. This
 - solves the sqlite multiple writer problem.
 -}
queueDb
    :: DbHandle
    -> (Size -> LastCommitTime -> IO Bool)
    -> SqlPersistM ()
    -> IO ()
queueDb h@(DbHandle _ _ qvar) commitchecker a = do
    DbQueue sz lastcommittime qa <- takeMVar qvar
    let !sz' = sz + 1
    let qa' = qa >> a
    -- The queue as it looks with the new change appended.
    let pending = DbQueue sz' lastcommittime qa'
    shouldcommit <- commitchecker sz' lastcommittime
    if shouldcommit
        then commitDb h qa' >>=
            either (const $ putMVar qvar pending) (const restart)
        else putMVar qvar pending
  where
    -- After a successful commit, start over with an empty queue.
    restart = do
        now <- getCurrentTime
        putMVar qvar (DbQueue 0 now (return ()))
|
|
|
|
{- Commits any queued changes and leaves an empty queue behind.
 -
 - If flushing the queue fails, this could be because there is another
 - writer to the database. Retry repeatedly for up to 10 seconds, then
 - give up with an error.
 -
 - The queue MVar is refilled both on success and on giving up (in the
 - latter case with the still uncommitted changes). Leaving it empty
 - would make every later queueDb or flushQueueDb call on the same
 - handle block forever in takeMVar.
 -}
flushQueueDb :: DbHandle -> IO ()
flushQueueDb h@(DbHandle _ _ qvar) = do
    q@(DbQueue sz _ qa) <- takeMVar qvar
    if sz > 0
        then robustly q Nothing 100 (commitDb h qa)
        -- Nothing to commit; hand the queue back unchanged.
        else putMVar qvar q
  where
    robustly :: DbQueue -> Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
    robustly q e 0 _ = do
        -- Out of retries: keep the uncommitted changes queued.
        putMVar qvar q
        error $ "failed to commit changes to sqlite database: " ++ show e
    robustly q _ n a = do
        r <- a
        case r of
            Right _ -> do
                now <- getCurrentTime
                putMVar qvar (DbQueue 0 now (return ()))
            Left e -> do
                threadDelay 100000 -- 1/10th second
                robustly q (Just e) (n-1) a
|
|
|
|
{- Has the worker thread run a change to the database, and waits for
 - the outcome. A non-async exception thrown while running the change
 - is returned as a Left value rather than being rethrown. -}
commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
commitDb (DbHandle _ jobs _) a = do
    result <- newEmptyMVar
    -- Run by the worker, which supplies the runner.
    let job runner = tryNonAsync (runner a) >>= putMVar result
    putMVar jobs (ChangeJob job)
    takeMVar result
|