speed up enteringStage in non-concurrent mode
Avoid a STM transaction. Also got rid of UnallocatedWorkerPool.
This commit is contained in:
		
					parent
					
						
							
								05a908c3c9
							
						
					
				
			
			
				commit
				
					
						9671248fff
					
				
			
		
					 4 changed files with 29 additions and 39 deletions
				
			
		
							
								
								
									
										5
									
								
								Annex.hs
									
										
									
									
									
								
							
							
						
						
									
										5
									
								
								Annex.hs
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -142,7 +142,7 @@ data AnnexState = AnnexState
 | 
			
		|||
	, tempurls :: M.Map Key URLString
 | 
			
		||||
	, existinghooks :: M.Map Git.Hook.Hook Bool
 | 
			
		||||
	, desktopnotify :: DesktopNotify
 | 
			
		||||
	, workers :: TMVar (WorkerPool AnnexState)
 | 
			
		||||
	, workers :: Maybe (TMVar (WorkerPool AnnexState))
 | 
			
		||||
	, activekeys :: TVar (M.Map Key ThreadId)
 | 
			
		||||
	, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
 | 
			
		||||
	, keysdbhandle :: Maybe Keys.DbHandle
 | 
			
		||||
| 
						 | 
				
			
			@ -155,7 +155,6 @@ newState :: GitConfig -> Git.Repo -> IO AnnexState
 | 
			
		|||
newState c r = do
 | 
			
		||||
	emptyactiveremotes <- newMVar M.empty
 | 
			
		||||
	emptyactivekeys <- newTVarIO M.empty
 | 
			
		||||
	emptyworkerpool <- newTMVarIO UnallocatedWorkerPool
 | 
			
		||||
	o <- newMessageState
 | 
			
		||||
	sc <- newTMVarIO False
 | 
			
		||||
	return $ AnnexState
 | 
			
		||||
| 
						 | 
				
			
			@ -200,7 +199,7 @@ newState c r = do
 | 
			
		|||
		, tempurls = M.empty
 | 
			
		||||
		, existinghooks = M.empty
 | 
			
		||||
		, desktopnotify = mempty
 | 
			
		||||
		, workers = emptyworkerpool
 | 
			
		||||
		, workers = Nothing
 | 
			
		||||
		, activekeys = emptyactivekeys
 | 
			
		||||
		, activeremotes = emptyactiveremotes
 | 
			
		||||
		, keysdbhandle = Nothing
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -91,9 +91,10 @@ stopCoProcesses = do
 | 
			
		|||
 - of it and the current thread are swapped.
 | 
			
		||||
 -}
 | 
			
		||||
enteringStage :: WorkerStage -> Annex a -> Annex a
 | 
			
		||||
enteringStage newstage a = do
 | 
			
		||||
enteringStage newstage a = Annex.getState Annex.workers >>= \case
 | 
			
		||||
	Nothing -> a
 | 
			
		||||
	Just tv -> do
 | 
			
		||||
		mytid <- liftIO myThreadId
 | 
			
		||||
	tv <- Annex.getState Annex.workers
 | 
			
		||||
		let set = changeStageTo mytid tv newstage
 | 
			
		||||
		let restore = maybe noop (void . changeStageTo mytid tv)
 | 
			
		||||
		bracket set restore (const a)
 | 
			
		||||
| 
						 | 
				
			
			@ -124,12 +125,8 @@ changeStageTo mytid tv newstage = liftIO $ atomically $ do
 | 
			
		|||
--
 | 
			
		||||
-- If the worker pool is not already allocated, returns Nothing.
 | 
			
		||||
waitInitialWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
 | 
			
		||||
waitInitialWorkerSlot tv =
 | 
			
		||||
	takeTMVar tv >>= \case
 | 
			
		||||
		UnallocatedWorkerPool -> do
 | 
			
		||||
			putTMVar tv UnallocatedWorkerPool 
 | 
			
		||||
			return Nothing
 | 
			
		||||
		WorkerPool usedstages l -> do
 | 
			
		||||
