update progress logs in remotedaemon send/receive
This commit is contained in:
parent
1f3ed1b6b2
commit
38516b2fca
4 changed files with 57 additions and 49 deletions
|
@ -45,6 +45,11 @@ instance Observable (Bool, Verification) where
|
||||||
observeBool = fst
|
observeBool = fst
|
||||||
observeFailure = (False, UnVerified)
|
observeFailure = (False, UnVerified)
|
||||||
|
|
||||||
|
instance Observable (Either e Bool) where
|
||||||
|
observeBool (Left _) = False
|
||||||
|
observeBool (Right b) = b
|
||||||
|
observeFailure = Right False
|
||||||
|
|
||||||
upload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
|
upload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
|
||||||
upload u key f d a _witness = guardHaveUUID u $
|
upload u key f d a _witness = guardHaveUUID u $
|
||||||
runTransfer (Transfer Upload u key) f d a
|
runTransfer (Transfer Upload u key) f d a
|
||||||
|
|
57
P2P/Annex.hs
57
P2P/Annex.hs
|
@ -20,9 +20,9 @@ import P2P.Protocol
|
||||||
import P2P.IO
|
import P2P.IO
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import Types.NumCopies
|
import Types.NumCopies
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
import Control.Monad.Free
|
import Control.Monad.Free
|
||||||
import qualified Data.ByteString.Lazy as L
|
|
||||||
|
|
||||||
-- When we're serving a peer, we know their uuid, and can use it to update
|
-- When we're serving a peer, we know their uuid, and can use it to update
|
||||||
-- transfer logs.
|
-- transfer logs.
|
||||||
|
@ -52,35 +52,33 @@ runLocal runmode runner a = case a of
|
||||||
let getsize = liftIO . catchMaybeIO . getFileSize
|
let getsize = liftIO . catchMaybeIO . getFileSize
|
||||||
size <- inAnnex' isJust Nothing getsize k
|
size <- inAnnex' isJust Nothing getsize k
|
||||||
runner (next (Len <$> size))
|
runner (next (Len <$> size))
|
||||||
-- TODO transfer log not updated
|
ReadContent k af o sender next -> do
|
||||||
ReadContent k af (Offset o) next -> do
|
|
||||||
v <- tryNonAsync $ prepSendAnnex k
|
v <- tryNonAsync $ prepSendAnnex k
|
||||||
case v of
|
case v of
|
||||||
-- The check can detect a problem after the
|
-- The check can detect if the file
|
||||||
-- content is sent, but we don't use it.
|
-- changed while it was transferred, but we don't
|
||||||
-- Instead, the receiving peer must AlwaysVerify
|
-- use it. Instead, the receiving peer must
|
||||||
-- the content it receives.
|
-- AlwaysVerify the content it receives.
|
||||||
Right (Just (f, _check)) -> do
|
Right (Just (f, _check)) -> do
|
||||||
v' <- tryNonAsync $ -- transfer upload k af $
|
v' <- tryNonAsync $
|
||||||
liftIO $ do
|
transfer upload k af $
|
||||||
h <- openBinaryFile f ReadMode
|
sinkfile f o sender
|
||||||
when (o /= 0) $
|
|
||||||
hSeek h AbsoluteSeek o
|
|
||||||
L.hGetContents h
|
|
||||||
case v' of
|
case v' of
|
||||||
Left e -> return (Left (show e))
|
Left e -> return (Left (show e))
|
||||||
Right b -> runner (next b)
|
Right (Left e) -> return (Left (show e))
|
||||||
Right Nothing -> return (Left "content not available")
|
Right (Right ok) -> runner (next ok)
|
||||||
|
-- content not available
|
||||||
|
Right Nothing -> runner (next False)
|
||||||
Left e -> return (Left (show e))
|
Left e -> return (Left (show e))
|
||||||
StoreContent k af o l getb next -> do
|
StoreContent k af o l getb next -> do
|
||||||
ok <- flip catchNonAsync (const $ return False) $
|
ok <- flip catchNonAsync (const $ return False) $
|
||||||
transfer download k af $
|
transfer download k af $ \p ->
|
||||||
getViaTmp AlwaysVerify k $ \tmp ->
|
getViaTmp AlwaysVerify k $ \tmp ->
|
||||||
unVerified $ storefile tmp o l getb
|
unVerified $ storefile tmp o l getb p
|
||||||
runner (next ok)
|
runner (next ok)
|
||||||
StoreContentTo dest o l getb next -> do
|
StoreContentTo dest o l getb next -> do
|
||||||
ok <- flip catchNonAsync (const $ return False) $
|
ok <- flip catchNonAsync (const $ return False) $
|
||||||
storefile dest o l getb
|
storefile dest o l getb nullMeterUpdate
|
||||||
runner (next ok)
|
runner (next ok)
|
||||||
SetPresent k u next -> do
|
SetPresent k u next -> do
|
||||||
v <- tryNonAsync $ logChange k u InfoPresent
|
v <- tryNonAsync $ logChange k u InfoPresent
|
||||||
|
@ -116,18 +114,31 @@ runLocal runmode runner a = case a of
|
||||||
transfer mk k af ta = case runmode of
|
transfer mk k af ta = case runmode of
|
||||||
-- Update transfer logs when serving.
|
-- Update transfer logs when serving.
|
||||||
Serving theiruuid ->
|
Serving theiruuid ->
|
||||||
mk theiruuid k af noRetry (const ta) noNotification
|
mk theiruuid k af noRetry ta noNotification
|
||||||
-- Transfer logs are updated higher in the stack when
|
-- Transfer logs are updated higher in the stack when
|
||||||
-- a client.
|
-- a client.
|
||||||
Client -> ta
|
Client -> ta nullMeterUpdate
|
||||||
storefile dest (Offset o) (Len l) getb = do
|
|
||||||
|
storefile dest (Offset o) (Len l) getb p = do
|
||||||
|
let p' = offsetMeterUpdate p (toBytesProcessed o)
|
||||||
v <- runner getb
|
v <- runner getb
|
||||||
case v of
|
case v of
|
||||||
Right b -> liftIO $ do
|
Right b -> liftIO $ do
|
||||||
withBinaryFile dest ReadWriteMode $ \h -> do
|
withBinaryFile dest ReadWriteMode $ \h -> do
|
||||||
when (o /= 0) $
|
when (o /= 0) $
|
||||||
hSeek h AbsoluteSeek o
|
hSeek h AbsoluteSeek o
|
||||||
L.hPut h b
|
meteredWrite p' h b
|
||||||
sz <- liftIO $ getFileSize dest
|
sz <- getFileSize dest
|
||||||
return (toInteger sz == l + o)
|
return (toInteger sz == l + o)
|
||||||
Left e -> error e
|
Left e -> error e
|
||||||
|
|
||||||
|
sinkfile f (Offset o) sender p = bracket setup cleanup go
|
||||||
|
where
|
||||||
|
setup = liftIO $ openBinaryFile f ReadMode
|
||||||
|
cleanup = liftIO . hClose
|
||||||
|
go h = do
|
||||||
|
let p' = offsetMeterUpdate p (toBytesProcessed o)
|
||||||
|
when (o /= 0) $
|
||||||
|
liftIO $ hSeek h AbsoluteSeek o
|
||||||
|
b <- liftIO $ hGetContentsMetered h p'
|
||||||
|
runner (sender b)
|
||||||
|
|
|
@ -197,9 +197,10 @@ data LocalF c
|
||||||
| ContentSize Key (Maybe Len -> c)
|
| ContentSize Key (Maybe Len -> c)
|
||||||
-- ^ Gets size of the content of a key, when the full content is
|
-- ^ Gets size of the content of a key, when the full content is
|
||||||
-- present.
|
-- present.
|
||||||
| ReadContent Key AssociatedFile Offset (L.ByteString -> c)
|
| ReadContent Key AssociatedFile Offset (L.ByteString -> Proto Bool) (Bool -> c)
|
||||||
-- ^ Lazily reads the content of a key. Note that the content
|
-- ^ Reads the content of a key and sends it to the callback.
|
||||||
-- may change while it's being sent.
|
-- Note that the content may change while it's being sent.
|
||||||
|
-- If the content is not available, sends L.empty to the callback.
|
||||||
| StoreContent Key AssociatedFile Offset Len (Proto L.ByteString) (Bool -> c)
|
| StoreContent Key AssociatedFile Offset Len (Proto L.ByteString) (Bool -> c)
|
||||||
-- ^ Stores content to the key's temp file starting at an offset.
|
-- ^ Stores content to the key's temp file starting at an offset.
|
||||||
-- Once the whole content of the key has been stored, moves the
|
-- Once the whole content of the key has been stored, moves the
|
||||||
|
@ -381,9 +382,17 @@ serveAuthed myuuid = void $ serverLoop handler
|
||||||
handler _ = return ServerUnexpected
|
handler _ = return ServerUnexpected
|
||||||
|
|
||||||
sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool
|
sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool
|
||||||
sendContent key af offset@(Offset n) p = do
|
sendContent key af offset@(Offset n) p = go =<< local (contentSize key)
|
||||||
|
where
|
||||||
|
go Nothing = sender (Len 0) L.empty
|
||||||
|
go (Just (Len totallen)) = do
|
||||||
|
let len = totallen - n
|
||||||
|
if len <= 0
|
||||||
|
then sender (Len 0) L.empty
|
||||||
|
else local $ readContent key af offset $
|
||||||
|
sender (Len len)
|
||||||
|
sender len content = do
|
||||||
let p' = offsetMeterUpdate p (toBytesProcessed n)
|
let p' = offsetMeterUpdate p (toBytesProcessed n)
|
||||||
(len, content) <- readContentLen key af offset
|
|
||||||
net $ sendMessage (DATA len)
|
net $ sendMessage (DATA len)
|
||||||
net $ sendBytes len content p'
|
net $ sendBytes len content p'
|
||||||
checkSuccess
|
checkSuccess
|
||||||
|
@ -419,22 +428,6 @@ sendSuccess :: Bool -> Proto ()
|
||||||
sendSuccess True = net $ sendMessage SUCCESS
|
sendSuccess True = net $ sendMessage SUCCESS
|
||||||
sendSuccess False = net $ sendMessage FAILURE
|
sendSuccess False = net $ sendMessage FAILURE
|
||||||
|
|
||||||
-- Reads content from an offset. The Len should correspond to
|
|
||||||
-- the length of the ByteString, but to avoid buffering the content
|
|
||||||
-- in memory, is gotten using contentSize.
|
|
||||||
readContentLen :: Key -> AssociatedFile -> Offset -> Proto (Len, L.ByteString)
|
|
||||||
readContentLen key af (Offset offset) = go =<< local (contentSize key)
|
|
||||||
where
|
|
||||||
go Nothing = return (Len 0, L.empty)
|
|
||||||
go (Just (Len totallen)) = do
|
|
||||||
let len = totallen - offset
|
|
||||||
if len <= 0
|
|
||||||
then return (Len 0, L.empty)
|
|
||||||
else do
|
|
||||||
content <- local $
|
|
||||||
readContent key af (Offset offset)
|
|
||||||
return (Len len, content)
|
|
||||||
|
|
||||||
connect :: Service -> Handle -> Handle -> Proto ExitCode
|
connect :: Service -> Handle -> Handle -> Proto ExitCode
|
||||||
connect service hin hout = do
|
connect service hin hout = do
|
||||||
net $ sendMessage (CONNECT service)
|
net $ sendMessage (CONNECT service)
|
||||||
|
|
|
@ -8,7 +8,6 @@ Current todo list:
|
||||||
object is already in progress, the message about this is output by the
|
object is already in progress, the message about this is output by the
|
||||||
remotedaemon --debug, but not forwarded to the peer, which shows
|
remotedaemon --debug, but not forwarded to the peer, which shows
|
||||||
"Connection reset by peer"
|
"Connection reset by peer"
|
||||||
* update progress logs in remotedaemon send/receive
|
|
||||||
* Think about locking some more. What happens if the connection to the peer
|
* Think about locking some more. What happens if the connection to the peer
|
||||||
is dropped while we think we're locking content there from being dropped?
|
is dropped while we think we're locking content there from being dropped?
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue