preparing for cluster node selection
Support selecting what remote to proxy for each top-level P2P protocol message. This only needs to be extended now to support fanout to multiple nodes for PUT and REMOVE, and with a remote that fails for LOCKCONTENT and UNLOCKCONTENT. But a good first step would be to implement CHECKPRESENT and GET for clusters. Both should select a node that actually does have the content. That will allow a cluster to work for GET even when location tracking is out of date.
@ -67,14 +67,16 @@ performProxy clientuuid servermode remote = do
withclientversion clientside (Just (clientmaxversion, othermsg)) = do
remoteside <- proxySshRemoteSide clientmaxversion remote
proxy p2pDone proxymethods servermode clientside remoteside
othermsg p2pErrHandler
protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id
<$> runRemoteSide remoteside
( P2P.getProtocolVersion)
let closer = do
closeRemoteSide remoteside
proxy closer proxyMethods servermode clientside
(const $ return remoteside)
protocolversion othermsg p2pErrHandler
withclientversion _ Nothing = p2pDone
proxymethods = ProxyMethods
{ removedContent = \u k -> logChange k u InfoMissing
, addedContent = \u k -> logChange k u InfoPresent
performProxyCluster :: UUID -> ClusterUUID -> P2P.ServerMode -> CommandPerform
performProxyCluster clientuuid clusteruuid servermode = do
@ -84,9 +86,18 @@ performProxyCluster clientuuid clusteruuid servermode = do
withclientversion clientside (Just (clientmaxversion, othermsg)) = do
giveup "TODO"
let protocolversion = min P2P.maxProtocolVersion clientmaxversion
let selectnode = giveup "FIXME" -- FIXME
proxy p2pDone proxyMethods servermode clientside selectnode
protocolversion othermsg p2pErrHandler
withclientversion _ Nothing = p2pDone
proxyMethods :: ProxyMethods
proxyMethods = ProxyMethods
{ removedContent = \u k -> logChange k u InfoMissing
, addedContent = \u k -> logChange k u InfoPresent
proxyClientSide :: UUID -> Annex ClientSide
proxyClientSide clientuuid = do
clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing)
@ -32,6 +32,24 @@ mkRemoteSide remoteuuid remoteconnect = RemoteSide
<*> pure remoteconnect
<*> liftIO (atomically newEmptyTMVar)
runRemoteSide :: RemoteSide -> Proto a -> Annex (Either ProtoFailure a)
runRemoteSide remoteside a =
liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case
Just (runst, conn, _closer) -> liftIO $ runNetProto runst conn a
Nothing -> remoteConnect remoteside >>= \case
Just (runst, conn, closer) -> do
liftIO $ atomically $ putTMVar
(remoteTMVar remoteside)
(runst, conn, closer)
liftIO $ runNetProto runst conn a
Nothing -> giveup "Unable to connect to remote."
closeRemoteSide :: RemoteSide -> Annex ()
closeRemoteSide remoteside =
liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case
Just (_, _, closer) -> closer
Nothing -> return ()
{- To keep this module limited to P2P protocol actions,
- all other actions that a proxy needs to do are provided
- here. -}
@ -95,42 +113,20 @@ proxy
-> ProxyMethods
-> ServerMode
-> ClientSide
-> RemoteSide
-> (Message -> Annex RemoteSide)
-> ProtocolVersion
-> Maybe Message
-- ^ non-VERSION message that was received from the client when
-- negotiating protocol version, and has not been responded to yet
-> ProtoErrorHandled r
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteside othermessage protoerrhandler = do
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) getremoteside protocolversion othermessage protoerrhandler = do
case othermessage of
Nothing -> protoerrhandler proxynextclientmessage $
client $ net $ sendMessage $ VERSION protocolversion
Just message -> proxyclientmessage (Just message)
Nothing -> do
v <- protocolversion
protoerrhandler proxynextclientmessage $
client $ net $ sendMessage $ VERSION v
client = liftIO . runNetProto clientrunst clientconn
remote a = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case
Just (runst, conn, _closer) -> liftIO $ runNetProto runst conn a
Nothing -> remoteConnect remoteside >>= \case
Just (runst, conn, closer) -> do
liftIO $ atomically $ putTMVar
(remoteTMVar remoteside)
(runst, conn, closer)
liftIO $ runNetProto runst conn a
Nothing -> giveup "Unable to connect to remote."
closeremote = liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case
Just (_, _, closer) -> closer
Nothing -> return ()
proxydone' = do
protocolversion = either (const defaultProtocolVersion) id
<$> remote (net getProtocolVersion)
proxynextclientmessage () = protoerrhandler proxyclientmessage $
client (net receiveMessage)
@ -140,21 +136,28 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
protoerrhandler proxynextclientmessage $
client notallowed
proxyclientmessage Nothing = proxydone'
proxyclientmessage Nothing = proxydone
proxyclientmessage (Just message) = case message of
proxyresponse message (const proxynextclientmessage)
proxyresponse message (const proxynextclientmessage)
proxynoresponse message proxynextclientmessage
remoteside <- getremoteside message
proxyresponse remoteside message (const proxynextclientmessage)
remoteside <- getremoteside message
proxyresponse remoteside message (const proxynextclientmessage)
remoteside <- getremoteside message
proxynoresponse remoteside message proxynextclientmessage
REMOVE k -> do
remoteside <- getremoteside message
servermodechecker checkREMOVEServerMode $
handleREMOVE k message
GET _ _ _ -> handleGET message
PUT _ k ->
handleREMOVE remoteside k message
GET _ _ _ -> do
remoteside <- getremoteside message
handleGET remoteside message
PUT _ k -> do
remoteside <- getremoteside message
servermodechecker checkPUTServerMode $
handlePUT k message
handlePUT remoteside k message
-- These messages involve the git repository, not the
-- annex. So they affect the git repository of the proxy,
-- not the remote.
@ -162,7 +165,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
servermodechecker (checkCONNECTServerMode service) $
-- P2P protocol does not continue after
-- relaying from git.
protoerrhandler (\() -> proxydone') $
protoerrhandler (\() -> proxydone) $
client $ net $ relayService service
NOTIFYCHANGE -> protoerr
-- Messages that the client should only send after one of
@ -186,14 +189,15 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
-- Send a message to the remote, send its response back to the
-- client, and pass it to the continuation.
proxyresponse message a = getresponse remote message $ \resp ->
protoerrhandler (a resp) $
client $ net $ sendMessage resp
proxyresponse remoteside message a =
getresponse (runRemoteSide remoteside) message $ \resp ->
protoerrhandler (a resp) $
client $ net $ sendMessage resp
-- Send a message to the remote, that it will not respond to.
proxynoresponse message a =
proxynoresponse remoteside message a =
protoerrhandler a $
remote $ net $ sendMessage message
runRemoteSide remoteside $ net $ sendMessage message
-- Send a message to the endpoint and get back its response.
getresponse endpoint message handleresp =
@ -205,7 +209,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
withresp a (Just resp) = a resp
-- Whichever of the remote or client the message was read from
-- hung up.
withresp _ Nothing = proxydone'
withresp _ Nothing = proxydone
-- Read a message from one party, send it to the other,
-- and then pass the message to the continuation.
@ -219,36 +223,38 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
_ <- client $ net $ sendMessage (ERROR "protocol error")
giveup "protocol error"
handleREMOVE k message =
proxyresponse message $ \resp () -> do
handleREMOVE remoteside k message =
proxyresponse remoteside message $ \resp () -> do
case resp of
SUCCESS -> removedContent proxymethods
(remoteUUID remoteside) k
_ -> return ()
proxynextclientmessage ()
handleGET message = getresponse remote message $ withDATA relayGET
handleGET remoteside message = getresponse (runRemoteSide remoteside) message $
withDATA (relayGET remoteside)
handlePUT k message = getresponse remote message $ \resp -> case resp of
ALREADY_HAVE -> protoerrhandler proxynextclientmessage $
client $ net $ sendMessage resp
getresponse client resp $ withDATA (relayPUT k)
_ -> protoerr
handlePUT remoteside k message =
getresponse (runRemoteSide remoteside) message $ \resp -> case resp of
ALREADY_HAVE -> protoerrhandler proxynextclientmessage $
client $ net $ sendMessage resp
getresponse client resp $ withDATA (relayPUT remoteside k)
_ -> protoerr
withDATA a message@(DATA len) = a len message
withDATA _ _ = protoerr
relayGET len = relayDATAStart client $
relayDATACore len remote client $
relayDATAFinish remote client $
relayonemessage client remote $
relayGET remoteside len = relayDATAStart client $
relayDATACore len (runRemoteSide remoteside) client $
relayDATAFinish (runRemoteSide remoteside) client $
relayonemessage client (runRemoteSide remoteside) $
const proxynextclientmessage
relayPUT k len = relayDATAStart remote $
relayDATACore len client remote $
relayDATAFinish client remote $
relayonemessage remote client finished
relayPUT remoteside k len = relayDATAStart (runRemoteSide remoteside) $
relayDATACore len client (runRemoteSide remoteside) $
relayDATAFinish client (runRemoteSide remoteside) $
relayonemessage (runRemoteSide remoteside) client finished
finished resp () = do
case resp of
@ -266,7 +272,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
send b = protoerrhandler finishget $
y $ net $ sendBytes len b nullMeterUpdate
relayDATAFinish x y sendsuccessfailure () = protocolversion >>= \case
relayDATAFinish x y sendsuccessfailure () = case protocolversion of
ProtocolVersion 0 -> sendsuccessfailure
-- Protocol version 1 has a VALID or
-- INVALID message after the data.
