fsck: Commit incremental fsck database after every 1000 files fscked, or every 5 minutes, whichever comes first.

Previously, commits were made every 1000 files fscked.

Also, improve docs
This commit is contained in:
Joey Hess 2015-07-31 16:42:15 -04:00
parent 9dfe03dbcd
commit bc4129cc77
5 changed files with 59 additions and 20 deletions

View file

@ -34,6 +34,7 @@ import Annex.LockFile
import Database.Persist.TH
import Database.Esqueleto hiding (Key)
import Data.Time.Clock
data FsckHandle = FsckHandle H.DbHandle UUID
@ -84,11 +85,18 @@ closeDb (FsckHandle h u) = do
unlockFile =<< fromRepo (gitAnnexFsckDbLock u)
addDb :: FsckHandle -> Key -> IO ()
addDb (FsckHandle h _) k = H.queueDb h 1000 $
addDb (FsckHandle h _) k = H.queueDb h checkcommit $
void $ insertUnique $ Fscked sk
where
sk = toSKey k
-- commit queue after 1000 files or 5 minutes, whichever comes first
checkcommit sz lastcommittime
| sz > 1000 = return True
| otherwise = do
now <- getCurrentTime
return $ diffUTCTime lastcommittime now > 300
inDb :: FsckHandle -> Key -> IO Bool
inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey

View file

@ -20,6 +20,7 @@ module Database.Handle (
) where
import Utility.Exception
import Utility.Monad
import Messages
import Database.Persist.Sqlite
@ -33,6 +34,7 @@ import qualified Data.Text as T
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Logger (runNoLoggingT)
import Data.List
import Data.Time.Clock
{- A DbHandle is a reference to a worker thread that communicates with
- the database. It has a MVar which Jobs are submitted to. -}
@ -64,7 +66,7 @@ openDb :: FilePath -> TableName -> IO DbHandle
openDb db tablename = do
jobs <- newEmptyMVar
worker <- async (workerThread (T.pack db) tablename jobs)
q <- newMVar emptyDbQueue
q <- newMVar =<< emptyDbQueue
return $ DbHandle worker jobs q
data Job
@ -145,16 +147,19 @@ closeDb h@(DbHandle worker jobs _) = do
type Size = Int
{- A queue of actions to perform, with a count of the number of actions
- queued. -}
data DbQueue = DbQueue Size (SqlPersistM ())
type LastCommitTime = UTCTime
emptyDbQueue :: DbQueue
emptyDbQueue = DbQueue 0 (return ())
{- A queue of actions to perform, with a count of the number of actions
- queued, and a last commit time. -}
data DbQueue = DbQueue Size LastCommitTime (SqlPersistM ())
emptyDbQueue :: IO DbQueue
emptyDbQueue = do
now <- getCurrentTime
return $ DbQueue 0 now (return ())
{- Queues a change to be made to the database. It will be buffered
- to be committed later, unless the queue gets larger than the specified
- size.
- to be committed later, unless the commitchecker action returns true.
-
- (Be sure to call closeDb or flushQueueDb to ensure the change
- gets committed.)
@ -164,25 +169,32 @@ emptyDbQueue = DbQueue 0 (return ())
- process, the transaction is put back in the queue. This solves
- the sqlite multiple writer problem.
-}
queueDb :: DbHandle -> Size -> SqlPersistM () -> IO ()
queueDb h@(DbHandle _ _ qvar) maxsz a = do
DbQueue sz qa <- takeMVar qvar
queueDb
:: DbHandle
-> (Size -> LastCommitTime -> IO Bool)
-> SqlPersistM ()
-> IO ()
queueDb h@(DbHandle _ _ qvar) commitchecker a = do
DbQueue sz lastcommittime qa <- takeMVar qvar
let !sz' = sz + 1
let qa' = qa >> a
let enqueue newsz = putMVar qvar (DbQueue newsz qa')
if sz' > maxsz
then do
let enqueue = putMVar qvar
ifM (commitchecker sz' lastcommittime)
( do
r <- commitDb h qa'
case r of
Left _ -> enqueue 0
Right _ -> putMVar qvar emptyDbQueue
else enqueue sz'
Left _ -> enqueue $ DbQueue sz' lastcommittime qa'
Right _ -> do
now <- getCurrentTime
enqueue $ DbQueue 0 now (return ())
, enqueue $ DbQueue sz' lastcommittime qa'
)
{- If flushing the queue fails, this could be because there is another
- writer to the database. Retry repeatedly for up to 10 seconds. -}
flushQueueDb :: DbHandle -> IO ()
flushQueueDb h@(DbHandle _ _ qvar) = do
DbQueue sz qa <- takeMVar qvar
DbQueue sz _ qa <- takeMVar qvar
when (sz > 0) $
robustly Nothing 100 (commitDb h qa)
where