disentangle concurrency and message type

This makes -Jn work with --json and --quiet, where before
setting -Jn disabled those options.

Concurrent json output is currently a mess though since threads output
chunks over top of one-another.
This commit is contained in:
Joey Hess 2016-09-09 12:57:42 -04:00
parent 8e9267a1ed
commit 8ef494a833
No known key found for this signature in database
GPG key ID: C910D9222512E3C7
12 changed files with 96 additions and 84 deletions

View file

@ -56,6 +56,7 @@ import Types.BranchState
import Types.TrustLevel import Types.TrustLevel
import Types.Group import Types.Group
import Types.Messages import Types.Messages
import Types.Concurrency
import Types.UUID import Types.UUID
import Types.FileMatcher import Types.FileMatcher
import Types.NumCopies import Types.NumCopies
@ -101,6 +102,7 @@ data AnnexState = AnnexState
, remotes :: [Types.Remote.RemoteA Annex] , remotes :: [Types.Remote.RemoteA Annex]
, remoteannexstate :: M.Map UUID AnnexState , remoteannexstate :: M.Map UUID AnnexState
, output :: MessageState , output :: MessageState
, concurrency :: Concurrency
, force :: Bool , force :: Bool
, fast :: Bool , fast :: Bool
, daemon :: Bool , daemon :: Bool
@ -134,7 +136,6 @@ data AnnexState = AnnexState
, existinghooks :: M.Map Git.Hook.Hook Bool , existinghooks :: M.Map Git.Hook.Hook Bool
, desktopnotify :: DesktopNotify , desktopnotify :: DesktopNotify
, workers :: [Either AnnexState (Async AnnexState)] , workers :: [Either AnnexState (Async AnnexState)]
, concurrentjobs :: Maybe Int
, activeremotes :: MVar (S.Set (Types.Remote.RemoteA Annex)) , activeremotes :: MVar (S.Set (Types.Remote.RemoteA Annex))
, keysdbhandle :: Maybe Keys.DbHandle , keysdbhandle :: Maybe Keys.DbHandle
, cachedcurrentbranch :: Maybe Git.Branch , cachedcurrentbranch :: Maybe Git.Branch
@ -151,6 +152,7 @@ newState c r = do
, remotes = [] , remotes = []
, remoteannexstate = M.empty , remoteannexstate = M.empty
, output = def , output = def
, concurrency = NonConcurrent
, force = False , force = False
, fast = False , fast = False
, daemon = False , daemon = False
@ -184,7 +186,6 @@ newState c r = do
, existinghooks = M.empty , existinghooks = M.empty
, desktopnotify = mempty , desktopnotify = mempty
, workers = [] , workers = []
, concurrentjobs = Nothing
, activeremotes = emptyactiveremotes , activeremotes = emptyactiveremotes
, keysdbhandle = Nothing , keysdbhandle = Nothing
, cachedcurrentbranch = Nothing , cachedcurrentbranch = Nothing

View file

@ -28,6 +28,7 @@ import Utility.Metered
import Annex.LockPool import Annex.LockPool
import Types.Remote (Verification(..)) import Types.Remote (Verification(..))
import qualified Types.Remote as Remote import qualified Types.Remote as Remote
import Types.Concurrency
import Control.Concurrent import Control.Concurrent
import qualified Data.Set as S import qualified Data.Set as S
@ -180,11 +181,11 @@ forwardRetry old new = bytesComplete old < bytesComplete new
- increase total transfer speed. - increase total transfer speed.
-} -}
pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v
pickRemote l a = go l =<< Annex.getState Annex.concurrentjobs pickRemote l a = go l =<< Annex.getState Annex.concurrency
where where
go [] _ = return observeFailure go [] _ = return observeFailure
go (r:[]) _ = a r go (r:[]) _ = a r
go rs (Just n) | n > 1 = do go rs (Concurrent n) | n > 1 = do
mv <- Annex.getState Annex.activeremotes mv <- Annex.getState Annex.activeremotes
active <- liftIO $ takeMVar mv active <- liftIO $ takeMVar mv
let rs' = sortBy (inactiveFirst active) rs let rs' = sortBy (inactiveFirst active) rs
@ -193,7 +194,7 @@ pickRemote l a = go l =<< Annex.getState Annex.concurrentjobs
ok <- a r ok <- a r
if observeBool ok if observeBool ok
then return ok then return ok
else go rs Nothing else go rs NonConcurrent
goconcurrent mv active [] = do goconcurrent mv active [] = do
liftIO $ putMVar mv active liftIO $ putMVar mv active
return observeFailure return observeFailure

