implement p2p protocol for Handle
This is most of the way to having the p2p protocol working over tor hidden services, at least enough to do git push/pull. The free monad was split into two, one for network operations and the other for local (Annex) operations. This will allow git-remote-tor-annex to run only an IO action, not needing the Annex monad. This commit was sponsored by Remy van Elst on Patreon.
This commit is contained in:
parent
0eaad7ca3a
commit
d50b0f3bb3
3 changed files with 455 additions and 277 deletions
|
@ -7,20 +7,7 @@
|
||||||
|
|
||||||
{-# LANGUAGE DeriveFunctor, TemplateHaskell, FlexibleContexts, RankNTypes #-}
|
{-# LANGUAGE DeriveFunctor, TemplateHaskell, FlexibleContexts, RankNTypes #-}
|
||||||
|
|
||||||
module Remote.Helper.P2P (
|
module Remote.Helper.P2P where
|
||||||
AuthToken(..),
|
|
||||||
ProtoF(..),
|
|
||||||
runPure,
|
|
||||||
protoDump,
|
|
||||||
auth,
|
|
||||||
checkPresent,
|
|
||||||
lockContentWhile,
|
|
||||||
remove,
|
|
||||||
get,
|
|
||||||
put,
|
|
||||||
connect,
|
|
||||||
serve,
|
|
||||||
) where
|
|
||||||
|
|
||||||
import qualified Utility.SimpleProtocol as Proto
|
import qualified Utility.SimpleProtocol as Proto
|
||||||
import Types.Key
|
import Types.Key
|
||||||
|
@ -33,7 +20,7 @@ import Control.Monad.Free
|
||||||
import Control.Monad.Free.TH
|
import Control.Monad.Free.TH
|
||||||
import Control.Monad.Catch
|
import Control.Monad.Catch
|
||||||
import System.Exit (ExitCode(..))
|
import System.Exit (ExitCode(..))
|
||||||
import System.IO (Handle)
|
import System.IO
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
newtype AuthToken = AuthToken String
|
newtype AuthToken = AuthToken String
|
||||||
|
@ -49,10 +36,6 @@ newtype Len = Len Integer
|
||||||
data Service = UploadPack | ReceivePack
|
data Service = UploadPack | ReceivePack
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
data RelayData
|
|
||||||
= RelayData L.ByteString
|
|
||||||
| RelayMessage Message
|
|
||||||
|
|
||||||
-- | Messages in the protocol. The peer that makes the connection
|
-- | Messages in the protocol. The peer that makes the connection
|
||||||
-- always initiates requests, and the other peer makes responses to them.
|
-- always initiates requests, and the other peer makes responses to them.
|
||||||
data Message
|
data Message
|
||||||
|
@ -75,264 +58,6 @@ data Message
|
||||||
| ERROR String
|
| ERROR String
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
-- | Free monad for implementing actions that use the protocol.
|
|
||||||
data ProtoF next
|
|
||||||
= SendMessage Message next
|
|
||||||
| ReceiveMessage (Message -> next)
|
|
||||||
| SendBytes Len L.ByteString next
|
|
||||||
| ReceiveBytes Len (L.ByteString -> next)
|
|
||||||
-- ^ Lazily reads bytes from peer. Stops once Len are read,
|
|
||||||
-- or if connection is lost, and in either case returns the bytes
|
|
||||||
-- that were read. This allows resuming interrupted transfers.
|
|
||||||
| KeyFileSize Key (Len -> next)
|
|
||||||
-- ^ Checks size of key file (dne = 0)
|
|
||||||
| ReadKeyFile Key Offset (L.ByteString -> next)
|
|
||||||
| WriteKeyFile Key Offset Len L.ByteString (Bool -> next)
|
|
||||||
-- ^ Writes to key file starting at an offset. Returns True
|
|
||||||
-- once the whole content of the key is stored in the key file.
|
|
||||||
--
|
|
||||||
-- Note: The ByteString may not contain the entire remaining content
|
|
||||||
-- of the key. Only once the key file size == Len has the whole
|
|
||||||
-- content been transferred.
|
|
||||||
| CheckAuthToken UUID AuthToken (Bool -> next)
|
|
||||||
| SetPresent Key UUID next
|
|
||||||
| CheckContentPresent Key (Bool -> next)
|
|
||||||
-- ^ Checks if the whole content of the key is locally present.
|
|
||||||
| RemoveKeyFile Key (Bool -> next)
|
|
||||||
-- ^ If the key file is not present, still succeeds.
|
|
||||||
-- May fail if not enough copies to safely drop, etc.
|
|
||||||
| TryLockContent Key (Bool -> Proto ()) next
|
|
||||||
| WriteHandle Handle L.ByteString next
|
|
||||||
-- ^ Try to lock the content of a key, preventing it
|
|
||||||
-- from being deleted, and run the provided protocol action.
|
|
||||||
| Relay Handle (RelayData -> Proto (Maybe ExitCode)) (ExitCode -> next)
|
|
||||||
-- ^ Waits for data to be written to the Handle, and for messages
|
|
||||||
-- to be received from the peer, and passes the data to the
|
|
||||||
-- callback, continuing until it returns an ExitCode.
|
|
||||||
| RelayService Service
|
|
||||||
(Handle -> RelayData -> Proto (Maybe ExitCode))
|
|
||||||
(ExitCode -> next)
|
|
||||||
-- ^ Runs a service, and waits for it to output to stdout,
|
|
||||||
-- and for messages to be received from the peer, and passes
|
|
||||||
-- the data to the callback (which is also passed the service's
|
|
||||||
-- stdin Handle), continuing uniil the service exits.
|
|
||||||
deriving (Functor)
|
|
||||||
|
|
||||||
type Proto = Free ProtoF
|
|
||||||
|
|
||||||
$(makeFree ''ProtoF)
|
|
||||||
|
|
||||||
-- | Running Proto actions purely, to see what they do.
|
|
||||||
runPure :: Show r => Proto r -> [Message] -> [(String, Maybe Message)]
|
|
||||||
runPure (Pure r) _ = [("result: " ++ show r, Nothing)]
|
|
||||||
runPure (Free (SendMessage m next)) ms = (">", Just m):runPure next ms
|
|
||||||
runPure (Free (ReceiveMessage _)) [] = [("not enough Messages provided", Nothing)]
|
|
||||||
runPure (Free (ReceiveMessage next)) (m:ms) = ("<", Just m):runPure (next m) ms
|
|
||||||
runPure (Free (SendBytes _ _ next)) ms = ("> bytes", Nothing):runPure next ms
|
|
||||||
runPure (Free (ReceiveBytes _ next)) ms = ("< bytes", Nothing):runPure (next L.empty) ms
|
|
||||||
runPure (Free (KeyFileSize _ next)) ms = runPure (next (Len 100)) ms
|
|
||||||
runPure (Free (ReadKeyFile _ _ next)) ms = runPure (next L.empty) ms
|
|
||||||
runPure (Free (WriteKeyFile _ _ _ _ next)) ms = runPure (next True) ms
|
|
||||||
runPure (Free (CheckAuthToken _ _ next)) ms = runPure (next True) ms
|
|
||||||
runPure (Free (SetPresent _ _ next)) ms = runPure next ms
|
|
||||||
runPure (Free (CheckContentPresent _ next)) ms = runPure (next False) ms
|
|
||||||
runPure (Free (RemoveKeyFile _ next)) ms = runPure (next True) ms
|
|
||||||
runPure (Free (TryLockContent _ p next)) ms = runPure (p True >> next) ms
|
|
||||||
runPure (Free (WriteHandle _ _ next)) ms = runPure next ms
|
|
||||||
runPure (Free (Relay _ _ next)) ms = runPure (next ExitSuccess) ms
|
|
||||||
runPure (Free (RelayService _ _ next)) ms = runPure (next ExitSuccess) ms
|
|
||||||
|
|
||||||
protoDump :: [(String, Maybe Message)] -> String
|
|
||||||
protoDump = unlines . map protoDump'
|
|
||||||
|
|
||||||
protoDump' :: (String, Maybe Message) -> String
|
|
||||||
protoDump' (s, Nothing) = s
|
|
||||||
protoDump' (s, Just m) = s ++ " " ++ unwords (Proto.formatMessage m)
|
|
||||||
|
|
||||||
auth :: UUID -> AuthToken -> Proto (Maybe UUID)
|
|
||||||
auth myuuid t = do
|
|
||||||
sendMessage (AUTH myuuid t)
|
|
||||||
r <- receiveMessage
|
|
||||||
case r of
|
|
||||||
AUTH_SUCCESS theiruuid -> return $ Just theiruuid
|
|
||||||
AUTH_FAILURE -> return Nothing
|
|
||||||
_ -> do
|
|
||||||
sendMessage (ERROR "auth failed")
|
|
||||||
return Nothing
|
|
||||||
|
|
||||||
checkPresent :: Key -> Proto Bool
|
|
||||||
checkPresent key = do
|
|
||||||
sendMessage (CHECKPRESENT key)
|
|
||||||
checkSuccess
|
|
||||||
|
|
||||||
{- Locks content to prevent it from being dropped, while running an action.
|
|
||||||
-
|
|
||||||
- Note that this only guarantees that the content is locked as long as the
|
|
||||||
- connection to the peer remains up. If the connection is unexpectededly
|
|
||||||
- dropped, the peer will then unlock the content.
|
|
||||||
-}
|
|
||||||
lockContentWhile
|
|
||||||
:: MonadMask m
|
|
||||||
=> (forall r. Proto r -> m r)
|
|
||||||
-> Key
|
|
||||||
-> (Bool -> m ())
|
|
||||||
-> m ()
|
|
||||||
lockContentWhile runproto key a = bracket setup cleanup a
|
|
||||||
where
|
|
||||||
setup = runproto $ do
|
|
||||||
sendMessage (LOCKCONTENT key)
|
|
||||||
checkSuccess
|
|
||||||
cleanup True = runproto $ sendMessage UNLOCKCONTENT
|
|
||||||
cleanup False = return ()
|
|
||||||
|
|
||||||
remove :: Key -> Proto Bool
|
|
||||||
remove key = do
|
|
||||||
sendMessage (REMOVE key)
|
|
||||||
checkSuccess
|
|
||||||
|
|
||||||
get :: Key -> Proto Bool
|
|
||||||
get key = receiveContent key (`GET` key)
|
|
||||||
|
|
||||||
put :: Key -> Proto Bool
|
|
||||||
put key = do
|
|
||||||
sendMessage (PUT key)
|
|
||||||
r <- receiveMessage
|
|
||||||
case r of
|
|
||||||
PUT_FROM offset -> sendContent key offset
|
|
||||||
ALREADY_HAVE -> return True
|
|
||||||
_ -> do
|
|
||||||
sendMessage (ERROR "expected PUT_FROM")
|
|
||||||
return False
|
|
||||||
|
|
||||||
connect :: Service -> Handle -> Handle -> Proto ExitCode
|
|
||||||
connect service hin hout = do
|
|
||||||
sendMessage (CONNECT service)
|
|
||||||
relay hin (relayCallback hout)
|
|
||||||
|
|
||||||
relayCallback :: Handle -> RelayData -> Proto (Maybe ExitCode)
|
|
||||||
relayCallback hout (RelayMessage (DATA len)) = do
|
|
||||||
writeHandle hout =<< receiveBytes len
|
|
||||||
return Nothing
|
|
||||||
relayCallback _ (RelayMessage (CONNECTDONE exitcode)) =
|
|
||||||
return (Just exitcode)
|
|
||||||
relayCallback _ (RelayMessage _) = do
|
|
||||||
sendMessage (ERROR "expected DATA or CONNECTDONE")
|
|
||||||
return (Just (ExitFailure 1))
|
|
||||||
relayCallback _ (RelayData b) = do
|
|
||||||
let len = Len $ fromIntegral $ L.length b
|
|
||||||
sendMessage (DATA len)
|
|
||||||
sendBytes len b
|
|
||||||
return Nothing
|
|
||||||
|
|
||||||
-- | Serve the protocol.
|
|
||||||
--
|
|
||||||
-- Note that if the client sends an unexpected message, the server will
|
|
||||||
-- respond with PTOTO_ERROR, and always continues processing messages.
|
|
||||||
-- Since the protocol is not versioned, this is necessary to handle
|
|
||||||
-- protocol changes robustly, since the client can detect when it's
|
|
||||||
-- talking to a server that does not support some new feature, and fall
|
|
||||||
-- back.
|
|
||||||
--
|
|
||||||
-- When the client sends ERROR to the server, the server gives up,
|
|
||||||
-- since it's not clear what state the client is is, and so not possible to
|
|
||||||
-- recover.
|
|
||||||
serve :: UUID -> Proto ()
|
|
||||||
serve myuuid = go Nothing
|
|
||||||
where
|
|
||||||
go autheduuid = do
|
|
||||||
r <- receiveMessage
|
|
||||||
case r of
|
|
||||||
AUTH theiruuid authtoken -> do
|
|
||||||
ok <- checkAuthToken theiruuid authtoken
|
|
||||||
if ok
|
|
||||||
then do
|
|
||||||
sendMessage (AUTH_SUCCESS myuuid)
|
|
||||||
go (Just theiruuid)
|
|
||||||
else do
|
|
||||||
sendMessage AUTH_FAILURE
|
|
||||||
go autheduuid
|
|
||||||
ERROR _ -> return ()
|
|
||||||
_ -> do
|
|
||||||
case autheduuid of
|
|
||||||
Just theiruuid -> authed theiruuid r
|
|
||||||
Nothing -> sendMessage (ERROR "must AUTH first")
|
|
||||||
go autheduuid
|
|
||||||
|
|
||||||
authed _theiruuid r = case r of
|
|
||||||
LOCKCONTENT key -> tryLockContent key $ \locked -> do
|
|
||||||
sendSuccess locked
|
|
||||||
when locked $ do
|
|
||||||
r' <- receiveMessage
|
|
||||||
case r' of
|
|
||||||
UNLOCKCONTENT -> return ()
|
|
||||||
_ -> sendMessage (ERROR "expected UNLOCKCONTENT")
|
|
||||||
CHECKPRESENT key -> sendSuccess =<< checkContentPresent key
|
|
||||||
REMOVE key -> sendSuccess =<< removeKeyFile key
|
|
||||||
PUT key -> do
|
|
||||||
have <- checkContentPresent key
|
|
||||||
if have
|
|
||||||
then sendMessage ALREADY_HAVE
|
|
||||||
else do
|
|
||||||
ok <- receiveContent key PUT_FROM
|
|
||||||
when ok $
|
|
||||||
setPresent key myuuid
|
|
||||||
-- setPresent not called because the peer may have
|
|
||||||
-- requested the data but not permanatly stored it.
|
|
||||||
GET offset key -> void $ sendContent key offset
|
|
||||||
CONNECT service -> do
|
|
||||||
exitcode <- relayService service relayCallback
|
|
||||||
sendMessage (CONNECTDONE exitcode)
|
|
||||||
_ -> sendMessage (ERROR "unexpected command")
|
|
||||||
|
|
||||||
sendContent :: Key -> Offset -> Proto Bool
|
|
||||||
sendContent key offset = do
|
|
||||||
(len, content) <- readKeyFileLen key offset
|
|
||||||
sendMessage (DATA len)
|
|
||||||
sendBytes len content
|
|
||||||
checkSuccess
|
|
||||||
|
|
||||||
receiveContent :: Key -> (Offset -> Message) -> Proto Bool
|
|
||||||
receiveContent key mkmsg = do
|
|
||||||
Len n <- keyFileSize key
|
|
||||||
let offset = Offset n
|
|
||||||
sendMessage (mkmsg offset)
|
|
||||||
r <- receiveMessage
|
|
||||||
case r of
|
|
||||||
DATA len -> do
|
|
||||||
ok <- writeKeyFile key offset len =<< receiveBytes len
|
|
||||||
sendSuccess ok
|
|
||||||
return ok
|
|
||||||
_ -> do
|
|
||||||
sendMessage (ERROR "expected DATA")
|
|
||||||
return False
|
|
||||||
|
|
||||||
checkSuccess :: Proto Bool
|
|
||||||
checkSuccess = do
|
|
||||||
ack <- receiveMessage
|
|
||||||
case ack of
|
|
||||||
SUCCESS -> return True
|
|
||||||
FAILURE -> return False
|
|
||||||
_ -> do
|
|
||||||
sendMessage (ERROR "expected SUCCESS or FAILURE")
|
|
||||||
return False
|
|
||||||
|
|
||||||
sendSuccess :: Bool -> Proto ()
|
|
||||||
sendSuccess True = sendMessage SUCCESS
|
|
||||||
sendSuccess False = sendMessage FAILURE
|
|
||||||
|
|
||||||
-- Reads key file from an offset. The Len should correspond to
|
|
||||||
-- the length of the ByteString, but to avoid buffering the content
|
|
||||||
-- in memory, is gotten using keyFileSize.
|
|
||||||
readKeyFileLen :: Key -> Offset -> Proto (Len, L.ByteString)
|
|
||||||
readKeyFileLen key (Offset offset) = do
|
|
||||||
(Len totallen) <- keyFileSize key
|
|
||||||
let len = totallen - offset
|
|
||||||
if len <= 0
|
|
||||||
then return (Len 0, L.empty)
|
|
||||||
else do
|
|
||||||
content <- readKeyFile key (Offset offset)
|
|
||||||
return (Len len, content)
|
|
||||||
|
|
||||||
instance Proto.Sendable Message where
|
instance Proto.Sendable Message where
|
||||||
formatMessage (AUTH uuid authtoken) = ["AUTH", Proto.serialize uuid, Proto.serialize authtoken]
|
formatMessage (AUTH uuid authtoken) = ["AUTH", Proto.serialize uuid, Proto.serialize authtoken]
|
||||||
formatMessage (AUTH_SUCCESS uuid) = ["AUTH-SUCCESS", Proto.serialize uuid]
|
formatMessage (AUTH_SUCCESS uuid) = ["AUTH-SUCCESS", Proto.serialize uuid]
|
||||||
|
@ -390,3 +115,296 @@ instance Proto.Serializable Service where
|
||||||
deserialize "git-upload-pack" = Just UploadPack
|
deserialize "git-upload-pack" = Just UploadPack
|
||||||
deserialize "git-receive-pack" = Just ReceivePack
|
deserialize "git-receive-pack" = Just ReceivePack
|
||||||
deserialize _ = Nothing
|
deserialize _ = Nothing
|
||||||
|
|
||||||
|
-- | Free monad for the protocol, combining net communication,
|
||||||
|
-- and local actions.
|
||||||
|
data ProtoF c = Net (NetF c) | Local (LocalF c)
|
||||||
|
deriving (Functor)
|
||||||
|
|
||||||
|
type Proto = Free ProtoF
|
||||||
|
|
||||||
|
net :: Net a -> Proto a
|
||||||
|
net = hoistFree Net
|
||||||
|
|
||||||
|
local :: Local a -> Proto a
|
||||||
|
local = hoistFree Local
|
||||||
|
|
||||||
|
data NetF c
|
||||||
|
= SendMessage Message c
|
||||||
|
| ReceiveMessage (Message -> c)
|
||||||
|
| SendBytes Len L.ByteString c
|
||||||
|
| ReceiveBytes Len (L.ByteString -> c)
|
||||||
|
| Relay RelayHandle
|
||||||
|
(RelayData -> Net (Maybe ExitCode))
|
||||||
|
(ExitCode -> c)
|
||||||
|
-- ^ Waits for data to be written to the RelayHandle, and for messages
|
||||||
|
-- to be received from the peer, and passes the data to the
|
||||||
|
-- callback, continuing until it returns an ExitCode.
|
||||||
|
| RelayService Service
|
||||||
|
(RelayHandle -> RelayData -> Net (Maybe ExitCode))
|
||||||
|
(ExitCode -> c)
|
||||||
|
-- ^ Runs a service, and waits for it to output to stdout,
|
||||||
|
-- and for messages to be received from the peer, and passes
|
||||||
|
-- the data to the callback (which is also passed the service's
|
||||||
|
-- stdin RelayHandle), continuing uniil the service exits.
|
||||||
|
| WriteRelay RelayHandle L.ByteString c
|
||||||
|
-- ^ Write data to a relay's handle, flushing it immediately.
|
||||||
|
deriving (Functor)
|
||||||
|
|
||||||
|
type Net = Free NetF
|
||||||
|
|
||||||
|
data RelayData
|
||||||
|
= RelayData L.ByteString
|
||||||
|
| RelayMessage Message
|
||||||
|
|
||||||
|
newtype RelayHandle = RelayHandle Handle
|
||||||
|
|
||||||
|
data LocalF c
|
||||||
|
-- ^ Lazily reads bytes from peer. Stops once Len are read,
|
||||||
|
-- or if connection is lost, and in either case returns the bytes
|
||||||
|
-- that were read. This allows resuming interrupted transfers.
|
||||||
|
= KeyFileSize Key (Len -> c)
|
||||||
|
-- ^ Checks size of key file (dne = 0)
|
||||||
|
| ReadKeyFile Key Offset (L.ByteString -> c)
|
||||||
|
| WriteKeyFile Key Offset Len L.ByteString (Bool -> c)
|
||||||
|
-- ^ Writes to key file starting at an offset. Returns True
|
||||||
|
-- once the whole content of the key is stored in the key file.
|
||||||
|
--
|
||||||
|
-- Note: The ByteString may not contain the entire remaining content
|
||||||
|
-- of the key. Only once the key file size == Len has the whole
|
||||||
|
-- content been transferred.
|
||||||
|
| CheckAuthToken UUID AuthToken (Bool -> c)
|
||||||
|
| SetPresent Key UUID c
|
||||||
|
| CheckContentPresent Key (Bool -> c)
|
||||||
|
-- ^ Checks if the whole content of the key is locally present.
|
||||||
|
| RemoveKeyFile Key (Bool -> c)
|
||||||
|
-- ^ If the key file is not present, still succeeds.
|
||||||
|
-- May fail if not enough copies to safely drop, etc.
|
||||||
|
| TryLockContent Key (Bool -> Proto ()) c
|
||||||
|
-- ^ Try to lock the content of a key, preventing it
|
||||||
|
-- from being deleted, and run the provided protocol action.
|
||||||
|
deriving (Functor)
|
||||||
|
|
||||||
|
type Local = Free LocalF
|
||||||
|
|
||||||
|
-- Generate sendMessage etc functions for all free monad constructors.
|
||||||
|
$(makeFree ''NetF)
|
||||||
|
$(makeFree ''LocalF)
|
||||||
|
|
||||||
|
-- | Running Proto actions purely, to see what they do.
|
||||||
|
runPure :: Show r => Proto r -> [Message] -> [(String, Maybe Message)]
|
||||||
|
runPure (Pure r) _ = [("result: " ++ show r, Nothing)]
|
||||||
|
runPure (Free (Net n)) ms = runNet n ms
|
||||||
|
runPure (Free (Local n)) ms = runLocal n ms
|
||||||
|
|
||||||
|
runNet :: Show r => NetF (Proto r) -> [Message] -> [(String, Maybe Message)]
|
||||||
|
runNet (SendMessage m next) ms = (">", Just m):runPure next ms
|
||||||
|
runNet (ReceiveMessage _) [] = [("not enough Messages provided", Nothing)]
|
||||||
|
runNet (ReceiveMessage next) (m:ms) = ("<", Just m):runPure (next m) ms
|
||||||
|
runNet (SendBytes _ _ next) ms = ("> bytes", Nothing):runPure next ms
|
||||||
|
runNet (ReceiveBytes _ next) ms = ("< bytes", Nothing):runPure (next L.empty) ms
|
||||||
|
runNet (Relay _ _ next) ms = runPure (next ExitSuccess) ms
|
||||||
|
runNet (RelayService _ _ next) ms = runPure (next ExitSuccess) ms
|
||||||
|
runNet (WriteRelay _ _ next) ms = runPure next ms
|
||||||
|
|
||||||
|
runLocal :: Show r => LocalF (Proto r) -> [Message] -> [(String, Maybe Message)]
|
||||||
|
runLocal (KeyFileSize _ next) ms = runPure (next (Len 100)) ms
|
||||||
|
runLocal (ReadKeyFile _ _ next) ms = runPure (next L.empty) ms
|
||||||
|
runLocal (WriteKeyFile _ _ _ _ next) ms = runPure (next True) ms
|
||||||
|
runLocal (CheckAuthToken _ _ next) ms = runPure (next True) ms
|
||||||
|
runLocal (SetPresent _ _ next) ms = runPure next ms
|
||||||
|
runLocal (CheckContentPresent _ next) ms = runPure (next False) ms
|
||||||
|
runLocal (RemoveKeyFile _ next) ms = runPure (next True) ms
|
||||||
|
runLocal (TryLockContent _ p next) ms = runPure (p True >> next) ms
|
||||||
|
|
||||||
|
protoDump :: [(String, Maybe Message)] -> String
|
||||||
|
protoDump = unlines . map protoDump'
|
||||||
|
|
||||||
|
protoDump' :: (String, Maybe Message) -> String
|
||||||
|
protoDump' (s, Nothing) = s
|
||||||
|
protoDump' (s, Just m) = s ++ " " ++ unwords (Proto.formatMessage m)
|
||||||
|
|
||||||
|
auth :: UUID -> AuthToken -> Proto (Maybe UUID)
|
||||||
|
auth myuuid t = do
|
||||||
|
net $ sendMessage (AUTH myuuid t)
|
||||||
|
r <- net receiveMessage
|
||||||
|
case r of
|
||||||
|
AUTH_SUCCESS theiruuid -> return $ Just theiruuid
|
||||||
|
AUTH_FAILURE -> return Nothing
|
||||||
|
_ -> do
|
||||||
|
net $ sendMessage (ERROR "auth failed")
|
||||||
|
return Nothing
|
||||||
|
|
||||||
|
checkPresent :: Key -> Proto Bool
|
||||||
|
checkPresent key = do
|
||||||
|
net $ sendMessage (CHECKPRESENT key)
|
||||||
|
checkSuccess
|
||||||
|
|
||||||
|
{- Locks content to prevent it from being dropped, while running an action.
|
||||||
|
-
|
||||||
|
- Note that this only guarantees that the content is locked as long as the
|
||||||
|
- connection to the peer remains up. If the connection is unexpectededly
|
||||||
|
- dropped, the peer will then unlock the content.
|
||||||
|
-}
|
||||||
|
lockContentWhile
|
||||||
|
:: MonadMask m
|
||||||
|
=> (forall r. Proto r -> m r)
|
||||||
|
-> Key
|
||||||
|
-> (Bool -> m ())
|
||||||
|
-> m ()
|
||||||
|
lockContentWhile runproto key a = bracket setup cleanup a
|
||||||
|
where
|
||||||
|
setup = runproto $ do
|
||||||
|
net $ sendMessage (LOCKCONTENT key)
|
||||||
|
checkSuccess
|
||||||
|
cleanup True = runproto $ net $ sendMessage UNLOCKCONTENT
|
||||||
|
cleanup False = return ()
|
||||||
|
|
||||||
|
remove :: Key -> Proto Bool
|
||||||
|
remove key = do
|
||||||
|
net $ sendMessage (REMOVE key)
|
||||||
|
checkSuccess
|
||||||
|
|
||||||
|
get :: Key -> Proto Bool
|
||||||
|
get key = receiveContent key (`GET` key)
|
||||||
|
|
||||||
|
put :: Key -> Proto Bool
|
||||||
|
put key = do
|
||||||
|
net $ sendMessage (PUT key)
|
||||||
|
r <- net receiveMessage
|
||||||
|
case r of
|
||||||
|
PUT_FROM offset -> sendContent key offset
|
||||||
|
ALREADY_HAVE -> return True
|
||||||
|
_ -> do
|
||||||
|
net $ sendMessage (ERROR "expected PUT_FROM")
|
||||||
|
return False
|
||||||
|
|
||||||
|
-- | Serve the protocol.
|
||||||
|
--
|
||||||
|
-- Note that if the client sends an unexpected message, the server will
|
||||||
|
-- respond with PTOTO_ERROR, and always continues processing messages.
|
||||||
|
-- Since the protocol is not versioned, this is necessary to handle
|
||||||
|
-- protocol changes robustly, since the client can detect when it's
|
||||||
|
-- talking to a server that does not support some new feature, and fall
|
||||||
|
-- back.
|
||||||
|
--
|
||||||
|
-- When the client sends ERROR to the server, the server gives up,
|
||||||
|
-- since it's not clear what state the client is is, and so not possible to
|
||||||
|
-- recover.
|
||||||
|
serve :: UUID -> Proto ()
|
||||||
|
serve myuuid = go Nothing
|
||||||
|
where
|
||||||
|
go autheduuid = do
|
||||||
|
r <- net receiveMessage
|
||||||
|
case r of
|
||||||
|
AUTH theiruuid authtoken -> do
|
||||||
|
ok <- local $ checkAuthToken theiruuid authtoken
|
||||||
|
if ok
|
||||||
|
then do
|
||||||
|
net $ sendMessage (AUTH_SUCCESS myuuid)
|
||||||
|
go (Just theiruuid)
|
||||||
|
else do
|
||||||
|
net $ sendMessage AUTH_FAILURE
|
||||||
|
go autheduuid
|
||||||
|
ERROR _ -> return ()
|
||||||
|
_ -> do
|
||||||
|
case autheduuid of
|
||||||
|
Just theiruuid -> authed theiruuid r
|
||||||
|
Nothing -> net $ sendMessage (ERROR "must AUTH first")
|
||||||
|
go autheduuid
|
||||||
|
|
||||||
|
authed _theiruuid r = case r of
|
||||||
|
LOCKCONTENT key -> local $ tryLockContent key $ \locked -> do
|
||||||
|
sendSuccess locked
|
||||||
|
when locked $ do
|
||||||
|
r' <- net receiveMessage
|
||||||
|
case r' of
|
||||||
|
UNLOCKCONTENT -> return ()
|
||||||
|
_ -> net $ sendMessage (ERROR "expected UNLOCKCONTENT")
|
||||||
|
CHECKPRESENT key -> sendSuccess =<< local (checkContentPresent key)
|
||||||
|
REMOVE key -> sendSuccess =<< local (removeKeyFile key)
|
||||||
|
PUT key -> do
|
||||||
|
have <- local $ checkContentPresent key
|
||||||
|
if have
|
||||||
|
then net $ sendMessage ALREADY_HAVE
|
||||||
|
else do
|
||||||
|
ok <- receiveContent key PUT_FROM
|
||||||
|
when ok $
|
||||||
|
local $ setPresent key myuuid
|
||||||
|
-- setPresent not called because the peer may have
|
||||||
|
-- requested the data but not permanatly stored it.
|
||||||
|
GET offset key -> void $ sendContent key offset
|
||||||
|
CONNECT service -> do
|
||||||
|
exitcode <- net $ relayService service relayCallback
|
||||||
|
net $ sendMessage (CONNECTDONE exitcode)
|
||||||
|
_ -> net $ sendMessage (ERROR "unexpected command")
|
||||||
|
|
||||||
|
sendContent :: Key -> Offset -> Proto Bool
|
||||||
|
sendContent key offset = do
|
||||||
|
(len, content) <- readKeyFileLen key offset
|
||||||
|
net $ sendMessage (DATA len)
|
||||||
|
net $ sendBytes len content
|
||||||
|
checkSuccess
|
||||||
|
|
||||||
|
receiveContent :: Key -> (Offset -> Message) -> Proto Bool
|
||||||
|
receiveContent key mkmsg = do
|
||||||
|
Len n <- local $ keyFileSize key
|
||||||
|
let offset = Offset n
|
||||||
|
net $ sendMessage (mkmsg offset)
|
||||||
|
r <- net receiveMessage
|
||||||
|
case r of
|
||||||
|
DATA len -> do
|
||||||
|
ok <- local . writeKeyFile key offset len
|
||||||
|
=<< net (receiveBytes len)
|
||||||
|
sendSuccess ok
|
||||||
|
return ok
|
||||||
|
_ -> do
|
||||||
|
net $ sendMessage (ERROR "expected DATA")
|
||||||
|
return False
|
||||||
|
|
||||||
|
checkSuccess :: Proto Bool
|
||||||
|
checkSuccess = do
|
||||||
|
ack <- net receiveMessage
|
||||||
|
case ack of
|
||||||
|
SUCCESS -> return True
|
||||||
|
FAILURE -> return False
|
||||||
|
_ -> do
|
||||||
|
net $ sendMessage (ERROR "expected SUCCESS or FAILURE")
|
||||||
|
return False
|
||||||
|
|
||||||
|
sendSuccess :: Bool -> Proto ()
|
||||||
|
sendSuccess True = net $ sendMessage SUCCESS
|
||||||
|
sendSuccess False = net $ sendMessage FAILURE
|
||||||
|
|
||||||
|
-- Reads key file from an offset. The Len should correspond to
|
||||||
|
-- the length of the ByteString, but to avoid buffering the content
|
||||||
|
-- in memory, is gotten using keyFileSize.
|
||||||
|
readKeyFileLen :: Key -> Offset -> Proto (Len, L.ByteString)
|
||||||
|
readKeyFileLen key (Offset offset) = do
|
||||||
|
(Len totallen) <- local $ keyFileSize key
|
||||||
|
let len = totallen - offset
|
||||||
|
if len <= 0
|
||||||
|
then return (Len 0, L.empty)
|
||||||
|
else do
|
||||||
|
content <- local $ readKeyFile key (Offset offset)
|
||||||
|
return (Len len, content)
|
||||||
|
|
||||||
|
connect :: Service -> Handle -> Handle -> Proto ExitCode
|
||||||
|
connect service hin hout = do
|
||||||
|
net $ sendMessage (CONNECT service)
|
||||||
|
net $ relay (RelayHandle hin) (relayCallback (RelayHandle hout))
|
||||||
|
|
||||||
|
relayCallback :: RelayHandle -> RelayData -> Net (Maybe ExitCode)
|
||||||
|
relayCallback hout (RelayMessage (DATA len)) = do
|
||||||
|
writeRelay hout =<< receiveBytes len
|
||||||
|
return Nothing
|
||||||
|
relayCallback _ (RelayMessage (CONNECTDONE exitcode)) =
|
||||||
|
return (Just exitcode)
|
||||||
|
relayCallback _ (RelayMessage _) = do
|
||||||
|
sendMessage (ERROR "expected DATA or CONNECTDONE")
|
||||||
|
return (Just (ExitFailure 1))
|
||||||
|
relayCallback _ (RelayData b) = do
|
||||||
|
let len = Len $ fromIntegral $ L.length b
|
||||||
|
sendMessage (DATA len)
|
||||||
|
sendBytes len b
|
||||||
|
return Nothing
|
||||||
|
|
159
Remote/Helper/P2P/IO.hs
Normal file
159
Remote/Helper/P2P/IO.hs
Normal file
|
@ -0,0 +1,159 @@
|
||||||
|
{- P2P protocol, partial IO implementation
|
||||||
|
-
|
||||||
|
- Copyright 2016 Joey Hess <id@joeyh.name>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE RankNTypes #-}
|
||||||
|
|
||||||
|
module Remote.Helper.P2P.IO
|
||||||
|
( RunProto
|
||||||
|
, runProtoHandle
|
||||||
|
) where
|
||||||
|
|
||||||
|
import Remote.Helper.P2P
|
||||||
|
import Utility.Process
|
||||||
|
import Git
|
||||||
|
import Git.Command
|
||||||
|
import Utility.SafeCommand
|
||||||
|
import Utility.SimpleProtocol
|
||||||
|
|
||||||
|
import Control.Monad
|
||||||
|
import Control.Monad.Free
|
||||||
|
import Control.Monad.IO.Class
|
||||||
|
import Data.Maybe
|
||||||
|
import System.Exit (ExitCode(..))
|
||||||
|
import System.IO
|
||||||
|
import Control.Concurrent
|
||||||
|
import qualified Data.ByteString as B
|
||||||
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
|
type RunProto = forall a m. MonadIO m => Proto a -> m a
|
||||||
|
|
||||||
|
data S = S
|
||||||
|
{ repo :: Repo
|
||||||
|
, hdl :: Handle
|
||||||
|
}
|
||||||
|
|
||||||
|
-- Implementation of the protocol, communicating with a peer
|
||||||
|
-- over a Handle. No Local actions will be run.
|
||||||
|
runProtoHandle :: MonadIO m => Handle -> Repo -> Proto a -> m a
|
||||||
|
runProtoHandle h r = go
|
||||||
|
where
|
||||||
|
go :: RunProto
|
||||||
|
go (Pure a) = pure a
|
||||||
|
go (Free (Net n)) = runNetHandle (S r h) go n
|
||||||
|
go (Free (Local _)) = error "local actions not allowed"
|
||||||
|
|
||||||
|
runNetHandle :: MonadIO m => S -> RunProto -> NetF (Proto a) -> m a
|
||||||
|
runNetHandle s runner f = case f of
|
||||||
|
SendMessage m next -> do
|
||||||
|
liftIO $ do
|
||||||
|
hPutStrLn (hdl s) (unwords (formatMessage m))
|
||||||
|
hFlush (hdl s)
|
||||||
|
runner next
|
||||||
|
ReceiveMessage next -> do
|
||||||
|
l <- liftIO $ hGetLine (hdl s)
|
||||||
|
let m = fromMaybe (ERROR "protocol parse error")
|
||||||
|
(parseMessage l)
|
||||||
|
runner (next m)
|
||||||
|
SendBytes _len b next -> do
|
||||||
|
liftIO $ do
|
||||||
|
L.hPut (hdl s) b
|
||||||
|
hFlush (hdl s)
|
||||||
|
runner next
|
||||||
|
ReceiveBytes (Len n) next -> do
|
||||||
|
b <- liftIO $ L.hGet (hdl s) (fromIntegral n)
|
||||||
|
runner (next b)
|
||||||
|
Relay hout callback next ->
|
||||||
|
runRelay runner hout callback >>= runner . next
|
||||||
|
RelayService service callback next ->
|
||||||
|
runRelayService s runner service callback >>= runner . next
|
||||||
|
WriteRelay (RelayHandle h) b next -> do
|
||||||
|
liftIO $ do
|
||||||
|
L.hPut h b
|
||||||
|
hFlush h
|
||||||
|
runner next
|
||||||
|
|
||||||
|
runRelay
|
||||||
|
:: MonadIO m
|
||||||
|
=> RunProto
|
||||||
|
-> RelayHandle
|
||||||
|
-> (RelayData -> Net (Maybe ExitCode))
|
||||||
|
-> m ExitCode
|
||||||
|
runRelay runner (RelayHandle hout) callback = do
|
||||||
|
v <- liftIO newEmptyMVar
|
||||||
|
_ <- liftIO $ forkIO $ readout v
|
||||||
|
feeder <- liftIO $ forkIO $ feedin v
|
||||||
|
exitcode <- liftIO $ drain v
|
||||||
|
liftIO $ killThread feeder
|
||||||
|
return exitcode
|
||||||
|
where
|
||||||
|
feedin v = forever $ do
|
||||||
|
m <- runner $ net receiveMessage
|
||||||
|
putMVar v $ RelayMessage m
|
||||||
|
|
||||||
|
readout v = do
|
||||||
|
b <- B.hGetSome hout 65536
|
||||||
|
if B.null b
|
||||||
|
then hClose hout
|
||||||
|
else do
|
||||||
|
putMVar v $ RelayData (L.fromChunks [b])
|
||||||
|
readout v
|
||||||
|
|
||||||
|
drain v = do
|
||||||
|
d <- takeMVar v
|
||||||
|
r <- runner $ net $ callback d
|
||||||
|
case r of
|
||||||
|
Nothing -> drain v
|
||||||
|
Just exitcode -> return exitcode
|
||||||
|
|
||||||
|
runRelayService
|
||||||
|
:: MonadIO m
|
||||||
|
=> S
|
||||||
|
-> RunProto
|
||||||
|
-> Service
|
||||||
|
-> (RelayHandle -> RelayData -> Net (Maybe ExitCode))
|
||||||
|
-> m ExitCode
|
||||||
|
runRelayService s runner service callback = do
|
||||||
|
v <- liftIO newEmptyMVar
|
||||||
|
(Just hin, Just hout, _, pid) <- liftIO $ createProcess serviceproc
|
||||||
|
{ std_out = CreatePipe
|
||||||
|
, std_in = CreatePipe
|
||||||
|
}
|
||||||
|
_ <- liftIO $ forkIO $ readout v hout
|
||||||
|
feeder <- liftIO $ forkIO $ feedin v
|
||||||
|
_ <- liftIO $ forkIO $ putMVar v . Left =<< waitForProcess pid
|
||||||
|
exitcode <- liftIO $ drain v hin
|
||||||
|
liftIO $ killThread feeder
|
||||||
|
return exitcode
|
||||||
|
where
|
||||||
|
cmd = case service of
|
||||||
|
UploadPack -> "upload-pack"
|
||||||
|
ReceivePack -> "receive-pack"
|
||||||
|
serviceproc = gitCreateProcess [Param cmd, File (repoPath (repo s))] (repo s)
|
||||||
|
|
||||||
|
drain v hin = do
|
||||||
|
d <- takeMVar v
|
||||||
|
case d of
|
||||||
|
Left exitcode -> do
|
||||||
|
hClose hin
|
||||||
|
return exitcode
|
||||||
|
Right relaydata -> do
|
||||||
|
_ <- runner $ net $
|
||||||
|
callback (RelayHandle hin) relaydata
|
||||||
|
drain v hin
|
||||||
|
|
||||||
|
readout v hout = do
|
||||||
|
b <- B.hGetSome hout 65536
|
||||||
|
if B.null b
|
||||||
|
then hClose hout
|
||||||
|
else do
|
||||||
|
putMVar v $ Right $
|
||||||
|
RelayData (L.fromChunks [b])
|
||||||
|
readout v hout
|
||||||
|
|
||||||
|
feedin v = forever $ do
|
||||||
|
m <- runner $ net receiveMessage
|
||||||
|
putMVar v $ Right $ RelayMessage m
|
|
@ -920,6 +920,7 @@ Executable git-annex
|
||||||
Remote.Helper.Http
|
Remote.Helper.Http
|
||||||
Remote.Helper.Messages
|
Remote.Helper.Messages
|
||||||
Remote.Helper.P2P
|
Remote.Helper.P2P
|
||||||
|
Remote.Helper.P2P.IO
|
||||||
Remote.Helper.ReadOnly
|
Remote.Helper.ReadOnly
|
||||||
Remote.Helper.Special
|
Remote.Helper.Special
|
||||||
Remote.Helper.Ssh
|
Remote.Helper.Ssh
|
||||||
|
|
Loading…
Reference in a new issue