add Annex worker pool to P2PHttp

This will be needed for get and store, since those need to run Annex
actions.

withLocalP2PConnections will also probably use it.
This commit is contained in:
Joey Hess 2024-07-10 12:19:47 -04:00
parent d4b9aea87b
commit f9b7ce7224
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
7 changed files with 97 additions and 23 deletions

View file

@ -97,13 +97,11 @@ changeStageTo mytid tv getnewstage = liftIO $
-- | Waits until there's an idle StartStage worker in the worker pool, -- | Waits until there's an idle StartStage worker in the worker pool,
-- removes it from the pool, and returns its state. -- removes it from the pool, and returns its state.
-- waitStartWorkerSlot :: TMVar (WorkerPool t) -> STM (t, WorkerStage)
-- If the worker pool is not already allocated, returns Nothing.
waitStartWorkerSlot :: TMVar (WorkerPool t) -> STM (Maybe (t, WorkerStage))
waitStartWorkerSlot tv = do waitStartWorkerSlot tv = do
pool <- takeTMVar tv pool <- takeTMVar tv
v <- go pool v <- go pool
return $ Just (v, StartStage) return (v, StartStage)
where where
go pool = case spareVals pool of go pool = case spareVals pool of
[] -> retry [] -> retry

View file

@ -87,9 +87,8 @@ commandAction start = do
runconcurrent sizelimit Nothing = runnonconcurrent sizelimit runconcurrent sizelimit Nothing = runnonconcurrent sizelimit
runconcurrent sizelimit (Just tv) = runconcurrent sizelimit (Just tv) =
liftIO (atomically (waitStartWorkerSlot tv)) >>= maybe liftIO (atomically (waitStartWorkerSlot tv))
(runnonconcurrent sizelimit) >>= runconcurrent' sizelimit tv
(runconcurrent' sizelimit tv)
runconcurrent' sizelimit tv (workerstrd, workerstage) = do runconcurrent' sizelimit tv (workerstrd, workerstage) = do
aid <- liftIO $ async $ snd aid <- liftIO $ async $ snd
<$> Annex.run workerstrd <$> Annex.run workerstrd

View file

@ -28,7 +28,7 @@ import Network.Socket (PortNumber)
import qualified Data.Map as M import qualified Data.Map as M
cmd :: Command cmd :: Command
cmd = command "p2phttp" SectionPlumbing cmd = withAnnexOptions [jobsOption] $ command "p2phttp" SectionPlumbing
"communicate in P2P protocol over http" "communicate in P2P protocol over http"
paramNothing (seek <$$> optParser) paramNothing (seek <$$> optParser)
@ -69,7 +69,7 @@ optParser _ = Options
) )
seek :: Options -> CommandSeek seek :: Options -> CommandSeek
seek o = startConcurrency commandStages $ do seek o = getAnnexWorkerPool $ \workerpool -> do
-- XXX remove this -- XXX remove this
when (isNothing (portOption o)) $ do when (isNothing (portOption o)) $ do
liftIO $ putStrLn "test begins" liftIO $ putStrLn "test begins"
@ -77,7 +77,7 @@ seek o = startConcurrency commandStages $ do
giveup "TEST DONE" giveup "TEST DONE"
withLocalP2PConnections $ \acquireconn -> liftIO $ do withLocalP2PConnections $ \acquireconn -> liftIO $ do
authenv <- getAuthEnv authenv <- getAuthEnv
st <- mkP2PHttpServerState acquireconn $ st <- mkP2PHttpServerState acquireconn workerpool $
mkGetServerMode authenv o mkGetServerMode authenv o
Warp.run (fromIntegral port) (p2pHttpApp st) Warp.run (fromIntegral port) (p2pHttpApp st)
where where

View file

@ -23,6 +23,10 @@ import P2P.Http.Types
import P2P.Http.State import P2P.Http.State
import P2P.Protocol hiding (Offset, Bypass, auth) import P2P.Protocol hiding (Offset, Bypass, auth)
import P2P.IO import P2P.IO
import Annex.WorkerPool
import Types.WorkerPool
import Types.Direction
import Utility.Metered
import Servant import Servant
import Servant.Client.Streaming import Servant.Client.Streaming
@ -114,11 +118,12 @@ serveGetGeneric :: P2PHttpServerState -> B64Key -> Handler (S.SourceT IO B.ByteS
serveGetGeneric = undefined -- TODO serveGetGeneric = undefined -- TODO
type GetAPI type GetAPI
= ClientUUID Optional = ClientUUID Required
:> ServerUUID Optional :> ServerUUID Required
:> BypassUUIDs :> BypassUUIDs
:> AssociatedFileParam :> AssociatedFileParam
:> OffsetParam :> OffsetParam
:> IsSecure
:> AuthHeader :> AuthHeader
:> StreamGet NoFraming OctetStream :> StreamGet NoFraming OctetStream
(Headers '[DataLengthHeader] (SourceIO B.ByteString)) (Headers '[DataLengthHeader] (SourceIO B.ByteString))
@ -128,20 +133,46 @@ serveGet
=> P2PHttpServerState => P2PHttpServerState
-> v -> v
-> B64Key -> B64Key
-> Maybe (B64UUID ClientSide) -> B64UUID ClientSide
-> Maybe (B64UUID ServerSide) -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Maybe B64FilePath -> Maybe B64FilePath
-> Maybe Offset -> Maybe Offset
-> IsSecure
-> Maybe Auth -> Maybe Auth
-> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGet = undefined -- TODO serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth ReadAction
$ \runst conn -> do
liftIO $ inAnnexWorker st $
enteringStage (TransferStage Upload) $ do
liftIO $ print "IN ANNEX WORKER!"
{-
let storer offset len getdata checkvalidity = do
undefined -- FIXME
-- XXX needs to run in annex monad to runFullProto
liftIO $ runNetProto runst conn $
receiveContent Nothing nullMeterUpdate
sizer storer getreq
-}
undefined
undefined -- XXX fixme streaming out
where
sizer = Len $ case startat of
Just (Offset o) -> fromIntegral o
Nothing -> 0
getreq offset = P2P.Protocol.GET offset (ProtoAssociatedFile af) k
af = AssociatedFile $ case baf of
Just (B64FilePath f) -> Just f
Nothing -> Nothing
clientGet clientGet
:: ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> Maybe (B64UUID ClientSide) -> B64UUID ClientSide
-> Maybe (B64UUID ServerSide) -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Maybe B64FilePath -> Maybe B64FilePath
-> Maybe Offset -> Maybe Offset

View file

@ -13,12 +13,16 @@
module P2P.Http.State where module P2P.Http.State where
import Annex.Common import Annex.Common
import qualified Annex
import P2P.Http.Types import P2P.Http.Types
import qualified P2P.Protocol as P2P import qualified P2P.Protocol as P2P
import P2P.IO import P2P.IO
import P2P.Annex import P2P.Annex
import Annex.UUID import Annex.UUID
import Annex.Concurrent import Annex.Concurrent
import Types.WorkerPool
import Annex.WorkerPool
import CmdLine.Action (startConcurrency)
import Servant import Servant
import qualified Data.Map as M import qualified Data.Map as M
@ -27,13 +31,20 @@ import Control.Concurrent.STM
data P2PHttpServerState = P2PHttpServerState data P2PHttpServerState = P2PHttpServerState
{ acquireP2PConnection :: AcquireP2PConnection { acquireP2PConnection :: AcquireP2PConnection
, annexWorkerPool :: AnnexWorkerPool
, getServerMode :: GetServerMode , getServerMode :: GetServerMode
, openLocks :: TMVar (M.Map LockID Locker) , openLocks :: TMVar (M.Map LockID Locker)
} }
mkP2PHttpServerState :: AcquireP2PConnection -> GetServerMode -> IO P2PHttpServerState type AnnexWorkerPool = TMVar (WorkerPool (Annex.AnnexState, Annex.AnnexRead))
mkP2PHttpServerState acquireconn getservermode = P2PHttpServerState
-- Nothing when the server is not allowed to serve any requests.
type GetServerMode = IsSecure -> Maybe Auth -> Maybe P2P.ServerMode
mkP2PHttpServerState :: AcquireP2PConnection -> AnnexWorkerPool -> GetServerMode -> IO P2PHttpServerState
mkP2PHttpServerState acquireconn annexworkerpool getservermode = P2PHttpServerState
<$> pure acquireconn <$> pure acquireconn
<*> pure annexworkerpool
<*> pure getservermode <*> pure getservermode
<*> newTMVarIO mempty <*> newTMVarIO mempty
@ -89,9 +100,6 @@ basicAuthRequired = err401 { errHeaders = [(h, v)] }
h = "WWW-Authenticate" h = "WWW-Authenticate"
v = "Basic realm=\"git-annex\", charset=\"UTF-8\"" v = "Basic realm=\"git-annex\", charset=\"UTF-8\""
-- Nothing when the server is not allowed to serve any requests.
type GetServerMode = IsSecure -> Maybe Auth -> Maybe P2P.ServerMode
data ConnectionParams = ConnectionParams data ConnectionParams = ConnectionParams
{ connectionProtocolVersion :: P2P.ProtocolVersion { connectionProtocolVersion :: P2P.ProtocolVersion
, connectionServerUUID :: UUID , connectionServerUUID :: UUID
@ -237,3 +245,33 @@ dropLock lckid st = do
case v of case v of
Nothing -> return () Nothing -> return ()
Just locker -> wait (lockerThread locker) Just locker -> wait (lockerThread locker)
getAnnexWorkerPool :: (AnnexWorkerPool -> Annex a) -> Annex a
getAnnexWorkerPool a = startConcurrency transferStages $
Annex.getState Annex.workers >>= \case
Nothing -> giveup "Use -Jn or set annex.jobs to configure the number of worker threads."
Just wp -> a wp
inAnnexWorker :: P2PHttpServerState -> Annex a -> IO (Either SomeException a)
inAnnexWorker st annexaction = do
(workerstrd, workerstage) <- atomically $
waitStartWorkerSlot (annexWorkerPool st)
resv <- newEmptyTMVarIO
aid <- async $ do
(res, strd) <- Annex.run workerstrd annexaction
atomically $ putTMVar resv res
return strd
atomically $ do
pool <- takeTMVar (annexWorkerPool st)
let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool
putTMVar (annexWorkerPool st) pool'
(res, workerstrd') <- waitCatch aid >>= \case
Right strd -> do
r <- atomically $ takeTMVar resv
return (Right r, strd)
Left err -> return (Left err, workerstrd)
atomically $ do
pool <- takeTMVar (annexWorkerPool st)
let !pool' = deactivateWorker pool aid workerstrd'
putTMVar (annexWorkerPool st) pool'
return res

View file

@ -49,7 +49,7 @@ import Control.DeepSeq
import Prelude import Prelude
newtype Offset = Offset Integer newtype Offset = Offset Integer
deriving (Show, NFData) deriving (Show, Eq, NFData, Num, Real, Ord, Enum, Integral)
newtype Len = Len Integer newtype Len = Len Integer
deriving (Show) deriving (Show)

View file

@ -14,6 +14,14 @@ a repository over HTTP with write access for authenticated users.
# OPTIONS # OPTIONS
* `--jobs=N`
Use this or annex.jobs must be set to configure the number of worker
threads.
Since the webserver itself uses one thread, this needs to be set to
2 or more.
* `--port=N` * `--port=N`
Port to listen on. Default is port 80. Port to listen on. Default is port 80.