differentiate between concurrency enabled at command line and by git config
The latter should not affect --batch mode.
This commit is contained in:
parent
8471df3b6d
commit
77c42782d0
9 changed files with 46 additions and 21 deletions
4
Annex.hs
4
Annex.hs
|
@ -112,7 +112,7 @@ data AnnexState = AnnexState
|
||||||
, backend :: Maybe (BackendA Annex)
|
, backend :: Maybe (BackendA Annex)
|
||||||
, remotes :: [Types.Remote.RemoteA Annex]
|
, remotes :: [Types.Remote.RemoteA Annex]
|
||||||
, output :: MessageState
|
, output :: MessageState
|
||||||
, concurrency :: Concurrency
|
, concurrency :: ConcurrencySetting
|
||||||
, force :: Bool
|
, force :: Bool
|
||||||
, fast :: Bool
|
, fast :: Bool
|
||||||
, daemon :: Bool
|
, daemon :: Bool
|
||||||
|
@ -171,7 +171,7 @@ newState c r = do
|
||||||
, backend = Nothing
|
, backend = Nothing
|
||||||
, remotes = []
|
, remotes = []
|
||||||
, output = o
|
, output = o
|
||||||
, concurrency = NonConcurrent
|
, concurrency = ConcurrencyCmdLine NonConcurrent
|
||||||
, force = False
|
, force = False
|
||||||
, fast = False
|
, fast = False
|
||||||
, daemon = False
|
, daemon = False
|
||||||
|
|
|
@ -5,10 +5,14 @@
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
module Annex.Concurrent where
|
module Annex.Concurrent (
|
||||||
|
module Annex.Concurrent,
|
||||||
|
module Annex.Concurrent.Utility
|
||||||
|
) where
|
||||||
|
|
||||||
import Annex
|
import Annex
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
|
import Annex.Concurrent.Utility
|
||||||
import qualified Annex.Queue
|
import qualified Annex.Queue
|
||||||
import Annex.Action
|
import Annex.Action
|
||||||
import Types.Concurrency
|
import Types.Concurrency
|
||||||
|
@ -22,19 +26,24 @@ import Control.Concurrent
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
|
||||||
setConcurrency :: Concurrency -> Annex ()
|
setConcurrency :: ConcurrencySetting -> Annex ()
|
||||||
setConcurrency NonConcurrent = Annex.changeState $ \s -> s
|
setConcurrency (ConcurrencyCmdLine s) = setConcurrency' s ConcurrencyCmdLine
|
||||||
{ Annex.concurrency = NonConcurrent
|
setConcurrency (ConcurrencyGitConfig s) = setConcurrency' s ConcurrencyGitConfig
|
||||||
}
|
|
||||||
setConcurrency c = do
|
setConcurrency' :: Concurrency -> (Concurrency -> ConcurrencySetting) -> Annex ()
|
||||||
cfh <- Annex.getState Annex.catfilehandles
|
setConcurrency' NonConcurrent f =
|
||||||
|
Annex.changeState $ \s -> s
|
||||||
|
{ Annex.concurrency = f NonConcurrent
|
||||||
|
}
|
||||||
|
setConcurrency' c f = do
|
||||||
|
cfh <- getState Annex.catfilehandles
|
||||||
cfh' <- case cfh of
|
cfh' <- case cfh of
|
||||||
CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool
|
CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool
|
||||||
CatFileHandlesPool _ -> pure cfh
|
CatFileHandlesPool _ -> pure cfh
|
||||||
cah <- mkConcurrentCheckAttrHandle c
|
cah <- mkConcurrentCheckAttrHandle c
|
||||||
cih <- mkConcurrentCheckIgnoreHandle c
|
cih <- mkConcurrentCheckIgnoreHandle c
|
||||||
Annex.changeState $ \s -> s
|
Annex.changeState $ \s -> s
|
||||||
{ Annex.concurrency = c
|
{ Annex.concurrency = f c
|
||||||
, Annex.catfilehandles = cfh'
|
, Annex.catfilehandles = cfh'
|
||||||
, Annex.checkattrhandle = Just cah
|
, Annex.checkattrhandle = Just cah
|
||||||
, Annex.checkignorehandle = Just cih
|
, Annex.checkignorehandle = Just cih
|
||||||
|
@ -74,9 +83,9 @@ dupState = do
|
||||||
st <- Annex.getState id
|
st <- Annex.getState id
|
||||||
-- Make sure that concurrency is enabled, if it was not already,
|
-- Make sure that concurrency is enabled, if it was not already,
|
||||||
-- so the concurrency-safe resource pools are set up.
|
-- so the concurrency-safe resource pools are set up.
|
||||||
st' <- case Annex.concurrency st of
|
st' <- case getConcurrency' (Annex.concurrency st) of
|
||||||
NonConcurrent -> do
|
NonConcurrent -> do
|
||||||
setConcurrency (Concurrent 1)
|
setConcurrency (ConcurrencyCmdLine (Concurrent 1))
|
||||||
Annex.getState id
|
Annex.getState id
|
||||||
_ -> return st
|
_ -> return st
|
||||||
return $ st'
|
return $ st'
|
||||||
|
|
|
@ -7,10 +7,18 @@
|
||||||
|
|
||||||
module Annex.Concurrent.Utility where
|
module Annex.Concurrent.Utility where
|
||||||
|
|
||||||
|
import Annex
|
||||||
import Types.Concurrency
|
import Types.Concurrency
|
||||||
|
|
||||||
import GHC.Conc
|
import GHC.Conc
|
||||||
|
|
||||||
|
getConcurrency :: Annex Concurrency
|
||||||
|
getConcurrency = getConcurrency' <$> getState concurrency
|
||||||
|
|
||||||
|
getConcurrency' :: ConcurrencySetting -> Concurrency
|
||||||
|
getConcurrency' (ConcurrencyCmdLine c) = c
|
||||||
|
getConcurrency' (ConcurrencyGitConfig c) = c
|
||||||
|
|
||||||
{- Honor the requested level of concurrency, but only up to the number of
|
{- Honor the requested level of concurrency, but only up to the number of
|
||||||
- CPU cores. Useful for things that are known to be CPU bound. -}
|
- CPU cores. Useful for things that are known to be CPU bound. -}
|
||||||
concurrencyUpToCpus :: Concurrency -> IO Int
|
concurrencyUpToCpus :: Concurrency -> IO Int
|
||||||
|
|
|
@ -34,6 +34,7 @@ import Annex.Path
|
||||||
import Utility.Env
|
import Utility.Env
|
||||||
import Utility.Hash
|
import Utility.Hash
|
||||||
import Types.CleanupActions
|
import Types.CleanupActions
|
||||||
|
import Annex.Concurrent.Utility
|
||||||
import Types.Concurrency
|
import Types.Concurrency
|
||||||
import Git.Env
|
import Git.Env
|
||||||
import Git.Ssh
|
import Git.Ssh
|
||||||
|
@ -107,7 +108,7 @@ sshCachingInfo (host, port) = go =<< sshCacheDir'
|
||||||
-- No connection caching with concurrency is not a good
|
-- No connection caching with concurrency is not a good
|
||||||
-- combination, so warn the user.
|
-- combination, so warn the user.
|
||||||
go (Left whynocaching) = do
|
go (Left whynocaching) = do
|
||||||
Annex.getState Annex.concurrency >>= \case
|
getConcurrency >>= \case
|
||||||
NonConcurrent -> return ()
|
NonConcurrent -> return ()
|
||||||
Concurrent {} -> warnnocaching whynocaching
|
Concurrent {} -> warnnocaching whynocaching
|
||||||
ConcurrentPerCpu -> warnnocaching whynocaching
|
ConcurrentPerCpu -> warnnocaching whynocaching
|
||||||
|
@ -229,7 +230,7 @@ prepSocket socketfile sshhost sshparams = do
|
||||||
|
|
||||||
let socketlock = socket2lock socketfile
|
let socketlock = socket2lock socketfile
|
||||||
|
|
||||||
Annex.getState Annex.concurrency >>= \case
|
getConcurrency >>= \case
|
||||||
NonConcurrent -> return ()
|
NonConcurrent -> return ()
|
||||||
Concurrent {} -> makeconnection socketlock
|
Concurrent {} -> makeconnection socketlock
|
||||||
ConcurrentPerCpu -> makeconnection socketlock
|
ConcurrentPerCpu -> makeconnection socketlock
|
||||||
|
|
|
@ -31,6 +31,7 @@ import Annex.LockPool
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
import Types.Concurrency
|
import Types.Concurrency
|
||||||
|
import Annex.Concurrent.Utility
|
||||||
import Types.WorkerPool
|
import Types.WorkerPool
|
||||||
import Annex.WorkerPool
|
import Annex.WorkerPool
|
||||||
import Backend (isCryptographicallySecure)
|
import Backend (isCryptographicallySecure)
|
||||||
|
@ -262,7 +263,7 @@ configuredRetry numretries _old new = do
|
||||||
- increase total transfer speed.
|
- increase total transfer speed.
|
||||||
-}
|
-}
|
||||||
pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v
|
pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v
|
||||||
pickRemote l a = debugLocks $ go l =<< Annex.getState Annex.concurrency
|
pickRemote l a = debugLocks $ go l =<< getConcurrency
|
||||||
where
|
where
|
||||||
go [] _ = return observeFailure
|
go [] _ = return observeFailure
|
||||||
go (r:[]) _ = a r
|
go (r:[]) _ = a r
|
||||||
|
|
|
@ -53,7 +53,7 @@ commandActions = mapM_ commandAction
|
||||||
- This should only be run in the seek stage.
|
- This should only be run in the seek stage.
|
||||||
-}
|
-}
|
||||||
commandAction :: CommandStart -> Annex ()
|
commandAction :: CommandStart -> Annex ()
|
||||||
commandAction start = Annex.getState Annex.concurrency >>= \case
|
commandAction start = getConcurrency >>= \case
|
||||||
NonConcurrent -> runnonconcurrent
|
NonConcurrent -> runnonconcurrent
|
||||||
Concurrent _ -> runconcurrent
|
Concurrent _ -> runconcurrent
|
||||||
ConcurrentPerCpu -> runconcurrent
|
ConcurrentPerCpu -> runconcurrent
|
||||||
|
@ -200,9 +200,9 @@ performCommandAction' startmsg perform =
|
||||||
-}
|
-}
|
||||||
startConcurrency :: UsedStages -> Annex a -> Annex a
|
startConcurrency :: UsedStages -> Annex a -> Annex a
|
||||||
startConcurrency usedstages a = do
|
startConcurrency usedstages a = do
|
||||||
fromcmdline <- Annex.getState Annex.concurrency
|
fromcmdline <- getConcurrency
|
||||||
fromgitcfg <- annexJobs <$> Annex.getGitConfig
|
fromgitcfg <- annexJobs <$> Annex.getGitConfig
|
||||||
let usegitcfg = setConcurrency fromgitcfg
|
let usegitcfg = setConcurrency (ConcurrencyGitConfig fromgitcfg)
|
||||||
case (fromcmdline, fromgitcfg) of
|
case (fromcmdline, fromgitcfg) of
|
||||||
(NonConcurrent, NonConcurrent) -> a
|
(NonConcurrent, NonConcurrent) -> a
|
||||||
(Concurrent n, _) ->
|
(Concurrent n, _) ->
|
||||||
|
@ -258,7 +258,7 @@ startConcurrency usedstages a = do
|
||||||
- May be called repeatedly by the same thread without blocking. -}
|
- May be called repeatedly by the same thread without blocking. -}
|
||||||
ensureOnlyActionOn :: Key -> Annex a -> Annex a
|
ensureOnlyActionOn :: Key -> Annex a -> Annex a
|
||||||
ensureOnlyActionOn k a = debugLocks $
|
ensureOnlyActionOn k a = debugLocks $
|
||||||
go =<< Annex.getState Annex.concurrency
|
go =<< getConcurrency
|
||||||
where
|
where
|
||||||
go NonConcurrent = a
|
go NonConcurrent = a
|
||||||
go (Concurrent _) = goconcurrent
|
go (Concurrent _) = goconcurrent
|
||||||
|
|
|
@ -392,7 +392,7 @@ jsonProgressOption =
|
||||||
-- action in `allowConcurrentOutput`.
|
-- action in `allowConcurrentOutput`.
|
||||||
jobsOption :: [GlobalOption]
|
jobsOption :: [GlobalOption]
|
||||||
jobsOption =
|
jobsOption =
|
||||||
[ globalSetter setConcurrency $
|
[ globalSetter (setConcurrency . ConcurrencyCmdLine) $
|
||||||
option (maybeReader parseConcurrency)
|
option (maybeReader parseConcurrency)
|
||||||
( long "jobs" <> short 'J'
|
( long "jobs" <> short 'J'
|
||||||
<> metavar (paramNumber `paramOr` "cpus")
|
<> metavar (paramNumber `paramOr` "cpus")
|
||||||
|
|
|
@ -68,6 +68,7 @@ import Types.Command (StartMessage(..), SeekInput)
|
||||||
import Types.Transfer (transferKey)
|
import Types.Transfer (transferKey)
|
||||||
import Messages.Internal
|
import Messages.Internal
|
||||||
import Messages.Concurrent
|
import Messages.Concurrent
|
||||||
|
import Annex.Concurrent.Utility
|
||||||
import qualified Messages.JSON as JSON
|
import qualified Messages.JSON as JSON
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
|
|
||||||
|
@ -298,7 +299,7 @@ prompt a = do
|
||||||
|
|
||||||
{- Like prompt, but for a non-annex action that prompts. -}
|
{- Like prompt, but for a non-annex action that prompts. -}
|
||||||
mkPrompter :: (MonadMask m, MonadIO m) => Annex (m a -> m a)
|
mkPrompter :: (MonadMask m, MonadIO m) => Annex (m a -> m a)
|
||||||
mkPrompter = Annex.getState Annex.concurrency >>= \case
|
mkPrompter = getConcurrency >>= \case
|
||||||
NonConcurrent -> return id
|
NonConcurrent -> return id
|
||||||
(Concurrent _) -> goconcurrent
|
(Concurrent _) -> goconcurrent
|
||||||
ConcurrentPerCpu -> goconcurrent
|
ConcurrentPerCpu -> goconcurrent
|
||||||
|
|
|
@ -17,3 +17,8 @@ parseConcurrency :: String -> Maybe Concurrency
|
||||||
parseConcurrency "cpus" = Just ConcurrentPerCpu
|
parseConcurrency "cpus" = Just ConcurrentPerCpu
|
||||||
parseConcurrency "cpu" = Just ConcurrentPerCpu
|
parseConcurrency "cpu" = Just ConcurrentPerCpu
|
||||||
parseConcurrency s = Concurrent <$> readish s
|
parseConcurrency s = Concurrent <$> readish s
|
||||||
|
|
||||||
|
-- Concurrency can be configured at the command line or by git config.
|
||||||
|
data ConcurrencySetting
|
||||||
|
= ConcurrencyCmdLine Concurrency
|
||||||
|
| ConcurrencyGitConfig Concurrency
|
||||||
|
|
Loading…
Reference in a new issue