use worker pool in withLocalP2PConnections

This allows multiple clients to be handled at the same time.
This commit is contained in:
Joey Hess 2024-07-11 14:37:52 -04:00
parent 68227154fb
commit 97a2d0e4fb
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
3 changed files with 61 additions and 58 deletions

View file

@ -75,7 +75,7 @@ seek o = getAnnexWorkerPool $ \workerpool -> do
liftIO $ putStrLn "test begins" liftIO $ putStrLn "test begins"
testGet testGet
giveup "TEST DONE" giveup "TEST DONE"
withLocalP2PConnections $ \acquireconn -> liftIO $ do withLocalP2PConnections workerpool $ \acquireconn -> liftIO $ do
authenv <- getAuthEnv authenv <- getAuthEnv
st <- mkP2PHttpServerState acquireconn workerpool $ st <- mkP2PHttpServerState acquireconn workerpool $
mkGetServerMode authenv o mkGetServerMode authenv o

View file

@ -21,7 +21,6 @@ import qualified P2P.IO as P2P
import P2P.IO import P2P.IO
import P2P.Annex import P2P.Annex
import Annex.UUID import Annex.UUID
import Annex.Concurrent
import Types.WorkerPool import Types.WorkerPool
import Annex.WorkerPool import Annex.WorkerPool
import CmdLine.Action (startConcurrency) import CmdLine.Action (startConcurrency)
@ -146,15 +145,12 @@ type AcquireP2PConnection
-> IO (Either ConnectionProblem P2PConnectionPair) -> IO (Either ConnectionProblem P2PConnectionPair)
{- Acquire P2P connections to the local repository. -} {- Acquire P2P connections to the local repository. -}
-- TODO need worker pool, this can only service a single request at withLocalP2PConnections :: AnnexWorkerPool -> (AcquireP2PConnection -> Annex a) -> Annex a
-- a time. withLocalP2PConnections workerpool a = do
-- TODO proxies myuuid <- getUUID
-- TODO clusters
withLocalP2PConnections :: (AcquireP2PConnection -> Annex a) -> Annex a
withLocalP2PConnections a = do
reqv <- liftIO newEmptyTMVarIO reqv <- liftIO newEmptyTMVarIO
relv <- liftIO newEmptyTMVarIO relv <- liftIO newEmptyTMVarIO
asyncservicer <- liftIO . async =<< forkState (servicer reqv relv) asyncservicer <- liftIO $ async $ servicer myuuid reqv relv
a (acquireconn reqv) `finally` join (liftIO (wait asyncservicer)) a (acquireconn reqv) `finally` join (liftIO (wait asyncservicer))
where where
acquireconn reqv connparams = do acquireconn reqv connparams = do
@ -162,7 +158,7 @@ withLocalP2PConnections a = do
atomically $ putTMVar reqv (connparams, respvar) atomically $ putTMVar reqv (connparams, respvar)
atomically $ takeTMVar respvar atomically $ takeTMVar respvar
servicer reqv relv = do servicer myuuid reqv relv = do
reqrel <- liftIO $ reqrel <- liftIO $
atomically $ atomically $
(Right <$> takeTMVar reqv) (Right <$> takeTMVar reqv)
@ -170,54 +166,61 @@ withLocalP2PConnections a = do
(Left <$> takeTMVar relv) (Left <$> takeTMVar relv)
case reqrel of case reqrel of
Right (connparams, respvar) -> Right (connparams, respvar) ->
servicereq relv connparams respvar servicereq myuuid relv connparams
>>= atomically . putTMVar respvar
Left releaseconn -> releaseconn Left releaseconn -> releaseconn
servicer reqv relv servicer myuuid reqv relv
servicereq relv connparams respvar = do servicereq myuuid relv connparams
myuuid <- getUUID | connectionServerUUID connparams /= myuuid =
resp <- if connectionServerUUID connparams /= myuuid return $ Left $ ConnectionFailed "unknown uuid"
then return $ Left $ ConnectionFailed "unknown uuid" | otherwise = mkP2PConnectionPair connparams relv $
else do \serverrunst serverconn -> inAnnexWorker' workerpool $
hdl1 <- liftIO newEmptyTMVarIO void $ runFullProto serverrunst serverconn $
hdl2 <- liftIO newEmptyTMVarIO P2P.serveOneCommandAuthed
wait1 <- liftIO newEmptyTMVarIO (connectionServerMode connparams)
wait2 <- liftIO newEmptyTMVarIO (connectionServerUUID connparams)
let h1 = P2PHandleTMVar hdl1 wait1
let h2 = P2PHandleTMVar hdl2 wait2
let serverconn = P2PConnection Nothing
(const True) h1 h2
(ConnIdent (Just "http server"))
let clientconn = P2PConnection Nothing
(const True) h2 h1
(ConnIdent (Just "http client"))
clientrunst <- liftIO $ mkclientrunst connparams
serverrunst <- liftIO $ mkserverrunst connparams
let server = P2P.serveOneCommandAuthed
(connectionServerMode connparams)
(connectionServerUUID connparams)
let protorunner = void $
runFullProto serverrunst serverconn server
asyncworker <- liftIO . async
=<< forkState protorunner
let releaseconn = atomically $ putTMVar relv $
join (liftIO (wait asyncworker))
return $ Right $ P2PConnectionPair
{ clientRunState = clientrunst
, clientP2PConnection = clientconn
, serverP2PConnection = serverconn
, releaseP2PConnection = releaseconn
}
liftIO $ atomically $ putTMVar respvar resp
mkserverrunst connparams = do mkP2PConnectionPair
:: ConnectionParams
-> TMVar (IO ())
-> (RunState -> P2PConnection -> IO (Either SomeException ()))
-> IO (Either ConnectionProblem P2PConnectionPair)
mkP2PConnectionPair connparams relv startworker = do
hdl1 <- newEmptyTMVarIO
hdl2 <- newEmptyTMVarIO
wait1 <- newEmptyTMVarIO
wait2 <- newEmptyTMVarIO
let h1 = P2PHandleTMVar hdl1 wait1
let h2 = P2PHandleTMVar hdl2 wait2
let serverconn = P2PConnection Nothing
(const True) h1 h2
(ConnIdent (Just "http server"))
let clientconn = P2PConnection Nothing
(const True) h2 h1
(ConnIdent (Just "http client"))
clientrunst <- mkclientrunst
serverrunst <- mkserverrunst
asyncworker <- async $
startworker serverrunst serverconn
let releaseconn = atomically $ putTMVar relv $
liftIO $ wait asyncworker
>>= either throwM return
return $ Right $ P2PConnectionPair
{ clientRunState = clientrunst
, clientP2PConnection = clientconn
, serverP2PConnection = serverconn
, releaseP2PConnection = releaseconn
}
where
mkserverrunst = do
prototvar <- newTVarIO $ connectionProtocolVersion connparams prototvar <- newTVarIO $ connectionProtocolVersion connparams
mkRunState $ const $ Serving mkRunState $ const $ Serving
(connectionClientUUID connparams) (connectionClientUUID connparams)
Nothing Nothing
prototvar prototvar
mkclientrunst connparams = do mkclientrunst = do
prototvar <- newTVarIO $ connectionProtocolVersion connparams prototvar <- newTVarIO $ connectionProtocolVersion connparams
mkRunState $ const $ Client prototvar mkRunState $ const $ Client prototvar
@ -285,25 +288,27 @@ getAnnexWorkerPool a = startConcurrency transferStages $
Just wp -> a wp Just wp -> a wp
inAnnexWorker :: P2PHttpServerState -> Annex a -> IO (Either SomeException a) inAnnexWorker :: P2PHttpServerState -> Annex a -> IO (Either SomeException a)
inAnnexWorker st annexaction = do inAnnexWorker st = inAnnexWorker' (annexWorkerPool st)
(workerstrd, workerstage) <- atomically $
waitStartWorkerSlot (annexWorkerPool st) inAnnexWorker' :: AnnexWorkerPool -> Annex a -> IO (Either SomeException a)
inAnnexWorker' poolv annexaction = do
(workerstrd, workerstage) <- atomically $ waitStartWorkerSlot poolv
resv <- newEmptyTMVarIO resv <- newEmptyTMVarIO
aid <- async $ do aid <- async $ do
(res, strd) <- Annex.run workerstrd annexaction (res, strd) <- Annex.run workerstrd annexaction
atomically $ putTMVar resv res atomically $ putTMVar resv res
return strd return strd
atomically $ do atomically $ do
pool <- takeTMVar (annexWorkerPool st) pool <- takeTMVar poolv
let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool
putTMVar (annexWorkerPool st) pool' putTMVar poolv pool'
(res, workerstrd') <- waitCatch aid >>= \case (res, workerstrd') <- waitCatch aid >>= \case
Right strd -> do Right strd -> do
r <- atomically $ takeTMVar resv r <- atomically $ takeTMVar resv
return (Right r, strd) return (Right r, strd)
Left err -> return (Left err, workerstrd) Left err -> return (Left err, workerstrd)
atomically $ do atomically $ do
pool <- takeTMVar (annexWorkerPool st) pool <- takeTMVar poolv
let !pool' = deactivateWorker pool aid workerstrd' let !pool' = deactivateWorker pool aid workerstrd'
putTMVar (annexWorkerPool st) pool' putTMVar poolv pool'
return res return res

View file

@ -40,8 +40,6 @@ Planned schedule of work:
* Make Remote.Git use http client when annex.url is configured. * Make Remote.Git use http client when annex.url is configured.
* withLocalP2PConnections could use a worker pool.
* Make http server support proxies and clusters. * Make http server support proxies and clusters.
* Perhaps: Support cgi program that proxies over to a webserver * Perhaps: Support cgi program that proxies over to a webserver