GET and CHECKPRESENT amoung lowest cost cluster nodes
Before it was using a node that might have had a higher cost. Also threw in a random selection from amoung the low cost nodes. Of course this is a poor excuse for load balancing, but it's better than nothing. Most of the time...
This commit is contained in:
parent
dceb8dc776
commit
cf59d7f92c
5 changed files with 56 additions and 39 deletions
|
@ -27,6 +27,7 @@ import qualified Types.Remote as Remote
|
||||||
|
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
|
import System.Random
|
||||||
|
|
||||||
{- Proxy to a cluster. -}
|
{- Proxy to a cluster. -}
|
||||||
proxyCluster
|
proxyCluster
|
||||||
|
@ -75,7 +76,7 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
|
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
|
||||||
<$> getClusters
|
<$> getClusters
|
||||||
myclusters <- annexClusters <$> Annex.getGitConfig
|
myclusters <- annexClusters <$> Annex.getGitConfig
|
||||||
allremotes <- remoteList
|
allremotes <- concat . Remote.byCost <$> remoteList
|
||||||
hereu <- getUUID
|
hereu <- getUUID
|
||||||
let bypass' = S.insert hereu bypass
|
let bypass' = S.insert hereu bypass
|
||||||
let clusterremotes = filter (isnode bypass' allremotes nodeuuids myclusters) allremotes
|
let clusterremotes = filter (isnode bypass' allremotes nodeuuids myclusters) allremotes
|
||||||
|
@ -94,8 +95,8 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
-- skipping nodes where it's not preferred content.
|
-- skipping nodes where it's not preferred content.
|
||||||
, proxyPUT = \af k -> do
|
, proxyPUT = \af k -> do
|
||||||
locs <- S.fromList <$> loggedLocations k
|
locs <- S.fromList <$> loggedLocations k
|
||||||
let l = filter (flip S.notMember locs . remoteUUID) nodes
|
let l = filter (flip S.notMember locs . Remote.uuid . remote) nodes
|
||||||
l' <- filterM (\n -> isPreferredContent (Just (remoteUUID n)) mempty (Just k) af True) l
|
l' <- filterM (\n -> isPreferredContent (Just (Remote.uuid (remote n))) mempty (Just k) af True) l
|
||||||
-- PUT to no nodes doesn't work, so fall
|
-- PUT to no nodes doesn't work, so fall
|
||||||
-- back to all nodes.
|
-- back to all nodes.
|
||||||
return $ nonempty [l', l] nodes
|
return $ nonempty [l', l] nodes
|
||||||
|
@ -146,11 +147,19 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
|
|
||||||
nodecontaining nodes k = do
|
nodecontaining nodes k = do
|
||||||
locs <- S.fromList <$> loggedLocations k
|
locs <- S.fromList <$> loggedLocations k
|
||||||
case filter (flip S.member locs . remoteUUID) nodes of
|
case filter (flip S.member locs . Remote.uuid . remote) nodes of
|
||||||
-- For now, pick the first node that has the
|
|
||||||
-- content. Load balancing would be nice..
|
|
||||||
(r:_) -> return (Just r)
|
|
||||||
[] -> return Nothing
|
[] -> return Nothing
|
||||||
|
(node:[]) -> return (Just node)
|
||||||
|
(node:rest) ->
|
||||||
|
-- The list of nodes is ordered by cost.
|
||||||
|
-- Use any of the ones with equally low
|
||||||
|
-- cost.
|
||||||
|
let lowestcost = Remote.cost (remote node)
|
||||||
|
samecost = node : takeWhile (\n -> Remote.cost (remote n) == lowestcost) rest
|
||||||
|
in do
|
||||||
|
n <- getStdRandom $
|
||||||
|
randomR (0, length samecost - 1)
|
||||||
|
return (Just (samecost !! n))
|
||||||
|
|
||||||
nonempty (l:ls) fallback
|
nonempty (l:ls) fallback
|
||||||
| null l = nonempty ls fallback
|
| null l = nonempty ls fallback
|
||||||
|
|
|
@ -11,14 +11,12 @@ import Annex.Common
|
||||||
import P2P.Proxy
|
import P2P.Proxy
|
||||||
import P2P.Protocol
|
import P2P.Protocol
|
||||||
import P2P.IO
|
import P2P.IO
|
||||||
import qualified Remote
|
|
||||||
import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection)
|
import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection)
|
||||||
|
|
||||||
-- FIXME: Support special remotes.
|
-- FIXME: Support special remotes.
|
||||||
proxySshRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
|
proxySshRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
|
||||||
proxySshRemoteSide clientmaxversion bypass remote =
|
proxySshRemoteSide clientmaxversion bypass r = mkRemoteSide r $
|
||||||
mkRemoteSide (Remote.uuid remote) $
|
openP2PShellConnection' r clientmaxversion bypass >>= \case
|
||||||
openP2PShellConnection' remote clientmaxversion bypass >>= \case
|
|
||||||
Just conn@(OpenConnection (remoterunst, remoteconn, _)) ->
|
Just conn@(OpenConnection (remoterunst, remoteconn, _)) ->
|
||||||
return $ Just
|
return $ Just
|
||||||
( remoterunst
|
( remoterunst
|
||||||
|
|
|
@ -60,14 +60,14 @@ performLocal theiruuid servermode = do
|
||||||
p2pErrHandler (const p2pDone) (runFullProto runst conn server)
|
p2pErrHandler (const p2pDone) (runFullProto runst conn server)
|
||||||
|
|
||||||
performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform
|
performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform
|
||||||
performProxy clientuuid servermode remote = do
|
performProxy clientuuid servermode r = do
|
||||||
clientside <- proxyClientSide clientuuid
|
clientside <- proxyClientSide clientuuid
|
||||||
getClientProtocolVersion (Remote.uuid remote) clientside
|
getClientProtocolVersion (Remote.uuid r) clientside
|
||||||
(withclientversion clientside)
|
(withclientversion clientside)
|
||||||
p2pErrHandler
|
p2pErrHandler
|
||||||
where
|
where
|
||||||
withclientversion clientside (Just (clientmaxversion, othermsg)) = do
|
withclientversion clientside (Just (clientmaxversion, othermsg)) = do
|
||||||
remoteside <- proxySshRemoteSide clientmaxversion mempty remote
|
remoteside <- proxySshRemoteSide clientmaxversion mempty r
|
||||||
protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id
|
protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id
|
||||||
<$> runRemoteSide remoteside
|
<$> runRemoteSide remoteside
|
||||||
(P2P.net P2P.getProtocolVersion)
|
(P2P.net P2P.getProtocolVersion)
|
||||||
|
@ -77,7 +77,7 @@ performProxy clientuuid servermode remote = do
|
||||||
concurrencyconfig <- noConcurrencyConfig
|
concurrencyconfig <- noConcurrencyConfig
|
||||||
let runproxy othermsg' = proxy closer proxymethods
|
let runproxy othermsg' = proxy closer proxymethods
|
||||||
servermode clientside
|
servermode clientside
|
||||||
(Remote.uuid remote)
|
(Remote.uuid r)
|
||||||
(singleProxySelector remoteside)
|
(singleProxySelector remoteside)
|
||||||
concurrencyconfig
|
concurrencyconfig
|
||||||
protocolversion othermsg' p2pErrHandler
|
protocolversion othermsg' p2pErrHandler
|
||||||
|
|
27
P2P/Proxy.hs
27
P2P/Proxy.hs
|
@ -18,6 +18,7 @@ import Utility.Metered
|
||||||
import Git.FilePath
|
import Git.FilePath
|
||||||
import Types.Concurrency
|
import Types.Concurrency
|
||||||
import Annex.Concurrent
|
import Annex.Concurrent
|
||||||
|
import qualified Remote
|
||||||
|
|
||||||
import Data.Either
|
import Data.Either
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
|
@ -32,14 +33,14 @@ type ProtoCloser = Annex ()
|
||||||
data ClientSide = ClientSide RunState P2PConnection
|
data ClientSide = ClientSide RunState P2PConnection
|
||||||
|
|
||||||
data RemoteSide = RemoteSide
|
data RemoteSide = RemoteSide
|
||||||
{ remoteUUID :: UUID
|
{ remote :: Remote
|
||||||
, remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
|
, remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
|
||||||
, remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
|
, remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
|
||||||
}
|
}
|
||||||
|
|
||||||
mkRemoteSide :: UUID -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide
|
mkRemoteSide :: Remote -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide
|
||||||
mkRemoteSide remoteuuid remoteconnect = RemoteSide
|
mkRemoteSide r remoteconnect = RemoteSide
|
||||||
<$> pure remoteuuid
|
<$> pure r
|
||||||
<*> pure remoteconnect
|
<*> pure remoteconnect
|
||||||
<*> liftIO (atomically newEmptyTMVar)
|
<*> liftIO (atomically newEmptyTMVar)
|
||||||
|
|
||||||
|
@ -328,9 +329,9 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
||||||
net $ sendMessage message
|
net $ sendMessage message
|
||||||
net receiveMessage >>= return . \case
|
net receiveMessage >>= return . \case
|
||||||
Just SUCCESS ->
|
Just SUCCESS ->
|
||||||
Just (True, [remoteUUID r])
|
Just (True, [Remote.uuid (remote r)])
|
||||||
Just (SUCCESS_PLUS us) ->
|
Just (SUCCESS_PLUS us) ->
|
||||||
Just (True, remoteUUID r:us)
|
Just (True, Remote.uuid (remote r):us)
|
||||||
Just FAILURE ->
|
Just FAILURE ->
|
||||||
Just (False, [])
|
Just (False, [])
|
||||||
Just (FAILURE_PLUS us) ->
|
Just (FAILURE_PLUS us) ->
|
||||||
|
@ -355,7 +356,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
||||||
withDATA (relayGET remoteside)
|
withDATA (relayGET remoteside)
|
||||||
|
|
||||||
handlePUT (remoteside:[]) k message
|
handlePUT (remoteside:[]) k message
|
||||||
| remoteUUID remoteside == remoteuuid =
|
| Remote.uuid (remote remoteside) == remoteuuid =
|
||||||
getresponse (runRemoteSide remoteside) message $ \resp -> case resp of
|
getresponse (runRemoteSide remoteside) message $ \resp -> case resp of
|
||||||
ALREADY_HAVE -> protoerrhandler proxynextclientmessage $
|
ALREADY_HAVE -> protoerrhandler proxynextclientmessage $
|
||||||
client $ net $ sendMessage resp
|
client $ net $ sendMessage resp
|
||||||
|
@ -390,10 +391,10 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
||||||
proxynextclientmessage ()
|
proxynextclientmessage ()
|
||||||
|
|
||||||
relayPUTRecord k remoteside SUCCESS = do
|
relayPUTRecord k remoteside SUCCESS = do
|
||||||
addedContent proxymethods (remoteUUID remoteside) k
|
addedContent proxymethods (Remote.uuid (remote remoteside)) k
|
||||||
return $ Just [remoteUUID remoteside]
|
return $ Just [Remote.uuid (remote remoteside)]
|
||||||
relayPUTRecord k remoteside (SUCCESS_PLUS us) = do
|
relayPUTRecord k remoteside (SUCCESS_PLUS us) = do
|
||||||
let us' = remoteUUID remoteside : us
|
let us' = (Remote.uuid (remote remoteside)) : us
|
||||||
forM_ us' $ \u ->
|
forM_ us' $ \u ->
|
||||||
addedContent proxymethods u k
|
addedContent proxymethods u k
|
||||||
return $ Just us'
|
return $ Just us'
|
||||||
|
@ -425,7 +426,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
||||||
else protoerrhandler proxynextclientmessage $
|
else protoerrhandler proxynextclientmessage $
|
||||||
client $ net $ sendMessage $ ALREADY_HAVE_PLUS $
|
client $ net $ sendMessage $ ALREADY_HAVE_PLUS $
|
||||||
filter (/= remoteuuid) $
|
filter (/= remoteuuid) $
|
||||||
map remoteUUID (lefts (rights l))
|
map (Remote.uuid . remote) (lefts (rights l))
|
||||||
else if null (rights l)
|
else if null (rights l)
|
||||||
-- no response from any remote
|
-- no response from any remote
|
||||||
then proxydone
|
then proxydone
|
||||||
|
@ -439,11 +440,11 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
||||||
let totallen = datalen + minoffset
|
let totallen = datalen + minoffset
|
||||||
-- Tell each remote how much data to expect, depending
|
-- Tell each remote how much data to expect, depending
|
||||||
-- on the remote's offset.
|
-- on the remote's offset.
|
||||||
rs <- forMC concurrencyconfig remotes $ \remote@(remoteside, remoteoffset) ->
|
rs <- forMC concurrencyconfig remotes $ \r@(remoteside, remoteoffset) ->
|
||||||
runRemoteSideOrSkipFailed remoteside $ do
|
runRemoteSideOrSkipFailed remoteside $ do
|
||||||
net $ sendMessage $ DATA $ Len $
|
net $ sendMessage $ DATA $ Len $
|
||||||
totallen - remoteoffset
|
totallen - remoteoffset
|
||||||
return remote
|
return r
|
||||||
protoerrhandler (send (catMaybes rs) minoffset) $
|
protoerrhandler (send (catMaybes rs) minoffset) $
|
||||||
client $ net $ receiveBytes (Len datalen) nullMeterUpdate
|
client $ net $ receiveBytes (Len datalen) nullMeterUpdate
|
||||||
where
|
where
|
||||||
|
|
|
@ -36,10 +36,19 @@ For June's work on [[design/passthrough_proxy]], remaining todos:
|
||||||
* Indirect uploads when proxying for special remote
|
* Indirect uploads when proxying for special remote
|
||||||
(to be considered). See design.
|
(to be considered). See design.
|
||||||
|
|
||||||
* Getting a key from a cluster currently always selects the lowest cost
|
* Getting a key from a cluster currently picks from amoung
|
||||||
remote, and always the same remote if cost is the same. Should
|
the lowest cost remotes at random. This could be smarter,
|
||||||
round-robin amoung remotes, and prefer to avoid using remotes that
|
eg prefer to avoid using remotes that are doing other transfers at the
|
||||||
other git-annex processes are currently using.
|
same time.
|
||||||
|
|
||||||
|
* The cost of a cluster and of its proxied nodes is currently all the same.
|
||||||
|
It would make sense for proxied nodes that are accessed via an intermedia
|
||||||
|
gateway to have a higher cost than proxied nodes that are accessed via
|
||||||
|
the remote gateway. And proxied nodes should generally have a higher cost
|
||||||
|
than the cluster, so that git-annex defaults to using the cluster.
|
||||||
|
(The cost of accessing a proxied node vs using the cluster is the same,
|
||||||
|
but using the cluster allows smarter load-balancing to be done on the
|
||||||
|
cluster. It also makes the UI not mention individual nodes.)
|
||||||
|
|
||||||
* Optimise proxy speed. See design for ideas.
|
* Optimise proxy speed. See design for ideas.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue