{- git-annex concurrent state - - Copyright 2015-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} module Annex.Concurrent ( module Annex.Concurrent, module Annex.Concurrent.Utility ) where import Annex import Annex.Common import Annex.Concurrent.Utility import qualified Annex.Queue import Annex.Action import Types.Concurrency import Types.CatFileHandles import Annex.CheckAttr import Annex.CheckIgnore import qualified Data.Map as M setConcurrency :: ConcurrencySetting -> Annex () setConcurrency (ConcurrencyCmdLine s) = setConcurrency' s ConcurrencyCmdLine setConcurrency (ConcurrencyGitConfig s) = setConcurrency' s ConcurrencyGitConfig setConcurrency' :: Concurrency -> (Concurrency -> ConcurrencySetting) -> Annex () setConcurrency' NonConcurrent f = Annex.changeState $ \s -> s { Annex.concurrency = f NonConcurrent } setConcurrency' c f = do cfh <- getState Annex.catfilehandles cfh' <- case cfh of CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool CatFileHandlesPool _ -> pure cfh cah <- mkConcurrentCheckAttrHandle c cih <- mkConcurrentCheckIgnoreHandle c Annex.changeState $ \s -> s { Annex.concurrency = f c , Annex.catfilehandles = cfh' , Annex.checkattrhandle = Just cah , Annex.checkignorehandle = Just cih } {- Allows forking off a thread that uses a copy of the current AnnexState - to run an Annex action. - - The returned IO action can be used to start the thread. - It returns an Annex action that must be run in the original - calling context to merge the forked AnnexState back into the - current AnnexState. -} forkState :: Annex a -> Annex (IO (Annex a)) forkState a = do rd <- Annex.getRead id st <- dupState return $ do (ret, (newst, _rd)) <- run (st, rd) a return $ do mergeState newst return ret {- Returns a copy of the current AnnexState that is safe to be - used when forking off a thread. - - After an Annex action is run using this AnnexState, it - should be merged back into the current Annex's state, - by calling mergeState. -} dupState :: Annex AnnexState dupState = do st <- Annex.getState id -- Make sure that concurrency is enabled, if it was not already, -- so the concurrency-safe resource pools are set up. st' <- case getConcurrency' (Annex.concurrency st) of NonConcurrent -> do setConcurrency (ConcurrencyCmdLine (Concurrent 1)) Annex.getState id _ -> return st return $ st' -- each thread has its own repoqueue { Annex.repoqueue = Nothing -- no errors from this thread yet , Annex.errcounter = 0 } {- Merges the passed AnnexState into the current Annex state. - Also closes various handles in it. -} mergeState :: AnnexState -> Annex () mergeState st = do rd <- Annex.getRead id st' <- liftIO $ (fst . snd) <$> run (st, rd) stopNonConcurrentSafeCoProcesses forM_ (M.toList $ Annex.cleanupactions st') $ uncurry addCleanupAction Annex.Queue.mergeFrom st' changeState $ \s -> s { errcounter = errcounter s + errcounter st' }