View file

@ -13,6 +13,7 @@ import Annex.Common
import qualified Annex import qualified Annex
import Annex.Concurrent import Annex.Concurrent
import Types.Command import Types.Command
import Types.Concurrency
import Messages.Concurrent import Messages.Concurrent
import Types.Messages import Types.Messages
@ -50,9 +51,9 @@ performCommandAction Command { cmdcheck = c, cmdname = name } seek cont = do
- This should only be run in the seek stage. - This should only be run in the seek stage.
-} -}
commandAction :: CommandStart -> Annex () commandAction :: CommandStart -> Annex ()
commandAction a = withOutputType go commandAction a = go =<< Annex.getState Annex.concurrency
where where
go o@(ConcurrentOutput n _) = do go (Concurrent n) = do
ws <- Annex.getState Annex.workers ws <- Annex.getState Annex.workers
(st, ws') <- if null ws (st, ws') <- if null ws
then do then do
@ -62,9 +63,9 @@ commandAction a = withOutputType go
l <- liftIO $ drainTo (n-1) ws l <- liftIO $ drainTo (n-1) ws
findFreeSlot l findFreeSlot l
w <- liftIO $ async w <- liftIO $ async
$ snd <$> Annex.run st (inOwnConsoleRegion o run) $ snd <$> Annex.run st (inOwnConsoleRegion (Annex.output st) run)
Annex.changeState $ \s -> s { Annex.workers = Right w:ws' } Annex.changeState $ \s -> s { Annex.workers = Right w:ws' }
go _ = run go NonConcurrent = run
run = void $ includeCommandAction a run = void $ includeCommandAction a
{- Waits for any forked off command actions to finish. {- Waits for any forked off command actions to finish.
@ -151,19 +152,21 @@ callCommandAction' = start
{- Do concurrent output when that has been requested. -} {- Do concurrent output when that has been requested. -}
allowConcurrentOutput :: Annex a -> Annex a allowConcurrentOutput :: Annex a -> Annex a
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
allowConcurrentOutput a = go =<< Annex.getState Annex.concurrentjobs allowConcurrentOutput a = go =<< Annex.getState Annex.concurrency
where where
go Nothing = a go NonConcurrent = a
go (Just n) = ifM (liftIO concurrentOutputSupported) go (Concurrent _) = ifM (liftIO concurrentOutputSupported)
( Regions.displayConsoleRegions $ ( Regions.displayConsoleRegions $
goconcurrent (ConcurrentOutput n True) goconcurrent True
, goconcurrent (ConcurrentOutput n False) , goconcurrent False
) )
goconcurrent o = bracket_ (setup o) cleanup a goconcurrent b = bracket_ (setup b) cleanup a
setup = Annex.setOutput setup = setconcurrentenabled
cleanup = do cleanup = do
finishCommandActions finishCommandActions
Annex.setOutput NormalOutput setconcurrentenabled False
setconcurrentenabled b = Annex.changeState $ \s ->
s { Annex.output = (Annex.output s) { concurrentOutputEnabled = b } }
#else #else
allowConcurrentOutput = id allowConcurrentOutput = id
#endif #endif

View file

@ -21,6 +21,7 @@ import Types.Messages
import Types.Command import Types.Command
import Types.DeferredParse import Types.DeferredParse
import Types.DesktopNotify import Types.DesktopNotify
import Types.Concurrency
import qualified Annex import qualified Annex
import qualified Remote import qualified Remote
import qualified Limit import qualified Limit
@ -302,7 +303,7 @@ jobsOption = globalSetter set $
) )
where where
set n = do set n = do
Annex.changeState $ \s -> s { Annex.concurrentjobs = Just n } Annex.changeState $ \s -> s { Annex.concurrency = Concurrent n }
c <- liftIO getNumCapabilities c <- liftIO getNumCapabilities
when (n > c) $ when (n > c) $
liftIO $ setNumCapabilities n liftIO $ setNumCapabilities n

View file

@ -78,7 +78,7 @@ seek o = do
(startKeys now o) (startKeys now o)
(seeker $ whenAnnexed $ start now o) (seeker $ whenAnnexed $ start now o)
(forFiles o) (forFiles o)
Batch -> withOutputType $ \ot -> case ot of Batch -> withMessageState $ \s -> case outputType s of
JSONOutput -> batchInput parseJSONInput $ JSONOutput -> batchInput parseJSONInput $
commandAction . startBatch now commandAction . startBatch now
_ -> error "--batch is currently only supported in --json mode" _ -> error "--batch is currently only supported in --json mode"

View file

@ -40,7 +40,7 @@ module Messages (
commandProgressDisabled, commandProgressDisabled,
outputMessage, outputMessage,
implicitMessage, implicitMessage,
withOutputType, withMessageState,
) where ) where
import System.Log.Logger import System.Log.Logger
@ -155,17 +155,15 @@ indent = intercalate "\n" . map (\l -> " " ++ l) . lines
{- Shows a JSON chunk only when in json mode. -} {- Shows a JSON chunk only when in json mode. -}
maybeShowJSON :: JSONChunk v -> Annex () maybeShowJSON :: JSONChunk v -> Annex ()
maybeShowJSON v = withOutputType $ liftIO . go maybeShowJSON v = withMessageState $ \s -> case outputType s of
where JSONOutput -> liftIO $ JSON.add v
go JSONOutput = JSON.add v _ -> return ()
go _ = return ()
{- Shows a complete JSON value, only when in json mode. -} {- Shows a complete JSON value, only when in json mode. -}
showFullJSON :: JSONChunk v -> Annex Bool showFullJSON :: JSONChunk v -> Annex Bool
showFullJSON v = withOutputType $ liftIO . go showFullJSON v = withMessageState $ \s -> case outputType s of
where JSONOutput -> liftIO $ JSON.complete v >> return True
go JSONOutput = JSON.complete v >> return True _ -> return False
go _ = return False
{- Performs an action that outputs nonstandard/customized output, and {- Performs an action that outputs nonstandard/customized output, and
- in JSON mode wraps its output in JSON.start and JSON.end, so it's - in JSON mode wraps its output in JSON.start and JSON.end, so it's
@ -216,11 +214,11 @@ debugEnabled = do
{- Should commands that normally output progress messages have that {- Should commands that normally output progress messages have that
- output disabled? -} - output disabled? -}
commandProgressDisabled :: Annex Bool commandProgressDisabled :: Annex Bool
commandProgressDisabled = withOutputType $ \t -> return $ case t of commandProgressDisabled = withMessageState $ \s -> return $
QuietOutput -> True case outputType s of
JSONOutput -> True QuietOutput -> True
NormalOutput -> False JSONOutput -> True
ConcurrentOutput {} -> True NormalOutput -> concurrentOutputEnabled s
{- Use to show a message that is displayed implicitly, and so might be {- Use to show a message that is displayed implicitly, and so might be
- disabled when running a certian command that needs more control over its - disabled when running a certian command that needs more control over its

View file

@ -31,13 +31,13 @@ import GHC.IO.Encoding
- When built without concurrent-output support, the fallback action is run - When built without concurrent-output support, the fallback action is run
- instead. - instead.
-} -}
concurrentMessage :: OutputType -> Bool -> String -> Annex () -> Annex () concurrentMessage :: MessageState -> Bool -> String -> Annex () -> Annex ()
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
concurrentMessage o iserror msg fallback concurrentMessage s iserror msg fallback
| concurrentOutputEnabled o = | concurrentOutputEnabled s =
go =<< consoleRegion <$> Annex.getState Annex.output go =<< consoleRegion <$> Annex.getState Annex.output
#else #else
concurrentMessage _o _iserror _msg fallback concurrentMessage _s _iserror _msg fallback
#endif #endif
| otherwise = fallback | otherwise = fallback
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
@ -50,8 +50,8 @@ concurrentMessage _o _iserror _msg fallback
-- console regions are in use, so set the errflag -- console regions are in use, so set the errflag
-- to get it to display to stderr later. -- to get it to display to stderr later.
when iserror $ do when iserror $ do
Annex.changeState $ \s -> Annex.changeState $ \st ->
s { Annex.output = (Annex.output s) { consoleRegionErrFlag = True } } st { Annex.output = (Annex.output st) { consoleRegionErrFlag = True } }
liftIO $ atomically $ do liftIO $ atomically $ do
Regions.appendConsoleRegion r msg Regions.appendConsoleRegion r msg
rl <- takeTMVar Regions.regionList rl <- takeTMVar Regions.regionList
@ -68,24 +68,24 @@ concurrentMessage _o _iserror _msg fallback
- When not at a console, a region is not displayed until the action is - When not at a console, a region is not displayed until the action is
- complete. - complete.
-} -}
inOwnConsoleRegion :: OutputType -> Annex a -> Annex a inOwnConsoleRegion :: MessageState -> Annex a -> Annex a
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
inOwnConsoleRegion o a inOwnConsoleRegion s a
| concurrentOutputEnabled o = do | concurrentOutputEnabled s = do
r <- mkregion r <- mkregion
setregion (Just r) setregion (Just r)
eret <- tryNonAsync a `onException` rmregion r eret <- tryNonAsync a `onException` rmregion r
case eret of case eret of
Left e -> do Left e -> do
-- Add error message to region before it closes. -- Add error message to region before it closes.
concurrentMessage o True (show e) noop concurrentMessage s True (show e) noop
rmregion r rmregion r
throwM e throwM e
Right ret -> do Right ret -> do
rmregion r rmregion r
return ret return ret
#else #else
inOwnConsoleRegion _o a inOwnConsoleRegion _s a
#endif #endif
| otherwise = a | otherwise = a
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
@ -94,12 +94,13 @@ inOwnConsoleRegion _o a
-- a message is added to it. This avoids unnecessary screen -- a message is added to it. This avoids unnecessary screen
-- updates when a region does not turn out to need to be used. -- updates when a region does not turn out to need to be used.
mkregion = Regions.newConsoleRegion Regions.Linear "" mkregion = Regions.newConsoleRegion Regions.Linear ""
setregion r = Annex.changeState $ \s -> s { Annex.output = (Annex.output s) { consoleRegion = r } } setregion r = Annex.changeState $ \st -> st
{ Annex.output = (Annex.output st) { consoleRegion = r } }
rmregion r = do rmregion r = do
errflag <- consoleRegionErrFlag <$> Annex.getState Annex.output errflag <- consoleRegionErrFlag <$> Annex.getState Annex.output
let h = if errflag then Console.StdErr else Console.StdOut let h = if errflag then Console.StdErr else Console.StdOut
Annex.changeState $ \s -> Annex.changeState $ \st -> st
s { Annex.output = (Annex.output s) { consoleRegionErrFlag = False } } { Annex.output = (Annex.output st) { consoleRegionErrFlag = False } }
setregion Nothing setregion Nothing
liftIO $ atomically $ do liftIO $ atomically $ do
t <- Regions.getConsoleRegion r t <- Regions.getConsoleRegion r
@ -135,7 +136,3 @@ concurrentOutputSupported = return True -- Windows is always unicode
#else #else
concurrentOutputSupported = return False concurrentOutputSupported = return False
#endif #endif
concurrentOutputEnabled :: OutputType -> Bool
concurrentOutputEnabled (ConcurrentOutput _ b) = b
concurrentOutputEnabled _ = False

View file

@ -12,25 +12,26 @@ import Annex
import Types.Messages import Types.Messages
import Messages.Concurrent import Messages.Concurrent
withOutputType :: (OutputType -> Annex a) -> Annex a withMessageState :: (MessageState -> Annex a) -> Annex a
withOutputType a = outputType <$> Annex.getState Annex.output >>= a withMessageState a = Annex.getState Annex.output >>= a
outputMessage :: IO () -> String -> Annex () outputMessage :: IO () -> String -> Annex ()
outputMessage json s = withOutputType go outputMessage json msg = withMessageState $ \s -> case outputType s of
where NormalOutput
go NormalOutput = liftIO $ | concurrentOutputEnabled s -> concurrentMessage s False msg q
flushed $ putStr s | otherwise -> liftIO $ flushed $ putStr msg
go QuietOutput = q QuietOutput -> q
go o@(ConcurrentOutput {}) = concurrentMessage o False s q JSONOutput -> liftIO $ flushed json
go JSONOutput = liftIO $ flushed json
outputError :: String -> Annex () outputError :: String -> Annex ()
outputError s = withOutputType go outputError msg = withMessageState $ \s ->
if concurrentOutputEnabled s
then concurrentMessage s True msg go
else go
where where
go o@(ConcurrentOutput {}) = concurrentMessage o True s (go NormalOutput) go = liftIO $ do
go _ = liftIO $ do
hFlush stdout hFlush stdout
hPutStr stderr s hPutStr stderr msg
hFlush stderr hFlush stderr
q :: Monad m => m () q :: Monad m => m ()

View file

@ -32,11 +32,11 @@ import Data.Quantity
metered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a metered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a
metered othermeter key a = case keySize key of metered othermeter key a = case keySize key of
Nothing -> nometer Nothing -> nometer
Just size -> withOutputType (go $ fromInteger size) Just size -> withMessageState (go $ fromInteger size)
where where
go _ QuietOutput = nometer go _ (MessageState { outputType = QuietOutput }) = nometer
go _ JSONOutput = nometer go _ (MessageState { outputType = JSONOutput }) = nometer
go size NormalOutput = do go size (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
showOutput showOutput
(progress, meter) <- mkmeter size (progress, meter) <- mkmeter size
m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ \n -> do m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ \n -> do
@ -45,9 +45,9 @@ metered othermeter key a = case keySize key of
r <- a (combinemeter m) r <- a (combinemeter m)
liftIO $ clearMeter stdout meter liftIO $ clearMeter stdout meter
return r return r
go size (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) =
#if WITH_CONCURRENTOUTPUT #if WITH_CONCURRENTOUTPUT
go size o@(ConcurrentOutput {}) withProgressRegion $ \r -> do
| concurrentOutputEnabled o = withProgressRegion $ \r -> do
(progress, meter) <- mkmeter size (progress, meter) <- mkmeter size
m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ \n -> do m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ \n -> do
setP progress $ fromBytesProcessed n setP progress $ fromBytesProcessed n
@ -55,9 +55,8 @@ metered othermeter key a = case keySize key of
Regions.setConsoleRegion r ("\n" ++ s) Regions.setConsoleRegion r ("\n" ++ s)
a (combinemeter m) a (combinemeter m)
#else #else
go _size _o nometer
#endif #endif
| otherwise = nometer
mkmeter size = do mkmeter size = do
progress <- liftIO $ newProgress "" size progress <- liftIO $ newProgress "" size
@ -73,18 +72,18 @@ metered othermeter key a = case keySize key of
{- Use when the progress meter is only desired for concurrent {- Use when the progress meter is only desired for concurrent
- output; as when a command's own progress output is preferred. -} - output; as when a command's own progress output is preferred. -}
concurrentMetered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a concurrentMetered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a
concurrentMetered combinemeterupdate key a = withOutputType go concurrentMetered combinemeterupdate key a =
where withMessageState $ \s -> if concurrentOutputEnabled s
go (ConcurrentOutput {}) = metered combinemeterupdate key a then metered combinemeterupdate key a
go _ = a (fromMaybe nullMeterUpdate combinemeterupdate) else a (fromMaybe nullMeterUpdate combinemeterupdate)
{- Poll file size to display meter, but only for concurrent output. -} {- Poll file size to display meter, but only for concurrent output. -}
concurrentMeteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a concurrentMeteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a
concurrentMeteredFile file combinemeterupdate key a = withOutputType go concurrentMeteredFile file combinemeterupdate key a =
where withMessageState $ \s -> if concurrentOutputEnabled s
go (ConcurrentOutput {}) = metered combinemeterupdate key $ \p -> then metered combinemeterupdate key $ \p ->
watchFileSize file p a watchFileSize file p a
go _ = a else a
{- Progress dots. -} {- Progress dots. -}
showProgressDots :: Annex () showProgressDots :: Annex ()
@ -123,9 +122,9 @@ mkStderrRelayer = do
- messing it up with interleaved stderr from a command. - messing it up with interleaved stderr from a command.
-} -}
mkStderrEmitter :: Annex (String -> IO ()) mkStderrEmitter :: Annex (String -> IO ())
mkStderrEmitter = withOutputType go mkStderrEmitter = withMessageState go
where where
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
go o | concurrentOutputEnabled o = return Console.errorConcurrent go s | concurrentOutputEnabled s = return Console.errorConcurrent
#endif #endif
go _ = return (hPutStrLn stderr) go _ = return (hPutStrLn stderr)

