diff --git a/Annex/WorkerPool.hs b/Annex/WorkerPool.hs index 9f2b7f872f..ddad985b42 100644 --- a/Annex/WorkerPool.hs +++ b/Annex/WorkerPool.hs @@ -97,13 +97,11 @@ changeStageTo mytid tv getnewstage = liftIO $ -- | Waits until there's an idle StartStage worker in the worker pool, -- removes it from the pool, and returns its state. --- --- If the worker pool is not already allocated, returns Nothing. -waitStartWorkerSlot :: TMVar (WorkerPool t) -> STM (Maybe (t, WorkerStage)) +waitStartWorkerSlot :: TMVar (WorkerPool t) -> STM (t, WorkerStage) waitStartWorkerSlot tv = do pool <- takeTMVar tv v <- go pool - return $ Just (v, StartStage) + return (v, StartStage) where go pool = case spareVals pool of [] -> retry diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index fbfc92272c..d0e5d3e733 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -87,9 +87,8 @@ commandAction start = do runconcurrent sizelimit Nothing = runnonconcurrent sizelimit runconcurrent sizelimit (Just tv) = - liftIO (atomically (waitStartWorkerSlot tv)) >>= maybe - (runnonconcurrent sizelimit) - (runconcurrent' sizelimit tv) + liftIO (atomically (waitStartWorkerSlot tv)) + >>= runconcurrent' sizelimit tv runconcurrent' sizelimit tv (workerstrd, workerstage) = do aid <- liftIO $ async $ snd <$> Annex.run workerstrd diff --git a/Command/P2PHttp.hs b/Command/P2PHttp.hs index 077617ca1b..4b141adbe1 100644 --- a/Command/P2PHttp.hs +++ b/Command/P2PHttp.hs @@ -28,7 +28,7 @@ import Network.Socket (PortNumber) import qualified Data.Map as M cmd :: Command -cmd = command "p2phttp" SectionPlumbing +cmd = withAnnexOptions [jobsOption] $ command "p2phttp" SectionPlumbing "communicate in P2P protocol over http" paramNothing (seek <$$> optParser) @@ -69,7 +69,7 @@ optParser _ = Options ) seek :: Options -> CommandSeek -seek o = startConcurrency commandStages $ do +seek o = getAnnexWorkerPool $ \workerpool -> do -- XXX remove this when (isNothing (portOption o)) $ do liftIO $ putStrLn "test begins" @@ -77,7 +77,7 @@ seek o = startConcurrency commandStages $ do giveup "TEST DONE" withLocalP2PConnections $ \acquireconn -> liftIO $ do authenv <- getAuthEnv - st <- mkP2PHttpServerState acquireconn $ + st <- mkP2PHttpServerState acquireconn workerpool $ mkGetServerMode authenv o Warp.run (fromIntegral port) (p2pHttpApp st) where diff --git a/P2P/Http.hs b/P2P/Http.hs index a361105b71..4fa0dab043 100644 --- a/P2P/Http.hs +++ b/P2P/Http.hs @@ -23,6 +23,10 @@ import P2P.Http.Types import P2P.Http.State import P2P.Protocol hiding (Offset, Bypass, auth) import P2P.IO +import Annex.WorkerPool +import Types.WorkerPool +import Types.Direction +import Utility.Metered import Servant import Servant.Client.Streaming @@ -114,11 +118,12 @@ serveGetGeneric :: P2PHttpServerState -> B64Key -> Handler (S.SourceT IO B.ByteS serveGetGeneric = undefined -- TODO type GetAPI - = ClientUUID Optional - :> ServerUUID Optional + = ClientUUID Required + :> ServerUUID Required :> BypassUUIDs :> AssociatedFileParam :> OffsetParam + :> IsSecure :> AuthHeader :> StreamGet NoFraming OctetStream (Headers '[DataLengthHeader] (SourceIO B.ByteString)) @@ -128,20 +133,46 @@ serveGet => P2PHttpServerState -> v -> B64Key - -> Maybe (B64UUID ClientSide) - -> Maybe (B64UUID ServerSide) + -> B64UUID ClientSide + -> B64UUID ServerSide -> [B64UUID Bypass] -> Maybe B64FilePath -> Maybe Offset + -> IsSecure -> Maybe Auth -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -serveGet = undefined -- TODO +serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do + res <- withP2PConnection apiver st cu su bypass sec auth ReadAction + $ \runst conn -> do + liftIO $ inAnnexWorker st $ + enteringStage (TransferStage Upload) $ do + liftIO $ print "IN ANNEX WORKER!" + {- + let storer offset len getdata checkvalidity = do + undefined -- FIXME + -- XXX needs to run in annex monad to runFullProto + liftIO $ runNetProto runst conn $ + receiveContent Nothing nullMeterUpdate + sizer storer getreq + -} + undefined + undefined -- XXX fixme streaming out + where + sizer = Len $ case startat of + Just (Offset o) -> fromIntegral o + Nothing -> 0 + + getreq offset = P2P.Protocol.GET offset (ProtoAssociatedFile af) k + + af = AssociatedFile $ case baf of + Just (B64FilePath f) -> Just f + Nothing -> Nothing clientGet :: ProtocolVersion -> B64Key - -> Maybe (B64UUID ClientSide) - -> Maybe (B64UUID ServerSide) + -> B64UUID ClientSide + -> B64UUID ServerSide -> [B64UUID Bypass] -> Maybe B64FilePath -> Maybe Offset diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index 29d2351e25..7c42b2ea93 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -13,12 +13,16 @@ module P2P.Http.State where import Annex.Common +import qualified Annex import P2P.Http.Types import qualified P2P.Protocol as P2P import P2P.IO import P2P.Annex import Annex.UUID import Annex.Concurrent +import Types.WorkerPool +import Annex.WorkerPool +import CmdLine.Action (startConcurrency) import Servant import qualified Data.Map as M @@ -27,13 +31,20 @@ import Control.Concurrent.STM data P2PHttpServerState = P2PHttpServerState { acquireP2PConnection :: AcquireP2PConnection + , annexWorkerPool :: AnnexWorkerPool , getServerMode :: GetServerMode , openLocks :: TMVar (M.Map LockID Locker) } -mkP2PHttpServerState :: AcquireP2PConnection -> GetServerMode -> IO P2PHttpServerState -mkP2PHttpServerState acquireconn getservermode = P2PHttpServerState +type AnnexWorkerPool = TMVar (WorkerPool (Annex.AnnexState, Annex.AnnexRead)) + +-- Nothing when the server is not allowed to serve any requests. +type GetServerMode = IsSecure -> Maybe Auth -> Maybe P2P.ServerMode + +mkP2PHttpServerState :: AcquireP2PConnection -> AnnexWorkerPool -> GetServerMode -> IO P2PHttpServerState +mkP2PHttpServerState acquireconn annexworkerpool getservermode = P2PHttpServerState <$> pure acquireconn + <*> pure annexworkerpool <*> pure getservermode <*> newTMVarIO mempty @@ -89,9 +100,6 @@ basicAuthRequired = err401 { errHeaders = [(h, v)] } h = "WWW-Authenticate" v = "Basic realm=\"git-annex\", charset=\"UTF-8\"" --- Nothing when the server is not allowed to serve any requests. -type GetServerMode = IsSecure -> Maybe Auth -> Maybe P2P.ServerMode - data ConnectionParams = ConnectionParams { connectionProtocolVersion :: P2P.ProtocolVersion , connectionServerUUID :: UUID @@ -237,3 +245,33 @@ dropLock lckid st = do case v of Nothing -> return () Just locker -> wait (lockerThread locker) + +getAnnexWorkerPool :: (AnnexWorkerPool -> Annex a) -> Annex a +getAnnexWorkerPool a = startConcurrency transferStages $ + Annex.getState Annex.workers >>= \case + Nothing -> giveup "Use -Jn or set annex.jobs to configure the number of worker threads." + Just wp -> a wp + +inAnnexWorker :: P2PHttpServerState -> Annex a -> IO (Either SomeException a) +inAnnexWorker st annexaction = do + (workerstrd, workerstage) <- atomically $ + waitStartWorkerSlot (annexWorkerPool st) + resv <- newEmptyTMVarIO + aid <- async $ do + (res, strd) <- Annex.run workerstrd annexaction + atomically $ putTMVar resv res + return strd + atomically $ do + pool <- takeTMVar (annexWorkerPool st) + let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool + putTMVar (annexWorkerPool st) pool' + (res, workerstrd') <- waitCatch aid >>= \case + Right strd -> do + r <- atomically $ takeTMVar resv + return (Right r, strd) + Left err -> return (Left err, workerstrd) + atomically $ do + pool <- takeTMVar (annexWorkerPool st) + let !pool' = deactivateWorker pool aid workerstrd' + putTMVar (annexWorkerPool st) pool' + return res diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index 18fc729f33..9f8131d3b8 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -49,7 +49,7 @@ import Control.DeepSeq import Prelude newtype Offset = Offset Integer - deriving (Show, NFData) + deriving (Show, Eq, NFData, Num, Real, Ord, Enum, Integral) newtype Len = Len Integer deriving (Show) diff --git a/doc/git-annex-p2phttp.mdwn b/doc/git-annex-p2phttp.mdwn index 0e58e18665..655084e37b 100644 --- a/doc/git-annex-p2phttp.mdwn +++ b/doc/git-annex-p2phttp.mdwn @@ -14,6 +14,14 @@ a repository over HTTP with write access for authenticated users. # OPTIONS +* `--jobs=N` + + Use this or annex.jobs must be set to configure the number of worker + threads. + + Since the webserver itself uses one thread, this needs to be set to + 2 or more. + * `--port=N` Port to listen on. Default is port 80.