commit more transactions when fscking
This makes interrupt and resume work, robustly. But, incremental fsck is slowed down by all those transactions..
This commit is contained in:
parent
91e9146d1b
commit
d2766df914
3 changed files with 34 additions and 19 deletions
|
@ -434,7 +434,7 @@ runFsck inc file key a = ifM (needFsck inc key)
|
||||||
|
|
||||||
{- Check if a key needs to be fscked, with support for incremental fscks. -}
|
{- Check if a key needs to be fscked, with support for incremental fscks. -}
|
||||||
needFsck :: Incremental -> Key -> Annex Bool
|
needFsck :: Incremental -> Key -> Annex Bool
|
||||||
needFsck (ContIncremental h) key = not <$> FsckDb.inDb h key
|
needFsck (ContIncremental h) key = liftIO $ not <$> FsckDb.inDb h key
|
||||||
needFsck _ _ = return True
|
needFsck _ _ = return True
|
||||||
|
|
||||||
withFsckDb :: Incremental -> (FsckDb.DbHandle -> Annex ()) -> Annex ()
|
withFsckDb :: Incremental -> (FsckDb.DbHandle -> Annex ()) -> Annex ()
|
||||||
|
@ -443,7 +443,9 @@ withFsckDb (StartIncremental h) a = a h
|
||||||
withFsckDb NonIncremental _ = noop
|
withFsckDb NonIncremental _ = noop
|
||||||
|
|
||||||
recordFsckTime :: Incremental -> Key -> Annex ()
|
recordFsckTime :: Incremental -> Key -> Annex ()
|
||||||
recordFsckTime inc key = withFsckDb inc $ \h -> FsckDb.addDb h key
|
recordFsckTime inc key = withFsckDb inc $ \h -> liftIO $ do
|
||||||
|
FsckDb.addDb h key
|
||||||
|
FsckDb.commitDb h
|
||||||
|
|
||||||
{- Records the start time of an incremental fsck.
|
{- Records the start time of an incremental fsck.
|
||||||
-
|
-
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
module Database.Fsck (
|
module Database.Fsck (
|
||||||
newPass,
|
newPass,
|
||||||
openDb,
|
openDb,
|
||||||
|
H.commitDb,
|
||||||
H.closeDb,
|
H.closeDb,
|
||||||
H.DbHandle,
|
H.DbHandle,
|
||||||
addDb,
|
addDb,
|
||||||
|
@ -60,11 +61,11 @@ openDb = do
|
||||||
liftIO $ renameFile newdb db
|
liftIO $ renameFile newdb db
|
||||||
liftIO $ H.openDb db
|
liftIO $ H.openDb db
|
||||||
|
|
||||||
addDb :: H.DbHandle -> Key -> Annex ()
|
addDb :: H.DbHandle -> Key -> IO ()
|
||||||
addDb h = void . liftIO . H.runDb h . insert . Fscked . toSKey
|
addDb h = void . H.runDb h . insert . Fscked . toSKey
|
||||||
|
|
||||||
inDb :: H.DbHandle -> Key -> Annex Bool
|
inDb :: H.DbHandle -> Key -> IO Bool
|
||||||
inDb h k = liftIO $ H.runDb h $ do
|
inDb h k = H.runDb h $ do
|
||||||
r <- select $ from $ \r -> do
|
r <- select $ from $ \r -> do
|
||||||
where_ (r ^. FsckedKey ==. val (toSKey k))
|
where_ (r ^. FsckedKey ==. val (toSKey k))
|
||||||
return (r ^. FsckedKey)
|
return (r ^. FsckedKey)
|
||||||
|
|
|
@ -8,8 +8,9 @@
|
||||||
module Database.Handle (
|
module Database.Handle (
|
||||||
DbHandle,
|
DbHandle,
|
||||||
openDb,
|
openDb,
|
||||||
closeDb,
|
|
||||||
runDb,
|
runDb,
|
||||||
|
commitDb,
|
||||||
|
closeDb,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Utility.Exception
|
import Utility.Exception
|
||||||
|
@ -26,27 +27,28 @@ import qualified Data.Text as T
|
||||||
- the database. It has a MVar which Jobs are submitted to. -}
|
- the database. It has a MVar which Jobs are submitted to. -}
|
||||||
data DbHandle = DbHandle (Async ()) (MVar Job)
|
data DbHandle = DbHandle (Async ()) (MVar Job)
|
||||||
|
|
||||||
data Job = Job (SqlPersistM ()) | CloseJob
|
data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob
|
||||||
|
|
||||||
openDb :: FilePath -> IO DbHandle
|
openDb :: FilePath -> IO DbHandle
|
||||||
openDb db = do
|
openDb db = do
|
||||||
jobs <- newEmptyMVar
|
jobs <- newEmptyMVar
|
||||||
worker <- async (workerThread db jobs)
|
worker <- async (workerThread (T.pack db) jobs)
|
||||||
return $ DbHandle worker jobs
|
return $ DbHandle worker jobs
|
||||||
|
|
||||||
workerThread :: FilePath -> MVar Job -> IO ()
|
workerThread :: T.Text -> MVar Job -> IO ()
|
||||||
workerThread db jobs = runSqlite (T.pack db) go
|
workerThread db jobs = go
|
||||||
where
|
where
|
||||||
go = do
|
go = do
|
||||||
|
r <- runSqlite db transaction
|
||||||
|
case r of
|
||||||
|
CloseJob -> return ()
|
||||||
|
_ -> go
|
||||||
|
transaction = do
|
||||||
job <- liftIO $ takeMVar jobs
|
job <- liftIO $ takeMVar jobs
|
||||||
case job of
|
case job of
|
||||||
Job a -> a >> go
|
RunJob a -> a >> transaction
|
||||||
CloseJob -> return ()
|
CommitJob -> return CommitJob
|
||||||
|
CloseJob -> return CloseJob
|
||||||
closeDb :: DbHandle -> IO ()
|
|
||||||
closeDb (DbHandle worker jobs) = do
|
|
||||||
putMVar jobs CloseJob
|
|
||||||
wait worker
|
|
||||||
|
|
||||||
{- Runs an action using the DbHandle.
|
{- Runs an action using the DbHandle.
|
||||||
-
|
-
|
||||||
|
@ -59,5 +61,15 @@ closeDb (DbHandle worker jobs) = do
|
||||||
runDb :: DbHandle -> SqlPersistM a -> IO a
|
runDb :: DbHandle -> SqlPersistM a -> IO a
|
||||||
runDb (DbHandle _ jobs) a = do
|
runDb (DbHandle _ jobs) a = do
|
||||||
res <- newEmptyMVar
|
res <- newEmptyMVar
|
||||||
putMVar jobs $ Job $ liftIO . putMVar res =<< tryNonAsync a
|
putMVar jobs $ RunJob $ liftIO . putMVar res =<< tryNonAsync a
|
||||||
either throwIO return =<< takeMVar res
|
either throwIO return =<< takeMVar res
|
||||||
|
|
||||||
|
{- Commits any transaction that was created by the previous calls to runDb,
|
||||||
|
- and starts a new transaction. -}
|
||||||
|
commitDb :: DbHandle -> IO ()
|
||||||
|
commitDb (DbHandle _ jobs) = putMVar jobs CommitJob
|
||||||
|
|
||||||
|
closeDb :: DbHandle -> IO ()
|
||||||
|
closeDb (DbHandle worker jobs) = do
|
||||||
|
putMVar jobs CloseJob
|
||||||
|
wait worker
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue