From 3c18398d5ad06aaff480d20aa7765fce81efbf24 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 21 Nov 2024 14:15:14 -0400 Subject: [PATCH] p2phttp support --jobs with --directory --jobs is usually an Annex option setter, but --directory runs in IO, so would not have that available. So instead moved the option parser into the command's Options. --- CmdLine/GitAnnex/Options.hs | 18 +++++++++++------- Command/P2PHttp.hs | 20 ++++++++++---------- P2P/Http/State.hs | 14 +++++++++----- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/CmdLine/GitAnnex/Options.hs b/CmdLine/GitAnnex/Options.hs index 00a23484f8..890f9654de 100644 --- a/CmdLine/GitAnnex/Options.hs +++ b/CmdLine/GitAnnex/Options.hs @@ -511,15 +511,19 @@ jsonProgressOption = -- action in `allowConcurrentOutput`. jobsOption :: [AnnexOption] jobsOption = - [ annexOption (setAnnexState . setConcurrency . ConcurrencyCmdLine) $ - option (maybeReader parseConcurrency) - ( long "jobs" <> short 'J' - <> metavar (paramNumber `paramOr` "cpus") - <> help "enable concurrent jobs" - <> hidden - ) + [ annexOption (setAnnexState . setConcurrency . ConcurrencyCmdLine) + jobsOptionParser ] +jobsOptionParser :: Parser Concurrency +jobsOptionParser = + option (maybeReader parseConcurrency) + ( long "jobs" <> short 'J' + <> metavar (paramNumber `paramOr` "cpus") + <> help "enable concurrent jobs" + <> hidden + ) + timeLimitOption :: [AnnexOption] timeLimitOption = [ annexOption settimelimit $ option (eitherReader parseDuration) diff --git a/Command/P2PHttp.hs b/Command/P2PHttp.hs index 5246b09302..cb2c752055 100644 --- a/Command/P2PHttp.hs +++ b/Command/P2PHttp.hs @@ -11,7 +11,7 @@ module Command.P2PHttp where -import Command +import Command hiding (jobsOption) import P2P.Http.Server import P2P.Http.Url import qualified P2P.Protocol as P2P @@ -20,6 +20,7 @@ import Annex.UUID import qualified Git import qualified Git.Construct import qualified Annex +import Types.Concurrency import Servant import qualified Network.Wai.Handler.Warp as Warp @@ -29,12 +30,11 @@ import qualified Data.Map as M import Data.String cmd :: Command -cmd = withAnnexOptions [jobsOption] $ - noMessages $ dontCheck repoExists $ - noRepo (startIO <$$> optParser) $ - command "p2phttp" SectionPlumbing - "communicate in P2P protocol over http" - paramNothing (startAnnex <$$> optParser) +cmd = noMessages $ dontCheck repoExists $ + noRepo (startIO <$$> optParser) $ + command "p2phttp" SectionPlumbing + "communicate in P2P protocol over http" + paramNothing (startAnnex <$$> optParser) data Options = Options { portOption :: Maybe PortNumber @@ -49,6 +49,7 @@ data Options = Options , unauthNoLockingOption :: Bool , wideOpenOption :: Bool , proxyConnectionsOption :: Maybe Integer + , jobsOption :: Maybe Concurrency , clusterJobsOption :: Maybe Int , directoryOption :: [FilePath] } @@ -103,6 +104,7 @@ optParser _ = Options ( long "proxyconnections" <> metavar paramNumber <> help "maximum number of idle connections when proxying" )) + <*> optional jobsOptionParser <*> optional (option auto ( long "clusterjobs" <> metavar paramNumber <> help "number of concurrent node accesses per connection" @@ -124,8 +126,6 @@ startAnnex o ) | otherwise = liftIO $ startIO o --- TODO --jobs option only available to startAnnex, not here, need --- to parse it into Options for this command. startIO :: Options -> IO () startIO o | null (directoryOption o) = @@ -162,7 +162,7 @@ runServer o mst = go `finally` serverShutdownCleanup mst mkServerState :: Options -> M.Map Auth P2P.ServerMode -> Annex P2PHttpServerState mkServerState o authenv = - getAnnexWorkerPool $ + withAnnexWorkerPool (jobsOption o) $ mkP2PHttpServerState (mkGetServerMode authenv o) (fromMaybe 1 $ proxyConnectionsOption o) diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index 3a15b3e902..556553cfa1 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -26,6 +26,8 @@ import Types.NumCopies import Types.WorkerPool import Annex.WorkerPool import Annex.BranchState +import Annex.Concurrent +import Types.Concurrency import Types.Cluster import CmdLine.Action (startConcurrency) import Utility.ThreadScheduler @@ -551,11 +553,13 @@ dropLock lckid st = do Nothing -> return () Just locker -> wait (lockerThread locker) -getAnnexWorkerPool :: (AnnexWorkerPool -> Annex a) -> Annex a -getAnnexWorkerPool a = startConcurrency transferStages $ - Annex.getState Annex.workers >>= \case - Nothing -> giveup "Use -Jn or set annex.jobs to configure the number of worker threads." - Just wp -> a wp +withAnnexWorkerPool :: (Maybe Concurrency) -> (AnnexWorkerPool -> Annex a) -> Annex a +withAnnexWorkerPool mc a = do + maybe noop (setConcurrency . ConcurrencyCmdLine) mc + startConcurrency transferStages $ + Annex.getState Annex.workers >>= \case + Nothing -> giveup "Use -Jn or set annex.jobs to configure the number of worker threads." + Just wp -> a wp inAnnexWorker :: PerRepoServerState -> Annex a -> IO (Either SomeException a) inAnnexWorker st = inAnnexWorker' (annexWorkerPool st)