diff --git a/Annex/Action.hs b/Annex/Action.hs index cd124d48f7..b3ad1d9088 100644 --- a/Annex/Action.hs +++ b/Annex/Action.hs @@ -23,7 +23,10 @@ import Utility.Exception import Annex.Common import qualified Annex import Annex.Content -import Annex.Concurrent +import Annex.CatFile +import Annex.CheckAttr +import Annex.HashObject +import Annex.CheckIgnore {- Actions to perform each time ran. -} startup :: Annex () @@ -37,6 +40,14 @@ shutdown nocommit = do stopCoProcesses liftIO reapZombies -- zombies from long-running git processes +{- Stops all long-running git query processes. -} +stopCoProcesses :: Annex () +stopCoProcesses = do + catFileStop + checkAttrStop + hashObjectStop + checkIgnoreStop + {- Reaps any zombie processes that may be hanging around. - - Warning: Not thread safe. Anything that was expecting to wait diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs index 1ff8e0c730..f8d40273bc 100644 --- a/Annex/Concurrent.hs +++ b/Annex/Concurrent.hs @@ -10,11 +10,9 @@ module Annex.Concurrent where import Annex import Annex.Common import qualified Annex.Queue -import Annex.CatFile -import Annex.CheckAttr -import Annex.HashObject -import Annex.CheckIgnore +import Annex.Action import Types.WorkerPool +import Remote.List import Control.Concurrent import Control.Concurrent.STM @@ -46,6 +44,11 @@ forkState a = do -} dupState :: Annex AnnexState dupState = do + -- Make sure that some expensive actions have been done before + -- starting threads. This way the state has them already run, + -- and each thread won't try to do them. (Generating the remote list in each thread would be more expensive, and could cause threads to contend over eg, calls to setConfig.) + _ <- remoteList + st <- Annex.getState id return $ st -- each thread has its own repoqueue @@ -66,14 +69,6 @@ mergeState st = do Annex.Queue.mergeFrom st' changeState $ \s -> s { errcounter = errcounter s + errcounter st' } -{- Stops all long-running git query processes. 
-} -stopCoProcesses :: Annex () -stopCoProcesses = do - catFileStop - checkAttrStop - hashObjectStop - checkIgnoreStop - {- Runs an action and makes the current thread have the specified stage - while doing so. If too many other threads are running in the specified - stage, waits for one of them to become idle. diff --git a/Annex/Content.hs b/Annex/Content.hs index 84383c192b..3a4899864c 100644 --- a/Annex/Content.hs +++ b/Annex/Content.hs @@ -74,6 +74,7 @@ import Git.FilePath import Annex.Perms import Annex.Link import Annex.LockPool +import Annex.WorkerPool import Messages.Progress import Types.Remote (unVerified, Verification(..), RetrievalSecurityPolicy(..)) import qualified Types.Remote @@ -87,7 +88,6 @@ import Annex.InodeSentinal import Utility.InodeCache import Annex.Content.LowLevel import Annex.Content.PointerFile -import Annex.Concurrent import Types.WorkerPool import qualified Utility.RawFilePath as R diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index 193adf857a..61595b0ace 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -30,9 +30,9 @@ import Utility.ThreadScheduler import Annex.LockPool import Types.Key import qualified Types.Remote as Remote -import Annex.Concurrent import Types.Concurrency import Types.WorkerPool +import Annex.WorkerPool import Control.Concurrent import qualified Data.Map.Strict as M diff --git a/Annex/WorkerPool.hs b/Annex/WorkerPool.hs new file mode 100644 index 0000000000..8d6ddcd835 --- /dev/null +++ b/Annex/WorkerPool.hs @@ -0,0 +1,126 @@ +{- git-annex worker thread pool + - + - Copyright 2015-2019 Joey Hess + - + - Licensed under the GNU AGPL version 3 or higher. + -} + +module Annex.WorkerPool where + +import Annex +import Annex.Common +import Types.WorkerPool + +import Control.Concurrent +import Control.Concurrent.STM + +{- Runs an action and makes the current thread have the specified stage + - while doing so. 
If too many other threads are running in the specified + - stage, waits for one of them to become idle. + - + - Noop if the current thread already has the requested stage, or if the + - current thread is not in the worker pool, or if concurrency is not + - enabled. + - + - Also a noop if the stage is not one of the stages that the worker pool + - uses. + -} +enteringStage :: WorkerStage -> Annex a -> Annex a +enteringStage newstage a = Annex.getState Annex.workers >>= \case + Nothing -> a + Just tv -> do + mytid <- liftIO myThreadId + let set = changeStageTo mytid tv (const newstage) + let restore = maybe noop (void . changeStageTo mytid tv . const) + bracket set restore (const a) + +{- Transition the current thread to the initial stage. + - This is done once the thread is ready to begin work. + -} +enteringInitialStage :: Annex () +enteringInitialStage = Annex.getState Annex.workers >>= \case + Nothing -> noop + Just tv -> do + mytid <- liftIO myThreadId + void $ changeStageTo mytid tv initialStage + +{- This needs to leave the WorkerPool with the same number of + - idle and active threads, and with the same number of threads for each + - WorkerStage. So, all it can do is swap the WorkerStage of our thread's + - ActiveWorker with an IdleWorker. + - + - Must avoid a deadlock if all worker threads end up here at the same + - time, or if there are no suitable IdleWorkers left. So if necessary + - we first replace our ActiveWorker with an IdleWorker in the pool, to allow + - some other thread to use it, before waiting for a suitable IdleWorker + - for us to use. + - + - Note that the spareVals in the WorkerPool does not get anything added to + - it when adding the IdleWorker, so there will for a while be more IdleWorkers + - in the pool than spareVals. That does not prevent other threads that call + - this from using them though, so it's fine. 
+ -} +changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage) +changeStageTo mytid tv getnewstage = liftIO $ + replaceidle >>= maybe + (return Nothing) + (either waitidle (return . Just)) + where + replaceidle = atomically $ do + pool <- takeTMVar tv + let newstage = getnewstage (usedStages pool) + let notchanging = do + putTMVar tv pool + return Nothing + if memberStage newstage (usedStages pool) + then case removeThreadIdWorkerPool mytid pool of + Just ((myaid, oldstage), pool') + | oldstage /= newstage -> case getIdleWorkerSlot newstage pool' of + Nothing -> do + putTMVar tv $ + addWorkerPool (IdleWorker oldstage) pool' + return $ Just $ Left (myaid, newstage, oldstage) + Just pool'' -> do + -- optimisation: an idle worker of the new stage is available now, so swap with it in this same transaction instead of going through waitidle + putTMVar tv $ + addWorkerPool (IdleWorker oldstage) $ + addWorkerPool (ActiveWorker myaid newstage) pool'' + return $ Just $ Right oldstage + | otherwise -> notchanging + _ -> notchanging + else notchanging + + waitidle (myaid, newstage, oldstage) = atomically $ do + pool <- waitIdleWorkerSlot newstage =<< takeTMVar tv + putTMVar tv $ addWorkerPool (ActiveWorker myaid newstage) pool + return (Just oldstage) + +-- | Waits until there's an idle StartStage worker in the worker pool, +-- removes it from the pool, and returns its state. +-- +-- Always returns Just: while the TMVar is empty, or the pool has no spareVals, the transaction blocks (retries) rather than returning Nothing. 
+waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage)) +waitStartWorkerSlot tv = do + pool <- takeTMVar tv + st <- go pool + return $ Just (st, StartStage) + where + go pool = case spareVals pool of + [] -> retry + (v:vs) -> do + let pool' = pool { spareVals = vs } + putTMVar tv =<< waitIdleWorkerSlot StartStage pool' + return v + +waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState) +waitIdleWorkerSlot wantstage = maybe retry return . 
getIdleWorkerSlot wantstage + +getIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> Maybe (WorkerPool Annex.AnnexState) +getIdleWorkerSlot wantstage pool = do + l <- findidle [] (workerList pool) + return $ pool { workerList = l } + where + findidle _ [] = Nothing + findidle c ((IdleWorker stage):rest) + | stage == wantstage = Just (c ++ rest) + findidle c (w:rest) = findidle (w:c) rest diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index 67a7618e4c..2c2cb3f4d2 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -245,11 +245,6 @@ startConcurrency usedstages a = do liftIO $ setNumCapabilities n initworkerpool n = do - -- Generate the remote list now, to avoid each thread - -- generating it, which would be more expensive and - -- could cause threads to contend over eg, calls to - -- setConfig. - _ <- remoteList tv <- liftIO newEmptyTMVarIO Annex.changeState $ \s -> s { Annex.workers = Just tv } st <- dupState diff --git a/git-annex.cabal b/git-annex.cabal index 8c2e2c89d9..1bb92a2220 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -668,6 +668,7 @@ Executable git-annex Annex.View Annex.View.ViewedFile Annex.Wanted + Annex.WorkerPool Annex.WorkTree Annex.YoutubeDl Backend