This commit is contained in:
Joey Hess 2015-04-10 17:53:58 -04:00
parent f03473d0b1
commit 9971c82ead
4 changed files with 88 additions and 24 deletions

View file

@ -7,7 +7,8 @@
module Annex.CheckAttr (
checkAttr,
checkAttrHandle
checkAttrHandle,
checkAttrStop,
) where
import Common.Annex
@ -33,3 +34,10 @@ checkAttrHandle = maybe startup return =<< Annex.getState Annex.checkattrhandle
h <- inRepo $ Git.checkAttrStart annexAttrs
Annex.changeState $ \s -> s { Annex.checkattrhandle = Just h }
return h
checkAttrStop :: Annex ()
checkAttrStop = maybe noop stop =<< Annex.getState Annex.checkattrhandle
where
stop h = do
liftIO $ Git.checkAttrStop h
Annex.changeState $ \s -> s { Annex.checkattrhandle = Nothing }

View file

@ -8,7 +8,8 @@
module Annex.CheckIgnore (
checkIgnored,
checkIgnoreHandle
checkIgnoreHandle,
checkIgnoreStop
) where
import Common.Annex
@ -30,3 +31,11 @@ checkIgnoreHandle = maybe startup return =<< Annex.getState Annex.checkignorehan
warning "The installed version of git is too old for .gitignores to be honored by git-annex."
Annex.changeState $ \s -> s { Annex.checkignorehandle = Just v }
return v
checkIgnoreStop :: Annex ()
checkIgnoreStop = maybe noop stop =<< Annex.getState Annex.checkignorehandle
where
stop (Just h) = do
liftIO $ Git.checkIgnoreStop h
Annex.changeState $ \s -> s { Annex.checkignorehandle = Nothing }
stop Nothing = noop

65
Annex/Concurrent.hs Normal file
View file

@ -0,0 +1,65 @@
{- git-annex concurrent state
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Annex.Concurrent where
import Common.Annex
import Annex
import Annex.CatFile
import Annex.CheckAttr
import Annex.CheckIgnore
import qualified Data.Map as M
{- Allows forking off a thread that uses a copy of the current AnnexState
- to run an Annex action.
-
- The returned IO action can be used to start the thread.
- It returns an Annex action that must be run in the original
- calling context to merge the forked AnnexState back into the
- current AnnexState.
-}
forkState :: Annex a -> Annex (IO (Annex a))
forkState a = do
st <- dupState
return $ do
(ret, newst) <- run st a
return $ do
mergeState newst
return ret
{- Returns a copy of the current AnnexState that is safe to be
- used when forking off a thread.
-
- After an Annex action is run using this AnnexState, it
- should be merged back into the current Annex's state,
- by calling mergeState.
-}
dupState :: Annex AnnexState
dupState = do
st <- Annex.getState id
-- avoid sharing eg, open file handles
return $ st
{ Annex.workers = []
, Annex.catfilehandles = M.empty
, Annex.checkattrhandle = Nothing
, Annex.checkignorehandle = Nothing
}
{- Merges the passed AnnexState into the current Annex state.
- Also shuts closes various handles in it. -}
mergeState :: AnnexState -> Annex ()
mergeState st = do
st' <- liftIO $ snd <$> run st closehandles
forM_ (M.toList $ Annex.cleanup st') $
uncurry addCleanup
changeState $ \s -> s { errcounter = errcounter s + errcounter st' }
where
closehandles = do
catFileStop
checkAttrStop
checkIgnoreStop

View file

@ -11,6 +11,7 @@ module CmdLine.Action where
import Common.Annex
import qualified Annex
import Annex.Concurrent
import Types.Command
import qualified Annex.Queue
import Messages.Internal
@ -18,11 +19,8 @@ import Types.Messages
import Control.Concurrent.Async
import Control.Exception (throwIO)
import qualified Data.Map as M
import Data.Either
type CommandActionRunner = CommandStart -> CommandCleanup
{- Runs a command, starting with the check stage, and then
- the seek stage. Finishes by running the continutation, and
- then showing a count of any failures. -}
@ -55,7 +53,7 @@ commandAction a = withOutputType go
ws <- Annex.getState Annex.workers
(st, ws') <- if null ws
then do
st <- newWorkerState
st <- dupState
return (st, replicate (n-1) (Left st))
else do
l <- liftIO $ drainTo (n-1) ws
@ -75,11 +73,7 @@ commandAction a = withOutputType go
finishCommandActions :: Annex ()
finishCommandActions = do
l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers
forM_ (lefts l) $ \st -> do
forM_ (M.toList $ Annex.cleanup st) $
uncurry Annex.addCleanup
Annex.changeState $ \s ->
s { Annex.errcounter = Annex.errcounter s + Annex.errcounter st }
forM_ (lefts l) mergeState
{- Wait for Asyncs from the list to finish, replacing them with their
- final AnnexStates, until the list of remaining Asyncs is not larger
@ -110,23 +104,11 @@ findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Ann
findFreeSlot = go []
where
go c [] = do
st <- newWorkerState
st <- dupState
return (st, c)
go c (Left st:rest) = return (st, c ++ rest)
go c (v:rest) = go (v:c) rest
{- From the current Annex state, get a state that is suitable for being
- used for a worker thread. Avoid sharing eg, open file handles. -}
newWorkerState :: Annex Annex.AnnexState
newWorkerState = do
st <- Annex.getState id
return $ st
{ Annex.workers = []
, Annex.catfilehandles = M.empty
, Annex.checkattrhandle = Nothing
, Annex.checkignorehandle = Nothing
}
{- Like commandAction, but without the concurrency. -}
includeCommandAction :: CommandStart -> CommandCleanup
includeCommandAction a = account =<< tryIO go