fixed the double commits problem
This commit is contained in:
parent
fc0dd79774
commit
aae0ba1995
2 changed files with 84 additions and 59 deletions
|
@ -94,9 +94,8 @@ undo file key e = do
|
||||||
src <- inRepo $ gitAnnexLocation key
|
src <- inRepo $ gitAnnexLocation key
|
||||||
liftIO $ moveFile src file
|
liftIO $ moveFile src file
|
||||||
|
|
||||||
{- Creates the symlink to the annexed content, and also returns the link's
|
{- Creates the symlink to the annexed content. -}
|
||||||
- text. -}
|
link :: FilePath -> Key -> Bool -> Annex ()
|
||||||
link :: FilePath -> Key -> Bool -> Annex FilePath
|
|
||||||
link file key hascontent = handle (undo file key) $ do
|
link file key hascontent = handle (undo file key) $ do
|
||||||
l <- calcGitLink file key
|
l <- calcGitLink file key
|
||||||
liftIO $ createSymbolicLink l file
|
liftIO $ createSymbolicLink l file
|
||||||
|
@ -110,8 +109,6 @@ link file key hascontent = handle (undo file key) $ do
|
||||||
mtime <- modificationTime <$> getFileStatus file
|
mtime <- modificationTime <$> getFileStatus file
|
||||||
touch file (TimeSpec mtime) False
|
touch file (TimeSpec mtime) False
|
||||||
|
|
||||||
return l
|
|
||||||
|
|
||||||
{- Note: Several other commands call this, and expect it to
|
{- Note: Several other commands call this, and expect it to
|
||||||
- create the symlink and add it. -}
|
- create the symlink and add it. -}
|
||||||
cleanup :: FilePath -> Key -> Bool -> CommandCleanup
|
cleanup :: FilePath -> Key -> Bool -> CommandCleanup
|
||||||
|
|
136
Command/Watch.hs
136
Command/Watch.hs
|
@ -30,7 +30,16 @@ import Utility.Inotify
|
||||||
import System.INotify
|
import System.INotify
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
type ChangeChan = TChan UTCTime
|
type ChangeChan = TChan Change
|
||||||
|
|
||||||
|
type Handler = FilePath -> Annex (Maybe Change)
|
||||||
|
|
||||||
|
data Change = Change
|
||||||
|
{ changeTime :: UTCTime
|
||||||
|
, changeFile :: FilePath
|
||||||
|
, changeDesc :: String
|
||||||
|
}
|
||||||
|
deriving (Show)
|
||||||
|
|
||||||
def :: [Command]
|
def :: [Command]
|
||||||
def = [command "watch" paramPaths seek "watch for changes"]
|
def = [command "watch" paramPaths seek "watch for changes"]
|
||||||
|
@ -52,7 +61,7 @@ watch = do
|
||||||
withStateMVar $ \st -> liftIO $ withINotify $ \i -> do
|
withStateMVar $ \st -> liftIO $ withINotify $ \i -> do
|
||||||
changechan <- atomically newTChan
|
changechan <- atomically newTChan
|
||||||
_ <- forkIO $ commitThread st changechan
|
_ <- forkIO $ commitThread st changechan
|
||||||
let hook a = Just $ runHook st changechan a
|
let hook a = Just $ runHandler st changechan a
|
||||||
let hooks = WatchHooks
|
let hooks = WatchHooks
|
||||||
{ addHook = hook onAdd
|
{ addHook = hook onAdd
|
||||||
, delHook = hook onDel
|
, delHook = hook onDel
|
||||||
|
@ -94,56 +103,70 @@ runStateMVar mvar a = do
|
||||||
!newstate <- Annex.exec startstate a
|
!newstate <- Annex.exec startstate a
|
||||||
putMVar mvar newstate
|
putMVar mvar newstate
|
||||||
|
|
||||||
{- Runs a hook, inside the Annex monad.
|
{- Runs an action handler, inside the Annex monad.
|
||||||
-
|
-
|
||||||
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
|
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
|
||||||
-}
|
-}
|
||||||
runHook :: MVar Annex.AnnexState -> ChangeChan -> (FilePath -> Annex ()) -> FilePath -> IO ()
|
runHandler :: MVar Annex.AnnexState -> ChangeChan -> Handler -> FilePath -> IO ()
|
||||||
runHook st changetimes a f = handle =<< tryIO (runStateMVar st go)
|
runHandler st changechan hook file = handle =<< tryIO (runStateMVar st go)
|
||||||
where
|
where
|
||||||
go = do
|
go = maybe noop (signalChange changechan) =<< hook file
|
||||||
a f
|
|
||||||
signalChange changetimes
|
|
||||||
handle (Right ()) = return ()
|
handle (Right ()) = return ()
|
||||||
handle (Left e) = putStrLn $ show e
|
handle (Left e) = putStrLn $ show e
|
||||||
|
|
||||||
|
{- Handlers call this when they made a change that needs to get committed. -}
|
||||||
|
madeChange :: FilePath -> String -> Annex (Maybe Change)
|
||||||
|
madeChange file desc = liftIO $
|
||||||
|
Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
|
||||||
|
|
||||||
{- Adding a file is tricky; the file has to be replaced with a symlink
|
{- Adding a file is tricky; the file has to be replaced with a symlink
|
||||||
- but this is race prone, as the symlink could be changed immediately
|
- but this is race prone, as the symlink could be changed immediately
|
||||||
- after creation. To avoid that race, git add is not used to stage the
|
- after creation. To avoid that race, git add is not used to stage the
|
||||||
- symlink. -}
|
- symlink.
|
||||||
onAdd :: FilePath -> Annex ()
|
-
|
||||||
|
- Inotify will notice the new symlink, so this Handler does not stage it
|
||||||
|
- or return a Change, leaving that to onAddSymlink.
|
||||||
|
-}
|
||||||
|
onAdd :: Handler
|
||||||
onAdd file = do
|
onAdd file = do
|
||||||
showStart "add" file
|
showStart "add" file
|
||||||
Command.Add.ingest file >>= go
|
handle =<< Command.Add.ingest file
|
||||||
|
return Nothing
|
||||||
where
|
where
|
||||||
go Nothing = showEndFail
|
handle Nothing = showEndFail
|
||||||
go (Just key) = do
|
handle (Just key) = do
|
||||||
link <- Command.Add.link file key True
|
Command.Add.link file key True
|
||||||
stageSymlink file link
|
|
||||||
showEndOk
|
showEndOk
|
||||||
|
|
||||||
{- A symlink might be an arbitrary symlink, which is just added.
|
{- A symlink might be an arbitrary symlink, which is just added.
|
||||||
- Or, if it is a git-annex symlink, ensure it points to the content
|
- Or, if it is a git-annex symlink, ensure it points to the content
|
||||||
- before adding it.
|
- before adding it.
|
||||||
-}
|
-}
|
||||||
onAddSymlink :: FilePath -> Annex ()
|
onAddSymlink :: Handler
|
||||||
onAddSymlink file = go =<< Backend.lookupFile file
|
onAddSymlink file = go =<< Backend.lookupFile file
|
||||||
where
|
where
|
||||||
go Nothing = addlink =<< liftIO (readSymbolicLink file)
|
go Nothing = do
|
||||||
|
addlink =<< liftIO (readSymbolicLink file)
|
||||||
|
madeChange file "add"
|
||||||
go (Just (key, _)) = do
|
go (Just (key, _)) = do
|
||||||
link <- calcGitLink file key
|
link <- calcGitLink file key
|
||||||
ifM ((==) link <$> liftIO (readSymbolicLink file))
|
ifM ((==) link <$> liftIO (readSymbolicLink file))
|
||||||
( addlink link
|
( do
|
||||||
|
addlink link
|
||||||
|
madeChange file "add"
|
||||||
, do
|
, do
|
||||||
liftIO $ removeFile file
|
liftIO $ removeFile file
|
||||||
liftIO $ createSymbolicLink link file
|
liftIO $ createSymbolicLink link file
|
||||||
addlink link
|
addlink link
|
||||||
|
madeChange file "fix"
|
||||||
)
|
)
|
||||||
addlink link = stageSymlink file link
|
addlink link = stageSymlink file link
|
||||||
|
|
||||||
onDel :: FilePath -> Annex ()
|
onDel :: Handler
|
||||||
onDel file = Annex.Queue.addUpdateIndex =<<
|
onDel file = do
|
||||||
inRepo (Git.UpdateIndex.unstageFile file)
|
Annex.Queue.addUpdateIndex =<<
|
||||||
|
inRepo (Git.UpdateIndex.unstageFile file)
|
||||||
|
madeChange file "rm"
|
||||||
|
|
||||||
{- A directory has been deleted, or moved, so tell git to remove anything
|
{- A directory has been deleted, or moved, so tell git to remove anything
|
||||||
- that was inside it from its cache. Since it could reappear at any time,
|
- that was inside it from its cache. Since it could reappear at any time,
|
||||||
|
@ -152,13 +175,17 @@ onDel file = Annex.Queue.addUpdateIndex =<<
|
||||||
- Note: This could use unstageFile, but would need to run another git
|
- Note: This could use unstageFile, but would need to run another git
|
||||||
- command to get the recursive list of files in the directory, so rm is
|
- command to get the recursive list of files in the directory, so rm is
|
||||||
- just as good. -}
|
- just as good. -}
|
||||||
onDelDir :: FilePath -> Annex ()
|
onDelDir :: Handler
|
||||||
onDelDir dir = Annex.Queue.addCommand "rm"
|
onDelDir dir = do
|
||||||
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
|
Annex.Queue.addCommand "rm"
|
||||||
|
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
|
||||||
|
madeChange dir "rmdir"
|
||||||
|
|
||||||
{- Called when there's an error with inotify. -}
|
{- Called when there's an error with inotify. -}
|
||||||
onErr :: String -> Annex ()
|
onErr :: Handler
|
||||||
onErr = warning
|
onErr msg = do
|
||||||
|
warning msg
|
||||||
|
return Nothing
|
||||||
|
|
||||||
{- Adds a symlink to the index, without ever accessing the actual symlink
|
{- Adds a symlink to the index, without ever accessing the actual symlink
|
||||||
- on disk. -}
|
- on disk. -}
|
||||||
|
@ -168,16 +195,17 @@ stageSymlink file linktext =
|
||||||
inRepo (Git.UpdateIndex.stageSymlink file linktext)
|
inRepo (Git.UpdateIndex.stageSymlink file linktext)
|
||||||
|
|
||||||
{- Signals that a change has been made, that needs to get committed. -}
|
{- Signals that a change has been made, that needs to get committed. -}
|
||||||
signalChange :: ChangeChan -> Annex ()
|
signalChange :: ChangeChan -> Change -> Annex ()
|
||||||
signalChange chan = do
|
signalChange chan change = do
|
||||||
liftIO $ (atomically . writeTChan chan) =<< getCurrentTime
|
liftIO $ atomically $ writeTChan chan change
|
||||||
|
|
||||||
-- Just in case the commit thread is not flushing
|
-- Just in case the commit thread is not flushing
|
||||||
-- the queue fast enough.
|
-- the queue fast enough.
|
||||||
Annex.Queue.flushWhenFull
|
Annex.Queue.flushWhenFull
|
||||||
|
|
||||||
{- Gets the times of all unhandled changes.
|
{- Gets all unhandled changes.
|
||||||
- Blocks until at least one change is made. -}
|
- Blocks until at least one change is made. -}
|
||||||
getChanges :: ChangeChan -> IO [UTCTime]
|
getChanges :: ChangeChan -> IO [Change]
|
||||||
getChanges chan = atomically $ do
|
getChanges chan = atomically $ do
|
||||||
c <- readTChan chan
|
c <- readTChan chan
|
||||||
go [c]
|
go [c]
|
||||||
|
@ -190,10 +218,10 @@ getChanges chan = atomically $ do
|
||||||
|
|
||||||
{- Puts unhandled changes back into the channel.
|
{- Puts unhandled changes back into the channel.
|
||||||
- Note: Original order is not preserved. -}
|
- Note: Original order is not preserved. -}
|
||||||
refillChanges :: ChangeChan -> [UTCTime] -> IO ()
|
refillChanges :: ChangeChan -> [Change] -> IO ()
|
||||||
refillChanges chan cs = atomically $ mapM_ (writeTChan chan) cs
|
refillChanges chan cs = atomically $ mapM_ (writeTChan chan) cs
|
||||||
|
|
||||||
{- This thread makes git commits. -}
|
{- This thread makes git commits at appropriate times. -}
|
||||||
commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO ()
|
commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO ()
|
||||||
commitThread st changechan = forever $ do
|
commitThread st changechan = forever $ do
|
||||||
-- First, a simple rate limiter.
|
-- First, a simple rate limiter.
|
||||||
|
@ -203,38 +231,38 @@ commitThread st changechan = forever $ do
|
||||||
-- Now see if now's a good time to commit.
|
-- Now see if now's a good time to commit.
|
||||||
time <- getCurrentTime
|
time <- getCurrentTime
|
||||||
if shouldCommit time cs
|
if shouldCommit time cs
|
||||||
then commit
|
then void $ tryIO $ runStateMVar st $ commitStaged
|
||||||
else refillChanges changechan cs
|
else refillChanges changechan cs
|
||||||
where
|
where
|
||||||
commit = void $ tryIO $ runStateMVar st $ do
|
|
||||||
Annex.Queue.flush
|
|
||||||
inRepo $ Git.Command.run "commit"
|
|
||||||
[ Param "--allow-empty-message"
|
|
||||||
, Param "-m", Param ""
|
|
||||||
-- Empty commits may be made if tree
|
|
||||||
-- changes cancel each other out, etc
|
|
||||||
, Param "--allow-empty"
|
|
||||||
-- Avoid running the usual git-annex
|
|
||||||
-- pre-commit hook; watch does the same
|
|
||||||
-- symlink fixing, and we don't want to
|
|
||||||
-- deal with unlocked files in these
|
|
||||||
-- commits.
|
|
||||||
, Param "--quiet"
|
|
||||||
]
|
|
||||||
oneSecond = 1000000 -- microseconds
|
oneSecond = 1000000 -- microseconds
|
||||||
|
|
||||||
|
commitStaged :: Annex ()
|
||||||
|
commitStaged = do
|
||||||
|
Annex.Queue.flush
|
||||||
|
inRepo $ Git.Command.run "commit"
|
||||||
|
[ Param "--allow-empty-message"
|
||||||
|
, Param "-m", Param ""
|
||||||
|
-- Empty commits may be made if tree changes cancel
|
||||||
|
-- each other out, etc
|
||||||
|
, Param "--allow-empty"
|
||||||
|
-- Avoid running the usual git-annex pre-commit hook;
|
||||||
|
-- watch does the same symlink fixing, and we don't want
|
||||||
|
-- to deal with unlocked files in these commits.
|
||||||
|
, Param "--quiet"
|
||||||
|
]
|
||||||
|
|
||||||
{- Decide if now is a good time to make a commit.
|
{- Decide if now is a good time to make a commit.
|
||||||
- Note that the list of change times has an undefined order.
|
- Note that the list of change times has an undefined order.
|
||||||
-
|
-
|
||||||
- Current strategy: If there have been 10 commits within the past second,
|
- Current strategy: If there have been 10 commits within the past second,
|
||||||
- a batch activity is taking place, so wait for later.
|
- a batch activity is taking place, so wait for later.
|
||||||
-}
|
-}
|
||||||
shouldCommit :: UTCTime -> [UTCTime] -> Bool
|
shouldCommit :: UTCTime -> [Change] -> Bool
|
||||||
shouldCommit now changetimes
|
shouldCommit now changes
|
||||||
| len == 0 = False
|
| len == 0 = False
|
||||||
| len > 4096 = True -- avoid bloating queue too much
|
| len > 4096 = True -- avoid bloating queue too much
|
||||||
| length (filter thisSecond changetimes) < 10 = True
|
| length (filter thisSecond changes) < 10 = True
|
||||||
| otherwise = False -- batch activity
|
| otherwise = False -- batch activity
|
||||||
where
|
where
|
||||||
len = length changetimes
|
len = length changes
|
||||||
thisSecond t = now `diffUTCTime` t <= 1
|
thisSecond c = now `diffUTCTime` changeTime c <= 1
|
||||||
|
|
Loading…
Reference in a new issue