From 5a9842d7ed2d57ee8c5eb8f5da24da6bfdbd37fa Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 17 Jun 2019 14:51:30 -0400 Subject: [PATCH] avoid STM deadlock onredundant call to changeStageTo I couldn't find a way to avoid the deadlock w/o rewriting it to clearly not have one. I'm not quite sure what was the actual cause of the deadlock. This makes me unsure how I now know it clearly doesn't have a deadlock. But, it was easy to reproduce before (just call it twice in a row) and doesn't happen now. --- CmdLine/Action.hs | 55 ++++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index be18af8f90..48feca1f91 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -61,14 +61,14 @@ commandAction start = Annex.getState Annex.concurrency >>= \case where runconcurrent n = do tv <- Annex.getState Annex.workers - workerst <- waitWorkerSlot n (== PerformStage) tv + workerst <- waitWorkerSlot n PerformStage tv aid <- liftIO $ async $ snd <$> Annex.run workerst (concurrentjob workerst) liftIO $ atomically $ do pool <- takeTMVar tv let !pool' = addWorkerPool (ActiveWorker aid PerformStage) pool putTMVar tv pool' - void $ liftIO $ forkIO $ do + void $ liftIO $ forkIO $ debugLocks $ do -- accountCommandAction will usually catch -- exceptions. Just in case, fall back to the -- original workerst. @@ -122,8 +122,8 @@ commandAction start = Annex.getState Annex.concurrency >>= \case -- pool, and return its state. -- -- If the pool is unallocated, it will be allocated to the specified size. -waitWorkerSlot :: Int -> (WorkerStage -> Bool) -> TMVar (WorkerPool Annex.AnnexState) -> Annex (Annex.AnnexState) -waitWorkerSlot n wantstage tv = +waitWorkerSlot :: Int -> WorkerStage -> TMVar (WorkerPool Annex.AnnexState) -> Annex Annex.AnnexState +waitWorkerSlot n wantstage tv = debugLocks $ join $ liftIO $ atomically $ waitWorkerSlot' wantstage tv >>= \case Nothing -> return $ do -- Generate the remote list now, to avoid @@ -142,26 +142,30 @@ waitWorkerSlot n wantstage tv = where findidle st _ [] = (st, WorkerPool []) findidle _ c ((IdleWorker st stage):rest) - | wantstage stage = (st, WorkerPool (c ++ rest)) + | stage == wantstage = (st, WorkerPool (c ++ rest)) findidle st c (w:rest) = findidle st (w:c) rest --- | STM action that waits until there's an idle worker in the worker pool. +-- | STM action that waits until there's an idle worker in the worker pool, +-- removes it from the pool, and returns its state. -- -- If the worker pool is not already allocated, returns Nothing. -waitWorkerSlot' :: (WorkerStage -> Bool) -> TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState)) +waitWorkerSlot' :: WorkerStage -> TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState)) waitWorkerSlot' wantstage tv = takeTMVar tv >>= \case UnallocatedWorkerPool -> do putTMVar tv UnallocatedWorkerPool return Nothing WorkerPool l -> do - (st, pool') <- findidle [] l + (st, pool') <- waitWorkerSlot'' wantstage l putTMVar tv pool' return $ Just st - where + +waitWorkerSlot'' :: WorkerStage -> [Worker Annex.AnnexState] -> STM (Annex.AnnexState, WorkerPool Annex.AnnexState) +waitWorkerSlot'' wantstage = findidle [] + where findidle _ [] = retry findidle c ((IdleWorker st stage):rest) - | wantstage stage = return (st, WorkerPool (c ++ rest)) + | stage == wantstage = return (st, WorkerPool (c ++ rest)) findidle c (w:rest) = findidle (w:c) rest {- Waits for all worker threads to finish and merges their AnnexStates @@ -185,24 +189,25 @@ finishCommandActions = do - for each stage. So, an idle worker with the desired stage is found in - the pool (waiting if necessary for one to become idle), and the stages - of it and the current thread are swapped. + - + - Noop if the current thread already has the requested stage, or if the + - current thread is not in the worker pool, or if concurrency is not + - enabled. -} changeStageTo :: WorkerStage -> Annex () -changeStageTo newstage = do +changeStageTo newstage = debugLocks $ do mytid <- liftIO myThreadId tv <- Annex.getState Annex.workers - liftIO $ atomically $ waitWorkerSlot' (== newstage) tv >>= \case - Just idlest -> do - pool <- takeTMVar tv - let restorepool = addWorkerPool (IdleWorker idlest newstage) pool - let pool' = case removeThreadIdWorkerPool mytid pool of - Just ((myaid, oldstage), p) -> - addWorkerPool (IdleWorker idlest oldstage) $ - addWorkerPool (ActiveWorker myaid newstage) p - Nothing -> restorepool - putTMVar tv pool' - -- No worker pool is allocated, not running in concurrent - -- mode. - Nothing -> noop + liftIO $ atomically $ do + pool <- takeTMVar tv + case removeThreadIdWorkerPool mytid pool of + Just ((myaid, oldstage), WorkerPool l) + | oldstage /= newstage -> do + (idlest, restpool) <- waitWorkerSlot'' newstage l + let pool' = addWorkerPool (IdleWorker idlest oldstage) $ + addWorkerPool (ActiveWorker myaid newstage) restpool + putTMVar tv pool' + _ -> putTMVar tv pool {- Like commandAction, but without the concurrency. -} includeCommandAction :: CommandStart -> CommandCleanup @@ -306,7 +311,7 @@ allowConcurrentOutput a = do - - May be called repeatedly by the same thread without blocking. -} ensureOnlyActionOn :: Key -> Annex a -> Annex a -ensureOnlyActionOn k a = +ensureOnlyActionOn k a = debugLocks $ go =<< Annex.getState Annex.concurrency where go NonConcurrent = a