Merge branch 'concurrentprogress'
This commit is contained in:
commit
1d57f142f1
26 changed files with 342 additions and 142 deletions
2
Annex.hs
2
Annex.hs
|
@ -142,7 +142,7 @@ newState c r = AnnexState
|
|||
, backends = []
|
||||
, remotes = []
|
||||
, remoteannexstate = M.empty
|
||||
, output = defaultMessageState
|
||||
, output = def
|
||||
, force = False
|
||||
, fast = False
|
||||
, daemon = False
|
||||
|
|
|
@ -57,6 +57,7 @@ import Annex.Link
|
|||
import Annex.Content.Direct
|
||||
import Annex.ReplaceFile
|
||||
import Utility.LockFile
|
||||
import Messages.Progress
|
||||
|
||||
{- Checks if a given key's content is currently present. -}
|
||||
inAnnex :: Key -> Annex Bool
|
||||
|
@ -555,12 +556,17 @@ saveState nocommit = doSideAction $ do
|
|||
downloadUrl :: [Url.URLString] -> FilePath -> Annex Bool
|
||||
downloadUrl urls file = go =<< annexWebDownloadCommand <$> Annex.getGitConfig
|
||||
where
|
||||
go Nothing = Url.withUrlOptions $ \uo ->
|
||||
anyM (\u -> Url.download u file uo) urls
|
||||
go (Just basecmd) = liftIO $ anyM (downloadcmd basecmd) urls
|
||||
go Nothing = do
|
||||
a <- ifM commandProgressDisabled
|
||||
( return Url.downloadQuiet
|
||||
, return Url.download
|
||||
)
|
||||
Url.withUrlOptions $ \uo ->
|
||||
anyM (\u -> a u file uo) urls
|
||||
go (Just basecmd) = anyM (downloadcmd basecmd) urls
|
||||
downloadcmd basecmd url =
|
||||
boolSystem "sh" [Param "-c", Param $ gencmd url basecmd]
|
||||
<&&> doesFileExist file
|
||||
progressCommand "sh" [Param "-c", Param $ gencmd url basecmd]
|
||||
<&&> liftIO (doesFileExist file)
|
||||
gencmd url = massReplace
|
||||
[ ("%file", shellEscape file)
|
||||
, ("%url", shellEscape url)
|
||||
|
|
|
@ -16,7 +16,7 @@ import Logs.Location
|
|||
import Annex.Transfer
|
||||
import qualified Remote
|
||||
import Types.Key
|
||||
import Utility.SimpleProtocol (ioHandles)
|
||||
import Utility.SimpleProtocol (dupIoHandles)
|
||||
import Git.Types (RemoteName)
|
||||
|
||||
data TransferRequest = TransferRequest Direction Remote Key AssociatedFile
|
||||
|
@ -30,7 +30,7 @@ seek = withNothing start
|
|||
|
||||
start :: CommandStart
|
||||
start = do
|
||||
(readh, writeh) <- liftIO ioHandles
|
||||
(readh, writeh) <- liftIO dupIoHandles
|
||||
runRequests readh writeh runner
|
||||
stop
|
||||
where
|
||||
|
|
93
Messages.hs
93
Messages.hs
|
@ -10,9 +10,6 @@ module Messages (
|
|||
showStart',
|
||||
showNote,
|
||||
showAction,
|
||||
showProgressDots,
|
||||
metered,
|
||||
meteredBytes,
|
||||
showSideAction,
|
||||
doSideAction,
|
||||
doQuietSideAction,
|
||||
|
@ -33,28 +30,26 @@ module Messages (
|
|||
showRaw,
|
||||
setupConsole,
|
||||
enableDebugOutput,
|
||||
disableDebugOutput
|
||||
disableDebugOutput,
|
||||
commandProgressDisabled,
|
||||
) where
|
||||
|
||||
import Text.JSON
|
||||
import Data.Progress.Meter
|
||||
import Data.Progress.Tracker
|
||||
import Data.Quantity
|
||||
import System.Log.Logger
|
||||
import System.Log.Formatter
|
||||
import System.Log.Handler (setFormatter)
|
||||
import System.Log.Handler.Simple
|
||||
|
||||
import Common hiding (handle)
|
||||
import Common
|
||||
import Types
|
||||
import Types.Messages
|
||||
import Messages.Internal
|
||||
import qualified Messages.JSON as JSON
|
||||
import Types.Key
|
||||
import qualified Annex
|
||||
import Utility.Metered
|
||||
|
||||
showStart :: String -> FilePath -> Annex ()
|
||||
showStart command file = handle (JSON.start command $ Just file) $
|
||||
showStart command file = handleMessage (JSON.start command $ Just file) $
|
||||
flushed $ putStr $ command ++ " " ++ file ++ " "
|
||||
|
||||
showStart' :: String -> Key -> Maybe FilePath -> Annex ()
|
||||
|
@ -62,42 +57,12 @@ showStart' command key afile = showStart command $
|
|||
fromMaybe (key2file key) afile
|
||||
|
||||
showNote :: String -> Annex ()
|
||||
showNote s = handle (JSON.note s) $
|
||||
showNote s = handleMessage (JSON.note s) $
|
||||
flushed $ putStr $ "(" ++ s ++ ") "
|
||||
|
||||
showAction :: String -> Annex ()
|
||||
showAction s = showNote $ s ++ "..."
|
||||
|
||||
{- Progress dots. -}
|
||||
showProgressDots :: Annex ()
|
||||
showProgressDots = handle q $
|
||||
flushed $ putStr "."
|
||||
|
||||
{- Shows a progress meter while performing a transfer of a key.
|
||||
- The action is passed a callback to use to update the meter. -}
|
||||
metered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a
|
||||
metered combinemeterupdate key a = go (keySize key)
|
||||
where
|
||||
go (Just size) = meteredBytes combinemeterupdate size a
|
||||
go _ = a (const noop)
|
||||
|
||||
{- Shows a progress meter while performing an action on a given number
|
||||
- of bytes. -}
|
||||
meteredBytes :: Maybe MeterUpdate -> Integer -> (MeterUpdate -> Annex a) -> Annex a
|
||||
meteredBytes combinemeterupdate size a = withOutputType go
|
||||
where
|
||||
go NormalOutput = do
|
||||
progress <- liftIO $ newProgress "" size
|
||||
meter <- liftIO $ newMeter progress "B" 25 (renderNums binaryOpts 1)
|
||||
showOutput
|
||||
r <- a $ \n -> liftIO $ do
|
||||
setP progress $ fromBytesProcessed n
|
||||
displayMeter stdout meter
|
||||
maybe noop (\m -> m n) combinemeterupdate
|
||||
liftIO $ clearMeter stdout meter
|
||||
return r
|
||||
go _ = a (const noop)
|
||||
|
||||
showSideAction :: String -> Annex ()
|
||||
showSideAction m = Annex.getState Annex.output >>= go
|
||||
where
|
||||
|
@ -108,7 +73,7 @@ showSideAction m = Annex.getState Annex.output >>= go
|
|||
Annex.changeState $ \s -> s { Annex.output = st' }
|
||||
| sideActionBlock st == InBlock = return ()
|
||||
| otherwise = p
|
||||
p = handle q $ putStrLn $ "(" ++ m ++ "...)"
|
||||
p = handleMessage q $ putStrLn $ "(" ++ m ++ "...)"
|
||||
|
||||
showStoringStateAction :: Annex ()
|
||||
showStoringStateAction = showSideAction "recording state in git"
|
||||
|
@ -130,12 +95,13 @@ doSideAction' b a = do
|
|||
where
|
||||
set o = Annex.changeState $ \s -> s { Annex.output = o }
|
||||
|
||||
{- Make way for subsequent output of a command. -}
|
||||
showOutput :: Annex ()
|
||||
showOutput = handle q $
|
||||
putStr "\n"
|
||||
showOutput = unlessM commandProgressDisabled $
|
||||
handleMessage q $ putStr "\n"
|
||||
|
||||
showLongNote :: String -> Annex ()
|
||||
showLongNote s = handle (JSON.note s) $
|
||||
showLongNote s = handleMessage (JSON.note s) $
|
||||
putStrLn $ '\n' : indent s
|
||||
|
||||
showEndOk :: Annex ()
|
||||
|
@ -145,7 +111,7 @@ showEndFail :: Annex ()
|
|||
showEndFail = showEndResult False
|
||||
|
||||
showEndResult :: Bool -> Annex ()
|
||||
showEndResult ok = handle (JSON.end ok) $ putStrLn msg
|
||||
showEndResult ok = handleMessage (JSON.end ok) $ putStrLn msg
|
||||
where
|
||||
msg
|
||||
| ok = "ok"
|
||||
|
@ -159,7 +125,7 @@ warning = warning' . indent
|
|||
|
||||
warning' :: String -> Annex ()
|
||||
warning' w = do
|
||||
handle q $ putStr "\n"
|
||||
handleMessage q $ putStr "\n"
|
||||
liftIO $ do
|
||||
hFlush stdout
|
||||
hPutStrLn stderr w
|
||||
|
@ -175,7 +141,7 @@ indent = intercalate "\n" . map (\l -> " " ++ l) . lines
|
|||
|
||||
{- Shows a JSON fragment only when in json mode. -}
|
||||
maybeShowJSON :: JSON a => [(String, a)] -> Annex ()
|
||||
maybeShowJSON v = handle (JSON.add v) q
|
||||
maybeShowJSON v = handleMessage (JSON.add v) q
|
||||
|
||||
{- Shows a complete JSON value, only when in json mode. -}
|
||||
showFullJSON :: JSON a => [(String, a)] -> Annex Bool
|
||||
|
@ -190,16 +156,16 @@ showFullJSON v = withOutputType $ liftIO . go
|
|||
- This is only needed when showStart and showEndOk is not used. -}
|
||||
showCustom :: String -> Annex Bool -> Annex ()
|
||||
showCustom command a = do
|
||||
handle (JSON.start command Nothing) q
|
||||
handleMessage (JSON.start command Nothing) q
|
||||
r <- a
|
||||
handle (JSON.end r) q
|
||||
handleMessage (JSON.end r) q
|
||||
|
||||
showHeader :: String -> Annex ()
|
||||
showHeader h = handle q $
|
||||
showHeader h = handleMessage q $
|
||||
flushed $ putStr $ h ++ ": "
|
||||
|
||||
showRaw :: String -> Annex ()
|
||||
showRaw s = handle q $ putStrLn s
|
||||
showRaw s = handleMessage q $ putStrLn s
|
||||
|
||||
setupConsole :: IO ()
|
||||
setupConsole = do
|
||||
|
@ -219,18 +185,11 @@ enableDebugOutput = updateGlobalLogger rootLoggerName $ setLevel DEBUG
|
|||
disableDebugOutput :: IO ()
|
||||
disableDebugOutput = updateGlobalLogger rootLoggerName $ setLevel NOTICE
|
||||
|
||||
handle :: IO () -> IO () -> Annex ()
|
||||
handle json normal = withOutputType go
|
||||
where
|
||||
go NormalOutput = liftIO normal
|
||||
go QuietOutput = q
|
||||
go JSONOutput = liftIO $ flushed json
|
||||
|
||||
q :: Monad m => m ()
|
||||
q = noop
|
||||
|
||||
flushed :: IO () -> IO ()
|
||||
flushed a = a >> hFlush stdout
|
||||
|
||||
withOutputType :: (OutputType -> Annex a) -> Annex a
|
||||
withOutputType a = outputType <$> Annex.getState Annex.output >>= a
|
||||
{- Should commands that normally output progress messages have that
|
||||
- output disabled? -}
|
||||
commandProgressDisabled :: Annex Bool
|
||||
commandProgressDisabled = withOutputType $ \t -> return $ case t of
|
||||
QuietOutput -> True
|
||||
ProgressOutput -> True
|
||||
JSONOutput -> True
|
||||
NormalOutput -> False
|
||||
|
|
30
Messages/Internal.hs
Normal file
30
Messages/Internal.hs
Normal file
|
@ -0,0 +1,30 @@
|
|||
{- git-annex output messages
|
||||
-
|
||||
- Copyright 2010-2014 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Messages.Internal where
|
||||
|
||||
import Common
|
||||
import Types
|
||||
import Types.Messages
|
||||
import qualified Annex
|
||||
|
||||
handleMessage :: IO () -> IO () -> Annex ()
|
||||
handleMessage json normal = withOutputType go
|
||||
where
|
||||
go NormalOutput = liftIO normal
|
||||
go QuietOutput = q
|
||||
go ProgressOutput = q
|
||||
go JSONOutput = liftIO $ flushed json
|
||||
|
||||
q :: Monad m => m ()
|
||||
q = noop
|
||||
|
||||
flushed :: IO () -> IO ()
|
||||
flushed a = a >> hFlush stdout
|
||||
|
||||
withOutputType :: (OutputType -> Annex a) -> Annex a
|
||||
withOutputType a = outputType <$> Annex.getState Annex.output >>= a
|
88
Messages/Progress.hs
Normal file
88
Messages/Progress.hs
Normal file
|
@ -0,0 +1,88 @@
|
|||
{- git-annex progress output
|
||||
-
|
||||
- Copyright 2010-2015 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Messages.Progress where
|
||||
|
||||
import Common
|
||||
import Messages
|
||||
import Messages.Internal
|
||||
import Utility.Metered
|
||||
import Types
|
||||
import Types.Messages
|
||||
import Types.Key
|
||||
|
||||
import Data.Progress.Meter
|
||||
import Data.Progress.Tracker
|
||||
import Data.Quantity
|
||||
|
||||
{- Shows a progress meter while performing a transfer of a key.
|
||||
- The action is passed a callback to use to update the meter. -}
|
||||
metered :: Maybe MeterUpdate -> Key -> (MeterUpdate -> Annex a) -> Annex a
|
||||
metered combinemeterupdate key a = go (keySize key)
|
||||
where
|
||||
go (Just size) = meteredBytes combinemeterupdate size a
|
||||
go _ = a (const noop)
|
||||
|
||||
{- Shows a progress meter while performing an action on a given number
|
||||
- of bytes. -}
|
||||
meteredBytes :: Maybe MeterUpdate -> Integer -> (MeterUpdate -> Annex a) -> Annex a
|
||||
meteredBytes combinemeterupdate size a = withOutputType go
|
||||
where
|
||||
go NormalOutput = do
|
||||
progress <- liftIO $ newProgress "" size
|
||||
meter <- liftIO $ newMeter progress "B" 25 (renderNums binaryOpts 1)
|
||||
showOutput
|
||||
r <- a $ \n -> liftIO $ do
|
||||
setP progress $ fromBytesProcessed n
|
||||
displayMeter stdout meter
|
||||
maybe noop (\m -> m n) combinemeterupdate
|
||||
liftIO $ clearMeter stdout meter
|
||||
return r
|
||||
go _ = a (const noop)
|
||||
|
||||
{- Progress dots. -}
|
||||
showProgressDots :: Annex ()
|
||||
showProgressDots = handleMessage q $
|
||||
flushed $ putStr "."
|
||||
|
||||
{- Runs a command, that may output progress to either stdout or
|
||||
- stderr, as well as other messages.
|
||||
-
|
||||
- In quiet mode, the output is suppressed, except for error messages.
|
||||
-}
|
||||
progressCommand :: FilePath -> [CommandParam] -> Annex Bool
|
||||
progressCommand cmd params = progressCommandEnv cmd params Nothing
|
||||
|
||||
progressCommandEnv :: FilePath -> [CommandParam] -> Maybe [(String, String)] -> Annex Bool
|
||||
progressCommandEnv cmd params environ = ifM commandProgressDisabled
|
||||
( do
|
||||
oh <- mkOutputHandler
|
||||
liftIO $ demeterCommandEnv oh cmd params environ
|
||||
, liftIO $ boolSystemEnv cmd params environ
|
||||
)
|
||||
|
||||
mkOutputHandler :: Annex OutputHandler
|
||||
mkOutputHandler = OutputHandler
|
||||
<$> commandProgressDisabled
|
||||
<*> mkStderrEmitter
|
||||
|
||||
mkStderrRelayer :: Annex (Handle -> IO ())
|
||||
mkStderrRelayer = do
|
||||
quiet <- commandProgressDisabled
|
||||
emitter <- mkStderrEmitter
|
||||
return $ \h -> avoidProgress quiet h emitter
|
||||
|
||||
{- Generates an IO action that can be used to emit stderr.
|
||||
-
|
||||
- When a progress meter is displayed, this takes care to avoid
|
||||
- messing it up with interleaved stderr from a command.
|
||||
-}
|
||||
mkStderrEmitter :: Annex (String -> IO ())
|
||||
mkStderrEmitter = withOutputType go
|
||||
where
|
||||
go ProgressOutput = return $ \s -> hPutStrLn stderr ("E: " ++ s)
|
||||
go _ = return (hPutStrLn stderr)
|
|
@ -19,6 +19,7 @@ import Logs.Web
|
|||
import Types.UrlContents
|
||||
import Types.CleanupActions
|
||||
import Types.Key
|
||||
import Messages.Progress
|
||||
import Utility.Metered
|
||||
import Utility.Tmp
|
||||
import Backend.URL
|
||||
|
@ -288,14 +289,15 @@ ariaParams ps = do
|
|||
return (ps ++ opts)
|
||||
|
||||
runAria :: [CommandParam] -> Annex Bool
|
||||
runAria ps = liftIO . boolSystem "aria2c" =<< ariaParams ps
|
||||
runAria ps = progressCommand "aria2c" =<< ariaParams ps
|
||||
|
||||
-- Parse aria output to find "(n%)" and update the progress meter
|
||||
-- with it. The output is also output to stdout.
|
||||
-- with it.
|
||||
ariaProgress :: Maybe Integer -> MeterUpdate -> [CommandParam] -> Annex Bool
|
||||
ariaProgress Nothing _ ps = runAria ps
|
||||
ariaProgress (Just sz) meter ps =
|
||||
liftIO . commandMeter (parseAriaProgress sz) meter "aria2c"
|
||||
ariaProgress (Just sz) meter ps = do
|
||||
oh <- mkOutputHandler
|
||||
liftIO . commandMeter (parseAriaProgress sz) oh meter "aria2c"
|
||||
=<< ariaParams ps
|
||||
|
||||
parseAriaProgress :: Integer -> ProgressParser
|
||||
|
|
|
@ -121,18 +121,22 @@ bup command buprepo params = do
|
|||
showOutput -- make way for bup output
|
||||
liftIO $ boolSystem "bup" $ bupParams command buprepo params
|
||||
|
||||
bupSplitParams :: Remote -> BupRepo -> Key -> [CommandParam] -> Annex [CommandParam]
|
||||
bupSplitParams r buprepo k src = do
|
||||
bupSplitParams :: Remote -> BupRepo -> Key -> [CommandParam] -> [CommandParam]
|
||||
bupSplitParams r buprepo k src =
|
||||
let os = map Param $ remoteAnnexBupSplitOptions $ gitconfig r
|
||||
showOutput -- make way for bup output
|
||||
return $ bupParams "split" buprepo
|
||||
in bupParams "split" buprepo
|
||||
(os ++ [Param "-q", Param "-n", Param (bupRef k)] ++ src)
|
||||
|
||||
store :: Remote -> BupRepo -> Storer
|
||||
store r buprepo = byteStorer $ \k b p -> do
|
||||
params <- bupSplitParams r buprepo k []
|
||||
let params = bupSplitParams r buprepo k []
|
||||
showOutput -- make way for bup output
|
||||
let cmd = proc "bup" (toCommand params)
|
||||
liftIO $ withHandle StdinHandle createProcessSuccess cmd $ \h -> do
|
||||
runner <- ifM commandProgressDisabled
|
||||
( return feedWithQuietOutput
|
||||
, return (withHandle StdinHandle)
|
||||
)
|
||||
liftIO $ runner createProcessSuccess cmd $ \h -> do
|
||||
meteredWrite p h b
|
||||
return True
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import qualified Git
|
|||
import Config
|
||||
import Remote.Helper.Special
|
||||
import Utility.Metered
|
||||
import Messages.Progress
|
||||
import Logs.Transfer
|
||||
import Logs.PreferredContent.Raw
|
||||
import Logs.RemoteState
|
||||
|
@ -26,6 +27,7 @@ import Annex.UUID
|
|||
import Creds
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.Async
|
||||
import System.Log.Logger (debugM)
|
||||
import qualified Data.Map as M
|
||||
|
||||
|
@ -323,21 +325,28 @@ fromExternal lck external extractor a =
|
|||
{- Starts an external remote process running, but does not handle checking
|
||||
- VERSION, etc. -}
|
||||
startExternal :: ExternalType -> Annex ExternalState
|
||||
startExternal externaltype = liftIO $ do
|
||||
(Just hin, Just hout, _, pid) <- createProcess $ (proc cmd [])
|
||||
{ std_in = CreatePipe
|
||||
, std_out = CreatePipe
|
||||
, std_err = Inherit
|
||||
}
|
||||
fileEncoding hin
|
||||
fileEncoding hout
|
||||
checkearlytermination =<< getProcessExitCode pid
|
||||
return $ ExternalState
|
||||
{ externalSend = hin
|
||||
, externalReceive = hout
|
||||
, externalPid = pid
|
||||
, externalPrepared = Unprepared
|
||||
}
|
||||
startExternal externaltype = do
|
||||
errrelayer <- mkStderrRelayer
|
||||
liftIO $ do
|
||||
(Just hin, Just hout, Just herr, pid) <- createProcess $
|
||||
(proc cmd [])
|
||||
{ std_in = CreatePipe
|
||||
, std_out = CreatePipe
|
||||
, std_err = CreatePipe
|
||||
}
|
||||
fileEncoding hin
|
||||
fileEncoding hout
|
||||
fileEncoding herr
|
||||
stderrelay <- async $ errrelayer herr
|
||||
checkearlytermination =<< getProcessExitCode pid
|
||||
return $ ExternalState
|
||||
{ externalSend = hin
|
||||
, externalReceive = hout
|
||||
, externalShutdown = do
|
||||
cancel stderrelay
|
||||
void $ waitForProcess pid
|
||||
, externalPrepared = Unprepared
|
||||
}
|
||||
where
|
||||
cmd = externalRemoteProgram externaltype
|
||||
|
||||
|
@ -357,7 +366,7 @@ stopExternal external = liftIO $ stop =<< atomically (tryReadTMVar v)
|
|||
void $ atomically $ tryTakeTMVar v
|
||||
hClose $ externalSend st
|
||||
hClose $ externalReceive st
|
||||
void $ waitForProcess $ externalPid st
|
||||
externalShutdown st
|
||||
v = externalState external
|
||||
|
||||
externalRemoteProgram :: ExternalType -> String
|
||||
|
|
2
Remote/External/Types.hs
vendored
2
Remote/External/Types.hs
vendored
|
@ -70,7 +70,7 @@ type ExternalType = String
|
|||
data ExternalState = ExternalState
|
||||
{ externalSend :: Handle
|
||||
, externalReceive :: Handle
|
||||
, externalPid :: ProcessHandle
|
||||
, externalShutdown :: IO ()
|
||||
, externalPrepared :: PrepareStatus
|
||||
}
|
||||
|
||||
|
|
|
@ -542,7 +542,8 @@ onLocal r a = do
|
|||
cache st = Annex.changeState $ \s -> s
|
||||
{ Annex.remoteannexstate = M.insert (uuid r) st (Annex.remoteannexstate s) }
|
||||
go st a' = do
|
||||
(ret, st') <- liftIO $ Annex.run st $
|
||||
curro <- Annex.getState Annex.output
|
||||
(ret, st') <- liftIO $ Annex.run (st { Annex.output = curro }) $
|
||||
catFileStop `after` a'
|
||||
cache st'
|
||||
return ret
|
||||
|
|
|
@ -42,6 +42,7 @@ import Remote.Helper.Chunked as X
|
|||
import Remote.Helper.Encryptable as X
|
||||
import Remote.Helper.Messages
|
||||
import Annex.Content
|
||||
import Messages.Progress
|
||||
import qualified Git
|
||||
import qualified Git.Command
|
||||
import qualified Git.Construct
|
||||
|
|
|
@ -17,6 +17,7 @@ import CmdLine.GitAnnexShell.Fields (Field, fieldName)
|
|||
import qualified CmdLine.GitAnnexShell.Fields as Fields
|
||||
import Types.Key
|
||||
import Remote.Helper.Messages
|
||||
import Messages.Progress
|
||||
import Utility.Metered
|
||||
import Utility.Rsync
|
||||
import Types.Remote
|
||||
|
@ -100,9 +101,14 @@ dropKey r key = onRemote r (boolSystem, return False) "dropkey"
|
|||
[]
|
||||
|
||||
rsyncHelper :: Maybe MeterUpdate -> [CommandParam] -> Annex Bool
|
||||
rsyncHelper callback params = do
|
||||
rsyncHelper m params = do
|
||||
showOutput -- make way for progress bar
|
||||
ifM (liftIO $ (maybe rsync rsyncProgress callback) params)
|
||||
a <- case m of
|
||||
Nothing -> return $ rsync params
|
||||
Just meter -> do
|
||||
oh <- mkOutputHandler
|
||||
return $ rsyncProgress oh meter params
|
||||
ifM (liftIO a)
|
||||
( return True
|
||||
, do
|
||||
showLongNote "rsync failed -- run git annex again to resume file transfer"
|
||||
|
|
|
@ -17,6 +17,7 @@ import Config.Cost
|
|||
import Annex.UUID
|
||||
import Remote.Helper.Special
|
||||
import Utility.Env
|
||||
import Messages.Progress
|
||||
|
||||
import qualified Data.Map as M
|
||||
|
||||
|
@ -113,7 +114,7 @@ runHook hook action k f a = maybe (return False) run =<< lookupHook hook action
|
|||
where
|
||||
run command = do
|
||||
showOutput -- make way for hook output
|
||||
ifM (liftIO $ boolSystemEnv "sh" [Param "-c", Param command] =<< hookEnv action k f)
|
||||
ifM (progressCommandEnv "sh" [Param "-c", Param command] =<< liftIO (hookEnv action k f))
|
||||
( a
|
||||
, do
|
||||
warning $ hook ++ " hook exited nonzero!"
|
||||
|
|
|
@ -31,6 +31,7 @@ import Remote.Rsync.RsyncUrl
|
|||
import Crypto
|
||||
import Utility.Rsync
|
||||
import Utility.CopyFile
|
||||
import Messages.Progress
|
||||
import Utility.Metered
|
||||
import Utility.PID
|
||||
import Annex.Perms
|
||||
|
@ -281,11 +282,15 @@ showResumable a = ifM a
|
|||
)
|
||||
|
||||
rsyncRemote :: Direction -> RsyncOpts -> Maybe MeterUpdate -> [CommandParam] -> Annex Bool
|
||||
rsyncRemote direction o callback params = do
|
||||
rsyncRemote direction o m params = do
|
||||
showOutput -- make way for progress bar
|
||||
liftIO $ (maybe rsync rsyncProgress callback) $
|
||||
opts ++ [Params "--progress"] ++ params
|
||||
case m of
|
||||
Nothing -> liftIO $ rsync ps
|
||||
Just meter -> do
|
||||
oh <- mkOutputHandler
|
||||
liftIO $ rsyncProgress oh meter ps
|
||||
where
|
||||
ps = opts ++ [Params "--progress"] ++ params
|
||||
opts
|
||||
| direction == Download = rsyncDownloadOptions o
|
||||
| otherwise = rsyncUploadOptions o
|
||||
|
|
|
@ -28,7 +28,7 @@ import qualified Data.Map as M
|
|||
|
||||
runForeground :: IO ()
|
||||
runForeground = do
|
||||
(readh, writeh) <- ioHandles
|
||||
(readh, writeh) <- dupIoHandles
|
||||
ichan <- newTChanIO :: IO (TChan Consumed)
|
||||
ochan <- newTChanIO :: IO (TChan Emitted)
|
||||
|
||||
|
|
|
@ -7,7 +7,9 @@
|
|||
|
||||
module Types.Messages where
|
||||
|
||||
data OutputType = NormalOutput | QuietOutput | JSONOutput
|
||||
import Data.Default
|
||||
|
||||
data OutputType = NormalOutput | QuietOutput | ProgressOutput | JSONOutput
|
||||
|
||||
data SideActionBlock = NoBlock | StartBlock | InBlock
|
||||
deriving (Eq)
|
||||
|
@ -17,5 +19,6 @@ data MessageState = MessageState
|
|||
, sideActionBlock :: SideActionBlock
|
||||
}
|
||||
|
||||
defaultMessageState :: MessageState
|
||||
defaultMessageState = MessageState NormalOutput NoBlock
|
||||
instance Default MessageState
|
||||
where
|
||||
def = MessageState NormalOutput NoBlock
|
||||
|
|
|
@ -16,6 +16,7 @@ import qualified Annex
|
|||
import Annex.Content
|
||||
import Utility.Tmp
|
||||
import Logs
|
||||
import Messages.Progress
|
||||
|
||||
olddir :: Git.Repo -> FilePath
|
||||
olddir g
|
||||
|
|
|
@ -87,7 +87,7 @@ readStrict params = do
|
|||
pipeStrict :: [CommandParam] -> String -> IO String
|
||||
pipeStrict params input = do
|
||||
params' <- stdParams params
|
||||
withBothHandles createProcessSuccess (proc gpgcmd params') $ \(to, from) -> do
|
||||
withIOHandles createProcessSuccess (proc gpgcmd params') $ \(to, from) -> do
|
||||
hSetBinaryMode to True
|
||||
hSetBinaryMode from True
|
||||
hPutStr to input
|
||||
|
@ -142,7 +142,7 @@ pipeLazy params feeder reader = do
|
|||
setup = liftIO . createProcess
|
||||
cleanup p (_, _, _, pid) = liftIO $ forceSuccessProcess p pid
|
||||
go p = do
|
||||
let (to, from) = bothHandles p
|
||||
let (to, from) = ioHandles p
|
||||
liftIO $ void $ forkIO $ do
|
||||
feeder to
|
||||
hClose to
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{- Metered IO
|
||||
{- Metered IO and actions
|
||||
-
|
||||
- Copyright 2012-2105 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
|
@ -18,6 +18,7 @@ import Foreign.Storable (Storable(sizeOf))
|
|||
import System.Posix.Types
|
||||
import Data.Int
|
||||
import Data.Bits.Utils
|
||||
import Control.Concurrent.Async
|
||||
|
||||
{- An action that can be run repeatedly, updating it on the bytes processed.
|
||||
-
|
||||
|
@ -145,8 +146,13 @@ defaultChunkSize = 32 * k - chunkOverhead
|
|||
k = 1024
|
||||
chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific
|
||||
|
||||
data OutputHandler = OutputHandler
|
||||
{ quietMode :: Bool
|
||||
, stderrHandler :: String -> IO ()
|
||||
}
|
||||
|
||||
{- Parses the String looking for a command's progress output, and returns
|
||||
- Maybe the number of bytes rsynced so far, and any any remainder of the
|
||||
- Maybe the number of bytes done so far, and any any remainder of the
|
||||
- string that could be an incomplete progress output. That remainder
|
||||
- should be prepended to future output, and fed back in. This interface
|
||||
- allows the command's output to be read in any desired size chunk, or
|
||||
|
@ -155,11 +161,15 @@ defaultChunkSize = 32 * k - chunkOverhead
|
|||
type ProgressParser = String -> (Maybe BytesProcessed, String)
|
||||
|
||||
{- Runs a command and runs a ProgressParser on its output, in order
|
||||
- to update the meter. The command's output is also sent to stdout. -}
|
||||
commandMeter :: ProgressParser -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
|
||||
commandMeter progressparser meterupdate cmd params = liftIO $ catchBoolIO $
|
||||
withHandle StdoutHandle createProcessSuccess p $
|
||||
feedprogress zeroBytesProcessed []
|
||||
- to update a meter.
|
||||
-}
|
||||
commandMeter :: ProgressParser -> OutputHandler -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
|
||||
commandMeter progressparser oh meterupdate cmd params = catchBoolIO $
|
||||
withOEHandles createProcessSuccess p $ \(outh, errh) -> do
|
||||
ep <- async $ handlestderr errh
|
||||
op <- async $ feedprogress zeroBytesProcessed [] outh
|
||||
wait ep
|
||||
wait op
|
||||
where
|
||||
p = proc cmd (toCommand params)
|
||||
|
||||
|
@ -168,8 +178,9 @@ commandMeter progressparser meterupdate cmd params = liftIO $ catchBoolIO $
|
|||
if S.null b
|
||||
then return True
|
||||
else do
|
||||
S.hPut stdout b
|
||||
hFlush stdout
|
||||
unless (quietMode oh) $ do
|
||||
S.hPut stdout b
|
||||
hFlush stdout
|
||||
let s = w82s (S.unpack b)
|
||||
let (mbytes, buf') = progressparser (buf++s)
|
||||
case mbytes of
|
||||
|
@ -178,3 +189,40 @@ commandMeter progressparser meterupdate cmd params = liftIO $ catchBoolIO $
|
|||
when (bytes /= prev) $
|
||||
meterupdate bytes
|
||||
feedprogress bytes buf' h
|
||||
|
||||
handlestderr h = unlessM (hIsEOF h) $ do
|
||||
stderrHandler oh =<< hGetLine h
|
||||
handlestderr h
|
||||
|
||||
{- Runs a command, that may display one or more progress meters on
|
||||
- either stdout or stderr, and prevents the meters from being displayed.
|
||||
-
|
||||
- The other command output is handled as configured by the OutputHandler.
|
||||
-}
|
||||
demeterCommand :: OutputHandler -> FilePath -> [CommandParam] -> IO Bool
|
||||
demeterCommand oh cmd params = demeterCommandEnv oh cmd params Nothing
|
||||
|
||||
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
|
||||
demeterCommandEnv oh cmd params environ = catchBoolIO $
|
||||
withOEHandles createProcessSuccess p $ \(outh, errh) -> do
|
||||
ep <- async $ avoidProgress True errh $ stderrHandler oh
|
||||
op <- async $ avoidProgress True outh $ \l ->
|
||||
unless (quietMode oh) $
|
||||
putStrLn l
|
||||
wait ep
|
||||
wait op
|
||||
return True
|
||||
where
|
||||
p = (proc cmd (toCommand params))
|
||||
{ env = environ }
|
||||
|
||||
{- To suppress progress output, while displaying other messages,
|
||||
- filter out lines that contain \r (typically used to reset to the
|
||||
- beginning of the line when updating a progress display).
|
||||
-}
|
||||
avoidProgress :: Bool -> Handle -> (String -> IO ()) -> IO ()
|
||||
avoidProgress doavoid h emitter = unlessM (hIsEOF h) $ do
|
||||
s <- hGetLine h
|
||||
unless (doavoid && '\r' `elem` s) $
|
||||
emitter s
|
||||
avoidProgress doavoid h emitter
|
||||
|
|
|
@ -25,14 +25,16 @@ module Utility.Process (
|
|||
processTranscript,
|
||||
processTranscript',
|
||||
withHandle,
|
||||
withBothHandles,
|
||||
withIOHandles,
|
||||
withOEHandles,
|
||||
withQuietOutput,
|
||||
feedWithQuietOutput,
|
||||
createProcess,
|
||||
startInteractiveProcess,
|
||||
stdinHandle,
|
||||
stdoutHandle,
|
||||
stderrHandle,
|
||||
bothHandles,
|
||||
ioHandles,
|
||||
processHandle,
|
||||
devNull,
|
||||
) where
|
||||
|
@ -255,12 +257,12 @@ withHandle h creator p a = creator p' $ a . select
|
|||
(stderrHandle, base { std_err = CreatePipe })
|
||||
|
||||
{- Like withHandle, but passes (stdin, stdout) handles to the action. -}
|
||||
withBothHandles
|
||||
withIOHandles
|
||||
:: CreateProcessRunner
|
||||
-> CreateProcess
|
||||
-> ((Handle, Handle) -> IO a)
|
||||
-> IO a
|
||||
withBothHandles creator p a = creator p' $ a . bothHandles
|
||||
withIOHandles creator p a = creator p' $ a . ioHandles
|
||||
where
|
||||
p' = p
|
||||
{ std_in = CreatePipe
|
||||
|
@ -268,6 +270,20 @@ withBothHandles creator p a = creator p' $ a . bothHandles
|
|||
, std_err = Inherit
|
||||
}
|
||||
|
||||
{- Like withHandle, but passes (stdout, stderr) handles to the action. -}
|
||||
withOEHandles
|
||||
:: CreateProcessRunner
|
||||
-> CreateProcess
|
||||
-> ((Handle, Handle) -> IO a)
|
||||
-> IO a
|
||||
withOEHandles creator p a = creator p' $ a . oeHandles
|
||||
where
|
||||
p' = p
|
||||
{ std_in = Inherit
|
||||
, std_out = CreatePipe
|
||||
, std_err = CreatePipe
|
||||
}
|
||||
|
||||
{- Forces the CreateProcessRunner to run quietly;
|
||||
- both stdout and stderr are discarded. -}
|
||||
withQuietOutput
|
||||
|
@ -281,6 +297,21 @@ withQuietOutput creator p = withFile devNull WriteMode $ \nullh -> do
|
|||
}
|
||||
creator p' $ const $ return ()
|
||||
|
||||
{- Stdout and stderr are discarded, while the process is fed stdin
|
||||
- from the handle. -}
|
||||
feedWithQuietOutput
|
||||
:: CreateProcessRunner
|
||||
-> CreateProcess
|
||||
-> (Handle -> IO a)
|
||||
-> IO a
|
||||
feedWithQuietOutput creator p a = withFile devNull WriteMode $ \nullh -> do
|
||||
let p' = p
|
||||
{ std_in = CreatePipe
|
||||
, std_out = UseHandle nullh
|
||||
, std_err = UseHandle nullh
|
||||
}
|
||||
creator p' $ a . stdinHandle
|
||||
|
||||
devNull :: FilePath
|
||||
#ifndef mingw32_HOST_OS
|
||||
devNull = "/dev/null"
|
||||
|
@ -303,9 +334,12 @@ stdoutHandle _ = error "expected stdoutHandle"
|
|||
stderrHandle :: HandleExtractor
|
||||
stderrHandle (_, _, Just h, _) = h
|
||||
stderrHandle _ = error "expected stderrHandle"
|
||||
bothHandles :: (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> (Handle, Handle)
|
||||
bothHandles (Just hin, Just hout, _, _) = (hin, hout)
|
||||
bothHandles _ = error "expected bothHandles"
|
||||
ioHandles :: (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> (Handle, Handle)
|
||||
ioHandles (Just hin, Just hout, _, _) = (hin, hout)
|
||||
ioHandles _ = error "expected ioHandles"
|
||||
oeHandles :: (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> (Handle, Handle)
|
||||
oeHandles (_, Just hout, Just herr, _) = (hout, herr)
|
||||
oeHandles _ = error "expected oeHandles"
|
||||
|
||||
processHandle :: (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> ProcessHandle
|
||||
processHandle (_, _, _, pid) = pid
|
||||
|
|
|
@ -92,13 +92,13 @@ rsyncUrlIsPath s
|
|||
| rsyncUrlIsShell s = False
|
||||
| otherwise = ':' `notElem` s
|
||||
|
||||
{- Runs rsync, but intercepts its progress output and updates a meter.
|
||||
- The progress output is also output to stdout.
|
||||
{- Runs rsync, but intercepts its progress output and updates a progress
|
||||
- meter.
|
||||
-
|
||||
- The params must enable rsync's --progress mode for this to work.
|
||||
-}
|
||||
rsyncProgress :: MeterUpdate -> [CommandParam] -> IO Bool
|
||||
rsyncProgress meterupdate = commandMeter parseRsyncProgress meterupdate "rsync" . rsyncParamsFixup
|
||||
rsyncProgress :: OutputHandler -> MeterUpdate -> [CommandParam] -> IO Bool
|
||||
rsyncProgress oh meter = commandMeter parseRsyncProgress oh meter "rsync" . rsyncParamsFixup
|
||||
|
||||
{- Strategy: Look for chunks prefixed with \r (rsync writes a \r before
|
||||
- the first progress output, and each thereafter). The first number
|
||||
|
|
|
@ -16,7 +16,7 @@ module Utility.SimpleProtocol (
|
|||
parse1,
|
||||
parse2,
|
||||
parse3,
|
||||
ioHandles,
|
||||
dupIoHandles,
|
||||
) where
|
||||
|
||||
import Data.Char
|
||||
|
@ -80,8 +80,8 @@ splitWord = separate isSpace
|
|||
- will mess up the protocol. To avoid that, close stdin, and
|
||||
- and duplicate stderr to stdout. Return two new handles
|
||||
- that are duplicates of the original (stdin, stdout). -}
|
||||
ioHandles :: IO (Handle, Handle)
|
||||
ioHandles = do
|
||||
dupIoHandles :: IO (Handle, Handle)
|
||||
dupIoHandles = do
|
||||
readh <- hDuplicate stdin
|
||||
writeh <- hDuplicate stdout
|
||||
nullh <- openFile devNull ReadMode
|
||||
|
|
|
@ -205,7 +205,7 @@ downloadQuiet :: URLString -> FilePath -> UrlOptions -> IO Bool
|
|||
downloadQuiet = download' True
|
||||
|
||||
download' :: Bool -> URLString -> FilePath -> UrlOptions -> IO Bool
|
||||
download' quiet url file uo =
|
||||
download' quiet url file uo = do
|
||||
case parseURIRelaxed url of
|
||||
Just u
|
||||
| uriScheme u == "file:" -> do
|
||||
|
@ -224,7 +224,7 @@ download' quiet url file uo =
|
|||
-}
|
||||
#ifndef __ANDROID__
|
||||
wgetparams = catMaybes
|
||||
[ if Build.SysConfig.wgetquietprogress
|
||||
[ if Build.SysConfig.wgetquietprogress && not quiet
|
||||
then Just $ Params "-q --show-progress"
|
||||
else Nothing
|
||||
, Just $ Params "--clobber -c -O"
|
||||
|
|
3
debian/changelog
vendored
3
debian/changelog
vendored
|
@ -28,6 +28,9 @@ git-annex (5.20150328) UNRELEASED; urgency=medium
|
|||
* version: Add --raw
|
||||
* init: Improve fifo test to detect NFS systems that support fifos
|
||||
but not well enough for sshcaching.
|
||||
* --quiet now suppresses progress displays from eg, rsync.
|
||||
(The option already suppressed git-annex's own built-in progress
|
||||
displays.)
|
||||
|
||||
-- Joey Hess <id@joeyh.name> Fri, 27 Mar 2015 16:04:43 -0400
|
||||
|
||||
|
|
|
@ -655,8 +655,7 @@ may not be explicitly listed on their individual man pages.
|
|||
|
||||
* `--quiet`
|
||||
|
||||
Avoid the default verbose display of what is done; only show errors
|
||||
and progress displays.
|
||||
Avoid the default verbose display of what is done; only show errors.
|
||||
|
||||
* `--verbose`
|
||||
|
||||
|
|
Loading…
Reference in a new issue