diff --git a/Annex.hs b/Annex.hs index d80500cf5b..d642885bc8 100644 --- a/Annex.hs +++ b/Annex.hs @@ -142,7 +142,7 @@ data AnnexState = AnnexState , tempurls :: M.Map Key URLString , existinghooks :: M.Map Git.Hook.Hook Bool , desktopnotify :: DesktopNotify - , workers :: TMVar (WorkerPool AnnexState) + , workers :: Maybe (TMVar (WorkerPool AnnexState)) , activekeys :: TVar (M.Map Key ThreadId) , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) , keysdbhandle :: Maybe Keys.DbHandle @@ -155,7 +155,6 @@ newState :: GitConfig -> Git.Repo -> IO AnnexState newState c r = do emptyactiveremotes <- newMVar M.empty emptyactivekeys <- newTVarIO M.empty - emptyworkerpool <- newTMVarIO UnallocatedWorkerPool o <- newMessageState sc <- newTMVarIO False return $ AnnexState @@ -200,7 +199,7 @@ newState c r = do , tempurls = M.empty , existinghooks = M.empty , desktopnotify = mempty - , workers = emptyworkerpool + , workers = Nothing , activekeys = emptyactivekeys , activeremotes = emptyactiveremotes , keysdbhandle = Nothing diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs index 1423e28ffc..4908b9949c 100644 --- a/Annex/Concurrent.hs +++ b/Annex/Concurrent.hs @@ -91,12 +91,13 @@ stopCoProcesses = do - of it and the current thread are swapped. -} enteringStage :: WorkerStage -> Annex a -> Annex a -enteringStage newstage a = do - mytid <- liftIO myThreadId - tv <- Annex.getState Annex.workers - let set = changeStageTo mytid tv newstage - let restore = maybe noop (void . changeStageTo mytid tv) - bracket set restore (const a) +enteringStage newstage a = Annex.getState Annex.workers >>= \case + Nothing -> a + Just tv -> do + mytid <- liftIO myThreadId + let set = changeStageTo mytid tv newstage + let restore = maybe noop (void . changeStageTo mytid tv) + bracket set restore (const a) changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> WorkerStage -> Annex (Maybe WorkerStage) changeStageTo mytid tv newstage = liftIO $ atomically $ do @@ -124,16 +125,12 @@ changeStageTo mytid tv newstage = liftIO $ atomically $ do -- -- If the worker pool is not already allocated, returns Nothing. waitInitialWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage)) -waitInitialWorkerSlot tv = - takeTMVar tv >>= \case - UnallocatedWorkerPool -> do - putTMVar tv UnallocatedWorkerPool - return Nothing - WorkerPool usedstages l -> do - let stage = initialStage usedstages - (st, pool') <- waitWorkerSlot usedstages stage l - putTMVar tv pool' - return $ Just (st, stage) +waitInitialWorkerSlot tv = do + WorkerPool usedstages l <- takeTMVar tv + let stage = initialStage usedstages + (st, pool') <- waitWorkerSlot usedstages stage l + putTMVar tv pool' + return $ Just (st, stage) -- | Waits until there's an idle worker for the specified stage, and returns -- its state and a WorkerPool containing all the other workers. diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index a2961f7d09..20ab5b6e56 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -60,10 +60,11 @@ commandAction start = Annex.getState Annex.concurrency >>= \case ConcurrentPerCpu -> runconcurrent where runnonconcurrent = void $ includeCommandAction start - runconcurrent = do - tv <- Annex.getState Annex.workers - liftIO (atomically (waitInitialWorkerSlot tv)) >>= - maybe runnonconcurrent (runconcurrent' tv) + runconcurrent = Annex.getState Annex.workers >>= \case + Nothing -> runnonconcurrent + Just tv -> + liftIO (atomically (waitInitialWorkerSlot tv)) >>= + maybe runnonconcurrent (runconcurrent' tv) runconcurrent' tv (workerst, workerstage) = do aid <- liftIO $ async $ snd <$> Annex.run workerst (concurrentjob workerst) @@ -124,13 +125,12 @@ commandAction start = Annex.getState Annex.concurrency >>= \case - back into the current Annex's state. -} finishCommandActions :: Annex () -finishCommandActions = do - tv <- Annex.getState Annex.workers - pool <- liftIO $ atomically $ - swapTMVar tv UnallocatedWorkerPool - case pool of - UnallocatedWorkerPool -> noop - WorkerPool _ l -> forM_ (mapMaybe workerAsync l) $ \aid -> +finishCommandActions = Annex.getState Annex.workers >>= \case + Nothing -> noop + Just tv -> do + Annex.changeState $ \s -> s { Annex.workers = Nothing } + WorkerPool _ l <- liftIO $ atomically $ takeTMVar tv + forM_ (mapMaybe workerAsync l) $ \aid -> liftIO (waitCatch aid) >>= \case Left _ -> noop Right st -> mergeState st @@ -248,9 +248,9 @@ startConcurrency usedstages a = do -- setConfig. _ <- remoteList st <- dupState - tv <- Annex.getState Annex.workers - liftIO $ atomically $ putTMVar tv $ + tv <- liftIO $ newTMVarIO $ allocateWorkerPool st (max n 1) usedstages + Annex.changeState $ \s -> s { Annex.workers = Just tv } {- Ensures that only one thread processes a key at a time. - Other threads will block until it's done. diff --git a/Types/WorkerPool.hs b/Types/WorkerPool.hs index a0695cf999..f993d3fdf0 100644 --- a/Types/WorkerPool.hs +++ b/Types/WorkerPool.hs @@ -12,9 +12,7 @@ import Control.Concurrent.Async import qualified Data.Set as S -- | Pool of worker threads. -data WorkerPool t - = UnallocatedWorkerPool - | WorkerPool UsedStages [Worker t] +data WorkerPool t = WorkerPool UsedStages [Worker t] deriving (Show) -- | A worker can either be idle or running an Async action. @@ -95,10 +93,8 @@ allocateWorkerPool t n u = WorkerPool u $ take (n+n) $ addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t addWorkerPool w (WorkerPool u l) = WorkerPool u (w:l) -addWorkerPool _ UnallocatedWorkerPool = UnallocatedWorkerPool idleWorkers :: WorkerPool t -> [t] -idleWorkers UnallocatedWorkerPool = [] idleWorkers (WorkerPool _ l) = go l where go [] = [] @@ -110,7 +106,6 @@ idleWorkers (WorkerPool _ l) = go l -- Each Async has its own ThreadId, so this stops once it finds -- a match. removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t) -removeThreadIdWorkerPool _ UnallocatedWorkerPool = Nothing removeThreadIdWorkerPool tid (WorkerPool u l) = go [] l where go _ [] = Nothing @@ -119,7 +114,6 @@ removeThreadIdWorkerPool tid (WorkerPool u l) = go [] l go c (v : rest) = go (v:c) rest deactivateWorker :: WorkerPool t -> Async t -> t -> WorkerPool t -deactivateWorker UnallocatedWorkerPool _ _ = UnallocatedWorkerPool deactivateWorker (WorkerPool u l) aid t = WorkerPool u $ go l where go [] = []