8
Types/Concurrency.hs Normal file
View file

@ -0,0 +1,8 @@
{- Copyright 2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Types.Concurrency where
data Concurrency = NonConcurrent | Concurrent Int

View file

@ -15,7 +15,7 @@ import Data.Default
import System.Console.Regions (ConsoleRegion) import System.Console.Regions (ConsoleRegion)
#endif #endif
data OutputType = NormalOutput | QuietOutput | ConcurrentOutput Int Bool | JSONOutput data OutputType = NormalOutput | QuietOutput | JSONOutput
deriving (Show) deriving (Show)
data SideActionBlock = NoBlock | StartBlock | InBlock data SideActionBlock = NoBlock | StartBlock | InBlock
@ -23,6 +23,7 @@ data SideActionBlock = NoBlock | StartBlock | InBlock
data MessageState = MessageState data MessageState = MessageState
{ outputType :: OutputType { outputType :: OutputType
, concurrentOutputEnabled :: Bool
, sideActionBlock :: SideActionBlock , sideActionBlock :: SideActionBlock
, implicitMessages :: Bool , implicitMessages :: Bool
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
@ -35,6 +36,7 @@ instance Default MessageState
where where
def = MessageState def = MessageState
{ outputType = NormalOutput { outputType = NormalOutput
, concurrentOutputEnabled = False
, sideActionBlock = NoBlock , sideActionBlock = NoBlock
, implicitMessages = True , implicitMessages = True
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT

View file

@ -949,6 +949,7 @@ Executable git-annex
Types.BranchState Types.BranchState
Types.CleanupActions Types.CleanupActions
Types.Command Types.Command
Types.Concurrency
Types.Creds Types.Creds
Types.Crypto Types.Crypto
Types.DeferredParse Types.DeferredParse