waitInitialWorkerSlot tv = do
 | 
			
		||||
	WorkerPool usedstages l <- takeTMVar tv
 | 
			
		||||
	let stage = initialStage usedstages
 | 
			
		||||
	(st, pool') <- waitWorkerSlot usedstages stage l
 | 
			
		||||
	putTMVar tv pool'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -60,8 +60,9 @@ commandAction start = Annex.getState Annex.concurrency >>= \case
 | 
			
		|||
	ConcurrentPerCpu -> runconcurrent
 | 
			
		||||
  where
 | 
			
		||||
	runnonconcurrent = void $ includeCommandAction start
 | 
			
		||||
	runconcurrent = do
 | 
			
		||||
		tv <- Annex.getState Annex.workers
 | 
			
		||||
	runconcurrent = Annex.getState Annex.workers >>= \case
 | 
			
		||||
		Nothing -> runnonconcurrent
 | 
			
		||||
		Just tv -> 
 | 
			
		||||
			liftIO (atomically (waitInitialWorkerSlot tv)) >>=
 | 
			
		||||
				maybe runnonconcurrent (runconcurrent' tv)
 | 
			
		||||
	runconcurrent' tv (workerst, workerstage) = do
 | 
			
		||||
| 
						 | 
				
			
			@ -124,13 +125,12 @@ commandAction start = Annex.getState Annex.concurrency >>= \case
 | 
			
		|||
 - back into the current Annex's state.
 | 
			
		||||
 -}
 | 
			
		||||
finishCommandActions :: Annex ()
 | 
			
		||||
finishCommandActions = do
 | 
			
		||||
	tv <- Annex.getState Annex.workers
 | 
			
		||||
	pool <- liftIO $ atomically $
 | 
			
		||||
		swapTMVar tv UnallocatedWorkerPool
 | 
			
		||||
	case pool of
 | 
			
		||||
		UnallocatedWorkerPool -> noop
 | 
			
		||||
		WorkerPool _ l -> forM_ (mapMaybe workerAsync l) $ \aid ->
 | 
			
		||||
finishCommandActions = Annex.getState Annex.workers >>= \case
 | 
			
		||||
	Nothing -> noop
 | 
			
		||||
	Just tv -> do
 | 
			
		||||
		Annex.changeState $ \s -> s { Annex.workers = Nothing }
 | 
			
		||||
		WorkerPool _ l <- liftIO $ atomically $ takeTMVar tv
 | 
			
		||||
		forM_ (mapMaybe workerAsync l) $ \aid ->
 | 
			
		||||
			liftIO (waitCatch aid) >>= \case
 | 
			
		||||
				Left _ -> noop
 | 
			
		||||
				Right st -> mergeState st
 | 
			
		||||
| 
						 | 
				
			
			@ -248,9 +248,9 @@ startConcurrency usedstages a = do
 | 
			
		|||
		-- setConfig.
 | 
			
		||||
		_ <- remoteList
 | 
			
		||||
		st <- dupState
 | 
			
		||||
		tv <- Annex.getState Annex.workers
 | 
			
		||||
		liftIO $ atomically $ putTMVar tv $
 | 
			
		||||
		tv <- liftIO $ newTMVarIO $
 | 
			
		||||
			allocateWorkerPool st (max n 1) usedstages
 | 
			
		||||
		Annex.changeState $ \s -> s { Annex.workers = Just tv }
 | 
			
		||||
 | 
			
		||||
{- Ensures that only one thread processes a key at a time.
 | 
			
		||||
 - Other threads will block until it's done.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,9 +12,7 @@ import Control.Concurrent.Async
 | 
			
		|||
import qualified Data.Set as S
 | 
			
		||||
 | 
			
		||||
-- | Pool of worker threads. 
 | 
			
		||||
data WorkerPool t
 | 
			
		||||
	= UnallocatedWorkerPool
 | 
			
		||||
	| WorkerPool UsedStages [Worker t]
 | 
			
		||||
data WorkerPool t = WorkerPool UsedStages [Worker t]
 | 
			
		||||
	deriving (Show)
 | 
			
		||||
 | 
			
		||||
-- | A worker can either be idle or running an Async action.
 | 
			
		||||
| 
						 | 
				
			
			@ -95,10 +93,8 @@ allocateWorkerPool t n u = WorkerPool u $ take (n+n) $
 | 
			
		|||
 | 
			
		||||
addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t
 | 
			
		||||
addWorkerPool w (WorkerPool u l) = WorkerPool u (w:l)
 | 
			
		||||
addWorkerPool _ UnallocatedWorkerPool = UnallocatedWorkerPool
 | 
			
		||||
 | 
			
		||||
idleWorkers :: WorkerPool t -> [t]
 | 
			
		||||
idleWorkers UnallocatedWorkerPool = []
 | 
			
		||||
idleWorkers (WorkerPool _ l) = go l
 | 
			
		||||
  where
 | 
			
		||||
	go [] = []
 | 
			
		||||
| 
						 | 
				
			
			@ -110,7 +106,6 @@ idleWorkers (WorkerPool _ l) = go l
 | 
			
		|||
-- Each Async has its own ThreadId, so this stops once it finds
 | 
			
		||||
-- a match.
 | 
			
		||||
removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
 | 
			
		||||
removeThreadIdWorkerPool _  UnallocatedWorkerPool = Nothing
 | 
			
		||||
removeThreadIdWorkerPool tid (WorkerPool u l) = go [] l
 | 
			
		||||
  where
 | 
			
		||||
	go _ [] = Nothing
 | 
			
		||||
| 
						 | 
				
			
			@ -119,7 +114,6 @@ removeThreadIdWorkerPool tid (WorkerPool u l) = go [] l
 | 
			
		|||
	go c (v : rest) = go (v:c) rest
 | 
			
		||||
 | 
			
		||||
deactivateWorker :: WorkerPool t -> Async t -> t -> WorkerPool t
 | 
			
		||||
deactivateWorker UnallocatedWorkerPool _ _ = UnallocatedWorkerPool
 | 
			
		||||
deactivateWorker (WorkerPool u l) aid t = WorkerPool u $ go l
 | 
			
		||||
  where
 | 
			
		||||
	go [] = []
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue