Fix a crash (STM deadlock) when -J is used with multiple files that point to the same key
See the comment for a trace of the deadlock. Added a new StartStage. New worker threads begin in the StartStage. Once a thread is ready to do work, it moves away from the StartStage, and no thread will ever transition back to it. A thread that blocks waiting on another thread that is processing the same key will block while in the StartStage. That other thread will never switch back to the StartStage, and so the deadlock is avoided.
This commit is contained in:
parent
20d9a9b662
commit
667d38a8f1
6 changed files with 114 additions and 24 deletions
|
@ -90,10 +90,20 @@ enteringStage newstage a = Annex.getState Annex.workers >>= \case
|
|||
Nothing -> a
|
||||
Just tv -> do
|
||||
mytid <- liftIO myThreadId
|
||||
let set = changeStageTo mytid tv newstage
|
||||
let restore = maybe noop (void . changeStageTo mytid tv)
|
||||
let set = changeStageTo mytid tv (const newstage)
|
||||
let restore = maybe noop (void . changeStageTo mytid tv . const)
|
||||
bracket set restore (const a)
|
||||
|
||||
{- Transition the current thread to the initial stage.
|
||||
- This is done once the thread is ready to begin work.
|
||||
-}
|
||||
enteringInitialStage :: Annex ()
|
||||
enteringInitialStage = Annex.getState Annex.workers >>= \case
|
||||
Nothing -> noop
|
||||
Just tv -> do
|
||||
mytid <- liftIO myThreadId
|
||||
void $ changeStageTo mytid tv initialStage
|
||||
|
||||
{- This needs to leave the WorkerPool with the same number of
|
||||
- idle and active threads, and with the same number of threads for each
|
||||
- WorkerStage. So, all it can do is swap the WorkerStage of our thread's
|
||||
|
@ -110,14 +120,15 @@ enteringStage newstage a = Annex.getState Annex.workers >>= \case
|
|||
- in the pool than spareVals. That does not prevent other threads that call
|
||||
- this from using them though, so it's fine.
|
||||
-}
|
||||
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> WorkerStage -> Annex (Maybe WorkerStage)
|
||||
changeStageTo mytid tv newstage = liftIO $
|
||||
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage)
|
||||
changeStageTo mytid tv getnewstage = liftIO $
|
||||
replaceidle >>= maybe
|
||||
(return Nothing)
|
||||
(either waitidle (return . Just))
|
||||
where
|
||||
replaceidle = atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let newstage = getnewstage (usedStages pool)
|
||||
let notchanging = do
|
||||
putTMVar tv pool
|
||||
return Nothing
|
||||
|
@ -128,7 +139,7 @@ changeStageTo mytid tv newstage = liftIO $
|
|||
Nothing -> do
|
||||
putTMVar tv $
|
||||
addWorkerPool (IdleWorker oldstage) pool'
|
||||
return $ Just $ Left (myaid, oldstage)
|
||||
return $ Just $ Left (myaid, newstage, oldstage)
|
||||
Just pool'' -> do
|
||||
-- optimisation
|
||||
putTMVar tv $
|
||||
|
@ -139,27 +150,26 @@ changeStageTo mytid tv newstage = liftIO $
|
|||
_ -> notchanging
|
||||
else notchanging
|
||||
|
||||
waitidle (myaid, oldstage) = atomically $ do
|
||||
waitidle (myaid, newstage, oldstage) = atomically $ do
|
||||
pool <- waitIdleWorkerSlot newstage =<< takeTMVar tv
|
||||
putTMVar tv $ addWorkerPool (ActiveWorker myaid newstage) pool
|
||||
return (Just oldstage)
|
||||
|
||||
-- | Waits until there's an idle worker in the worker pool
|
||||
-- for its initial stage, removes it from the pool, and returns its state.
|
||||
-- | Waits until there's an idle StartStage worker in the worker pool,
|
||||
-- removes it from the pool, and returns its state.
|
||||
--
|
||||
-- If the worker pool is not already allocated, returns Nothing.
|
||||
waitInitialWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
|
||||
waitInitialWorkerSlot tv = do
|
||||
waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
|
||||
waitStartWorkerSlot tv = do
|
||||
pool <- takeTMVar tv
|
||||
let stage = initialStage (usedStages pool)
|
||||
st <- go stage pool
|
||||
return $ Just (st, stage)
|
||||
st <- go pool
|
||||
return $ Just (st, StartStage)
|
||||
where
|
||||
go wantstage pool = case spareVals pool of
|
||||
go pool = case spareVals pool of
|
||||
[] -> retry
|
||||
(v:vs) -> do
|
||||
let pool' = pool { spareVals = vs }
|
||||
putTMVar tv =<< waitIdleWorkerSlot wantstage pool'
|
||||
putTMVar tv =<< waitIdleWorkerSlot StartStage pool'
|
||||
return v
|
||||
|
||||
waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue