more p2p progress meters

Display progress meter on send and receive from remote.

Added a new hGetMetered that can read an exact number of bytes (or
less), updating a meter as it goes.

This commit was sponsored by Andreas on Patreon.
This commit is contained in:
Joey Hess 2016-12-07 14:25:01 -04:00
parent f3a3dc14ec
commit ad5ef51040
No known key found for this signature in database
GPG key ID: C910D9222512E3C7
5 changed files with 45 additions and 27 deletions

View file

@ -119,8 +119,8 @@ runNet conn runner f = case f of
case v of
Right True -> runner next
_ -> return Nothing
ReceiveBytes (Len n) next -> do
v <- liftIO $ tryNonAsync $ L.hGet (connIhdl conn) (fromIntegral n)
ReceiveBytes len p next -> do
v <- liftIO $ tryNonAsync $ receiveExactly len (connIhdl conn) p
case v of
Left _e -> return Nothing
Right b -> runner (next b)
@ -155,9 +155,12 @@ runNet conn runner f = case f of
-- If too few bytes are sent, the only option is to give up on this
-- connection. False is returned to indicate this problem.
sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
sendExactly (Len l) b h p = do
sent <- meteredWrite' p h (L.take (fromIntegral l) b)
return (fromBytesProcessed sent == l)
sendExactly (Len n) b h p = do
sent <- meteredWrite' p h (L.take (fromIntegral n) b)
return (fromBytesProcessed sent == n)
receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString
receiveExactly (Len n) h p = hGetMetered h (Just n) p
runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Maybe ExitCode)
runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go

View file

@ -167,7 +167,7 @@ data NetF c
| SendBytes Len L.ByteString MeterUpdate c
-- ^ Sends exactly Len bytes of data. (Any more or less will
-- confuse the receiver.)
| ReceiveBytes Len (L.ByteString -> c)
| ReceiveBytes Len MeterUpdate (L.ByteString -> c)
-- ^ Lazily reads bytes from peer. Stops once Len are read,
-- or if connection is lost, and in either case returns the bytes
-- that were read. This allows resuming interrupted transfers.
@ -273,8 +273,8 @@ remove key = do
net $ sendMessage (REMOVE key)
checkSuccess
get :: FilePath -> Key -> AssociatedFile -> Proto Bool
get dest key af = receiveContent sizer storer (\offset -> GET offset af key)
get :: FilePath -> Key -> AssociatedFile -> MeterUpdate -> Proto Bool
get dest key af p = receiveContent p sizer storer (\offset -> GET offset af key)
where
sizer = fileSize dest
storer = storeContentTo dest
@ -364,7 +364,7 @@ serveAuthed myuuid = void $ serverLoop handler
else do
let sizer = tmpContentSize key
let storer = storeContent key af
ok <- receiveContent sizer storer PUT_FROM
ok <- receiveContent nullMeterUpdate sizer storer PUT_FROM
when ok $
local $ setPresent key myuuid
return ServerContinue
@ -385,8 +385,8 @@ sendContent key af offset p = do
net $ sendBytes len content p
checkSuccess
receiveContent :: Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
receiveContent sizer storer mkmsg = do
receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
receiveContent p sizer storer mkmsg = do
Len n <- local sizer
let offset = Offset n
net $ sendMessage (mkmsg offset)
@ -394,7 +394,7 @@ receiveContent sizer storer mkmsg = do
case r of
DATA len -> do
ok <- local . storer offset len
=<< net (receiveBytes len)
=<< net (receiveBytes len p)
sendSuccess ok
return ok
_ -> do
@ -447,7 +447,7 @@ relayFromPeer = do
r <- receiveMessage
case r of
CONNECTDONE exitcode -> return $ RelayDone exitcode
DATA len -> RelayFromPeer <$> receiveBytes len
DATA len -> RelayFromPeer <$> receiveBytes len nullMeterUpdate
_ -> do
sendMessage $ ERROR "expected DATA or CONNECTDONE"
return $ RelayDone $ ExitFailure 1

View file

@ -24,6 +24,7 @@ import Annex.UUID
import Config
import Config.Cost
import Remote.Helper.Git
import Messages.Progress
import Utility.Metered
import Utility.AuthToken
import Types.NumCopies
@ -74,12 +75,14 @@ chainGen addr r u c gc = do
return (Just this)
store :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
store u addr connpool k af p = fromMaybe False
<$> runProto u addr connpool (P2P.put k af p)
store u addr connpool k af p =
metered (Just p) k $ \p' -> fromMaybe False
<$> runProto u addr connpool (P2P.put k af p')
retrieve :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
retrieve u addr connpool k af dest _p = unVerified $ fromMaybe False
<$> runProto u addr connpool (P2P.get dest k af)
retrieve u addr connpool k af dest p = unVerified $
metered (Just p) k $ \p' -> fromMaybe False
<$> runProto u addr connpool (P2P.get dest k af p')
remove :: UUID -> P2PAddress -> ConnectionPool -> Key -> Annex Bool
remove u addr connpool k = fromMaybe False

View file

@ -193,7 +193,7 @@ store _r info h = fileStorer $ \k f p -> do
uploadid <- S3.imurUploadId <$> sendS3Handle h startreq
-- The actual part size will be a even multiple of the
-- 32k chunk size that hGetUntilMetered uses.
-- 32k chunk size that lazy ByteStrings use.
let partsz' = (partsz `div` toInteger defaultChunkSize) * toInteger defaultChunkSize
-- Send parts of the file, taking care to stream each part

View file

@ -1,6 +1,6 @@
{- Metered IO and actions
-
- Copyright 2012-2106 Joey Hess <id@joeyh.name>
- Copyright 2012-2016 Joey Hess <id@joeyh.name>
-
- License: BSD-2-clause
-}
@ -115,24 +115,24 @@ offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n)
- meter updates, so use caution.
-}
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
hGetContentsMetered h = hGetUntilMetered h (const True)
hGetContentsMetered h = hGetMetered h Nothing
{- Reads from the Handle, updating the meter after each chunk.
{- Reads from the Handle, updating the meter after each chunk is read.
-
- Stops at EOF, or when the requested number of bytes have been read.
- Closes the Handle at EOF, but otherwise leaves it open.
-
- Note that the meter update is run in unsafeInterleaveIO, which means that
- it can be run at any time. It's even possible for updates to run out
- of order, as different parts of the ByteString are consumed.
-
- Stops at EOF, or when keepgoing evaluates to False.
- Closes the Handle at EOF, but otherwise leaves it open.
-}
hGetUntilMetered :: Handle -> (Integer -> Bool) -> MeterUpdate -> IO L.ByteString
hGetUntilMetered h keepgoing meterupdate = lazyRead zeroBytesProcessed
hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString
hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed
where
lazyRead sofar = unsafeInterleaveIO $ loop sofar
loop sofar = do
c <- S.hGet h defaultChunkSize
c <- S.hGet h (nextchunksize (fromBytesProcessed sofar))
if S.null c
then do
hClose h
@ -148,6 +148,18 @@ hGetUntilMetered h keepgoing meterupdate = lazyRead zeroBytesProcessed
cs <- lazyRead sofar'
return $ L.append (L.fromChunks [c]) cs
else return $ L.fromChunks [c]
keepgoing n = case wantsize of
Nothing -> True
Just sz -> n < sz
nextchunksize n = case wantsize of
Nothing -> defaultChunkSize
Just sz ->
let togo = sz - n
in if togo < toInteger defaultChunkSize
then fromIntegral togo
else defaultChunkSize
{- Same default chunk size Lazy ByteStrings use. -}
defaultChunkSize :: Int