distributed cluster cycle prevention

Added BYPASS to P2P protocol, and use it to avoid cycling between
cluster gateways.

Distributed clusters are working well now!
This commit is contained in:
Joey Hess 2024-06-27 12:20:22 -04:00
parent effaf51b1f
commit 3dad9446ce
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
8 changed files with 156 additions and 55 deletions

View file

@ -5,7 +5,7 @@
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes, OverloadedStrings #-}
module Annex.Cluster where module Annex.Cluster where
@ -17,6 +17,7 @@ import P2P.Proxy
import P2P.Protocol import P2P.Protocol
import P2P.IO import P2P.IO
import Annex.Proxy import Annex.Proxy
import Annex.UUID
import Logs.Location import Logs.Location
import Logs.PreferredContent import Logs.PreferredContent
import Types.Command import Types.Command
@ -50,24 +51,40 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
-- determine. Instead, pick the newest protocol version -- determine. Instead, pick the newest protocol version
-- that we and the client both speak. The proxy code -- that we and the client both speak. The proxy code
-- checks protocol versions when operating on multiple -- checks protocol versions when operating on multiple
-- nodes. -- nodes, and allows nodes to have different protocol
-- versions.
let protocolversion = min maxProtocolVersion clientmaxversion let protocolversion = min maxProtocolVersion clientmaxversion
selectnode <- clusterProxySelector clusteruuid protocolversion sendClientProtocolVersion clientside othermsg protocolversion
(getclientbypass protocolversion) protoerrhandler
withclientversion Nothing = proxydone
getclientbypass protocolversion othermsg =
getClientBypass clientside protocolversion othermsg
(withclientbypass protocolversion) protoerrhandler
withclientbypass protocolversion (bypassuuids, othermsg) = do
selectnode <- clusterProxySelector clusteruuid protocolversion bypassuuids
concurrencyconfig <- getConcurrencyConfig concurrencyconfig <- getConcurrencyConfig
proxy proxydone proxymethods servermode clientside proxy proxydone proxymethods servermode clientside
(fromClusterUUID clusteruuid) (fromClusterUUID clusteruuid)
selectnode concurrencyconfig protocolversion selectnode concurrencyconfig protocolversion
othermsg protoerrhandler othermsg protoerrhandler
withclientversion Nothing = proxydone
clusterProxySelector :: ClusterUUID -> ProtocolVersion -> Annex ProxySelector clusterProxySelector :: ClusterUUID -> ProtocolVersion -> Bypass -> Annex ProxySelector
clusterProxySelector clusteruuid protocolversion = do clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs) nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
<$> getClusters <$> getClusters
clusternames <- annexClusters <$> Annex.getGitConfig myclusters <- annexClusters <$> Annex.getGitConfig
allremotes <- remoteList allremotes <- remoteList
let clusterremotes = filter (isnode allremotes nodeuuids clusternames) allremotes hereu <- getUUID
nodes <- mapM (proxySshRemoteSide protocolversion) clusterremotes let bypass' = S.insert hereu bypass
let clusterremotes = filter (isnode bypass' allremotes nodeuuids myclusters) allremotes
fastDebug "Annex.Cluster" $ unwords
[ "cluster gateway at", fromUUID hereu
, "connecting to", show (map Remote.name clusterremotes)
, "bypass", show (S.toList bypass)
]
nodes <- mapM (proxySshRemoteSide protocolversion (Bypass bypass')) clusterremotes
return $ ProxySelector return $ ProxySelector
{ proxyCHECKPRESENT = nodecontaining nodes { proxyCHECKPRESENT = nodecontaining nodes
, proxyGET = nodecontaining nodes , proxyGET = nodecontaining nodes
@ -95,27 +112,37 @@ clusterProxySelector clusteruuid protocolversion = do
} }
where where
-- Nodes of the cluster have remote.name.annex-cluster-node -- Nodes of the cluster have remote.name.annex-cluster-node
-- containing its name. Or they are proxied by a remote -- containing its name.
-- that has remote.name.annex-cluster-gateway --
-- containing the cluster's UUID. -- Or, a node can be the cluster proxied by another gateway.
isnode rs nodeuuids clusternames r = isnode bypass' rs nodeuuids myclusters r =
case remoteAnnexClusterNode (Remote.gitconfig r) of case remoteAnnexClusterNode (Remote.gitconfig r) of
Just names Just names
| any (isclustername clusternames) names -> | any (isclustername myclusters) names ->
flip S.member nodeuuids $ flip S.member nodeuuids $
ClusterNodeUUID $ Remote.uuid r ClusterNodeUUID $ Remote.uuid r
| otherwise -> False | otherwise -> False
Nothing -> case remoteAnnexProxiedBy (Remote.gitconfig r) of Nothing -> isclusterviagateway bypass' rs r
Just proxyuuid
| Remote.uuid r /= fromClusterUUID clusteruuid -> -- Is this remote the same cluster, proxied via another gateway?
--
-- Must avoid bypassed gateways to prevent cycles.
isclusterviagateway bypass' rs r =
case mkClusterUUID (Remote.uuid r) of
Just cu | cu == clusteruuid ->
case remoteAnnexProxiedBy (Remote.gitconfig r) of
Just proxyuuid | proxyuuid `S.notMember` bypass' ->
not $ null $ not $ null $
filter (== clusteruuid) $ filter isclustergateway $
concatMap (remoteAnnexClusterGateway . Remote.gitconfig) $
filter (\p -> Remote.uuid p == proxyuuid) rs filter (\p -> Remote.uuid p == proxyuuid) rs
_ -> False _ -> False
_ -> False
isclustername clusternames name = isclustergateway r = any (== clusteruuid) $
M.lookup name clusternames == Just clusteruuid remoteAnnexClusterGateway $ Remote.gitconfig r
isclustername myclusters name =
M.lookup name myclusters == Just clusteruuid
nodecontaining nodes k = do nodecontaining nodes k = do
locs <- S.fromList <$> loggedLocations k locs <- S.fromList <$> loggedLocations k

View file

@ -15,9 +15,10 @@ import qualified Remote
import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection) import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection)
-- FIXME: Support special remotes. -- FIXME: Support special remotes.
proxySshRemoteSide :: ProtocolVersion -> Remote -> Annex RemoteSide proxySshRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
proxySshRemoteSide clientmaxversion remote = mkRemoteSide (Remote.uuid remote) $ proxySshRemoteSide clientmaxversion bypass remote =
openP2PShellConnection' remote clientmaxversion >>= \case mkRemoteSide (Remote.uuid remote) $
openP2PShellConnection' remote clientmaxversion bypass >>= \case
Just conn@(OpenConnection (remoterunst, remoteconn, _)) -> Just conn@(OpenConnection (remoterunst, remoteconn, _)) ->
return $ Just return $ Just
( remoterunst ( remoterunst

View file

@ -67,7 +67,7 @@ performProxy clientuuid servermode remote = do
p2pErrHandler p2pErrHandler
where where
withclientversion clientside (Just (clientmaxversion, othermsg)) = do withclientversion clientside (Just (clientmaxversion, othermsg)) = do
remoteside <- proxySshRemoteSide clientmaxversion remote remoteside <- proxySshRemoteSide clientmaxversion mempty remote
protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id
<$> runRemoteSide remoteside <$> runRemoteSide remoteside
(P2P.net P2P.getProtocolVersion) (P2P.net P2P.getProtocolVersion)
@ -75,11 +75,14 @@ performProxy clientuuid servermode remote = do
closeRemoteSide remoteside closeRemoteSide remoteside
p2pDone p2pDone
concurrencyconfig <- noConcurrencyConfig concurrencyconfig <- noConcurrencyConfig
proxy closer proxymethods servermode clientside let runproxy othermsg' = proxy closer proxymethods
servermode clientside
(Remote.uuid remote) (Remote.uuid remote)
(singleProxySelector remoteside) (singleProxySelector remoteside)
concurrencyconfig concurrencyconfig
protocolversion othermsg p2pErrHandler protocolversion othermsg' p2pErrHandler
sendClientProtocolVersion clientside othermsg protocolversion
runproxy p2pErrHandler
withclientversion _ Nothing = p2pDone withclientversion _ Nothing = p2pDone
proxymethods = ProxyMethods proxymethods = ProxyMethods

View file

@ -9,6 +9,7 @@
{-# LANGUAGE DeriveFunctor, TemplateHaskell, FlexibleContexts #-} {-# LANGUAGE DeriveFunctor, TemplateHaskell, FlexibleContexts #-}
{-# LANGUAGE TypeSynonymInstances, FlexibleInstances, RankNTypes #-} {-# LANGUAGE TypeSynonymInstances, FlexibleInstances, RankNTypes #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# OPTIONS_GHC -fno-warn-orphans #-} {-# OPTIONS_GHC -fno-warn-orphans #-}
module P2P.Protocol where module P2P.Protocol where
@ -37,6 +38,7 @@ import System.IO
import qualified System.FilePath.ByteString as P import qualified System.FilePath.ByteString as P
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
import qualified Data.Set as S
import Data.Char import Data.Char
import Control.Applicative import Control.Applicative
import Prelude import Prelude
@ -66,6 +68,9 @@ data Service = UploadPack | ReceivePack
data Validity = Valid | Invalid data Validity = Valid | Invalid
deriving (Show) deriving (Show)
newtype Bypass = Bypass (S.Set UUID)
deriving (Show, Monoid, Semigroup)
-- | Messages in the protocol. The peer that makes the connection -- | Messages in the protocol. The peer that makes the connection
-- always initiates requests, and the other peer makes responses to them. -- always initiates requests, and the other peer makes responses to them.
data Message data Message
@ -90,6 +95,7 @@ data Message
| SUCCESS_PLUS [UUID] | SUCCESS_PLUS [UUID]
| FAILURE | FAILURE
| FAILURE_PLUS [UUID] | FAILURE_PLUS [UUID]
| BYPASS Bypass
| DATA Len -- followed by bytes of data | DATA Len -- followed by bytes of data
| VALIDITY Validity | VALIDITY Validity
| ERROR String | ERROR String
@ -117,6 +123,7 @@ instance Proto.Sendable Message where
formatMessage (SUCCESS_PLUS uuids) = ("SUCCESS-PLUS":map Proto.serialize uuids) formatMessage (SUCCESS_PLUS uuids) = ("SUCCESS-PLUS":map Proto.serialize uuids)
formatMessage FAILURE = ["FAILURE"] formatMessage FAILURE = ["FAILURE"]
formatMessage (FAILURE_PLUS uuids) = ("FAILURE-PLUS":map Proto.serialize uuids) formatMessage (FAILURE_PLUS uuids) = ("FAILURE-PLUS":map Proto.serialize uuids)
formatMessage (BYPASS (Bypass uuids)) = ("BYPASS":map Proto.serialize (S.toList uuids))
formatMessage (VALIDITY Valid) = ["VALID"] formatMessage (VALIDITY Valid) = ["VALID"]
formatMessage (VALIDITY Invalid) = ["INVALID"] formatMessage (VALIDITY Invalid) = ["INVALID"]
formatMessage (DATA len) = ["DATA", Proto.serialize len] formatMessage (DATA len) = ["DATA", Proto.serialize len]
@ -144,6 +151,7 @@ instance Proto.Receivable Message where
parseCommand "SUCCESS-PLUS" = Proto.parseList SUCCESS_PLUS parseCommand "SUCCESS-PLUS" = Proto.parseList SUCCESS_PLUS
parseCommand "FAILURE" = Proto.parse0 FAILURE parseCommand "FAILURE" = Proto.parse0 FAILURE
parseCommand "FAILURE-PLUS" = Proto.parseList FAILURE_PLUS parseCommand "FAILURE-PLUS" = Proto.parseList FAILURE_PLUS
parseCommand "BYPASS" = Proto.parseList (BYPASS . Bypass . S.fromList)
parseCommand "DATA" = Proto.parse1 DATA parseCommand "DATA" = Proto.parse1 DATA
parseCommand "ERROR" = Proto.parse1 ERROR parseCommand "ERROR" = Proto.parse1 ERROR
parseCommand "VALID" = Proto.parse0 (VALIDITY Valid) parseCommand "VALID" = Proto.parse0 (VALIDITY Valid)
@ -336,6 +344,15 @@ negotiateProtocolVersion preferredversion = do
Just (ERROR _) -> return () Just (ERROR _) -> return ()
_ -> net $ sendMessage (ERROR "expected VERSION") _ -> net $ sendMessage (ERROR "expected VERSION")
sendBypass :: Bypass -> Proto ()
sendBypass bypass@(Bypass s)
| S.null s = return ()
| otherwise = do
ver <- net getProtocolVersion
if ver >= ProtocolVersion 2
then net $ sendMessage (BYPASS bypass)
else return ()
checkPresent :: Key -> Proto Bool checkPresent :: Key -> Proto Bool
checkPresent key = do checkPresent key = do
net $ sendMessage (CHECKPRESENT key) net $ sendMessage (CHECKPRESENT key)
@ -505,6 +522,7 @@ serveAuthed servermode myuuid = void $ serverLoop handler
refs <- local waitRefChange refs <- local waitRefChange
net $ sendMessage (CHANGED refs) net $ sendMessage (CHANGED refs)
return ServerContinue return ServerContinue
handler (BYPASS _) = return ServerContinue
handler _ = return ServerUnexpected handler _ = return ServerUnexpected
handleput af key = do handleput af key = do

View file

@ -24,6 +24,7 @@ import Control.Concurrent.STM
import Control.Concurrent.Async import Control.Concurrent.Async
import qualified Control.Concurrent.MSem as MSem import qualified Control.Concurrent.MSem as MSem
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
import qualified Data.Set as S
import GHC.Conc import GHC.Conc
type ProtoCloser = Annex () type ProtoCloser = Annex ()
@ -104,7 +105,7 @@ type ProtoErrorHandled r =
{- This is the first thing run when proxying with a client. {- This is the first thing run when proxying with a client.
- The client has already authenticated. Most clients will send a - The client has already authenticated. Most clients will send a
- VERSION message, although version 0 clients will not and will send - VERSION message, although version 0 clients will not and will send
- some other message. - some other message, which is returned to handle later.
- -
- But before the client will send VERSION, it needs to see AUTH_SUCCESS. - But before the client will send VERSION, it needs to see AUTH_SUCCESS.
- So send that, although the connection with the remote is not actually - So send that, although the connection with the remote is not actually
@ -137,8 +138,47 @@ getClientProtocolVersion' remoteuuid = do
Just othermsg -> return Just othermsg -> return
(Just (defaultProtocolVersion, Just othermsg)) (Just (defaultProtocolVersion, Just othermsg))
{- Send negotiated protocol version to the client.
- With a version 0 client, preserves the other protocol message
- received in getClientProtocolVersion. -}
sendClientProtocolVersion
:: ClientSide
-> Maybe Message
-> ProtocolVersion
-> (Maybe Message -> Annex r)
-> ProtoErrorHandled r
sendClientProtocolVersion (ClientSide clientrunst clientconn) othermsg protocolversion cont protoerrhandler =
case othermsg of
Nothing -> protoerrhandler (\() -> cont Nothing) $
client $ net $ sendMessage $ VERSION protocolversion
Just _ -> cont othermsg
where
client = liftIO . runNetProto clientrunst clientconn
{- When speaking to a version 2 client, get the BYPASS message which may be
- sent immediately after VERSION. Returns any other message to be handled
- later. -}
getClientBypass
:: ClientSide
-> ProtocolVersion
-> Maybe Message
-> ((Bypass, Maybe Message) -> Annex r)
-> ProtoErrorHandled r
getClientBypass (ClientSide clientrunst clientconn) (ProtocolVersion protocolversion) Nothing cont protoerrhandler
| protocolversion < 2 = cont (Bypass S.empty, Nothing)
| otherwise = protoerrhandler cont $
client $ net receiveMessage >>= return . \case
Just (BYPASS bypass) -> (bypass, Nothing)
Just othermsg -> (Bypass S.empty, Just othermsg)
Nothing -> (Bypass S.empty, Nothing)
where
client = liftIO . runNetProto clientrunst clientconn
getClientBypass _ _ (Just othermsg) cont _ =
-- Pass along non-BYPASS message from version 0 client.
cont (Bypass S.empty, (Just othermsg))
{- Proxy between the client and the remote. This picks up after {- Proxy between the client and the remote. This picks up after
- getClientProtocolVersion. - sendClientProtocolVersion.
-} -}
proxy proxy
:: Annex r :: Annex r
@ -156,10 +196,9 @@ proxy
-- ^ non-VERSION message that was received from the client when -- ^ non-VERSION message that was received from the client when
-- negotiating protocol version, and has not been responded to yet -- negotiating protocol version, and has not been responded to yet
-> ProtoErrorHandled r -> ProtoErrorHandled r
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig (ProtocolVersion protocolversion) othermessage protoerrhandler = do proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig (ProtocolVersion protocolversion) othermsg protoerrhandler = do
case othermessage of case othermsg of
Nothing -> protoerrhandler proxynextclientmessage $ Nothing -> proxynextclientmessage ()
client $ net $ sendMessage $ VERSION $ ProtocolVersion protocolversion
Just message -> proxyclientmessage (Just message) Just message -> proxyclientmessage (Just message)
where where
client = liftIO . runNetProto clientrunst clientconn client = liftIO . runNetProto clientrunst clientconn
@ -209,6 +248,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
remotesides <- proxyPUT proxyselector af k remotesides <- proxyPUT proxyselector af k
servermodechecker checkPUTServerMode $ servermodechecker checkPUTServerMode $
handlePUT remotesides k message handlePUT remotesides k message
BYPASS _ -> proxynextclientmessage ()
-- These messages involve the git repository, not the -- These messages involve the git repository, not the
-- annex. So they affect the git repository of the proxy, -- annex. So they affect the git repository of the proxy,
-- not the remote. -- not the remote.

View file

@ -233,7 +233,7 @@ storeP2PShellConnection connpool conn = atomically $ modifyTVar' connpool $ \cas
-- the connection pool. -- the connection pool.
openP2PShellConnection :: Remote -> P2PShellConnectionPool -> Annex (Maybe P2PShellConnection) openP2PShellConnection :: Remote -> P2PShellConnectionPool -> Annex (Maybe P2PShellConnection)
openP2PShellConnection r connpool = openP2PShellConnection r connpool =
openP2PShellConnection' r P2P.maxProtocolVersion >>= \case openP2PShellConnection' r P2P.maxProtocolVersion mempty >>= \case
Just conn -> return (Just conn) Just conn -> return (Just conn)
Nothing -> do Nothing -> do
liftIO $ rememberunsupported liftIO $ rememberunsupported
@ -243,8 +243,8 @@ openP2PShellConnection r connpool =
modifyTVar' connpool $ modifyTVar' connpool $
maybe (Just P2PShellUnsupported) Just maybe (Just P2PShellUnsupported) Just
openP2PShellConnection' :: Remote -> P2P.ProtocolVersion -> Annex (Maybe P2PShellConnection) openP2PShellConnection' :: Remote -> P2P.ProtocolVersion -> P2P.Bypass -> Annex (Maybe P2PShellConnection)
openP2PShellConnection' r maxprotoversion = do openP2PShellConnection' r maxprotoversion bypass = do
u <- getUUID u <- getUUID
let ps = [Param (fromUUID u)] let ps = [Param (fromUUID u)]
repo <- getRepo r repo <- getRepo r
@ -271,8 +271,9 @@ openP2PShellConnection' r maxprotoversion = do
let c = P2P.OpenConnection (runst, conn, pid) let c = P2P.OpenConnection (runst, conn, pid)
-- When the connection is successful, the remote -- When the connection is successful, the remote
-- will send an AUTH_SUCCESS with its uuid. -- will send an AUTH_SUCCESS with its uuid.
let proto = P2P.postAuth $ let proto = P2P.postAuth $ do
P2P.negotiateProtocolVersion maxprotoversion P2P.negotiateProtocolVersion maxprotoversion
P2P.sendBypass bypass
tryNonAsync (P2P.runNetProto runst conn proto) >>= \case tryNonAsync (P2P.runNetProto runst conn proto) >>= \case
Right (Right (Just theiruuid)) | theiruuid == uuid r -> Right (Right (Just theiruuid)) | theiruuid == uuid r ->
return $ Just c return $ Just c

View file

@ -55,7 +55,7 @@ any authentication.
The client sends the highest protocol version it supports: The client sends the highest protocol version it supports:
VERSION 3 VERSION 2
The server responds with the highest protocol version it supports The server responds with the highest protocol version it supports
that is less than or equal to the version the client sent: that is less than or equal to the version the client sent:
@ -64,6 +64,19 @@ that is less than or equal to the version the client sent:
Now both client and server should use version 1. Now both client and server should use version 1.
## Cluster cycle prevention
In protocol version 2, immediately after VERSION, the
client can send an additional message that is used to
prevent cycles when accessing clusters.
BYPASS UUID1 UUID2 ...
The UUIDs are cluster gateways to avoid connecting to when
serving a cluster.
The server makes no response to this message.
## Binary data ## Binary data
The protocol allows raw binary data to be sent. This is done The protocol allows raw binary data to be sent. This is done

View file

@ -29,13 +29,6 @@ For June's work on [[design/passthrough_proxy]], remaining todos:
* Since proxying to special remotes is not supported yet, and won't be for * Since proxying to special remotes is not supported yet, and won't be for
the first release, make it fail in a reasonable way. the first release, make it fail in a reasonable way.
* Support distributed clusters: Make a proxy for a cluster repeat
protocol messages on to any remotes that have the same UUID as
the cluster. Needs VIA extension to P2P protocol to avoid cycles.
Status: works, but needs VIA extension to avoid ugly messages and extra
work
* Getting a key from a cluster currently always selects the lowest cost * Getting a key from a cluster currently always selects the lowest cost
remote, and always the same remote if cost is the same. Should remote, and always the same remote if cost is the same. Should
round-robin amoung remotes, and prefer to avoid using remotes that round-robin amoung remotes, and prefer to avoid using remotes that
@ -111,3 +104,8 @@ For June's work on [[design/passthrough_proxy]], remaining todos:
* Support proxying for a remote that is proxied by another gateway of * Support proxying for a remote that is proxied by another gateway of
a cluster. (done) a cluster. (done)
* Support distributed clusters: Make a proxy for a cluster repeat
protocol messages on to any remotes that have the same UUID as
the cluster. Needs extension to P2P protocol to avoid cycles.
(done)