split out Database.Queue from Database.Handle

Fsck can use the queue for efficiency since it is write-heavy, and only
reads a value before writing it. But, the queue is not suited to the Keys
database.
This commit is contained in:
Joey Hess 2015-12-23 14:59:58 -04:00
parent b3690c4499
commit 6d38f54db4
Failed to extract signature
5 changed files with 177 additions and 120 deletions

View file

@ -21,7 +21,7 @@ module Database.Fsck (
) where
import Database.Types
import qualified Database.Handle as H
import qualified Database.Queue as H
import Locations
import Utility.PosixFiles
import Utility.Exception
@ -37,7 +37,7 @@ import Database.Persist.TH
import Database.Esqueleto hiding (Key)
import Data.Time.Clock
data FsckHandle = FsckHandle H.DbHandle UUID
data FsckHandle = FsckHandle H.DbQueue UUID
{- Each key stored in the database has already been fscked as part
- of the latest incremental fsck pass. -}
@ -77,7 +77,7 @@ openDb u = do
void $ tryIO $ removeDirectoryRecursive dbdir
rename tmpdbdir dbdir
lockFileCached =<< fromRepo (gitAnnexFsckDbLock u)
h <- liftIO $ H.openDb db "fscked"
h <- liftIO $ H.openDbQueue db "fscked"
-- work around https://github.com/yesodweb/persistent/issues/474
liftIO setConsoleEncoding
@ -86,7 +86,7 @@ openDb u = do
closeDb :: FsckHandle -> Annex ()
closeDb (FsckHandle h u) = do
liftIO $ H.closeDb h
liftIO $ H.closeDbQueue h
unlockFile =<< fromRepo (gitAnnexFsckDbLock u)
addDb :: FsckHandle -> Key -> IO ()
@ -102,8 +102,9 @@ addDb (FsckHandle h _) k = H.queueDb h checkcommit $
now <- getCurrentTime
return $ diffUTCTime lastcommittime now > 300
{- Doesn't know about keys that were just added with addDb. -}
inDb :: FsckHandle -> Key -> IO Bool
inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey
inDb (FsckHandle h _) = H.queryDbQueue h . inDb' . toSKey
inDb' :: SKey -> SqlPersistM Bool
inDb' sk = do

View file

@ -11,16 +11,14 @@ module Database.Handle (
DbHandle,
initDb,
openDb,
TableName,
queryDb,
closeDb,
Size,
queueDb,
flushQueueDb,
commitDb,
commitDb',
) where
import Utility.Exception
import Utility.Monad
import Database.Persist.Sqlite
import qualified Database.Sqlite as Sqlite
@ -33,18 +31,17 @@ import qualified Data.Text as T
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Logger (runNoLoggingT)
import Data.List
import Data.Time.Clock
import System.IO
{- A DbHandle is a reference to a worker thread that communicates with
- the database. It has a MVar which Jobs are submitted to. -}
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
data DbHandle = DbHandle (Async ()) (MVar Job)
{- Ensures that the database is initialized. Pass the migration action for
- the database.
-
- The database is put into WAL mode, to prevent readers from blocking
- writers, and prevent a writer from blocking readers.
- The database is initialized using WAL mode, to prevent readers
- from blocking writers, and prevent a writer from blocking readers.
-}
initDb :: FilePath -> SqlPersistM () -> IO ()
initDb f migration = do
@ -60,22 +57,71 @@ enableWAL db = do
void $ Sqlite.finalize stmt
Sqlite.close conn
{- Name of a table that should exist once the database is initialized. -}
type TableName = String
{- Opens the database, but does not perform any migrations. Only use
- if the database is known to exist and have the right tables. -}
openDb :: FilePath -> TableName -> IO DbHandle
openDb db tablename = do
jobs <- newEmptyMVar
worker <- async (workerThread (T.pack db) tablename jobs)
q <- newMVar =<< emptyDbQueue
return $ DbHandle worker jobs q
return $ DbHandle worker jobs
closeDb :: DbHandle -> IO ()
closeDb (DbHandle worker jobs) = do
putMVar jobs CloseJob
wait worker
{- Makes a query using the DbHandle. This should not be used to make
- changes to the database!
-
- Note that the action is not run by the calling thread, but by a
- worker thread. Exceptions are propigated to the calling thread.
-
- Only one action can be run at a time against a given DbHandle.
- If called concurrently in the same process, this will block until
- it is able to run.
-}
queryDb :: DbHandle -> SqlPersistM a -> IO a
queryDb (DbHandle _ jobs) a = do
res <- newEmptyMVar
putMVar jobs $ QueryJob $
liftIO . putMVar res =<< tryNonAsync a
(either throwIO return =<< takeMVar res)
`catchNonAsync` (const $ error "sqlite query crashed")
{- Writes a change to the database.
-
- If a database is opened multiple times and there's a concurrent writer,
- the write could fail. Retries repeatedly for up to 10 seconds,
- which should avoid all but the most exceptional problems.
-}
commitDb :: DbHandle -> SqlPersistM () -> IO ()
commitDb h wa = robustly Nothing 100 (commitDb' h wa)
where
robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
robustly _ n a = do
r <- a
case r of
Right _ -> return ()
Left e -> do
threadDelay 100000 -- 1/10th second
robustly (Just e) (n-1) a
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
commitDb' (DbHandle _ jobs) a = do
res <- newEmptyMVar
putMVar jobs $ ChangeJob $ \runner ->
liftIO $ putMVar res =<< tryNonAsync (runner a)
takeMVar res
data Job
= QueryJob (SqlPersistM ())
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
| CloseJob
type TableName = String
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
workerThread db tablename jobs = catchNonAsync (run loop) showerr
where
@ -121,97 +167,3 @@ workerThread db tablename jobs = catchNonAsync (run loop) showerr
-- This should succeed for any table.
nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"
{- Makes a query using the DbHandle. This should not be used to make
- changes to the database!
-
- Note that the action is not run by the calling thread, but by a
- worker thread. Exceptions are propigated to the calling thread.
-
- Only one action can be run at a time against a given DbHandle.
- If called concurrently in the same process, this will block until
- it is able to run.
-}
queryDb :: DbHandle -> SqlPersistM a -> IO a
queryDb (DbHandle _ jobs _) a = do
res <- newEmptyMVar
putMVar jobs $ QueryJob $
liftIO . putMVar res =<< tryNonAsync a
(either throwIO return =<< takeMVar res)
`catchNonAsync` (const $ error "sqlite query crashed")
closeDb :: DbHandle -> IO ()
closeDb h@(DbHandle worker jobs _) = do
putMVar jobs CloseJob
wait worker
flushQueueDb h
type Size = Int
type LastCommitTime = UTCTime
{- A queue of actions to perform, with a count of the number of actions
- queued, and a last commit time. -}
data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ())
emptyDbQueue :: IO DbQueue
emptyDbQueue = do
now <- getCurrentTime
return $ DbQueue 0 now (return ())
{- Queues a change to be made to the database. It will be buffered
- to be committed later, unless the commitchecker action returns true.
-
- (Be sure to call closeDb or flushQueueDb to ensure the change
- gets committed.)
-
- Transactions built up by queueDb are sent to sqlite all at once.
- If sqlite fails due to another change being made concurrently by another
- process, the transaction is put back in the queue. This solves
- the sqlite multiple writer problem.
-}
queueDb
:: DbHandle
-> (Size -> LastCommitTime -> IO Bool)
-> SqlPersistM ()
-> IO ()
queueDb h@(DbHandle _ _ qvar) commitchecker a = do
DbQueue sz lastcommittime qa <- takeMVar qvar
let !sz' = sz + 1
let qa' = qa >> a
let enqueue = putMVar qvar
ifM (commitchecker sz' lastcommittime)
( do
r <- commitDb h qa'
case r of
Left _ -> enqueue $ DbQueue sz' lastcommittime qa'
Right _ -> do
now <- getCurrentTime
enqueue $ DbQueue 0 now (return ())
, enqueue $ DbQueue sz' lastcommittime qa'
)
{- If flushing the queue fails, this could be because there is another
- writer to the database. Retry repeatedly for up to 10 seconds. -}
flushQueueDb :: DbHandle -> IO ()
flushQueueDb h@(DbHandle _ _ qvar) = do
DbQueue sz _ qa <- takeMVar qvar
when (sz > 0) $
robustly Nothing 100 (commitDb h qa)
where
robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
robustly _ n a = do
r <- a
case r of
Right _ -> return ()
Left e -> do
threadDelay 100000 -- 1/10th second
robustly (Just e) (n-1) a
commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
commitDb (DbHandle _ jobs _) a = do
res <- newEmptyMVar
putMVar jobs $ ChangeJob $ \runner ->
liftIO $ putMVar res =<< tryNonAsync (runner a)
takeMVar res

View file

@ -28,7 +28,7 @@ module Database.Keys (
import Database.Types
import Database.Keys.Types
import qualified Database.Handle as H
import qualified Database.Queue as H
import Locations
import Common hiding (delete)
import Annex
@ -72,7 +72,7 @@ openDb = withExclusiveLock gitAnnexKeysDbLock $ do
runMigrationSilent migrateKeysDb
setAnnexDirPerm dbdir
setAnnexFilePerm db
h <- liftIO $ H.openDb db "content"
h <- liftIO $ H.openDbQueue db "content"
-- work around https://github.com/yesodweb/persistent/issues/474
liftIO setConsoleEncoding
@ -80,9 +80,9 @@ openDb = withExclusiveLock gitAnnexKeysDbLock $ do
return $ DbHandle h
closeDb :: DbHandle -> IO ()
closeDb (DbHandle h) = H.closeDb h
closeDb (DbHandle h) = H.closeDbQueue h
withDbHandle :: (H.DbHandle -> IO a) -> Annex a
withDbHandle :: (H.DbQueue -> IO a) -> Annex a
withDbHandle a = bracket openDb (liftIO . closeDb) (\(DbHandle h) -> liftIO (a h))
addAssociatedFile :: Key -> FilePath -> Annex ()
@ -98,7 +98,7 @@ addAssociatedFile k f = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ d
{- Note that the files returned were once associated with the key, but
- some of them may not be any longer. -}
getAssociatedFiles :: Key -> Annex [FilePath]
getAssociatedFiles k = withDbHandle $ \h -> H.queryDb h $
getAssociatedFiles k = withDbHandle $ \h -> H.queryDbQueue h $
getAssociatedFiles' $ toSKey k
getAssociatedFiles' :: SKey -> SqlPersistM [FilePath]
@ -111,7 +111,7 @@ getAssociatedFiles' sk = do
{- Gets any keys that are on record as having a particular associated file.
- (Should be one or none but the database doesn't enforce that.) -}
getAssociatedKey :: FilePath -> Annex [Key]
getAssociatedKey f = withDbHandle $ \h -> H.queryDb h $
getAssociatedKey f = withDbHandle $ \h -> H.queryDbQueue h $
getAssociatedKey' f
getAssociatedKey' :: FilePath -> SqlPersistM [Key]
@ -140,7 +140,7 @@ addInodeCaches k is = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $
{- A key may have multiple InodeCaches; one for the annex object, and one
- for each pointer file that is a copy of it. -}
getInodeCaches :: Key -> Annex [InodeCache]
getInodeCaches k = withDbHandle $ \h -> H.queryDb h $ do
getInodeCaches k = withDbHandle $ \h -> H.queryDbQueue h $ do
l <- select $ from $ \r -> do
where_ (r ^. ContentKey ==. val sk)
return (r ^. ContentCache)

View file

@ -9,6 +9,6 @@ module Database.Keys.Types (
DbHandle(..)
) where
import qualified Database.Handle as H
import qualified Database.Queue as H
newtype DbHandle = DbHandle H.DbHandle
newtype DbHandle = DbHandle H.DbQueue

104
Database/Queue.hs Normal file
View file

@ -0,0 +1,104 @@
{- Persistent sqlite database queues
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
module Database.Queue (
DbQueue,
initDb,
openDbQueue,
queryDbQueue,
closeDbQueue,
QueueSize,
queueDb,
) where
import Utility.Monad
import Database.Handle
import Database.Persist.Sqlite
import Control.Monad
import Control.Concurrent
import Data.Time.Clock
{- A DbQueue wraps a DbHandle, adding a queue of writes to perform.
-
- This is efficient when there are frequent writes, but
- reads will not immediately have access to queued writes. -}
data DbQueue = DQ DbHandle (MVar Queue)
{- Opens the database queue, but does not perform any migrations. Only use
- if the database is known to exist and have the right tables; ie after
- running initDb. -}
openDbQueue :: FilePath -> TableName -> IO DbQueue
openDbQueue db tablename = DQ
<$> openDb db tablename
<*> (newMVar =<< emptyQueue)
{- Must be called to ensure queued changes get written to the database. -}
closeDbQueue :: DbQueue -> IO ()
closeDbQueue h@(DQ hdl _) = do
flushDbQueue h
closeDb hdl
{- Makes a queury using the DbQueue. This should not be used to make
- changes to the database!
-
- Queries will not return changes that have been recently queued,
- so use with care.
-}
queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
queryDbQueue (DQ hdl _) = queryDb hdl
{- A queue of actions to perform, with a count of the number of actions
- queued, and a last commit time. -}
data Queue = Queue QueueSize LastCommitTime (SqlPersistM ())
type QueueSize = Int
type LastCommitTime = UTCTime
emptyQueue :: IO Queue
emptyQueue = do
now <- getCurrentTime
return $ Queue 0 now (return ())
flushDbQueue :: DbQueue -> IO ()
flushDbQueue (DQ hdl qvar) = do
Queue sz _ qa <- takeMVar qvar
when (sz > 0) $
commitDb hdl qa
{- Queues a change to be made to the database. It will be queued
- to be committed later, unless the commitchecker action returns true,
- in which case any previously queued changes are also committed.
-
- Transactions built up by queueDb are sent to sqlite all at once.
- If sqlite fails due to another change being made concurrently by another
- process, the transaction is put back in the queue. This avoids
- the sqlite multiple writer problem.
-}
queueDb
:: DbQueue
-> (QueueSize -> LastCommitTime -> IO Bool)
-> SqlPersistM ()
-> IO ()
queueDb (DQ hdl qvar) commitchecker a = do
Queue sz lastcommittime qa <- takeMVar qvar
let !sz' = sz + 1
let qa' = qa >> a
let enqueue = putMVar qvar
ifM (commitchecker sz' lastcommittime)
( do
r <- commitDb' hdl qa'
case r of
Left _ -> enqueue $ Queue sz' lastcommittime qa'
Right _ -> do
now <- getCurrentTime
enqueue $ Queue 0 now (return ())
, enqueue $ Queue sz' lastcommittime qa'
)