finish CommandStart transition
The hoped for optimisation of CommandStart with -J did not materialize. In fact, not runnign CommandStart in parallel is slower than -J3. So, CommandStart are still run in parallel. (The actual bad performance I've been seeing with -J in my big repo has to do with building the remoteList.) But, this is still progress toward making -J faster, because it gets rid of the onlyActionOn roadblock in the way of making CommandCleanup jobs run separate from CommandPerform jobs. Added OnlyActionOn constructor for ActionItem which fixes the onlyActionOn breakage in the last commit. Made CustomOutput include an ActionItem, so even things using it can specify OnlyActionOn. In Command.Move and Command.Sync, there were CommandStarts that used includeCommandAction, so output messages, which is no longer allowed. Fixed by using startingCustomOutput, but that's still not quite right, since it prevents message display for the includeCommandAction run inside it too.
This commit is contained in:
parent
436f107715
commit
8e5ea28c26
26 changed files with 142 additions and 97 deletions
|
@ -1,4 +1,4 @@
|
|||
{- git-annex command-line actions
|
||||
{- git-annex command-line actions and concurrency
|
||||
-
|
||||
- Copyright 2010-2019 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
|
@ -54,33 +54,70 @@ commandActions = mapM_ commandAction
|
|||
- This should only be run in the seek stage.
|
||||
-}
|
||||
commandAction :: CommandStart -> Annex ()
|
||||
commandAction a = Annex.getState Annex.concurrency >>= \case
|
||||
NonConcurrent -> run
|
||||
commandAction start = Annex.getState Annex.concurrency >>= \case
|
||||
NonConcurrent -> void $ includeCommandAction start
|
||||
Concurrent n -> runconcurrent n
|
||||
ConcurrentPerCpu -> runconcurrent =<< liftIO getNumProcessors
|
||||
where
|
||||
run = void $ includeCommandAction a
|
||||
|
||||
runconcurrent n = do
|
||||
tv <- Annex.getState Annex.workers
|
||||
workerst <- waitWorkerSlot n (== PerformStage) tv
|
||||
aid <- liftIO $ async $ snd <$> Annex.run workerst
|
||||
(concurrentjob workerst)
|
||||
liftIO $ atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let !pool' = addWorkerPool (ActiveWorker aid PerformStage) pool
|
||||
putTMVar tv pool'
|
||||
void $ liftIO $ forkIO $ do
|
||||
aid <- async $ snd <$> Annex.run workerst
|
||||
(inOwnConsoleRegion (Annex.output workerst) run)
|
||||
atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let !pool' = addWorkerPool (ActiveWorker aid PerformStage) pool
|
||||
putTMVar tv pool'
|
||||
-- There won't usually be exceptions because the
|
||||
-- async is running includeCommandAction, which
|
||||
-- catches exceptions. Just in case, avoid
|
||||
-- stalling by using the original workerst.
|
||||
-- accountCommandAction will usually catch
|
||||
-- exceptions. Just in case, fall back to the
|
||||
-- original workerst.
|
||||
workerst' <- either (const workerst) id
|
||||
<$> waitCatch aid
|
||||
atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let !pool' = deactivateWorker pool aid workerst'
|
||||
putTMVar tv pool'
|
||||
|
||||
concurrentjob workerst = start >>= \case
|
||||
Nothing -> noop
|
||||
Just (startmsg, perform) ->
|
||||
concurrentjob' workerst startmsg perform
|
||||
|
||||
concurrentjob' workerst startmsg perform = case mkActionItem startmsg of
|
||||
OnlyActionOn k _ -> ensureOnlyActionOn k $
|
||||
-- If another job performed the same action while we
|
||||
-- waited, there may be nothing left to do, so re-run
|
||||
-- the start stage to see if it still wants to do
|
||||
-- something.
|
||||
start >>= \case
|
||||
Just (startmsg', perform') ->
|
||||
case mkActionItem startmsg' of
|
||||
OnlyActionOn k' _ | k' /= k ->
|
||||
concurrentjob' workerst startmsg' perform'
|
||||
_ -> mkjob workerst startmsg' perform'
|
||||
Nothing -> noop
|
||||
_ -> mkjob workerst startmsg perform
|
||||
|
||||
mkjob workerst startmsg perform =
|
||||
inOwnConsoleRegion (Annex.output workerst) $
|
||||
void $ accountCommandAction $
|
||||
performconcurrent startmsg perform
|
||||
|
||||
-- Like callCommandAction, but the start stage has already run,
|
||||
-- and the worker thread's stage is changed before starting the
|
||||
-- cleanup action.
|
||||
performconcurrent startmsg perform = do
|
||||
showStartMessage startmsg
|
||||
perform >>= \case
|
||||
Just cleanup -> do
|
||||
changeStageTo CleanupStage
|
||||
r <- cleanup
|
||||
implicitMessage (showEndResult r)
|
||||
return r
|
||||
Nothing -> do
|
||||
implicitMessage (showEndResult False)
|
||||
return False
|
||||
|
||||
-- | Wait until there's an idle worker in the pool, remove it from the
|
||||
-- pool, and return its state.
|
||||
|
@ -138,16 +175,17 @@ finishCommandActions = do
|
|||
swapTMVar tv UnallocatedWorkerPool
|
||||
case pool of
|
||||
UnallocatedWorkerPool -> noop
|
||||
WorkerPool l -> forM_ (mapMaybe workerAsync l) $ \aid ->
|
||||
WorkerPool l -> forM_ (mapMaybe workerAsync l) $ \aid ->
|
||||
liftIO (waitCatch aid) >>= \case
|
||||
Left _ -> noop
|
||||
Right st -> mergeState st
|
||||
|
||||
{- Changes the current thread's stage in the worker pool.
|
||||
-
|
||||
- An idle worker with the desired stage is found in the pool
|
||||
- (waiting if necessary for one to become idle)
|
||||
- and the stages of it and the current thread are swapped.
|
||||
- The pool needs to continue to contain the same number of worker threads
|
||||
- for each stage. So, an idle worker with the desired stage is found in
|
||||
- the pool (waiting if necessary for one to become idle), and the stages
|
||||
- of it and the current thread are swapped.
|
||||
-}
|
||||
changeStageTo :: WorkerStage -> Annex ()
|
||||
changeStageTo newstage = do
|
||||
|
@ -168,23 +206,26 @@ changeStageTo newstage = do
|
|||
|
||||
{- Like commandAction, but without the concurrency. -}
|
||||
includeCommandAction :: CommandStart -> CommandCleanup
|
||||
includeCommandAction a = account =<< tryNonAsync (callCommandAction a)
|
||||
where
|
||||
account (Right True) = return True
|
||||
account (Right False) = incerr
|
||||
account (Left err) = case fromException err of
|
||||
includeCommandAction = accountCommandAction . callCommandAction
|
||||
|
||||
accountCommandAction :: CommandCleanup -> CommandCleanup
|
||||
accountCommandAction a = tryNonAsync a >>= \case
|
||||
Right True -> return True
|
||||
Right False -> incerr
|
||||
Left err -> case fromException err of
|
||||
Just exitcode -> liftIO $ exitWith exitcode
|
||||
Nothing -> do
|
||||
toplevelWarning True (show err)
|
||||
implicitMessage showEndFail
|
||||
incerr
|
||||
where
|
||||
incerr = do
|
||||
Annex.incError
|
||||
return False
|
||||
|
||||
{- Runs a single command action through the start, perform and cleanup
|
||||
- stages, without catching errors. Useful if one command wants to run
|
||||
- part of another command. -}
|
||||
- stages, without catching errors and without incrementing error counter.
|
||||
- Useful if one command wants to run part of another command. -}
|
||||
callCommandAction :: CommandStart -> CommandCleanup
|
||||
callCommandAction = fromMaybe True <$$> callCommandAction'
|
||||
|
||||
|
@ -203,9 +244,7 @@ callCommandActionQuiet start =
|
|||
showStartMessage startmsg
|
||||
perform >>= \case
|
||||
Nothing -> return (Just False)
|
||||
Just cleanup -> do
|
||||
changeStageTo CleanupStage
|
||||
Just <$> cleanup
|
||||
Just cleanup -> Just <$> cleanup
|
||||
|
||||
{- Do concurrent output when that has been requested. -}
|
||||
allowConcurrentOutput :: Annex a -> Annex a
|
||||
|
@ -253,22 +292,12 @@ allowConcurrentOutput a = do
|
|||
liftIO $ setNumCapabilities n
|
||||
|
||||
{- Ensures that only one thread processes a key at a time.
|
||||
- Other threads will block until it's done. -}
|
||||
{-
|
||||
onlyActionOn :: Key -> CommandStart -> CommandStart
|
||||
onlyActionOn k a = onlyActionOn' k run
|
||||
where
|
||||
-- Run whole action, not just start stage, so other threads
|
||||
-- block until it's done.
|
||||
run = callCommandActionQuiet a >>= \case
|
||||
Nothing -> return Nothing
|
||||
Just r' -> return $ Just $ return $ Just $ return r'
|
||||
-}
|
||||
|
||||
{- Ensures that only one thread processes a key at a time.
|
||||
- Other threads will block until it's done. -}
|
||||
onlyActionOn' :: Key -> Annex a -> Annex a
|
||||
onlyActionOn' k a = go =<< Annex.getState Annex.concurrency
|
||||
- Other threads will block until it's done.
|
||||
-
|
||||
- May be called repeatedly by the same thread without blocking. -}
|
||||
ensureOnlyActionOn :: Key -> Annex a -> Annex a
|
||||
ensureOnlyActionOn k a =
|
||||
go =<< Annex.getState Annex.concurrency
|
||||
where
|
||||
go NonConcurrent = a
|
||||
go (Concurrent _) = goconcurrent
|
||||
|
@ -283,7 +312,7 @@ onlyActionOn' k a = go =<< Annex.getState Annex.concurrency
|
|||
case M.lookup k m of
|
||||
Just tid
|
||||
| tid /= mytid -> retry
|
||||
| otherwise -> return (return ())
|
||||
| otherwise -> return $ return ()
|
||||
Nothing -> do
|
||||
writeTVar tv $! M.insert k mytid m
|
||||
return $ liftIO $ atomically $
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue