From ad5ef51040f4220dd27cb8bfd2951ca1145c8c0e Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 7 Dec 2016 14:25:01 -0400 Subject: [PATCH] more p2p progress meters Display progress meter on send and receive from remote. Added a new hGetMetered that can read an exact number of bytes (or less), updating a meter as it goes. This commit was sponsored by Andreas on Patreon. --- P2P/IO.hs | 13 ++++++++----- P2P/Protocol.hs | 16 ++++++++-------- Remote/P2P.hs | 11 +++++++---- Remote/S3.hs | 2 +- Utility/Metered.hs | 30 +++++++++++++++++++++--------- 5 files changed, 45 insertions(+), 27 deletions(-) diff --git a/P2P/IO.hs b/P2P/IO.hs index 8a580452c0..ea15ecfc3a 100644 --- a/P2P/IO.hs +++ b/P2P/IO.hs @@ -119,8 +119,8 @@ runNet conn runner f = case f of case v of Right True -> runner next _ -> return Nothing - ReceiveBytes (Len n) next -> do - v <- liftIO $ tryNonAsync $ L.hGet (connIhdl conn) (fromIntegral n) + ReceiveBytes len p next -> do + v <- liftIO $ tryNonAsync $ receiveExactly len (connIhdl conn) p case v of Left _e -> return Nothing Right b -> runner (next b) @@ -155,9 +155,12 @@ runNet conn runner f = case f of -- If too few bytes are sent, the only option is to give up on this -- connection. False is returned to indicate this problem. sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool -sendExactly (Len l) b h p = do - sent <- meteredWrite' p h (L.take (fromIntegral l) b) - return (fromBytesProcessed sent == l) +sendExactly (Len n) b h p = do + sent <- meteredWrite' p h (L.take (fromIntegral n) b) + return (fromBytesProcessed sent == n) + +receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString +receiveExactly (Len n) h p = hGetMetered h (Just n) p runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Maybe ExitCode) runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index 6cefce38c0..b1e2bf4817 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -167,7 +167,7 @@ data NetF c | SendBytes Len L.ByteString MeterUpdate c -- ^ Sends exactly Len bytes of data. (Any more or less will -- confuse the receiver.) - | ReceiveBytes Len (L.ByteString -> c) + | ReceiveBytes Len MeterUpdate (L.ByteString -> c) -- ^ Lazily reads bytes from peer. Stops once Len are read, -- or if connection is lost, and in either case returns the bytes -- that were read. This allows resuming interrupted transfers. @@ -273,8 +273,8 @@ remove key = do net $ sendMessage (REMOVE key) checkSuccess -get :: FilePath -> Key -> AssociatedFile -> Proto Bool -get dest key af = receiveContent sizer storer (\offset -> GET offset af key) +get :: FilePath -> Key -> AssociatedFile -> MeterUpdate -> Proto Bool +get dest key af p = receiveContent p sizer storer (\offset -> GET offset af key) where sizer = fileSize dest storer = storeContentTo dest @@ -364,7 +364,7 @@ serveAuthed myuuid = void $ serverLoop handler else do let sizer = tmpContentSize key let storer = storeContent key af - ok <- receiveContent sizer storer PUT_FROM + ok <- receiveContent nullMeterUpdate sizer storer PUT_FROM when ok $ local $ setPresent key myuuid return ServerContinue @@ -385,8 +385,8 @@ sendContent key af offset p = do net $ sendBytes len content p checkSuccess -receiveContent :: Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool -receiveContent sizer storer mkmsg = do +receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool +receiveContent p sizer storer mkmsg = do Len n <- local sizer let offset = Offset n net $ sendMessage (mkmsg offset) @@ -394,7 +394,7 @@ receiveContent sizer storer mkmsg = do case r of DATA len -> do ok <- local . storer offset len - =<< net (receiveBytes len) + =<< net (receiveBytes len p) sendSuccess ok return ok _ -> do @@ -447,7 +447,7 @@ relayFromPeer = do r <- receiveMessage case r of CONNECTDONE exitcode -> return $ RelayDone exitcode - DATA len -> RelayFromPeer <$> receiveBytes len + DATA len -> RelayFromPeer <$> receiveBytes len nullMeterUpdate _ -> do sendMessage $ ERROR "expected DATA or CONNECTDONE" return $ RelayDone $ ExitFailure 1 diff --git a/Remote/P2P.hs b/Remote/P2P.hs index 8286a9a18c..1d7ede30ff 100644 --- a/Remote/P2P.hs +++ b/Remote/P2P.hs @@ -24,6 +24,7 @@ import Annex.UUID import Config import Config.Cost import Remote.Helper.Git +import Messages.Progress import Utility.Metered import Utility.AuthToken import Types.NumCopies @@ -74,12 +75,14 @@ chainGen addr r u c gc = do return (Just this) store :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool -store u addr connpool k af p = fromMaybe False - <$> runProto u addr connpool (P2P.put k af p) +store u addr connpool k af p = + metered (Just p) k $ \p' -> fromMaybe False + <$> runProto u addr connpool (P2P.put k af p') retrieve :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification) -retrieve u addr connpool k af dest _p = unVerified $ fromMaybe False - <$> runProto u addr connpool (P2P.get dest k af) +retrieve u addr connpool k af dest p = unVerified $ + metered (Just p) k $ \p' -> fromMaybe False + <$> runProto u addr connpool (P2P.get dest k af p') remove :: UUID -> P2PAddress -> ConnectionPool -> Key -> Annex Bool remove u addr connpool k = fromMaybe False diff --git a/Remote/S3.hs b/Remote/S3.hs index c6f23333f1..4c1bd5784f 100644 --- a/Remote/S3.hs +++ b/Remote/S3.hs @@ -193,7 +193,7 @@ store _r info h = fileStorer $ \k f p -> do uploadid <- S3.imurUploadId <$> sendS3Handle h startreq -- The actual part size will be a even multiple of the - -- 32k chunk size that hGetUntilMetered uses. + -- 32k chunk size that lazy ByteStrings use. let partsz' = (partsz `div` toInteger defaultChunkSize) * toInteger defaultChunkSize -- Send parts of the file, taking care to stream each part diff --git a/Utility/Metered.hs b/Utility/Metered.hs index aa65efd4d4..b80d3ae3f9 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -1,6 +1,6 @@ {- Metered IO and actions - - - Copyright 2012-2106 Joey Hess + - Copyright 2012-2016 Joey Hess - - License: BSD-2-clause -} @@ -115,24 +115,24 @@ offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n) - meter updates, so use caution. -} hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString -hGetContentsMetered h = hGetUntilMetered h (const True) +hGetContentsMetered h = hGetMetered h Nothing -{- Reads from the Handle, updating the meter after each chunk. +{- Reads from the Handle, updating the meter after each chunk is read. + - + - Stops at EOF, or when the requested number of bytes have been read. + - Closes the Handle at EOF, but otherwise leaves it open. - - Note that the meter update is run in unsafeInterleaveIO, which means that - it can be run at any time. It's even possible for updates to run out - of order, as different parts of the ByteString are consumed. - - - - Stops at EOF, or when keepgoing evaluates to False. - - Closes the Handle at EOF, but otherwise leaves it open. -} -hGetUntilMetered :: Handle -> (Integer -> Bool) -> MeterUpdate -> IO L.ByteString -hGetUntilMetered h keepgoing meterupdate = lazyRead zeroBytesProcessed +hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString +hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed where lazyRead sofar = unsafeInterleaveIO $ loop sofar loop sofar = do - c <- S.hGet h defaultChunkSize + c <- S.hGet h (nextchunksize (fromBytesProcessed sofar)) if S.null c then do hClose h @@ -148,6 +148,18 @@ hGetUntilMetered h keepgoing meterupdate = lazyRead zeroBytesProcessed cs <- lazyRead sofar' return $ L.append (L.fromChunks [c]) cs else return $ L.fromChunks [c] + + keepgoing n = case wantsize of + Nothing -> True + Just sz -> n < sz + + nextchunksize n = case wantsize of + Nothing -> defaultChunkSize + Just sz -> + let togo = sz - n + in if togo < toInteger defaultChunkSize + then fromIntegral togo + else defaultChunkSize {- Same default chunk size Lazy ByteStrings use. -} defaultChunkSize :: Int