smart commit thread

The commit thread now has access to a channel containing the times of
all uncommitted changes. This lets it be smart about detecting busy times
when a batch job is running (such as rm -rf, or untarring something, etc),
and avoid committing until it's done. While at the same time, instantly
committing one-off changes that the user is going to expect to see
immediately.

I had to use STM to implement the channel, because of
http://hackage.haskell.org/trac/ghc/ticket/4154
While this adds a dependency, I always wanted to use STM, so this actually
makes me happy. ;)

Also happy that shouldCommit is a pure function, so other commit smartness
strategies can easily be played with. Although the current one seems pretty
good.

There is one bug, for some reason it does double commits, every time.
This commit is contained in:
Joey Hess 2012-06-10 16:07:48 -04:00
parent 6e54907e35
commit 2de50f733a
3 changed files with 84 additions and 27 deletions

View file

@ -12,9 +12,6 @@ module Command.Watch where
import Common.Annex import Common.Annex
import Command import Command
#if defined linux_HOST_OS
import Utility.Inotify
#endif
import Utility.ThreadLock import Utility.ThreadLock
import qualified Annex import qualified Annex
import qualified Annex.Queue import qualified Annex.Queue
@ -25,11 +22,16 @@ import qualified Backend
import Annex.Content import Annex.Content
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM
import Data.Time.Clock
#if defined linux_HOST_OS #if defined linux_HOST_OS
import Utility.Inotify
import System.INotify import System.INotify
#endif #endif
type ChangeChan = TChan UTCTime
def :: [Command] def :: [Command]
def = [command "watch" paramPaths seek "watch for changes"] def = [command "watch" paramPaths seek "watch for changes"]
@ -42,8 +44,10 @@ start = notBareRepo $ do
showStart "watch" "." showStart "watch" "."
showAction "scanning" showAction "scanning"
inRepo $ Git.Command.run "add" [Param "--update"] inRepo $ Git.Command.run "add" [Param "--update"]
next $ next $ withStateMVar $ \mvar -> liftIO $ withINotify $ \i -> do next $ next $ withStateMVar $ \st -> liftIO $ withINotify $ \i -> do
let hook a = Just $ runHook mvar a changechan <- atomically newTChan
_ <- forkIO $ commitThread st changechan
let hook a = Just $ runHook st changechan a
let hooks = WatchHooks let hooks = WatchHooks
{ addHook = hook onAdd { addHook = hook onAdd
, delHook = hook onDel , delHook = hook onDel
@ -52,7 +56,6 @@ start = notBareRepo $ do
, errHook = hook onErr , errHook = hook onErr
} }
watchDir i "." (ignored . takeFileName) hooks watchDir i "." (ignored . takeFileName) hooks
_ <- forkIO $ commitThread mvar
putStrLn "(started)" putStrLn "(started)"
waitForTermination waitForTermination
return True return True
@ -91,12 +94,12 @@ runStateMVar mvar a = do
- -
- Exceptions are ignored, otherwise a whole watcher thread could be crashed. - Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-} -}
runHook :: MVar Annex.AnnexState -> (FilePath -> Annex ()) -> FilePath -> IO () runHook :: MVar Annex.AnnexState -> ChangeChan -> (FilePath -> Annex ()) -> FilePath -> IO ()
runHook mvar a f = handle =<< tryIO (runStateMVar mvar go) runHook st changetimes a f = handle =<< tryIO (runStateMVar st go)
where where
go = do go = do
a f a f
Annex.Queue.flushWhenFull signalChange changetimes
handle (Right ()) = return () handle (Right ()) = return ()
handle (Left e) = putStrLn $ show e handle (Left e) = putStrLn $ show e
@ -160,20 +163,72 @@ stageSymlink file linktext =
Annex.Queue.addUpdateIndex =<< Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.stageSymlink file linktext) inRepo (Git.UpdateIndex.stageSymlink file linktext)
{- This thread wakes up periodically and makes git commits. -} {- Signals that a change has been made, that needs to get committed. -}
commitThread :: MVar Annex.AnnexState -> IO () signalChange :: ChangeChan -> Annex ()
commitThread mvar = forever $ do signalChange chan = do
threadDelay 1000000 -- 1 second liftIO $ (atomically . writeTChan chan) =<< getCurrentTime
commit -- Just in case the commit thread is not flushing
-- the queue fast enough.
Annex.Queue.flushWhenFull
{- Gets the times of all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [UTCTime]
getChanges chan = atomically $ do
c <- readTChan chan
go [c]
where where
commit = tryIO $ runStateMVar mvar $ go l = do
whenM ((>) <$> Annex.Queue.size <*> pure 0) $ do v <- tryReadTChan chan
Annex.Queue.flush case v of
{- Empty commits may be made if tree Nothing -> return l
- changes cancel each other out, etc. -} Just c -> go (c:l)
inRepo $ Git.Command.run "commit"
[ Param "--allow-empty-message" {- Puts unhandled changes back into the channel.
, Param "-m", Param "" - Note: Original order is not preserved. -}
, Param "--allow-empty" refillChanges :: ChangeChan -> [UTCTime] -> IO ()
, Param "--quiet" refillChanges chan cs = atomically $ mapM_ (writeTChan chan) cs
]
{- This thread makes git commits. -}
commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO ()
commitThread st changechan = forever $ do
-- First, a simple rate limiter.
threadDelay $ oneSecond
liftIO $ putStrLn "running"
-- Next, wait until at least one change has been made.
cs <- getChanges changechan
-- Now see if now's a good time to commit.
ifM (shouldCommit <$> getCurrentTime <*> pure cs) $
( commit
, do
liftIO $ putStrLn $ "no commit now " ++ show (length cs)
refillChanges changechan cs
)
where
commit = void $ tryIO $ runStateMVar st $ do
Annex.Queue.flush
{- Empty commits may be made if tree
- changes cancel each other out, etc. -}
inRepo $ Git.Command.run "commit"
[ Param "--allow-empty-message"
, Param "-m", Param ""
, Param "--allow-empty"
, Param "--quiet"
]
oneSecond = 1000000 -- microseconds
{- Decide if now is a good time to make a commit.
- Note that the list of change times has an undefined order.
-
- Current strategy: If there have been 10 commits within the past second,
- a batch activity is taking place, so wait for later.
-}
shouldCommit :: UTCTime -> [UTCTime] -> Bool
shouldCommit now changetimes
| len == 0 = False
| len > 4096 = True -- avoid bloating queue too much
| length (filter thisSecond changetimes) < 10 = True
| otherwise = False -- batch activity
where
len = length changetimes
thisSecond t = now `diffUTCTime` t <= 1

1
debian/control vendored
View file

@ -21,6 +21,7 @@ Build-Depends:
libghc-bloomfilter-dev, libghc-bloomfilter-dev,
libghc-edit-distance-dev, libghc-edit-distance-dev,
libghc-hinotify-dev, libghc-hinotify-dev,
libghc-stm-dev,
ikiwiki, ikiwiki,
perlmagick, perlmagick,
git, git,

View file

@ -36,7 +36,7 @@ Executable git-annex
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP, pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
base == 4.5.*, monad-control, transformers-base, lifted-base, base == 4.5.*, monad-control, transformers-base, lifted-base,
IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance, IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance,
hinotify hinotify, STM
Other-Modules: Utility.Touch Other-Modules: Utility.Touch
C-Sources: Utility/libdiskfree.c C-Sources: Utility/libdiskfree.c
Extensions: CPP Extensions: CPP
@ -52,7 +52,8 @@ Test-Suite test
unix, containers, utf8-string, network, mtl, bytestring, old-locale, time, unix, containers, utf8-string, network, mtl, bytestring, old-locale, time,
pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP, pcre-light, extensible-exceptions, dataenc, SHA, process, json, HTTP,
base == 4.5.*, monad-control, transformers-base, lifted-base, base == 4.5.*, monad-control, transformers-base, lifted-base,
IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance IfElse, text, QuickCheck >= 2.1, bloomfilter, edit-distance,
hinotify, STM
C-Sources: Utility/libdiskfree.c C-Sources: Utility/libdiskfree.c
Extensions: CPP Extensions: CPP