Merge branch 'watch'

This commit is contained in:
Joey Hess 2012-06-21 01:05:37 -04:00
commit 019d073505
24 changed files with 754 additions and 204 deletions

2
.gitignore vendored
View file

@ -11,7 +11,7 @@ html
*.tix
.hpc
Utility/Touch.hs
Utility/libdiskfree.o
Utility/*.o
dist
# Sandboxed builds
cabal-dev

View file

@ -46,6 +46,7 @@ module Assistant where
import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.Watcher
import Assistant.Committer
import Assistant.SanityChecker

81
Assistant/Changes.hs Normal file
View file

@ -0,0 +1,81 @@
{- git-annex assistant change tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-}
module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
type ChangeChan = TChan Change
data Change
= Change
{ changeTime :: UTCTime
, changeFile :: FilePath
, changeType :: ChangeType
}
| PendingAddChange
{ changeTime ::UTCTime
, keySource :: KeySource
}
deriving (Show)
runChangeChan :: STM a -> IO a
runChangeChan = atomically
newChangeChan :: IO ChangeChan
newChangeChan = atomically newTChan
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
madeChange f t = do
-- Just in case the commit thread is not flushing the queue fast enough.
Annex.Queue.flushWhenFull
liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
noChange :: Annex (Maybe Change)
noChange = return Nothing
{- Indicates an add is in progress. -}
pendingAddChange :: KeySource -> Annex (Maybe Change)
pendingAddChange ks =
liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure ks)
isPendingAddChange :: Change -> Bool
isPendingAddChange (PendingAddChange {}) = True
isPendingAddChange _ = False
finishedChange :: Change -> Change
finishedChange c@(PendingAddChange { keySource = ks }) = Change
{ changeTime = changeTime c
, changeFile = keyFilename ks
, changeType = AddChange
}
finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
getChanges chan = runChangeChan $ do
c <- readTChan chan
go [c]
where
go l = do
v <- tryReadTChan chan
case v of
Nothing -> return l
Just c -> go (c:l)
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs

View file

@ -1,4 +1,4 @@
{- git-annex assistant change tracking and committing
{- git-annex assistant commit thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-}
@ -6,79 +6,42 @@
module Assistant.Committer where
import Common.Annex
import Assistant.Changes
import Assistant.ThreadedMonad
import Assistant.Watcher
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
import qualified Git.HashObject
import Git.Types
import qualified Command.Add
import Utility.ThreadScheduler
import qualified Utility.Lsof as Lsof
import Types.Backend
import qualified Utility.DirWatcher as DirWatcher
import Types.KeySource
import Control.Concurrent.STM
import Data.Time.Clock
import Data.Tuple.Utils
import qualified Data.Set as S
data ChangeType = PendingAddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
type ChangeChan = TChan Change
data Change = Change
{ changeTime :: UTCTime
, changeFile :: FilePath
, changeType :: ChangeType
}
deriving (Show)
runChangeChan :: STM a -> IO a
runChangeChan = atomically
newChangeChan :: IO ChangeChan
newChangeChan = atomically newTChan
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
madeChange f t = do
-- Just in case the commit thread is not flushing the queue fast enough.
when (t /= PendingAddChange) $
Annex.Queue.flushWhenFull
liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
noChange :: Annex (Maybe Change)
noChange = return Nothing
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
getChanges chan = runChangeChan $ do
c <- readTChan chan
go [c]
where
go l = do
v <- tryReadTChan chan
case v of
Nothing -> return l
Just c -> go (c:l)
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
import Data.Either
{- This thread makes git commits at appropriate times. -}
commitThread :: ThreadState -> ChangeChan -> IO ()
commitThread st changechan = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change has been made.
cs <- getChanges changechan
-- Next, wait until at least one change is available for
-- processing.
changes <- getChanges changechan
-- Now see if now's a good time to commit.
time <- getCurrentTime
if shouldCommit time cs
if shouldCommit time changes
then do
handleAdds st changechan cs
void $ tryIO $ runThreadState st commitStaged
else refillChanges changechan cs
readychanges <- handleAdds st changechan changes
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
else refillChanges changechan readychanges
else refillChanges changechan changes
commitStaged :: Annex ()
commitStaged = do
@ -121,70 +84,112 @@ shouldCommit now changes
-
- When a file is added, Inotify will notice the new symlink. So this waits
- for additional Changes to arrive, so that the symlink has hopefully been
- staged before returning, and will be committed.
- staged before returning, and will be committed immediately.
-
- OTOH, for kqueue, eventsCoalesce, so instead the symlink is directly
- created and staged.
-
- Returns a list of all changes that are ready to be committed.
- Any pending adds that are not ready yet are put back into the ChangeChan,
- where they will be retried later.
-}
handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO ()
handleAdds st changechan cs
| null toadd = noop
| otherwise = do
toadd' <- safeToAdd st toadd
unless (null toadd') $ do
added <- filter id <$> forM toadd' add
unless (null added) $
handleAdds st changechan =<< getChanges changechan
handleAdds :: ThreadState -> ChangeChan -> [Change] -> IO [Change]
handleAdds st changechan cs = returnWhen (null pendingadds) $ do
(postponed, toadd) <- partitionEithers <$>
safeToAdd st pendingadds
unless (null postponed) $
refillChanges changechan postponed
returnWhen (null toadd) $ do
added <- catMaybes <$> forM toadd add
if (DirWatcher.eventsCoalesce || null added)
then return $ added ++ otherchanges
else do
r <- handleAdds st changechan
=<< getChanges changechan
return $ r ++ added ++ otherchanges
where
toadd = map changeFile $ filter isPendingAdd cs
(pendingadds, otherchanges) = partition isPendingAddChange cs
isPendingAdd (Change { changeType = PendingAddChange }) = True
isPendingAdd _ = False
returnWhen c a
| c = return otherchanges
| otherwise = a
add keysource = catchBoolIO $ runThreadState st $ do
showStart "add" $ keyFilename keysource
handle (keyFilename keysource)
=<< Command.Add.ingest keysource
add :: Change -> IO (Maybe Change)
add change@(PendingAddChange { keySource = ks }) = do
r <- catchMaybeIO $ sanitycheck ks $ runThreadState st $ do
showStart "add" $ keyFilename ks
handle (finishedChange change) (keyFilename ks)
=<< Command.Add.ingest ks
return $ maybeMaybe r
add _ = return Nothing
handle _ Nothing = do
maybeMaybe (Just j@(Just _)) = j
maybeMaybe _ = Nothing
handle _ _ Nothing = do
showEndFail
return False
handle file (Just key) = do
Command.Add.link file key True
return Nothing
handle change file (Just key) = do
link <- Command.Add.link file key True
when DirWatcher.eventsCoalesce $ do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
showEndOk
return True
return $ Just change
{- Checks which of a set of files can safely be added.
- Files are locked down as hard links in a temp directory,
- with their write bits disabled. But some may have already
- been opened for write, so lsof is run on the temp directory
- to check them.
{- Check that the keysource's keyFilename still exists,
- and is still a hard link to its contentLocation,
- before ingesting it. -}
sanitycheck keysource a = do
fs <- getSymbolicLinkStatus $ keyFilename keysource
ks <- getSymbolicLinkStatus $ contentLocation keysource
if deviceID ks == deviceID fs && fileID ks == fileID fs
then a
else return Nothing
{- PendingAddChanges can Either be Right to be added now,
- or are unsafe, and must be Left for later.
-
- Check by running lsof on the temp directory, which
- the KeySources are locked down in.
-}
safeToAdd :: ThreadState -> [FilePath] -> IO [KeySource]
safeToAdd st files = do
locked <- catMaybes <$> lockdown files
runThreadState st $ do
tmpdir <- fromRepo gitAnnexTmpDir
open <- S.fromList . map fst3 . filter openwrite <$>
liftIO (Lsof.queryDir tmpdir)
catMaybes <$> forM locked (go open)
safeToAdd :: ThreadState -> [Change] -> IO [Either Change Change]
safeToAdd st changes = runThreadState st $
ifM (Annex.getState Annex.force)
( allRight changes -- force bypasses lsof check
, do
tmpdir <- fromRepo gitAnnexTmpDir
openfiles <- S.fromList . map fst3 . filter openwrite <$>
liftIO (Lsof.queryDir tmpdir)
let checked = map (check openfiles) changes
{- If new events are received when files are closed,
- there's no need to retry any changes that cannot
- be done now. -}
if DirWatcher.closingTracked
then do
mapM_ canceladd $ lefts checked
allRight $ rights checked
else return checked
)
where
go open keysource
| S.member (contentLocation keysource) open = do
warning $ keyFilename keysource
++ " still has writers, not adding"
-- remove the hard link
--_ <- liftIO $ tryIO $
-- removeFile $ contentLocation keysource
return Nothing
| otherwise = return $ Just keysource
lockdown = mapM $ \file -> do
ms <- catchMaybeIO $ getSymbolicLinkStatus file
case ms of
Just s
| isRegularFile s ->
catchMaybeIO $ runThreadState st $
Command.Add.lockDown file
_ -> return Nothing
check openfiles change@(PendingAddChange { keySource = ks })
| S.member (contentLocation ks) openfiles = Left change
check _ change = Right change
canceladd (PendingAddChange { keySource = ks }) = do
warning $ keyFilename ks
++ " still has writers, not adding"
-- remove the hard link
void $ liftIO $ tryIO $
removeFile $ contentLocation ks
canceladd _ = noop
openwrite (_file, mode, _pid) =
mode == Lsof.OpenWriteOnly || mode == Lsof.OpenReadWrite
allRight = return . map Right

View file

@ -11,7 +11,7 @@ import Common.Annex
import qualified Git.LsFiles
import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Committer
import Assistant.Changes
import Utility.ThreadScheduler
import qualified Assistant.Watcher

View file

@ -12,15 +12,17 @@ module Assistant.Watcher where
import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Committer
import Utility.ThreadScheduler
import Assistant.Changes
import Utility.DirWatcher
import Utility.Types.DirWatcher
import qualified Annex
import qualified Annex.Queue
import qualified Git.Command
import qualified Git.UpdateIndex
import qualified Git.HashObject
import qualified Git.LsFiles
import qualified Backend
import qualified Annex
import qualified Command.Add
import Annex.Content
import Annex.CatFile
import Git.Types
@ -29,24 +31,12 @@ import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
#ifdef WITH_INOTIFY
import Utility.Inotify
import System.INotify
#endif
type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
checkCanWatch :: Annex ()
checkCanWatch = do
#ifdef WITH_INOTIFY
unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $
needLsof
#else
#if defined linux_HOST_OS
#warning "Building without inotify support; watch mode will be disabled."
#endif
error "watch mode is not available on this system"
#endif
checkCanWatch
| canWatch =
unlessM (liftIO (inPath "lsof") <||> Annex.getState Annex.force) $
needLsof
| otherwise = error "watch mode is not available on this system"
needLsof :: Annex ()
needLsof = error $ unlines
@ -57,22 +47,9 @@ needLsof = error $ unlines
]
watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
#ifdef WITH_INOTIFY
watchThread st dstatus changechan = withINotify $ \i -> do
runThreadState st $
showAction "scanning"
-- This does not return until the startup scan is done.
-- That can take some time for large trees.
watchDir i "." (ignored . takeFileName) hooks
runThreadState st $
modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
-- Notice any files that were deleted before inotify
-- was started.
runThreadState st $ do
inRepo $ Git.Command.run "add" [Param "--update"]
showAction "started"
waitForTermination
watchThread st dstatus changechan = watchDir "." ignored hooks startup
where
startup = statupScan st dstatus
hook a = Just $ runHandler st dstatus changechan a
hooks = WatchHooks
{ addHook = hook onAdd
@ -81,15 +58,32 @@ watchThread st dstatus changechan = withINotify $ \i -> do
, delDirHook = hook onDelDir
, errHook = hook onErr
}
#else
watchThread = undefined
#endif
{- Initial scartup scan. The action should return once the scan is complete. -}
statupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a
statupScan st dstatus scanner = do
runThreadState st $
showAction "scanning"
r <- scanner
runThreadState st $
modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
-- Notice any files that were deleted before watching was started.
runThreadState st $ do
inRepo $ Git.Command.run "add" [Param "--update"]
showAction "started"
return r
ignored :: FilePath -> Bool
ignored ".git" = True
ignored ".gitignore" = True
ignored ".gitattributes" = True
ignored _ = False
ignored = ig . takeFileName
where
ig ".git" = True
ig ".gitignore" = True
ig ".gitattributes" = True
ig _ = False
type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
{- Runs an action handler, inside the Annex monad, and if there was a
- change, adds it to the ChangeChan.
@ -117,22 +111,27 @@ runHandler st dstatus changechan handler file filestatus = void $ do
- and only one has just closed it. We want to avoid adding a file to the
- annex that is open for write, to avoid anything being able to change it.
-
- We could run lsof on the file here to check for other writer.
- But, that's slow. Instead, a Change is returned that indicates this file
- still needs to be added. The committer will handle bundles of these
- Changes at once.
- We could run lsof on the file here to check for other writers.
- But, that's slow, and even if there is currently a writer, we will want
- to add the file *eventually*. Instead, the file is locked down as a hard
- link in a temp directory, with its write bits disabled, for later
- checking with lsof, and a Change is returned containing a KeySource
- using that hard link. The committer handles running lsof and finishing
- the add.
-}
onAdd :: Handler
onAdd file _filestatus dstatus = do
ifM (scanComplete <$> getDaemonStatus dstatus)
( go
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
( noChange
, go
onAdd file filestatus dstatus
| maybe False isRegularFile filestatus = do
ifM (scanComplete <$> getDaemonStatus dstatus)
( go
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
( noChange
, go
)
)
)
| otherwise = noChange
where
go = madeChange file PendingAddChange
go = pendingAddChange =<< Command.Add.lockDown file
{- A symlink might be an arbitrary symlink, which is just added.
- Or, if it is a git-annex symlink, ensure it points to the content

View file

@ -6,7 +6,6 @@
-}
module Backend (
B.KeySource(..),
list,
orderedList,
genKey,
@ -23,6 +22,7 @@ import Config
import qualified Annex
import Annex.CheckAttr
import Types.Key
import Types.KeySource
import qualified Types.Backend as B
-- When adding a new backend, import it here and add it to the list.
@ -54,12 +54,12 @@ orderedList = do
{- Generates a key for a file, trying each backend in turn until one
- accepts it.
-}
genKey :: B.KeySource -> Maybe Backend -> Annex (Maybe (Key, Backend))
genKey :: KeySource -> Maybe Backend -> Annex (Maybe (Key, Backend))
genKey source trybackend = do
bs <- orderedList
let bs' = maybe bs (: bs) trybackend
genKey' bs' source
genKey' :: [Backend] -> B.KeySource -> Annex (Maybe (Key, Backend))
genKey' :: [Backend] -> KeySource -> Annex (Maybe (Key, Backend))
genKey' [] _ = return Nothing
genKey' (b:bs) source = do
r <- B.getKey b source

View file

@ -11,6 +11,7 @@ import Common.Annex
import qualified Annex
import Types.Backend
import Types.Key
import Types.KeySource
import qualified Build.SysConfig as SysConfig
type SHASize = Int

View file

@ -10,6 +10,7 @@ module Backend.WORM (backends) where
import Common.Annex
import Types.Backend
import Types.Key
import Types.KeySource
backends :: [Backend]
backends = [backend]

View file

@ -12,6 +12,7 @@ import Annex.Exception
import Command
import qualified Annex
import qualified Annex.Queue
import Types.KeySource
import Backend
import Logs.Location
import Annex.Content
@ -97,8 +98,8 @@ undo file key e = do
src <- inRepo $ gitAnnexLocation key
liftIO $ moveFile src file
{- Creates the symlink to the annexed content. -}
link :: FilePath -> Key -> Bool -> Annex ()
{- Creates the symlink to the annexed content, returns the link target. -}
link :: FilePath -> Key -> Bool -> Annex String
link file key hascontent = handle (undo file key) $ do
l <- calcGitLink file key
liftIO $ createSymbolicLink l file
@ -112,6 +113,8 @@ link file key hascontent = handle (undo file key) $ do
mtime <- modificationTime <$> getFileStatus file
touch file (TimeSpec mtime) False
return l
{- Note: Several other commands call this, and expect it to
- create the symlink and add it. -}
cleanup :: FilePath -> Key -> Bool -> CommandCleanup

View file

@ -20,6 +20,7 @@ import Annex.Content
import Logs.Web
import qualified Option
import Types.Key
import Types.KeySource
import Config
def :: [Command]

View file

@ -11,6 +11,7 @@ import Common.Annex
import Command
import Backend
import qualified Types.Key
import Types.KeySource
import Annex.Content
import qualified Command.ReKey

View file

@ -1,13 +1,22 @@
OS:=$(shell uname | sed 's/[-_].*//')
bins=git-annex
mans=git-annex.1 git-annex-shell.1
sources=Build/SysConfig.hs Utility/Touch.hs
all=$(bins) $(mans) docs
OS:=$(shell uname | sed 's/[-_].*//')
ifeq ($(OS),Linux)
BASEFLAGS_OPTS+=-DWITH_INOTIFY
clibs=Utility/libdiskfree.o
else
BASEFLAGS_OPTS+=-DWITH_KQUEUE
clibs=Utility/libdiskfree.o Utility/libkqueue.o
endif
PREFIX=/usr
IGNORE=-ignore-package monads-fd -ignore-package monads-tf
BASEFLAGS=-Wall $(IGNORE) -outputdir tmp -IUtility -DWITH_S3 $(BASEFLAGS_OPTS)
GHCFLAGS=-O2 $(BASEFLAGS)
CFLAGS=-Wall
ifdef PROFILE
GHCFLAGS=-prof -auto-all -rtsopts -caf-all -fforce-recomp $(BASEFLAGS)
@ -15,13 +24,6 @@ endif
GHCMAKE=ghc $(GHCFLAGS) --make
bins=git-annex
mans=git-annex.1 git-annex-shell.1
sources=Build/SysConfig.hs Utility/Touch.hs
clibs=Utility/libdiskfree.o
all=$(bins) $(mans) docs
# Am I typing :make in vim? Do a fast build.
ifdef VIM
all=fast

