Revert "separate queue for cleanup actions"

This reverts commit 659640e224
and 4932972487

Too early to include these in a release; they'll be de-reverted after
the release.
This commit is contained in:
Joey Hess 2019-06-12 14:47:40 -04:00
parent 6f8322b8f7
commit e07003ab73
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
5 changed files with 76 additions and 169 deletions

View file

@ -142,7 +142,7 @@ data AnnexState = AnnexState
, tempurls :: M.Map Key URLString , tempurls :: M.Map Key URLString
, existinghooks :: M.Map Git.Hook.Hook Bool , existinghooks :: M.Map Git.Hook.Hook Bool
, desktopnotify :: DesktopNotify , desktopnotify :: DesktopNotify
, workers :: TMVar (WorkerPool AnnexState) , workers :: WorkerPool AnnexState
, activekeys :: TVar (M.Map Key ThreadId) , activekeys :: TVar (M.Map Key ThreadId)
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
, keysdbhandle :: Maybe Keys.DbHandle , keysdbhandle :: Maybe Keys.DbHandle
@ -155,7 +155,6 @@ newState :: GitConfig -> Git.Repo -> IO AnnexState
newState c r = do newState c r = do
emptyactiveremotes <- newMVar M.empty emptyactiveremotes <- newMVar M.empty
emptyactivekeys <- newTVarIO M.empty emptyactivekeys <- newTVarIO M.empty
emptyworkerpool <- newTMVarIO UnallocatedWorkerPool
o <- newMessageState o <- newMessageState
sc <- newTMVarIO False sc <- newTMVarIO False
return $ AnnexState return $ AnnexState
@ -200,7 +199,7 @@ newState c r = do
, tempurls = M.empty , tempurls = M.empty
, existinghooks = M.empty , existinghooks = M.empty
, desktopnotify = mempty , desktopnotify = mempty
, workers = emptyworkerpool , workers = UnallocatedWorkerPool
, activekeys = emptyactivekeys , activekeys = emptyactivekeys
, activeremotes = emptyactiveremotes , activeremotes = emptyactiveremotes
, keysdbhandle = Nothing , keysdbhandle = Nothing

View file

