This commit is contained in:
Joey Hess 2016-12-06 15:40:31 -04:00
parent 26a53fb4a5
commit f744bd5391
No known key found for this signature in database
GPG key ID: C910D9222512E3C7
7 changed files with 74 additions and 86 deletions

View file

@ -9,7 +9,7 @@
module P2P.Annex
( RunMode(..)
, RunEnv(..)
, P2PConnection(..)
, runFullProto
) where
@ -31,12 +31,12 @@ data RunMode
| Client
-- Full interpreter for Proto, that can receive and send objects.
runFullProto :: RunMode -> RunEnv -> Proto a -> Annex (Maybe a)
runFullProto runmode runenv = go
runFullProto :: RunMode -> P2PConnection -> Proto a -> Annex (Maybe a)
runFullProto runmode conn = go
where
go :: RunProto Annex
go (Pure v) = pure (Just v)
go (Free (Net n)) = runNet runenv go n
go (Free (Net n)) = runNet conn go n
go (Free (Local l)) = runLocal runmode go l
runLocal :: RunMode -> RunProto Annex -> LocalF (Proto a) -> Annex (Maybe a)

View file

@ -9,12 +9,15 @@
module P2P.IO
( RunProto
, RunEnv(..)
, P2PConnection(..)
, connectPeer
, setupHandle
, runNetProto
, runNet
) where
import P2P.Protocol
import P2P.Address
import Utility.Process
import Git
import Git.Command
@ -22,11 +25,14 @@ import Utility.AuthToken
import Utility.SafeCommand
import Utility.SimpleProtocol
import Utility.Exception
import Utility.Tor
import Utility.FileSystemEncoding
import Control.Monad
import Control.Monad.Free
import Control.Monad.IO.Class
import System.Exit (ExitCode(..))
import Network.Socket
import System.IO
import Control.Concurrent
import Control.Concurrent.Async
@ -36,41 +42,60 @@ import qualified Data.ByteString.Lazy as L
-- Type of interpreters of the Proto free monad.
type RunProto m = forall a. (MonadIO m, MonadMask m) => Proto a -> m (Maybe a)
data RunEnv = RunEnv
{ runRepo :: Repo
, runCheckAuth :: (AuthToken -> Bool)
, runIhdl :: Handle
, runOhdl :: Handle
data P2PConnection = P2PConnection
{ connRepo :: Repo
, connCheckAuth :: (AuthToken -> Bool)
, connIhdl :: Handle
, connOhdl :: Handle
}
-- Opens a connection to a peer. Does not authenticate with it.
connectPeer :: Git.Repo -> P2PAddress -> IO P2PConnection
connectPeer g (TorAnnex onionaddress onionport) = do
h <- setupHandle =<< connectHiddenService onionaddress onionport
return $ P2PConnection
{ connRepo = g
, connCheckAuth = const False
, connIhdl = h
, connOhdl = h
}
setupHandle :: Socket -> IO Handle
setupHandle s = do
h <- socketToHandle s ReadWriteMode
hSetBuffering h LineBuffering
hSetBinaryMode h False
fileEncoding h
return h
-- Purposefully incomplete interpreter of Proto.
--
-- This only runs Net actions. No Local actions will be run
-- (those need the Annex monad) -- if the interpreter reaches any,
-- it returns Nothing.
runNetProto :: RunEnv -> Proto a -> IO (Maybe a)
runNetProto runenv = go
runNetProto :: P2PConnection -> Proto a -> IO (Maybe a)
runNetProto conn = go
where
go :: RunProto IO
go (Pure v) = pure (Just v)
go (Free (Net n)) = runNet runenv go n
go (Free (Net n)) = runNet conn go n
go (Free (Local _)) = return Nothing
-- Interpreter of the Net part of Proto.
--
-- An interpreter of Proto has to be provided, to handle the rest of Proto
-- actions.
runNet :: (MonadIO m, MonadMask m) => RunEnv -> RunProto m -> NetF (Proto a) -> m (Maybe a)
runNet runenv runner f = case f of
runNet :: (MonadIO m, MonadMask m) => P2PConnection -> RunProto m -> NetF (Proto a) -> m (Maybe a)
runNet conn runner f = case f of
SendMessage m next -> do
v <- liftIO $ tryNonAsync $ do
hPutStrLn (runOhdl runenv) (unwords (formatMessage m))
hFlush (runOhdl runenv)
hPutStrLn (connOhdl conn) (unwords (formatMessage m))
hFlush (connOhdl conn)
case v of
Left _e -> return Nothing
Right () -> runner next
ReceiveMessage next -> do
v <- liftIO $ tryNonAsync $ hGetLine (runIhdl runenv)
v <- liftIO $ tryNonAsync $ hGetLine (connIhdl conn)
case v of
Left _e -> return Nothing
Right l -> case parseMessage l of
@ -81,19 +106,19 @@ runNet runenv runner f = case f of
next e
SendBytes len b next -> do
v <- liftIO $ tryNonAsync $ do
ok <- sendExactly len b (runOhdl runenv)
hFlush (runOhdl runenv)
ok <- sendExactly len b (connOhdl conn)
hFlush (connOhdl conn)
return ok
case v of
Right True -> runner next
_ -> return Nothing
ReceiveBytes (Len n) next -> do
v <- liftIO $ tryNonAsync $ L.hGet (runIhdl runenv) (fromIntegral n)
v <- liftIO $ tryNonAsync $ L.hGet (connIhdl conn) (fromIntegral n)
case v of
Left _e -> return Nothing
Right b -> runner (next b)
CheckAuthToken _u t next -> do
let authed = runCheckAuth runenv t
let authed = connCheckAuth conn t
runner (next authed)
Relay hin hout next -> do
v <- liftIO $ runRelay runnerio hin hout
@ -101,7 +126,7 @@ runNet runenv runner f = case f of
Nothing -> return Nothing
Just exitcode -> runner (next exitcode)
RelayService service next -> do
v <- liftIO $ runRelayService runenv runnerio service
v <- liftIO $ runRelayService conn runnerio service
case v of
Nothing -> return Nothing
Just () -> runner next
@ -109,7 +134,7 @@ runNet runenv runner f = case f of
-- This is only used for running Net actions when relaying,
-- so it's ok to use runNetProto, despite it not supporting
-- all Proto actions.
runnerio = runNetProto runenv
runnerio = runNetProto conn
-- Send exactly the specified number of bytes or returns False.
--
@ -150,8 +175,8 @@ runRelay runner (RelayHandle hout) (RelayHandle hin) = bracket setup cleanup go
go v = relayHelper runner v hin
runRelayService :: RunEnv -> RunProto IO -> Service -> IO (Maybe ())
runRelayService runenv runner service = bracket setup cleanup go
runRelayService :: P2PConnection -> RunProto IO -> Service -> IO (Maybe ())
runRelayService conn runner service = bracket setup cleanup go
where
cmd = case service of
UploadPack -> "upload-pack"
@ -159,8 +184,8 @@ runRelayService runenv runner service = bracket setup cleanup go
serviceproc = gitCreateProcess
[ Param cmd
, File (repoPath (runRepo runenv))
] (runRepo runenv)
, File (repoPath (connRepo conn))
] (connRepo conn)
setup = do
(Just hin, Just hout, _, pid) <- createProcess serviceproc