View file

@ -10,13 +10,7 @@
module Types.Backend where
import Types.Key
{- The source used to generate a key. The location of the content
- may be different from the filename associated with the key. -}
data KeySource = KeySource
{ keyFilename :: FilePath
, contentLocation :: FilePath
}
import Types.KeySource
data BackendA a = Backend
{ name :: String

33
Types/KeySource.hs Normal file
View file

@ -0,0 +1,33 @@
{- KeySource data type
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Types.KeySource where
import Data.Ord
{- When content is in the process of being added to the annex,
- and a Key generated from it, this data type is used.
-
- The contentLocation may be different from the filename
- associated with the key. For example, the add command
- temporarily puts the content into a lockdown directory
- for checking. The migrate command uses the content
- of a different Key. -}
data KeySource = KeySource
{ keyFilename :: FilePath
, contentLocation :: FilePath
}
deriving (Show)
{- KeySources are assumed to be equal when the same filename is associated
- with the key. The contentLocation can be a random temp file.
-}
instance Eq KeySource where
x == y = keyFilename x == keyFilename y
instance Ord KeySource where
compare = comparing keyFilename

90
Utility/DirWatcher.hs Normal file
View file

@ -0,0 +1,90 @@
{- generic directory watching interface
-
- Uses either inotify or kqueue to watch a directory (and subdirectories)
- for changes, and runs hooks for different sorts of events as they occur.
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE CPP #-}
module Utility.DirWatcher where
import Utility.Types.DirWatcher
#if WITH_INOTIFY
import qualified Utility.INotify as INotify
import qualified System.INotify as INotify
import Utility.ThreadScheduler
#endif
#if WITH_KQUEUE
import qualified Utility.Kqueue as Kqueue
#endif
type Pruner = FilePath -> Bool
canWatch :: Bool
#if (WITH_INOTIFY || WITH_KQUEUE)
canWatch = True
#else
#if defined linux_HOST_OS
#warning "Building without inotify support"
#endif
canWatch = False
#endif
/* With inotify, discrete events will be received when making multiple changes
* to the same filename. For example, adding it, deleting it, and adding it
* again will be three events.
*
* OTOH, with kqueue, often only one event is received, indicating the most
* recent state of the file.
*/
eventsCoalesce :: Bool
#if WITH_INOTIFY
eventsCoalesce = False
#else
#if WITH_KQUEUE
eventsCoalesce = True
#else
eventsCoalesce = undefined
#endif
#endif
/* With inotify, file closing is tracked to some extent, so an add event
* will always be received for a file once its writer closes it, and
* (typically) not before. This may mean multiple add events for the same file.
*
* OTOH, with kqueue, add events will often be received while a file is
* still being written to, and then no add event will be received once the
* writer closes it.
*/
closingTracked :: Bool
#if WITH_INOTIFY
closingTracked = True
#else
#if WITH_KQUEUE
closingTracked = False
#else
closingTracked = undefined
#endif
#endif
#if WITH_INOTIFY
watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
watchDir dir prune hooks runstartup = INotify.withINotify $ \i -> do
runstartup $ INotify.watchDir i dir prune hooks
waitForTermination -- Let the inotify thread run.
#else
#if WITH_KQUEUE
watchDir :: FilePath -> Pruner -> WatchHooks -> (IO Kqueue.Kqueue -> IO Kqueue.Kqueue) -> IO ()
watchDir dir ignored hooks runstartup = do
kq <- runstartup $ Kqueue.initKqueue dir ignored
Kqueue.runHooks kq hooks
#else
watchDir :: FilePath -> Pruner -> WatchHooks -> (IO () -> IO ()) -> IO ()
watchDir = undefined
#endif
#endif

View file

@ -34,7 +34,7 @@ dirCruft _ = False
dirContents :: FilePath -> IO [FilePath]
dirContents d = map (d </>) . filter (not . dirCruft) <$> getDirectoryContents d
{- Gets contents of directory, and then its subdirectories, recursively,
{- Gets files in a directory, and then its subdirectories, recursively,
- and lazily. -}
dirContentsRecursive :: FilePath -> IO [FilePath]
dirContentsRecursive topdir = dirContentsRecursive' topdir [""]

View file

@ -5,26 +5,17 @@
- Licensed under the GNU GPL version 3 or higher.
-}
module Utility.Inotify where
module Utility.INotify where
import Common hiding (isDirectory)
import Utility.ThreadLock
import Utility.Types.DirWatcher
import System.INotify
import qualified System.Posix.Files as Files
import System.IO.Error
import Control.Exception (throw)
type Hook a = Maybe (a -> Maybe FileStatus -> IO ())
data WatchHooks = WatchHooks
{ addHook :: Hook FilePath
, addSymlinkHook :: Hook FilePath
, delHook :: Hook FilePath
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
}
{- Watches for changes to files in a directory, and all its subdirectories
- that are not ignored, using inotify. This function returns after
- its initial scan is complete, leaving a thread running. Callbacks are

248
Utility/Kqueue.hs Normal file
View file

@ -0,0 +1,248 @@
{- BSD kqueue file modification notification interface
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE ForeignFunctionInterface #-}
module Utility.Kqueue (
Kqueue,
initKqueue,
stopKqueue,
waitChange,
Change(..),
changedFile,
isAdd,
isDelete,
runHooks,
) where
import Common
import Utility.Types.DirWatcher
import System.Posix.Types
import Foreign.C.Types
import Foreign.C.Error
import Foreign.Ptr
import Foreign.Marshal
import qualified Data.Map as M
import qualified Data.Set as S
import qualified System.Posix.Files as Files
import Control.Concurrent
data Change
= Deleted FilePath
| Added FilePath
deriving (Show)
isAdd :: Change -> Bool
isAdd (Added _) = True
isAdd (Deleted _) = False
isDelete :: Change -> Bool
isDelete = not . isAdd
changedFile :: Change -> FilePath
changedFile (Added f) = f
changedFile (Deleted f) = f
data Kqueue = Kqueue
{ kqueueFd :: Fd
, kqueueTop :: FilePath
, kqueueMap :: DirMap
, _kqueuePruner :: Pruner
}
type Pruner = FilePath -> Bool
type DirMap = M.Map Fd DirInfo
{- A directory, and its last known contents (with filenames relative to it) -}
data DirInfo = DirInfo
{ dirName :: FilePath
, dirCache :: S.Set FilePath
}
deriving (Show)
getDirInfo :: FilePath -> IO DirInfo
getDirInfo dir = do
contents <- S.fromList . filter (not . dirCruft)
<$> getDirectoryContents dir
return $ DirInfo dir contents
{- Difference between the dirCaches of two DirInfos. -}
(//) :: DirInfo -> DirInfo -> [Change]
oldc // newc = deleted ++ added
where
deleted = calc Deleted oldc newc
added = calc Added newc oldc
calc a x y = map a . map (dirName x </>) $
S.toList $ S.difference (dirCache x) (dirCache y)
{- Builds a map of directories in a tree, possibly pruning some.
- Opens each directory in the tree, and records its current contents. -}
scanRecursive :: FilePath -> Pruner -> IO DirMap
scanRecursive topdir prune = M.fromList <$> walk [] [topdir]
where
walk c [] = return c
walk c (dir:rest)
| prune dir = walk c rest
| otherwise = do
minfo <- catchMaybeIO $ getDirInfo dir
case minfo of
Nothing -> walk c rest
Just info -> do
mfd <- catchMaybeIO $
openFd dir ReadOnly Nothing defaultFileFlags
case mfd of
Nothing -> walk c rest
Just fd -> do
let subdirs = map (dir </>) $
S.toList $ dirCache info
walk ((fd, info):c) (subdirs ++ rest)
{- Adds a list of subdirectories (and all their children), unless pruned to a
- directory map. Adding a subdirectory that's already in the map will
- cause its contents to be refreshed. -}
addSubDirs :: DirMap -> Pruner -> [FilePath] -> IO DirMap
addSubDirs dirmap prune dirs = do
newmap <- foldr M.union M.empty <$>
mapM (\d -> scanRecursive d prune) dirs
return $ M.union newmap dirmap -- prefer newmap
{- Removes a subdirectory (and all its children) from a directory map. -}
removeSubDir :: DirMap -> FilePath -> IO DirMap
removeSubDir dirmap dir = do
mapM_ closeFd $ M.keys toremove
return rest
where
(toremove, rest) = M.partition (dirContains dir . dirName) dirmap
findDirContents :: DirMap -> FilePath -> [FilePath]
findDirContents dirmap dir = concatMap absolutecontents $ search
where
absolutecontents i = map (dirName i </>) (S.toList $ dirCache i)
search = map snd $ M.toList $
M.filter (\i -> dirName i == dir) dirmap
foreign import ccall unsafe "libkqueue.h init_kqueue" c_init_kqueue
:: IO Fd
foreign import ccall unsafe "libkqueue.h addfds_kqueue" c_addfds_kqueue
:: Fd -> CInt -> Ptr Fd -> IO ()
foreign import ccall unsafe "libkqueue.h waitchange_kqueue" c_waitchange_kqueue
:: Fd -> IO Fd
{- Initializes a Kqueue to watch a directory, and all its subdirectories. -}
initKqueue :: FilePath -> Pruner -> IO Kqueue
initKqueue dir pruned = do
dirmap <- scanRecursive dir pruned
h <- c_init_kqueue
let kq = Kqueue h dir dirmap pruned
updateKqueue kq
return kq
{- Updates a Kqueue, adding watches for its map. -}
updateKqueue :: Kqueue -> IO ()
updateKqueue (Kqueue h _ dirmap _) =
withArrayLen (M.keys dirmap) $ \fdcnt c_fds -> do
c_addfds_kqueue h (fromIntegral fdcnt) c_fds
{- Stops a Kqueue. Note: Does not directly close the Fds in the dirmap,
- so it can be reused. -}
stopKqueue :: Kqueue -> IO ()
stopKqueue = closeFd . kqueueFd
{- Waits for a change on a Kqueue.
- May update the Kqueue.
-}
waitChange :: Kqueue -> IO (Kqueue, [Change])
waitChange kq@(Kqueue h _ dirmap _) = do
changedfd <- c_waitchange_kqueue h
if changedfd == -1
then ifM ((==) eINTR <$> getErrno)
(yield >> waitChange kq, nochange)
else case M.lookup changedfd dirmap of
Nothing -> nochange
Just info -> handleChange kq changedfd info
where
nochange = return (kq, [])
{- The kqueue interface does not tell what type of change took place in
- the directory; it could be an added file, a deleted file, a renamed
- file, a new subdirectory, or a deleted subdirectory, or a moved
- subdirectory.
-
- So to determine this, the contents of the directory are compared
- with its last cached contents. The Kqueue is updated to watch new
- directories as necessary.
-}
handleChange :: Kqueue -> Fd -> DirInfo -> IO (Kqueue, [Change])
handleChange kq@(Kqueue _ _ dirmap pruner) fd olddirinfo =
go =<< catchMaybeIO (getDirInfo $ dirName olddirinfo)
where
go (Just newdirinfo) = do
let changes = olddirinfo // newdirinfo
let (added, deleted) = partition isAdd changes
-- Scan newly added directories to add to the map.
-- (Newly added files will fail getDirInfo.)
newdirinfos <- catMaybes <$>
mapM (catchMaybeIO . getDirInfo . changedFile) added
newmap <- addSubDirs dirmap pruner $ map dirName newdirinfos
-- Remove deleted directories from the map.
newmap' <- foldM removeSubDir newmap (map changedFile deleted)
-- Update the cached dirinfo just looked up.
let newmap'' = M.insertWith' const fd newdirinfo newmap'
-- When new directories were added, need to update
-- the kqueue to watch them.
let kq' = kq { kqueueMap = newmap'' }
unless (null newdirinfos) $
updateKqueue kq'
return (kq', changes)
go Nothing = do
-- The directory has been moved or deleted, so
-- remove it from our map.
newmap <- removeSubDir dirmap (dirName olddirinfo)
return (kq { kqueueMap = newmap }, [])
{- Processes changes on the Kqueue, calling the hooks as appropriate.
- Never returns. -}
runHooks :: Kqueue -> WatchHooks -> IO ()
runHooks kq hooks = do
-- First, synthetic add events for the whole directory tree contents,
-- to catch any files created beforehand.
recursiveadd (kqueueMap kq) (Added $ kqueueTop kq)
loop kq
where
loop q = do
(q', changes) <- waitChange q
forM_ changes $ dispatch (kqueueMap q')
loop q'
-- Kqueue returns changes for both whole directories
-- being added and deleted, and individual files being
-- added and deleted.
dispatch dirmap change
| isAdd change = withstatus change $ dispatchadd dirmap
| otherwise = callhook delDirHook Nothing change
dispatchadd dirmap change s
| Files.isSymbolicLink s =
callhook addSymlinkHook (Just s) change
| Files.isDirectory s = recursiveadd dirmap change
| Files.isRegularFile s =
callhook addHook (Just s) change
| otherwise = noop
recursiveadd dirmap change = do
let contents = findDirContents dirmap $ changedFile change
forM_ contents $ \f ->
withstatus (Added f) $ dispatchadd dirmap
callhook h s change = case h hooks of
Nothing -> noop
Just a -> a (changedFile change) s
withstatus change a = maybe noop (a change) =<<
(catchMaybeIO (getSymbolicLinkStatus (changedFile change)))

View file

@ -0,0 +1,22 @@
{- generic directory watching types
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE CPP #-}
module Utility.Types.DirWatcher where
import Common
type Hook a = Maybe (a -> Maybe FileStatus -> IO ())
data WatchHooks = WatchHooks
{ addHook :: Hook FilePath
, addSymlinkHook :: Hook FilePath
, delHook :: Hook FilePath
, delDirHook :: Hook FilePath
, errHook :: Hook String -- error message
}

73
Utility/libkqueue.c Normal file
View file

@ -0,0 +1,73 @@
/* kqueue interface, C mini-library
*
* Copyright 2012 Joey Hess <joey@kitenet.net>
*
* Licensed under the GNU GPL version 3 or higher.
*/
#include <stdio.h>
#include <dirent.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/event.h>
#include <sys/time.h>
#include <errno.h>
/* The specified fds are added to the set of fds being watched for changes.
* Fds passed to prior calls still take effect, so it's most efficient to
* not pass the same fds repeatedly.
*
* Returns the fd that changed, or -1 on error.
*/
signed int helper(const int kq, const int fdcnt, const int *fdlist, int nodelay) {
int i, nev;
struct kevent evlist[1];
struct kevent chlist[fdcnt];
struct timespec avoiddelay = {0, 0};
struct timespec *timeout = nodelay ? &avoiddelay : NULL;
for (i = 0; i < fdcnt; i++) {
EV_SET(&chlist[i], fdlist[i], EVFILT_VNODE,
EV_ADD | EV_ENABLE | EV_CLEAR,
NOTE_WRITE,
0, 0);
}
nev = kevent(kq, chlist, fdcnt, evlist, 1, timeout);
if (nev == 1)
return evlist[0].ident;
else
return -1;
}
/* Initializes a new, empty kqueue. */
int init_kqueue() {
int kq;
if ((kq = kqueue()) == -1) {
perror("kqueue");
exit(1);
}
return kq;
}
/* Adds fds to the set that should be watched. */
void addfds_kqueue(const int kq, const int fdcnt, const int *fdlist) {
helper(kq, fdcnt, fdlist, 1);
}
/* Waits for a change event on a kqueue. */
signed int waitchange_kqueue(const int kq) {
return helper(kq, 0, NULL, 0);
}
/*
main () {
int list[1];
int kq;
list[0]=open(".", O_RDONLY);
kq = init_kqueue();
addfds_kqueue(kq, 1, list)
printf("change: %i\n", waitchange_kqueue(kq));
}
*/

3
Utility/libkqueue.h Normal file
View file

@ -0,0 +1,3 @@
int init_kqueue();
void addfds_kqueue(const int kq, const int fdcnt, const int *fdlist);
signed int waitchange_kqueue(const int kq);

9
debian/changelog vendored
View file

@ -1,9 +1,10 @@
git-annex (3.20120616) UNRELEASED; urgency=low
* watch: New subcommand, which uses inotify to watch for changes to
files and automatically annexes new files, etc, so you don't need
to manually run git commands when manipulating files.
* Enable diskfree on kfreebsd, using statvfs.
* watch: New subcommand, a daemon which notices changes to
files and automatically annexes new files, etc, so you don't
need to manually run git commands when manipulating files.
Available on Linux, BSDs, and OSX!
* Enable diskfree on kfreebsd, using kqueue.
* unused: Fix crash when key names contain invalid utf8.
-- Joey Hess <joeyh@debian.org> Tue, 12 Jun 2012 11:35:59 -0400

4
debian/control vendored
View file

@ -41,8 +41,8 @@ Depends: ${misc:Depends}, ${shlibs:Depends},
uuid,
rsync,
wget | curl,
openssh-client (>= 1:5.6p1),
lsof
openssh-client (>= 1:5.6p1)
Recommends: lsof
Suggests: graphviz, bup, gnupg
Description: manage files with git, without checking their contents into git
git-annex allows managing files with git, without checking the file