improve git command queue flushing with time limit

So that eg, addurl of several large files that take time to download will
update the index for each file, rather than deferring the index updates to
the end.

In cases like an add of many smallish files, where a new file is being
added every few seconds, the queue will still build up a lot of changes
which are flushed at once, for best performance. Since the default queue
size is 10240, often it only gets flushed once at the end, same as
before. (Notice that updateQueue updates _lastchanged when adding a new
item to the queue without flushing it; that is necessary to avoid it
flushing the queue every 5 minutes in this case.)

But, when it takes more than 5 minutes to add a file, the overhead of
updating the index immediately is probably small, so do it after each
file. This avoids git-annex potentially taking a very very long time
indeed to stage newly added files, which can be annoying to the user who
would like to get on with doing something with the files it's already
added, eg using git mv to rename them to a better name.
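
As a rough illustration only (this is not the code in this commit), the
time check described above can be sketched as a pure function. The
Decision type and the decide function are made-up names for this sketch,
and it leaves out the separate check for conflicting actions already in
the queue.

```haskell
import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX (POSIXTime)

-- Hypothetical type, for illustration: what to do with a newly queued action.
data Decision
    = FlushWithAction -- time limit exceeded: add the action, then flush at once
    | Accumulate      -- recent activity: keep queueing and refresh the timestamp
    deriving (Eq, Show)

-- Compares the time since the queue last changed against the time limit.
decide :: NominalDiffTime -> POSIXTime -> POSIXTime -> Decision
decide timelimit lastchanged now
    | now - lastchanged > timelimit = FlushWithAction
    | otherwise = Accumulate

main :: IO ()
main = do
    let limit = 60 * 5 :: NominalDiffTime
    -- A new file 3 seconds after the previous one: keep accumulating.
    print (decide limit 1000 1003) -- prints Accumulate
    -- A file that took 10 minutes to download: flush right away.
    print (decide limit 1000 1600) -- prints FlushWithAction
```

With the 5 minute default, only gaps longer than 300 seconds between
queue changes trigger the immediate flush.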

This is only likely to cause a problem if it takes say, 30 seconds to
update the index; doing an extra 30 seconds of work after every 5
minute file add would be suboptimal. Normally, updating the index takes
significantly less time than that. On an SSD with 100k files it takes
less than 1 second, and the index write time is bound by disk read and
write speed, so it is not too much worse on a hard drive. So I hope this will not
impact users, although if it does turn out to, the time limit could be
made configurable.

A perhaps better way to do it would be to have a background worker
thread that wakes up every 60 seconds or so and flushes the queue.
That is made somewhat difficult because the queue can contain Annex
actions and so this would add a new source of concurrency issues.
So I'm trying to avoid that approach if possible.
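
For comparison, a minimal sketch of what such a background flusher could
look like, assuming a simplified queue that holds plain IO actions in an
MVar. PendingQueue, enqueue, flushQueue and startFlusher are hypothetical
names, not part of git-annex, and the real queue holds Annex actions,
which is exactly what this sketch sidesteps.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (MVar, modifyMVar, modifyMVar_, newMVar)
import Control.Monad (forever)

-- Hypothetical stand-in for the queue: pending actions, newest first.
type PendingQueue = MVar [IO ()]

-- Adds an action to the queue without running it.
enqueue :: PendingQueue -> IO () -> IO ()
enqueue qv act = modifyMVar_ qv (return . (act :))

-- Empties the queue, then runs what was in it, oldest first.
-- The actions run outside the MVar lock, in whichever thread calls this.
flushQueue :: PendingQueue -> IO ()
flushQueue qv = do
    pending <- modifyMVar qv (\acts -> return ([], acts))
    sequence_ (reverse pending)

-- Forks a worker that flushes the queue every intervalMicros microseconds.
startFlusher :: Int -> PendingQueue -> IO ThreadId
startFlusher intervalMicros qv = forkIO $ forever $ do
    threadDelay intervalMicros
    flushQueue qv

main :: IO ()
main = do
    qv <- newMVar []
    -- Short interval so the demo finishes quickly; the idea above is ~60s.
    worker <- startFlusher 100000 qv
    enqueue qv (putStrLn "update index for file 1")
    enqueue qv (putStrLn "update index for file 2")
    threadDelay 350000
    killThread worker
    -- Final flush for anything queued after the worker's last wakeup.
    flushQueue qv
```

Here the queued actions run in the worker thread, concurrently with
whatever the main thread is doing, which is the kind of concurrency
issue mentioned above.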

Sponsored-by: Erik Bjäreholt on Patreon
Joey Hess 2021-12-14 11:48:07 -04:00
parent fe31951e5e
commit c2e46f4707
GPG key ID: DB12DB0FF05F8F38
3 changed files with 46 additions and 13 deletions


@@ -78,7 +78,8 @@ get = maybe new return =<< getState repoqueue
 new :: Annex (Git.Queue.Queue Annex)
 new = do
-	q <- Git.Queue.new . annexQueueSize <$> getGitConfig
+	sz <- annexQueueSize <$> getGitConfig
+	q <- liftIO $ Git.Queue.new sz Nothing
 	store q
 	return q


@@ -11,6 +11,9 @@ git-annex (8.20211124) UNRELEASED; urgency=medium
   * Fix build with ghc 9.0.1
   * Improve error message display when autoinit fails due to eg, a
     permissions problem.
+  * Improve git command queue flushing so that eg, addurl of several
+    large files that take time to download will update the index for each
+    file, rather than deferring the index updates to the end.
  -- Joey Hess <id@joeyh.name> Tue, 23 Nov 2021 15:58:27 -0400


@@ -10,6 +10,7 @@
 module Git.Queue (
 	Queue,
 	new,
+	defaultTimelimit,
 	addCommand,
 	addUpdateIndex,
 	addInternalAction,
@@ -28,6 +29,8 @@ import qualified Git.UpdateIndex
 import qualified Data.Map.Strict as M
 import Control.Monad.IO.Class
+import Data.Time.Clock
+import Data.Time.Clock.POSIX
 {- Queable actions that can be performed in a git repository. -}
 data Action m
@@ -76,6 +79,8 @@ actionKey InternalAction { getRunner = InternalActionRunner s _ } = InternalActi
 data Queue m = Queue
 	{ size :: Int
 	, _limit :: Int
+	, _timelimit :: NominalDiffTime
+	, _lastchanged :: POSIXTime
 	, items :: M.Map ActionKey (Action m)
 	}
@@ -91,9 +96,21 @@ data Queue m = Queue
 defaultLimit :: Int
 defaultLimit = 10240
+{- How close together in seconds changes to the queue have to be happening
+ - in order for it to keep accumulate actions, rather than running actions
+ - immediately. -}
+defaultTimelimit :: NominalDiffTime
+defaultTimelimit = 60 * 5
 {- Constructor for empty queue. -}
-new :: Maybe Int -> Queue m
-new lim = Queue 0 (fromMaybe defaultLimit lim) M.empty
+new :: Maybe Int -> Maybe NominalDiffTime -> IO (Queue m)
+new lim tlim = do
+	now <- getPOSIXTime
+	return $ Queue 0
+		(fromMaybe defaultLimit lim)
+		(fromMaybe defaultTimelimit tlim)
+		now
+		M.empty
 {- Adds an git command to the queue.
  -
@@ -139,15 +156,25 @@ addUpdateIndex streamer q repo =
 	different (UpdateIndexAction _) = False
 	different _ = True
-{- Updates or adds an action in the queue. If the queue already contains a
- - different action, it will be flushed; this is to ensure that conflicting
- - actions, like add and rm, are run in the right order.-}
+{- Updates or adds an action in the queue.
+ -
+ - If the queue already contains a different action, it will be flushed
+ - before adding the action; this is to ensure that conflicting actions,
+ - like add and rm, are run in the right order.
+ -
+ - If the queue's time limit has been exceeded, it will also be flushed,
+ - and the action will be run right away.
+ -}
 updateQueue :: MonadIO m => Action m -> (Action m -> Bool) -> Int -> Queue m -> Repo -> m (Queue m)
-updateQueue !action different sizeincrease q repo
-	| null (filter different (M.elems (items q))) = return $ go q
-	| otherwise = go <$> flush q repo
+updateQueue !action different sizeincrease q repo = do
+	now <- liftIO getPOSIXTime
+	if now - (_lastchanged q) > _timelimit q
+		then flush (mk q) repo
+		else if null (filter different (M.elems (items q)))
+			then return $ mk (q { _lastchanged = now })
+			else mk <$> flush q repo
   where
-	go q' = newq
+	mk q' = newq
 	  where
 		!newq = q'
 			{ size = newsize
@@ -175,17 +202,19 @@ merge :: Queue m -> Queue m -> Queue m
 merge origq newq = origq
 	{ size = size origq + size newq
 	, items = M.unionWith combineNewOld (items newq) (items origq)
+	, _lastchanged = max (_lastchanged origq) (_lastchanged newq)
 	}
 {- Is a queue large enough that it should be flushed? -}
 full :: Queue m -> Bool
-full (Queue cur lim _) = cur >= lim
+full (Queue cur lim _ _ _) = cur >= lim
 {- Runs a queue on a git repository. -}
 flush :: MonadIO m => Queue m -> Repo -> m (Queue m)
-flush (Queue _ lim m) repo = do
+flush (Queue _ lim tlim _ m) repo = do
 	forM_ (M.elems m) $ runAction repo
-	return $ Queue 0 lim M.empty
+	now <- liftIO getPOSIXTime
+	return $ Queue 0 lim tlim now M.empty
 {- Runs an Action on a list of files in a git repository.
  -