From c7ad44e4d16d4ebb12fd4662a894c5bcbc620f21 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 17 Jun 2024 14:14:08 -0400 Subject: [PATCH] work toward supporting proxying to multiple remotes at once For eg, upload fanout. Delay connecting to a remote until it's needed. When there are many proxied remotes, it would not do for the proxy to connect to each of them on startup; that could take a long time. --- Command/P2PStdIO.hs | 23 ++++++++--------- P2P/Proxy.hs | 60 +++++++++++++++++++++++++++++++++++---------- 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/Command/P2PStdIO.hs b/Command/P2PStdIO.hs index c02673f89c..5424df8140 100644 --- a/Command/P2PStdIO.hs +++ b/Command/P2PStdIO.hs @@ -62,17 +62,15 @@ performLocal theiruuid servermode = do performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform performProxy clientuuid servermode remote = do clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing) - let clientside = ClientSide $ - liftIO . runNetProto clientrunst - (stdioP2PConnection Nothing) + let clientside = ClientSide clientrunst (stdioP2PConnection Nothing) getClientProtocolVersion remote clientside (withclientversion clientside) protoerrhandler where - withclientversion clientside (Just (clientmaxversion, othermsg)) = - connectremote clientmaxversion $ \remoteside -> - proxy done proxymethods servermode clientside remoteside - othermsg protoerrhandler + withclientversion clientside (Just (clientmaxversion, othermsg)) = do + remoteside <- connectremote clientmaxversion + proxy done proxymethods servermode clientside remoteside + othermsg protoerrhandler withclientversion _ Nothing = done proxymethods = ProxyMethods @@ -81,12 +79,15 @@ performProxy clientuuid servermode remote = do } -- FIXME: Support special remotes. - connectremote clientmaxversion cont = + connectremote clientmaxversion = mkRemoteSide (Remote.uuid remote) $ openP2PShellConnection' remote clientmaxversion >>= \case Just conn@(P2P.IO.OpenConnection (remoterunst, remoteconn, _)) -> - cont (RemoteSide (liftIO . runNetProto remoterunst remoteconn) (Remote.uuid remote)) - `finally` liftIO (closeP2PShellConnection conn) - _ -> giveup "Unable to connect to remote." + return $ Just + ( remoterunst + , remoteconn + , void $ liftIO $ closeP2PShellConnection conn + ) + _ -> return Nothing protoerrhandler cont a = a >>= \case -- Avoid displaying an error when the client hung up on us. diff --git a/P2P/Proxy.hs b/P2P/Proxy.hs index f1788b09f6..ca96819857 100644 --- a/P2P/Proxy.hs +++ b/P2P/Proxy.hs @@ -15,10 +15,23 @@ import P2P.IO import qualified Remote import Utility.Metered (nullMeterUpdate) -type ProtoRunner = forall a. Proto a -> Annex (Either ProtoFailure a) +import Control.Concurrent.STM -data ClientSide = ClientSide ProtoRunner -data RemoteSide = RemoteSide ProtoRunner UUID +type ProtoCloser = Annex () + +data ClientSide = ClientSide RunState P2PConnection + +data RemoteSide = RemoteSide + { remoteUUID :: UUID + , remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser)) + , remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser) + } + +mkRemoteSide :: UUID -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide +mkRemoteSide remoteuuid remoteconnect = RemoteSide + <$> pure remoteuuid + <*> pure remoteconnect + <*> liftIO (atomically newEmptyTMVar) {- To keep this module limited to P2P protocol actions, - all other actions that a proxy needs to do are provided @@ -47,12 +60,14 @@ type ProtoErrorHandled r = - brought up yet. -} getClientProtocolVersion - :: Remote + :: Remote -> ClientSide -> (Maybe (ProtocolVersion, Maybe Message) -> Annex r) -> ProtoErrorHandled r -getClientProtocolVersion remote (ClientSide client) cont protoerrhandler = +getClientProtocolVersion remote (ClientSide clientrunst clientconn) cont protoerrhandler = protoerrhandler cont $ client $ getClientProtocolVersion' remote + where + client = liftIO . runNetProto clientrunst clientconn getClientProtocolVersion' :: Remote @@ -74,8 +89,7 @@ getClientProtocolVersion' remote = do (Just (defaultProtocolVersion, Just othermsg)) {- Proxy between the client and the remote. This picks up after - - getClientProtocolVersion, after the connection to the remote has - - been made, and the protocol version negotiated with the remote. + - getClientProtocolVersion. -} proxy :: Annex r @@ -87,7 +101,7 @@ proxy -- ^ non-VERSION message that was received from the client when -- negotiating protocol version, and has not been responded to yet -> ProtoErrorHandled r -proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote remoteuuid) othermessage protoerrhandler = do +proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteside othermessage protoerrhandler = do case othermessage of Just message -> proxyclientmessage (Just message) Nothing -> do @@ -95,6 +109,26 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r protoerrhandler proxynextclientmessage $ client $ net $ sendMessage $ VERSION v where + client = liftIO . runNetProto clientrunst clientconn + + remote a = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case + Just (runst, conn, _closer) -> liftIO $ runNetProto runst conn a + Nothing -> remoteConnect remoteside >>= \case + Just (runst, conn, closer) -> do + liftIO $ atomically $ putTMVar + (remoteTMVar remoteside) + (runst, conn, closer) + liftIO $ runNetProto runst conn a + Nothing -> giveup "Unable to connect to remote." + + closeremote = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case + Just (_, _, closer) -> closer + Nothing -> return () + + proxydone' = do + closeremote + proxydone + protocolversion = either (const defaultProtocolVersion) id <$> remote (net getProtocolVersion) @@ -107,7 +141,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r protoerrhandler proxynextclientmessage $ client notallowed - proxyclientmessage Nothing = proxydone + proxyclientmessage Nothing = proxydone' proxyclientmessage (Just message) = case message of CHECKPRESENT _ -> proxyresponse message (const proxynextclientmessage) @@ -129,7 +163,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r servermodechecker (checkCONNECTServerMode service) $ -- P2P protocol does not continue after -- relaying from git. - protoerrhandler (\() -> proxydone) $ + protoerrhandler (\() -> proxydone') $ client $ net $ relayService service NOTIFYCHANGE -> protoerr -- Messages that the client should only send after one of @@ -172,7 +206,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r withresp a (Just resp) = a resp -- Whichever of the remote or client the message was read from -- hung up. - withresp _ Nothing = proxydone + withresp _ Nothing = proxydone' -- Read a message from one party, send it to the other, -- and then pass the message to the continuation. @@ -190,7 +224,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r proxyresponse message $ \resp () -> do case resp of SUCCESS -> removedContent proxymethods - remoteuuid k + (remoteUUID remoteside) k _ -> return () proxynextclientmessage () @@ -219,7 +253,7 @@ proxy proxydone proxymethods servermode (ClientSide client) (RemoteSide remote r where finished resp () = do case resp of - SUCCESS -> addedContent proxymethods remoteuuid k + SUCCESS -> addedContent proxymethods (remoteUUID remoteside) k _ -> return () proxynextclientmessage ()