finish message serialization of progress meters
Any given transfer can only display 1 progress meter at a time, or so this code assumes. In some cases, there are progress meters for different stages of a transfer, perhaps, and that is supported by this. This commit was sponsored by Ethan Aubin.
This commit is contained in:
parent
4efecaebd6
commit
31e417f351
7 changed files with 91 additions and 33 deletions
|
@ -11,6 +11,7 @@ import Assistant.Common
|
||||||
import Assistant.Types.TransferrerPool
|
import Assistant.Types.TransferrerPool
|
||||||
import Types.Transfer
|
import Types.Transfer
|
||||||
import Utility.Batch
|
import Utility.Batch
|
||||||
|
import Messages.Serialized
|
||||||
|
|
||||||
import qualified Command.TransferKeys as T
|
import qualified Command.TransferKeys as T
|
||||||
|
|
||||||
|
@ -58,14 +59,9 @@ checkTransferrerPoolItem program batchmaker i = case i of
|
||||||
performTransfer :: Transferrer -> Transfer -> TransferInfo -> Assistant Bool
|
performTransfer :: Transferrer -> Transfer -> TransferInfo -> Assistant Bool
|
||||||
performTransfer transferrer t info = catchBoolIO $ do
|
performTransfer transferrer t info = catchBoolIO $ do
|
||||||
(liftIO $ T.sendRequest t info (transferrerWrite transferrer))
|
(liftIO $ T.sendRequest t info (transferrerWrite transferrer))
|
||||||
readresponse
|
relaySerializedOutput
|
||||||
where
|
(liftIO (T.readResponse (transferrerRead transferrer)))
|
||||||
readresponse =
|
liftAnnex
|
||||||
liftIO (T.readResponse (transferrerRead transferrer)) >>= \case
|
|
||||||
Right r -> return r
|
|
||||||
Left so -> do
|
|
||||||
liftAnnex $ emitSerializedOutput so
|
|
||||||
readresponse
|
|
||||||
|
|
||||||
{- Starts a new git-annex transferkeys process, setting up handles
|
{- Starts a new git-annex transferkeys process, setting up handles
|
||||||
- that will be used to communicate with it. -}
|
- that will be used to communicate with it. -}
|
||||||
|
|
|
@ -50,7 +50,6 @@ module Messages (
|
||||||
withMessageState,
|
withMessageState,
|
||||||
prompt,
|
prompt,
|
||||||
mkPrompter,
|
mkPrompter,
|
||||||
emitSerializedOutput,
|
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import System.Log.Logger
|
import System.Log.Logger
|
||||||
|
|
|
@ -96,18 +96,3 @@ flushed a = a >> hFlush stdout
|
||||||
|
|
||||||
outputSerialized :: (SerializedOutput -> IO ()) -> SerializedOutput -> IO ()
|
outputSerialized :: (SerializedOutput -> IO ()) -> SerializedOutput -> IO ()
|
||||||
outputSerialized = id
|
outputSerialized = id
|
||||||
|
|
||||||
emitSerializedOutput :: SerializedOutput -> Annex ()
|
|
||||||
emitSerializedOutput (OutputMessage msg) =
|
|
||||||
outputMessage' nojsonoutputter nojsonbuilder msg
|
|
||||||
where
|
|
||||||
nojsonoutputter _ _ = return False
|
|
||||||
nojsonbuilder = id
|
|
||||||
emitSerializedOutput (OutputError msg) = outputError msg
|
|
||||||
emitSerializedOutput (ProgressMeter sz old new) = undefined -- TODO
|
|
||||||
emitSerializedOutput (JSONObject b) =
|
|
||||||
withMessageState $ \s -> case outputType s of
|
|
||||||
JSONOutput _ -> liftIO $ flushed $ JSON.emit' b
|
|
||||||
SerializedOutput h -> liftIO $
|
|
||||||
outputSerialized h $ JSONObject b
|
|
||||||
_ -> q
|
|
||||||
|
|
|
@ -84,10 +84,10 @@ metered'
|
||||||
-- ^ this should run showOutput
|
-- ^ this should run showOutput
|
||||||
-> (Meter -> MeterUpdate -> m a)
|
-> (Meter -> MeterUpdate -> m a)
|
||||||
-> m a
|
-> m a
|
||||||
metered' st othermeter size showoutput a = go size st
|
metered' st othermeter msize showoutput a = go st
|
||||||
where
|
where
|
||||||
go _ (MessageState { outputType = QuietOutput }) = nometer
|
go (MessageState { outputType = QuietOutput }) = nometer
|
||||||
go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
|
go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
|
||||||
showoutput
|
showoutput
|
||||||
meter <- liftIO $ mkMeter msize $
|
meter <- liftIO $ mkMeter msize $
|
||||||
displayMeterHandle stdout bandwidthMeter
|
displayMeterHandle stdout bandwidthMeter
|
||||||
|
@ -96,7 +96,7 @@ metered' st othermeter size showoutput a = go size st
|
||||||
r <- a meter (combinemeter m)
|
r <- a meter (combinemeter m)
|
||||||
liftIO $ clearMeterHandle meter stdout
|
liftIO $ clearMeterHandle meter stdout
|
||||||
return r
|
return r
|
||||||
go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) =
|
go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) =
|
||||||
withProgressRegion st $ \r -> do
|
withProgressRegion st $ \r -> do
|
||||||
meter <- liftIO $ mkMeter msize $ \_ msize' old new ->
|
meter <- liftIO $ mkMeter msize $ \_ msize' old new ->
|
||||||
let s = bandwidthMeter msize' old new
|
let s = bandwidthMeter msize' old new
|
||||||
|
@ -104,7 +104,7 @@ metered' st othermeter size showoutput a = go size st
|
||||||
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
|
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
a meter (combinemeter m)
|
a meter (combinemeter m)
|
||||||
go msize (MessageState { outputType = JSONOutput jsonoptions })
|
go (MessageState { outputType = JSONOutput jsonoptions })
|
||||||
| jsonProgress jsonoptions = do
|
| jsonProgress jsonoptions = do
|
||||||
let buf = jsonBuffer st
|
let buf = jsonBuffer st
|
||||||
meter <- liftIO $ mkMeter msize $ \_ msize' _old new ->
|
meter <- liftIO $ mkMeter msize $ \_ msize' _old new ->
|
||||||
|
@ -113,12 +113,15 @@ metered' st othermeter size showoutput a = go size st
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
a meter (combinemeter m)
|
a meter (combinemeter m)
|
||||||
| otherwise = nometer
|
| otherwise = nometer
|
||||||
go msize (MessageState { outputType = SerializedOutput h }) = do
|
go (MessageState { outputType = SerializedOutput h }) = do
|
||||||
meter <- liftIO $ mkMeter msize $ \_ msize' old new ->
|
liftIO $ outputSerialized h $ StartProgressMeter msize
|
||||||
outputSerialized h $ ProgressMeter msize' old new
|
meter <- liftIO $ mkMeter msize $ \_ _ _old new ->
|
||||||
|
outputSerialized h $ UpdateProgressMeter $
|
||||||
|
meterBytesProcessed new
|
||||||
m <- liftIO $ rateLimitMeterUpdate minratelimit meter $
|
m <- liftIO $ rateLimitMeterUpdate minratelimit meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
a meter (combinemeter m)
|
a meter (combinemeter m)
|
||||||
|
`finally` (liftIO $ outputSerialized h EndProgressMeter)
|
||||||
nometer = do
|
nometer = do
|
||||||
dummymeter <- liftIO $ mkMeter Nothing $
|
dummymeter <- liftIO $ mkMeter Nothing $
|
||||||
\_ _ _ _ -> return ()
|
\_ _ _ _ -> return ()
|
||||||
|
|
71
Messages/Serialized.hs
Normal file
71
Messages/Serialized.hs
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
{- serialized output
|
||||||
|
-
|
||||||
|
- Copyright 2020 Joey Hess <id@joeyh.name>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE RankNTypes #-}
|
||||||
|
|
||||||
|
module Messages.Serialized (outputSerialized, relaySerializedOutput) where
|
||||||
|
|
||||||
|
import Common
|
||||||
|
import Annex
|
||||||
|
import Types.Messages
|
||||||
|
import Messages
|
||||||
|
import Messages.Internal
|
||||||
|
import Messages.Progress
|
||||||
|
import qualified Messages.JSON as JSON
|
||||||
|
|
||||||
|
import Control.Monad.IO.Class (MonadIO)
|
||||||
|
|
||||||
|
relaySerializedOutput
|
||||||
|
:: (Monad m, MonadIO m, MonadMask m)
|
||||||
|
=> m (Either SerializedOutput r)
|
||||||
|
-- ^ Get next serialized output, or final value to return.
|
||||||
|
-> (forall a. Annex a -> m a)
|
||||||
|
-- ^ Run an annex action in the monad. Will not be used with
|
||||||
|
-- actions that block for a long time.
|
||||||
|
-> m r
|
||||||
|
relaySerializedOutput getso runannex = go Nothing
|
||||||
|
where
|
||||||
|
go st = loop st >>= \case
|
||||||
|
Right r -> return r
|
||||||
|
Left st' -> go st'
|
||||||
|
|
||||||
|
loop st = getso >>= \case
|
||||||
|
Right r -> return (Right r)
|
||||||
|
Left (OutputMessage msg) -> do
|
||||||
|
runannex $ outputMessage'
|
||||||
|
(\_ _ -> return False)
|
||||||
|
id
|
||||||
|
msg
|
||||||
|
loop st
|
||||||
|
Left (OutputError msg) -> do
|
||||||
|
runannex $ outputError msg
|
||||||
|
loop st
|
||||||
|
Left (JSONObject b) -> do
|
||||||
|
runannex $ withMessageState $ \s -> case outputType s of
|
||||||
|
JSONOutput _ -> liftIO $ flushed $ JSON.emit' b
|
||||||
|
SerializedOutput h -> liftIO $
|
||||||
|
outputSerialized h $ JSONObject b
|
||||||
|
_ -> q
|
||||||
|
loop st
|
||||||
|
Left (StartProgressMeter sz) -> do
|
||||||
|
ost <- runannex (Annex.getState Annex.output)
|
||||||
|
-- Display a progress meter while running, until
|
||||||
|
-- the meter ends or a final value is returned.
|
||||||
|
metered' ost Nothing sz (runannex showOutput)
|
||||||
|
(\_meter meterupdate -> loop (Just meterupdate))
|
||||||
|
>>= \case
|
||||||
|
Right r -> return (Right r)
|
||||||
|
-- Continue processing serialized
|
||||||
|
-- output after the progress meter
|
||||||
|
-- is done.
|
||||||
|
Left _st' -> loop Nothing
|
||||||
|
Left EndProgressMeter -> return (Left st)
|
||||||
|
Left (UpdateProgressMeter n) -> do
|
||||||
|
case st of
|
||||||
|
Just meterupdate -> liftIO $ meterupdate n
|
||||||
|
Nothing -> noop
|
||||||
|
loop st
|
|
@ -9,6 +9,7 @@ module Types.Messages where
|
||||||
|
|
||||||
import qualified Utility.Aeson as Aeson
|
import qualified Utility.Aeson as Aeson
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
|
import Utility.FileSize
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import System.Console.Regions (ConsoleRegion)
|
import System.Console.Regions (ConsoleRegion)
|
||||||
|
@ -66,7 +67,9 @@ newMessageState = do
|
||||||
data SerializedOutput
|
data SerializedOutput
|
||||||
= OutputMessage S.ByteString
|
= OutputMessage S.ByteString
|
||||||
| OutputError String
|
| OutputError String
|
||||||
| ProgressMeter (Maybe Integer) MeterState MeterState
|
| StartProgressMeter (Maybe FileSize)
|
||||||
|
| UpdateProgressMeter BytesProcessed
|
||||||
|
| EndProgressMeter
|
||||||
| JSONObject L.ByteString
|
| JSONObject L.ByteString
|
||||||
-- ^ This is always sent, it's up to the consumer to decide if it
|
-- ^ This is always sent, it's up to the consumer to decide if it
|
||||||
-- wants to display JSON, or human-readable messages.
|
-- wants to display JSON, or human-readable messages.
|
||||||
|
|
|
@ -931,6 +931,7 @@ Executable git-annex
|
||||||
Messages.Internal
|
Messages.Internal
|
||||||
Messages.JSON
|
Messages.JSON
|
||||||
Messages.Progress
|
Messages.Progress
|
||||||
|
Messages.Serialized
|
||||||
P2P.Address
|
P2P.Address
|
||||||
P2P.Annex
|
P2P.Annex
|
||||||
P2P.Auth
|
P2P.Auth
|
||||||
|
|
Loading…
Reference in a new issue