2015-04-10 21:53:58 +00:00
|
|
|
{- git-annex concurrent state
|
|
|
|
-
|
2020-04-20 17:53:27 +00:00
|
|
|
- Copyright 2015-2020 Joey Hess <id@joeyh.name>
|
2015-04-10 21:53:58 +00:00
|
|
|
-
|
2019-03-13 19:48:14 +00:00
|
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
2015-04-10 21:53:58 +00:00
|
|
|
-}
|
|
|
|
|
|
|
|
module Annex.Concurrent where
|
|
|
|
|
|
|
|
import Annex
|
2017-09-30 02:36:08 +00:00
|
|
|
import Annex.Common
|
2015-11-05 22:21:48 +00:00
|
|
|
import qualified Annex.Queue
|
2020-04-17 18:36:45 +00:00
|
|
|
import Annex.Action
|
2020-04-20 17:53:27 +00:00
|
|
|
import Types.Concurrency
|
2019-06-19 16:35:08 +00:00
|
|
|
import Types.WorkerPool
|
2020-04-20 17:53:27 +00:00
|
|
|
import Types.CatFileHandles
|
check-attr resource pool
Limited to min of -JN or number of CPU cores, because it will often be
CPU bound, once it's read the gitignore file for a directory.
In some situations it's more disk bound, but in any case it's unlikely
to be the main bottleneck that -J is used to avoid. Eg, when dropping,
this is used for numcopies checks, but the main bottleneck will be
accessing the remotes to verify presence. So the user might decide to
-J32 that, but having 32 check-attr processes would just waste however
many filehandles they open, and probably worsen their performance due to
CPU contention.
Note that, I first tried just letting up to the -JN be started. However,
even when it's no bottleneck at all, that still results in all of them
being started. Why? Well, all the worker threads start up nearly
simulantaneously, so there's a thundering herd..
2020-04-21 14:38:44 +00:00
|
|
|
import Annex.CheckAttr
|
2020-04-21 15:20:10 +00:00
|
|
|
import Annex.CheckIgnore
|
2020-04-17 18:36:45 +00:00
|
|
|
import Remote.List
|
2015-04-10 21:53:58 +00:00
|
|
|
|
2019-06-19 16:35:08 +00:00
|
|
|
import Control.Concurrent
|
|
|
|
import Control.Concurrent.STM
|
2015-04-10 21:53:58 +00:00
|
|
|
import qualified Data.Map as M
|
|
|
|
|
2020-04-20 17:53:27 +00:00
|
|
|
setConcurrency :: Concurrency -> Annex ()
|
|
|
|
setConcurrency NonConcurrent = Annex.changeState $ \s -> s
|
|
|
|
{ Annex.concurrency = NonConcurrent
|
|
|
|
}
|
|
|
|
setConcurrency c = do
|
|
|
|
cfh <- Annex.getState Annex.catfilehandles
|
|
|
|
cfh' <- case cfh of
|
|
|
|
CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool
|
|
|
|
CatFileHandlesPool _ -> pure cfh
|
check-attr resource pool
Limited to min of -JN or number of CPU cores, because it will often be
CPU bound, once it's read the gitignore file for a directory.
In some situations it's more disk bound, but in any case it's unlikely
to be the main bottleneck that -J is used to avoid. Eg, when dropping,
this is used for numcopies checks, but the main bottleneck will be
accessing the remotes to verify presence. So the user might decide to
-J32 that, but having 32 check-attr processes would just waste however
many filehandles they open, and probably worsen their performance due to
CPU contention.
Note that, I first tried just letting up to the -JN be started. However,
even when it's no bottleneck at all, that still results in all of them
being started. Why? Well, all the worker threads start up nearly
simulantaneously, so there's a thundering herd..
2020-04-21 14:38:44 +00:00
|
|
|
cah <- mkConcurrentCheckAttrHandle c
|
2020-04-21 15:20:10 +00:00
|
|
|
cih <- mkConcurrentCheckIgnoreHandle c
|
2020-04-20 17:53:27 +00:00
|
|
|
Annex.changeState $ \s -> s
|
|
|
|
{ Annex.concurrency = c
|
|
|
|
, Annex.catfilehandles = cfh'
|
check-attr resource pool
Limited to min of -JN or number of CPU cores, because it will often be
CPU bound, once it's read the gitignore file for a directory.
In some situations it's more disk bound, but in any case it's unlikely
to be the main bottleneck that -J is used to avoid. Eg, when dropping,
this is used for numcopies checks, but the main bottleneck will be
accessing the remotes to verify presence. So the user might decide to
-J32 that, but having 32 check-attr processes would just waste however
many filehandles they open, and probably worsen their performance due to
CPU contention.
Note that, I first tried just letting up to the -JN be started. However,
even when it's no bottleneck at all, that still results in all of them
being started. Why? Well, all the worker threads start up nearly
simulantaneously, so there's a thundering herd..
2020-04-21 14:38:44 +00:00
|
|
|
, Annex.checkattrhandle = Just cah
|
2020-04-21 15:20:10 +00:00
|
|
|
, Annex.checkignorehandle = Just cih
|
2020-04-20 17:53:27 +00:00
|
|
|
}
|
|
|
|
|
2015-04-10 21:53:58 +00:00
|
|
|
{- Allows forking off a thread that uses a copy of the current AnnexState
|
|
|
|
- to run an Annex action.
|
|
|
|
-
|
|
|
|
- The returned IO action can be used to start the thread.
|
|
|
|
- It returns an Annex action that must be run in the original
|
|
|
|
- calling context to merge the forked AnnexState back into the
|
|
|
|
- current AnnexState.
|
|
|
|
-}
|
|
|
|
forkState :: Annex a -> Annex (IO (Annex a))
|
|
|
|
forkState a = do
|
|
|
|
st <- dupState
|
|
|
|
return $ do
|
|
|
|
(ret, newst) <- run st a
|
|
|
|
return $ do
|
|
|
|
mergeState newst
|
|
|
|
return ret
|
|
|
|
|
|
|
|
{- Returns a copy of the current AnnexState that is safe to be
|
|
|
|
- used when forking off a thread.
|
|
|
|
-
|
|
|
|
- After an Annex action is run using this AnnexState, it
|
|
|
|
- should be merged back into the current Annex's state,
|
|
|
|
- by calling mergeState.
|
|
|
|
-}
|
|
|
|
dupState :: Annex AnnexState
|
|
|
|
dupState = do
|
2020-04-17 18:36:45 +00:00
|
|
|
-- Make sure that some expensive actions have been done before
|
|
|
|
-- starting threads. This way the state has them already run,
|
|
|
|
-- and each thread won't try to do them.
|
|
|
|
_ <- remoteList
|
|
|
|
|
2015-04-10 21:53:58 +00:00
|
|
|
st <- Annex.getState id
|
2020-04-20 17:53:27 +00:00
|
|
|
-- Make sure that concurrency is enabled, if it was not already,
|
check-attr resource pool
Limited to min of -JN or number of CPU cores, because it will often be
CPU bound, once it's read the gitignore file for a directory.
In some situations it's more disk bound, but in any case it's unlikely
to be the main bottleneck that -J is used to avoid. Eg, when dropping,
this is used for numcopies checks, but the main bottleneck will be
accessing the remotes to verify presence. So the user might decide to
-J32 that, but having 32 check-attr processes would just waste however
many filehandles they open, and probably worsen their performance due to
CPU contention.
Note that, I first tried just letting up to the -JN be started. However,
even when it's no bottleneck at all, that still results in all of them
being started. Why? Well, all the worker threads start up nearly
simulantaneously, so there's a thundering herd..
2020-04-21 14:38:44 +00:00
|
|
|
-- so the concurrency-safe resource pools are set up.
|
2020-04-20 17:53:27 +00:00
|
|
|
st' <- case Annex.concurrency st of
|
|
|
|
NonConcurrent -> do
|
|
|
|
setConcurrency (Concurrent 1)
|
|
|
|
Annex.getState id
|
|
|
|
_ -> return st
|
|
|
|
return $ st'
|
2019-05-06 19:15:12 +00:00
|
|
|
-- each thread has its own repoqueue
|
2019-06-05 21:54:35 +00:00
|
|
|
{ Annex.repoqueue = Nothing
|
2020-07-19 22:31:25 +00:00
|
|
|
-- no errors from this thread yet
|
|
|
|
, Annex.errcounter = 0
|
2015-04-10 21:53:58 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
{- Merges the passed AnnexState into the current Annex state.
|
2015-11-04 18:52:07 +00:00
|
|
|
- Also closes various handles in it. -}
|
2015-04-10 21:53:58 +00:00
|
|
|
mergeState :: AnnexState -> Annex ()
|
|
|
|
mergeState st = do
|
2017-02-17 18:30:18 +00:00
|
|
|
st' <- liftIO $ snd <$> run st stopCoProcesses
|
2015-04-10 21:53:58 +00:00
|
|
|
forM_ (M.toList $ Annex.cleanup st') $
|
|
|
|
uncurry addCleanup
|
2015-11-05 22:21:48 +00:00
|
|
|
Annex.Queue.mergeFrom st'
|
2015-04-10 21:53:58 +00:00
|
|
|
changeState $ \s -> s { errcounter = errcounter s + errcounter st' }
|
2019-06-19 16:35:08 +00:00
|
|
|
|
|
|
|
{- Runs an action and makes the current thread have the specified stage
|
|
|
|
- while doing so. If too many other threads are running in the specified
|
|
|
|
- stage, waits for one of them to become idle.
|
|
|
|
-
|
|
|
|
- Noop if the current thread already has the requested stage, or if the
|
|
|
|
- current thread is not in the worker pool, or if concurrency is not
|
|
|
|
- enabled.
|
|
|
|
-
|
|
|
|
- Also a noop if the stage is not one of the stages that the worker pool
|
|
|
|
- uses.
|
|
|
|
-}
|
|
|
|
enteringStage :: WorkerStage -> Annex a -> Annex a
|
2019-06-19 19:47:54 +00:00
|
|
|
enteringStage newstage a = Annex.getState Annex.workers >>= \case
|
|
|
|
Nothing -> a
|
|
|
|
Just tv -> do
|
|
|
|
mytid <- liftIO myThreadId
|
2019-11-14 15:31:43 +00:00
|
|
|
let set = changeStageTo mytid tv (const newstage)
|
|
|
|
let restore = maybe noop (void . changeStageTo mytid tv . const)
|
2019-06-19 19:47:54 +00:00
|
|
|
bracket set restore (const a)
|
2019-06-19 16:35:08 +00:00
|
|
|
|
2019-11-14 15:31:43 +00:00
|
|
|
{- Transition the current thread to the initial stage.
|
|
|
|
- This is done once the thread is ready to begin work.
|
|
|
|
-}
|
|
|
|
enteringInitialStage :: Annex ()
|
|
|
|
enteringInitialStage = Annex.getState Annex.workers >>= \case
|
|
|
|
Nothing -> noop
|
|
|
|
Just tv -> do
|
|
|
|
mytid <- liftIO myThreadId
|
|
|
|
void $ changeStageTo mytid tv initialStage
|
|
|
|
|
2019-06-19 22:07:30 +00:00
|
|
|
{- This needs to leave the WorkerPool with the same number of
|
|
|
|
- idle and active threads, and with the same number of threads for each
|
|
|
|
- WorkerStage. So, all it can do is swap the WorkerStage of our thread's
|
|
|
|
- ActiveWorker with an IdleWorker.
|
|
|
|
-
|
|
|
|
- Must avoid a deadlock if all worker threads end up here at the same
|
2019-06-20 00:13:19 +00:00
|
|
|
- time, or if there are no suitable IdleWorkers left. So if necessary
|
|
|
|
- we first replace our ActiveWorker with an IdleWorker in the pool, to allow
|
2019-06-19 22:07:30 +00:00
|
|
|
- some other thread to use it, before waiting for a suitable IdleWorker
|
|
|
|
- for us to use.
|
|
|
|
-
|
|
|
|
- Note that the spareVals in the WorkerPool does not get anything added to
|
|
|
|
- it when adding the IdleWorker, so there will for a while be more IdleWorkers
|
|
|
|
- in the pool than spareVals. That does not prevent other threads that call
|
|
|
|
- this from using them though, so it's fine.
|
|
|
|
-}
|
2019-11-14 15:31:43 +00:00
|
|
|
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage)
|
|
|
|
changeStageTo mytid tv getnewstage = liftIO $
|
2019-06-20 00:13:19 +00:00
|
|
|
replaceidle >>= maybe
|
|
|
|
(return Nothing)
|
|
|
|
(either waitidle (return . Just))
|
2019-06-19 22:07:30 +00:00
|
|
|
where
|
|
|
|
replaceidle = atomically $ do
|
|
|
|
pool <- takeTMVar tv
|
2019-11-14 15:31:43 +00:00
|
|
|
let newstage = getnewstage (usedStages pool)
|
2019-06-20 00:19:38 +00:00
|
|
|
let notchanging = do
|
|
|
|
putTMVar tv pool
|
|
|
|
return Nothing
|
2019-06-19 22:07:30 +00:00
|
|
|
if memberStage newstage (usedStages pool)
|
|
|
|
then case removeThreadIdWorkerPool mytid pool of
|
|
|
|
Just ((myaid, oldstage), pool')
|
2019-06-20 00:13:19 +00:00
|
|
|
| oldstage /= newstage -> case getIdleWorkerSlot newstage pool' of
|
|
|
|
Nothing -> do
|
|
|
|
putTMVar tv $
|
|
|
|
addWorkerPool (IdleWorker oldstage) pool'
|
2019-11-14 15:31:43 +00:00
|
|
|
return $ Just $ Left (myaid, newstage, oldstage)
|
2019-06-20 00:13:19 +00:00
|
|
|
Just pool'' -> do
|
|
|
|
-- optimisation
|
|
|
|
putTMVar tv $
|
|
|
|
addWorkerPool (IdleWorker oldstage) $
|
|
|
|
addWorkerPool (ActiveWorker myaid newstage) pool''
|
|
|
|
return $ Just $ Right oldstage
|
2019-06-20 00:19:38 +00:00
|
|
|
| otherwise -> notchanging
|
|
|
|
_ -> notchanging
|
|
|
|
else notchanging
|
2019-06-19 22:07:30 +00:00
|
|
|
|
2019-11-14 15:31:43 +00:00
|
|
|
waitidle (myaid, newstage, oldstage) = atomically $ do
|
2019-06-19 22:07:30 +00:00
|
|
|
pool <- waitIdleWorkerSlot newstage =<< takeTMVar tv
|
|
|
|
putTMVar tv $ addWorkerPool (ActiveWorker myaid newstage) pool
|
|
|
|
return (Just oldstage)
|
2019-06-19 16:35:08 +00:00
|
|
|
|
2019-11-14 15:31:43 +00:00
|
|
|
-- | Waits until there's an idle StartStage worker in the worker pool,
|
|
|
|
-- removes it from the pool, and returns its state.
|
2019-06-19 16:35:08 +00:00
|
|
|
--
|
|
|
|
-- If the worker pool is not already allocated, returns Nothing.
|
2019-11-14 15:31:43 +00:00
|
|
|
waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
|
|
|
|
waitStartWorkerSlot tv = do
|
2019-06-19 22:07:30 +00:00
|
|
|
pool <- takeTMVar tv
|
2019-11-14 15:31:43 +00:00
|
|
|
st <- go pool
|
|
|
|
return $ Just (st, StartStage)
|
2019-06-19 22:07:30 +00:00
|
|
|
where
|
2019-11-14 15:31:43 +00:00
|
|
|
go pool = case spareVals pool of
|
2019-06-19 22:07:30 +00:00
|
|
|
[] -> retry
|
|
|
|
(v:vs) -> do
|
|
|
|
let pool' = pool { spareVals = vs }
|
2019-11-14 15:31:43 +00:00
|
|
|
putTMVar tv =<< waitIdleWorkerSlot StartStage pool'
|
2019-06-19 22:07:30 +00:00
|
|
|
return v
|
2019-06-19 16:35:08 +00:00
|
|
|
|
2019-06-19 22:07:30 +00:00
|
|
|
waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState)
|
2019-06-20 00:13:19 +00:00
|
|
|
waitIdleWorkerSlot wantstage = maybe retry return . getIdleWorkerSlot wantstage
|
|
|
|
|
|
|
|
getIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> Maybe (WorkerPool Annex.AnnexState)
|
|
|
|
getIdleWorkerSlot wantstage pool = do
|
2019-06-19 22:07:30 +00:00
|
|
|
l <- findidle [] (workerList pool)
|
|
|
|
return $ pool { workerList = l }
|
2019-06-19 16:35:08 +00:00
|
|
|
where
|
2019-06-20 00:13:19 +00:00
|
|
|
findidle _ [] = Nothing
|
|
|
|
findidle c ((IdleWorker stage):rest)
|
|
|
|
| stage == wantstage = Just (c ++ rest)
|
2019-06-19 16:35:08 +00:00
|
|
|
findidle c (w:rest) = findidle (w:c) rest
|