diff --git a/Annex.hs b/Annex.hs index 48d3c4c633..90c327eab1 100644 --- a/Annex.hs +++ b/Annex.hs @@ -66,13 +66,13 @@ import Types.LockCache import Types.DesktopNotify import Types.CleanupActions import Types.AdjustedBranch +import Types.WorkerPool import qualified Database.Keys.Handle as Keys import Utility.InodeCache import Utility.Url import "mtl" Control.Monad.Reader import Control.Concurrent -import Control.Concurrent.Async import Control.Concurrent.STM import qualified Control.Monad.Fail as Fail import qualified Data.Map.Strict as M @@ -142,7 +142,7 @@ data AnnexState = AnnexState , tempurls :: M.Map Key URLString , existinghooks :: M.Map Git.Hook.Hook Bool , desktopnotify :: DesktopNotify - , workers :: [Either AnnexState (Async AnnexState)] + , workers :: WorkerPool AnnexState , activekeys :: TVar (M.Map Key ThreadId) , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) , keysdbhandle :: Maybe Keys.DbHandle @@ -199,7 +199,7 @@ newState c r = do , tempurls = M.empty , existinghooks = M.empty , desktopnotify = mempty - , workers = [] + , workers = UnallocatedWorkerPool , activekeys = emptyactivekeys , activeremotes = emptyactiveremotes , keysdbhandle = Nothing diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs index 346d673387..b3f8688054 100644 --- a/Annex/Concurrent.hs +++ b/Annex/Concurrent.hs @@ -11,6 +11,7 @@ import Annex import Annex.Common import Annex.Action import qualified Annex.Queue +import Types.WorkerPool import qualified Data.Map as M @@ -42,7 +43,7 @@ dupState :: Annex AnnexState dupState = do st <- Annex.getState id return $ st - { Annex.workers = [] + { Annex.workers = UnallocatedWorkerPool -- each thread has its own repoqueue , Annex.repoqueue = Nothing -- avoid sharing eg, open file handles diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index 4de3d2c45c..534c7ed3cd 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -16,6 +16,7 @@ import Types.Command import Types.Concurrency import Messages.Concurrent import Types.Messages +import Types.WorkerPool import Remote.List import Control.Concurrent @@ -60,9 +61,9 @@ commandAction a = Annex.getState Annex.concurrency >>= \case run = void $ includeCommandAction a runconcurrent n = do - ws <- Annex.getState Annex.workers - (st, ws') <- if null ws - then do + ws <- liftIO . drainTo (n-1) =<< Annex.getState Annex.workers + (st, ws') <- case ws of + UnallocatedWorkerPool -> do -- Generate the remote list now, to avoid -- each thread generating it, which would -- be more expensive and could cause @@ -70,13 +71,12 @@ commandAction a = Annex.getState Annex.concurrency >>= \case -- setConfig. _ <- remoteList st <- dupState - return (st, replicate (n-1) (Left st)) - else do - l <- liftIO $ drainTo (n-1) ws - findFreeSlot l + return (st, allocateWorkerPool st (n-1)) + WorkerPool l -> findFreeSlot l w <- liftIO $ async $ snd <$> Annex.run st (inOwnConsoleRegion (Annex.output st) run) - Annex.changeState $ \s -> s { Annex.workers = Right w:ws' } + Annex.changeState $ \s -> s + { Annex.workers = addWorkerPool ws' (Right w) } commandActions :: [CommandStart] -> Annex () commandActions = mapM_ commandAction @@ -91,42 +91,41 @@ commandActions = mapM_ commandAction finishCommandActions :: Annex () finishCommandActions = do ws <- Annex.getState Annex.workers - Annex.changeState $ \s -> s { Annex.workers = [] } - l <- liftIO $ drainTo 0 ws - forM_ (lefts l) mergeState + Annex.changeState $ \s -> s { Annex.workers = UnallocatedWorkerPool } + ws' <- liftIO $ drainTo 0 ws + forM_ (idleWorkers ws') mergeState -{- Wait for Asyncs from the list to finish, replacing them with their - - final AnnexStates, until the list of remaining Asyncs is not larger - - than the specified size, then returns the new list. +{- Wait for jobs from the WorkerPool to complete, until + - the number of running jobs is not larger than the specified number. - - - If the action throws an exception, it is propigated, but first - - all other actions are waited for, to allow for a clean shutdown. + - If a job throws an exception, it is propigated, but first + - all other jobs are waited for, to allow for a clean shutdown. -} -drainTo - :: Int - -> [Either Annex.AnnexState (Async Annex.AnnexState)] - -> IO [Either Annex.AnnexState (Async Annex.AnnexState)] -drainTo sz l - | null as || sz >= length as = return l +drainTo :: Int -> WorkerPool t -> IO (WorkerPool t) +drainTo _ UnallocatedWorkerPool = pure UnallocatedWorkerPool +drainTo sz (WorkerPool l) + | null as || sz >= length as = pure (WorkerPool l) | otherwise = do (done, ret) <- waitAnyCatch as let as' = filter (/= done) as case ret of Left e -> do - void $ drainTo 0 (map Left sts ++ map Right as') + void $ drainTo 0 $ WorkerPool $ + map Left sts ++ map Right as' throwIO e Right st -> do - drainTo sz $ map Left (st:sts) ++ map Right as' + drainTo sz $ WorkerPool $ + map Left (st:sts) ++ map Right as' where (sts, as) = partitionEithers l -findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)]) +findFreeSlot :: [Worker Annex.AnnexState] -> Annex (Annex.AnnexState, WorkerPool Annex.AnnexState) findFreeSlot = go [] where go c [] = do st <- dupState - return (st, c) - go c (Left st:rest) = return (st, c ++ rest) + return (st, WorkerPool c) + go c (Left st:rest) = return (st, WorkerPool (c ++ rest)) go c (v:rest) = go (v:c) rest {- Like commandAction, but without the concurrency. -} diff --git a/Types/WorkerPool.hs b/Types/WorkerPool.hs new file mode 100644 index 0000000000..acc11c8843 --- /dev/null +++ b/Types/WorkerPool.hs @@ -0,0 +1,30 @@ +{- Command worker pool. + - + - Copyright 2019 Joey Hess + - + - Licensed under the GNU AGPL version 3 or higher. + -} + +module Types.WorkerPool where + +import Control.Concurrent.Async +import Data.Either + +-- | Pool of worker threads. +data WorkerPool t + = UnallocatedWorkerPool + | WorkerPool [Worker t] + +-- | A worker can either be idle or running an Async action. +type Worker t = Either t (Async t) + +allocateWorkerPool :: t -> Int -> WorkerPool t +allocateWorkerPool t n = WorkerPool $ replicate n (Left t) + +addWorkerPool :: WorkerPool t -> Worker t -> WorkerPool t +addWorkerPool (WorkerPool l) w = WorkerPool (w:l) +addWorkerPool UnallocatedWorkerPool w = WorkerPool [w] + +idleWorkers :: WorkerPool t -> [t] +idleWorkers UnallocatedWorkerPool = [] +idleWorkers (WorkerPool l) = lefts l diff --git a/git-annex.cabal b/git-annex.cabal index 2d645cba26..8bbf855741 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -1001,6 +1001,7 @@ Executable git-annex Types.UUID Types.UrlContents Types.View + Types.WorkerPool Upgrade Upgrade.V0 Upgrade.V1