commit new transaction after 60 seconds

Database.Handle can now be given a CommitPolicy, making it easy to specify
transaction granularity.

Benchmarking the old git-annex incremental fsck that flips sticky bits
to the new that uses sqlite, running in a repo with 37000 annexed files,
both from cold cache:

old: 6m6.906s
new: 6m26.913s

This commit was sponsored by TasLUG.
This commit is contained in:
Joey Hess 2015-02-16 16:48:19 -04:00
parent d2766df914
commit 7d36e7d18d
3 changed files with 36 additions and 12 deletions

View file

@ -9,6 +9,8 @@ module Database.Handle (
DbHandle,
openDb,
runDb,
CommitPolicy(..),
runDb',
commitDb,
closeDb,
) where
@ -17,15 +19,17 @@ import Utility.Exception
import Database.Persist.Sqlite (runSqlite)
import Database.Esqueleto hiding (Key)
import Control.Monad
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception (throwIO)
import qualified Data.Text as T
import Data.Time.Clock
{- A DbHandle is a reference to a worker thread that communicates with
- the database. It has a MVar which Jobs are submitted to. -}
data DbHandle = DbHandle (Async ()) (MVar Job)
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar UTCTime)
data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob
@ -33,7 +37,8 @@ openDb :: FilePath -> IO DbHandle
openDb db = do
jobs <- newEmptyMVar
worker <- async (workerThread (T.pack db) jobs)
return $ DbHandle worker jobs
t <- newMVar =<< getCurrentTime
return $ DbHandle worker jobs t
workerThread :: T.Text -> MVar Job -> IO ()
workerThread db jobs = go
@ -50,26 +55,41 @@ workerThread db jobs = go
CommitJob -> return CommitJob
CloseJob -> return CloseJob
{- Runs an action using the DbHandle.
-
- Note that the action is not run by the calling thread, but by a
- worker thread. Exceptions are propigated to the calling thread.
-
- Note that only one action can be run at a time against a given DbHandle.
- Only one action can be run at a time against a given DbHandle.
- If called concurrently, this will block until it is able to run.
-}
runDb :: DbHandle -> SqlPersistM a -> IO a
runDb (DbHandle _ jobs) a = do
runDb h = runDb' h CommitManually
data CommitPolicy = CommitManually | CommitAfterSeconds Int
runDb' :: DbHandle -> CommitPolicy -> SqlPersistM a -> IO a
runDb' h@(DbHandle _ jobs t) pol a = do
res <- newEmptyMVar
putMVar jobs $ RunJob $ liftIO . putMVar res =<< tryNonAsync a
either throwIO return =<< takeMVar res
r <- either throwIO return =<< takeMVar res
case pol of
CommitManually -> return ()
CommitAfterSeconds n -> do
now <- getCurrentTime
prev <- takeMVar t
putMVar t now
when (diffUTCTime now prev > fromIntegral n) $
commitDb h
return r
{- Commits any transaction that was created by the previous calls to runDb,
- and starts a new transaction. -}
commitDb :: DbHandle -> IO ()
commitDb (DbHandle _ jobs) = putMVar jobs CommitJob
commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob
closeDb :: DbHandle -> IO ()
closeDb (DbHandle worker jobs) = do
closeDb (DbHandle worker jobs _) = do
putMVar jobs CloseJob
wait worker