support a P2PConnection that uses TMVars rather than Handles

This will allow having an internal thread speaking P2P protocol,
which will be needed to support proxying to external special remotes.

No serialization is done on the internal P2P protocol of course.

When a ByteString is being exchanged, it may or may not be exactly
the length indicated by DATA. While that has to be carefully managed
for the serialized P2P protocol, here it would require buffering the
whole lazy bytestring in memory to check its length when sending,
so it's better to do length checks on the receiving side.
This commit is contained in:
Joey Hess 2024-06-28 11:22:29 -04:00
parent 28f5c47b5a
commit c3a785204e
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
5 changed files with 86 additions and 45 deletions

View file

@ -135,8 +135,8 @@ checkHiddenService = bracket setup cleanup go
let conn = P2PConnection let conn = P2PConnection
{ connRepo = Nothing { connRepo = Nothing
, connCheckAuth = const False , connCheckAuth = const False
, connIhdl = h , connIhdl = P2PHandle h
, connOhdl = h , connOhdl = P2PHandle h
, connIdent = ConnIdent Nothing , connIdent = ConnIdent Nothing
} }
runst <- mkRunState Client runst <- mkRunState Client

View file

@ -67,7 +67,7 @@ performProxy clientuuid servermode r = do
p2pErrHandler p2pErrHandler
where where
withclientversion clientside (Just (clientmaxversion, othermsg)) = do withclientversion clientside (Just (clientmaxversion, othermsg)) = do
remoteside <- proxySshRemoteSide clientmaxversion mempty r remoteside <- proxyRemoteSide clientmaxversion mempty r
protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id
<$> runRemoteSide remoteside <$> runRemoteSide remoteside
(P2P.net P2P.getProtocolVersion) (P2P.net P2P.getProtocolVersion)

117
P2P/IO.hs
View file

