update progress meter when sending to p2p remote

This commit was sponsored by Thom May on Patreon.
This commit is contained in:
Joey Hess 2016-12-07 13:37:35 -04:00
parent 7c245b2180
commit 83ea1cec86
No known key found for this signature in database
GPG key ID: C910D9222512E3C7
4 changed files with 24 additions and 28 deletions

View file

@ -5,7 +5,7 @@
- Licensed under the GNU GPL version 3 or higher. - Licensed under the GNU GPL version 3 or higher.
-} -}
{-# LANGUAGE RankNTypes, FlexibleContexts, BangPatterns, CPP #-} {-# LANGUAGE RankNTypes, FlexibleContexts, CPP #-}
module P2P.IO module P2P.IO
( RunProto ( RunProto
@ -26,6 +26,7 @@ import Utility.AuthToken
import Utility.SafeCommand import Utility.SafeCommand
import Utility.SimpleProtocol import Utility.SimpleProtocol
import Utility.Exception import Utility.Exception
import Utility.Metered
import Utility.Tor import Utility.Tor
import Utility.FileSystemEncoding import Utility.FileSystemEncoding
@ -110,9 +111,9 @@ runNet conn runner f = case f of
let e = ERROR $ "protocol parse error: " ++ show l let e = ERROR $ "protocol parse error: " ++ show l
net $ sendMessage e net $ sendMessage e
next e next e
SendBytes len b next -> do SendBytes len b p next -> do
v <- liftIO $ tryNonAsync $ do v <- liftIO $ tryNonAsync $ do
ok <- sendExactly len b (connOhdl conn) ok <- sendExactly len b (connOhdl conn) p
hFlush (connOhdl conn) hFlush (connOhdl conn)
return ok return ok
case v of case v of
@ -153,18 +154,10 @@ runNet conn runner f = case f of
-- --
-- If too few bytes are sent, the only option is to give up on this -- If too few bytes are sent, the only option is to give up on this
-- connection. False is returned to indicate this problem. -- connection. False is returned to indicate this problem.
-- sendExactly :: Len -> L.ByteString -> Handle -> MeterUpdate -> IO Bool
-- We can't check the length of the whole lazy bytestring without buffering sendExactly (Len l) b h p = do
-- it in memory. Instead, process it one chunk at a time, and sum the length sent <- meteredWrite' p h (L.take (fromIntegral l) b)
-- of the chunks. return (fromBytesProcessed sent == l)
sendExactly :: Len -> L.ByteString -> Handle -> IO Bool
sendExactly (Len l) lb h = go 0 $ L.toChunks $ L.take (fromIntegral l) lb
where
go n [] = return (toInteger n == l)
go n (b:bs) = do
B.hPut h b
let !n' = n + B.length b
go n' bs
runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Maybe ExitCode) runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Maybe ExitCode)
runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go

View file

@ -17,6 +17,7 @@ import Types.UUID
import Utility.AuthToken import Utility.AuthToken
import Utility.Applicative import Utility.Applicative
import Utility.PartialPrelude import Utility.PartialPrelude
import Utility.Metered
import Git.FilePath import Git.FilePath
import Control.Monad import Control.Monad
@ -163,7 +164,7 @@ local = hoistFree Local
data NetF c data NetF c
= SendMessage Message c = SendMessage Message c
| ReceiveMessage (Message -> c) | ReceiveMessage (Message -> c)
| SendBytes Len L.ByteString c | SendBytes Len L.ByteString MeterUpdate c
-- ^ Sends exactly Len bytes of data. (Any more or less will -- ^ Sends exactly Len bytes of data. (Any more or less will
-- confuse the receiver.) -- confuse the receiver.)
| ReceiveBytes Len (L.ByteString -> c) | ReceiveBytes Len (L.ByteString -> c)
@ -278,12 +279,12 @@ get dest key af = receiveContent sizer storer (\offset -> GET offset af key)
sizer = fileSize dest sizer = fileSize dest
storer = storeContentTo dest storer = storeContentTo dest
put :: Key -> AssociatedFile -> Proto Bool put :: Key -> AssociatedFile -> MeterUpdate -> Proto Bool
put key af = do put key af p = do
net $ sendMessage (PUT af key) net $ sendMessage (PUT af key)
r <- net receiveMessage r <- net receiveMessage
case r of case r of
PUT_FROM offset -> sendContent key af offset PUT_FROM offset -> sendContent key af offset p
ALREADY_HAVE -> return True ALREADY_HAVE -> return True
_ -> do _ -> do
net $ sendMessage (ERROR "expected PUT_FROM") net $ sendMessage (ERROR "expected PUT_FROM")
@ -368,7 +369,7 @@ serveAuthed myuuid = void $ serverLoop handler
local $ setPresent key myuuid local $ setPresent key myuuid
return ServerContinue return ServerContinue
handler (GET offset key af) = do handler (GET offset key af) = do
void $ sendContent af key offset void $ sendContent af key offset nullMeterUpdate
-- setPresent not called because the peer may have -- setPresent not called because the peer may have
-- requested the data but not permanently stored it. -- requested the data but not permanently stored it.
return ServerContinue return ServerContinue
@ -377,11 +378,11 @@ serveAuthed myuuid = void $ serverLoop handler
return ServerContinue return ServerContinue
handler _ = return ServerUnexpected handler _ = return ServerUnexpected
sendContent :: Key -> AssociatedFile -> Offset -> Proto Bool sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool
sendContent key af offset = do sendContent key af offset p = do
(len, content) <- readContentLen key af offset (len, content) <- readContentLen key af offset
net $ sendMessage (DATA len) net $ sendMessage (DATA len)
net $ sendBytes len content net $ sendBytes len content p
checkSuccess checkSuccess
receiveContent :: Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool receiveContent :: Local Len -> (Offset -> Len -> L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool
@ -456,5 +457,5 @@ relayToPeer (RelayDone exitcode) = sendMessage (CONNECTDONE exitcode)
relayToPeer (RelayToPeer b) = do relayToPeer (RelayToPeer b) = do
let len = Len $ fromIntegral $ L.length b let len = Len $ fromIntegral $ L.length b
sendMessage (DATA len) sendMessage (DATA len)
sendBytes len b sendBytes len b nullMeterUpdate
relayToPeer (RelayFromPeer _) = return () relayToPeer (RelayFromPeer _) = return ()

View file

@ -73,10 +73,9 @@ chainGen addr r u c gc = do
} }
return (Just this) return (Just this)
-- TODO update progress
store :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool store :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool
store u addr connpool k af p = fromMaybe False store u addr connpool k af p = fromMaybe False
<$> runProto u addr connpool (P2P.put k af) <$> runProto u addr connpool (P2P.put k af p)
retrieve :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification) retrieve :: UUID -> P2PAddress -> ConnectionPool -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification)
retrieve u addr connpool k af dest _p = unVerified $ fromMaybe False retrieve u addr connpool k af dest _p = unVerified $ fromMaybe False

View file

@ -85,9 +85,12 @@ streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h
{- Writes a ByteString to a Handle, updating a meter as it's written. -} {- Writes a ByteString to a Handle, updating a meter as it's written. -}
meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO () meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
meteredWrite meterupdate h = go zeroBytesProcessed . L.toChunks meteredWrite meterupdate h = void . meteredWrite' meterupdate h
meteredWrite' :: MeterUpdate -> Handle -> L.ByteString -> IO BytesProcessed
meteredWrite' meterupdate h = go zeroBytesProcessed . L.toChunks
where where
go _ [] = return () go sofar [] = return sofar
go sofar (c:cs) = do go sofar (c:cs) = do
S.hPut h c S.hPut h c
let sofar' = addBytesProcessed sofar $ S.length c let sofar' = addBytesProcessed sofar $ S.length c