separate queue for cleanup actions

When running multiple concurrent actions, the cleanup phase is run in a
separate queue from the main action queue. This can make some commands
faster, because less time is spent on bookkeeping in between each file
transfer.

But as far as I can see, nothing will be sped up much by this yet, because
all the existing cleanup actions are very light-weight. This is just groundwork
for deferring checksum verification to cleanup time.

This change does mean that a user who expects -J2 to limit them to no
more than 2 jobs running at a time may be surprised to see 4 in some
cases (if the cleanup actions are slow enough to notice).
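
The 4 comes from the pool now containing both perform slots and cleanup
slots, so with -J2 up to 2 transfers and 2 cleanup actions can run at
once. A minimal standalone sketch of just the stage allocation
(simplified from the allocateWorkerPool change below; not part of the
commit):

    data WorkerStage = PerformStage | CleanupStage
        deriving (Eq, Show)

    -- n perform slots and n cleanup slots, interleaved
    allocateStages :: Int -> [WorkerStage]
    allocateStages n = take (n + n) (cycle [PerformStage, CleanupStage])

    main :: IO ()
    main = print (allocateStages 2)
    -- [PerformStage,CleanupStage,PerformStage,CleanupStage]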

It might also make sense to enable background cleanup without -J,
for at least one cleanup action. Indeed, that's the behavior that -J1
has now. At some point in the future, it may make sense to make the
behavior with no -J the same as -J1. The only reasons it's not done
currently are that git-annex can build without concurrent-output, and
that any bugs in concurrent-output (such as perhaps misbehaving on
non-VT100 compatible terminals) are avoided by default by only using
it when -J is used.
Joey Hess 2019-06-05 17:54:35 -04:00
parent c04b2af3e1
commit 659640e224
6 changed files with 128 additions and 46 deletions

@@ -142,7 +142,7 @@ data AnnexState = AnnexState
, tempurls :: M.Map Key URLString
, existinghooks :: M.Map Git.Hook.Hook Bool
, desktopnotify :: DesktopNotify
, workers :: WorkerPool AnnexState
, workers :: TMVar (WorkerPool AnnexState)
, activekeys :: TVar (M.Map Key ThreadId)
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
, keysdbhandle :: Maybe Keys.DbHandle
@@ -155,6 +155,7 @@ newState :: GitConfig -> Git.Repo -> IO AnnexState
newState c r = do
emptyactiveremotes <- newMVar M.empty
emptyactivekeys <- newTVarIO M.empty
emptyworkerpool <- newTMVarIO UnallocatedWorkerPool
o <- newMessageState
sc <- newTMVarIO False
return $ AnnexState
@@ -199,7 +200,7 @@ newState c r = do
, tempurls = M.empty
, existinghooks = M.empty
, desktopnotify = mempty
, workers = UnallocatedWorkerPool
, workers = emptyworkerpool
, activekeys = emptyactivekeys
, activeremotes = emptyactiveremotes
, keysdbhandle = Nothing

@@ -11,7 +11,6 @@ import Annex
import Annex.Common
import Annex.Action
import qualified Annex.Queue
import Types.WorkerPool
import qualified Data.Map as M
@@ -43,9 +42,8 @@ dupState :: Annex AnnexState
dupState = do
st <- Annex.getState id
return $ st
{ Annex.workers = UnallocatedWorkerPool
-- each thread has its own repoqueue
, Annex.repoqueue = Nothing
{ Annex.repoqueue = Nothing
-- avoid sharing eg, open file handles
, Annex.catfilehandles = M.empty
, Annex.checkattrhandle = Nothing

@@ -30,6 +30,10 @@ git-annex (7.20190508) UNRELEASED; urgency=medium
security hole CVE-2018-10857 (except for configurations which enabled curl
and bypassed public IP address restrictions). Now it will work
if allowed by annex.security.allowed-ip-addresses.
* When running multiple concurrent actions, the cleanup phase is run
in a separate queue from the main action queue. This can make some
commands faster, because less time is spent on bookkeeping in
between each file transfer.
-- Joey Hess <id@joeyh.name> Mon, 06 May 2019 13:52:02 -0400

@@ -24,7 +24,6 @@ import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (throwIO)
import GHC.Conc
import Data.Either
import qualified Data.Map.Strict as M
import qualified System.Console.Regions as Regions
@@ -61,7 +60,9 @@ commandAction a = Annex.getState Annex.concurrency >>= \case
run = void $ includeCommandAction a
runconcurrent n = do
ws <- liftIO . drainTo (n-1) =<< Annex.getState Annex.workers
tv <- Annex.getState Annex.workers
ws <- liftIO $ drainTo (n-1) (== PerformStage)
=<< atomically (takeTMVar tv)
(st, ws') <- case ws of
UnallocatedWorkerPool -> do
-- Generate the remote list now, to avoid
@@ -72,61 +73,99 @@ commandAction a = Annex.getState Annex.concurrency >>= \case
_ <- remoteList
st <- dupState
return (st, allocateWorkerPool st (n-1))
WorkerPool l -> findFreeSlot l
WorkerPool _ -> findFreeSlot (== PerformStage) ws
w <- liftIO $ async $ snd <$> Annex.run st
(inOwnConsoleRegion (Annex.output st) run)
Annex.changeState $ \s -> s
{ Annex.workers = addWorkerPool ws' (Right w) }
liftIO $ atomically $ putTMVar tv $
addWorkerPool (ActiveWorker w PerformStage) ws'
commandActions :: [CommandStart] -> Annex ()
commandActions = mapM_ commandAction
{- Waits for any forked off command actions to finish.
{- Waits for any worker threads to finish.
-
- Merge together the cleanup actions of all the AnnexStates used by
- threads, into the current Annex's state, so they'll run at shutdown.
-
- Also merge together the errcounters of the AnnexStates.
- Merge the AnnexStates used by the threads back into the current Annex's
- state.
-}
finishCommandActions :: Annex ()
finishCommandActions = do
ws <- Annex.getState Annex.workers
Annex.changeState $ \s -> s { Annex.workers = UnallocatedWorkerPool }
ws' <- liftIO $ drainTo 0 ws
forM_ (idleWorkers ws') mergeState
tv <- Annex.getState Annex.workers
let get = liftIO $ atomically $ takeTMVar tv
let put = liftIO . atomically . putTMVar tv
bracketOnError get put $ \ws -> do
ws' <- liftIO $ drainTo 0 (const True) ws
forM_ (idleWorkers ws') mergeState
put UnallocatedWorkerPool
{- Wait for jobs from the WorkerPool to complete, until
- the number of running jobs is not larger than the specified number.
- the number of running jobs of the desired stage
- is not larger than the specified number.
-
- If a job throws an exception, it is propagated, but first
- all other jobs are waited for, to allow for a clean shutdown.
-}
drainTo :: Int -> WorkerPool t -> IO (WorkerPool t)
drainTo _ UnallocatedWorkerPool = pure UnallocatedWorkerPool
drainTo sz (WorkerPool l)
drainTo :: Int -> (WorkerStage -> Bool) -> WorkerPool t -> IO (WorkerPool t)
drainTo _ _ UnallocatedWorkerPool = pure UnallocatedWorkerPool
drainTo sz wantstage (WorkerPool l)
| null as || sz >= length as = pure (WorkerPool l)
| otherwise = do
(done, ret) <- waitAnyCatch as
let as' = filter (/= done) as
(done, ret) <- waitAnyCatch (mapMaybe workerAsync as)
let (ActiveWorker _ donestage:[], as') =
partition (\w -> workerAsync w == Just done) as
case ret of
Left e -> do
void $ drainTo 0 $ WorkerPool $
map Left sts ++ map Right as'
void $ drainTo 0 (const True) $ WorkerPool $
sts ++ as' ++ otheras
throwIO e
Right st -> do
drainTo sz $ WorkerPool $
map Left (st:sts) ++ map Right as'
let w = IdleWorker st donestage
drainTo sz wantstage $ WorkerPool $
w : sts ++ as' ++ otheras
where
(sts, as) = partitionEithers l
(sts, allas) = partition isidle l
(as, otheras) = partition (wantstage . workerStage) allas
isidle (IdleWorker _ _) = True
isidle (ActiveWorker _ _) = False
findFreeSlot :: [Worker Annex.AnnexState] -> Annex (Annex.AnnexState, WorkerPool Annex.AnnexState)
findFreeSlot = go []
findFreeSlot :: (WorkerStage -> Bool) -> WorkerPool Annex.AnnexState -> Annex (Annex.AnnexState, WorkerPool Annex.AnnexState)
findFreeSlot wantstage (WorkerPool l) = go [] l
where
go c [] = do
st <- dupState
return (st, WorkerPool c)
go c (Left st:rest) = return (st, WorkerPool (c ++ rest))
go c ((IdleWorker st stage):rest) | wantstage stage =
return (st, WorkerPool (c ++ rest))
go c (v:rest) = go (v:c) rest
findFreeSlot _ UnallocatedWorkerPool = do
st <- dupState
return (st, UnallocatedWorkerPool)
{- Changes the current thread's stage in the worker pool.
-
- An idle worker with the desired stage is found in the pool
- (waiting if necessary for one to become idle)
- and the stages of it and the current thread are swapped.
-}
changeStageTo :: WorkerStage -> Annex ()
changeStageTo newstage = Annex.getState Annex.concurrency >>= \case
NonConcurrent -> noop
Concurrent n -> go n
ConcurrentPerCpu -> go =<< liftIO getNumProcessors
where
go n = do
tv <- Annex.getState Annex.workers
let get = liftIO $ atomically $ takeTMVar tv
let put = liftIO . atomically . putTMVar tv
bracketOnError get put $ \pool -> do
pool' <- liftIO $ drainTo (n-1) (== newstage) pool
(idlest, pool'') <- findFreeSlot (== newstage) pool'
mytid <- liftIO myThreadId
case removeThreadIdWorkerPool mytid pool'' of
Just ((myaid, oldstage), pool''') -> do
liftIO $ print "switching"
put $ addWorkerPool (IdleWorker idlest oldstage) $
addWorkerPool (ActiveWorker myaid newstage) pool'''
Nothing -> put pool'
{- Like commandAction, but without the concurrency. -}
includeCommandAction :: CommandStart -> CommandCleanup
@@ -161,7 +200,9 @@ callCommandActionQuiet :: CommandStart -> Annex (Maybe Bool)
callCommandActionQuiet = start
where
start = stage $ maybe skip perform
perform = stage $ maybe failure cleanup
perform = stage $ maybe failure $ \a -> do
changeStageTo CleanupStage
cleanup a
cleanup = stage $ status
stage = (=<<)
skip = return Nothing

@@ -7,8 +7,8 @@
module Types.WorkerPool where
import Control.Concurrent
import Control.Concurrent.Async
import Data.Either
-- | Pool of worker threads.
data WorkerPool t
@@ -16,15 +16,54 @@ data WorkerPool t
| WorkerPool [Worker t]
-- | A worker can either be idle or running an Async action.
type Worker t = Either t (Async t)
-- And it is used for some stage.
data Worker t
= IdleWorker t WorkerStage
| ActiveWorker (Async t) WorkerStage
-- | These correspond to CommandPerform and CommandCleanup.
data WorkerStage = PerformStage | CleanupStage
deriving (Eq)
workerStage :: Worker t -> WorkerStage
workerStage (IdleWorker _ s) = s
workerStage (ActiveWorker _ s) = s
workerAsync :: Worker t -> Maybe (Async t)
workerAsync (IdleWorker _ _) = Nothing
workerAsync (ActiveWorker aid _) = Just aid
-- | Allocates a WorkerPool that has the specified number of workers
-- in it, of each stage.
--
-- The stages are distributed evenly throughout.
allocateWorkerPool :: t -> Int -> WorkerPool t
allocateWorkerPool t n = WorkerPool $ replicate n (Left t)
allocateWorkerPool t n = WorkerPool $ take (n+n) $
map (uncurry IdleWorker) $ zip (repeat t) stages
where
stages = concat $ repeat [PerformStage, CleanupStage]
addWorkerPool :: WorkerPool t -> Worker t -> WorkerPool t
addWorkerPool (WorkerPool l) w = WorkerPool (w:l)
addWorkerPool UnallocatedWorkerPool w = WorkerPool [w]
addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t
addWorkerPool w (WorkerPool l) = WorkerPool (w:l)
addWorkerPool w UnallocatedWorkerPool = WorkerPool [w]
idleWorkers :: WorkerPool t -> [t]
idleWorkers UnallocatedWorkerPool = []
idleWorkers (WorkerPool l) = lefts l
idleWorkers (WorkerPool l) = go l
where
go [] = []
go (IdleWorker t _ : rest) = t : go rest
go (ActiveWorker _ _ : rest) = go rest
-- | Removes a worker from the pool whose Async uses the ThreadId.
--
-- Each Async has its own ThreadId, so this stops once it finds
-- a match.
removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
removeThreadIdWorkerPool _ UnallocatedWorkerPool = Nothing
removeThreadIdWorkerPool tid (WorkerPool l) = go [] l
where
go _ [] = Nothing
go c (ActiveWorker a stage : rest)
| asyncThreadId a == tid = Just ((a, stage), WorkerPool (c++rest))
go c (v : rest) = go (v:c) rest
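
As a quick illustration of the new pool API (a sketch, not part of the
commit; it assumes the Types.WorkerPool definitions above are in scope,
and uses String in place of the real AnnexState for brevity):

    example :: WorkerPool String
    example = addWorkerPool (IdleWorker "st1" PerformStage) $
        addWorkerPool (IdleWorker "st2" CleanupStage) UnallocatedWorkerPool

    -- idleWorkers example == ["st1","st2"]
    -- a running job would instead be an ActiveWorker holding its Async
    -- and its WorkerStage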

@@ -17,8 +17,7 @@ are still some things that could be improved, tracked here:
can still end up stuck doing checksum verification at the same time,
so the pipe to the remote is not saturated.
Running cleanup actions in a separate queue from the main job queue
wouldn't be sufficient for this, because verification is done as part
of the same action that transfers content. That needs to somehow be
refactored to a cleanup action that ingests the file, and then
the cleanup action can be run in a separate queue.
Now that cleanup actions don't occupy space in the main worker queue,
all that needs to be done is make checksum verification be done as the
cleanup action. Currently, it's bundled into the same action that
transfers content.