remove accidental duplicated code

The code in Annex.WorkerStage and Annex.Concurrent was 100% identical.
This commit is contained in:
Joey Hess 2021-02-03 15:21:43 -04:00
parent 135757d64a
commit 7db4e62a90
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
2 changed files with 1 additions and 114 deletions

View file

@ -16,14 +16,11 @@ import Annex.Concurrent.Utility
import qualified Annex.Queue import qualified Annex.Queue
import Annex.Action import Annex.Action
import Types.Concurrency import Types.Concurrency
import Types.WorkerPool
import Types.CatFileHandles import Types.CatFileHandles
import Annex.CheckAttr import Annex.CheckAttr
import Annex.CheckIgnore import Annex.CheckIgnore
import Remote.List import Remote.List
import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.Map as M import qualified Data.Map as M
setConcurrency :: ConcurrencySetting -> Annex () setConcurrency :: ConcurrencySetting -> Annex ()
@ -104,114 +101,3 @@ mergeState st = do
uncurry addCleanupAction uncurry addCleanupAction
Annex.Queue.mergeFrom st' Annex.Queue.mergeFrom st'
changeState $ \s -> s { errcounter = errcounter s + errcounter st' } changeState $ \s -> s { errcounter = errcounter s + errcounter st' }
{- Runs an action and makes the current thread have the specified stage
- while doing so. If too many other threads are running in the specified
- stage, waits for one of them to become idle.
-
- Noop if the current thread already has the requested stage, or if the
- current thread is not in the worker pool, or if concurrency is not
- enabled.
-
- Also a noop if the stage is not one of the stages that the worker pool
- uses.
-}
enteringStage :: WorkerStage -> Annex a -> Annex a
enteringStage newstage a = Annex.getState Annex.workers >>= \case
Nothing -> a
Just tv -> do
mytid <- liftIO myThreadId
let set = changeStageTo mytid tv (const newstage)
let restore = maybe noop (void . changeStageTo mytid tv . const)
bracket set restore (const a)
{- Transition the current thread to the initial stage.
- This is done once the thread is ready to begin work.
-}
enteringInitialStage :: Annex ()
enteringInitialStage = Annex.getState Annex.workers >>= \case
Nothing -> noop
Just tv -> do
mytid <- liftIO myThreadId
void $ changeStageTo mytid tv initialStage
{- This needs to leave the WorkerPool with the same number of
- idle and active threads, and with the same number of threads for each
- WorkerStage. So, all it can do is swap the WorkerStage of our thread's
- ActiveWorker with an IdleWorker.
-
- Must avoid a deadlock if all worker threads end up here at the same
- time, or if there are no suitable IdleWorkers left. So if necessary
- we first replace our ActiveWorker with an IdleWorker in the pool, to allow
- some other thread to use it, before waiting for a suitable IdleWorker
- for us to use.
-
- Note that the spareVals in the WorkerPool does not get anything added to
- it when adding the IdleWorker, so there will for a while be more IdleWorkers
- in the pool than spareVals. That does not prevent other threads that call
- this from using them though, so it's fine.
-}
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage)
changeStageTo mytid tv getnewstage = liftIO $
replaceidle >>= maybe
(return Nothing)
(either waitidle (return . Just))
where
replaceidle = atomically $ do
pool <- takeTMVar tv
let newstage = getnewstage (usedStages pool)
let notchanging = do
putTMVar tv pool
return Nothing
if memberStage newstage (usedStages pool)
then case removeThreadIdWorkerPool mytid pool of
Just ((myaid, oldstage), pool')
| oldstage /= newstage -> case getIdleWorkerSlot newstage pool' of
Nothing -> do
putTMVar tv $
addWorkerPool (IdleWorker oldstage) pool'
return $ Just $ Left (myaid, newstage, oldstage)
Just pool'' -> do
-- optimisation
putTMVar tv $
addWorkerPool (IdleWorker oldstage) $
addWorkerPool (ActiveWorker myaid newstage) pool''
return $ Just $ Right oldstage
| otherwise -> notchanging
_ -> notchanging
else notchanging
waitidle (myaid, newstage, oldstage) = atomically $ do
pool <- waitIdleWorkerSlot newstage =<< takeTMVar tv
putTMVar tv $ addWorkerPool (ActiveWorker myaid newstage) pool
return (Just oldstage)
-- | Waits until there's an idle StartStage worker in the worker pool,
-- removes it from the pool, and returns its state.
--
-- If the worker pool is not already allocated, returns Nothing.
waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
waitStartWorkerSlot tv = do
pool <- takeTMVar tv
st <- go pool
return $ Just (st, StartStage)
where
go pool = case spareVals pool of
[] -> retry
(v:vs) -> do
let pool' = pool { spareVals = vs }
putTMVar tv =<< waitIdleWorkerSlot StartStage pool'
return v
waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState)
waitIdleWorkerSlot wantstage = maybe retry return . getIdleWorkerSlot wantstage
getIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> Maybe (WorkerPool Annex.AnnexState)
getIdleWorkerSlot wantstage pool = do
l <- findidle [] (workerList pool)
return $ pool { workerList = l }
where
findidle _ [] = Nothing
findidle c ((IdleWorker stage):rest)
| stage == wantstage = Just (c ++ rest)
findidle c (w:rest) = findidle (w:c) rest

View file

@ -12,6 +12,7 @@ module CmdLine.Action where
import Annex.Common import Annex.Common
import qualified Annex import qualified Annex
import Annex.Concurrent import Annex.Concurrent
import Annex.WorkerPool
import Types.Command import Types.Command
import Types.Concurrency import Types.Concurrency
import Messages.Concurrent import Messages.Concurrent