@ -11,6 +11,7 @@ import Annex
import Annex.Common import Annex.Common
import Annex.Action import Annex.Action
import qualified Annex.Queue import qualified Annex.Queue
import Types.WorkerPool
import qualified Data.Map as M import qualified Data.Map as M
@ -42,8 +43,9 @@ dupState :: Annex AnnexState
dupState = do dupState = do
st <- Annex.getState id st <- Annex.getState id
return $ st return $ st
{ Annex.workers = UnallocatedWorkerPool
-- each thread has its own repoqueue -- each thread has its own repoqueue
{ Annex.repoqueue = Nothing , Annex.repoqueue = Nothing
-- avoid sharing eg, open file handles -- avoid sharing eg, open file handles
, Annex.catfilehandles = M.empty , Annex.catfilehandles = M.empty
, Annex.checkattrhandle = Nothing , Annex.checkattrhandle = Nothing

View file

@ -30,10 +30,6 @@ git-annex (7.20190508) UNRELEASED; urgency=medium
security hole CVE-2018-10857 (except for configurations which enabled curl security hole CVE-2018-10857 (except for configurations which enabled curl
and bypassed public IP address restrictions). Now it will work and bypassed public IP address restrictions). Now it will work
if allowed by annex.security.allowed-ip-addresses. if allowed by annex.security.allowed-ip-addresses.
* When running multiple concurrent actions, the cleanup phase is run
in a separate queue than the main action queue. This can make some
commands faster, because less time is spent on bookkeeping in
between each file transfer.
-- Joey Hess <id@joeyh.name> Mon, 06 May 2019 13:52:02 -0400 -- Joey Hess <id@joeyh.name> Mon, 06 May 2019 13:52:02 -0400

View file

@ -1,11 +1,11 @@
{- git-annex command-line actions {- git-annex command-line actions
- -
- Copyright 2010-2019 Joey Hess <id@joeyh.name> - Copyright 2010-2017 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
{-# LANGUAGE CPP, BangPatterns #-} {-# LANGUAGE CPP #-}
module CmdLine.Action where module CmdLine.Action where
@ -22,7 +22,9 @@ import Remote.List
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception (throwIO)
import GHC.Conc import GHC.Conc
import Data.Either
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified System.Console.Regions as Regions import qualified System.Console.Regions as Regions
@ -41,15 +43,12 @@ performCommandAction Command { cmdcheck = c, cmdname = name } seek cont = do
showerrcount 0 = noop showerrcount 0 = noop
showerrcount cnt = giveup $ name ++ ": " ++ show cnt ++ " failed" showerrcount cnt = giveup $ name ++ ": " ++ show cnt ++ " failed"
commandActions :: [CommandStart] -> Annex ()
commandActions = mapM_ commandAction
{- Runs one of the actions needed to perform a command. {- Runs one of the actions needed to perform a command.
- Individual actions can fail without stopping the whole command, - Individual actions can fail without stopping the whole command,
- including by throwing non-async exceptions. - including by throwing non-async exceptions.
- -
- When concurrency is enabled, a thread is forked off to run the action - When concurrency is enabled, a thread is forked off to run the action
- in the background, as soon as a free worker slot is available. - in the background, as soon as a free slot is available.
- This should only be run in the seek stage. - This should only be run in the seek stage.
-} -}
@ -62,34 +61,9 @@ commandAction a = Annex.getState Annex.concurrency >>= \case
run = void $ includeCommandAction a run = void $ includeCommandAction a
runconcurrent n = do runconcurrent n = do
tv <- Annex.getState Annex.workers ws <- liftIO . drainTo (n-1) =<< Annex.getState Annex.workers
workerst <- waitWorkerSlot n (== PerformStage) tv (st, ws') <- case ws of
void $ liftIO $ forkIO $ do UnallocatedWorkerPool -> do
aid <- async $ snd <$> Annex.run workerst
(inOwnConsoleRegion (Annex.output workerst) run)
atomically $ do
pool <- takeTMVar tv
let !pool' = addWorkerPool (ActiveWorker aid PerformStage) pool
putTMVar tv pool'
-- There won't usually be exceptions because the
-- async is running includeCommandAction, which
-- catches exceptions. Just in case, avoid
-- stalling by using the original workerst.
workerst' <- either (const workerst) id
<$> waitCatch aid
atomically $ do
pool <- takeTMVar tv
let !pool' = deactivateWorker pool aid workerst'
putTMVar tv pool'
-- | Wait until there's an idle worker in the pool, remove it from the
-- pool, and return its state.
--
-- If the pool is unallocated, it will be allocated to the specified size.
waitWorkerSlot :: Int -> (WorkerStage -> Bool) -> TMVar (WorkerPool Annex.AnnexState) -> Annex (Annex.AnnexState)
waitWorkerSlot n wantstage tv =
join $ liftIO $ atomically $ waitWorkerSlot' wantstage tv >>= \case
Nothing -> return $ do
-- Generate the remote list now, to avoid -- Generate the remote list now, to avoid
-- each thread generating it, which would -- each thread generating it, which would
-- be more expensive and could cause -- be more expensive and could cause
@ -97,74 +71,62 @@ waitWorkerSlot n wantstage tv =
-- setConfig. -- setConfig.
_ <- remoteList _ <- remoteList
st <- dupState st <- dupState
liftIO $ atomically $ do return (st, allocateWorkerPool st (n-1))
let (WorkerPool l) = allocateWorkerPool st (max n 1) WorkerPool l -> findFreeSlot l
let (st', pool) = findidle st [] l w <- liftIO $ async $ snd <$> Annex.run st
void $ swapTMVar tv pool (inOwnConsoleRegion (Annex.output st) run)
return st' Annex.changeState $ \s -> s
Just st -> return $ return st { Annex.workers = addWorkerPool ws' (Right w) }
where
findidle st _ [] = (st, WorkerPool [])
findidle _ c ((IdleWorker st stage):rest)
| wantstage stage = (st, WorkerPool (c ++ rest))
findidle st c (w:rest) = findidle st (w:c) rest
-- | STM action that waits until there's an idle worker in the worker pool. commandActions :: [CommandStart] -> Annex ()
-- commandActions = mapM_ commandAction
-- If the worker pool is not already allocated, returns Nothing.
waitWorkerSlot' :: (WorkerStage -> Bool) -> TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState))
waitWorkerSlot' wantstage tv =
takeTMVar tv >>= \case
UnallocatedWorkerPool -> do
putTMVar tv UnallocatedWorkerPool
return Nothing
WorkerPool l -> do
(st, pool') <- findidle [] l
putTMVar tv pool'
return $ Just st
where
findidle _ [] = retry
findidle c ((IdleWorker st stage):rest)
| wantstage stage = return (st, WorkerPool (c ++ rest))
findidle c (w:rest) = findidle (w:c) rest
{- Waits for all worker threads to finish and merges their AnnexStates {- Waits for any forked off command actions to finish.
- back into the current Annex's state. -
- Merge together the cleanup actions of all the AnnexStates used by
- threads, into the current Annex's state, so they'll run at shutdown.
-
- Also merge together the errcounters of the AnnexStates.
-} -}
finishCommandActions :: Annex () finishCommandActions :: Annex ()
finishCommandActions = do finishCommandActions = do
tv <- Annex.getState Annex.workers ws <- Annex.getState Annex.workers
pool <- liftIO $ atomically $ Annex.changeState $ \s -> s { Annex.workers = UnallocatedWorkerPool }
swapTMVar tv UnallocatedWorkerPool ws' <- liftIO $ drainTo 0 ws
case pool of forM_ (idleWorkers ws') mergeState
UnallocatedWorkerPool -> noop
WorkerPool l -> forM_ (mapMaybe workerAsync l) $ \aid ->
liftIO (waitCatch aid) >>= \case
Left _ -> noop
Right st -> mergeState st
{- Changes the current thread's stage in the worker pool. {- Wait for jobs from the WorkerPool to complete, until
- the number of running jobs is not larger than the specified number.
- -
- An idle worker with the desired stage is found in the pool - If a job throws an exception, it is propigated, but first
- (waiting if necessary for one to become idle) - all other jobs are waited for, to allow for a clean shutdown.
- and the stages of it and the current thread are swapped.
-} -}
changeStageTo :: WorkerStage -> Annex () drainTo :: Int -> WorkerPool t -> IO (WorkerPool t)
changeStageTo newstage = do drainTo _ UnallocatedWorkerPool = pure UnallocatedWorkerPool
mytid <- liftIO myThreadId drainTo sz (WorkerPool l)
tv <- Annex.getState Annex.workers | null as || sz >= length as = pure (WorkerPool l)
liftIO $ atomically $ waitWorkerSlot' (== newstage) tv >>= \case | otherwise = do
Just idlest -> do (done, ret) <- waitAnyCatch as
pool <- takeTMVar tv let as' = filter (/= done) as
let pool' = case removeThreadIdWorkerPool mytid pool of case ret of
Just ((myaid, oldstage), p) -> Left e -> do
addWorkerPool (IdleWorker idlest oldstage) $ void $ drainTo 0 $ WorkerPool $
addWorkerPool (ActiveWorker myaid newstage) p map Left sts ++ map Right as'
Nothing -> pool throwIO e
putTMVar tv pool' Right st -> do
-- No worker pool is allocated, not running in concurrent drainTo sz $ WorkerPool $
-- mode. map Left (st:sts) ++ map Right as'
Nothing -> noop where
(sts, as) = partitionEithers l
findFreeSlot :: [Worker Annex.AnnexState] -> Annex (Annex.AnnexState, WorkerPool Annex.AnnexState)
findFreeSlot = go []
where
go c [] = do
st <- dupState
return (st, WorkerPool c)
go c (Left st:rest) = return (st, WorkerPool (c ++ rest))
go c (v:rest) = go (v:c) rest
{- Like commandAction, but without the concurrency. -} {- Like commandAction, but without the concurrency. -}
includeCommandAction :: CommandStart -> CommandCleanup includeCommandAction :: CommandStart -> CommandCleanup
@ -199,9 +161,7 @@ callCommandActionQuiet :: CommandStart -> Annex (Maybe Bool)
callCommandActionQuiet = start callCommandActionQuiet = start
where where
start = stage $ maybe skip perform start = stage $ maybe skip perform
perform = stage $ maybe failure $ \a -> do perform = stage $ maybe failure cleanup
changeStageTo CleanupStage
cleanup a
cleanup = stage $ status cleanup = stage $ status
stage = (=<<) stage = (=<<)
skip = return Nothing skip = return Nothing

View file

@ -7,8 +7,8 @@
module Types.WorkerPool where module Types.WorkerPool where
import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Data.Either
-- | Pool of worker threads. -- | Pool of worker threads.
data WorkerPool t data WorkerPool t
@ -16,65 +16,15 @@ data WorkerPool t
| WorkerPool [Worker t] | WorkerPool [Worker t]
-- | A worker can either be idle or running an Async action. -- | A worker can either be idle or running an Async action.
-- And it is used for some stage. type Worker t = Either t (Async t)
data Worker t
= IdleWorker t WorkerStage
| ActiveWorker (Async t) WorkerStage
-- | These correspond to CommandPerform and CommandCleanup.
data WorkerStage = PerformStage | CleanupStage
deriving (Eq)
workerStage :: Worker t -> WorkerStage
workerStage (IdleWorker _ s) = s
workerStage (ActiveWorker _ s) = s
workerAsync :: Worker t -> Maybe (Async t)
workerAsync (IdleWorker _ _) = Nothing
workerAsync (ActiveWorker aid _) = Just aid
-- | Allocates a WorkerPool that has the specified number of workers
-- in it, of each stage.
--
-- The stages are distributed evenly throughout.
allocateWorkerPool :: t -> Int -> WorkerPool t allocateWorkerPool :: t -> Int -> WorkerPool t
allocateWorkerPool t n = WorkerPool $ take (n+n) $ allocateWorkerPool t n = WorkerPool $ replicate n (Left t)
map (uncurry IdleWorker) $ zip (repeat t) stages
where
stages = concat $ repeat [PerformStage, CleanupStage]
addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t addWorkerPool :: WorkerPool t -> Worker t -> WorkerPool t
addWorkerPool w (WorkerPool l) = WorkerPool (w:l) addWorkerPool (WorkerPool l) w = WorkerPool (w:l)
addWorkerPool w UnallocatedWorkerPool = WorkerPool [w] addWorkerPool UnallocatedWorkerPool w = WorkerPool [w]
idleWorkers :: WorkerPool t -> [t] idleWorkers :: WorkerPool t -> [t]
idleWorkers UnallocatedWorkerPool = [] idleWorkers UnallocatedWorkerPool = []
idleWorkers (WorkerPool l) = go l idleWorkers (WorkerPool l) = lefts l
where
go [] = []
go (IdleWorker t _ : rest) = t : go rest
go (ActiveWorker _ _ : rest) = go rest
-- | Removes a worker from the pool whose Async uses the ThreadId.
--
-- Each Async has its own ThreadId, so this stops once it finds
-- a match.
removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
removeThreadIdWorkerPool _ UnallocatedWorkerPool = Nothing
removeThreadIdWorkerPool tid (WorkerPool l) = go [] l
where
go _ [] = Nothing
go c (ActiveWorker a stage : rest)
| asyncThreadId a == tid = Just ((a, stage), WorkerPool (c++rest))
go c (v : rest) = go (v:c) rest
deactivateWorker :: WorkerPool t -> Async t -> t -> WorkerPool t
deactivateWorker UnallocatedWorkerPool _ _ = UnallocatedWorkerPool
deactivateWorker (WorkerPool l) aid t = WorkerPool $ go l
where
go [] = []
go (w@(IdleWorker _ _) : rest) = w : go rest
go (w@(ActiveWorker a st) : rest)
| a == aid = IdleWorker t st : rest
| otherwise = w : go rest