diff --git a/Command/P2PStdIO.hs b/Command/P2PStdIO.hs index a2bc93cbc8..dfd42587bb 100644 --- a/Command/P2PStdIO.hs +++ b/Command/P2PStdIO.hs @@ -67,14 +67,16 @@ performProxy clientuuid servermode remote = do where withclientversion clientside (Just (clientmaxversion, othermsg)) = do remoteside <- proxySshRemoteSide clientmaxversion remote - proxy p2pDone proxymethods servermode clientside remoteside - othermsg p2pErrHandler + protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id + <$> runRemoteSide remoteside + (P2P.net P2P.getProtocolVersion) + let closer = do + closeRemoteSide remoteside + p2pDone + proxy closer proxyMethods servermode clientside + (const $ return remoteside) + protocolversion othermsg p2pErrHandler withclientversion _ Nothing = p2pDone - - proxymethods = ProxyMethods - { removedContent = \u k -> logChange k u InfoMissing - , addedContent = \u k -> logChange k u InfoPresent - } performProxyCluster :: UUID -> ClusterUUID -> P2P.ServerMode -> CommandPerform performProxyCluster clientuuid clusteruuid servermode = do @@ -84,9 +86,18 @@ performProxyCluster clientuuid clusteruuid servermode = do p2pErrHandler where withclientversion clientside (Just (clientmaxversion, othermsg)) = do - giveup "TODO" + let protocolversion = min P2P.maxProtocolVersion clientmaxversion + let selectnode = giveup "FIXME" -- FIXME + proxy p2pDone proxyMethods servermode clientside selectnode + protocolversion othermsg p2pErrHandler withclientversion _ Nothing = p2pDone +proxyMethods :: ProxyMethods +proxyMethods = ProxyMethods + { removedContent = \u k -> logChange k u InfoMissing + , addedContent = \u k -> logChange k u InfoPresent + } + proxyClientSide :: UUID -> Annex ClientSide proxyClientSide clientuuid = do clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing) diff --git a/P2P/Proxy.hs b/P2P/Proxy.hs index ed668fa781..e4101177e0 100644 --- a/P2P/Proxy.hs +++ b/P2P/Proxy.hs @@ -32,6 +32,24 @@ mkRemoteSide remoteuuid remoteconnect = RemoteSide <*> pure remoteconnect <*> liftIO (atomically newEmptyTMVar) +runRemoteSide :: RemoteSide -> Proto a -> Annex (Either ProtoFailure a) +runRemoteSide remoteside a = + liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case + Just (runst, conn, _closer) -> liftIO $ runNetProto runst conn a + Nothing -> remoteConnect remoteside >>= \case + Just (runst, conn, closer) -> do + liftIO $ atomically $ putTMVar + (remoteTMVar remoteside) + (runst, conn, closer) + liftIO $ runNetProto runst conn a + Nothing -> giveup "Unable to connect to remote." + +closeRemoteSide :: RemoteSide -> Annex () +closeRemoteSide remoteside = + liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case + Just (_, _, closer) -> closer + Nothing -> return () + {- To keep this module limited to P2P protocol actions, - all other actions that a proxy needs to do are provided - here. -} @@ -95,42 +113,20 @@ proxy -> ProxyMethods -> ServerMode -> ClientSide - -> RemoteSide + -> (Message -> Annex RemoteSide) + -> ProtocolVersion -> Maybe Message -- ^ non-VERSION message that was received from the client when -- negotiating protocol version, and has not been responded to yet -> ProtoErrorHandled r -proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteside othermessage protoerrhandler = do +proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) getremoteside protocolversion othermessage protoerrhandler = do case othermessage of + Nothing -> protoerrhandler proxynextclientmessage $ + client $ net $ sendMessage $ VERSION protocolversion Just message -> proxyclientmessage (Just message) - Nothing -> do - v <- protocolversion - protoerrhandler proxynextclientmessage $ - client $ net $ sendMessage $ VERSION v where client = liftIO . runNetProto clientrunst clientconn - remote a = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case - Just (runst, conn, _closer) -> liftIO $ runNetProto runst conn a - Nothing -> remoteConnect remoteside >>= \case - Just (runst, conn, closer) -> do - liftIO $ atomically $ putTMVar - (remoteTMVar remoteside) - (runst, conn, closer) - liftIO $ runNetProto runst conn a - Nothing -> giveup "Unable to connect to remote." - - closeremote = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case - Just (_, _, closer) -> closer - Nothing -> return () - - proxydone' = do - closeremote - proxydone - - protocolversion = either (const defaultProtocolVersion) id - <$> remote (net getProtocolVersion) - proxynextclientmessage () = protoerrhandler proxyclientmessage $ client (net receiveMessage) @@ -140,21 +136,28 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo protoerrhandler proxynextclientmessage $ client notallowed - proxyclientmessage Nothing = proxydone' + proxyclientmessage Nothing = proxydone proxyclientmessage (Just message) = case message of - CHECKPRESENT _ -> - proxyresponse message (const proxynextclientmessage) - LOCKCONTENT _ -> - proxyresponse message (const proxynextclientmessage) - UNLOCKCONTENT -> - proxynoresponse message proxynextclientmessage - REMOVE k -> + CHECKPRESENT _ -> do + remoteside <- getremoteside message + proxyresponse remoteside message (const proxynextclientmessage) + LOCKCONTENT _ -> do + remoteside <- getremoteside message + proxyresponse remoteside message (const proxynextclientmessage) + UNLOCKCONTENT -> do + remoteside <- getremoteside message + proxynoresponse remoteside message proxynextclientmessage + REMOVE k -> do + remoteside <- getremoteside message servermodechecker checkREMOVEServerMode $ - handleREMOVE k message - GET _ _ _ -> handleGET message - PUT _ k -> + handleREMOVE remoteside k message + GET _ _ _ -> do + remoteside <- getremoteside message + handleGET remoteside message + PUT _ k -> do + remoteside <- getremoteside message servermodechecker checkPUTServerMode $ - handlePUT k message + handlePUT remoteside k message -- These messages involve the git repository, not the -- annex. So they affect the git repository of the proxy, -- not the remote. @@ -162,7 +165,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo servermodechecker (checkCONNECTServerMode service) $ -- P2P protocol does not continue after -- relaying from git. - protoerrhandler (\() -> proxydone') $ + protoerrhandler (\() -> proxydone) $ client $ net $ relayService service NOTIFYCHANGE -> protoerr -- Messages that the client should only send after one of @@ -186,14 +189,15 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo -- Send a message to the remote, send its response back to the -- client, and pass it to the continuation. - proxyresponse message a = getresponse remote message $ \resp -> - protoerrhandler (a resp) $ - client $ net $ sendMessage resp + proxyresponse remoteside message a = + getresponse (runRemoteSide remoteside) message $ \resp -> + protoerrhandler (a resp) $ + client $ net $ sendMessage resp -- Send a message to the remote, that it will not respond to. - proxynoresponse message a = + proxynoresponse remoteside message a = protoerrhandler a $ - remote $ net $ sendMessage message + runRemoteSide remoteside $ net $ sendMessage message -- Send a message to the endpoint and get back its response. getresponse endpoint message handleresp = @@ -205,7 +209,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo withresp a (Just resp) = a resp -- Whichever of the remote or client the message was read from -- hung up. - withresp _ Nothing = proxydone' + withresp _ Nothing = proxydone -- Read a message from one party, send it to the other, -- and then pass the message to the continuation. @@ -219,36 +223,38 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo _ <- client $ net $ sendMessage (ERROR "protocol error") giveup "protocol error" - handleREMOVE k message = - proxyresponse message $ \resp () -> do + handleREMOVE remoteside k message = + proxyresponse remoteside message $ \resp () -> do case resp of SUCCESS -> removedContent proxymethods (remoteUUID remoteside) k _ -> return () proxynextclientmessage () - handleGET message = getresponse remote message $ withDATA relayGET + handleGET remoteside message = getresponse (runRemoteSide remoteside) message $ + withDATA (relayGET remoteside) - handlePUT k message = getresponse remote message $ \resp -> case resp of - ALREADY_HAVE -> protoerrhandler proxynextclientmessage $ - client $ net $ sendMessage resp - PUT_FROM _ -> - getresponse client resp $ withDATA (relayPUT k) - _ -> protoerr + handlePUT remoteside k message = + getresponse (runRemoteSide remoteside) message $ \resp -> case resp of + ALREADY_HAVE -> protoerrhandler proxynextclientmessage $ + client $ net $ sendMessage resp + PUT_FROM _ -> + getresponse client resp $ withDATA (relayPUT remoteside k) + _ -> protoerr withDATA a message@(DATA len) = a len message withDATA _ _ = protoerr - relayGET len = relayDATAStart client $ - relayDATACore len remote client $ - relayDATAFinish remote client $ - relayonemessage client remote $ + relayGET remoteside len = relayDATAStart client $ + relayDATACore len (runRemoteSide remoteside) client $ + relayDATAFinish (runRemoteSide remoteside) client $ + relayonemessage client (runRemoteSide remoteside) $ const proxynextclientmessage - relayPUT k len = relayDATAStart remote $ - relayDATACore len client remote $ - relayDATAFinish client remote $ - relayonemessage remote client finished + relayPUT remoteside k len = relayDATAStart (runRemoteSide remoteside) $ + relayDATACore len client (runRemoteSide remoteside) $ + relayDATAFinish client (runRemoteSide remoteside) $ + relayonemessage (runRemoteSide remoteside) client finished where finished resp () = do case resp of @@ -266,7 +272,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo send b = protoerrhandler finishget $ y $ net $ sendBytes len b nullMeterUpdate - relayDATAFinish x y sendsuccessfailure () = protocolversion >>= \case + relayDATAFinish x y sendsuccessfailure () = case protocolversion of ProtocolVersion 0 -> sendsuccessfailure -- Protocol version 1 has a VALID or -- INVALID message after the data.