work toward supporting proxying to multiple remotes at once

For eg, upload fanout.

Delay connecting to a remote until it's needed. When there are many
proxied remotes, it would not do for the proxy to connect to each of
them on startup; that could take a long time.
This commit is contained in:
Joey Hess 2024-06-17 14:14:08 -04:00
parent 83a1db8d17
commit c7ad44e4d1
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
2 changed files with 59 additions and 24 deletions

View file

@ -62,17 +62,15 @@ performLocal theiruuid servermode = do
performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform
performProxy clientuuid servermode remote = do performProxy clientuuid servermode remote = do
clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing) clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing)
let clientside = ClientSide $ let clientside = ClientSide clientrunst (stdioP2PConnection Nothing)
liftIO . runNetProto clientrunst
(stdioP2PConnection Nothing)
getClientProtocolVersion remote clientside getClientProtocolVersion remote clientside
(withclientversion clientside) (withclientversion clientside)
protoerrhandler protoerrhandler
where where
withclientversion clientside (Just (clientmaxversion, othermsg)) = withclientversion clientside (Just (clientmaxversion, othermsg)) = do
connectremote clientmaxversion $ \remoteside -> remoteside <- connectremote clientmaxversion
proxy done proxymethods servermode clientside remoteside proxy done proxymethods servermode clientside remoteside
othermsg protoerrhandler othermsg protoerrhandler
withclientversion _ Nothing = done withclientversion _ Nothing = done
proxymethods = ProxyMethods proxymethods = ProxyMethods
@ -81,12 +79,15 @@ performProxy clientuuid servermode remote = do
} }
-- FIXME: Support special remotes. -- FIXME: Support special remotes.
connectremote clientmaxversion cont = connectremote clientmaxversion = mkRemoteSide (Remote.uuid remote) $
openP2PShellConnection' remote clientmaxversion >>= \case openP2PShellConnection' remote clientmaxversion >>= \case
Just conn@(P2P.IO.OpenConnection (remoterunst, remoteconn, _)) -> Just conn@(P2P.IO.OpenConnection (remoterunst, remoteconn, _)) ->
cont (RemoteSide (liftIO . runNetProto remoterunst remoteconn) (Remote.uuid remote)) return $ Just
`finally` liftIO (closeP2PShellConnection conn) ( remoterunst
_ -> giveup "Unable to connect to remote." , remoteconn
, void $ liftIO $ closeP2PShellConnection conn
)
_ -> return Nothing
protoerrhandler cont a = a >>= \case protoerrhandler cont a = a >>= \case
-- Avoid displaying an error when the client hung up on us. -- Avoid displaying an error when the client hung up on us.

View file

