759a87ad70
Probably not noticed until now because the queue is large enough that two threads each filling theirs at the same time and flushing is unlikely to happen. Also made explicit that each worker thread gets its own queue. I think that was the case before, but if something was put in the queue before worker threads were forked off, they could have each inherited the same queue. Could have gone with a single shared queue, but per-worker queues are more efficient, because a worker can add lots of stuff to its own queue without any locking. This commit was sponsored by Ole-Morten Duesund on Patreon.
64 lines
1.8 KiB
Haskell
64 lines
1.8 KiB
Haskell
{- git-annex concurrent state
|
|
-
|
|
- Copyright 2015 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
-}
|
|
|
|
module Annex.Concurrent where
|
|
|
|
import Annex
|
|
import Annex.Common
|
|
import Annex.Action
|
|
import qualified Annex.Queue
|
|
|
|
import qualified Data.Map as M
|
|
|
|
{- Prepares an Annex action to be run in a forked thread, against a
 - copy of the current AnnexState (obtained from dupState).
 -
 - Three layers are involved:
 -
 - 1. The outer Annex action snapshots the state and hands back an
 -    IO action.
 - 2. That IO action runs the supplied Annex action against the
 -    snapshot; it is what gets used to start the thread.
 - 3. Its result is a further Annex action, which must be run back in
 -    the original calling context. It merges the forked AnnexState
 -    into the current one (via mergeState) and yields the value the
 -    forked action produced.
 -}
forkState :: Annex a -> Annex (IO (Annex a))
forkState action = do
  forked <- dupState
  return $ do
    (result, finalstate) <- run forked action
    -- Hand back the merge step; it only takes effect once the caller
    -- runs it in the original Annex context.
    return (mergeState finalstate >> return result)
|
|
|
|
{- Produces a copy of the current AnnexState that is safe to hand to
 - a forked thread.
 -
 - The copy is the current state with a handful of fields reset (see
 - the field-by-field notes below); everything else is carried over
 - unchanged.
 -
 - Once an Annex action has been run using the copy, the resulting
 - state should be folded back into the current Annex state by
 - calling mergeState.
 -}
dupState :: Annex AnnexState
dupState = forThread <$> Annex.getState id
  where
    forThread st = st
      { Annex.workers = []
      -- Every thread gets a repoqueue of its own. The repoqueuesem
      -- is left untouched, and so stays shared, which prevents more
      -- than one thread flushing its queue at the same time.
      , Annex.repoqueue = Nothing
      -- Reset so that things like open file handles are not shared
      -- with the forked thread.
      , Annex.catfilehandles = M.empty
      , Annex.checkattrhandle = Nothing
      , Annex.checkignorehandle = Nothing
      }
|
|
|
|
{- Folds the passed AnnexState into the current Annex state, closing
 - various handles held by the passed state along the way.
 -
 - Steps, in order:
 -
 - 1. stopCoProcesses is run against the passed state, and the state
 -    that results is what the remaining steps work with.
 - 2. Every entry of that state's cleanup map is registered in the
 -    current state using addCleanup.
 - 3. Its queue is merged in with Annex.Queue.mergeFrom.
 - 4. Its errcounter is added to the current state's errcounter.
 -}
mergeState :: AnnexState -> Annex ()
mergeState st = do
  stopped <- liftIO (snd <$> run st stopCoProcesses)
  mapM_ (uncurry addCleanup) (M.toList (Annex.cleanup stopped))
  Annex.Queue.mergeFrom stopped
  changeState $ \cur ->
    cur { errcounter = errcounter cur + errcounter stopped }
|