git-annex/Annex/Concurrent.hs
Joey Hess 759a87ad70
fix git command queue to be concurrency safe
Probably not noticed until now because the queue is large enough that two
threads each filling theirs at the same time and flushing is unlikely to
happen.

Also made explicit that each worker thread gets its own queue.
I think that was the case before, but if something was put in the queue
before worker threads were forked off, they could have each inherited the
same queue.

Could have gone with a single shared queue, but per-worker queues is more
efficient, because a worker can add lots of stuff to its own queue without
any locking.

This commit was sponsored by Ole-Morten Duesund on Patreon.
2018-08-28 13:16:33 -04:00

64 lines
1.8 KiB
Haskell

{- git-annex concurrent state
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Annex.Concurrent where
import Annex
import Annex.Common
import Annex.Action
import qualified Annex.Queue
import qualified Data.Map as M
{- Allows forking off a thread that uses a copy of the current AnnexState
- to run an Annex action.
-
- The returned IO action can be used to start the thread.
- It returns an Annex action that must be run in the original
- calling context to merge the forked AnnexState back into the
- current AnnexState.
-}
forkState :: Annex a -> Annex (IO (Annex a))
forkState a = do
st <- dupState
return $ do
(ret, newst) <- run st a
return $ do
mergeState newst
return ret
{- Returns a copy of the current AnnexState that is safe to be
- used when forking off a thread.
-
- After an Annex action is run using this AnnexState, it
- should be merged back into the current Annex's state,
- by calling mergeState.
-}
dupState :: Annex AnnexState
dupState = do
st <- Annex.getState id
return $ st
{ Annex.workers = []
-- each thread has its own repoqueue, but the repoqueuesem
-- is shared to prevent more than one thread flushing its
-- queue at the same time
, Annex.repoqueue = Nothing
-- avoid sharing eg, open file handles
, Annex.catfilehandles = M.empty
, Annex.checkattrhandle = Nothing
, Annex.checkignorehandle = Nothing
}
{- Merges the passed AnnexState into the current Annex state.
- Also closes various handles in it. -}
mergeState :: AnnexState -> Annex ()
mergeState st = do
st' <- liftIO $ snd <$> run st stopCoProcesses
forM_ (M.toList $ Annex.cleanup st') $
uncurry addCleanup
Annex.Queue.mergeFrom st'
changeState $ \s -> s { errcounter = errcounter s + errcounter st' }