@ -15,10 +15,23 @@ import P2P.IO
import qualified Remote import qualified Remote
import Utility.Metered (nullMeterUpdate) import Utility.Metered (nullMeterUpdate)
type ProtoRunner = forall a. Proto a -> Annex (Either ProtoFailure a) import Control.Concurrent.STM
data ClientSide = ClientSide ProtoRunner type ProtoCloser = Annex ()
data RemoteSide = RemoteSide ProtoRunner UUID
data ClientSide = ClientSide RunState P2PConnection
data RemoteSide = RemoteSide
{ remoteUUID :: UUID
, remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
, remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
}
mkRemoteSide :: UUID -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide
mkRemoteSide remoteuuid remoteconnect = RemoteSide
<$> pure remoteuuid
<*> pure remoteconnect
<*> liftIO (atomically newEmptyTMVar)
{- To keep this module limited to P2P protocol actions, {- To keep this module limited to P2P protocol actions,
- all other actions that a proxy needs to do are provided - all other actions that a proxy needs to do are provided
@ -47,12 +60,14 @@ type ProtoErrorHandled r =
- brought up yet. - brought up yet.
-} -}
getClientProtocolVersion getClientProtocolVersion
:: Remote :: Remote
-> ClientSide -> ClientSide
-> (Maybe (ProtocolVersion, Maybe Message) -> Annex r) -> (Maybe (ProtocolVersion, Maybe Message) -> Annex r)
-> ProtoErrorHandled r -> ProtoErrorHandled r
getClientProtocolVersion remote (ClientSide client) cont protoerrhandler = getClientProtocolVersion remote (ClientSide clientrunst clientconn) cont protoerrhandler =
protoerrhandler cont $ client $ getClientProtocolVersion' remote protoerrhandler cont $ client $ getClientProtocolVersion' remote
where
client = liftIO . runNetProto clientrunst clientconn
getClientProtocolVersion' getClientProtocolVersion'
:: Remote :: Remote
@ -74,8 +89,7 @@ getClientProtocolVersion' remote = do
(Just (defaultProtocolVersion, Just othermsg)) (Just (defaultProtocolVersion, Just othermsg))
{- Proxy between the client and the remote. This picks up after {- Proxy between the client and the remote. This picks up after
- getClientProtocolVersion, after the connection to the remote has - getClientProtocolVersion.
- been made, and the protocol version negotiated with the remote.
-} -}
proxy proxy
:: Annex r :: Annex r
@ -87,7 +101,7 @@ proxy
-- ^ non-VERSION message that was received from the client when -- ^ non-VERSION message that was received from the client when
-- negotiating protocol version, and has not been responded to yet -- negotiating protocol version, and has not been responded to yet
-> ProtoErrorHandled r -> ProtoErrorHandled r
proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote remoteuuid) othermessage protoerrhandler = do proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteside othermessage protoerrhandler = do
case othermessage of case othermessage of
Just message -> proxyclientmessage (Just message) Just message -> proxyclientmessage (Just message)
Nothing -> do Nothing -> do
@ -95,6 +109,26 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r
protoerrhandler proxynextclientmessage $ protoerrhandler proxynextclientmessage $
client $ net $ sendMessage $ VERSION v client $ net $ sendMessage $ VERSION v
where where
client = liftIO . runNetProto clientrunst clientconn
remote a = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case
Just (runst, conn, _closer) -> liftIO $ runNetProto runst conn a
Nothing -> remoteConnect remoteside >>= \case
Just (runst, conn, closer) -> do
liftIO $ atomically $ putTMVar
(remoteTMVar remoteside)
(runst, conn, closer)
liftIO $ runNetProto runst conn a
Nothing -> giveup "Unable to connect to remote."
closeremote = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case
Just (_, _, closer) -> closer
Nothing -> return ()
proxydone' = do
closeremote
proxydone
protocolversion = either (const defaultProtocolVersion) id protocolversion = either (const defaultProtocolVersion) id
<$> remote (net getProtocolVersion) <$> remote (net getProtocolVersion)
@ -107,7 +141,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r
protoerrhandler proxynextclientmessage $ protoerrhandler proxynextclientmessage $
client notallowed client notallowed
proxyclientmessage Nothing = proxydone proxyclientmessage Nothing = proxydone'
proxyclientmessage (Just message) = case message of proxyclientmessage (Just message) = case message of
CHECKPRESENT _ -> CHECKPRESENT _ ->
proxyresponse message (const proxynextclientmessage) proxyresponse message (const proxynextclientmessage)
@ -129,7 +163,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r
servermodechecker (checkCONNECTServerMode service) $ servermodechecker (checkCONNECTServerMode service) $
-- P2P protocol does not continue after -- P2P protocol does not continue after
-- relaying from git. -- relaying from git.
protoerrhandler (\() -> proxydone) $ protoerrhandler (\() -> proxydone') $
client $ net $ relayService service client $ net $ relayService service
NOTIFYCHANGE -> protoerr NOTIFYCHANGE -> protoerr
-- Messages that the client should only send after one of -- Messages that the client should only send after one of
@ -172,7 +206,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r
withresp a (Just resp) = a resp withresp a (Just resp) = a resp
-- Whichever of the remote or client the message was read from -- Whichever of the remote or client the message was read from
-- hung up. -- hung up.
withresp _ Nothing = proxydone withresp _ Nothing = proxydone'
-- Read a message from one party, send it to the other, -- Read a message from one party, send it to the other,
-- and then pass the message to the continuation. -- and then pass the message to the continuation.
@ -190,7 +224,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r
proxyresponse message $ \resp () -> do proxyresponse message $ \resp () -> do
case resp of case resp of
SUCCESS -> removedContent proxymethods SUCCESS -> removedContent proxymethods
remoteuuid k (remoteUUID remoteside) k
_ -> return () _ -> return ()
proxynextclientmessage () proxynextclientmessage ()
@ -219,7 +253,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r
where where
finished resp () = do finished resp () = do
case resp of case resp of
SUCCESS -> addedContent proxymethods remoteuuid k SUCCESS -> addedContent proxymethods (remoteUUID remoteside) k
_ -> return () _ -> return ()
proxynextclientmessage () proxynextclientmessage ()