avoid STM deadlock
When all worker threads are running and enteringStage is called,
it waits for an idle slot. If all off the other threads then call it in
turn, a deadlock occurrs.
This is the same problem I didn't actually fix in
Fixed by doing two separate STM transactions, the first replaces its
active thread with an idle thread, and the second waits for another idle
thread. That guarantees there will eventually be an idle thread to find.
The changes to WorkerPool were necessary because it can't add an idle
thread containing the Annex state and go on to run an action using that
same state, so I had to remove the Annex state from IdleWorker.
@ -84,11 +84,6 @@ stopCoProcesses = do
- Also a noop if the stage is not one of the stages that the worker pool
- uses.
- The pool needs to continue to contain the same number of worker threads
- for each stage. So, an idle worker with the desired stage is found in
- the pool (waiting if necessary for one to become idle), and the stages
- of it and the current thread are swapped.
enteringStage :: WorkerStage -> Annex a -> Annex a
enteringStage newstage a = Annex.getState Annex.workers >>= \case
@ -99,26 +94,49 @@ enteringStage newstage a = Annex.getState Annex.workers >>= \case
let restore = maybe noop (void . changeStageTo mytid tv)
bracket set restore (const a)
{- This needs to leave the WorkerPool with the same number of
- idle and active threads, and with the same number of threads for each
- WorkerStage. So, all it can do is swap the WorkerStage of our thread's
- ActiveWorker with an IdleWorker.
- Must avoid a deadlock if all worker threads end up here at the same
- time, or if there are no suitable IdleWorkers left. So we first
- replace our ActiveWorker with an IdleWorker in the pool, to allow
- some other thread to use it, before waiting for a suitable IdleWorker
- for us to use.
- Note that the spareVals in the WorkerPool does not get anything added to
- it when adding the IdleWorker, so there will for a while be more IdleWorkers
- in the pool than spareVals. That does not prevent other threads that call
- this from using them though, so it's fine.
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> WorkerStage -> Annex (Maybe WorkerStage)
changeStageTo mytid tv newstage = liftIO $ atomically $ do
pool <- takeTMVar tv
case pool of
WorkerPool usedstages _
| memberStage newstage usedstages ->
case removeThreadIdWorkerPool mytid pool of
Just ((myaid, oldstage), WorkerPool usedstages' l)
| oldstage /= newstage -> do
(idlest, restpool) <- waitWorkerSlot usedstages' newstage l
let pool' = addWorkerPool (IdleWorker idlest oldstage) $
addWorkerPool (ActiveWorker myaid newstage) restpool
putTMVar tv pool'
return (Just oldstage)
_ -> do
changeStageTo mytid tv newstage = liftIO $
replaceidle >>= maybe (return Nothing) waitidle
replaceidle = atomically $ do
pool <- takeTMVar tv
if memberStage newstage (usedStages pool)
then case removeThreadIdWorkerPool mytid pool of
Just ((myaid, oldstage), pool')
| oldstage /= newstage -> do
putTMVar tv $
addWorkerPool (IdleWorker oldstage) pool'
return $ Just (myaid, oldstage)
| otherwise -> do
putTMVar tv pool
return Nothing
_ -> do
putTMVar tv pool
return Nothing
_ -> do
putTMVar tv pool
return Nothing
else do
putTMVar tv pool
return Nothing
waitidle (myaid, oldstage) = atomically $ do
pool <- waitIdleWorkerSlot newstage =<< takeTMVar tv
putTMVar tv $ addWorkerPool (ActiveWorker myaid newstage) pool
return (Just oldstage)
-- | Waits until there's an idle worker in the worker pool
-- for its initial stage, removes it from the pool, and returns its state.
@ -126,18 +144,24 @@ changeStageTo mytid tv newstage = liftIO $ atomically $ do
-- If the worker pool is not already allocated, returns Nothing.
waitInitialWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
waitInitialWorkerSlot tv = do
WorkerPool usedstages l <- takeTMVar tv
let stage = initialStage usedstages
(st, pool') <- waitWorkerSlot usedstages stage l
putTMVar tv pool'
pool <- takeTMVar tv
let stage = initialStage (usedStages pool)
st <- go stage pool
return $ Just (st, stage)
go wantstage pool = case spareVals pool of
[] -> retry
(v:vs) -> do
let pool' = pool { spareVals = vs }
putTMVar tv =<< waitIdleWorkerSlot wantstage pool'
return v
-- | Waits until there's an idle worker for the specified stage, and returns
-- its state and a WorkerPool containing all the other workers.
waitWorkerSlot :: UsedStages -> WorkerStage -> [Worker Annex.AnnexState] -> STM (Annex.AnnexState, WorkerPool Annex.AnnexState)
waitWorkerSlot usedstages wantstage = findidle []
waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState)
waitIdleWorkerSlot wantstage pool = do
l <- findidle [] (workerList pool)
return $ pool { workerList = l }
findidle _ [] = retry
findidle c ((IdleWorker st stage):rest)
| stage == wantstage = return (st, WorkerPool usedstages (c ++ rest))
findidle c ((IdleWorker stage):rest) | stage == wantstage =
return (c ++ rest)
findidle c (w:rest) = findidle (w:c) rest
@ -129,8 +129,8 @@ finishCommandActions = Annex.getState Annex.workers >>= \case
Nothing -> noop
Just tv -> do
Annex.changeState $ \s -> s { Annex.workers = Nothing }
WorkerPool _ l <- liftIO $ atomically $ takeTMVar tv
forM_ (mapMaybe workerAsync l) $ \aid ->
pool <- liftIO $ atomically $ takeTMVar tv
forM_ (mapMaybe workerAsync $ workerList pool) $ \aid ->
liftIO (waitCatch aid) >>= \case
Left _ -> noop
Right st -> mergeState st
@ -12,17 +12,23 @@ import Control.Concurrent.Async
import qualified Data.Set as S
-- | Pool of worker threads.
data WorkerPool t = WorkerPool UsedStages [Worker t]
data WorkerPool t = WorkerPool
{ usedStages :: UsedStages
, workerList :: [Worker t]
, spareVals :: [t]
-- ^ Normally there is one value for each IdleWorker,
-- but there can temporarily be fewer.
deriving (Show)
-- | A worker can either be idle or running an Async action.
-- And it is used for some stage.
data Worker t
= IdleWorker t WorkerStage
= IdleWorker WorkerStage
| ActiveWorker (Async t) WorkerStage
instance Show (Worker t) where
show (IdleWorker _ s) = "IdleWorker " ++ show s
show (IdleWorker s) = "IdleWorker " ++ show s
show (ActiveWorker _ s) = "ActiveWorker " ++ show s
data WorkerStage
@ -46,12 +52,12 @@ data WorkerStage
-- stage, and so there will be no blocking before starting them.
data UsedStages = UsedStages
{ initialStage :: WorkerStage
, usedStages :: S.Set WorkerStage
, stageSet :: S.Set WorkerStage
deriving (Show)
memberStage :: WorkerStage -> UsedStages -> Bool
memberStage s u = S.member s (usedStages u)
memberStage s u = S.member s (stageSet u)
-- | The default is to use only the CommandPerform and CommandCleanup
-- stages. Since cleanup actions often don't contend much with
@ -60,7 +66,7 @@ memberStage s u = S.member s (usedStages u)
commandStages :: UsedStages
commandStages = UsedStages
{ initialStage = PerformStage
, usedStages = S.fromList [PerformStage, CleanupStage]
, stageSet = S.fromList [PerformStage, CleanupStage]
-- | When a command is transferring content, it can use this instead.
@ -70,15 +76,15 @@ commandStages = UsedStages
transferStages :: UsedStages
transferStages = UsedStages
{ initialStage = TransferStage
, usedStages = S.fromList [TransferStage, VerifyStage]
, stageSet = S.fromList [TransferStage, VerifyStage]
workerStage :: Worker t -> WorkerStage
workerStage (IdleWorker _ s) = s
workerStage (IdleWorker s) = s
workerStage (ActiveWorker _ s) = s
workerAsync :: Worker t -> Maybe (Async t)
workerAsync (IdleWorker _ _) = Nothing
workerAsync (IdleWorker _) = Nothing
workerAsync (ActiveWorker aid _) = Just aid
-- | Allocates a WorkerPool that has the specified number of workers
@ -86,39 +92,41 @@ workerAsync (ActiveWorker aid _) = Just aid
-- The stages are distributed evenly throughout.
allocateWorkerPool :: t -> Int -> UsedStages -> WorkerPool t
allocateWorkerPool t n u = WorkerPool u $ take (n+n) $
map (uncurry IdleWorker) $ zip (repeat t) stages
allocateWorkerPool t n u = WorkerPool
{ usedStages = u
, workerList = take totalthreads $ map IdleWorker stages
, spareVals = replicate totalthreads t
stages = concat $ repeat $ S.toList $ usedStages u
stages = concat $ repeat $ S.toList $ stageSet u
totalthreads = n * S.size (stageSet u)
addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t
addWorkerPool w (WorkerPool u l) = WorkerPool u (w:l)
idleWorkers :: WorkerPool t -> [t]
idleWorkers (WorkerPool _ l) = go l
go [] = []
go (IdleWorker t _ : rest) = t : go rest
go (ActiveWorker _ _ : rest) = go rest
addWorkerPool w pool = pool { workerList = w : workerList pool }
-- | Removes a worker from the pool whose Async uses the ThreadId.
-- Each Async has its own ThreadId, so this stops once it finds
-- a match.
removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
removeThreadIdWorkerPool tid (WorkerPool u l) = go [] l
removeThreadIdWorkerPool tid pool = go [] (workerList pool)
go _ [] = Nothing
go c (ActiveWorker a stage : rest)
| asyncThreadId a == tid = Just ((a, stage), WorkerPool u (c++rest))
| asyncThreadId a == tid =
let pool' = pool { workerList = (c++rest) }
in Just ((a, stage), pool')
go c (v : rest) = go (v:c) rest
deactivateWorker :: WorkerPool t -> Async t -> t -> WorkerPool t
deactivateWorker (WorkerPool u l) aid t = WorkerPool u $ go l
deactivateWorker pool aid t = pool
{ workerList = go (workerList pool)
, spareVals = t : spareVals pool
go [] = []
go (w@(IdleWorker _ _) : rest) = w : go rest
go (w@(IdleWorker _) : rest) = w : go rest
go (w@(ActiveWorker a st) : rest)
| a == aid = IdleWorker t st : rest
| a == aid = IdleWorker st : rest
| otherwise = w : go rest
