improved WorkerPool abstraction

No behavior changes.
This commit is contained in:
Joey Hess 2019-06-05 13:03:05 -04:00
parent 30286bf067
commit c04b2af3e1
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
5 changed files with 62 additions and 31 deletions

View file

@ -66,13 +66,13 @@ import Types.LockCache
import Types.DesktopNotify
import Types.CleanupActions
import Types.AdjustedBranch
import Types.WorkerPool
import qualified Database.Keys.Handle as Keys
import Utility.InodeCache
import Utility.Url
import "mtl" Control.Monad.Reader
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import qualified Control.Monad.Fail as Fail
import qualified Data.Map.Strict as M
@ -142,7 +142,7 @@ data AnnexState = AnnexState
, tempurls :: M.Map Key URLString
, existinghooks :: M.Map Git.Hook.Hook Bool
, desktopnotify :: DesktopNotify
, workers :: [Either AnnexState (Async AnnexState)]
, workers :: WorkerPool AnnexState
, activekeys :: TVar (M.Map Key ThreadId)
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
, keysdbhandle :: Maybe Keys.DbHandle
@ -199,7 +199,7 @@ newState c r = do
, tempurls = M.empty
, existinghooks = M.empty
, desktopnotify = mempty
, workers = []
, workers = UnallocatedWorkerPool
, activekeys = emptyactivekeys
, activeremotes = emptyactiveremotes
, keysdbhandle = Nothing

View file

@ -11,6 +11,7 @@ import Annex
import Annex.Common
import Annex.Action
import qualified Annex.Queue
import Types.WorkerPool
import qualified Data.Map as M
@ -42,7 +43,7 @@ dupState :: Annex AnnexState
dupState = do
st <- Annex.getState id
return $ st
{ Annex.workers = []
{ Annex.workers = UnallocatedWorkerPool
-- each thread has its own repoqueue
, Annex.repoqueue = Nothing
-- avoid sharing eg, open file handles

View file

@ -16,6 +16,7 @@ import Types.Command
import Types.Concurrency
import Messages.Concurrent
import Types.Messages
import Types.WorkerPool
import Remote.List
import Control.Concurrent
@ -60,9 +61,9 @@ commandAction a = Annex.getState Annex.concurrency >>= \case
run = void $ includeCommandAction a
runconcurrent n = do
ws <- Annex.getState Annex.workers
(st, ws') <- if null ws
then do
ws <- liftIO . drainTo (n-1) =<< Annex.getState Annex.workers
(st, ws') <- case ws of
UnallocatedWorkerPool -> do
-- Generate the remote list now, to avoid
-- each thread generating it, which would
-- be more expensive and could cause
@ -70,13 +71,12 @@ commandAction a = Annex.getState Annex.concurrency >>= \case
-- setConfig.
_ <- remoteList
st <- dupState
return (st, replicate (n-1) (Left st))
else do
l <- liftIO $ drainTo (n-1) ws
findFreeSlot l
return (st, allocateWorkerPool st (n-1))
WorkerPool l -> findFreeSlot l
w <- liftIO $ async $ snd <$> Annex.run st
(inOwnConsoleRegion (Annex.output st) run)
Annex.changeState $ \s -> s { Annex.workers = Right w:ws' }
Annex.changeState $ \s -> s
{ Annex.workers = addWorkerPool ws' (Right w) }
commandActions :: [CommandStart] -> Annex ()
commandActions = mapM_ commandAction
@ -91,42 +91,41 @@ commandActions = mapM_ commandAction
finishCommandActions :: Annex ()
finishCommandActions = do
ws <- Annex.getState Annex.workers
Annex.changeState $ \s -> s { Annex.workers = [] }
l <- liftIO $ drainTo 0 ws
forM_ (lefts l) mergeState
Annex.changeState $ \s -> s { Annex.workers = UnallocatedWorkerPool }
ws' <- liftIO $ drainTo 0 ws
forM_ (idleWorkers ws') mergeState
{- Wait for Asyncs from the list to finish, replacing them with their
- final AnnexStates, until the list of remaining Asyncs is not larger
- than the specified size, then returns the new list.
{- Wait for jobs from the WorkerPool to complete, until
- the number of running jobs is not larger than the specified number.
-
- If the action throws an exception, it is propigated, but first
- all other actions are waited for, to allow for a clean shutdown.
- If a job throws an exception, it is propigated, but first
- all other jobs are waited for, to allow for a clean shutdown.
-}
drainTo
:: Int
-> [Either Annex.AnnexState (Async Annex.AnnexState)]
-> IO [Either Annex.AnnexState (Async Annex.AnnexState)]
drainTo sz l
| null as || sz >= length as = return l
drainTo :: Int -> WorkerPool t -> IO (WorkerPool t)
drainTo _ UnallocatedWorkerPool = pure UnallocatedWorkerPool
drainTo sz (WorkerPool l)
| null as || sz >= length as = pure (WorkerPool l)
| otherwise = do
(done, ret) <- waitAnyCatch as
let as' = filter (/= done) as
case ret of
Left e -> do
void $ drainTo 0 (map Left sts ++ map Right as')
void $ drainTo 0 $ WorkerPool $
map Left sts ++ map Right as'
throwIO e
Right st -> do
drainTo sz $ map Left (st:sts) ++ map Right as'
drainTo sz $ WorkerPool $
map Left (st:sts) ++ map Right as'
where
(sts, as) = partitionEithers l
findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)])
findFreeSlot :: [Worker Annex.AnnexState] -> Annex (Annex.AnnexState, WorkerPool Annex.AnnexState)
findFreeSlot = go []
where
go c [] = do
st <- dupState
return (st, c)
go c (Left st:rest) = return (st, c ++ rest)
return (st, WorkerPool c)
go c (Left st:rest) = return (st, WorkerPool (c ++ rest))
go c (v:rest) = go (v:c) rest
{- Like commandAction, but without the concurrency. -}

30
Types/WorkerPool.hs Normal file
View file

@ -0,0 +1,30 @@
{- Command worker pool.
-
- Copyright 2019 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
module Types.WorkerPool where
import Control.Concurrent.Async
import Data.Either
-- | Pool of worker threads.
data WorkerPool t
= UnallocatedWorkerPool
| WorkerPool [Worker t]
-- | A worker can either be idle or running an Async action.
type Worker t = Either t (Async t)
allocateWorkerPool :: t -> Int -> WorkerPool t
allocateWorkerPool t n = WorkerPool $ replicate n (Left t)
addWorkerPool :: WorkerPool t -> Worker t -> WorkerPool t
addWorkerPool (WorkerPool l) w = WorkerPool (w:l)
addWorkerPool UnallocatedWorkerPool w = WorkerPool [w]
idleWorkers :: WorkerPool t -> [t]
idleWorkers UnallocatedWorkerPool = []
idleWorkers (WorkerPool l) = lefts l

View file

@ -1001,6 +1001,7 @@ Executable git-annex
Types.UUID
Types.UrlContents
Types.View
Types.WorkerPool
Upgrade
Upgrade.V0
Upgrade.V1