join back threads before ending concurrent output so display works

I didn't really want to put allowConcurrentOutput in CmdLine.Action,
but there were dep loops and that was the best place available.
This commit is contained in:
Joey Hess 2015-11-05 17:22:45 -04:00
parent 2ca52b4a9d
commit ab6b1edfee
Failed to extract signature
4 changed files with 123 additions and 92 deletions

View file

@ -5,6 +5,8 @@
- Licensed under the GNU GPL version 3 or higher. - Licensed under the GNU GPL version 3 or higher.
-} -}
{-# LANGUAGE CPP #-}
module CmdLine.Action where module CmdLine.Action where
import Common.Annex import Common.Annex
@ -12,6 +14,7 @@ import qualified Annex
import Annex.Concurrent import Annex.Concurrent
import Types.Command import Types.Command
import qualified Annex.Queue import qualified Annex.Queue
import Messages.Concurrent
import Messages.Internal import Messages.Internal
import Types.Messages import Types.Messages
@ -19,6 +22,10 @@ import Control.Concurrent.Async
import Control.Exception (throwIO) import Control.Exception (throwIO)
import Data.Either import Data.Either
#ifdef WITH_CONCURRENTOUTPUT
import qualified System.Console.Regions as Regions
#endif
{- Runs a command, starting with the check stage, and then {- Runs a command, starting with the check stage, and then
- the seek stage. Finishes by running the continutation, and - the seek stage. Finishes by running the continutation, and
- then showing a count of any failures. -} - then showing a count of any failures. -}
@ -71,7 +78,9 @@ commandAction a = withOutputType go
-} -}
finishCommandActions :: Annex () finishCommandActions :: Annex ()
finishCommandActions = do finishCommandActions = do
l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers ws <- Annex.getState Annex.workers
Annex.changeState $ \s -> s { Annex.workers = [] }
l <- liftIO $ drainTo 0 ws
forM_ (lefts l) mergeState forM_ (lefts l) mergeState
{- Wait for Asyncs from the list to finish, replacing them with their {- Wait for Asyncs from the list to finish, replacing them with their
@ -138,3 +147,19 @@ callCommandAction = start
skip = return True skip = return True
failure = showEndFail >> return False failure = showEndFail >> return False
status r = showEndResult r >> return r status r = showEndResult r >> return r
{- Do concurrent output when that has been requested. -}
allowConcurrentOutput :: Annex a -> Annex a
#ifdef WITH_CONCURRENTOUTPUT
allowConcurrentOutput a = go =<< Annex.getState Annex.concurrentjobs
where
go Nothing = a
go (Just n) = Regions.displayConsoleRegions $
bracket_ (setup n) cleanup a
setup = Annex.setOutput . ConcurrentOutput
cleanup = do
finishCommandActions
Annex.setOutput NormalOutput
#else
allowConcurrentOutput = id
#endif

View file

@ -19,7 +19,6 @@ module Command (
whenAnnexed, whenAnnexed,
ifAnnexed, ifAnnexed,
isBareRepo, isBareRepo,
allowConcurrentOutput,
module ReExported module ReExported
) where ) where
@ -37,7 +36,6 @@ import CmdLine.Option as ReExported
import CmdLine.GlobalSetter as ReExported import CmdLine.GlobalSetter as ReExported
import CmdLine.GitAnnex.Options as ReExported import CmdLine.GitAnnex.Options as ReExported
import Options.Applicative as ReExported hiding (command) import Options.Applicative as ReExported hiding (command)
import Messages.Internal (allowConcurrentOutput)
import qualified Options.Applicative as O import qualified Options.Applicative as O

96
Messages/Concurrent.hs Normal file
View file

@ -0,0 +1,96 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{- git-annex output messages, including concurrent output to display regions
-
- Copyright 2010-2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE CPP #-}
module Messages.Concurrent where
import Common
import Annex
import Types.Messages
#ifdef WITH_CONCURRENTOUTPUT
import qualified System.Console.Concurrent as Console
import qualified System.Console.Regions as Regions
import Control.Concurrent.STM
import qualified Data.Text as T
#endif
{- Outputs a message in a concurrency safe way.
-
- The message may be an error message, in which case it goes to stderr.
-
- When built without concurrent-output support, the fallback action is run
- instead.
-}
concurrentMessage :: Bool -> String -> Annex () -> Annex ()
#ifdef WITH_CONCURRENTOUTPUT
concurrentMessage iserror msg _ = go =<< consoleRegion <$> Annex.getState Annex.output
where
go Nothing
| iserror = liftIO $ Console.errorConcurrent msg
| otherwise = liftIO $ Console.outputConcurrent msg
go (Just r) = do
-- Can't display the error to stdout while
-- console regions are in use, so set the errflag
-- to get it to display to stderr later.
when iserror $ do
Annex.changeState $ \s ->
s { Annex.output = (Annex.output s) { consoleRegionErrFlag = True } }
liftIO $ Regions.appendConsoleRegion r msg
#else
concurrentMessage _ _ fallback = fallback
#endif
{- Runs an action in its own dedicated region of the console.
-
- The region is closed at the end or on exception, and at that point
- the value of the region is displayed in the scrolling area above
- any other active regions.
-
- When not at a console, a region is not displayed until the action is
- complete.
-}
inOwnConsoleRegion :: Annex a -> Annex a
#ifdef WITH_CONCURRENTOUTPUT
inOwnConsoleRegion a = do
r <- mkregion
setregion (Just r)
eret <- tryNonAsync a `onException` rmregion r
case eret of
Left e -> do
-- Add error message to region before it closes.
concurrentMessage True (show e) noop
rmregion r
throwM e
Right ret -> do
rmregion r
return ret
where
mkregion = Regions.openConsoleRegion Regions.Linear
setregion r = Annex.changeState $ \s -> s { Annex.output = (Annex.output s) { consoleRegion = r } }
rmregion r = do
errflag <- consoleRegionErrFlag <$> Annex.getState Annex.output
let h = if errflag then Console.StdErr else Console.StdOut
Annex.changeState $ \s ->
s { Annex.output = (Annex.output s) { consoleRegionErrFlag = False } }
setregion Nothing
liftIO $ atomically $ do
t <- Regions.getConsoleRegion r
unless (T.null t) $
Console.bufferOutputSTM h t
Regions.closeConsoleRegion r
#else
inOwnConsoleRegion = id
#endif
#ifdef WITH_CONCURRENTOUTPUT
instance Regions.LiftRegion Annex where
liftRegion = liftIO . atomically
#endif

View file

@ -1,5 +1,3 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{- git-annex output messages, including concurrent output to display regions {- git-annex output messages, including concurrent output to display regions
- -
- Copyright 2010-2015 Joey Hess <id@joeyh.name> - Copyright 2010-2015 Joey Hess <id@joeyh.name>
@ -7,20 +5,12 @@
- Licensed under the GNU GPL version 3 or higher. - Licensed under the GNU GPL version 3 or higher.
-} -}
{-# LANGUAGE CPP #-}
module Messages.Internal where module Messages.Internal where
import Common import Common
import Annex import Annex
import Types.Messages import Types.Messages
import Messages.Concurrent
#ifdef WITH_CONCURRENTOUTPUT
import qualified System.Console.Concurrent as Console
import qualified System.Console.Regions as Regions
import Control.Concurrent.STM
import qualified Data.Text as T
#endif
withOutputType :: (OutputType -> Annex a) -> Annex a withOutputType :: (OutputType -> Annex a) -> Annex a
withOutputType a = outputType <$> Annex.getState Annex.output >>= a withOutputType a = outputType <$> Annex.getState Annex.output >>= a
@ -48,81 +38,3 @@ q = noop
flushed :: IO () -> IO () flushed :: IO () -> IO ()
flushed a = a >> hFlush stdout flushed a = a >> hFlush stdout
{- Outputs a message in a concurrency safe way.
-
- The message may be an error message, in which case it goes to stderr.
-
- When built without concurrent-output support, the fallback action is run
- instead.
-}
concurrentMessage :: Bool -> String -> Annex () -> Annex ()
#ifdef WITH_CONCURRENTOUTPUT
concurrentMessage iserror msg _ = go =<< consoleRegion <$> Annex.getState Annex.output
where
go Nothing
| iserror = liftIO $ Console.errorConcurrent msg
| otherwise = liftIO $ Console.outputConcurrent msg
go (Just r) = do
-- Can't display the error to stdout while
-- console regions are in use, so set the errflag
-- to get it to display to stderr later.
when iserror $ do
Annex.changeState $ \s ->
s { Annex.output = (Annex.output s) { consoleRegionErrFlag = True } }
liftIO $ Regions.appendConsoleRegion r msg
#else
concurrentMessage _ _ fallback = fallback
#endif
{- Do concurrent output when that has been requested. -}
allowConcurrentOutput :: Annex a -> Annex a
#ifdef WITH_CONCURRENTOUTPUT
allowConcurrentOutput a = go =<< Annex.getState Annex.concurrentjobs
where
go (Just n) = Regions.displayConsoleRegions $ bracket_
(Annex.setOutput (ConcurrentOutput n))
(Annex.setOutput NormalOutput)
a
go Nothing = a
#else
allowConcurrentOutput = id
#endif
{- Runs an action in its own dedicated region of the console.
-
- The region is closed at the end or on exception, and at that point
- the value of the region is displayed in the scrolling area above
- any other active regions.
-
- When not at a console, a region is not displayed until the action is
- complete.
-}
inOwnConsoleRegion :: Annex a -> Annex a
#ifdef WITH_CONCURRENTOUTPUT
inOwnConsoleRegion a = bracket mkregion rmregion go
where
go r = do
setregion (Just r)
a
mkregion = Regions.openConsoleRegion Regions.Linear
rmregion r = do
errflag <- consoleRegionErrFlag <$> Annex.getState Annex.output
let h = if errflag then Console.StdErr else Console.StdOut
Annex.changeState $ \s ->
s { Annex.output = (Annex.output s) { consoleRegionErrFlag = False } }
setregion Nothing
liftIO $ atomically $ do
t <- Regions.getConsoleRegion r
unless (T.null t) $
Console.bufferOutputSTM h t
Regions.closeConsoleRegion r
setregion r = Annex.changeState $ \s -> s { Annex.output = (Annex.output s) { consoleRegion = r } }
#else
inOwnConsoleRegion = id
#endif
#ifdef WITH_CONCURRENTOUTPUT
instance Regions.LiftRegion Annex where
liftRegion = liftIO . atomically
#endif