annex.jobs=cpus etc

Added the ability to run one job per CPU (core), by setting annex.jobs=cpus,
or using option --jobs=cpus or -Jcpus.

Built with future expansion in mind, including not defaulting matching on
Concurrency so more constructors can later be added, and using "cpu"
instead of "0".
This commit is contained in:
Joey Hess 2019-05-10 13:24:31 -04:00
parent 459bbd9005
commit 82186ca58f
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
20 changed files with 105 additions and 32 deletions

View file

@ -22,6 +22,7 @@ import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (throwIO)
import GHC.Conc
import Data.Either
import qualified Data.Map.Strict as M
import qualified System.Console.Regions as Regions
@ -51,9 +52,14 @@ performCommandAction Command { cmdcheck = c, cmdname = name } seek cont = do
- This should only be run in the seek stage.
-}
commandAction :: CommandStart -> Annex ()
commandAction a = go =<< Annex.getState Annex.concurrency
commandAction a = Annex.getState Annex.concurrency >>= \case
NonConcurrent -> run
Concurrent n -> runconcurrent n
ConcurrentPerCpu -> runconcurrent =<< liftIO getNumProcessors
where
go (Concurrent n) = do
run = void $ includeCommandAction a
runconcurrent n = do
ws <- Annex.getState Annex.workers
(st, ws') <- if null ws
then do
@ -71,8 +77,6 @@ commandAction a = go =<< Annex.getState Annex.concurrency
w <- liftIO $ async
$ snd <$> Annex.run st (inOwnConsoleRegion (Annex.output st) run)
Annex.changeState $ \s -> s { Annex.workers = Right w:ws' }
go NonConcurrent = run
run = void $ includeCommandAction a
commandActions :: [CommandStart] -> Annex ()
commandActions = mapM_ commandAction
@ -170,18 +174,23 @@ allowConcurrentOutput :: Annex a -> Annex a
allowConcurrentOutput a = do
fromcmdline <- Annex.getState Annex.concurrency
fromgitcfg <- annexJobs <$> Annex.getGitConfig
let usegitcfg = Annex.changeState $
\c -> c { Annex.concurrency = fromgitcfg }
case (fromcmdline, fromgitcfg) of
(NonConcurrent, NonConcurrent) -> a
(Concurrent n, _) -> goconcurrent n
(Concurrent n, _) -> do
raisecapabilitiesto n
goconcurrent
(ConcurrentPerCpu, _) -> goconcurrent
(NonConcurrent, Concurrent n) -> do
Annex.changeState $
\c -> c { Annex.concurrency = fromgitcfg }
goconcurrent n
usegitcfg
raisecapabilitiesto n
goconcurrent
(NonConcurrent, ConcurrentPerCpu) -> do
usegitcfg
goconcurrent
where
goconcurrent n = do
c <- liftIO getNumCapabilities
when (n > c) $
liftIO $ setNumCapabilities n
goconcurrent = do
withMessageState $ \s -> case outputType s of
NormalOutput -> ifM (liftIO concurrentOutputSupported)
( Regions.displayConsoleRegions $
@ -190,13 +199,21 @@ allowConcurrentOutput a = do
)
_ -> goconcurrent' False
goconcurrent' b = bracket_ (setup b) cleanup a
setup = setconcurrentoutputenabled
cleanup = do
finishCommandActions
setconcurrentoutputenabled False
setconcurrentoutputenabled b = Annex.changeState $ \s ->
s { Annex.output = (Annex.output s) { concurrentOutputEnabled = b } }
raisecapabilitiesto n = do
c <- liftIO getNumCapabilities
when (n > c) $
liftIO $ setNumCapabilities n
{- Ensures that only one thread processes a key at a time.
- Other threads will block until it's done. -}
onlyActionOn :: Key -> CommandStart -> CommandStart
@ -212,7 +229,9 @@ onlyActionOn' :: Key -> Annex a -> Annex a
onlyActionOn' k a = go =<< Annex.getState Annex.concurrency
where
go NonConcurrent = a
go (Concurrent _) = do
go (Concurrent _) = goconcurrent
go ConcurrentPerCpu = goconcurrent
goconcurrent = do
tv <- Annex.getState Annex.activekeys
bracket (setup tv) id (const a)
setup tv = liftIO $ do

View file

@ -366,14 +366,15 @@ jsonProgressOption =
jobsOption :: [GlobalOption]
jobsOption =
[ globalSetter set $
option auto
( long "jobs" <> short 'J' <> metavar paramNumber
option (maybeReader parseConcurrency)
( long "jobs" <> short 'J'
<> metavar (paramNumber `paramOr` "cpu")
<> help "enable concurrent jobs"
<> hidden
)
]
where
set n = Annex.changeState $ \s -> s { Annex.concurrency = Concurrent n }
set v = Annex.changeState $ \s -> s { Annex.concurrency = v }
timeLimitOption :: [GlobalOption]
timeLimitOption =