Fixed some other potential hangs in the P2P protocol

Finishes the start made in 983c9d5a53, by
handling the case where `transfer` fails for some other reason, and so the
ReadContent callback does not get run. I don't know of a case where
`transfer` does fail other than the locking dealt with in that commit, but
it's good to have a guarantee.

StoreContent and StoreContentTo had a similar problem.
Things like `getViaTmp` may decide not to run the transfer action.
And `transfer` could certianly fail, if another transfer of the same
object was in progress. (Or a different object when annex.pidlock is set.)

If the transfer action was not run, the content of the object would
not all get consumed, and so would get interpreted as protocol commands,
which would not go well.

My approach to fixing all of these things is to set a TVar only
once all the data in the transfer is known to have been read/written.
This way the internals of `transfer`, `getViaTmp` etc don't matter.

So in ReadContent, it checks if the transfer completed.
If not, as long as it didn't throw an exception, send empty and Invalid
data to the callback. On an exception the state of the protocol is unknown
so it has to raise ProtoFailureException and close the connection,
same as before.

In StoreContent, if the transfer did not complete
some portion of the DATA has been read, so the protocol is in an unknown
state and it has to close the conection as well.

(The ProtoFailureMessage used here matches the one in Annex.Transfer, which
is the most likely reason. Not ideal to duplicate it..)

StoreContent did not ever close the protocol connection before. So this is
a protocol change, but only in an exceptional circumstance, and it's not
going to break anything, because clients already need to deal with the
connection breaking at any point.

The way this new behavior looks (here origin has annex.pidlock = true so will
only accept one upload to it at a time):

git annex copy --to origin -J2
copy x (to origin...) ok
copy y (to origin...)
  Lost connection (fd:25: hGetChar: end of file)

This work is supported by the NIH-funded NICEMAN (ReproNim TR&D3) project.
This commit is contained in:
Joey Hess 2018-11-06 14:44:00 -04:00
parent 9adc0b3417
commit 6ecd55a9fa
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
6 changed files with 90 additions and 20 deletions

View file

@ -26,6 +26,7 @@ import Types.Remote (RetrievalSecurityPolicy(..))
import Utility.Metered
import Control.Monad.Free
import Control.Concurrent.STM
-- Full interpreter for Proto, that can receive and send objects.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
@ -56,28 +57,46 @@ runLocal runst runner a = case a of
Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok)
-- If the content is not present, or the transfer doesn't
-- run for any other action, the sender action still must
-- be run, so is given empty and Invalid data.
let fallback = runner (sender mempty (return Invalid))
v <- tryNonAsync $ prepSendAnnex k
case v of
Right (Just (f, checkchanged)) -> proceed $
-- Allow multiple uploads of the same key.
transfer alwaysUpload k af $
sinkfile f o checkchanged sender
Right Nothing -> proceed $
runner (sender mempty (return Invalid))
Right (Just (f, checkchanged)) -> proceed $ do
-- alwaysUpload to allow multiple uploads of the same key.
let runtransfer ti = transfer alwaysUpload k af $ \p ->
sinkfile f o checkchanged sender p ti
checktransfer runtransfer fallback
Right Nothing -> proceed fallback
Left e -> return $ Left $ ProtoFailureException e
StoreContent k af o l getb validitycheck next -> do
-- This is the same as the retrievalSecurityPolicy of
-- Remote.P2P and Remote.Git.
-- Remote.P2P and Remote.Git.
let rsp = RetrievalAllKeysSecure
ok <- flip catchNonAsync (const $ return False) $
transfer download k af $ \p ->
getViaTmp rsp DefaultVerify k $ \tmp -> do
storefile tmp o l getb validitycheck p
runner (next ok)
v <- tryNonAsync $ do
let runtransfer ti =
Right <$> transfer download k af (\p ->
getViaTmp rsp DefaultVerify k $ \tmp ->
storefile tmp o l getb validitycheck p ti)
let fallback = return $ Left $
ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
checktransfer runtransfer fallback
case v of
Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok)
StoreContentTo dest o l getb validitycheck next -> do
res <- flip catchNonAsync (const $ return (False, UnVerified)) $
storefile dest o l getb validitycheck nullMeterUpdate
runner (next res)
v <- tryNonAsync $ do
let runtransfer ti = Right
<$> storefile dest o l getb validitycheck nullMeterUpdate ti
let fallback = return $ Left $
ProtoFailureMessage "transfer failed"
checktransfer runtransfer fallback
case v of
Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok)
SetPresent k u next -> do
v <- tryNonAsync $ logChange k u InfoPresent
case v of
@ -123,7 +142,7 @@ runLocal runst runner a = case a of
UpdateMeterTotalSize m sz next -> do
liftIO $ setMeterTotalSize m sz
runner next
RunValidityCheck check next -> runner . next =<< check
RunValidityCheck checkaction next -> runner . next =<< checkaction
where
transfer mk k af ta = case runst of
-- Update transfer logs when serving.
@ -133,8 +152,8 @@ runLocal runst runner a = case a of
-- Transfer logs are updated higher in the stack when
-- a client.
Client _ -> ta nullMeterUpdate
storefile dest (Offset o) (Len l) getb validitycheck p = do
storefile dest (Offset o) (Len l) getb validitycheck p ti = do
let p' = offsetMeterUpdate p (toBytesProcessed o)
v <- runner getb
case v of
@ -143,6 +162,8 @@ runLocal runst runner a = case a of
when (o /= 0) $
hSeek h AbsoluteSeek o
meteredWrite p' h b
indicatetransferred ti
rightsize <- do
sz <- liftIO $ getFileSize dest
return (toInteger sz == l + o)
@ -158,7 +179,7 @@ runLocal runst runner a = case a of
return (rightsize, MustVerify)
Left e -> error $ describeProtoFailure e
sinkfile f (Offset o) checkchanged sender p = bracket setup cleanup go
sinkfile f (Offset o) checkchanged sender p ti = bracket setup cleanup go
where
setup = liftIO $ openBinaryFile f ReadMode
cleanup = liftIO . hClose
@ -167,8 +188,34 @@ runLocal runst runner a = case a of
when (o /= 0) $
liftIO $ hSeek h AbsoluteSeek o
b <- liftIO $ hGetContentsMetered h p'
let validitycheck = local $ runValidityCheck $
checkchanged >>= return . \case
False -> Invalid
True -> Valid
runner (sender b validitycheck)
r <- runner (sender b validitycheck)
indicatetransferred ti
return r
-- This allows using actions like download and viaTmp
-- that may abort a transfer, and clean up the protocol after them.
--
-- Runs an action that may make a transfer, passing a transfer
-- indicator. The action should call indicatetransferred on it,
-- only after it's actually sent/received the all data.
--
-- If the action ends without having called indicatetransferred,
-- runs the fallback action, which can close the protoocol
-- connection or otherwise clean up after the transfer not having
-- occurred.
--
-- If the action throws an exception, the fallback is not run.
checktransfer ta fallback = do
ti <- liftIO $ newTVarIO False
r <- ta ti
ifM (liftIO $ atomically $ readTVar ti)
( return r
, fallback
)
indicatetransferred ti = liftIO $ atomically $ writeTVar ti True