add --clusterjobs option and default to 1
The default of 1 is not ideal at all, but it avoids an accidental M*N causing so much concurrency it becomes unusable.
This commit is contained in:
parent
1259ad89b6
commit
fbbedae497
8 changed files with 60 additions and 30 deletions
|
@ -58,10 +58,11 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
|
||||||
(withclientbypass protocolversion) (protoerrhandler noop)
|
(withclientbypass protocolversion) (protoerrhandler noop)
|
||||||
|
|
||||||
withclientbypass protocolversion (bypassuuids, othermsg) = do
|
withclientbypass protocolversion (bypassuuids, othermsg) = do
|
||||||
(selectnode, closenodes, concurrencyconfig) <-
|
(selectnode, closenodes) <-
|
||||||
clusterProxySelector clusteruuid
|
clusterProxySelector clusteruuid
|
||||||
protocolversion bypassuuids
|
protocolversion bypassuuids
|
||||||
proxystate <- liftIO mkProxyState
|
proxystate <- liftIO mkProxyState
|
||||||
|
concurrencyconfig <- concurrencyConfigJobs
|
||||||
let proxyparams = ProxyParams
|
let proxyparams = ProxyParams
|
||||||
{ proxyMethods = mkProxyMethods
|
{ proxyMethods = mkProxyMethods
|
||||||
, proxyState = proxystate
|
, proxyState = proxystate
|
||||||
|
@ -79,7 +80,7 @@ clusterProxySelector
|
||||||
:: ClusterUUID
|
:: ClusterUUID
|
||||||
-> ProtocolVersion
|
-> ProtocolVersion
|
||||||
-> Bypass
|
-> Bypass
|
||||||
-> Annex (ProxySelector, Annex (), ConcurrencyConfig)
|
-> Annex (ProxySelector, Annex ())
|
||||||
clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
|
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
|
||||||
<$> getClusters
|
<$> getClusters
|
||||||
|
@ -120,8 +121,7 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
-- proxied to the client.
|
-- proxied to the client.
|
||||||
, proxyLOCKCONTENT = const (pure Nothing)
|
, proxyLOCKCONTENT = const (pure Nothing)
|
||||||
}
|
}
|
||||||
concurrencyconfig <- getConcurrencyConfig
|
return (proxyselector, closenodes)
|
||||||
return (proxyselector, closenodes, concurrencyconfig)
|
|
||||||
where
|
where
|
||||||
-- Nodes of the cluster have remote.name.annex-cluster-node
|
-- Nodes of the cluster have remote.name.annex-cluster-node
|
||||||
-- containing its name.
|
-- containing its name.
|
||||||
|
|
|
@ -41,6 +41,7 @@ data Options = Options
|
||||||
, unauthAppendOnlyOption :: Bool
|
, unauthAppendOnlyOption :: Bool
|
||||||
, wideOpenOption :: Bool
|
, wideOpenOption :: Bool
|
||||||
, proxyConnectionsOption :: Maybe Integer
|
, proxyConnectionsOption :: Maybe Integer
|
||||||
|
, clusterJobsOption :: Maybe Int
|
||||||
}
|
}
|
||||||
|
|
||||||
optParser :: CmdParamsDesc -> Parser Options
|
optParser :: CmdParamsDesc -> Parser Options
|
||||||
|
@ -89,10 +90,16 @@ optParser _ = Options
|
||||||
( long "proxyconnections" <> metavar paramNumber
|
( long "proxyconnections" <> metavar paramNumber
|
||||||
<> help "maximum number of idle connections when proxying"
|
<> help "maximum number of idle connections when proxying"
|
||||||
))
|
))
|
||||||
|
<*> optional (option auto
|
||||||
|
( long "clusterjobs" <> metavar paramNumber
|
||||||
|
<> help "number of concurrent node accesses per connection"
|
||||||
|
))
|
||||||
|
|
||||||
seek :: Options -> CommandSeek
|
seek :: Options -> CommandSeek
|
||||||
seek o = getAnnexWorkerPool $ \workerpool ->
|
seek o = getAnnexWorkerPool $ \workerpool ->
|
||||||
withP2PConnections workerpool (fromMaybe 1 $ proxyConnectionsOption o)
|
withP2PConnections workerpool
|
||||||
|
(fromMaybe 1 $ proxyConnectionsOption o)
|
||||||
|
(fromMaybe 1 $ clusterJobsOption o)
|
||||||
(go workerpool)
|
(go workerpool)
|
||||||
where
|
where
|
||||||
go workerpool acquireconn = liftIO $ do
|
go workerpool acquireconn = liftIO $ do
|
||||||
|
|
|
@ -181,9 +181,10 @@ type AcquireP2PConnection
|
||||||
withP2PConnections
|
withP2PConnections
|
||||||
:: AnnexWorkerPool
|
:: AnnexWorkerPool
|
||||||
-> ProxyConnectionPoolSize
|
-> ProxyConnectionPoolSize
|
||||||
|
-> ClusterConcurrency
|
||||||
-> (AcquireP2PConnection -> Annex a)
|
-> (AcquireP2PConnection -> Annex a)
|
||||||
-> Annex a
|
-> Annex a
|
||||||
withP2PConnections workerpool proxyconnectionpoolsize a = do
|
withP2PConnections workerpool proxyconnectionpoolsize clusterconcurrency a = do
|
||||||
myuuid <- getUUID
|
myuuid <- getUUID
|
||||||
reqv <- liftIO newEmptyTMVarIO
|
reqv <- liftIO newEmptyTMVarIO
|
||||||
relv <- liftIO newEmptyTMVarIO
|
relv <- liftIO newEmptyTMVarIO
|
||||||
|
@ -241,7 +242,7 @@ withP2PConnections workerpool proxyconnectionpoolsize a = do
|
||||||
Right (Right (Left clusteruuid)) -> proxyconnection $
|
Right (Right (Left clusteruuid)) -> proxyconnection $
|
||||||
openProxyConnectionToCluster workerpool
|
openProxyConnectionToCluster workerpool
|
||||||
(connectionProtocolVersion connparams)
|
(connectionProtocolVersion connparams)
|
||||||
bypass clusteruuid
|
bypass clusteruuid clusterconcurrency
|
||||||
Left ex -> return $ Left $
|
Left ex -> return $ Left $
|
||||||
ConnectionFailed $ show ex
|
ConnectionFailed $ show ex
|
||||||
where
|
where
|
||||||
|
@ -557,16 +558,20 @@ openProxyConnectionToRemote workerpool clientmaxversion bypass remote =
|
||||||
(Proxy.closeRemoteSide remoteside)
|
(Proxy.closeRemoteSide remoteside)
|
||||||
concurrencyconfig
|
concurrencyconfig
|
||||||
|
|
||||||
|
type ClusterConcurrency = Int
|
||||||
|
|
||||||
openProxyConnectionToCluster
|
openProxyConnectionToCluster
|
||||||
:: AnnexWorkerPool
|
:: AnnexWorkerPool
|
||||||
-> P2P.ProtocolVersion
|
-> P2P.ProtocolVersion
|
||||||
-> P2P.Bypass
|
-> P2P.Bypass
|
||||||
-> ClusterUUID
|
-> ClusterUUID
|
||||||
|
-> ClusterConcurrency
|
||||||
-> IO (Either SomeException ProxyConnection)
|
-> IO (Either SomeException ProxyConnection)
|
||||||
openProxyConnectionToCluster workerpool clientmaxversion bypass clusteruuid =
|
openProxyConnectionToCluster workerpool clientmaxversion bypass clusteruuid concurrency =
|
||||||
inAnnexWorker' workerpool $ do
|
inAnnexWorker' workerpool $ do
|
||||||
(proxyselector, closenodes, concurrencyconfig) <-
|
(proxyselector, closenodes) <-
|
||||||
clusterProxySelector clusteruuid clientmaxversion bypass
|
clusterProxySelector clusteruuid clientmaxversion bypass
|
||||||
|
concurrencyconfig <- Proxy.mkConcurrencyConfig concurrency
|
||||||
liftIO $ openedProxyConnection (fromClusterUUID clusteruuid)
|
liftIO $ openedProxyConnection (fromClusterUUID clusteruuid)
|
||||||
proxyselector closenodes concurrencyconfig
|
proxyselector closenodes concurrencyconfig
|
||||||
|
|
||||||
|
|
12
P2P/Proxy.hs
12
P2P/Proxy.hs
|
@ -659,10 +659,13 @@ proxyRequest proxydone proxyparams requestcomplete requestmessage protoerrhandle
|
||||||
data ConcurrencyConfig = ConcurrencyConfig Int (MSem.MSem Int)
|
data ConcurrencyConfig = ConcurrencyConfig Int (MSem.MSem Int)
|
||||||
|
|
||||||
noConcurrencyConfig :: Annex ConcurrencyConfig
|
noConcurrencyConfig :: Annex ConcurrencyConfig
|
||||||
noConcurrencyConfig = liftIO $ ConcurrencyConfig 1 <$> MSem.new 1
|
noConcurrencyConfig = mkConcurrencyConfig 1
|
||||||
|
|
||||||
getConcurrencyConfig :: Annex ConcurrencyConfig
|
mkConcurrencyConfig :: Int -> Annex ConcurrencyConfig
|
||||||
getConcurrencyConfig = (annexJobs <$> Annex.getGitConfig) >>= \case
|
mkConcurrencyConfig n = liftIO $ ConcurrencyConfig n <$> MSem.new n
|
||||||
|
|
||||||
|
concurrencyConfigJobs :: Annex ConcurrencyConfig
|
||||||
|
concurrencyConfigJobs = (annexJobs <$> Annex.getGitConfig) >>= \case
|
||||||
NonConcurrent -> noConcurrencyConfig
|
NonConcurrent -> noConcurrencyConfig
|
||||||
Concurrent n -> go n
|
Concurrent n -> go n
|
||||||
ConcurrentPerCpu -> go =<< liftIO getNumProcessors
|
ConcurrentPerCpu -> go =<< liftIO getNumProcessors
|
||||||
|
@ -672,8 +675,7 @@ getConcurrencyConfig = (annexJobs <$> Annex.getGitConfig) >>= \case
|
||||||
when (n > c) $
|
when (n > c) $
|
||||||
liftIO $ setNumCapabilities n
|
liftIO $ setNumCapabilities n
|
||||||
setConcurrency (ConcurrencyGitConfig (Concurrent n))
|
setConcurrency (ConcurrencyGitConfig (Concurrent n))
|
||||||
msem <- liftIO $ MSem.new n
|
mkConcurrencyConfig n
|
||||||
return (ConcurrencyConfig n msem)
|
|
||||||
|
|
||||||
forMC :: ConcurrencyConfig -> [a] -> (a -> Annex b) -> Annex [b]
|
forMC :: ConcurrencyConfig -> [a] -> (a -> Annex b) -> Annex [b]
|
||||||
forMC _ (x:[]) a = do
|
forMC _ (x:[]) a = do
|
||||||
|
|
|
@ -32,21 +32,37 @@ convenient way to download the content of any key, by using the path
|
||||||
* `--jobs=N` `-JN`
|
* `--jobs=N` `-JN`
|
||||||
|
|
||||||
This or annex.jobs must be set to configure the number of worker
|
This or annex.jobs must be set to configure the number of worker
|
||||||
threads.
|
threads that serve connections to the webserver.
|
||||||
|
|
||||||
Since the webserver itself uses one thread, this needs to be set to
|
Since the webserver itself also uses one of these threads,
|
||||||
2 or more.
|
this needs to be set to 2 or more.
|
||||||
|
|
||||||
A good choice is one worker per CPU core: `--jobs=cpus`
|
A good choice is often one worker per CPU core: `--jobs=cpus`
|
||||||
|
|
||||||
* `--proxyconnections=N`
|
* `--proxyconnections=N`
|
||||||
|
|
||||||
When is command is run in a repository that is configured to act as a
|
When this command is run in a repository that is configured to act as a
|
||||||
proxy for some of its remotes, this is the maximum number of idle
|
proxy for some of its remotes, this is the maximum number of idle
|
||||||
connections to keep open to proxied remotes.
|
connections to keep open to proxied remotes.
|
||||||
|
|
||||||
The default is 1.
|
The default is 1.
|
||||||
|
|
||||||
|
* `--clusterjobs=N`
|
||||||
|
|
||||||
|
When this command is run in a repository that is a gateway for a cluster,
|
||||||
|
this is the number of concurrent jobs to use to access nodes of the
|
||||||
|
cluster, per connection to the webserver.
|
||||||
|
|
||||||
|
The default is 1.
|
||||||
|
|
||||||
|
A good choice for this will be a balance between the number of nodes
|
||||||
|
in the cluster and the value of `--jobs`.
|
||||||
|
|
||||||
|
For example, if the cluster has 4 nodes, and `--jobs=4`, using
|
||||||
|
`--clusterjobs=4` will make all nodes in the cluster be accessed
|
||||||
|
concurrently, which is often optimal. But around 20 cores can be needed
|
||||||
|
when the webserver is busy.
|
||||||
|
|
||||||
* `--port=N`
|
* `--port=N`
|
||||||
|
|
||||||
Port to listen on. The default is port 9417, which is the default
|
Port to listen on. The default is port 9417, which is the default
|
||||||
|
@ -122,6 +138,10 @@ git-http-backend(1)
|
||||||
|
|
||||||
[[git-annex-updateproxy]](1)
|
[[git-annex-updateproxy]](1)
|
||||||
|
|
||||||
|
[[git-annex-initcluster]](1)
|
||||||
|
|
||||||
|
[[git-annex-updatecluster]](1)
|
||||||
|
|
||||||
<https://git-annex.branchable.com/design/p2p_protocol_over_http/>
|
<https://git-annex.branchable.com/design/p2p_protocol_over_http/>
|
||||||
|
|
||||||
# AUTHOR
|
# AUTHOR
|
||||||
|
|
|
@ -26,7 +26,7 @@ it. Then after pulling from "work", git-annex will know about an
|
||||||
additional remote, "work-foo". That remote will be accessed using "work" as
|
additional remote, "work-foo". That remote will be accessed using "work" as
|
||||||
a proxy.
|
a proxy.
|
||||||
|
|
||||||
Proxies can only be accessed via ssh.
|
Proxies can only be accessed via ssh or by an annex+http url.
|
||||||
|
|
||||||
# OPTIONS
|
# OPTIONS
|
||||||
|
|
||||||
|
|
|
@ -12,8 +12,8 @@ special remotes.
|
||||||
## using a cluster
|
## using a cluster
|
||||||
|
|
||||||
To use a cluster, your repository needs to have its gateway configured as a
|
To use a cluster, your repository needs to have its gateway configured as a
|
||||||
remote. Clusters can currently only be accessed via ssh. This gateway
|
remote. Clusters can currently only be accessed via ssh or by an annex+http
|
||||||
remote is added the same as any other git remote:
|
url. This gateway remote is added the same as any other git remote:
|
||||||
|
|
||||||
$ git remote add bigserver me@bigserver:annex
|
$ git remote add bigserver me@bigserver:annex
|
||||||
|
|
||||||
|
|
|
@ -32,11 +32,6 @@ Planned schedule of work:
|
||||||
|
|
||||||
* git-annex testremote cluster
|
* git-annex testremote cluster
|
||||||
|
|
||||||
* Support proxying to git remotes using annex+http urls.
|
|
||||||
(Current documentation says proxying only works with ssh remotes,
|
|
||||||
so current state is not confusing, but this still needs to be done
|
|
||||||
eventually.)
|
|
||||||
|
|
||||||
## completed items for July's work on p2p protocol over http
|
## completed items for July's work on p2p protocol over http
|
||||||
|
|
||||||
* HTTP P2P protocol design [[design/p2p_protocol_over_http]].
|
* HTTP P2P protocol design [[design/p2p_protocol_over_http]].
|
||||||
|
@ -53,14 +48,15 @@ Planned schedule of work:
|
||||||
|
|
||||||
* Make http server support proxying.
|
* Make http server support proxying.
|
||||||
|
|
||||||
|
* Make http server support serving a cluster.
|
||||||
|
|
||||||
## items deferred until later for p2p protocol over http
|
## items deferred until later for p2p protocol over http
|
||||||
|
|
||||||
|
* Support proxying to git remotes that use annex+http urls.
|
||||||
|
|
||||||
* `git-annex p2phttp` could support systemd socket activation. This would
|
* `git-annex p2phttp` could support systemd socket activation. This would
|
||||||
allow making a systemd unit that listens on port 80.
|
allow making a systemd unit that listens on port 80.
|
||||||
|
|
||||||
* `git-annex p2phttp` could serve `.git/annex/p2phttp/.well-known/`,
|
|
||||||
allowing it to be used by an ACME client to get certificates.
|
|
||||||
|
|
||||||
## items deferred until later for [[design/passthrough_proxy]]
|
## items deferred until later for [[design/passthrough_proxy]]
|
||||||
|
|
||||||
* Check annex.diskreserve when proxying for special remotes
|
* Check annex.diskreserve when proxying for special remotes
|
||||||
|
|
Loading…
Reference in a new issue