allow for concurrent incremental fsck processes again (sorta)
Sqlite doesn't support multiple concurrent writers at all. One of them will fail to write. It's not even possible to have two processes building up separate transactions at the same time. Before using sqlite, incremental fsck could work perfectly well with multiple fsck processes running concurrently. I'd like to keep that working. My partial solution, so far, is to make git-annex buffer writes, and every so often send them all to sqlite at once, in a transaction. So most of the time, nothing is writing to the database. (And if it gets unlucky and a write fails due to a collision with another writer, it can just wait and retry the write later.) This lets multiple processes write to the database successfully. But, for the purposes of concurrent, incremental fsck, it's not ideal. Each process doesn't immediately learn of files that another process has checked. So they'll tend to do redundant work. Only way I can see to improve this is to use some other mechanism for short-term IPC between the fsck processes. Not yet done. ---- Also, make addDb check if an item is in the database already, and not try to re-add it. That fixes an intermittent crash with "SQLite3 returned ErrorConstraint while attempting to perform step." I am not 100% sure why; it only started happening when I moved write buffering into the queue. It seemed to generally happen on the same file each time, so could just be due to multiple files having the same key. However, I doubt my sound repo has many duplicate keys, and I suspect something else is going on. ---- Updated benchmark, with the 1000 item queue: 6m33.808s
This commit is contained in:
parent
bd6e41f8e6
commit
a3370ac459
2 changed files with 84 additions and 39 deletions
|
@ -77,18 +77,18 @@ closeDb h = do
|
||||||
unlockFile =<< fromRepo gitAnnexFsckDbLock
|
unlockFile =<< fromRepo gitAnnexFsckDbLock
|
||||||
|
|
||||||
addDb :: H.DbHandle -> Key -> IO ()
|
addDb :: H.DbHandle -> Key -> IO ()
|
||||||
addDb h = void . H.runDb' h commitPolicy . insert . Fscked . toSKey
|
addDb h k = H.queueDb h 1000 $
|
||||||
|
unlessM (inDb' sk) $
|
||||||
|
insert_ $ Fscked sk
|
||||||
|
where
|
||||||
|
sk = toSKey k
|
||||||
|
|
||||||
inDb :: H.DbHandle -> Key -> IO Bool
|
inDb :: H.DbHandle -> Key -> IO Bool
|
||||||
inDb h k = H.runDb h $ do
|
inDb h = H.runDb h . inDb' . toSKey
|
||||||
|
|
||||||
|
inDb' :: SKey -> SqlPersistM Bool
|
||||||
|
inDb' sk = do
|
||||||
r <- select $ from $ \r -> do
|
r <- select $ from $ \r -> do
|
||||||
where_ (r ^. FsckedKey ==. val (toSKey k))
|
where_ (r ^. FsckedKey ==. val sk)
|
||||||
return (r ^. FsckedKey)
|
return (r ^. FsckedKey)
|
||||||
return $ not $ null r
|
return $ not $ null r
|
||||||
|
|
||||||
{- Bundle up addDb transactions and commit after 60 seconds.
|
|
||||||
- This is a balance between resuming where the last incremental
|
|
||||||
- fsck left off, and making too many commits which slows down the fsck
|
|
||||||
- of lots of small or not present files. -}
|
|
||||||
commitPolicy :: H.CommitPolicy
|
|
||||||
commitPolicy = H.CommitAfter (fromIntegral (60 :: Int))
|
|
||||||
|
|
|
@ -5,32 +5,33 @@
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
module Database.Handle (
|
module Database.Handle (
|
||||||
DbHandle,
|
DbHandle,
|
||||||
openDb,
|
openDb,
|
||||||
runDb,
|
runDb,
|
||||||
CommitPolicy(..),
|
|
||||||
runDb',
|
|
||||||
commitDb,
|
commitDb,
|
||||||
closeDb,
|
closeDb,
|
||||||
|
Size,
|
||||||
|
queueDb,
|
||||||
|
flushQueueDb,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Utility.Exception
|
import Utility.Exception
|
||||||
import Messages
|
import Messages
|
||||||
|
|
||||||
import Database.Persist.Sqlite (runSqlite)
|
import Database.Persist.Sqlite
|
||||||
import Database.Esqueleto hiding (Key)
|
|
||||||
import Control.Monad
|
import Control.Monad
|
||||||
import Control.Monad.IO.Class (liftIO)
|
import Control.Monad.IO.Class (liftIO)
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Exception (throwIO)
|
import Control.Exception (throwIO)
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
import Data.Time.Clock
|
|
||||||
|
|
||||||
{- A DbHandle is a reference to a worker thread that communicates with
|
{- A DbHandle is a reference to a worker thread that communicates with
|
||||||
- the database. It has a MVar which Jobs are submitted to. -}
|
- the database. It has a MVar which Jobs are submitted to. -}
|
||||||
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar UTCTime)
|
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
|
||||||
|
|
||||||
data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob
|
data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob
|
||||||
|
|
||||||
|
@ -38,8 +39,8 @@ openDb :: FilePath -> IO DbHandle
|
||||||
openDb db = do
|
openDb db = do
|
||||||
jobs <- newEmptyMVar
|
jobs <- newEmptyMVar
|
||||||
worker <- async (workerThread (T.pack db) jobs)
|
worker <- async (workerThread (T.pack db) jobs)
|
||||||
t <- newMVar =<< getCurrentTime
|
q <- newMVar emptyDbQueue
|
||||||
return $ DbHandle worker jobs t
|
return $ DbHandle worker jobs q
|
||||||
|
|
||||||
workerThread :: T.Text -> MVar Job -> IO ()
|
workerThread :: T.Text -> MVar Job -> IO ()
|
||||||
workerThread db jobs = catchNonAsync go showerr
|
workerThread db jobs = catchNonAsync go showerr
|
||||||
|
@ -57,34 +58,27 @@ workerThread db jobs = catchNonAsync go showerr
|
||||||
CommitJob -> return CommitJob
|
CommitJob -> return CommitJob
|
||||||
CloseJob -> return CloseJob
|
CloseJob -> return CloseJob
|
||||||
|
|
||||||
|
{- Runs an action using the DbHandle. The action may be a query, or it may
|
||||||
{- Runs an action using the DbHandle.
|
- make a change. Changes are bundled up in a transaction, which does not
|
||||||
|
- complete until commitDb is called.
|
||||||
-
|
-
|
||||||
- Note that the action is not run by the calling thread, but by a
|
- Note that the action is not run by the calling thread, but by a
|
||||||
- worker thread. Exceptions are propigated to the calling thread.
|
- worker thread. Exceptions are propigated to the calling thread.
|
||||||
-
|
-
|
||||||
- Only one action can be run at a time against a given DbHandle.
|
- Only one action can be run at a time against a given DbHandle.
|
||||||
- If called concurrently, this will block until it is able to run.
|
- If called concurrently in the same process, this will block until
|
||||||
|
- it is able to run.
|
||||||
|
-
|
||||||
|
- Note that if multiple processes are trying to change the database
|
||||||
|
- at the same time, sqlite will only let one build a transaction at a
|
||||||
|
- time.
|
||||||
-}
|
-}
|
||||||
runDb :: DbHandle -> SqlPersistM a -> IO a
|
runDb :: DbHandle -> SqlPersistM a -> IO a
|
||||||
runDb h = runDb' h CommitManually
|
runDb (DbHandle _ jobs _) a = do
|
||||||
|
|
||||||
data CommitPolicy = CommitManually | CommitAfter NominalDiffTime
|
|
||||||
|
|
||||||
runDb' :: DbHandle -> CommitPolicy -> SqlPersistM a -> IO a
|
|
||||||
runDb' h@(DbHandle _ jobs t) pol a = do
|
|
||||||
res <- newEmptyMVar
|
res <- newEmptyMVar
|
||||||
putMVar jobs $ RunJob $ liftIO . putMVar res =<< tryNonAsync a
|
putMVar jobs $ RunJob $
|
||||||
r <- either throwIO return =<< takeMVar res
|
liftIO . putMVar res =<< tryNonAsync a
|
||||||
case pol of
|
either throwIO return =<< takeMVar res
|
||||||
CommitManually -> return ()
|
|
||||||
CommitAfter n -> do
|
|
||||||
now <- getCurrentTime
|
|
||||||
prev <- takeMVar t
|
|
||||||
putMVar t now
|
|
||||||
when (diffUTCTime now prev > n) $
|
|
||||||
commitDb h
|
|
||||||
return r
|
|
||||||
|
|
||||||
{- Commits any transaction that was created by the previous calls to runDb,
|
{- Commits any transaction that was created by the previous calls to runDb,
|
||||||
- and starts a new transaction. -}
|
- and starts a new transaction. -}
|
||||||
|
@ -92,6 +86,57 @@ commitDb :: DbHandle -> IO ()
|
||||||
commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob
|
commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob
|
||||||
|
|
||||||
closeDb :: DbHandle -> IO ()
|
closeDb :: DbHandle -> IO ()
|
||||||
closeDb (DbHandle worker jobs _) = do
|
closeDb h@(DbHandle worker jobs _) = do
|
||||||
|
flushQueueDb h
|
||||||
putMVar jobs CloseJob
|
putMVar jobs CloseJob
|
||||||
wait worker
|
wait worker
|
||||||
|
|
||||||
|
type Size = Int
|
||||||
|
|
||||||
|
{- A queue of actions to perform, with a count of the number of actions
|
||||||
|
- queued. -}
|
||||||
|
data DbQueue = DbQueue Size (SqlPersistM ())
|
||||||
|
|
||||||
|
emptyDbQueue :: DbQueue
|
||||||
|
emptyDbQueue = DbQueue 0 (return ())
|
||||||
|
|
||||||
|
{- Queues a change to be committed to the database. It will be buffered
|
||||||
|
- to be committed later, unless the queue gets larger than the specified
|
||||||
|
- size.
|
||||||
|
-
|
||||||
|
- (Be sure to call closeDb or flushQueue to ensure the change gets committed.)
|
||||||
|
-
|
||||||
|
- Transactions built up by queueDb are sent to sqlite all at once.
|
||||||
|
- If sqlite fails due to another change being made concurrently by another
|
||||||
|
- process, the transaction is put back in the queue. This solves
|
||||||
|
- the sqlite multiple writer problem.
|
||||||
|
-}
|
||||||
|
queueDb :: DbHandle -> Size -> SqlPersistM () -> IO ()
|
||||||
|
queueDb h@(DbHandle _ _ qvar) maxsz a = do
|
||||||
|
DbQueue sz qa <- takeMVar qvar
|
||||||
|
let !sz' = sz + 1
|
||||||
|
let qa' = qa >> a
|
||||||
|
let enqueue = putMVar qvar (DbQueue sz' qa')
|
||||||
|
if sz' > maxsz
|
||||||
|
then do
|
||||||
|
r <- tryNonAsync $ do
|
||||||
|
runDb h qa'
|
||||||
|
commitDb h
|
||||||
|
case r of
|
||||||
|
Left _ -> enqueue
|
||||||
|
Right _ -> putMVar qvar emptyDbQueue
|
||||||
|
else enqueue
|
||||||
|
|
||||||
|
{- If flushing the queue fails, this could be because there is another
|
||||||
|
- writer to the database. Retry repeatedly for up to 10 seconds. -}
|
||||||
|
flushQueueDb :: DbHandle -> IO ()
|
||||||
|
flushQueueDb h@(DbHandle _ _ qvar) = do
|
||||||
|
DbQueue sz qa <- takeMVar qvar
|
||||||
|
when (sz > 0) $
|
||||||
|
robustly 100 $ runDb h qa
|
||||||
|
where
|
||||||
|
robustly :: Int -> IO () -> IO ()
|
||||||
|
robustly 0 _ = error "failed to commit changes to sqlite database"
|
||||||
|
robustly n a = catchNonAsync a $ \_ -> do
|
||||||
|
threadDelay 100000 -- 1/10th second
|
||||||
|
robustly (n-1) a
|
||||||
|
|
Loading…
Add table
Reference in a new issue