implement proxy connection pool
removeOldestProxyConnectionPool will be innefficient the larger the pool is. A better data structure could be more efficient. Eg, make each value in the pool include the timestamp of its oldest element, then the oldest value can be found and modified, rather than rebuilding the whole Map. But, for pools of a few hundred items, this should be fine. It's O(n*n log n) or so. Also, when more than 1 connection with the same pool key exists, it's efficient even for larger pools, since removeOldestProxyConnectionPool is not needed. The default of 1 idle connection could perhaps be larger.. like the number of jobs? Otoh, it seems good to ramp up and down the number of connections, which does happen. With 1, there is at most one stale connection, which might cause a request to fail.
This commit is contained in:
parent
fb43b7ea3f
commit
d1faa13d6a
5 changed files with 114 additions and 38 deletions
|
@ -10,6 +10,7 @@
|
|||
{-# LANGUAGE BangPatterns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module P2P.Http.State where
|
||||
|
||||
|
@ -33,10 +34,11 @@ import qualified P2P.Proxy as Proxy
|
|||
import qualified Types.Remote as Remote
|
||||
|
||||
import Servant
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Data.Set as S
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Data.Time.Clock.POSIX
|
||||
|
||||
data P2PHttpServerState = P2PHttpServerState
|
||||
{ acquireP2PConnection :: AcquireP2PConnection
|
||||
|
@ -175,13 +177,17 @@ type AcquireP2PConnection
|
|||
= ConnectionParams
|
||||
-> IO (Either ConnectionProblem P2PConnectionPair)
|
||||
|
||||
withP2PConnections :: AnnexWorkerPool -> (AcquireP2PConnection -> Annex a) -> Annex a
|
||||
withP2PConnections workerpool a = do
|
||||
withP2PConnections
|
||||
:: AnnexWorkerPool
|
||||
-> ProxyConnectionPoolSize
|
||||
-> (AcquireP2PConnection -> Annex a)
|
||||
-> Annex a
|
||||
withP2PConnections workerpool proxyconnectionpoolsize a = do
|
||||
myuuid <- getUUID
|
||||
reqv <- liftIO newEmptyTMVarIO
|
||||
relv <- liftIO newEmptyTMVarIO
|
||||
endv <- liftIO newEmptyTMVarIO
|
||||
proxypool <- liftIO $ newTMVarIO mempty
|
||||
proxypool <- liftIO $ newTMVarIO (0, mempty)
|
||||
asyncservicer <- liftIO $ async $
|
||||
servicer myuuid proxypool reqv relv endv
|
||||
let endit = do
|
||||
|
@ -216,8 +222,8 @@ withP2PConnections workerpool a = do
|
|||
| connectionServerUUID connparams == myuuid =
|
||||
localConnection relv connparams workerpool
|
||||
| otherwise =
|
||||
atomically (getProxyConnectionFromPool proxypool connparams) >>= \case
|
||||
Just conn -> proxyConnection relv connparams workerpool conn
|
||||
atomically (getProxyConnectionPool proxypool connparams) >>= \case
|
||||
Just conn -> proxyConnection proxyconnectionpoolsize relv connparams workerpool proxypool conn
|
||||
Nothing -> checkcanproxy myuuid proxypool relv connparams
|
||||
|
||||
checkcanproxy myuuid proxypool relv connparams =
|
||||
|
@ -233,7 +239,7 @@ withP2PConnections workerpool a = do
|
|||
(connectionBypass connparams)
|
||||
proxyremote
|
||||
>>= \case
|
||||
Right conn -> proxyConnection relv connparams workerpool conn
|
||||
Right conn -> proxyConnection proxyconnectionpoolsize relv connparams workerpool proxypool conn
|
||||
Left ex -> return $ Left $
|
||||
ConnectionFailed $ show ex
|
||||
Right (Right (Left clusteruuid)) ->
|
||||
|
@ -332,12 +338,14 @@ mkClientRunState connparams = do
|
|||
mkRunState $ const $ Client prototvar
|
||||
|
||||
proxyConnection
|
||||
:: TMVar (IO ())
|
||||
:: ProxyConnectionPoolSize
|
||||
-> TMVar (IO ())
|
||||
-> ConnectionParams
|
||||
-> AnnexWorkerPool
|
||||
-> TMVar ProxyConnectionPool
|
||||
-> ProxyConnection
|
||||
-> IO (Either ConnectionProblem P2PConnectionPair)
|
||||
proxyConnection relv connparams workerpool proxyconn = do
|
||||
proxyConnection proxyconnectionpoolsize relv connparams workerpool proxypool proxyconn = do
|
||||
(clientconn, proxyfromclientconn) <-
|
||||
mkP2PConnectionPair connparams ("http client", "proxy")
|
||||
clientrunst <- mkClientRunState connparams
|
||||
|
@ -371,11 +379,15 @@ proxyConnection relv connparams workerpool proxyconn = do
|
|||
r <- liftIO $ wait asyncworker
|
||||
liftIO $ closeConnection proxyfromclientconn
|
||||
liftIO $ closeConnection clientconn
|
||||
inAnnexWorker' workerpool $
|
||||
Proxy.closeRemoteSide $
|
||||
proxyConnectionRemoteSide proxyconn
|
||||
if returntopool
|
||||
then liftIO $ do
|
||||
now <- getPOSIXTime
|
||||
evicted <- atomically $ putProxyConnectionPool proxypool proxyconnectionpoolsize connparams $
|
||||
proxyconn { proxyConnectionLastUsed = now }
|
||||
maybe noop closeproxyconnection evicted
|
||||
else closeproxyconnection proxyconn
|
||||
either throwM return r
|
||||
|
||||
|
||||
return $ Right $ P2PConnectionPair
|
||||
{ clientRunState = clientrunst
|
||||
, clientP2PConnection = clientconn
|
||||
|
@ -385,12 +397,18 @@ proxyConnection relv connparams workerpool proxyconn = do
|
|||
}
|
||||
where
|
||||
protoerrhandler cont a = a >>= \case
|
||||
Left err ->
|
||||
Left _ ->
|
||||
Proxy.closeRemoteSide $
|
||||
proxyConnectionRemoteSide proxyconn
|
||||
Right v -> cont v
|
||||
|
||||
proxydone = return ()
|
||||
|
||||
requestcomplete () = return ()
|
||||
|
||||
closeproxyconnection =
|
||||
void . inAnnexWorker' workerpool .
|
||||
Proxy.closeRemoteSide . proxyConnectionRemoteSide
|
||||
|
||||
data Locker = Locker
|
||||
{ lockerThread :: Async ()
|
||||
|
@ -506,7 +524,9 @@ inAnnexWorker' poolv annexaction = do
|
|||
data ProxyConnection = ProxyConnection
|
||||
{ proxyConnectionRemoteUUID :: UUID
|
||||
, proxyConnectionRemoteSide :: Proxy.RemoteSide
|
||||
, proxyConnectionLastUsed :: POSIXTime
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
openProxyConnectionToRemote
|
||||
:: AnnexWorkerPool
|
||||
|
@ -517,36 +537,74 @@ openProxyConnectionToRemote
|
|||
openProxyConnectionToRemote workerpool clientmaxversion bypass remote =
|
||||
inAnnexWorker' workerpool (proxyRemoteSide clientmaxversion bypass' remote) >>= \case
|
||||
Left ex -> return (Left ex)
|
||||
Right remoteside -> return $ Right $
|
||||
ProxyConnection (Remote.uuid remote) remoteside
|
||||
Right remoteside -> do
|
||||
now <- getPOSIXTime
|
||||
return $ Right $
|
||||
ProxyConnection (Remote.uuid remote) remoteside now
|
||||
where
|
||||
bypass' = P2P.Bypass (S.fromList bypass)
|
||||
|
||||
openProxyConnectionToCluster :: ClusterUUID -> IO ProxyConnection
|
||||
openProxyConnectionToCluster cu = error "XXX" -- TODO
|
||||
|
||||
type ProxyConnectionPool =
|
||||
M.Map (UUID, UUID, P2P.ProtocolVersion) [ProxyConnection]
|
||||
type ProxyConnectionPool = (Integer, M.Map ProxyConnectionPoolKey [ProxyConnection])
|
||||
|
||||
getProxyConnectionFromPool
|
||||
type ProxyConnectionPoolSize = Integer
|
||||
|
||||
-- Returns any older ProxyConnection that was evicted from the pool.
|
||||
putProxyConnectionPool
|
||||
:: TMVar ProxyConnectionPool
|
||||
-> ProxyConnectionPoolSize
|
||||
-> ConnectionParams
|
||||
-> ProxyConnection
|
||||
-> STM (Maybe ProxyConnection)
|
||||
putProxyConnectionPool proxypool maxsz connparams conn = do
|
||||
(sz, m) <- takeTMVar proxypool
|
||||
let ((sz', m'), evicted) = case M.lookup k m of
|
||||
Nothing -> ((succ sz, M.insert k [conn] m), Nothing)
|
||||
Just [] -> ((succ sz, M.insert k [conn] m), Nothing)
|
||||
Just cs -> if sz >= maxsz
|
||||
then ((sz, M.insert k (conn : dropFromEnd 1 cs) m), lastMaybe cs)
|
||||
else ((sz, M.insert k (conn : cs) m), Nothing)
|
||||
let ((sz'', m''), evicted') = if sz' > maxsz
|
||||
then removeOldestProxyConnectionPool (sz', m')
|
||||
else ((sz', m'), Nothing)
|
||||
putTMVar proxypool (sz'', m'')
|
||||
return (evicted <|> evicted')
|
||||
where
|
||||
k = proxyConnectionPoolKey connparams
|
||||
|
||||
removeOldestProxyConnectionPool :: ProxyConnectionPool -> (ProxyConnectionPool, Maybe ProxyConnection)
|
||||
removeOldestProxyConnectionPool (sz, m) =
|
||||
((pred sz, m'), snd <$> headMaybe l)
|
||||
where
|
||||
m' = M.fromListWith (++) $ map (\(k', v) -> (k', [v])) (drop 1 l)
|
||||
l = sortOn (proxyConnectionLastUsed . snd) $
|
||||
concatMap (\(k', pl) -> map (k', ) pl) $
|
||||
M.toList m
|
||||
|
||||
getProxyConnectionPool
|
||||
:: TMVar ProxyConnectionPool
|
||||
-> ConnectionParams
|
||||
-> STM (Maybe ProxyConnection)
|
||||
getProxyConnectionFromPool proxypool connparams = do
|
||||
m <- takeTMVar proxypool
|
||||
getProxyConnectionPool proxypool connparams = do
|
||||
(sz, m) <- takeTMVar proxypool
|
||||
case M.lookup k m of
|
||||
Nothing -> do
|
||||
putTMVar proxypool m
|
||||
return Nothing
|
||||
Just [] -> do
|
||||
putTMVar proxypool $ M.insert k [] m
|
||||
return Nothing
|
||||
Just (c:cs) -> do
|
||||
putTMVar proxypool $ M.insert k cs m
|
||||
putTMVar proxypool (sz-1, M.insert k cs m)
|
||||
return (Just c)
|
||||
_ -> do
|
||||
putTMVar proxypool (sz, m)
|
||||
return Nothing
|
||||
where
|
||||
k =
|
||||
( connectionServerUUID connparams
|
||||
, connectionClientUUID connparams
|
||||
, connectionProtocolVersion connparams
|
||||
)
|
||||
k = proxyConnectionPoolKey connparams
|
||||
|
||||
type ProxyConnectionPoolKey = (UUID, UUID, [UUID], P2P.ProtocolVersion)
|
||||
|
||||
proxyConnectionPoolKey :: ConnectionParams -> ProxyConnectionPoolKey
|
||||
proxyConnectionPoolKey connparams =
|
||||
( connectionServerUUID connparams
|
||||
, connectionClientUUID connparams
|
||||
, connectionBypass connparams
|
||||
, connectionProtocolVersion connparams
|
||||
)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue