clean P2P protocol shutdown on EOF try 2
Same goal as b18fb1e343
but without
breaking backwards compatability. Just return IO exceptions when running
the P2P protocol, so that git-annex-shell can detect eof and avoid the
ugly message.
This commit was sponsored by Ethan Aubin.
This commit is contained in:
parent
80defa62c6
commit
6134431254
8 changed files with 70 additions and 45 deletions
|
@ -32,8 +32,9 @@ run (_remotename:address:[]) = forever $
|
|||
| otherwise = parseAddressPort address
|
||||
go service = do
|
||||
ready
|
||||
either giveup exitWith
|
||||
=<< connectService onionaddress onionport service
|
||||
connectService onionaddress onionport service >>= \case
|
||||
Right exitcode -> exitWith exitcode
|
||||
Left e -> giveup $ describeProtoFailure e
|
||||
ready = do
|
||||
putStrLn ""
|
||||
hFlush stdout
|
||||
|
@ -48,7 +49,7 @@ parseAddressPort s =
|
|||
Nothing -> giveup "onion address must include port number"
|
||||
Just p -> (OnionAddress a, p)
|
||||
|
||||
connectService :: OnionAddress -> OnionPort -> Service -> IO (Either String ExitCode)
|
||||
connectService :: OnionAddress -> OnionPort -> Service -> IO (Either ProtoFailure ExitCode)
|
||||
connectService address port service = do
|
||||
state <- Annex.new =<< Git.CurrentRepo.get
|
||||
Annex.eval state $ do
|
||||
|
|
|
@ -324,4 +324,4 @@ setupLink remotename (P2PAddressAuth addr authtoken) = do
|
|||
storeP2PRemoteAuthToken addr authtoken
|
||||
return LinkSuccess
|
||||
go (Right Nothing) = return $ AuthenticationError "Unable to authenticate with peer. Please check the address and try again."
|
||||
go (Left e) = return $ AuthenticationError $ "Unable to authenticate with peer: " ++ e
|
||||
go (Left e) = return $ AuthenticationError $ "Unable to authenticate with peer: " ++ describeProtoFailure e
|
||||
|
|
|
@ -15,6 +15,8 @@ import qualified Annex
|
|||
import Annex.UUID
|
||||
import qualified CmdLine.GitAnnexShell.Checks as Checks
|
||||
|
||||
import System.IO.Error
|
||||
|
||||
cmd :: Command
|
||||
cmd = noMessages $ command "p2pstdio" SectionPlumbing
|
||||
"communicate in P2P protocol over stdio"
|
||||
|
@ -40,5 +42,9 @@ start theiruuid = do
|
|||
P2P.serveAuthed servermode myuuid
|
||||
runst <- liftIO $ mkRunState $ Serving theiruuid Nothing
|
||||
runFullProto runst conn server >>= \case
|
||||
Right () -> next $ next $ return True
|
||||
Left e -> giveup e
|
||||
Right () -> done
|
||||
-- Avoid displaying an error when the client hung up on us.
|
||||
Left (ProtoFailureIOError e) | isEOFError e -> done
|
||||
Left e -> giveup (describeProtoFailure e)
|
||||
where
|
||||
done = next $ next $ return True
|
||||
|
|
23
P2P/Annex.hs
23
P2P/Annex.hs
|
@ -28,7 +28,7 @@ import Utility.Metered
|
|||
import Control.Monad.Free
|
||||
|
||||
-- Full interpreter for Proto, that can receive and send objects.
|
||||
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either String a)
|
||||
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
|
||||
runFullProto runst conn = go
|
||||
where
|
||||
go :: RunProto Annex
|
||||
|
@ -36,7 +36,7 @@ runFullProto runst conn = go
|
|||
go (Free (Net n)) = runNet runst conn go n
|
||||
go (Free (Local l)) = runLocal runst go l
|
||||
|
||||
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either String a)
|
||||
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
|
||||
runLocal runst runner a = case a of
|
||||
TmpContentSize k next -> do
|
||||
tmp <- fromRepo $ gitAnnexTmpObjectLocation k
|
||||
|
@ -57,12 +57,12 @@ runLocal runst runner a = case a of
|
|||
transfer upload k af $
|
||||
sinkfile f o checkchanged sender
|
||||
case v' of
|
||||
Left e -> return (Left (show e))
|
||||
Right (Left e) -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
Right (Left e) -> return $ Left e
|
||||
Right (Right ok) -> runner (next ok)
|
||||
-- content not available
|
||||
Right Nothing -> runner (next False)
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
StoreContent k af o l getb validitycheck next -> do
|
||||
-- This is the same as the retrievalSecurityPolicy of
|
||||
-- Remote.P2P and Remote.Git.
|
||||
|
@ -79,12 +79,12 @@ runLocal runst runner a = case a of
|
|||
SetPresent k u next -> do
|
||||
v <- tryNonAsync $ logChange k u InfoPresent
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
Right () -> runner next
|
||||
CheckContentPresent k next -> do
|
||||
v <- tryNonAsync $ inAnnex k
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
Right result -> runner (next result)
|
||||
RemoveContent k next -> do
|
||||
v <- tryNonAsync $
|
||||
|
@ -96,7 +96,7 @@ runLocal runst runner a = case a of
|
|||
, return True
|
||||
)
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
Right result -> runner (next result)
|
||||
TryLockContent k protoaction next -> do
|
||||
v <- tryNonAsync $ lockContentShared k $ \verifiedcopy ->
|
||||
|
@ -114,9 +114,10 @@ runLocal runst runner a = case a of
|
|||
Serving _ (Just h) _ -> do
|
||||
v <- tryNonAsync $ liftIO $ waitChangedRefs h
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
Right changedrefs -> runner (next changedrefs)
|
||||
_ -> return $ Left "change notification not available"
|
||||
_ -> return $ Left $
|
||||
ProtoFailureMessage "change notification not available"
|
||||
UpdateMeterTotalSize m sz next -> do
|
||||
liftIO $ setMeterTotalSize m sz
|
||||
runner next
|
||||
|
@ -153,7 +154,7 @@ runLocal runst runner a = case a of
|
|||
-- known. Force content
|
||||
-- verification.
|
||||
return (rightsize, MustVerify)
|
||||
Left e -> error e
|
||||
Left e -> error $ describeProtoFailure e
|
||||
|
||||
sinkfile f (Offset o) checkchanged sender p = bracket setup cleanup go
|
||||
where
|
||||
|
|
56
P2P/IO.hs
56
P2P/IO.hs
|
@ -1,6 +1,6 @@
|
|||
{- P2P protocol, IO implementation
|
||||
-
|
||||
- Copyright 2016 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2016-2018 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
@ -18,6 +18,8 @@ module P2P.IO
|
|||
, closeConnection
|
||||
, serveUnixSocket
|
||||
, setupHandle
|
||||
, ProtoFailure(..)
|
||||
, describeProtoFailure
|
||||
, runNetProto
|
||||
, runNet
|
||||
) where
|
||||
|
@ -38,6 +40,7 @@ import Annex.ChangedRefs
|
|||
import Control.Monad.Free
|
||||
import Control.Monad.IO.Class
|
||||
import System.Exit (ExitCode(..))
|
||||
import System.IO.Error
|
||||
import Network.Socket
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.Async
|
||||
|
@ -48,7 +51,17 @@ import System.Log.Logger (debugM)
|
|||
import qualified Network.Socket as S
|
||||
|
||||
-- Type of interpreters of the Proto free monad.
|
||||
type RunProto m = forall a. Proto a -> m (Either String a)
|
||||
type RunProto m = forall a. Proto a -> m (Either ProtoFailure a)
|
||||
|
||||
data ProtoFailure
|
||||
= ProtoFailureMessage String
|
||||
| ProtoFailureException SomeException
|
||||
| ProtoFailureIOError IOError
|
||||
|
||||
describeProtoFailure :: ProtoFailure -> String
|
||||
describeProtoFailure (ProtoFailureMessage s) = s
|
||||
describeProtoFailure (ProtoFailureException e) = show e
|
||||
describeProtoFailure (ProtoFailureIOError e) = show e
|
||||
|
||||
data RunState
|
||||
= Serving UUID (Maybe ChangedRefsHandle) (TVar ProtocolVersion)
|
||||
|
@ -135,19 +148,20 @@ setupHandle s = do
|
|||
-- This only runs Net actions. No Local actions will be run
|
||||
-- (those need the Annex monad) -- if the interpreter reaches any,
|
||||
-- it returns Nothing.
|
||||
runNetProto :: RunState -> P2PConnection -> Proto a -> IO (Either String a)
|
||||
runNetProto :: RunState -> P2PConnection -> Proto a -> IO (Either ProtoFailure a)
|
||||
runNetProto runst conn = go
|
||||
where
|
||||
go :: RunProto IO
|
||||
go (Pure v) = return (Right v)
|
||||
go (Free (Net n)) = runNet runst conn go n
|
||||
go (Free (Local _)) = return (Left "unexpected annex operation attempted")
|
||||
go (Free (Local _)) = return $ Left $
|
||||
ProtoFailureMessage "unexpected annex operation attempted"
|
||||
|
||||
-- Interpreter of the Net part of Proto.
|
||||
--
|
||||
-- An interpreter of Proto has to be provided, to handle the rest of Proto
|
||||
-- actions.
|
||||
runNet :: (MonadIO m, MonadMask m) => RunState -> P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either String a)
|
||||
runNet :: (MonadIO m, MonadMask m) => RunState -> P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either ProtoFailure a)
|
||||
runNet runst conn runner f = case f of
|
||||
SendMessage m next -> do
|
||||
v <- liftIO $ tryNonAsync $ do
|
||||
|
@ -156,13 +170,14 @@ runNet runst conn runner f = case f of
|
|||
hPutStrLn (connOhdl conn) l
|
||||
hFlush (connOhdl conn)
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
Right () -> runner next
|
||||
ReceiveMessage next -> do
|
||||
v <- liftIO $ tryNonAsync $ getProtocolLine (connIhdl conn)
|
||||
v <- liftIO $ tryIOError $ getProtocolLine (connIhdl conn)
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Right Nothing -> return (Left "protocol error")
|
||||
Left e -> return $ Left $ ProtoFailureIOError e
|
||||
Right Nothing -> return $ Left $
|
||||
ProtoFailureMessage "protocol error"
|
||||
Right (Just l) -> case parseMessage l of
|
||||
Just m -> do
|
||||
liftIO $ debugMessage "P2P <" m
|
||||
|
@ -175,12 +190,13 @@ runNet runst conn runner f = case f of
|
|||
return ok
|
||||
case v of
|
||||
Right True -> runner next
|
||||
Right False -> return (Left "short data write")
|
||||
Left e -> return (Left (show e))
|
||||
Right False -> return $ Left $
|
||||
ProtoFailureMessage "short data write"
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
ReceiveBytes len p next -> do
|
||||
v <- liftIO $ tryNonAsync $ receiveExactly len (connIhdl conn) p
|
||||
case v of
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left $ ProtoFailureException e
|
||||
Right b -> runner (next b)
|
||||
CheckAuthToken _u t next -> do
|
||||
let authed = connCheckAuth conn t
|
||||
|
@ -188,12 +204,12 @@ runNet runst conn runner f = case f of
|
|||
Relay hin hout next -> do
|
||||
v <- liftIO $ runRelay runnerio hin hout
|
||||
case v of
|
||||
Left e -> return (Left e)
|
||||
Left e -> return $ Left e
|
||||
Right exitcode -> runner (next exitcode)
|
||||
RelayService service next -> do
|
||||
v <- liftIO $ runRelayService conn runnerio service
|
||||
case v of
|
||||
Left e -> return (Left e)
|
||||
Left e -> return $ Left e
|
||||
Right () -> runner next
|
||||
SetProtocolVersion v next -> do
|
||||
liftIO $ atomically $ writeTVar versiontvar v
|
||||
|
@ -236,10 +252,10 @@ sendExactly (Len n) b h p = do
|
|||
receiveExactly :: Len -> Handle -> MeterUpdate -> IO L.ByteString
|
||||
receiveExactly (Len n) h p = hGetMetered h (Just n) p
|
||||
|
||||
runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Either String ExitCode)
|
||||
runRelay :: RunProto IO -> RelayHandle -> RelayHandle -> IO (Either ProtoFailure ExitCode)
|
||||
runRelay runner (RelayHandle hout) (RelayHandle hin) =
|
||||
bracket setup cleanup go
|
||||
`catchNonAsync` (return . Left . show)
|
||||
`catchNonAsync` (return . Left . ProtoFailureException)
|
||||
where
|
||||
setup = do
|
||||
v <- newEmptyMVar
|
||||
|
@ -253,10 +269,10 @@ runRelay runner (RelayHandle hout) (RelayHandle hin) =
|
|||
|
||||
go v = relayHelper runner v
|
||||
|
||||
runRelayService :: P2PConnection -> RunProto IO -> Service -> IO (Either String ())
|
||||
runRelayService :: P2PConnection -> RunProto IO -> Service -> IO (Either ProtoFailure ())
|
||||
runRelayService conn runner service =
|
||||
bracket setup cleanup go
|
||||
`catchNonAsync` (return . Left . show)
|
||||
`catchNonAsync` (return . Left . ProtoFailureException)
|
||||
where
|
||||
cmd = case service of
|
||||
UploadPack -> "upload-pack"
|
||||
|
@ -287,13 +303,13 @@ runRelayService conn runner service =
|
|||
go (v, _, _, _, _) = do
|
||||
r <- relayHelper runner v
|
||||
case r of
|
||||
Left e -> return (Left (show e))
|
||||
Left e -> return $ Left e
|
||||
Right exitcode -> runner $ net $ relayToPeer (RelayDone exitcode)
|
||||
|
||||
waitexit v pid = putMVar v . RelayDone =<< waitForProcess pid
|
||||
|
||||
-- Processes RelayData as it is put into the MVar.
|
||||
relayHelper :: RunProto IO -> MVar RelayData -> IO (Either String ExitCode)
|
||||
relayHelper :: RunProto IO -> MVar RelayData -> IO (Either ProtoFailure ExitCode)
|
||||
relayHelper runner v = loop
|
||||
where
|
||||
loop = do
|
||||
|
|
|
@ -343,7 +343,7 @@ runProtoConn a conn@(P2P.OpenConnection (runst, c, _, _)) = do
|
|||
-- When runFullProto fails, the connection is no longer
|
||||
-- usable, so close it.
|
||||
Left e -> do
|
||||
warning $ "Lost connection (" ++ e ++ ")"
|
||||
warning $ "Lost connection (" ++ P2P.describeProtoFailure e ++ ")"
|
||||
conn' <- fst <$> liftIO (closeP2PSshConnection conn)
|
||||
return (conn', Nothing)
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ runProtoConn a c@(OpenConnection (runst, conn)) = do
|
|||
-- so close it.
|
||||
case v of
|
||||
Left e -> do
|
||||
warning $ "Lost connection to peer (" ++ e ++ ")"
|
||||
warning $ "Lost connection to peer (" ++ describeProtoFailure e ++ ")"
|
||||
liftIO $ closeConnection conn
|
||||
return (ClosedConnection, Nothing)
|
||||
Right r -> return (c, Just r)
|
||||
|
@ -162,7 +162,7 @@ openConnection u addr = do
|
|||
liftIO $ closeConnection conn
|
||||
return ClosedConnection
|
||||
Left e -> do
|
||||
warning $ "Problem communicating with peer. (" ++ e ++ ")"
|
||||
warning $ "Problem communicating with peer. (" ++ describeProtoFailure e ++ ")"
|
||||
liftIO $ closeConnection conn
|
||||
return ClosedConnection
|
||||
Left e -> do
|
||||
|
|
|
@ -120,10 +120,10 @@ serveClient th u r q = bracket setup cleanup start
|
|||
v <- liftIO $ runNetProto runstauth conn $ P2P.serveAuth u
|
||||
case v of
|
||||
Right (Just theiruuid) -> authed conn theiruuid
|
||||
Right Nothing -> liftIO $
|
||||
debugM "remotedaemon" "Tor connection failed to authenticate"
|
||||
Left e -> liftIO $
|
||||
debugM "remotedaemon" ("Tor connection error before authentication: " ++ e)
|
||||
Right Nothing -> liftIO $ debugM "remotedaemon"
|
||||
"Tor connection failed to authenticate"
|
||||
Left e -> liftIO $ debugM "remotedaemon" $
|
||||
"Tor connection error before authentication: " ++ describeProtoFailure e
|
||||
-- Merge the duplicated state back in.
|
||||
liftAnnex th $ mergeState st'
|
||||
|
||||
|
@ -134,7 +134,8 @@ serveClient th u r q = bracket setup cleanup start
|
|||
P2P.serveAuthed P2P.ServeReadWrite u
|
||||
case v' of
|
||||
Right () -> return ()
|
||||
Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e)
|
||||
Left e -> liftIO $ debugM "remotedaemon" $
|
||||
"Tor connection error: " ++ describeProtoFailure e
|
||||
|
||||
-- Connect to peer's tor hidden service.
|
||||
transport :: Transport
|
||||
|
|
Loading…
Reference in a new issue