move remoteList into dupState

This does mean that RemoteDaemon.Transport.Tor's call runs it, otherwise
no change, but this is groundwork for doing more such expensive actions
in dupState.
This commit is contained in:
Joey Hess 2020-04-17 14:36:45 -04:00
parent 988317634b
commit fe9cf1256e
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
7 changed files with 148 additions and 20 deletions

View file

@ -23,7 +23,10 @@ import Utility.Exception
import Annex.Common import Annex.Common
import qualified Annex import qualified Annex
import Annex.Content import Annex.Content
import Annex.Concurrent import Annex.CatFile
import Annex.CheckAttr
import Annex.HashObject
import Annex.CheckIgnore
{- Actions to perform each time ran. -} {- Actions to perform each time ran. -}
startup :: Annex () startup :: Annex ()
@ -37,6 +40,14 @@ shutdown nocommit = do
stopCoProcesses stopCoProcesses
liftIO reapZombies -- zombies from long-running git processes liftIO reapZombies -- zombies from long-running git processes
{- Stops all long-running git query processes. -}
stopCoProcesses :: Annex ()
stopCoProcesses = do
catFileStop
checkAttrStop
hashObjectStop
checkIgnoreStop
{- Reaps any zombie processes that may be hanging around. {- Reaps any zombie processes that may be hanging around.
- -
- Warning: Not thread safe. Anything that was expecting to wait - Warning: Not thread safe. Anything that was expecting to wait

View file

@ -10,11 +10,9 @@ module Annex.Concurrent where
import Annex import Annex
import Annex.Common import Annex.Common
import qualified Annex.Queue import qualified Annex.Queue
import Annex.CatFile import Annex.Action
import Annex.CheckAttr
import Annex.HashObject
import Annex.CheckIgnore
import Types.WorkerPool import Types.WorkerPool
import Remote.List
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM import Control.Concurrent.STM
@ -46,6 +44,11 @@ forkState a = do
-} -}
dupState :: Annex AnnexState dupState :: Annex AnnexState
dupState = do dupState = do
-- Make sure that some expensive actions have been done before
-- starting threads. This way the state has them already run,
-- and each thread won't try to do them.
_ <- remoteList
st <- Annex.getState id st <- Annex.getState id
return $ st return $ st
-- each thread has its own repoqueue -- each thread has its own repoqueue
@ -66,14 +69,6 @@ mergeState st = do
Annex.Queue.mergeFrom st' Annex.Queue.mergeFrom st'
changeState $ \s -> s { errcounter = errcounter s + errcounter st' } changeState $ \s -> s { errcounter = errcounter s + errcounter st' }
{- Stops all long-running git query processes. -}
stopCoProcesses :: Annex ()
stopCoProcesses = do
catFileStop
checkAttrStop
hashObjectStop
checkIgnoreStop
{- Runs an action and makes the current thread have the specified stage {- Runs an action and makes the current thread have the specified stage
- while doing so. If too many other threads are running in the specified - while doing so. If too many other threads are running in the specified
- stage, waits for one of them to become idle. - stage, waits for one of them to become idle.

View file

@ -74,6 +74,7 @@ import Git.FilePath
import Annex.Perms import Annex.Perms
import Annex.Link import Annex.Link
import Annex.LockPool import Annex.LockPool
import Annex.WorkerPool
import Messages.Progress import Messages.Progress
import Types.Remote (unVerified, Verification(..), RetrievalSecurityPolicy(..)) import Types.Remote (unVerified, Verification(..), RetrievalSecurityPolicy(..))
import qualified Types.Remote import qualified Types.Remote
@ -87,7 +88,6 @@ import Annex.InodeSentinal
import Utility.InodeCache import Utility.InodeCache
import Annex.Content.LowLevel import Annex.Content.LowLevel
import Annex.Content.PointerFile import Annex.Content.PointerFile
import Annex.Concurrent
import Types.WorkerPool import Types.WorkerPool
import qualified Utility.RawFilePath as R import qualified Utility.RawFilePath as R

View file

@ -30,9 +30,9 @@ import Utility.ThreadScheduler
import Annex.LockPool import Annex.LockPool
import Types.Key import Types.Key
import qualified Types.Remote as Remote import qualified Types.Remote as Remote
import Annex.Concurrent
import Types.Concurrency import Types.Concurrency
import Types.WorkerPool import Types.WorkerPool
import Annex.WorkerPool
import Control.Concurrent import Control.Concurrent
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M

126
Annex/WorkerPool.hs Normal file
View file

@ -0,0 +1,126 @@
{- git-annex worker thread pool
-
- Copyright 2015-2019 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
module Annex.WorkerPool where
import Annex
import Annex.Common
import Types.WorkerPool
import Control.Concurrent
import Control.Concurrent.STM
{- Runs an action and makes the current thread have the specified stage
- while doing so. If too many other threads are running in the specified
- stage, waits for one of them to become idle.
-
- Noop if the current thread already has the requested stage, or if the
- current thread is not in the worker pool, or if concurrency is not
- enabled.
-
- Also a noop if the stage is not one of the stages that the worker pool
- uses.
-}
enteringStage :: WorkerStage -> Annex a -> Annex a
enteringStage newstage a = Annex.getState Annex.workers >>= \case
Nothing -> a
Just tv -> do
mytid <- liftIO myThreadId
let set = changeStageTo mytid tv (const newstage)
let restore = maybe noop (void . changeStageTo mytid tv . const)
bracket set restore (const a)
{- Transition the current thread to the initial stage.
- This is done once the thread is ready to begin work.
-}
enteringInitialStage :: Annex ()
enteringInitialStage = Annex.getState Annex.workers >>= \case
Nothing -> noop
Just tv -> do
mytid <- liftIO myThreadId
void $ changeStageTo mytid tv initialStage
{- This needs to leave the WorkerPool with the same number of
- idle and active threads, and with the same number of threads for each
- WorkerStage. So, all it can do is swap the WorkerStage of our thread's
- ActiveWorker with an IdleWorker.
-
- Must avoid a deadlock if all worker threads end up here at the same
- time, or if there are no suitable IdleWorkers left. So if necessary
- we first replace our ActiveWorker with an IdleWorker in the pool, to allow
- some other thread to use it, before waiting for a suitable IdleWorker
- for us to use.
-
- Note that the spareVals in the WorkerPool does not get anything added to
- it when adding the IdleWorker, so there will for a while be more IdleWorkers
- in the pool than spareVals. That does not prevent other threads that call
- this from using them though, so it's fine.
-}
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage)
changeStageTo mytid tv getnewstage = liftIO $
replaceidle >>= maybe
(return Nothing)
(either waitidle (return . Just))
where
replaceidle = atomically $ do
pool <- takeTMVar tv
let newstage = getnewstage (usedStages pool)
let notchanging = do
putTMVar tv pool
return Nothing
if memberStage newstage (usedStages pool)
then case removeThreadIdWorkerPool mytid pool of
Just ((myaid, oldstage), pool')
| oldstage /= newstage -> case getIdleWorkerSlot newstage pool' of
Nothing -> do
putTMVar tv $
addWorkerPool (IdleWorker oldstage) pool'
return $ Just $ Left (myaid, newstage, oldstage)
Just pool'' -> do
-- optimisation
putTMVar tv $
addWorkerPool (IdleWorker oldstage) $
addWorkerPool (ActiveWorker myaid newstage) pool''
return $ Just $ Right oldstage
| otherwise -> notchanging
_ -> notchanging
else notchanging
waitidle (myaid, newstage, oldstage) = atomically $ do
pool <- waitIdleWorkerSlot newstage =<< takeTMVar tv
putTMVar tv $ addWorkerPool (ActiveWorker myaid newstage) pool
return (Just oldstage)
-- | Waits until there's an idle StartStage worker in the worker pool,
-- removes it from the pool, and returns its state.
--
-- If the worker pool is not already allocated, returns Nothing.
waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
waitStartWorkerSlot tv = do
pool <- takeTMVar tv
st <- go pool
return $ Just (st, StartStage)
where
go pool = case spareVals pool of
[] -> retry
(v:vs) -> do
let pool' = pool { spareVals = vs }
putTMVar tv =<< waitIdleWorkerSlot StartStage pool'
return v
waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState)
waitIdleWorkerSlot wantstage = maybe retry return . getIdleWorkerSlot wantstage
getIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> Maybe (WorkerPool Annex.AnnexState)
getIdleWorkerSlot wantstage pool = do
l <- findidle [] (workerList pool)
return $ pool { workerList = l }
where
findidle _ [] = Nothing
findidle c ((IdleWorker stage):rest)
| stage == wantstage = Just (c ++ rest)
findidle c (w:rest) = findidle (w:c) rest

View file

@ -245,11 +245,6 @@ startConcurrency usedstages a = do
liftIO $ setNumCapabilities n liftIO $ setNumCapabilities n
initworkerpool n = do initworkerpool n = do
-- Generate the remote list now, to avoid each thread
-- generating it, which would be more expensive and
-- could cause threads to contend over eg, calls to
-- setConfig.
_ <- remoteList
tv <- liftIO newEmptyTMVarIO tv <- liftIO newEmptyTMVarIO
Annex.changeState $ \s -> s { Annex.workers = Just tv } Annex.changeState $ \s -> s { Annex.workers = Just tv }
st <- dupState st <- dupState

View file

@ -668,6 +668,7 @@ Executable git-annex
Annex.View Annex.View
Annex.View.ViewedFile Annex.View.ViewedFile
Annex.Wanted Annex.Wanted
Annex.WorkerPool
Annex.WorkTree Annex.WorkTree
Annex.YoutubeDl Annex.YoutubeDl
Backend Backend