cluster support in http API server

Wired it up and it seems to basically work, although the test suite is
not fully passing.

Note that --jobs currently gets multiplied by the number of nodes in the
cluster, which is probably not good.
This commit is contained in:
Joey Hess 2024-07-28 10:16:35 -04:00
parent 8ec174408e
commit 1259ad89b6
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
3 changed files with 71 additions and 41 deletions

View file

@ -58,9 +58,9 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
(withclientbypass protocolversion) (protoerrhandler noop)
withclientbypass protocolversion (bypassuuids, othermsg) = do
(selectnode, closenodes) <- clusterProxySelector clusteruuid
protocolversion bypassuuids
concurrencyconfig <- getConcurrencyConfig
(selectnode, closenodes, concurrencyconfig) <-
clusterProxySelector clusteruuid
protocolversion bypassuuids
proxystate <- liftIO mkProxyState
let proxyparams = ProxyParams
{ proxyMethods = mkProxyMethods
@ -75,7 +75,11 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
proxy proxydone proxyparams othermsg
(protoerrhandler closenodes)
clusterProxySelector :: ClusterUUID -> ProtocolVersion -> Bypass -> Annex (ProxySelector, Annex ())
clusterProxySelector
:: ClusterUUID
-> ProtocolVersion
-> Bypass
-> Annex (ProxySelector, Annex (), ConcurrencyConfig)
clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
<$> getClusters
@ -116,7 +120,8 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
-- proxied to the client.
, proxyLOCKCONTENT = const (pure Nothing)
}
return (proxyselector, closenodes)
concurrencyconfig <- getConcurrencyConfig
return (proxyselector, closenodes, concurrencyconfig)
where
-- Nodes of the cluster have remote.name.annex-cluster-node
-- containing its name.

View file

@ -30,6 +30,7 @@ import CmdLine.Action (startConcurrency)
import Utility.ThreadScheduler
import Utility.HumanTime
import Annex.Proxy
import Annex.Cluster
import qualified P2P.Proxy as Proxy
import qualified Types.Remote as Remote
@ -233,21 +234,21 @@ withP2PConnections workerpool proxyconnectionpoolsize a = do
Right (Left reason) -> return $ Left $
ConnectionFailed $
fromMaybe "unknown uuid" reason
Right (Right (Right proxyremote)) ->
Right (Right (Right proxyremote)) -> proxyconnection $
openProxyConnectionToRemote workerpool
(connectionProtocolVersion connparams)
(connectionBypass connparams)
proxyremote
>>= \case
Right conn -> proxyConnection proxyconnectionpoolsize relv connparams workerpool proxypool conn
Left ex -> return $ Left $
ConnectionFailed $ show ex
Right (Right (Left clusteruuid)) ->
undefined -- XXX todo
{-
openProxyConnectionToCluster clusteruuid
>>= proxyConnection clusteruuid relv connparams workerpool
-}
bypass proxyremote
Right (Right (Left clusteruuid)) -> proxyconnection $
openProxyConnectionToCluster workerpool
(connectionProtocolVersion connparams)
bypass clusteruuid
Left ex -> return $ Left $
ConnectionFailed $ show ex
where
bypass = P2P.Bypass $ S.fromList $ connectionBypass connparams
proxyconnection openconn = openconn >>= \case
Right conn -> proxyConnection proxyconnectionpoolsize
relv connparams workerpool proxypool conn
Left ex -> return $ Left $
ConnectionFailed $ show ex
@ -353,16 +354,14 @@ proxyConnection proxyconnectionpoolsize relv connparams workerpool proxypool pro
asyncworker <- async $
inAnnexWorker' workerpool $ do
proxystate <- liftIO Proxy.mkProxyState
concurrencyconfig <- Proxy.noConcurrencyConfig
let proxyparams = Proxy.ProxyParams
{ Proxy.proxyMethods = mkProxyMethods
, Proxy.proxyState = proxystate
, Proxy.proxyServerMode = connectionServerMode connparams
, Proxy.proxyClientSide = Proxy.ClientSide proxyfromclientrunst proxyfromclientconn
, Proxy.proxyUUID = proxyConnectionRemoteUUID proxyconn
, Proxy.proxySelector = Proxy.singleProxySelector $
proxyConnectionRemoteSide proxyconn
, Proxy.proxyConcurrencyConfig = concurrencyconfig
, Proxy.proxySelector = proxyConnectionSelector proxyconn
, Proxy.proxyConcurrencyConfig = proxyConnectionConcurrency proxyconn
, Proxy.proxyClientProtocolVersion = connectionProtocolVersion connparams
}
let proxy mrequestmessage = case mrequestmessage of
@ -397,9 +396,7 @@ proxyConnection proxyconnectionpoolsize relv connparams workerpool proxypool pro
}
where
protoerrhandler cont a = a >>= \case
Left _ ->
Proxy.closeRemoteSide $
proxyConnectionRemoteSide proxyconn
Left _ -> proxyConnectionCloser proxyconn
Right v -> cont v
proxydone = return ()
@ -407,8 +404,7 @@ proxyConnection proxyconnectionpoolsize relv connparams workerpool proxypool pro
requestcomplete () = return ()
closeproxyconnection =
void . inAnnexWorker' workerpool .
Proxy.closeRemoteSide . proxyConnectionRemoteSide
void . inAnnexWorker' workerpool . proxyConnectionCloser
data Locker = Locker
{ lockerThread :: Async ()
@ -523,29 +519,56 @@ inAnnexWorker' poolv annexaction = do
data ProxyConnection = ProxyConnection
{ proxyConnectionRemoteUUID :: UUID
, proxyConnectionRemoteSide :: Proxy.RemoteSide
, proxyConnectionSelector :: Proxy.ProxySelector
, proxyConnectionCloser :: Annex ()
, proxyConnectionConcurrency :: Proxy.ConcurrencyConfig
, proxyConnectionLastUsed :: POSIXTime
}
deriving (Show)
instance Show ProxyConnection where
show pc = unwords
[ "ProxyConnection"
, show (proxyConnectionRemoteUUID pc)
, show (proxyConnectionLastUsed pc)
]
openedProxyConnection
:: UUID
-> Proxy.ProxySelector
-> Annex ()
-> Proxy.ConcurrencyConfig
-> IO ProxyConnection
openedProxyConnection u selector closer concurrency = do
now <- getPOSIXTime
return $ ProxyConnection u selector closer concurrency now
openProxyConnectionToRemote
:: AnnexWorkerPool
-> P2P.ProtocolVersion
-> [UUID]
-> P2P.Bypass
-> Remote
-> IO (Either SomeException ProxyConnection)
openProxyConnectionToRemote workerpool clientmaxversion bypass remote =
inAnnexWorker' workerpool (proxyRemoteSide clientmaxversion bypass' remote) >>= \case
Left ex -> return (Left ex)
Right remoteside -> do
now <- getPOSIXTime
return $ Right $
ProxyConnection (Remote.uuid remote) remoteside now
where
bypass' = P2P.Bypass (S.fromList bypass)
inAnnexWorker' workerpool $ do
remoteside <- proxyRemoteSide clientmaxversion bypass remote
concurrencyconfig <- Proxy.noConcurrencyConfig
liftIO $ openedProxyConnection (Remote.uuid remote)
(Proxy.singleProxySelector remoteside)
(Proxy.closeRemoteSide remoteside)
concurrencyconfig
openProxyConnectionToCluster :: ClusterUUID -> IO ProxyConnection
openProxyConnectionToCluster cu = error "XXX" -- TODO
openProxyConnectionToCluster
:: AnnexWorkerPool
-> P2P.ProtocolVersion
-> P2P.Bypass
-> ClusterUUID
-> IO (Either SomeException ProxyConnection)
openProxyConnectionToCluster workerpool clientmaxversion bypass clusteruuid =
inAnnexWorker' workerpool $ do
(proxyselector, closenodes, concurrencyconfig) <-
clusterProxySelector clusteruuid clientmaxversion bypass
liftIO $ openedProxyConnection (fromClusterUUID clusteruuid)
proxyselector closenodes concurrencyconfig
type ProxyConnectionPool = (Integer, M.Map ProxyConnectionPoolKey [ProxyConnection])

View file

@ -28,7 +28,9 @@ Planned schedule of work:
## work notes
* Make http server support clusters.
* http proxying for a local git remote seems to probably not work
* git-annex testremote cluster
* Support proxying to git remotes using annex+http urls.
(Current documentation says proxying only works with ssh remotes,