diff --git a/P2P/IO.hs b/P2P/IO.hs index 9abefb8a06..8a580452c0 100644 --- a/P2P/IO.hs +++ b/P2P/IO.hs @@ -5,7 +5,7 @@ - Licensed under the GNU GPL version 3 or higher. -} -{-# LANGUAGE RankNTypes, FlexibleContexts, BangPatterns, CPP #-} +{-# LANGUAGE RankNTypes, FlexibleContexts, CPP #-} module P2P.IO ( RunProto @@ -26,6 +26,7 @@ import Utility.AuthToken import Utility.SafeCommand import Utility.SimpleProtocol import Utility.Exception +import Utility.Metered import Utility.Tor import Utility.FileSystemEncoding @@ -110,9 +111,9 @@ runNet conn runner f = case f of let e = ERROR $ "protocol parse error: " ++ show l net $ sendMessage e next e - SendBytes len b next -> do + SendBytes len b p next -> do v <- liftIO $ tryNonAsync $ do - ok <- sendExactly len b (connOhdl conn) + ok <- sendExactly len b (connOhdl conn) p hFlush (connOhdl conn) return ok case v of @@ -153,18 +154,10 @@ runNet conn runner f = case f of -- -- If too few bytes are sent, the only option is to give up on this -- connection. False is returned to indicate this problem. --- --- We can't check the length of the whole lazy bytestring without buffering --- it in memory. Instead, process it one chunk at a time, and sum the length --- of the chunks. -sendExactly :: Len -> L.ByteString -> Handle -> IO Bool -sendExactly (Len l) lb h = go 0 $ L.toChunks $ L.take (fromIntegral l) lb - where - go n [] = return (toInteger n == l) - go n (b:bs) = do - B.hPut h b - let !n' = n + B.length b - go n' bs +sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool +sendExactly (Len l) b h p = do + sent <- meteredWrite' p h (L.take (fromIntegral l) b) + return (fromBytesProcessed sent == l) runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Maybe ExitCode) runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index b2b734e48a..6cefce38c0 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -17,6 +17,7 @@ import Types.UUID import Utility.AuthToken import Utility.Applicative import Utility.PartialPrelude +import Utility.Metered import Git.FilePath import Control.Monad @@ -163,7 +164,7 @@ local = hoistFree Local data NetF c = SendMessage Message c | ReceiveMessage (Message -> c) - | SendBytes Len L.ByteString c + | SendBytes Len L.ByteString MeterUpdate c -- ^ Sends exactly Len bytes of data. (Any more or less will -- confuse the receiver.) | ReceiveBytes Len (L.ByteString -> c) @@ -278,12 +279,12 @@ get dest key af = receiveContent sizer storer (\offset -> GET offset af key) sizer = fileSize dest storer = storeContentTo dest -put :: Key -> AssociatedFile -> Proto Bool -put key af = do +put :: Key -> AssociatedFile -> MeterUpdate -> Proto Bool +put key af p = do net $ sendMessage (PUT af key) r <- net receiveMessage case r of - PUT_FROM offset -> sendContent key af offset + PUT_FROM offset -> sendContent key af offset p ALREADY_HAVE -> return True _ -> do net $ sendMessage (ERROR "expected PUT_FROM") @@ -368,7 +369,7 @@ serveAuthed myuuid = void $ serverLoop handler local $ setPresent key myuuid return ServerContinue handler (GET offset key af) = do - void $ sendContent af key offset + void $ sendContent af key offset nullMeterUpdate -- setPresent not called because the peer may have -- requested the data but not permanently stored it. return ServerContinue @@ -377,11 +378,11 @@ serveAuthed myuuid = void $ serverLoop handler return ServerContinue handler _ = return ServerUnexpected -sendContent :: Key -> AssociatedFile -> Offset -> Proto Bool -sendContent key af offset = do +sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool +sendContent key af offset p = do (len, content) <- readContentLen key af offset net $ sendMessage (DATA len) - net $ sendBytes len content + net $ sendBytes len content p checkSuccess receiveContent :: Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool @@ -456,5 +457,5 @@ relayToPeer (RelayDone exitcode) = sendMessage (CONNECTDONE exitcode) relayToPeer (RelayToPeer b) = do let len = Len $ fromIntegral $ L.length b sendMessage (DATA len) - sendBytes len b + sendBytes len b nullMeterUpdate relayToPeer (RelayFromPeer _) = return () diff --git a/Remote/P2P.hs b/Remote/P2P.hs index f4f1d5f38f..8286a9a18c 100644 --- a/Remote/P2P.hs +++ b/Remote/P2P.hs @@ -73,10 +73,9 @@ chainGen addr r u c gc = do } return (Just this) --- TODO update progress store :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool store u addr connpool k af p = fromMaybe False - <$> runProto u addr connpool (P2P.put k af) + <$> runProto u addr connpool (P2P.put k af p) retrieve :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification) retrieve u addr connpool k af dest _p = unVerified $ fromMaybe False diff --git a/Utility/Metered.hs b/Utility/Metered.hs index 440aa3f074..aa65efd4d4 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -85,9 +85,12 @@ streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h {- Writes a ByteString to a Handle, updating a meter as it's written. -} meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO () -meteredWrite meterupdate h = go zeroBytesProcessed . L.toChunks +meteredWrite meterupdate h = void . meteredWrite' meterupdate h + +meteredWrite' :: MeterUpdate -> Handle -> L.ByteString -> IO BytesProcessed +meteredWrite' meterupdate h = go zeroBytesProcessed . L.toChunks where - go _ [] = return () + go sofar [] = return sofar go sofar (c:cs) = do S.hPut h c let sofar' = addBytesProcessed sofar $ S.length c