@ -11,6 +11,7 @@ module P2P.IO
( RunProto ( RunProto
, RunState(..) , RunState(..)
, mkRunState , mkRunState
, P2PHandle(..)
, P2PConnection(..) , P2PConnection(..)
, ConnIdent(..) , ConnIdent(..)
, ClosableConnection(..) , ClosableConnection(..)
@ -74,11 +75,15 @@ mkRunState mk = do
tvar <- newTVarIO defaultProtocolVersion tvar <- newTVarIO defaultProtocolVersion
return (mk tvar) return (mk tvar)
data P2PHandle
= P2PHandle Handle
| P2PHandleTMVar (TMVar (Either L.ByteString Message))
data P2PConnection = P2PConnection data P2PConnection = P2PConnection
{ connRepo :: Maybe Repo { connRepo :: Maybe Repo
, connCheckAuth :: (AuthToken -> Bool) , connCheckAuth :: (AuthToken -> Bool)
, connIhdl :: Handle , connIhdl :: P2PHandle
, connOhdl :: Handle , connOhdl :: P2PHandle
, connIdent :: ConnIdent , connIdent :: ConnIdent
} }
@ -94,8 +99,8 @@ stdioP2PConnection :: Maybe Git.Repo -> P2PConnection
stdioP2PConnection g = P2PConnection stdioP2PConnection g = P2PConnection
{ connRepo = g { connRepo = g
, connCheckAuth = const False , connCheckAuth = const False
, connIhdl = stdin , connIhdl = P2PHandle stdin
, connOhdl = stdout , connOhdl = P2PHandle stdout
, connIdent = ConnIdent Nothing , connIdent = ConnIdent Nothing
} }
@ -106,15 +111,18 @@ connectPeer g (TorAnnex onionaddress onionport) = do
return $ P2PConnection return $ P2PConnection
{ connRepo = g { connRepo = g
, connCheckAuth = const False , connCheckAuth = const False
, connIhdl = h , connIhdl = P2PHandle h
, connOhdl = h , connOhdl = P2PHandle h
, connIdent = ConnIdent Nothing , connIdent = ConnIdent Nothing
} }
closeConnection :: P2PConnection -> IO () closeConnection :: P2PConnection -> IO ()
closeConnection conn = do closeConnection conn = do
hClose (connIhdl conn) closehandle (connIhdl conn)
hClose (connOhdl conn) closehandle (connOhdl conn)
where
closehandle (P2PHandle h) = hClose h
closehandle (P2PHandleTMVar _) = return ()
-- Serves the protocol on a unix socket. -- Serves the protocol on a unix socket.
-- --
@ -164,6 +172,11 @@ runNetProto runst conn = go
go (Free (Local _)) = return $ Left $ go (Free (Local _)) = return $ Left $
ProtoFailureMessage "unexpected annex operation attempted" ProtoFailureMessage "unexpected annex operation attempted"
data P2PTMVarException = P2PTMVarException String
deriving (Show)
instance Exception P2PTMVarException
-- Interpreter of the Net part of Proto. -- Interpreter of the Net part of Proto.
-- --
-- An interpreter of Proto has to be provided, to handle the rest of Proto -- An interpreter of Proto has to be provided, to handle the rest of Proto
@ -171,40 +184,68 @@ runNetProto runst conn = go
runNet :: (MonadIO m, MonadMask m) => RunState -> P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either ProtoFailure a) runNet :: (MonadIO m, MonadMask m) => RunState -> P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either ProtoFailure a)
runNet runst conn runner f = case f of runNet runst conn runner f = case f of
SendMessage m next -> do SendMessage m next -> do
v <- liftIO $ tryNonAsync $ do v <- liftIO $ do
let l = unwords (formatMessage m)
debugMessage conn "P2P >" m debugMessage conn "P2P >" m
hPutStrLn (connOhdl conn) l case connOhdl conn of
hFlush (connOhdl conn) P2PHandle h -> tryNonAsync $ do
hPutStrLn h $ unwords (formatMessage m)
hFlush h
P2PHandleTMVar mv ->
ifM (atomically (tryPutTMVar mv (Right m)))
( return $ Right ()
, return $ Left $ toException $
P2PTMVarException "TMVar left full"
)
case v of case v of
Left e -> return $ Left $ ProtoFailureException e Left e -> return $ Left $ ProtoFailureException e
Right () -> runner next Right () -> runner next
ReceiveMessage next -> do ReceiveMessage next ->
v <- liftIO $ tryIOError $ getProtocolLine (connIhdl conn) let protoerr = return $ Left $
case v of ProtoFailureMessage "protocol error"
Left e -> return $ Left $ ProtoFailureIOError e gotmessage m = do
Right Nothing -> return $ Left $ liftIO $ debugMessage conn "P2P <" m
ProtoFailureMessage "protocol error" runner (next (Just m))
Right (Just l) -> case parseMessage l of in case connIhdl conn of
Just m -> do P2PHandle h -> do
liftIO $ debugMessage conn "P2P <" m v <- liftIO $ tryIOError $ getProtocolLine h
runner (next (Just m)) case v of
Nothing -> runner (next Nothing) Left e -> return $ Left $ ProtoFailureIOError e
SendBytes len b p next -> do Right Nothing -> protoerr
v <- liftIO $ tryNonAsync $ do Right (Just l) -> case parseMessage l of
ok <- sendExactly len b (connOhdl conn) p Just m -> gotmessage m
hFlush (connOhdl conn) Nothing -> runner (next Nothing)
return ok P2PHandleTMVar mv ->
case v of liftIO (atomically (takeTMVar mv)) >>= \case
Right True -> runner next Right m -> gotmessage m
Right False -> return $ Left $ Left _b -> protoerr
ProtoFailureMessage "short data write" SendBytes len b p next ->
Left e -> return $ Left $ ProtoFailureException e case connOhdl conn of
ReceiveBytes len p next -> do P2PHandle h -> do
v <- liftIO $ tryNonAsync $ receiveExactly len (connIhdl conn) p v <- liftIO $ tryNonAsync $ do
case v of ok <- sendExactly len b h p
Left e -> return $ Left $ ProtoFailureException e hFlush h
Right b -> runner (next b) return ok
case v of
Right True -> runner next
Right False -> return $ Left $
ProtoFailureMessage "short data write"
Left e -> return $ Left $ ProtoFailureException e
P2PHandleTMVar mv -> do
liftIO $ atomically $ putTMVar mv (Left b)
runner next
ReceiveBytes len p next ->
case connIhdl conn of
P2PHandle h -> do
v <- liftIO $ tryNonAsync $ receiveExactly len h p
case v of
Right b -> runner (next b)
Left e -> return $ Left $
ProtoFailureException e
P2PHandleTMVar mv ->
liftIO (atomically (takeTMVar mv)) >>= \case
Left b -> runner (next b)
Right _m -> return $ Left $
ProtoFailureMessage "protocol error"
CheckAuthToken _u t next -> do CheckAuthToken _u t next -> do
let authed = connCheckAuth conn t let authed = connCheckAuth conn t
runner (next authed) runner (next authed)

View file

@ -264,8 +264,8 @@ openP2PShellConnection' r maxprotoversion bypass = do
let conn = P2P.P2PConnection let conn = P2P.P2PConnection
{ P2P.connRepo = Nothing { P2P.connRepo = Nothing
, P2P.connCheckAuth = const False , P2P.connCheckAuth = const False
, P2P.connIhdl = to , P2P.connIhdl = P2P.P2PHandle to
, P2P.connOhdl = from , P2P.connOhdl = P2P.P2PHandle from
, P2P.connIdent = P2P.ConnIdent $ , P2P.connIdent = P2P.ConnIdent $
Just $ "git-annex-shell connection " ++ show pidnum Just $ "git-annex-shell connection " ++ show pidnum
} }

View file

@ -113,8 +113,8 @@ serveClient th@(TransportHandle _ _ rd) u r q = bracket setup cleanup start
let conn = P2PConnection let conn = P2PConnection
{ connRepo = Just r { connRepo = Just r
, connCheckAuth = (`isAllowedAuthToken` allowed) , connCheckAuth = (`isAllowedAuthToken` allowed)
, connIhdl = h , connIhdl = P2PHandle h
, connOhdl = h , connOhdl = P2PHandle h
, connIdent = ConnIdent $ Just "tor remotedaemon" , connIdent = ConnIdent $ Just "tor remotedaemon"
} }
-- not really Client, but we don't know their uuid yet -- not really Client, but we don't know their uuid yet