use total size from DATA
Noticed that getting a key whose size is not known resulted in a progress display that didn't include the percent complete. Fixed for P2P by making the size sent with DATA be used to update the meter's total size. In order for rateLimitMeterUpdate to also learn the total size, had to make it be passed the Meter, and some other reorg in Utility.Metered was also done so that --json-progress can construct a Meter to pass to rateLimitMeterUpdate. When the fallback rsync is done, the progress display still doesn't include the percent complete. Only way to fix that seems to be to let rsync display its output again, but that would conflict with git-annex's own progress meter, which is also being displayed. This commit was sponsored by Henrik Riomar on Patreon.
This commit is contained in:
parent
b96b845ffd
commit
e16b069331
7 changed files with 68 additions and 47 deletions
|
@ -220,7 +220,7 @@ performExport r ea db ek af contentsha loc = do
|
||||||
let rollback = void $
|
let rollback = void $
|
||||||
performUnexport r ea db [ek] loc
|
performUnexport r ea db [ek] loc
|
||||||
sendAnnex k rollback $ \f ->
|
sendAnnex k rollback $ \f ->
|
||||||
metered Nothing k (return $ Just f) $ \m -> do
|
metered Nothing k (return $ Just f) $ \_ m -> do
|
||||||
let m' = combineMeterUpdate pm m
|
let m' = combineMeterUpdate pm m
|
||||||
storer f k loc m'
|
storer f k loc m'
|
||||||
, do
|
, do
|
||||||
|
@ -228,7 +228,7 @@ performExport r ea db ek af contentsha loc = do
|
||||||
return False
|
return False
|
||||||
)
|
)
|
||||||
-- Sending a non-annexed file.
|
-- Sending a non-annexed file.
|
||||||
GitKey sha1k -> metered Nothing sha1k (return Nothing) $ \m ->
|
GitKey sha1k -> metered Nothing sha1k (return Nothing) $ \_ m ->
|
||||||
withTmpFile "export" $ \tmp h -> do
|
withTmpFile "export" $ \tmp h -> do
|
||||||
b <- catObject contentsha
|
b <- catObject contentsha
|
||||||
liftIO $ L.hPut h b
|
liftIO $ L.hPut h b
|
||||||
|
|
|
@ -24,46 +24,52 @@ import qualified System.Console.Concurrent as Console
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
{- Shows a progress meter while performing a transfer of a key.
|
{- Shows a progress meter while performing a transfer of a key.
|
||||||
- The action is passed a callback to use to update the meter.
|
- The action is passed the meter and a callback to use to update the meter.
|
||||||
-
|
-
|
||||||
- When the key's size is not known, the srcfile is statted to get the size.
|
- When the key's size is not known, the srcfile is statted to get the size.
|
||||||
- This allows uploads of keys without size to still have progress
|
- This allows uploads of keys without size to still have progress
|
||||||
- displayed.
|
- displayed.
|
||||||
--}
|
--}
|
||||||
metered :: Maybe MeterUpdate -> Key -> Annex (Maybe FilePath) -> (MeterUpdate -> Annex a) -> Annex a
|
metered :: Maybe MeterUpdate -> Key -> Annex (Maybe FilePath) -> (Meter -> MeterUpdate -> Annex a) -> Annex a
|
||||||
metered othermeter key getsrcfile a = withMessageState $ \st ->
|
metered othermeter key getsrcfile a = withMessageState $ \st ->
|
||||||
flip go st =<< getsz
|
flip go st =<< getsz
|
||||||
where
|
where
|
||||||
go _ (MessageState { outputType = QuietOutput }) = nometer
|
go _ (MessageState { outputType = QuietOutput }) = nometer
|
||||||
go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
|
go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
|
||||||
showOutput
|
showOutput
|
||||||
meter <- liftIO $ mkMeter msize bandwidthMeter $
|
meter <- liftIO $ mkMeter msize $
|
||||||
displayMeterHandle stdout
|
displayMeterHandle stdout bandwidthMeter
|
||||||
m <- liftIO $ rateLimitMeterUpdate 0.1 msize $
|
m <- liftIO $ rateLimitMeterUpdate 0.1 meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
r <- a (combinemeter m)
|
r <- a meter (combinemeter m)
|
||||||
liftIO $ clearMeterHandle meter stdout
|
liftIO $ clearMeterHandle meter stdout
|
||||||
return r
|
return r
|
||||||
go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) =
|
go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) =
|
||||||
#if WITH_CONCURRENTOUTPUT
|
#if WITH_CONCURRENTOUTPUT
|
||||||
withProgressRegion $ \r -> do
|
withProgressRegion $ \r -> do
|
||||||
meter <- liftIO $ mkMeter msize bandwidthMeter $ \_ s ->
|
meter <- liftIO $ mkMeter msize $ \_ msize' old new ->
|
||||||
Regions.setConsoleRegion r ('\n' : s)
|
let s = bandwidthMeter msize' old new
|
||||||
m <- liftIO $ rateLimitMeterUpdate 0.1 msize $
|
in Regions.setConsoleRegion r ('\n' : s)
|
||||||
|
m <- liftIO $ rateLimitMeterUpdate 0.1 meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
a (combinemeter m)
|
a meter (combinemeter m)
|
||||||
#else
|
#else
|
||||||
nometer
|
nometer
|
||||||
#endif
|
#endif
|
||||||
go msize (MessageState { outputType = JSONOutput jsonoptions })
|
go msize (MessageState { outputType = JSONOutput jsonoptions })
|
||||||
| jsonProgress jsonoptions = do
|
| jsonProgress jsonoptions = do
|
||||||
buf <- withMessageState $ return . jsonBuffer
|
buf <- withMessageState $ return . jsonBuffer
|
||||||
m <- liftIO $ rateLimitMeterUpdate 0.1 msize $
|
meter <- liftIO $ mkMeter msize $ \_ msize' _old (new, _now) ->
|
||||||
JSON.progress buf msize
|
JSON.progress buf msize' new
|
||||||
a (combinemeter m)
|
m <- liftIO $ rateLimitMeterUpdate 0.1 meter $
|
||||||
|
updateMeter meter
|
||||||
|
a meter (combinemeter m)
|
||||||
| otherwise = nometer
|
| otherwise = nometer
|
||||||
|
|
||||||
nometer = a $ combinemeter (const noop)
|
nometer = do
|
||||||
|
dummymeter <- liftIO $ mkMeter Nothing $
|
||||||
|
\_ _ _ _ -> return ()
|
||||||
|
a dummymeter (combinemeter (const noop))
|
||||||
|
|
||||||
combinemeter m = case othermeter of
|
combinemeter m = case othermeter of
|
||||||
Nothing -> m
|
Nothing -> m
|
||||||
|
@ -82,7 +88,7 @@ metered othermeter key getsrcfile a = withMessageState $ \st ->
|
||||||
meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a
|
meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a
|
||||||
meteredFile file combinemeterupdate key a =
|
meteredFile file combinemeterupdate key a =
|
||||||
withMessageState $ \s -> if needOutputMeter s
|
withMessageState $ \s -> if needOutputMeter s
|
||||||
then metered combinemeterupdate key (return Nothing) $ \p ->
|
then metered combinemeterupdate key (return Nothing) $ \_ p ->
|
||||||
watchFileSize file p a
|
watchFileSize file p a
|
||||||
else a
|
else a
|
||||||
where
|
where
|
||||||
|
|
|
@ -117,6 +117,9 @@ runLocal runst runner a = case a of
|
||||||
Left e -> return (Left (show e))
|
Left e -> return (Left (show e))
|
||||||
Right changedrefs -> runner (next changedrefs)
|
Right changedrefs -> runner (next changedrefs)
|
||||||
_ -> return $ Left "change notification not available"
|
_ -> return $ Left "change notification not available"
|
||||||
|
UpdateMeterTotalSize m sz next -> do
|
||||||
|
liftIO $ setMeterTotalSize m sz
|
||||||
|
runner next
|
||||||
where
|
where
|
||||||
transfer mk k af ta = case runst of
|
transfer mk k af ta = case runst of
|
||||||
-- Update transfer logs when serving.
|
-- Update transfer logs when serving.
|
||||||
|
|
|
@ -257,7 +257,10 @@ data LocalF c
|
||||||
-- action. If unable to lock the content, or the content is not
|
-- action. If unable to lock the content, or the content is not
|
||||||
-- present, runs the protocol action with False.
|
-- present, runs the protocol action with False.
|
||||||
| WaitRefChange (ChangedRefs -> c)
|
| WaitRefChange (ChangedRefs -> c)
|
||||||
-- ^ Waits for one or more git refs to change and returns them.
|
-- ^ Waits for one or more git refs to change and returns them.a
|
||||||
|
| UpdateMeterTotalSize Meter Integer c
|
||||||
|
-- ^ Updates the total size of a Meter, for cases where the size is
|
||||||
|
-- not known until the data is being received.
|
||||||
deriving (Functor)
|
deriving (Functor)
|
||||||
|
|
||||||
type Local = Free LocalF
|
type Local = Free LocalF
|
||||||
|
@ -323,8 +326,9 @@ remove key = do
|
||||||
net $ sendMessage (REMOVE key)
|
net $ sendMessage (REMOVE key)
|
||||||
checkSuccess
|
checkSuccess
|
||||||
|
|
||||||
get :: FilePath -> Key -> AssociatedFile -> MeterUpdate -> Proto Bool
|
get :: FilePath -> Key -> AssociatedFile -> Meter -> MeterUpdate -> Proto Bool
|
||||||
get dest key af p = receiveContent p sizer storer (\offset -> GET offset af key)
|
get dest key af m p =
|
||||||
|
receiveContent (Just m) p sizer storer (\offset -> GET offset af key)
|
||||||
where
|
where
|
||||||
sizer = fileSize dest
|
sizer = fileSize dest
|
||||||
storer = storeContentTo dest
|
storer = storeContentTo dest
|
||||||
|
@ -433,7 +437,7 @@ serveAuthed servermode myuuid = void $ serverLoop handler
|
||||||
else do
|
else do
|
||||||
let sizer = tmpContentSize key
|
let sizer = tmpContentSize key
|
||||||
let storer = storeContent key af
|
let storer = storeContent key af
|
||||||
ok <- receiveContent nullMeterUpdate sizer storer PUT_FROM
|
ok <- receiveContent Nothing nullMeterUpdate sizer storer PUT_FROM
|
||||||
when ok $
|
when ok $
|
||||||
local $ setPresent key myuuid
|
local $ setPresent key myuuid
|
||||||
return ServerContinue
|
return ServerContinue
|
||||||
|
@ -477,15 +481,18 @@ sendContent key af offset@(Offset n) p = go =<< local (contentSize key)
|
||||||
net $ sendBytes len content p'
|
net $ sendBytes len content p'
|
||||||
checkSuccess
|
checkSuccess
|
||||||
|
|
||||||
receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> Proto L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
|
receiveContent :: Maybe Meter -> MeterUpdate -> Local Len -> (Offset -> Len -> Proto L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
|
||||||
receiveContent p sizer storer mkmsg = do
|
receiveContent mm p sizer storer mkmsg = do
|
||||||
Len n <- local sizer
|
Len n <- local sizer
|
||||||
let p' = offsetMeterUpdate p (toBytesProcessed n)
|
let p' = offsetMeterUpdate p (toBytesProcessed n)
|
||||||
let offset = Offset n
|
let offset = Offset n
|
||||||
net $ sendMessage (mkmsg offset)
|
net $ sendMessage (mkmsg offset)
|
||||||
r <- net receiveMessage
|
r <- net receiveMessage
|
||||||
case r of
|
case r of
|
||||||
Just (DATA len) -> do
|
Just (DATA len@(Len l)) -> do
|
||||||
|
local $ case mm of
|
||||||
|
Nothing -> return ()
|
||||||
|
Just m -> updateMeterTotalSize m (n+l)
|
||||||
ok <- local $ storer offset len
|
ok <- local $ storer offset len
|
||||||
(net (receiveBytes len p'))
|
(net (receiveBytes len p'))
|
||||||
sendSuccess ok
|
sendSuccess ok
|
||||||
|
|
|
@ -33,14 +33,14 @@ type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) ->
|
||||||
store :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
|
store :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
|
||||||
store runner k af p = do
|
store runner k af p = do
|
||||||
let getsrcfile = fmap fst <$> prepSendAnnex k
|
let getsrcfile = fmap fst <$> prepSendAnnex k
|
||||||
metered (Just p) k getsrcfile $ \p' ->
|
metered (Just p) k getsrcfile $ \_ p' ->
|
||||||
fromMaybe False
|
fromMaybe False
|
||||||
<$> runner p' (P2P.put k af p')
|
<$> runner p' (P2P.put k af p')
|
||||||
|
|
||||||
retrieve :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
|
retrieve :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
|
||||||
retrieve runner k af dest p = unVerified $
|
retrieve runner k af dest p = unVerified $
|
||||||
metered (Just p) k (return Nothing) $ \p' -> fromMaybe False
|
metered (Just p) k (return Nothing) $ \m p' -> fromMaybe False
|
||||||
<$> runner p (P2P.get dest k af p')
|
<$> runner p' (P2P.get dest k af m p')
|
||||||
|
|
||||||
remove :: ProtoRunner Bool -> Key -> Annex Bool
|
remove :: ProtoRunner Bool -> Key -> Annex Bool
|
||||||
remove runner k = fromMaybe False <$> runner (P2P.remove k)
|
remove runner k = fromMaybe False <$> runner (P2P.remove k)
|
||||||
|
|
|
@ -228,7 +228,7 @@ specialRemote' cfg c preparestorer prepareretriever prepareremover preparecheckp
|
||||||
chunkconfig = chunkConfig cfg
|
chunkconfig = chunkConfig cfg
|
||||||
|
|
||||||
displayprogress p k srcfile a
|
displayprogress p k srcfile a
|
||||||
| displayProgress cfg = metered (Just p) k (return srcfile) a
|
| displayProgress cfg = metered (Just p) k (return srcfile) (const a)
|
||||||
| otherwise = a p
|
| otherwise = a p
|
||||||
|
|
||||||
{- Sink callback for retrieveChunks. Stores the file content into the
|
{- Sink callback for retrieveChunks. Stores the file content into the
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{- Metered IO and actions
|
{- Metered IO and actions
|
||||||
-
|
-
|
||||||
- Copyright 2012-2016 Joey Hess <id@joeyh.name>
|
- Copyright 2012-2018 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- License: BSD-2-clause
|
- License: BSD-2-clause
|
||||||
-}
|
-}
|
||||||
|
@ -288,14 +288,14 @@ outputFilter cmd params environ outfilter errfilter = catchBoolIO $ do
|
||||||
-- | Limit a meter to only update once per unit of time.
|
-- | Limit a meter to only update once per unit of time.
|
||||||
--
|
--
|
||||||
-- It's nice to display the final update to 100%, even if it comes soon
|
-- It's nice to display the final update to 100%, even if it comes soon
|
||||||
-- after a previous update. To make that happen, a total size has to be
|
-- after a previous update. To make that happen, the Meter has to know
|
||||||
-- provided.
|
-- its total size.
|
||||||
rateLimitMeterUpdate :: NominalDiffTime -> Maybe Integer -> MeterUpdate -> IO MeterUpdate
|
rateLimitMeterUpdate :: NominalDiffTime -> Meter -> MeterUpdate -> IO MeterUpdate
|
||||||
rateLimitMeterUpdate delta totalsize meterupdate = do
|
rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
|
||||||
lastupdate <- newMVar (toEnum 0 :: POSIXTime)
|
lastupdate <- newMVar (toEnum 0 :: POSIXTime)
|
||||||
return $ mu lastupdate
|
return $ mu lastupdate
|
||||||
where
|
where
|
||||||
mu lastupdate n@(BytesProcessed i) = case totalsize of
|
mu lastupdate n@(BytesProcessed i) = tryReadMVar totalsizev >>= \case
|
||||||
Just t | i >= t -> meterupdate n
|
Just t | i >= t -> meterupdate n
|
||||||
_ -> do
|
_ -> do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
|
@ -306,35 +306,40 @@ rateLimitMeterUpdate delta totalsize meterupdate = do
|
||||||
meterupdate n
|
meterupdate n
|
||||||
else putMVar lastupdate prev
|
else putMVar lastupdate prev
|
||||||
|
|
||||||
data Meter = Meter (Maybe Integer) (MVar MeterState) (MVar String) RenderMeter DisplayMeter
|
data Meter = Meter (MVar Integer) (MVar MeterState) (MVar String) DisplayMeter
|
||||||
|
|
||||||
type MeterState = (BytesProcessed, POSIXTime)
|
type MeterState = (BytesProcessed, POSIXTime)
|
||||||
|
|
||||||
type DisplayMeter = MVar String -> String -> IO ()
|
type DisplayMeter = MVar String -> Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> IO ()
|
||||||
|
|
||||||
type RenderMeter = Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> String
|
type RenderMeter = Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> String
|
||||||
|
|
||||||
-- | Make a meter. Pass the total size, if it's known.
|
-- | Make a meter. Pass the total size, if it's known.
|
||||||
mkMeter :: Maybe Integer -> RenderMeter -> DisplayMeter -> IO Meter
|
mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter
|
||||||
mkMeter totalsize rendermeter displaymeter = Meter
|
mkMeter totalsize displaymeter = Meter
|
||||||
<$> pure totalsize
|
<$> maybe newEmptyMVar newMVar totalsize
|
||||||
<*> ((\t -> newMVar (zeroBytesProcessed, t)) =<< getPOSIXTime)
|
<*> ((\t -> newMVar (zeroBytesProcessed, t)) =<< getPOSIXTime)
|
||||||
<*> newMVar ""
|
<*> newMVar ""
|
||||||
<*> pure rendermeter
|
|
||||||
<*> pure displaymeter
|
<*> pure displaymeter
|
||||||
|
|
||||||
|
setMeterTotalSize :: Meter -> Integer -> IO ()
|
||||||
|
setMeterTotalSize (Meter totalsizev _ _ _) totalsize = do
|
||||||
|
void $ tryTakeMVar totalsizev
|
||||||
|
putMVar totalsizev totalsize
|
||||||
|
|
||||||
-- | Updates the meter, displaying it if necessary.
|
-- | Updates the meter, displaying it if necessary.
|
||||||
updateMeter :: Meter -> BytesProcessed -> IO ()
|
updateMeter :: Meter -> BytesProcessed -> IO ()
|
||||||
updateMeter (Meter totalsize sv bv rendermeter displaymeter) new = do
|
updateMeter (Meter totalsizev sv bv displaymeter) new = do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
(old, before) <- swapMVar sv (new, now)
|
(old, before) <- swapMVar sv (new, now)
|
||||||
when (old /= new) $
|
when (old /= new) $ do
|
||||||
displaymeter bv $
|
totalsize <- tryReadMVar totalsizev
|
||||||
rendermeter totalsize (old, before) (new, now)
|
displaymeter bv totalsize (old, before) (new, now)
|
||||||
|
|
||||||
-- | Display meter to a Handle.
|
-- | Display meter to a Handle.
|
||||||
displayMeterHandle :: Handle -> DisplayMeter
|
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
|
||||||
displayMeterHandle h v s = do
|
displayMeterHandle h rendermeter v msize old new = do
|
||||||
|
let s = rendermeter msize old new
|
||||||
olds <- swapMVar v s
|
olds <- swapMVar v s
|
||||||
-- Avoid writing when the rendered meter has not changed.
|
-- Avoid writing when the rendered meter has not changed.
|
||||||
when (olds /= s) $ do
|
when (olds /= s) $ do
|
||||||
|
@ -344,7 +349,7 @@ displayMeterHandle h v s = do
|
||||||
|
|
||||||
-- | Clear meter displayed by displayMeterHandle.
|
-- | Clear meter displayed by displayMeterHandle.
|
||||||
clearMeterHandle :: Meter -> Handle -> IO ()
|
clearMeterHandle :: Meter -> Handle -> IO ()
|
||||||
clearMeterHandle (Meter _ _ v _ _) h = do
|
clearMeterHandle (Meter _ _ v _) h = do
|
||||||
olds <- readMVar v
|
olds <- readMVar v
|
||||||
hPutStr h $ '\r' : replicate (length olds) ' ' ++ "\r"
|
hPutStr h $ '\r' : replicate (length olds) ' ' ++ "\r"
|
||||||
hFlush h
|
hFlush h
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue