make WorkerStage an open type
Rather than limiting it to PerformStage and CleanupStage, this opens it up so any number of stages can be added as needed by commands. Each concurrent command has a set of stages that it uses, and only transitions between those can block waiting for a free slot in the worker pool. Calling enteringStage for some other stage does not block, and has very little overhead. Note that while before the Annex state was duplicated on the first call to commandAction, this now happens earlier, in startConcurrency. That means that seek stage actions that use startConcurrency and then modify Annex state won't modify the state of worker threads they then start. I audited all of them, and only Command.Seek did so; prepMerge changes the working directory and so has to come before startConcurrency. Also, the remote list is built before duplicating the state, which means that it gets built earlier now than it used to. This would only have the effect of making commands that end up not needing to perform any actions unnecessarily build the remote list (only when they're run with concurrency enabled), but that's a minor overhead compared to commands seeking through the work tree and determining they don't need to do anything.
This commit is contained in:
parent
e19408ed9d
commit
53882ab4a7
17 changed files with 230 additions and 147 deletions
|
@ -7,7 +7,12 @@
|
|||
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module Annex.Action where
|
||||
module Annex.Action (
|
||||
startup,
|
||||
shutdown,
|
||||
stopCoProcesses,
|
||||
reapZombies,
|
||||
) where
|
||||
|
||||
import qualified Data.Map as M
|
||||
#ifndef mingw32_HOST_OS
|
||||
|
@ -18,10 +23,7 @@ import Utility.Exception
|
|||
import Annex.Common
|
||||
import qualified Annex
|
||||
import Annex.Content
|
||||
import Annex.CatFile
|
||||
import Annex.CheckAttr
|
||||
import Annex.HashObject
|
||||
import Annex.CheckIgnore
|
||||
import Annex.Concurrent
|
||||
|
||||
{- Actions to perform each time ran. -}
|
||||
startup :: Annex ()
|
||||
|
@ -35,14 +37,6 @@ shutdown nocommit = do
|
|||
stopCoProcesses
|
||||
liftIO reapZombies -- zombies from long-running git processes
|
||||
|
||||
{- Stops all long-running git query processes. -}
|
||||
stopCoProcesses :: Annex ()
|
||||
stopCoProcesses = do
|
||||
catFileStop
|
||||
checkAttrStop
|
||||
hashObjectStop
|
||||
checkIgnoreStop
|
||||
|
||||
{- Reaps any zombie processes that may be hanging around.
|
||||
-
|
||||
- Warning: Not thread safe. Anything that was expecting to wait
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{- git-annex concurrent state
|
||||
-
|
||||
- Copyright 2015 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2015-2019 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
@ -9,9 +9,15 @@ module Annex.Concurrent where
|
|||
|
||||
import Annex
|
||||
import Annex.Common
|
||||
import Annex.Action
|
||||
import qualified Annex.Queue
|
||||
import Annex.CatFile
|
||||
import Annex.CheckAttr
|
||||
import Annex.HashObject
|
||||
import Annex.CheckIgnore
|
||||
import Types.WorkerPool
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.STM
|
||||
import qualified Data.Map as M
|
||||
|
||||
{- Allows forking off a thread that uses a copy of the current AnnexState
|
||||
|
@ -59,3 +65,83 @@ mergeState st = do
|
|||
uncurry addCleanup
|
||||
Annex.Queue.mergeFrom st'
|
||||
changeState $ \s -> s { errcounter = errcounter s + errcounter st' }
|
||||
|
||||
{- Shuts down each of the long-running git query coprocesses, in turn:
 - cat-file, check-attr, hash-object and check-ignore. -}
stopCoProcesses :: Annex ()
stopCoProcesses =
    catFileStop
        >> checkAttrStop
        >> hashObjectStop
        >> checkIgnoreStop
|
||||
|
||||
{- Runs an action and makes the current thread have the specified stage
 - while doing so. If too many other threads are running in the specified
 - stage, waits for one of them to become idle.
 -
 - Noop if the current thread already has the requested stage, or if the
 - current thread is not in the worker pool, or if concurrency is not
 - enabled.
 -
 - Also a noop if the stage is not one of the stages that the worker pool
 - uses.
 -
 - The pool needs to continue to contain the same number of worker threads
 - for each stage. So, an idle worker with the desired stage is found in
 - the pool (waiting if necessary for one to become idle), and the stages
 - of it and the current thread are swapped.
 -
 - Once the action finishes (or throws), the thread is switched back to
 - the stage it had before, which can likewise wait for an idle worker
 - of that stage.
 -}
enteringStage :: WorkerStage -> Annex a -> Annex a
enteringStage newstage a = do
    mytid <- liftIO myThreadId
    tv <- Annex.getState Annex.workers
    let setup = changeStageTo mytid tv newstage
    -- changeStageTo yields Nothing whenever it did not change the
    -- stage, so cleanup has to handle both cases in one total
    -- function; separate let bindings would shadow each other and
    -- leave the Nothing case unmatched.
    let cleanup = maybe noop
            (\oldstage -> changeStageTo mytid tv oldstage >> noop)
    bracket setup cleanup (const a)
|
||||
|
||||
{- Atomically switches the worker running in the given thread over to
 - the new stage, by swapping stages with an idle worker that has that
 - stage (retrying the transaction until one is idle).
 -
 - Returns the stage the thread had before when a swap was made.
 - Returns Nothing, leaving the pool as it was, when the pool does not
 - use the new stage, is not allocated, does not contain the thread,
 - or the thread already has the new stage.
 -}
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> WorkerStage -> Annex (Maybe WorkerStage)
changeStageTo mytid tv newstage = liftIO $ atomically $ do
    pool <- takeTMVar tv
    case swappable pool of
        Just (myaid, oldstage, stages, others) -> do
            (idlest, restpool) <- waitWorkerSlot stages newstage others
            -- The formerly idle worker inherits this thread's old
            -- stage, so the number of workers per stage is unchanged.
            putTMVar tv $
                addWorkerPool (IdleWorker idlest oldstage) $
                    addWorkerPool (ActiveWorker myaid newstage) restpool
            return (Just oldstage)
        Nothing -> do
            putTMVar tv pool
            return Nothing
  where
    -- Picks apart the pool when a stage swap is actually called for.
    swappable pool@(WorkerPool usedstages _)
        | memberStage newstage usedstages =
            case removeThreadIdWorkerPool mytid pool of
                Just ((myaid, oldstage), WorkerPool usedstages' l)
                    | oldstage /= newstage ->
                        Just (myaid, oldstage, usedstages', l)
                _ -> Nothing
    swappable _ = Nothing
|
||||
|
||||
-- | Blocks until the worker pool has an idle worker whose stage is the
-- pool's initial stage, takes that worker out of the pool, and returns
-- its state together with that stage.
--
-- Returns Nothing, without blocking, if the worker pool has not been
-- allocated.
waitInitialWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
waitInitialWorkerSlot tv = do
    pool <- takeTMVar tv
    case pool of
        WorkerPool usedstages l -> do
            let stage = initialStage usedstages
            (st, pool') <- waitWorkerSlot usedstages stage l
            putTMVar tv pool'
            return (Just (st, stage))
        UnallocatedWorkerPool -> do
            putTMVar tv pool
            return Nothing
|
||||
|
||||
-- | Looks for the first idle worker that has the specified stage.
-- When there is one, returns its state, and a WorkerPool holding all
-- the other workers. When there is none, the STM transaction is retried.
waitWorkerSlot :: UsedStages -> WorkerStage -> [Worker Annex.AnnexState] -> STM (Annex.AnnexState, WorkerPool Annex.AnnexState)
waitWorkerSlot usedstages wantstage workers =
    case break wanted workers of
        -- The workers that were passed over are put back in
        -- reverse order, followed by the ones not yet examined.
        (skipped, IdleWorker st _ : rest) ->
            return (st, WorkerPool usedstages (reverse skipped ++ rest))
        _ -> retry
  where
    wanted (IdleWorker _ stage) = stage == wantstage
    wanted _ = False
|
||||
|
|
|
@ -90,6 +90,7 @@ import Annex.InodeSentinal
|
|||
import Utility.InodeCache
|
||||
import Annex.Content.LowLevel
|
||||
import Annex.Content.PointerFile
|
||||
import Annex.Concurrent
|
||||
|
||||
{- Checks if a given key's content is currently present. -}
|
||||
inAnnex :: Key -> Annex Bool
|
||||
|
|
|
@ -55,18 +55,21 @@ commandActions = mapM_ commandAction
|
|||
-}
|
||||
commandAction :: CommandStart -> Annex ()
|
||||
commandAction start = Annex.getState Annex.concurrency >>= \case
|
||||
NonConcurrent -> void $ includeCommandAction start
|
||||
Concurrent n -> runconcurrent n
|
||||
ConcurrentPerCpu -> runconcurrent =<< liftIO getNumProcessors
|
||||
NonConcurrent -> runnonconcurrent
|
||||
Concurrent _ -> runconcurrent
|
||||
ConcurrentPerCpu -> runconcurrent
|
||||
where
|
||||
runconcurrent n = do
|
||||
runnonconcurrent = void $ includeCommandAction start
|
||||
runconcurrent = do
|
||||
tv <- Annex.getState Annex.workers
|
||||
workerst <- waitWorkerSlot n PerformStage tv
|
||||
liftIO (atomically (waitInitialWorkerSlot tv)) >>=
|
||||
maybe runnonconcurrent (runconcurrent' tv)
|
||||
runconcurrent' tv (workerst, workerstage) = do
|
||||
aid <- liftIO $ async $ snd <$> Annex.run workerst
|
||||
(concurrentjob workerst)
|
||||
liftIO $ atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let !pool' = addWorkerPool (ActiveWorker aid PerformStage) pool
|
||||
let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool
|
||||
putTMVar tv pool'
|
||||
void $ liftIO $ forkIO $ debugLocks $ do
|
||||
-- accountCommandAction will usually catch
|
||||
|
@ -109,8 +112,7 @@ commandAction start = Annex.getState Annex.concurrency >>= \case
|
|||
performconcurrent startmsg perform = do
|
||||
showStartMessage startmsg
|
||||
perform >>= \case
|
||||
Just cleanup -> do
|
||||
changeStageTo CleanupStage
|
||||
Just cleanup -> enteringStage CleanupStage $ do
|
||||
r <- cleanup
|
||||
showEndMessage startmsg r
|
||||
return r
|
||||
|
@ -118,56 +120,6 @@ commandAction start = Annex.getState Annex.concurrency >>= \case
|
|||
showEndMessage startmsg False
|
||||
return False
|
||||
|
||||
-- | Wait until there's an idle worker in the pool, remove it from the
|
||||
-- pool, and return its state.
|
||||
--
|
||||
-- If the pool is unallocated, it will be allocated to the specified size.
|
||||
waitWorkerSlot :: Int -> WorkerStage -> TMVar (WorkerPool Annex.AnnexState) -> Annex Annex.AnnexState
|
||||
waitWorkerSlot n wantstage tv = debugLocks $
|
||||
join $ liftIO $ atomically $ waitWorkerSlot' wantstage tv >>= \case
|
||||
Nothing -> return $ do
|
||||
-- Generate the remote list now, to avoid
|
||||
-- each thread generating it, which would
|
||||
-- be more expensive and could cause
|
||||
-- threads to contend over eg, calls to
|
||||
-- setConfig.
|
||||
_ <- remoteList
|
||||
st <- dupState
|
||||
liftIO $ atomically $ do
|
||||
let (WorkerPool l) = allocateWorkerPool st (max n 1)
|
||||
let (st', pool) = findidle st [] l
|
||||
void $ swapTMVar tv pool
|
||||
return st'
|
||||
Just st -> return $ return st
|
||||
where
|
||||
findidle st _ [] = (st, WorkerPool [])
|
||||
findidle _ c ((IdleWorker st stage):rest)
|
||||
| stage == wantstage = (st, WorkerPool (c ++ rest))
|
||||
findidle st c (w:rest) = findidle st (w:c) rest
|
||||
|
||||
-- | STM action that waits until there's an idle worker in the worker pool,
|
||||
-- removes it from the pool, and returns its state.
|
||||
--
|
||||
-- If the worker pool is not already allocated, returns Nothing.
|
||||
waitWorkerSlot' :: WorkerStage -> TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState))
|
||||
waitWorkerSlot' wantstage tv =
|
||||
takeTMVar tv >>= \case
|
||||
UnallocatedWorkerPool -> do
|
||||
putTMVar tv UnallocatedWorkerPool
|
||||
return Nothing
|
||||
WorkerPool l -> do
|
||||
(st, pool') <- waitWorkerSlot'' wantstage l
|
||||
putTMVar tv pool'
|
||||
return $ Just st
|
||||
|
||||
waitWorkerSlot'' :: WorkerStage -> [Worker Annex.AnnexState] -> STM (Annex.AnnexState, WorkerPool Annex.AnnexState)
|
||||
waitWorkerSlot'' wantstage = findidle []
|
||||
where
|
||||
findidle _ [] = retry
|
||||
findidle c ((IdleWorker st stage):rest)
|
||||
| stage == wantstage = return (st, WorkerPool (c ++ rest))
|
||||
findidle c (w:rest) = findidle (w:c) rest
|
||||
|
||||
{- Waits for all worker threads to finish and merges their AnnexStates
|
||||
- back into the current Annex's state.
|
||||
-}
|
||||
|
@ -178,37 +130,11 @@ finishCommandActions = do
|
|||
swapTMVar tv UnallocatedWorkerPool
|
||||
case pool of
|
||||
UnallocatedWorkerPool -> noop
|
||||
WorkerPool l -> forM_ (mapMaybe workerAsync l) $ \aid ->
|
||||
WorkerPool _ l -> forM_ (mapMaybe workerAsync l) $ \aid ->
|
||||
liftIO (waitCatch aid) >>= \case
|
||||
Left _ -> noop
|
||||
Right st -> mergeState st
|
||||
|
||||
{- Changes the current thread's stage in the worker pool.
|
||||
-
|
||||
- The pool needs to continue to contain the same number of worker threads
|
||||
- for each stage. So, an idle worker with the desired stage is found in
|
||||
- the pool (waiting if necessary for one to become idle), and the stages
|
||||
- of it and the current thread are swapped.
|
||||
-
|
||||
- Noop if the current thread already has the requested stage, or if the
|
||||
- current thread is not in the worker pool, or if concurrency is not
|
||||
- enabled.
|
||||
-}
|
||||
changeStageTo :: WorkerStage -> Annex ()
|
||||
changeStageTo newstage = debugLocks $ do
|
||||
mytid <- liftIO myThreadId
|
||||
tv <- Annex.getState Annex.workers
|
||||
liftIO $ atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
case removeThreadIdWorkerPool mytid pool of
|
||||
Just ((myaid, oldstage), WorkerPool l)
|
||||
| oldstage /= newstage -> do
|
||||
(idlest, restpool) <- waitWorkerSlot'' newstage l
|
||||
let pool' = addWorkerPool (IdleWorker idlest oldstage) $
|
||||
addWorkerPool (ActiveWorker myaid newstage) restpool
|
||||
putTMVar tv pool'
|
||||
_ -> putTMVar tv pool
|
||||
|
||||
{- Like commandAction, but without the concurrency. -}
|
||||
includeCommandAction :: CommandStart -> CommandCleanup
|
||||
includeCommandAction start =
|
||||
|
@ -261,28 +187,35 @@ performCommandAction' startmsg perform =
|
|||
showEndMessage startmsg r
|
||||
return r
|
||||
|
||||
{- Do concurrent output when that has been requested. -}
|
||||
allowConcurrentOutput :: Annex a -> Annex a
|
||||
allowConcurrentOutput a = do
|
||||
{- Start concurrency when that has been requested.
|
||||
- Should be run wrapping the seek stage of a command.
|
||||
-
|
||||
- Note that a duplicate of the Annex state is made here, and worker
|
||||
- threads use that state. While the worker threads are not actually
|
||||
- started here, that has the same effect.
|
||||
-}
|
||||
startConcurrency :: UsedStages -> Annex a -> Annex a
|
||||
startConcurrency usedstages a = do
|
||||
fromcmdline <- Annex.getState Annex.concurrency
|
||||
fromgitcfg <- annexJobs <$> Annex.getGitConfig
|
||||
let usegitcfg = Annex.changeState $
|
||||
\c -> c { Annex.concurrency = fromgitcfg }
|
||||
case (fromcmdline, fromgitcfg) of
|
||||
(NonConcurrent, NonConcurrent) -> a
|
||||
(Concurrent n, _) -> do
|
||||
raisecapabilitiesto n
|
||||
goconcurrent
|
||||
(ConcurrentPerCpu, _) -> goconcurrent
|
||||
(Concurrent n, _) ->
|
||||
goconcurrent n
|
||||
(ConcurrentPerCpu, _) ->
|
||||
goconcurrentpercpu
|
||||
(NonConcurrent, Concurrent n) -> do
|
||||
usegitcfg
|
||||
raisecapabilitiesto n
|
||||
goconcurrent
|
||||
goconcurrent n
|
||||
(NonConcurrent, ConcurrentPerCpu) -> do
|
||||
usegitcfg
|
||||
goconcurrent
|
||||
goconcurrentpercpu
|
||||
where
|
||||
goconcurrent = do
|
||||
goconcurrent n = do
|
||||
raisecapabilitiesto n
|
||||
initworkerpool n
|
||||
withMessageState $ \s -> case outputType s of
|
||||
NormalOutput -> ifM (liftIO concurrentOutputSupported)
|
||||
( Regions.displayConsoleRegions $
|
||||
|
@ -292,6 +225,8 @@ allowConcurrentOutput a = do
|
|||
_ -> goconcurrent' False
|
||||
goconcurrent' b = bracket_ (setup b) cleanup a
|
||||
|
||||
goconcurrentpercpu = goconcurrent =<< liftIO getNumProcessors
|
||||
|
||||
setup = setconcurrentoutputenabled
|
||||
|
||||
cleanup = do
|
||||
|
@ -305,6 +240,17 @@ allowConcurrentOutput a = do
|
|||
c <- liftIO getNumCapabilities
|
||||
when (n > c) $
|
||||
liftIO $ setNumCapabilities n
|
||||
|
||||
initworkerpool n = do
|
||||
-- Generate the remote list now, to avoid each thread
|
||||
-- generating it, which would be more expensive and
|
||||
-- could cause threads to contend over eg, calls to
|
||||
-- setConfig.
|
||||
_ <- remoteList
|
||||
st <- dupState
|
||||
tv <- Annex.getState Annex.workers
|
||||
liftIO $ atomically $ putTMVar tv $
|
||||
allocateWorkerPool st (max n 1) usedstages
|
||||
|
||||
{- Ensures that only one thread processes a key at a time.
|
||||
- Other threads will block until it's done.
|
||||
|
|
|
@ -28,7 +28,7 @@ import Config
|
|||
import Utility.Daemon
|
||||
import Types.Transfer
|
||||
import Types.ActionItem
|
||||
import Types.WorkerPool
|
||||
import Types.WorkerPool as ReExported
|
||||
|
||||
{- Generates a normal Command -}
|
||||
command :: String -> CommandSection -> String -> CmdParamsDesc -> (CmdParamsDesc -> CommandParser) -> Command
|
||||
|
|
|
@ -50,7 +50,7 @@ optParser desc = AddOptions
|
|||
)
|
||||
|
||||
seek :: AddOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $ do
|
||||
seek o = startConcurrency commandStages $ do
|
||||
matcher <- largeFilesMatcher
|
||||
let gofile file = ifM (checkFileMatcher matcher file <||> Annex.getState Annex.force)
|
||||
( start file
|
||||
|
|
|
@ -93,7 +93,7 @@ parseDownloadOptions withfileoption = DownloadOptions
|
|||
else pure Nothing
|
||||
|
||||
seek :: AddUrlOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $ do
|
||||
seek o = startConcurrency commandStages $ do
|
||||
forM_ (addUrls o) (\u -> go (o, u))
|
||||
case batchOption o of
|
||||
Batch fmt -> batchInput fmt (parseBatchInput o) go
|
||||
|
|
|
@ -44,7 +44,7 @@ instance DeferredParseClass CopyOptions where
|
|||
<*> pure (batchOption v)
|
||||
|
||||
seek :: CopyOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $ do
|
||||
seek o = startConcurrency commandStages $ do
|
||||
let go = whenAnnexed $ start o
|
||||
case batchOption o of
|
||||
Batch fmt -> batchFilesMatching fmt go
|
||||
|
|
|
@ -52,7 +52,7 @@ parseDropFromOption = parseRemoteOption <$> strOption
|
|||
)
|
||||
|
||||
seek :: DropOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $
|
||||
seek o = startConcurrency commandStages $
|
||||
case batchOption o of
|
||||
Batch fmt -> batchFilesMatching fmt go
|
||||
NoBatch -> withKeyOptions (keyOptions o) (autoMode o)
|
||||
|
|
|
@ -88,7 +88,7 @@ optParser desc = FsckOptions
|
|||
))
|
||||
|
||||
seek :: FsckOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $ do
|
||||
seek o = startConcurrency commandStages $ do
|
||||
from <- maybe (pure Nothing) (Just <$$> getParsed) (fsckFromOption o)
|
||||
u <- maybe getUUID (pure . Remote.uuid) from
|
||||
checkDeadRepo u
|
||||
|
|
|
@ -38,7 +38,7 @@ optParser desc = GetOptions
|
|||
<*> parseBatchOption
|
||||
|
||||
seek :: GetOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $ do
|
||||
seek o = startConcurrency commandStages $ do
|
||||
from <- maybe (pure Nothing) (Just <$$> getParsed) (getFrom o)
|
||||
let go = whenAnnexed $ start o from
|
||||
case batchOption o of
|
||||
|
|
|
@ -96,7 +96,7 @@ duplicateModeParser =
|
|||
)
|
||||
|
||||
seek :: ImportOptions -> CommandSeek
|
||||
seek o@(LocalImportOptions {}) = allowConcurrentOutput $ do
|
||||
seek o@(LocalImportOptions {}) = startConcurrency commandStages $ do
|
||||
repopath <- liftIO . absPath =<< fromRepo Git.repoPath
|
||||
inrepops <- liftIO $ filter (dirContains repopath) <$> mapM absPath (importFiles o)
|
||||
unless (null inrepops) $ do
|
||||
|
@ -104,7 +104,7 @@ seek o@(LocalImportOptions {}) = allowConcurrentOutput $ do
|
|||
largematcher <- largeFilesMatcher
|
||||
(commandAction . startLocal largematcher (duplicateMode o))
|
||||
`withPathContents` importFiles o
|
||||
seek o@(RemoteImportOptions {}) = allowConcurrentOutput $ do
|
||||
seek o@(RemoteImportOptions {}) = startConcurrency commandStages $ do
|
||||
r <- getParsed (importFromRemote o)
|
||||
unlessM (Remote.isImportSupported r) $
|
||||
giveup "That remote does not support imports."
|
||||
|
|
|
@ -41,7 +41,7 @@ instance DeferredParseClass MirrorOptions where
|
|||
<*> pure (keyOptions v)
|
||||
|
||||
seek :: MirrorOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $
|
||||
seek o = startConcurrency commandStages $
|
||||
withKeyOptions (keyOptions o) False
|
||||
(commandAction . startKey o (AssociatedFile Nothing))
|
||||
(withFilesInGit (commandAction . (whenAnnexed $ start o)))
|
||||
|
|
|
@ -54,7 +54,7 @@ data RemoveWhen = RemoveSafe | RemoveNever
|
|||
deriving (Show, Eq)
|
||||
|
||||
seek :: MoveOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $ do
|
||||
seek o = startConcurrency commandStages $ do
|
||||
let go = whenAnnexed $ start (fromToOptions o) (removeWhen o)
|
||||
case batchOption o of
|
||||
Batch fmt -> batchFilesMatching fmt go
|
||||
|
|
|
@ -162,9 +162,12 @@ instance DeferredParseClass SyncOptions where
|
|||
<*> pure (resolveMergeOverride v)
|
||||
|
||||
seek :: SyncOptions -> CommandSeek
|
||||
seek o = allowConcurrentOutput $ do
|
||||
seek o = do
|
||||
prepMerge
|
||||
|
||||
startConcurrency commandStages (seek' o)
|
||||
|
||||
seek' :: SyncOptions -> CommandSeek
|
||||
seek' o = do
|
||||
let withbranch a = a =<< getCurrentBranch
|
||||
|
||||
remotes <- syncRemotes (syncWith o)
|
||||
|
|
|
@ -63,16 +63,24 @@ instance MkActionItem StartMessage where
|
|||
|
||||
{- A command is defined by specifying these things. -}
|
||||
data Command = Command
|
||||
{ cmdcheck :: [CommandCheck] -- check stage
|
||||
, cmdnocommit :: Bool -- don't commit journalled state changes
|
||||
, cmdnomessages :: Bool -- don't output normal messages
|
||||
{ cmdcheck :: [CommandCheck]
|
||||
-- ^ check stage
|
||||
, cmdnocommit :: Bool
|
||||
-- ^ don't commit journalled state changes
|
||||
, cmdnomessages :: Bool
|
||||
-- ^ don't output normal messages
|
||||
, cmdname :: String
|
||||
, cmdparamdesc :: CmdParamsDesc -- description of params for usage
|
||||
, cmdparamdesc :: CmdParamsDesc
|
||||
-- ^ description of params for usage
|
||||
, cmdsection :: CommandSection
|
||||
, cmddesc :: String -- description of command for usage
|
||||
, cmdparser :: CommandParser -- command line parser
|
||||
, cmdglobaloptions :: [GlobalOption] -- additional global options
|
||||
, cmdnorepo :: Maybe (Parser (IO ())) -- used when not in a repo
|
||||
, cmddesc :: String
|
||||
-- ^ description of command for usage
|
||||
, cmdparser :: CommandParser
|
||||
-- ^ command line parser
|
||||
, cmdglobaloptions :: [GlobalOption]
|
||||
-- ^ additional global options
|
||||
, cmdnorepo :: Maybe (Parser (IO ()))
|
||||
-- ^used when not in a repo
|
||||
}
|
||||
|
||||
{- Command-line parameters, after the command is selected and options
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
{- Command worker pool.
|
||||
{- Worker thread pool.
|
||||
-
|
||||
- Copyright 2019 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
|
@ -9,11 +9,12 @@ module Types.WorkerPool where
|
|||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.Async
|
||||
import qualified Data.Set as S
|
||||
|
||||
-- | Pool of worker threads.
|
||||
data WorkerPool t
|
||||
= UnallocatedWorkerPool
|
||||
| WorkerPool [Worker t]
|
||||
| WorkerPool UsedStages [Worker t]
|
||||
deriving (Show)
|
||||
|
||||
-- | A worker can either be idle or running an Async action.
|
||||
|
@ -26,9 +27,53 @@ instance Show (Worker t) where
|
|||
show (IdleWorker _ s) = "IdleWorker " ++ show s
|
||||
show (ActiveWorker _ s) = "ActiveWorker " ++ show s
|
||||
|
||||
-- | These correspond to CommandPerform and CommandCleanup.
|
||||
data WorkerStage = PerformStage | CleanupStage
|
||||
deriving (Show, Eq)
|
||||
data WorkerStage
|
||||
= PerformStage
|
||||
-- ^ Running a CommandPerform action.
|
||||
| CleanupStage
|
||||
-- ^ Running a CommandCleanup action.
|
||||
| TransferStage
|
||||
-- ^ Transferring content to or from a remote.
|
||||
| VerifyStage
|
||||
-- ^ Verifying content, eg by calculating a checksum.
|
||||
deriving (Show, Eq, Ord)
|
||||
|
||||
-- | Set of stages that make sense to be used while performing an action,
|
||||
-- and the stage to use initially.
|
||||
--
|
||||
-- Transitions between these stages will block as needed until there's a
|
||||
-- free Worker in the pool for the new stage.
|
||||
--
|
||||
-- Actions that indicate they are in some other stage won't change the
|
||||
-- stage, and so there will be no blocking before starting them.
|
||||
data UsedStages = UsedStages
|
||||
{ initialStage :: WorkerStage
|
||||
, usedStages :: S.Set WorkerStage
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
-- | Is the stage one of the stages included in the UsedStages?
memberStage :: WorkerStage -> UsedStages -> Bool
memberStage s = S.member s . usedStages
|
||||
|
||||
-- | The default set of stages: only the CommandPerform and
-- CommandCleanup stages, starting in the perform stage. Cleanup actions
-- often don't contend much with perform actions, so this avoids
-- blocking the start of the next perform action on the previous
-- cleanup action finishing.
commandStages :: UsedStages
commandStages = UsedStages PerformStage stages
  where
    stages = S.fromList [PerformStage, CleanupStage]
|
||||
|
||||
-- | When a command is transferring content, it can use this instead.
|
||||
-- Transfers are often bottlenecked on the network or on another disk than the one
|
||||
-- containing the repository, while verification bottlenecks on
|
||||
-- the disk containing the repository or on the CPU.
|
||||
transferStages :: UsedStages
transferStages = UsedStages TransferStage stages
  where
    -- Starts in the transfer stage; verification is the other stage.
    stages = S.fromList [TransferStage, VerifyStage]
|
||||
|
||||
workerStage :: Worker t -> WorkerStage
|
||||
workerStage (IdleWorker _ s) = s
|
||||
|
@ -42,19 +87,19 @@ workerAsync (ActiveWorker aid _) = Just aid
|
|||
-- in it, of each stage.
|
||||
--
|
||||
-- The stages are distributed evenly throughout.
|
||||
allocateWorkerPool :: t -> Int -> UsedStages -> WorkerPool t
allocateWorkerPool t n u = WorkerPool u $
    concat $ replicate n $ map (IdleWorker t) stagelist
  where
    -- One idle worker per used stage, repeated n times. This keeps
    -- n workers of each stage however many stages are used, rather
    -- than assuming there are exactly two of them.
    stagelist = S.toList (usedStages u)
|
||||
|
||||
addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t
|
||||
addWorkerPool w (WorkerPool l) = WorkerPool (w:l)
|
||||
addWorkerPool w UnallocatedWorkerPool = WorkerPool [w]
|
||||
addWorkerPool w (WorkerPool u l) = WorkerPool u (w:l)
|
||||
addWorkerPool _ UnallocatedWorkerPool = UnallocatedWorkerPool
|
||||
|
||||
idleWorkers :: WorkerPool t -> [t]
|
||||
idleWorkers UnallocatedWorkerPool = []
|
||||
idleWorkers (WorkerPool l) = go l
|
||||
idleWorkers (WorkerPool _ l) = go l
|
||||
where
|
||||
go [] = []
|
||||
go (IdleWorker t _ : rest) = t : go rest
|
||||
|
@ -65,17 +110,17 @@ idleWorkers (WorkerPool l) = go l
|
|||
-- Each Async has its own ThreadId, so this stops once it finds
|
||||
-- a match.
|
||||
removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
|
||||
removeThreadIdWorkerPool _ UnallocatedWorkerPool = Nothing
|
||||
removeThreadIdWorkerPool tid (WorkerPool l) = go [] l
|
||||
removeThreadIdWorkerPool _ UnallocatedWorkerPool = Nothing
|
||||
removeThreadIdWorkerPool tid (WorkerPool u l) = go [] l
|
||||
where
|
||||
go _ [] = Nothing
|
||||
go c (ActiveWorker a stage : rest)
|
||||
| asyncThreadId a == tid = Just ((a, stage), WorkerPool (c++rest))
|
||||
| asyncThreadId a == tid = Just ((a, stage), WorkerPool u (c++rest))
|
||||
go c (v : rest) = go (v:c) rest
|
||||
|
||||
deactivateWorker :: WorkerPool t -> Async t -> t -> WorkerPool t
|
||||
deactivateWorker UnallocatedWorkerPool _ _ = UnallocatedWorkerPool
|
||||
deactivateWorker (WorkerPool l) aid t = WorkerPool $ go l
|
||||
deactivateWorker (WorkerPool u l) aid t = WorkerPool u $ go l
|
||||
where
|
||||
go [] = []
|
||||
go (w@(IdleWorker _ _) : rest) = w : go rest
|
||||
|
|
Loading…
Reference in a new issue