From d50b0f3bb3f8509d462b70fbea072227b80e9227 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sun, 20 Nov 2016 12:08:16 -0400 Subject: [PATCH] implement p2p protocol for Handle This is most of the way to having the p2p protocol working over tor hidden services, at least enough to do git push/pull. The free monad was split into two, one for network operations and the other for local (Annex) operations. This will allow git-remote-tor-annex to run only an IO action, not needing the Annex monad. This commit was sponsored by Remy van Elst on Patreon. --- Remote/Helper/P2P.hs | 572 +++++++++++++++++++++------------------- Remote/Helper/P2P/IO.hs | 159 +++++++++++ git-annex.cabal | 1 + 3 files changed, 455 insertions(+), 277 deletions(-) create mode 100644 Remote/Helper/P2P/IO.hs diff --git a/Remote/Helper/P2P.hs b/Remote/Helper/P2P.hs index d3d3dfa088..fbd6c24635 100644 --- a/Remote/Helper/P2P.hs +++ b/Remote/Helper/P2P.hs @@ -7,20 +7,7 @@ {-# LANGUAGE DeriveFunctor, TemplateHaskell, FlexibleContexts, RankNTypes #-} -module Remote.Helper.P2P ( - AuthToken(..), - ProtoF(..), - runPure, - protoDump, - auth, - checkPresent, - lockContentWhile, - remove, - get, - put, - connect, - serve, -) where +module Remote.Helper.P2P where import qualified Utility.SimpleProtocol as Proto import Types.Key @@ -33,7 +20,7 @@ import Control.Monad.Free import Control.Monad.Free.TH import Control.Monad.Catch import System.Exit (ExitCode(..)) -import System.IO (Handle) +import System.IO import qualified Data.ByteString.Lazy as L newtype AuthToken = AuthToken String @@ -49,10 +36,6 @@ newtype Len = Len Integer data Service = UploadPack | ReceivePack deriving (Show) -data RelayData - = RelayData L.ByteString - | RelayMessage Message - -- | Messages in the protocol. The peer that makes the connection -- always initiates requests, and the other peer makes responses to them. data Message @@ -75,264 +58,6 @@ data Message | ERROR String deriving (Show) --- | Free monad for implementing actions that use the protocol. -data ProtoF next - = SendMessage Message next - | ReceiveMessage (Message -> next) - | SendBytes Len L.ByteString next - | ReceiveBytes Len (L.ByteString -> next) - -- ^ Lazily reads bytes from peer. Stops once Len are read, - -- or if connection is lost, and in either case returns the bytes - -- that were read. This allows resuming interrupted transfers. - | KeyFileSize Key (Len -> next) - -- ^ Checks size of key file (dne = 0) - | ReadKeyFile Key Offset (L.ByteString -> next) - | WriteKeyFile Key Offset Len L.ByteString (Bool -> next) - -- ^ Writes to key file starting at an offset. Returns True - -- once the whole content of the key is stored in the key file. - -- - -- Note: The ByteString may not contain the entire remaining content - -- of the key. Only once the key file size == Len has the whole - -- content been transferred. - | CheckAuthToken UUID AuthToken (Bool -> next) - | SetPresent Key UUID next - | CheckContentPresent Key (Bool -> next) - -- ^ Checks if the whole content of the key is locally present. - | RemoveKeyFile Key (Bool -> next) - -- ^ If the key file is not present, still succeeds. - -- May fail if not enough copies to safely drop, etc. - | TryLockContent Key (Bool -> Proto ()) next - | WriteHandle Handle L.ByteString next - -- ^ Try to lock the content of a key, preventing it - -- from being deleted, and run the provided protocol action. - | Relay Handle (RelayData -> Proto (Maybe ExitCode)) (ExitCode -> next) - -- ^ Waits for data to be written to the Handle, and for messages - -- to be received from the peer, and passes the data to the - -- callback, continuing until it returns an ExitCode. - | RelayService Service - (Handle -> RelayData -> Proto (Maybe ExitCode)) - (ExitCode -> next) - -- ^ Runs a service, and waits for it to output to stdout, - -- and for messages to be received from the peer, and passes - -- the data to the callback (which is also passed the service's - -- stdin Handle), continuing uniil the service exits. - deriving (Functor) - -type Proto = Free ProtoF - -$(makeFree ''ProtoF) - --- | Running Proto actions purely, to see what they do. -runPure :: Show r => Proto r -> [Message] -> [(String, Maybe Message)] -runPure (Pure r) _ = [("result: " ++ show r, Nothing)] -runPure (Free (SendMessage m next)) ms = (">", Just m):runPure next ms -runPure (Free (ReceiveMessage _)) [] = [("not enough Messages provided", Nothing)] -runPure (Free (ReceiveMessage next)) (m:ms) = ("<", Just m):runPure (next m) ms -runPure (Free (SendBytes _ _ next)) ms = ("> bytes", Nothing):runPure next ms -runPure (Free (ReceiveBytes _ next)) ms = ("< bytes", Nothing):runPure (next L.empty) ms -runPure (Free (KeyFileSize _ next)) ms = runPure (next (Len 100)) ms -runPure (Free (ReadKeyFile _ _ next)) ms = runPure (next L.empty) ms -runPure (Free (WriteKeyFile _ _ _ _ next)) ms = runPure (next True) ms -runPure (Free (CheckAuthToken _ _ next)) ms = runPure (next True) ms -runPure (Free (SetPresent _ _ next)) ms = runPure next ms -runPure (Free (CheckContentPresent _ next)) ms = runPure (next False) ms -runPure (Free (RemoveKeyFile _ next)) ms = runPure (next True) ms -runPure (Free (TryLockContent _ p next)) ms = runPure (p True >> next) ms -runPure (Free (WriteHandle _ _ next)) ms = runPure next ms -runPure (Free (Relay _ _ next)) ms = runPure (next ExitSuccess) ms -runPure (Free (RelayService _ _ next)) ms = runPure (next ExitSuccess) ms - -protoDump :: [(String, Maybe Message)] -> String -protoDump = unlines . map protoDump' - -protoDump' :: (String, Maybe Message) -> String -protoDump' (s, Nothing) = s -protoDump' (s, Just m) = s ++ " " ++ unwords (Proto.formatMessage m) - -auth :: UUID -> AuthToken -> Proto (Maybe UUID) -auth myuuid t = do - sendMessage (AUTH myuuid t) - r <- receiveMessage - case r of - AUTH_SUCCESS theiruuid -> return $ Just theiruuid - AUTH_FAILURE -> return Nothing - _ -> do - sendMessage (ERROR "auth failed") - return Nothing - -checkPresent :: Key -> Proto Bool -checkPresent key = do - sendMessage (CHECKPRESENT key) - checkSuccess - -{- Locks content to prevent it from being dropped, while running an action. - - - - Note that this only guarantees that the content is locked as long as the - - connection to the peer remains up. If the connection is unexpectededly - - dropped, the peer will then unlock the content. - -} -lockContentWhile - :: MonadMask m - => (forall r. Proto r -> m r) - -> Key - -> (Bool -> m ()) - -> m () -lockContentWhile runproto key a = bracket setup cleanup a - where - setup = runproto $ do - sendMessage (LOCKCONTENT key) - checkSuccess - cleanup True = runproto $ sendMessage UNLOCKCONTENT - cleanup False = return () - -remove :: Key -> Proto Bool -remove key = do - sendMessage (REMOVE key) - checkSuccess - -get :: Key -> Proto Bool -get key = receiveContent key (`GET` key) - -put :: Key -> Proto Bool -put key = do - sendMessage (PUT key) - r <- receiveMessage - case r of - PUT_FROM offset -> sendContent key offset - ALREADY_HAVE -> return True - _ -> do - sendMessage (ERROR "expected PUT_FROM") - return False - -connect :: Service -> Handle -> Handle -> Proto ExitCode -connect service hin hout = do - sendMessage (CONNECT service) - relay hin (relayCallback hout) - -relayCallback :: Handle -> RelayData -> Proto (Maybe ExitCode) -relayCallback hout (RelayMessage (DATA len)) = do - writeHandle hout =<< receiveBytes len - return Nothing -relayCallback _ (RelayMessage (CONNECTDONE exitcode)) = - return (Just exitcode) -relayCallback _ (RelayMessage _) = do - sendMessage (ERROR "expected DATA or CONNECTDONE") - return (Just (ExitFailure 1)) -relayCallback _ (RelayData b) = do - let len = Len $ fromIntegral $ L.length b - sendMessage (DATA len) - sendBytes len b - return Nothing - --- | Serve the protocol. --- --- Note that if the client sends an unexpected message, the server will --- respond with PTOTO_ERROR, and always continues processing messages. --- Since the protocol is not versioned, this is necessary to handle --- protocol changes robustly, since the client can detect when it's --- talking to a server that does not support some new feature, and fall --- back. --- --- When the client sends ERROR to the server, the server gives up, --- since it's not clear what state the client is is, and so not possible to --- recover. -serve :: UUID -> Proto () -serve myuuid = go Nothing - where - go autheduuid = do - r <- receiveMessage - case r of - AUTH theiruuid authtoken -> do - ok <- checkAuthToken theiruuid authtoken - if ok - then do - sendMessage (AUTH_SUCCESS myuuid) - go (Just theiruuid) - else do - sendMessage AUTH_FAILURE - go autheduuid - ERROR _ -> return () - _ -> do - case autheduuid of - Just theiruuid -> authed theiruuid r - Nothing -> sendMessage (ERROR "must AUTH first") - go autheduuid - - authed _theiruuid r = case r of - LOCKCONTENT key -> tryLockContent key $ \locked -> do - sendSuccess locked - when locked $ do - r' <- receiveMessage - case r' of - UNLOCKCONTENT -> return () - _ -> sendMessage (ERROR "expected UNLOCKCONTENT") - CHECKPRESENT key -> sendSuccess =<< checkContentPresent key - REMOVE key -> sendSuccess =<< removeKeyFile key - PUT key -> do - have <- checkContentPresent key - if have - then sendMessage ALREADY_HAVE - else do - ok <- receiveContent key PUT_FROM - when ok $ - setPresent key myuuid - -- setPresent not called because the peer may have - -- requested the data but not permanatly stored it. - GET offset key -> void $ sendContent key offset - CONNECT service -> do - exitcode <- relayService service relayCallback - sendMessage (CONNECTDONE exitcode) - _ -> sendMessage (ERROR "unexpected command") - -sendContent :: Key -> Offset -> Proto Bool -sendContent key offset = do - (len, content) <- readKeyFileLen key offset - sendMessage (DATA len) - sendBytes len content - checkSuccess - -receiveContent :: Key -> (Offset -> Message) -> Proto Bool -receiveContent key mkmsg = do - Len n <- keyFileSize key - let offset = Offset n - sendMessage (mkmsg offset) - r <- receiveMessage - case r of - DATA len -> do - ok <- writeKeyFile key offset len =<< receiveBytes len - sendSuccess ok - return ok - _ -> do - sendMessage (ERROR "expected DATA") - return False - -checkSuccess :: Proto Bool -checkSuccess = do - ack <- receiveMessage - case ack of - SUCCESS -> return True - FAILURE -> return False - _ -> do - sendMessage (ERROR "expected SUCCESS or FAILURE") - return False - -sendSuccess :: Bool -> Proto () -sendSuccess True = sendMessage SUCCESS -sendSuccess False = sendMessage FAILURE - --- Reads key file from an offset. The Len should correspond to --- the length of the ByteString, but to avoid buffering the content --- in memory, is gotten using keyFileSize. -readKeyFileLen :: Key -> Offset -> Proto (Len, L.ByteString) -readKeyFileLen key (Offset offset) = do - (Len totallen) <- keyFileSize key - let len = totallen - offset - if len <= 0 - then return (Len 0, L.empty) - else do - content <- readKeyFile key (Offset offset) - return (Len len, content) - instance Proto.Sendable Message where formatMessage (AUTH uuid authtoken) = ["AUTH", Proto.serialize uuid, Proto.serialize authtoken] formatMessage (AUTH_SUCCESS uuid) = ["AUTH-SUCCESS", Proto.serialize uuid] @@ -390,3 +115,296 @@ instance Proto.Serializable Service where deserialize "git-upload-pack" = Just UploadPack deserialize "git-receive-pack" = Just ReceivePack deserialize _ = Nothing + +-- | Free monad for the protocol, combining net communication, +-- and local actions. +data ProtoF c = Net (NetF c) | Local (LocalF c) + deriving (Functor) + +type Proto = Free ProtoF + +net :: Net a -> Proto a +net = hoistFree Net + +local :: Local a -> Proto a +local = hoistFree Local + +data NetF c + = SendMessage Message c + | ReceiveMessage (Message -> c) + | SendBytes Len L.ByteString c + | ReceiveBytes Len (L.ByteString -> c) + | Relay RelayHandle + (RelayData -> Net (Maybe ExitCode)) + (ExitCode -> c) + -- ^ Waits for data to be written to the RelayHandle, and for messages + -- to be received from the peer, and passes the data to the + -- callback, continuing until it returns an ExitCode. + | RelayService Service + (RelayHandle -> RelayData -> Net (Maybe ExitCode)) + (ExitCode -> c) + -- ^ Runs a service, and waits for it to output to stdout, + -- and for messages to be received from the peer, and passes + -- the data to the callback (which is also passed the service's + -- stdin RelayHandle), continuing uniil the service exits. + | WriteRelay RelayHandle L.ByteString c + -- ^ Write data to a relay's handle, flushing it immediately. + deriving (Functor) + +type Net = Free NetF + +data RelayData + = RelayData L.ByteString + | RelayMessage Message + +newtype RelayHandle = RelayHandle Handle + +data LocalF c + -- ^ Lazily reads bytes from peer. Stops once Len are read, + -- or if connection is lost, and in either case returns the bytes + -- that were read. This allows resuming interrupted transfers. + = KeyFileSize Key (Len -> c) + -- ^ Checks size of key file (dne = 0) + | ReadKeyFile Key Offset (L.ByteString -> c) + | WriteKeyFile Key Offset Len L.ByteString (Bool -> c) + -- ^ Writes to key file starting at an offset. Returns True + -- once the whole content of the key is stored in the key file. + -- + -- Note: The ByteString may not contain the entire remaining content + -- of the key. Only once the key file size == Len has the whole + -- content been transferred. + | CheckAuthToken UUID AuthToken (Bool -> c) + | SetPresent Key UUID c + | CheckContentPresent Key (Bool -> c) + -- ^ Checks if the whole content of the key is locally present. + | RemoveKeyFile Key (Bool -> c) + -- ^ If the key file is not present, still succeeds. + -- May fail if not enough copies to safely drop, etc. + | TryLockContent Key (Bool -> Proto ()) c + -- ^ Try to lock the content of a key, preventing it + -- from being deleted, and run the provided protocol action. + deriving (Functor) + +type Local = Free LocalF + +-- Generate sendMessage etc functions for all free monad constructors. +$(makeFree ''NetF) +$(makeFree ''LocalF) + +-- | Running Proto actions purely, to see what they do. +runPure :: Show r => Proto r -> [Message] -> [(String, Maybe Message)] +runPure (Pure r) _ = [("result: " ++ show r, Nothing)] +runPure (Free (Net n)) ms = runNet n ms +runPure (Free (Local n)) ms = runLocal n ms + +runNet :: Show r => NetF (Proto r) -> [Message] -> [(String, Maybe Message)] +runNet (SendMessage m next) ms = (">", Just m):runPure next ms +runNet (ReceiveMessage _) [] = [("not enough Messages provided", Nothing)] +runNet (ReceiveMessage next) (m:ms) = ("<", Just m):runPure (next m) ms +runNet (SendBytes _ _ next) ms = ("> bytes", Nothing):runPure next ms +runNet (ReceiveBytes _ next) ms = ("< bytes", Nothing):runPure (next L.empty) ms +runNet (Relay _ _ next) ms = runPure (next ExitSuccess) ms +runNet (RelayService _ _ next) ms = runPure (next ExitSuccess) ms +runNet (WriteRelay _ _ next) ms = runPure next ms + +runLocal :: Show r => LocalF (Proto r) -> [Message] -> [(String, Maybe Message)] +runLocal (KeyFileSize _ next) ms = runPure (next (Len 100)) ms +runLocal (ReadKeyFile _ _ next) ms = runPure (next L.empty) ms +runLocal (WriteKeyFile _ _ _ _ next) ms = runPure (next True) ms +runLocal (CheckAuthToken _ _ next) ms = runPure (next True) ms +runLocal (SetPresent _ _ next) ms = runPure next ms +runLocal (CheckContentPresent _ next) ms = runPure (next False) ms +runLocal (RemoveKeyFile _ next) ms = runPure (next True) ms +runLocal (TryLockContent _ p next) ms = runPure (p True >> next) ms + +protoDump :: [(String, Maybe Message)] -> String +protoDump = unlines . map protoDump' + +protoDump' :: (String, Maybe Message) -> String +protoDump' (s, Nothing) = s +protoDump' (s, Just m) = s ++ " " ++ unwords (Proto.formatMessage m) + +auth :: UUID -> AuthToken -> Proto (Maybe UUID) +auth myuuid t = do + net $ sendMessage (AUTH myuuid t) + r <- net receiveMessage + case r of + AUTH_SUCCESS theiruuid -> return $ Just theiruuid + AUTH_FAILURE -> return Nothing + _ -> do + net $ sendMessage (ERROR "auth failed") + return Nothing + +checkPresent :: Key -> Proto Bool +checkPresent key = do + net $ sendMessage (CHECKPRESENT key) + checkSuccess + +{- Locks content to prevent it from being dropped, while running an action. + - + - Note that this only guarantees that the content is locked as long as the + - connection to the peer remains up. If the connection is unexpectededly + - dropped, the peer will then unlock the content. + -} +lockContentWhile + :: MonadMask m + => (forall r. Proto r -> m r) + -> Key + -> (Bool -> m ()) + -> m () +lockContentWhile runproto key a = bracket setup cleanup a + where + setup = runproto $ do + net $ sendMessage (LOCKCONTENT key) + checkSuccess + cleanup True = runproto $ net $ sendMessage UNLOCKCONTENT + cleanup False = return () + +remove :: Key -> Proto Bool +remove key = do + net $ sendMessage (REMOVE key) + checkSuccess + +get :: Key -> Proto Bool +get key = receiveContent key (`GET` key) + +put :: Key -> Proto Bool +put key = do + net $ sendMessage (PUT key) + r <- net receiveMessage + case r of + PUT_FROM offset -> sendContent key offset + ALREADY_HAVE -> return True + _ -> do + net $ sendMessage (ERROR "expected PUT_FROM") + return False + +-- | Serve the protocol. +-- +-- Note that if the client sends an unexpected message, the server will +-- respond with PTOTO_ERROR, and always continues processing messages. +-- Since the protocol is not versioned, this is necessary to handle +-- protocol changes robustly, since the client can detect when it's +-- talking to a server that does not support some new feature, and fall +-- back. +-- +-- When the client sends ERROR to the server, the server gives up, +-- since it's not clear what state the client is is, and so not possible to +-- recover. +serve :: UUID -> Proto () +serve myuuid = go Nothing + where + go autheduuid = do + r <- net receiveMessage + case r of + AUTH theiruuid authtoken -> do + ok <- local $ checkAuthToken theiruuid authtoken + if ok + then do + net $ sendMessage (AUTH_SUCCESS myuuid) + go (Just theiruuid) + else do + net $ sendMessage AUTH_FAILURE + go autheduuid + ERROR _ -> return () + _ -> do + case autheduuid of + Just theiruuid -> authed theiruuid r + Nothing -> net $ sendMessage (ERROR "must AUTH first") + go autheduuid + + authed _theiruuid r = case r of + LOCKCONTENT key -> local $ tryLockContent key $ \locked -> do + sendSuccess locked + when locked $ do + r' <- net receiveMessage + case r' of + UNLOCKCONTENT -> return () + _ -> net $ sendMessage (ERROR "expected UNLOCKCONTENT") + CHECKPRESENT key -> sendSuccess =<< local (checkContentPresent key) + REMOVE key -> sendSuccess =<< local (removeKeyFile key) + PUT key -> do + have <- local $ checkContentPresent key + if have + then net $ sendMessage ALREADY_HAVE + else do + ok <- receiveContent key PUT_FROM + when ok $ + local $ setPresent key myuuid + -- setPresent not called because the peer may have + -- requested the data but not permanatly stored it. + GET offset key -> void $ sendContent key offset + CONNECT service -> do + exitcode <- net $ relayService service relayCallback + net $ sendMessage (CONNECTDONE exitcode) + _ -> net $ sendMessage (ERROR "unexpected command") + +sendContent :: Key -> Offset -> Proto Bool +sendContent key offset = do + (len, content) <- readKeyFileLen key offset + net $ sendMessage (DATA len) + net $ sendBytes len content + checkSuccess + +receiveContent :: Key -> (Offset -> Message) -> Proto Bool +receiveContent key mkmsg = do + Len n <- local $ keyFileSize key + let offset = Offset n + net $ sendMessage (mkmsg offset) + r <- net receiveMessage + case r of + DATA len -> do + ok <- local . writeKeyFile key offset len + =<< net (receiveBytes len) + sendSuccess ok + return ok + _ -> do + net $ sendMessage (ERROR "expected DATA") + return False + +checkSuccess :: Proto Bool +checkSuccess = do + ack <- net receiveMessage + case ack of + SUCCESS -> return True + FAILURE -> return False + _ -> do + net $ sendMessage (ERROR "expected SUCCESS or FAILURE") + return False + +sendSuccess :: Bool -> Proto () +sendSuccess True = net $ sendMessage SUCCESS +sendSuccess False = net $ sendMessage FAILURE + +-- Reads key file from an offset. The Len should correspond to +-- the length of the ByteString, but to avoid buffering the content +-- in memory, is gotten using keyFileSize. +readKeyFileLen :: Key -> Offset -> Proto (Len, L.ByteString) +readKeyFileLen key (Offset offset) = do + (Len totallen) <- local $ keyFileSize key + let len = totallen - offset + if len <= 0 + then return (Len 0, L.empty) + else do + content <- local $ readKeyFile key (Offset offset) + return (Len len, content) + +connect :: Service -> Handle -> Handle -> Proto ExitCode +connect service hin hout = do + net $ sendMessage (CONNECT service) + net $ relay (RelayHandle hin) (relayCallback (RelayHandle hout)) + +relayCallback :: RelayHandle -> RelayData -> Net (Maybe ExitCode) +relayCallback hout (RelayMessage (DATA len)) = do + writeRelay hout =<< receiveBytes len + return Nothing +relayCallback _ (RelayMessage (CONNECTDONE exitcode)) = + return (Just exitcode) +relayCallback _ (RelayMessage _) = do + sendMessage (ERROR "expected DATA or CONNECTDONE") + return (Just (ExitFailure 1)) +relayCallback _ (RelayData b) = do + let len = Len $ fromIntegral $ L.length b + sendMessage (DATA len) + sendBytes len b + return Nothing diff --git a/Remote/Helper/P2P/IO.hs b/Remote/Helper/P2P/IO.hs new file mode 100644 index 0000000000..7179adc2b5 --- /dev/null +++ b/Remote/Helper/P2P/IO.hs @@ -0,0 +1,159 @@ +{- P2P protocol, partial IO implementation + - + - Copyright 2016 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +{-# LANGUAGE RankNTypes #-} + +module Remote.Helper.P2P.IO + ( RunProto + , runProtoHandle + ) where + +import Remote.Helper.P2P +import Utility.Process +import Git +import Git.Command +import Utility.SafeCommand +import Utility.SimpleProtocol + +import Control.Monad +import Control.Monad.Free +import Control.Monad.IO.Class +import Data.Maybe +import System.Exit (ExitCode(..)) +import System.IO +import Control.Concurrent +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as L + +type RunProto = forall a m. MonadIO m => Proto a -> m a + +data S = S + { repo :: Repo + , hdl :: Handle + } + +-- Implementation of the protocol, communicating with a peer +-- over a Handle. No Local actions will be run. +runProtoHandle :: MonadIO m => Handle -> Repo -> Proto a -> m a +runProtoHandle h r = go + where + go :: RunProto + go (Pure a) = pure a + go (Free (Net n)) = runNetHandle (S r h) go n + go (Free (Local _)) = error "local actions not allowed" + +runNetHandle :: MonadIO m => S -> RunProto -> NetF (Proto a) -> m a +runNetHandle s runner f = case f of + SendMessage m next -> do + liftIO $ do + hPutStrLn (hdl s) (unwords (formatMessage m)) + hFlush (hdl s) + runner next + ReceiveMessage next -> do + l <- liftIO $ hGetLine (hdl s) + let m = fromMaybe (ERROR "protocol parse error") + (parseMessage l) + runner (next m) + SendBytes _len b next -> do + liftIO $ do + L.hPut (hdl s) b + hFlush (hdl s) + runner next + ReceiveBytes (Len n) next -> do + b <- liftIO $ L.hGet (hdl s) (fromIntegral n) + runner (next b) + Relay hout callback next -> + runRelay runner hout callback >>= runner . next + RelayService service callback next -> + runRelayService s runner service callback >>= runner . next + WriteRelay (RelayHandle h) b next -> do + liftIO $ do + L.hPut h b + hFlush h + runner next + +runRelay + :: MonadIO m + => RunProto + -> RelayHandle + -> (RelayData -> Net (Maybe ExitCode)) + -> m ExitCode +runRelay runner (RelayHandle hout) callback = do + v <- liftIO newEmptyMVar + _ <- liftIO $ forkIO $ readout v + feeder <- liftIO $ forkIO $ feedin v + exitcode <- liftIO $ drain v + liftIO $ killThread feeder + return exitcode + where + feedin v = forever $ do + m <- runner $ net receiveMessage + putMVar v $ RelayMessage m + + readout v = do + b <- B.hGetSome hout 65536 + if B.null b + then hClose hout + else do + putMVar v $ RelayData (L.fromChunks [b]) + readout v + + drain v = do + d <- takeMVar v + r <- runner $ net $ callback d + case r of + Nothing -> drain v + Just exitcode -> return exitcode + +runRelayService + :: MonadIO m + => S + -> RunProto + -> Service + -> (RelayHandle -> RelayData -> Net (Maybe ExitCode)) + -> m ExitCode +runRelayService s runner service callback = do + v <- liftIO newEmptyMVar + (Just hin, Just hout, _, pid) <- liftIO $ createProcess serviceproc + { std_out = CreatePipe + , std_in = CreatePipe + } + _ <- liftIO $ forkIO $ readout v hout + feeder <- liftIO $ forkIO $ feedin v + _ <- liftIO $ forkIO $ putMVar v . Left =<< waitForProcess pid + exitcode <- liftIO $ drain v hin + liftIO $ killThread feeder + return exitcode + where + cmd = case service of + UploadPack -> "upload-pack" + ReceivePack -> "receive-pack" + serviceproc = gitCreateProcess [Param cmd, File (repoPath (repo s))] (repo s) + + drain v hin = do + d <- takeMVar v + case d of + Left exitcode -> do + hClose hin + return exitcode + Right relaydata -> do + _ <- runner $ net $ + callback (RelayHandle hin) relaydata + drain v hin + + readout v hout = do + b <- B.hGetSome hout 65536 + if B.null b + then hClose hout + else do + putMVar v $ Right $ + RelayData (L.fromChunks [b]) + readout v hout + + feedin v = forever $ do + m <- runner $ net receiveMessage + putMVar v $ Right $ RelayMessage m diff --git a/git-annex.cabal b/git-annex.cabal index 4fb4e1c3c0..77c50b66e4 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -920,6 +920,7 @@ Executable git-annex Remote.Helper.Http Remote.Helper.Messages Remote.Helper.P2P + Remote.Helper.P2P.IO Remote.Helper.ReadOnly Remote.Helper.Special Remote.Helper.Ssh