avoid STM deadlock onredundant call to changeStageTo
I couldn't find a way to avoid the deadlock w/o rewriting it to clearly not have one. I'm not quite sure what was the actual cause of the deadlock. This makes me unsure how I now know it clearly doesn't have a deadlock. But, it was easy to reproduce before (just call it twice in a row) and doesn't happen now.
This commit is contained in:
parent
ecbd456312
commit
5a9842d7ed
1 changed files with 30 additions and 25 deletions
|
@ -61,14 +61,14 @@ commandAction start = Annex.getState Annex.concurrency >>= \case
|
|||
where
|
||||
runconcurrent n = do
|
||||
tv <- Annex.getState Annex.workers
|
||||
workerst <- waitWorkerSlot n (== PerformStage) tv
|
||||
workerst <- waitWorkerSlot n PerformStage tv
|
||||
aid <- liftIO $ async $ snd <$> Annex.run workerst
|
||||
(concurrentjob workerst)
|
||||
liftIO $ atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let !pool' = addWorkerPool (ActiveWorker aid PerformStage) pool
|
||||
putTMVar tv pool'
|
||||
void $ liftIO $ forkIO $ do
|
||||
void $ liftIO $ forkIO $ debugLocks $ do
|
||||
-- accountCommandAction will usually catch
|
||||
-- exceptions. Just in case, fall back to the
|
||||
-- original workerst.
|
||||
|
@ -122,8 +122,8 @@ commandAction start = Annex.getState Annex.concurrency >>= \case
|
|||
-- pool, and return its state.
|
||||
--
|
||||
-- If the pool is unallocated, it will be allocated to the specified size.
|
||||
waitWorkerSlot :: Int -> (WorkerStage -> Bool) -> TMVar (WorkerPool Annex.AnnexState) -> Annex (Annex.AnnexState)
|
||||
waitWorkerSlot n wantstage tv =
|
||||
waitWorkerSlot :: Int -> WorkerStage -> TMVar (WorkerPool Annex.AnnexState) -> Annex Annex.AnnexState
|
||||
waitWorkerSlot n wantstage tv = debugLocks $
|
||||
join $ liftIO $ atomically $ waitWorkerSlot' wantstage tv >>= \case
|
||||
Nothing -> return $ do
|
||||
-- Generate the remote list now, to avoid
|
||||
|
@ -142,26 +142,30 @@ waitWorkerSlot n wantstage tv =
|
|||
where
|
||||
findidle st _ [] = (st, WorkerPool [])
|
||||
findidle _ c ((IdleWorker st stage):rest)
|
||||
| wantstage stage = (st, WorkerPool (c ++ rest))
|
||||
| stage == wantstage = (st, WorkerPool (c ++ rest))
|
||||
findidle st c (w:rest) = findidle st (w:c) rest
|
||||
|
||||
-- | STM action that waits until there's an idle worker in the worker pool.
|
||||
-- | STM action that waits until there's an idle worker in the worker pool,
|
||||
-- removes it from the pool, and returns its state.
|
||||
--
|
||||
-- If the worker pool is not already allocated, returns Nothing.
|
||||
waitWorkerSlot' :: (WorkerStage -> Bool) -> TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState))
|
||||
waitWorkerSlot' :: WorkerStage -> TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState))
|
||||
waitWorkerSlot' wantstage tv =
|
||||
takeTMVar tv >>= \case
|
||||
UnallocatedWorkerPool -> do
|
||||
putTMVar tv UnallocatedWorkerPool
|
||||
return Nothing
|
||||
WorkerPool l -> do
|
||||
(st, pool') <- findidle [] l
|
||||
(st, pool') <- waitWorkerSlot'' wantstage l
|
||||
putTMVar tv pool'
|
||||
return $ Just st
|
||||
where
|
||||
|
||||
waitWorkerSlot'' :: WorkerStage -> [Worker Annex.AnnexState] -> STM (Annex.AnnexState, WorkerPool Annex.AnnexState)
|
||||
waitWorkerSlot'' wantstage = findidle []
|
||||
where
|
||||
findidle _ [] = retry
|
||||
findidle c ((IdleWorker st stage):rest)
|
||||
| wantstage stage = return (st, WorkerPool (c ++ rest))
|
||||
| stage == wantstage = return (st, WorkerPool (c ++ rest))
|
||||
findidle c (w:rest) = findidle (w:c) rest
|
||||
|
||||
{- Waits for all worker threads to finish and merges their AnnexStates
|
||||
|
@ -185,24 +189,25 @@ finishCommandActions = do
|
|||
- for each stage. So, an idle worker with the desired stage is found in
|
||||
- the pool (waiting if necessary for one to become idle), and the stages
|
||||
- of it and the current thread are swapped.
|
||||
-
|
||||
- Noop if the current thread already has the requested stage, or if the
|
||||
- current thread is not in the worker pool, or if concurrency is not
|
||||
- enabled.
|
||||
-}
|
||||
changeStageTo :: WorkerStage -> Annex ()
|
||||
changeStageTo newstage = do
|
||||
changeStageTo newstage = debugLocks $ do
|
||||
mytid <- liftIO myThreadId
|
||||
tv <- Annex.getState Annex.workers
|
||||
liftIO $ atomically $ waitWorkerSlot' (== newstage) tv >>= \case
|
||||
Just idlest -> do
|
||||
pool <- takeTMVar tv
|
||||
let restorepool = addWorkerPool (IdleWorker idlest newstage) pool
|
||||
let pool' = case removeThreadIdWorkerPool mytid pool of
|
||||
Just ((myaid, oldstage), p) ->
|
||||
addWorkerPool (IdleWorker idlest oldstage) $
|
||||
addWorkerPool (ActiveWorker myaid newstage) p
|
||||
Nothing -> restorepool
|
||||
putTMVar tv pool'
|
||||
-- No worker pool is allocated, not running in concurrent
|
||||
-- mode.
|
||||
Nothing -> noop
|
||||
liftIO $ atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
case removeThreadIdWorkerPool mytid pool of
|
||||
Just ((myaid, oldstage), WorkerPool l)
|
||||
| oldstage /= newstage -> do
|
||||
(idlest, restpool) <- waitWorkerSlot'' newstage l
|
||||
let pool' = addWorkerPool (IdleWorker idlest oldstage) $
|
||||
addWorkerPool (ActiveWorker myaid newstage) restpool
|
||||
putTMVar tv pool'
|
||||
_ -> putTMVar tv pool
|
||||
|
||||
{- Like commandAction, but without the concurrency. -}
|
||||
includeCommandAction :: CommandStart -> CommandCleanup
|
||||
|
@ -306,7 +311,7 @@ allowConcurrentOutput a = do
|
|||
-
|
||||
- May be called repeatedly by the same thread without blocking. -}
|
||||
ensureOnlyActionOn :: Key -> Annex a -> Annex a
|
||||
ensureOnlyActionOn k a =
|
||||
ensureOnlyActionOn k a = debugLocks $
|
||||
go =<< Annex.getState Annex.concurrency
|
||||
where
|
||||
go NonConcurrent = a
|
||||
|
|
Loading…
Reference in a new issue