committer tweak to wait for Watcher to resume after a max-size commit
Without this, a very large batch add has commits of sizes approx 5000, 2500, 1250, etc down to 10, and then starts over at 5000. This fixes it so it's 5000+ every time.
This commit is contained in:
parent
49547ad32d
commit
82a6db8fe8
1 changed files with 73 additions and 20 deletions
|
@ -52,7 +52,7 @@ commitThread = namedThread "Committer" $ do
|
||||||
=<< annexDelayAdd <$> Annex.getGitConfig
|
=<< annexDelayAdd <$> Annex.getGitConfig
|
||||||
waitChangeTime $ \(changes, time) -> do
|
waitChangeTime $ \(changes, time) -> do
|
||||||
readychanges <- handleAdds delayadd changes
|
readychanges <- handleAdds delayadd changes
|
||||||
if shouldCommit time readychanges
|
if shouldCommit time (length readychanges) readychanges
|
||||||
then do
|
then do
|
||||||
debug
|
debug
|
||||||
[ "committing"
|
[ "committing"
|
||||||
|
@ -62,8 +62,12 @@ commitThread = namedThread "Committer" $ do
|
||||||
void $ alertWhile commitAlert $
|
void $ alertWhile commitAlert $
|
||||||
liftAnnex commitStaged
|
liftAnnex commitStaged
|
||||||
recordCommit
|
recordCommit
|
||||||
|
let numchanges = length readychanges
|
||||||
mapM_ checkChangeContent readychanges
|
mapM_ checkChangeContent readychanges
|
||||||
else refill readychanges
|
return numchanges
|
||||||
|
else do
|
||||||
|
refill readychanges
|
||||||
|
return 0
|
||||||
|
|
||||||
refill :: [Change] -> Assistant ()
|
refill :: [Change] -> Assistant ()
|
||||||
refill [] = noop
|
refill [] = noop
|
||||||
|
@ -72,21 +76,33 @@ refill cs = do
|
||||||
refillChanges cs
|
refillChanges cs
|
||||||
|
|
||||||
{- Wait for one or more changes to arrive to be committed. -}
|
{- Wait for one or more changes to arrive to be committed. -}
|
||||||
waitChangeTime :: (([Change], UTCTime) -> Assistant ()) -> Assistant ()
|
waitChangeTime :: (([Change], UTCTime) -> Assistant Int) -> Assistant ()
|
||||||
waitChangeTime a = runEvery (Seconds 1) <~> do
|
waitChangeTime a = go [] 0
|
||||||
-- We already waited one second as a simple rate limiter.
|
|
||||||
-- Next, wait until at least one change is available for
|
|
||||||
-- processing.
|
|
||||||
changes <- getChanges
|
|
||||||
-- See if now's a good time to commit.
|
|
||||||
now <- liftIO getCurrentTime
|
|
||||||
case (shouldCommit now changes, possiblyrename changes) of
|
|
||||||
(True, False) -> a (changes, now)
|
|
||||||
(True, True) -> do
|
|
||||||
morechanges <- getrelatedchanges changes
|
|
||||||
a (changes ++ morechanges, now)
|
|
||||||
_ -> refill changes
|
|
||||||
where
|
where
|
||||||
|
go unhandled lastcommitsize = do
|
||||||
|
-- Wait one one second as a simple rate limiter.
|
||||||
|
liftIO $ threadDelaySeconds (Seconds 1)
|
||||||
|
-- Now, wait until at least one change is available for
|
||||||
|
-- processing.
|
||||||
|
cs <- getChanges
|
||||||
|
let changes = unhandled ++ cs
|
||||||
|
let len = length changes
|
||||||
|
-- See if now's a good time to commit.
|
||||||
|
now <- liftIO getCurrentTime
|
||||||
|
case (lastcommitsize >= maxCommitSize, shouldCommit now len changes, possiblyrename changes) of
|
||||||
|
(True, True, _)
|
||||||
|
| len > maxCommitSize ->
|
||||||
|
go [] =<< a (changes, now)
|
||||||
|
| otherwise -> aftermaxcommit changes
|
||||||
|
(_, True, False) ->
|
||||||
|
go [] =<< a (changes, now)
|
||||||
|
(_, True, True) -> do
|
||||||
|
morechanges <- getrelatedchanges changes
|
||||||
|
go [] =<< a (changes ++ morechanges, now)
|
||||||
|
_ -> do
|
||||||
|
refill changes
|
||||||
|
go [] lastcommitsize
|
||||||
|
|
||||||
{- Did we perhaps only get one of the AddChange and RmChange pair
|
{- Did we perhaps only get one of the AddChange and RmChange pair
|
||||||
- that make up a file rename? Or some of the pairs that make up
|
- that make up a file rename? Or some of the pairs that make up
|
||||||
- a directory rename?
|
- a directory rename?
|
||||||
|
@ -116,6 +132,41 @@ waitChangeTime a = runEvery (Seconds 1) <~> do
|
||||||
then return cs
|
then return cs
|
||||||
else getbatchchanges (cs':cs)
|
else getbatchchanges (cs':cs)
|
||||||
|
|
||||||
|
{- The last commit was maximum size, so it's very likely there
|
||||||
|
- are more changes and we'd like to ensure we make another commit
|
||||||
|
- of maximum size if possible.
|
||||||
|
-
|
||||||
|
- But, it can take a while for the Watcher to wake back up
|
||||||
|
- after a commit. It can get blocked by another thread
|
||||||
|
- that is using the Annex state, such as a git-annex branch
|
||||||
|
- commit. Especially after such a large commit, this can
|
||||||
|
- take several seconds. When this happens, it defeats the
|
||||||
|
- normal commit batching, which sees some old changes the
|
||||||
|
- Watcher found while the commit was being prepared, and sees
|
||||||
|
- no recent ones, and wants to commit immediately.
|
||||||
|
-
|
||||||
|
- All that we need to do, then, is wait for the Watcher to
|
||||||
|
- wake up, and queue up one more change.
|
||||||
|
-
|
||||||
|
- However, it's also possible that we're at the end of changes for
|
||||||
|
- now. So to avoid waiting a really long time before committing
|
||||||
|
- those changes we have, poll for up to 30 seconds, and then
|
||||||
|
- commit them.
|
||||||
|
-
|
||||||
|
- Also, try to run something in Annex, to ensure we block
|
||||||
|
- longer if the Annex state is indeed blocked.
|
||||||
|
-}
|
||||||
|
aftermaxcommit oldchanges = loop (30 :: Int)
|
||||||
|
where
|
||||||
|
loop 0 = go oldchanges 0
|
||||||
|
loop n = do
|
||||||
|
liftAnnex noop -- ensure Annex state is free
|
||||||
|
liftIO $ threadDelaySeconds (Seconds 1)
|
||||||
|
changes <- getAnyChanges
|
||||||
|
if null changes
|
||||||
|
then loop (n - 1)
|
||||||
|
else go (oldchanges ++ changes) 0
|
||||||
|
|
||||||
isRmChange :: Change -> Bool
|
isRmChange :: Change -> Bool
|
||||||
isRmChange (Change { changeInfo = i }) | i == RmChange = True
|
isRmChange (Change { changeInfo = i }) | i == RmChange = True
|
||||||
isRmChange _ = False
|
isRmChange _ = False
|
||||||
|
@ -131,20 +182,22 @@ humanImperceptibleDelay :: IO ()
|
||||||
humanImperceptibleDelay = threadDelay $
|
humanImperceptibleDelay = threadDelay $
|
||||||
truncate $ humanImperceptibleInterval * fromIntegral oneSecond
|
truncate $ humanImperceptibleInterval * fromIntegral oneSecond
|
||||||
|
|
||||||
|
maxCommitSize :: Int
|
||||||
|
maxCommitSize = 5000
|
||||||
|
|
||||||
{- Decide if now is a good time to make a commit.
|
{- Decide if now is a good time to make a commit.
|
||||||
- Note that the list of changes has an undefined order.
|
- Note that the list of changes has an undefined order.
|
||||||
-
|
-
|
||||||
- Current strategy: If there have been 10 changes within the past second,
|
- Current strategy: If there have been 10 changes within the past second,
|
||||||
- a batch activity is taking place, so wait for later.
|
- a batch activity is taking place, so wait for later.
|
||||||
-}
|
-}
|
||||||
shouldCommit :: UTCTime -> [Change] -> Bool
|
shouldCommit :: UTCTime -> Int -> [Change] -> Bool
|
||||||
shouldCommit now changes
|
shouldCommit now len changes
|
||||||
| len == 0 = False
|
| len == 0 = False
|
||||||
| len > 5000 = True -- avoid bloating change pool too much
|
| len >= maxCommitSize = True
|
||||||
| length recentchanges < 10 = True
|
| length recentchanges < 10 = True
|
||||||
| otherwise = False -- batch activity
|
| otherwise = False -- batch activity
|
||||||
where
|
where
|
||||||
len = length changes
|
|
||||||
thissecond c = timeDelta c <= 1
|
thissecond c = timeDelta c <= 1
|
||||||
recentchanges = filter thissecond changes
|
recentchanges = filter thissecond changes
|
||||||
timeDelta c = now `diffUTCTime` changeTime c
|
timeDelta c = now `diffUTCTime` changeTime c
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue