reorganize
This commit is contained in:
parent
c31ddeda84
commit
ccc5005245
6 changed files with 463 additions and 382 deletions
74
Assistant.hs
Normal file
74
Assistant.hs
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
{- git-annex assistant daemon
|
||||||
|
-
|
||||||
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-
|
||||||
|
- Overview of threads and MVars, etc:
|
||||||
|
-
|
||||||
|
- Thread 1: parent
|
||||||
|
- The initial thread run, double forks to background, starts other
|
||||||
|
- threads, and then stops, waiting for them to terminate,
|
||||||
|
- or for a ctrl-c.
|
||||||
|
- Thread 2: inotify
|
||||||
|
- Notices new files, and calls handlers for events, queuing changes.
|
||||||
|
- Thread 3: inotify internal
|
||||||
|
- Used by haskell inotify library to ensure inotify event buffer is
|
||||||
|
- kept drained.
|
||||||
|
- Thread 4: inotify initial scan
|
||||||
|
- A MVar lock is used to prevent other inotify handlers from running
|
||||||
|
- until this is complete.
|
||||||
|
- Thread 5: committer
|
||||||
|
- Waits for changes to occur, and runs the git queue to update its
|
||||||
|
- index, then commits.
|
||||||
|
- Thread 6: status logger
|
||||||
|
- Wakes up periodically and records the daemon's status to disk.
|
||||||
|
-
|
||||||
|
- ThreadState: (MVar)
|
||||||
|
- The Annex state is stored here, which allows resuscitating the
|
||||||
|
- Annex monad in IO actions run by the inotify and committer
|
||||||
|
- threads. Thus, a single state is shared amoung the threads, and
|
||||||
|
- only one at a time can access it.
|
||||||
|
- DaemonStatusHandle: (MVar)
|
||||||
|
- The daemon's current status. This MVar should only be manipulated
|
||||||
|
- from inside the Annex monad, which ensures it's accessed only
|
||||||
|
- after the ThreadState MVar.
|
||||||
|
- ChangeChan: (STM TChan)
|
||||||
|
- Changes are indicated by writing to this channel. The committer
|
||||||
|
- reads from it.
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Assistant where
|
||||||
|
|
||||||
|
import Common.Annex
|
||||||
|
import Assistant.ThreadedMonad
|
||||||
|
import Assistant.DaemonStatus
|
||||||
|
import Assistant.Watcher
|
||||||
|
import Assistant.Committer
|
||||||
|
import qualified Utility.Daemon
|
||||||
|
import Utility.LogFile
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
|
|
||||||
|
startDaemon :: Bool -> Annex ()
|
||||||
|
startDaemon foreground
|
||||||
|
| foreground = do
|
||||||
|
showStart "watch" "."
|
||||||
|
go id
|
||||||
|
| otherwise = do
|
||||||
|
logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile
|
||||||
|
pidfile <- fromRepo gitAnnexPidFile
|
||||||
|
go $ Utility.Daemon.daemonize logfd (Just pidfile) False
|
||||||
|
where
|
||||||
|
go a = withThreadState $ \st -> liftIO $ a $ do
|
||||||
|
dstatus <- startDaemonStatus
|
||||||
|
changechan <- newChangeChan
|
||||||
|
-- The commit thread is started early, so that the user
|
||||||
|
-- can immediately begin adding files and having them
|
||||||
|
-- committed, even while the startup scan is taking
|
||||||
|
-- place.
|
||||||
|
_ <- forkIO $ commitThread st changechan
|
||||||
|
watchThread st dstatus changechan
|
||||||
|
|
||||||
|
stopDaemon :: Annex ()
|
||||||
|
stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
|
104
Assistant/Committer.hs
Normal file
104
Assistant/Committer.hs
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
{- git-annex assistant change tracking and committing
|
||||||
|
-
|
||||||
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Assistant.Committer where
|
||||||
|
|
||||||
|
import Common.Annex
|
||||||
|
import Assistant.ThreadedMonad
|
||||||
|
import qualified Annex.Queue
|
||||||
|
import qualified Git.Command
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
import Data.Time.Clock
|
||||||
|
|
||||||
|
type ChangeChan = TChan Change
|
||||||
|
|
||||||
|
data Change = Change
|
||||||
|
{ changeTime :: UTCTime
|
||||||
|
, changeFile :: FilePath
|
||||||
|
, changeDesc :: String
|
||||||
|
}
|
||||||
|
deriving (Show)
|
||||||
|
|
||||||
|
runChangeChan :: STM a -> IO a
|
||||||
|
runChangeChan = atomically
|
||||||
|
|
||||||
|
newChangeChan :: IO ChangeChan
|
||||||
|
newChangeChan = atomically newTChan
|
||||||
|
|
||||||
|
{- Handlers call this when they made a change that needs to get committed. -}
|
||||||
|
madeChange :: FilePath -> String -> Annex (Maybe Change)
|
||||||
|
madeChange file desc = do
|
||||||
|
-- Just in case the commit thread is not flushing the queue fast enough.
|
||||||
|
Annex.Queue.flushWhenFull
|
||||||
|
liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
|
||||||
|
|
||||||
|
noChange :: Annex (Maybe Change)
|
||||||
|
noChange = return Nothing
|
||||||
|
|
||||||
|
{- Gets all unhandled changes.
|
||||||
|
- Blocks until at least one change is made. -}
|
||||||
|
getChanges :: ChangeChan -> IO [Change]
|
||||||
|
getChanges chan = runChangeChan $ do
|
||||||
|
c <- readTChan chan
|
||||||
|
go [c]
|
||||||
|
where
|
||||||
|
go l = do
|
||||||
|
v <- tryReadTChan chan
|
||||||
|
case v of
|
||||||
|
Nothing -> return l
|
||||||
|
Just c -> go (c:l)
|
||||||
|
|
||||||
|
{- Puts unhandled changes back into the channel.
|
||||||
|
- Note: Original order is not preserved. -}
|
||||||
|
refillChanges :: ChangeChan -> [Change] -> IO ()
|
||||||
|
refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
|
||||||
|
|
||||||
|
{- This thread makes git commits at appropriate times. -}
|
||||||
|
commitThread :: ThreadState -> ChangeChan -> IO ()
|
||||||
|
commitThread st changechan = forever $ do
|
||||||
|
-- First, a simple rate limiter.
|
||||||
|
threadDelay oneSecond
|
||||||
|
-- Next, wait until at least one change has been made.
|
||||||
|
cs <- getChanges changechan
|
||||||
|
-- Now see if now's a good time to commit.
|
||||||
|
time <- getCurrentTime
|
||||||
|
if shouldCommit time cs
|
||||||
|
then void $ tryIO $ runThreadState st commitStaged
|
||||||
|
else refillChanges changechan cs
|
||||||
|
where
|
||||||
|
oneSecond = 1000000 -- microseconds
|
||||||
|
|
||||||
|
commitStaged :: Annex ()
|
||||||
|
commitStaged = do
|
||||||
|
Annex.Queue.flush
|
||||||
|
inRepo $ Git.Command.run "commit"
|
||||||
|
[ Param "--allow-empty-message"
|
||||||
|
, Param "-m", Param ""
|
||||||
|
-- Empty commits may be made if tree changes cancel
|
||||||
|
-- each other out, etc
|
||||||
|
, Param "--allow-empty"
|
||||||
|
-- Avoid running the usual git-annex pre-commit hook;
|
||||||
|
-- watch does the same symlink fixing, and we don't want
|
||||||
|
-- to deal with unlocked files in these commits.
|
||||||
|
, Param "--quiet"
|
||||||
|
]
|
||||||
|
|
||||||
|
{- Decide if now is a good time to make a commit.
|
||||||
|
- Note that the list of change times has an undefined order.
|
||||||
|
-
|
||||||
|
- Current strategy: If there have been 10 commits within the past second,
|
||||||
|
- a batch activity is taking place, so wait for later.
|
||||||
|
-}
|
||||||
|
shouldCommit :: UTCTime -> [Change] -> Bool
|
||||||
|
shouldCommit now changes
|
||||||
|
| len == 0 = False
|
||||||
|
| len > 10000 = True -- avoid bloating queue too much
|
||||||
|
| length (filter thisSecond changes) < 10 = True
|
||||||
|
| otherwise = False -- batch activity
|
||||||
|
where
|
||||||
|
len = length changes
|
||||||
|
thisSecond c = now `diffUTCTime` changeTime c <= 1
|
35
Assistant/DaemonStatus.hs
Normal file
35
Assistant/DaemonStatus.hs
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
{- git-annex assistant daemon status
|
||||||
|
-
|
||||||
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Assistant.DaemonStatus where
|
||||||
|
|
||||||
|
import Common.Annex
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
|
import System.Posix.Types
|
||||||
|
|
||||||
|
data DaemonStatus = DaemonStatus
|
||||||
|
-- False when the daemon is performing its startup scan
|
||||||
|
{ scanComplete :: Bool
|
||||||
|
-- Time when a previous process of the daemon was running ok
|
||||||
|
, lastRunning :: Maybe EpochTime
|
||||||
|
}
|
||||||
|
|
||||||
|
type DaemonStatusHandle = MVar DaemonStatus
|
||||||
|
|
||||||
|
newDaemonStatus :: DaemonStatus
|
||||||
|
newDaemonStatus = DaemonStatus
|
||||||
|
{ scanComplete = False
|
||||||
|
, lastRunning = Nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
startDaemonStatus :: IO DaemonStatusHandle
|
||||||
|
startDaemonStatus = newMVar newDaemonStatus
|
||||||
|
|
||||||
|
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
|
||||||
|
getDaemonStatus = liftIO . readMVar
|
||||||
|
|
||||||
|
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
|
||||||
|
modifyDaemonStatus status a = liftIO $ modifyMVar_ status (return . a)
|
40
Assistant/ThreadedMonad.hs
Normal file
40
Assistant/ThreadedMonad.hs
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
{- making the Annex monad available across threads
|
||||||
|
-
|
||||||
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
|
module Assistant.ThreadedMonad where
|
||||||
|
|
||||||
|
import Common.Annex
|
||||||
|
import qualified Annex
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
|
|
||||||
|
{- The Annex state is stored in a MVar, so that threaded actions can access
|
||||||
|
- it. -}
|
||||||
|
type ThreadState = MVar Annex.AnnexState
|
||||||
|
|
||||||
|
{- Stores the Annex state in a MVar.
|
||||||
|
-
|
||||||
|
- Once the action is finished, retrieves the state from the MVar.
|
||||||
|
-}
|
||||||
|
withThreadState :: (ThreadState -> Annex a) -> Annex a
|
||||||
|
withThreadState a = do
|
||||||
|
state <- Annex.getState id
|
||||||
|
mvar <- liftIO $ newMVar state
|
||||||
|
r <- a mvar
|
||||||
|
newstate <- liftIO $ takeMVar mvar
|
||||||
|
Annex.changeState (const newstate)
|
||||||
|
return r
|
||||||
|
|
||||||
|
{- Runs an Annex action, using the state from the MVar.
|
||||||
|
-
|
||||||
|
- This serializes calls by threads. -}
|
||||||
|
runThreadState :: ThreadState -> Annex a -> IO a
|
||||||
|
runThreadState mvar a = do
|
||||||
|
startstate <- takeMVar mvar
|
||||||
|
!(r, newstate) <- Annex.run startstate a
|
||||||
|
putMVar mvar newstate
|
||||||
|
return r
|
206
Assistant/Watcher.hs
Normal file
206
Assistant/Watcher.hs
Normal file
|
@ -0,0 +1,206 @@
|
||||||
|
{- git-annex assistant tree watcher
|
||||||
|
-
|
||||||
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE CPP #-}
|
||||||
|
|
||||||
|
module Assistant.Watcher where
|
||||||
|
|
||||||
|
import Common.Annex
|
||||||
|
import Assistant.ThreadedMonad
|
||||||
|
import Assistant.DaemonStatus
|
||||||
|
import Assistant.Committer
|
||||||
|
import Utility.ThreadLock
|
||||||
|
import qualified Annex.Queue
|
||||||
|
import qualified Command.Add
|
||||||
|
import qualified Git.Command
|
||||||
|
import qualified Git.UpdateIndex
|
||||||
|
import qualified Git.HashObject
|
||||||
|
import qualified Git.LsFiles
|
||||||
|
import qualified Backend
|
||||||
|
import Annex.Content
|
||||||
|
import Annex.CatFile
|
||||||
|
import Git.Types
|
||||||
|
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
import Data.Bits.Utils
|
||||||
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
|
#if defined linux_HOST_OS
|
||||||
|
import Utility.Inotify
|
||||||
|
import System.INotify
|
||||||
|
#endif
|
||||||
|
|
||||||
|
type Handler = FilePath -> Maybe FileStatus -> DaemonStatusHandle -> Annex (Maybe Change)
|
||||||
|
|
||||||
|
watchThread :: ThreadState -> DaemonStatusHandle -> ChangeChan -> IO ()
|
||||||
|
#if defined linux_HOST_OS
|
||||||
|
watchThread st dstatus changechan = withINotify $ \i -> do
|
||||||
|
runThreadState st $
|
||||||
|
showAction "scanning"
|
||||||
|
-- This does not return until the startup scan is done.
|
||||||
|
-- That can take some time for large trees.
|
||||||
|
watchDir i "." (ignored . takeFileName) hooks
|
||||||
|
runThreadState st $
|
||||||
|
modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
|
||||||
|
-- Notice any files that were deleted before inotify
|
||||||
|
-- was started.
|
||||||
|
runThreadState st $ do
|
||||||
|
inRepo $ Git.Command.run "add" [Param "--update"]
|
||||||
|
showAction "started"
|
||||||
|
waitForTermination
|
||||||
|
where
|
||||||
|
hook a = Just $ runHandler st dstatus changechan a
|
||||||
|
hooks = WatchHooks
|
||||||
|
{ addHook = hook onAdd
|
||||||
|
, delHook = hook onDel
|
||||||
|
, addSymlinkHook = hook onAddSymlink
|
||||||
|
, delDirHook = hook onDelDir
|
||||||
|
, errHook = hook onErr
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
watchThread = error "so far only available on Linux"
|
||||||
|
#endif
|
||||||
|
|
||||||
|
ignored :: FilePath -> Bool
|
||||||
|
ignored ".git" = True
|
||||||
|
ignored ".gitignore" = True
|
||||||
|
ignored ".gitattributes" = True
|
||||||
|
ignored _ = False
|
||||||
|
|
||||||
|
{- Runs an action handler, inside the Annex monad, and if there was a
|
||||||
|
- change, adds it to the ChangeChan.
|
||||||
|
-
|
||||||
|
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
|
||||||
|
-}
|
||||||
|
runHandler :: ThreadState -> DaemonStatusHandle -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
|
||||||
|
runHandler st dstatus changechan handler file filestatus = void $ do
|
||||||
|
r <- tryIO go
|
||||||
|
case r of
|
||||||
|
Left e -> print e
|
||||||
|
Right Nothing -> noop
|
||||||
|
Right (Just change) -> void $
|
||||||
|
runChangeChan $ writeTChan changechan change
|
||||||
|
where
|
||||||
|
go = runThreadState st $ handler file filestatus dstatus
|
||||||
|
|
||||||
|
{- Adding a file is tricky; the file has to be replaced with a symlink
|
||||||
|
- but this is race prone, as the symlink could be changed immediately
|
||||||
|
- after creation. To avoid that race, git add is not used to stage the
|
||||||
|
- symlink.
|
||||||
|
-
|
||||||
|
- Inotify will notice the new symlink, so this Handler does not stage it
|
||||||
|
- or return a Change, leaving that to onAddSymlink.
|
||||||
|
-
|
||||||
|
- During initial directory scan, this will be run for any files that
|
||||||
|
- are already checked into git. We don't want to turn those into symlinks,
|
||||||
|
- so do a check. This is rather expensive, but only happens during
|
||||||
|
- startup.
|
||||||
|
-}
|
||||||
|
onAdd :: Handler
|
||||||
|
onAdd file _filestatus dstatus = do
|
||||||
|
ifM (scanComplete <$> getDaemonStatus dstatus)
|
||||||
|
( go
|
||||||
|
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
|
||||||
|
( noChange
|
||||||
|
, go
|
||||||
|
)
|
||||||
|
)
|
||||||
|
where
|
||||||
|
go = do
|
||||||
|
showStart "add" file
|
||||||
|
handle =<< Command.Add.ingest file
|
||||||
|
noChange
|
||||||
|
handle Nothing = showEndFail
|
||||||
|
handle (Just key) = do
|
||||||
|
Command.Add.link file key True
|
||||||
|
showEndOk
|
||||||
|
|
||||||
|
{- A symlink might be an arbitrary symlink, which is just added.
|
||||||
|
- Or, if it is a git-annex symlink, ensure it points to the content
|
||||||
|
- before adding it.
|
||||||
|
-}
|
||||||
|
onAddSymlink :: Handler
|
||||||
|
onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
|
||||||
|
where
|
||||||
|
go (Just (key, _)) = do
|
||||||
|
link <- calcGitLink file key
|
||||||
|
ifM ((==) link <$> liftIO (readSymbolicLink file))
|
||||||
|
( ensurestaged link =<< getDaemonStatus dstatus
|
||||||
|
, do
|
||||||
|
liftIO $ removeFile file
|
||||||
|
liftIO $ createSymbolicLink link file
|
||||||
|
addlink link
|
||||||
|
)
|
||||||
|
go Nothing = do -- other symlink
|
||||||
|
link <- liftIO (readSymbolicLink file)
|
||||||
|
ensurestaged link =<< getDaemonStatus dstatus
|
||||||
|
|
||||||
|
{- This is often called on symlinks that are already
|
||||||
|
- staged correctly. A symlink may have been deleted
|
||||||
|
- and being re-added, or added when the watcher was
|
||||||
|
- not running. So they're normally restaged to make sure.
|
||||||
|
-
|
||||||
|
- As an optimisation, during the status scan, avoid
|
||||||
|
- restaging everything. Only links that were created since
|
||||||
|
- the last time the daemon was running are staged.
|
||||||
|
- (If the daemon has never ran before, avoid staging
|
||||||
|
- links too.)
|
||||||
|
-}
|
||||||
|
ensurestaged link daemonstatus
|
||||||
|
| scanComplete daemonstatus = addlink link
|
||||||
|
| otherwise = case filestatus of
|
||||||
|
Just s
|
||||||
|
| safe (statusChangeTime s) -> noChange
|
||||||
|
_ -> addlink link
|
||||||
|
where
|
||||||
|
safe t = maybe True (> t) (lastRunning daemonstatus)
|
||||||
|
|
||||||
|
{- For speed, tries to reuse the existing blob for
|
||||||
|
- the symlink target. -}
|
||||||
|
addlink link = do
|
||||||
|
v <- catObjectDetails $ Ref $ ':':file
|
||||||
|
case v of
|
||||||
|
Just (currlink, sha)
|
||||||
|
| s2w8 link == L.unpack currlink ->
|
||||||
|
stageSymlink file sha
|
||||||
|
_ -> do
|
||||||
|
sha <- inRepo $
|
||||||
|
Git.HashObject.hashObject BlobObject link
|
||||||
|
stageSymlink file sha
|
||||||
|
madeChange file "link"
|
||||||
|
|
||||||
|
onDel :: Handler
|
||||||
|
onDel file _ _dstatus = do
|
||||||
|
Annex.Queue.addUpdateIndex =<<
|
||||||
|
inRepo (Git.UpdateIndex.unstageFile file)
|
||||||
|
madeChange file "rm"
|
||||||
|
|
||||||
|
{- A directory has been deleted, or moved, so tell git to remove anything
|
||||||
|
- that was inside it from its cache. Since it could reappear at any time,
|
||||||
|
- use --cached to only delete it from the index.
|
||||||
|
-
|
||||||
|
- Note: This could use unstageFile, but would need to run another git
|
||||||
|
- command to get the recursive list of files in the directory, so rm is
|
||||||
|
- just as good. -}
|
||||||
|
onDelDir :: Handler
|
||||||
|
onDelDir dir _ _dstatus = do
|
||||||
|
Annex.Queue.addCommand "rm"
|
||||||
|
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
|
||||||
|
madeChange dir "rmdir"
|
||||||
|
|
||||||
|
{- Called when there's an error with inotify. -}
|
||||||
|
onErr :: Handler
|
||||||
|
onErr msg _ _dstatus = do
|
||||||
|
warning msg
|
||||||
|
return Nothing
|
||||||
|
|
||||||
|
{- Adds a symlink to the index, without ever accessing the actual symlink
|
||||||
|
- on disk. -}
|
||||||
|
stageSymlink :: FilePath -> Sha -> Annex ()
|
||||||
|
stageSymlink file sha =
|
||||||
|
Annex.Queue.addUpdateIndex =<<
|
||||||
|
inRepo (Git.UpdateIndex.stageSymlink file sha)
|
386
Command/Watch.hs
386
Command/Watch.hs
|
@ -1,108 +1,20 @@
|
||||||
{-# LANGUAGE CPP #-}
|
{-# LANGUAGE CPP #-}
|
||||||
{-# LANGUAGE BangPatterns #-}
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
{- git-annex watch daemon
|
{- git-annex watch command
|
||||||
-
|
-
|
||||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
-
|
|
||||||
- Overview of threads and MVars, etc:
|
|
||||||
-
|
|
||||||
- Thread 1: parent
|
|
||||||
- The initial thread run, double forks to background, starts other
|
|
||||||
- threads, and then stops, waiting for them to terminate,
|
|
||||||
- or for a ctrl-c.
|
|
||||||
- Thread 2: inotify
|
|
||||||
- Notices new files, and calls handlers for events, queuing changes.
|
|
||||||
- Thread 3: inotify internal
|
|
||||||
- Used by haskell inotify library to ensure inotify event buffer is
|
|
||||||
- kept drained.
|
|
||||||
- Thread 4: inotify initial scan
|
|
||||||
- A MVar lock is used to prevent other inotify handlers from running
|
|
||||||
- until this is complete.
|
|
||||||
- Thread 5: committer
|
|
||||||
- Waits for changes to occur, and runs the git queue to update its
|
|
||||||
- index, then commits.
|
|
||||||
- Thread 6: status logger
|
|
||||||
- Wakes up periodically and records the daemon's status to disk.
|
|
||||||
-
|
|
||||||
- State MVar:
|
|
||||||
- The Annex state is stored here, which allows resuscitating the
|
|
||||||
- Annex monad in IO actions run by the inotify and committer
|
|
||||||
- threads. Thus, a single state is shared amoung the threads, and
|
|
||||||
- only one at a time can access it.
|
|
||||||
- DaemonStatus MVar:
|
|
||||||
- The daemon's current status. This MVar should only be manipulated
|
|
||||||
- from inside the Annex monad, which ensures it's accessed only
|
|
||||||
- after the State MVar.
|
|
||||||
- ChangeChan STM TChan:
|
|
||||||
- Changes are indicated by writing to this channel. The committer
|
|
||||||
- reads from it.
|
|
||||||
-}
|
-}
|
||||||
|
|
||||||
module Command.Watch where
|
module Command.Watch where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
|
import Assistant
|
||||||
import Command
|
import Command
|
||||||
import Utility.Daemon
|
|
||||||
import Utility.LogFile
|
|
||||||
import Utility.ThreadLock
|
|
||||||
import qualified Annex
|
|
||||||
import qualified Annex.Queue
|
|
||||||
import qualified Command.Add
|
|
||||||
import qualified Git.Command
|
|
||||||
import qualified Git.UpdateIndex
|
|
||||||
import qualified Git.HashObject
|
|
||||||
import qualified Git.LsFiles
|
|
||||||
import qualified Backend
|
|
||||||
import Annex.Content
|
|
||||||
import Annex.CatFile
|
|
||||||
import Git.Types
|
|
||||||
import Option
|
import Option
|
||||||
|
|
||||||
import Control.Concurrent
|
|
||||||
import Control.Concurrent.STM
|
|
||||||
import Data.Time.Clock
|
|
||||||
import Data.Bits.Utils
|
|
||||||
import System.Posix.Types
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
|
||||||
|
|
||||||
#if defined linux_HOST_OS
|
|
||||||
import Utility.Inotify
|
|
||||||
import System.INotify
|
|
||||||
#endif
|
|
||||||
|
|
||||||
data DaemonStatus = DaemonStatus
|
|
||||||
-- False when the daemon is performing its startup scan
|
|
||||||
{ scanComplete :: Bool
|
|
||||||
-- Time when a previous process of the daemon was running ok
|
|
||||||
, lastRunning :: Maybe EpochTime
|
|
||||||
}
|
|
||||||
|
|
||||||
newDaemonStatus :: Annex DaemonStatus
|
|
||||||
newDaemonStatus = return $ DaemonStatus
|
|
||||||
{ scanComplete = False
|
|
||||||
, lastRunning = Nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
getDaemonStatus :: MVar DaemonStatus -> Annex DaemonStatus
|
|
||||||
getDaemonStatus = liftIO . readMVar
|
|
||||||
|
|
||||||
modifyDaemonStatus :: MVar DaemonStatus -> (DaemonStatus -> DaemonStatus) -> Annex ()
|
|
||||||
modifyDaemonStatus status a = liftIO $ modifyMVar_ status (return . a)
|
|
||||||
|
|
||||||
type ChangeChan = TChan Change
|
|
||||||
|
|
||||||
type Handler = FilePath -> Maybe FileStatus -> MVar DaemonStatus -> Annex (Maybe Change)
|
|
||||||
|
|
||||||
data Change = Change
|
|
||||||
{ changeTime :: UTCTime
|
|
||||||
, changeFile :: FilePath
|
|
||||||
, changeDesc :: String
|
|
||||||
}
|
|
||||||
deriving (Show)
|
|
||||||
|
|
||||||
def :: [Command]
|
def :: [Command]
|
||||||
def = [withOptions [foregroundOption, stopOption] $
|
def = [withOptions [foregroundOption, stopOption] $
|
||||||
command "watch" paramNothing seek "watch for changes"]
|
command "watch" paramNothing seek "watch for changes"]
|
||||||
|
@ -121,296 +33,6 @@ stopOption = Option.flag [] "stop" "stop daemon"
|
||||||
start :: Bool -> Bool -> CommandStart
|
start :: Bool -> Bool -> CommandStart
|
||||||
start foreground stopdaemon = notBareRepo $ do
|
start foreground stopdaemon = notBareRepo $ do
|
||||||
if stopdaemon
|
if stopdaemon
|
||||||
then liftIO . stopDaemon =<< fromRepo gitAnnexPidFile
|
then stopDaemon
|
||||||
else withStateMVar $ startDaemon foreground
|
else startDaemon foreground -- does not return
|
||||||
stop
|
stop
|
||||||
|
|
||||||
startDaemon :: Bool -> MVar Annex.AnnexState -> Annex ()
|
|
||||||
startDaemon foreground st
|
|
||||||
| foreground = do
|
|
||||||
showStart "watch" "."
|
|
||||||
go id
|
|
||||||
| otherwise = do
|
|
||||||
logfd <- liftIO . openLog =<< fromRepo gitAnnexLogFile
|
|
||||||
pidfile <- fromRepo gitAnnexPidFile
|
|
||||||
go $ daemonize logfd (Just pidfile) False
|
|
||||||
where
|
|
||||||
go a = do
|
|
||||||
daemonstatus <- newDaemonStatus
|
|
||||||
liftIO $ a $ do
|
|
||||||
dstatus <- newMVar daemonstatus
|
|
||||||
changechan <- runChangeChan newTChan
|
|
||||||
watch st dstatus changechan
|
|
||||||
|
|
||||||
watch :: MVar Annex.AnnexState -> MVar DaemonStatus -> ChangeChan -> IO ()
|
|
||||||
#if defined linux_HOST_OS
|
|
||||||
watch st dstatus changechan = withINotify $ \i -> do
|
|
||||||
-- The commit thread is started early, so that the user
|
|
||||||
-- can immediately begin adding files and having them
|
|
||||||
-- committed, even while the startup scan is taking place.
|
|
||||||
_ <- forkIO $ commitThread st changechan
|
|
||||||
runStateMVar st $
|
|
||||||
showAction "scanning"
|
|
||||||
-- This does not return until the startup scan is done.
|
|
||||||
-- That can take some time for large trees.
|
|
||||||
watchDir i "." (ignored . takeFileName) hooks
|
|
||||||
runStateMVar st $
|
|
||||||
modifyDaemonStatus dstatus $ \s -> s { scanComplete = True }
|
|
||||||
-- Notice any files that were deleted before inotify
|
|
||||||
-- was started.
|
|
||||||
runStateMVar st $ do
|
|
||||||
inRepo $ Git.Command.run "add" [Param "--update"]
|
|
||||||
showAction "started"
|
|
||||||
waitForTermination
|
|
||||||
where
|
|
||||||
hook a = Just $ runHandler st dstatus changechan a
|
|
||||||
hooks = WatchHooks
|
|
||||||
{ addHook = hook onAdd
|
|
||||||
, delHook = hook onDel
|
|
||||||
, addSymlinkHook = hook onAddSymlink
|
|
||||||
, delDirHook = hook onDelDir
|
|
||||||
, errHook = hook onErr
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
watch = error "watch mode is so far only available on Linux"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
ignored :: FilePath -> Bool
|
|
||||||
ignored ".git" = True
|
|
||||||
ignored ".gitignore" = True
|
|
||||||
ignored ".gitattributes" = True
|
|
||||||
ignored _ = False
|
|
||||||
|
|
||||||
{- Stores the Annex state in a MVar, so that threaded actions can access
|
|
||||||
- it.
|
|
||||||
-
|
|
||||||
- Once the action is finished, retrieves the state from the MVar.
|
|
||||||
-}
|
|
||||||
withStateMVar :: (MVar Annex.AnnexState -> Annex a) -> Annex a
|
|
||||||
withStateMVar a = do
|
|
||||||
state <- Annex.getState id
|
|
||||||
mvar <- liftIO $ newMVar state
|
|
||||||
r <- a mvar
|
|
||||||
newstate <- liftIO $ takeMVar mvar
|
|
||||||
Annex.changeState (const newstate)
|
|
||||||
return r
|
|
||||||
|
|
||||||
{- Runs an Annex action, using the state from the MVar. -}
|
|
||||||
runStateMVar :: MVar Annex.AnnexState -> Annex a -> IO a
|
|
||||||
runStateMVar mvar a = do
|
|
||||||
startstate <- takeMVar mvar
|
|
||||||
!(r, newstate) <- Annex.run startstate a
|
|
||||||
putMVar mvar newstate
|
|
||||||
return r
|
|
||||||
|
|
||||||
runChangeChan :: STM a -> IO a
|
|
||||||
runChangeChan = atomically
|
|
||||||
|
|
||||||
{- Runs an action handler, inside the Annex monad, and if there was a
|
|
||||||
- change, adds it to the ChangeChan.
|
|
||||||
-
|
|
||||||
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
|
|
||||||
-}
|
|
||||||
runHandler :: MVar Annex.AnnexState -> MVar DaemonStatus -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO ()
|
|
||||||
runHandler st dstatus changechan handler file filestatus = void $ do
|
|
||||||
r <- tryIO go
|
|
||||||
case r of
|
|
||||||
Left e -> print e
|
|
||||||
Right Nothing -> noop
|
|
||||||
Right (Just change) -> void $
|
|
||||||
runChangeChan $ writeTChan changechan change
|
|
||||||
where
|
|
||||||
go = runStateMVar st $ handler file filestatus dstatus
|
|
||||||
|
|
||||||
{- Handlers call this when they made a change that needs to get committed. -}
|
|
||||||
madeChange :: FilePath -> String -> Annex (Maybe Change)
|
|
||||||
madeChange file desc = do
|
|
||||||
-- Just in case the commit thread is not flushing the queue fast enough.
|
|
||||||
Annex.Queue.flushWhenFull
|
|
||||||
liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
|
|
||||||
|
|
||||||
noChange :: Annex (Maybe Change)
|
|
||||||
noChange = return Nothing
|
|
||||||
|
|
||||||
{- Adding a file is tricky; the file has to be replaced with a symlink
|
|
||||||
- but this is race prone, as the symlink could be changed immediately
|
|
||||||
- after creation. To avoid that race, git add is not used to stage the
|
|
||||||
- symlink.
|
|
||||||
-
|
|
||||||
- Inotify will notice the new symlink, so this Handler does not stage it
|
|
||||||
- or return a Change, leaving that to onAddSymlink.
|
|
||||||
-
|
|
||||||
- During initial directory scan, this will be run for any files that
|
|
||||||
- are already checked into git. We don't want to turn those into symlinks,
|
|
||||||
- so do a check. This is rather expensive, but only happens during
|
|
||||||
- startup.
|
|
||||||
-}
|
|
||||||
onAdd :: Handler
|
|
||||||
onAdd file _filestatus dstatus = do
|
|
||||||
ifM (scanComplete <$> getDaemonStatus dstatus)
|
|
||||||
( go
|
|
||||||
, ifM (null <$> inRepo (Git.LsFiles.notInRepo False [file]))
|
|
||||||
( noChange
|
|
||||||
, go
|
|
||||||
)
|
|
||||||
)
|
|
||||||
where
|
|
||||||
go = do
|
|
||||||
showStart "add" file
|
|
||||||
handle =<< Command.Add.ingest file
|
|
||||||
noChange
|
|
||||||
handle Nothing = showEndFail
|
|
||||||
handle (Just key) = do
|
|
||||||
Command.Add.link file key True
|
|
||||||
showEndOk
|
|
||||||
|
|
||||||
{- A symlink might be an arbitrary symlink, which is just added.
|
|
||||||
- Or, if it is a git-annex symlink, ensure it points to the content
|
|
||||||
- before adding it.
|
|
||||||
-
|
|
||||||
-}
|
|
||||||
onAddSymlink :: Handler
|
|
||||||
onAddSymlink file filestatus dstatus = go =<< Backend.lookupFile file
|
|
||||||
where
|
|
||||||
go (Just (key, _)) = do
|
|
||||||
link <- calcGitLink file key
|
|
||||||
ifM ((==) link <$> liftIO (readSymbolicLink file))
|
|
||||||
( ensurestaged link =<< getDaemonStatus dstatus
|
|
||||||
, do
|
|
||||||
liftIO $ removeFile file
|
|
||||||
liftIO $ createSymbolicLink link file
|
|
||||||
addlink link
|
|
||||||
)
|
|
||||||
go Nothing = do -- other symlink
|
|
||||||
link <- liftIO (readSymbolicLink file)
|
|
||||||
ensurestaged link =<< getDaemonStatus dstatus
|
|
||||||
|
|
||||||
{- This is often called on symlinks that are already
|
|
||||||
- staged correctly. A symlink may have been deleted
|
|
||||||
- and being re-added, or added when the watcher was
|
|
||||||
- not running. So they're normally restaged to make sure.
|
|
||||||
-
|
|
||||||
- As an optimisation, during the status scan, avoid
|
|
||||||
- restaging everything. Only links that were created since
|
|
||||||
- the last time the daemon was running are staged.
|
|
||||||
- (If the daemon has never ran before, avoid staging
|
|
||||||
- links too.)
|
|
||||||
-}
|
|
||||||
ensurestaged link daemonstatus
|
|
||||||
| scanComplete daemonstatus = addlink link
|
|
||||||
| otherwise = case filestatus of
|
|
||||||
Just s
|
|
||||||
| safe (statusChangeTime s) -> noChange
|
|
||||||
_ -> addlink link
|
|
||||||
where
|
|
||||||
safe t = maybe True (> t) (lastRunning daemonstatus)
|
|
||||||
|
|
||||||
{- For speed, tries to reuse the existing blob for
|
|
||||||
- the symlink target. -}
|
|
||||||
addlink link = do
|
|
||||||
v <- catObjectDetails $ Ref $ ':':file
|
|
||||||
case v of
|
|
||||||
Just (currlink, sha)
|
|
||||||
| s2w8 link == L.unpack currlink ->
|
|
||||||
stageSymlink file sha
|
|
||||||
_ -> do
|
|
||||||
sha <- inRepo $
|
|
||||||
Git.HashObject.hashObject BlobObject link
|
|
||||||
stageSymlink file sha
|
|
||||||
madeChange file "link"
|
|
||||||
|
|
||||||
onDel :: Handler
|
|
||||||
onDel file _ _dstatus = do
|
|
||||||
Annex.Queue.addUpdateIndex =<<
|
|
||||||
inRepo (Git.UpdateIndex.unstageFile file)
|
|
||||||
madeChange file "rm"
|
|
||||||
|
|
||||||
{- A directory has been deleted, or moved, so tell git to remove anything
|
|
||||||
- that was inside it from its cache. Since it could reappear at any time,
|
|
||||||
- use --cached to only delete it from the index.
|
|
||||||
-
|
|
||||||
- Note: This could use unstageFile, but would need to run another git
|
|
||||||
- command to get the recursive list of files in the directory, so rm is
|
|
||||||
- just as good. -}
|
|
||||||
onDelDir :: Handler
|
|
||||||
onDelDir dir _ _dstatus = do
|
|
||||||
Annex.Queue.addCommand "rm"
|
|
||||||
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
|
|
||||||
madeChange dir "rmdir"
|
|
||||||
|
|
||||||
{- Called when there's an error with inotify. -}
|
|
||||||
onErr :: Handler
|
|
||||||
onErr msg _ _dstatus = do
|
|
||||||
warning msg
|
|
||||||
return Nothing
|
|
||||||
|
|
||||||
{- Adds a symlink to the index, without ever accessing the actual symlink
|
|
||||||
- on disk. -}
|
|
||||||
stageSymlink :: FilePath -> Sha -> Annex ()
|
|
||||||
stageSymlink file sha =
|
|
||||||
Annex.Queue.addUpdateIndex =<<
|
|
||||||
inRepo (Git.UpdateIndex.stageSymlink file sha)
|
|
||||||
|
|
||||||
{- Gets all unhandled changes.
|
|
||||||
- Blocks until at least one change is made. -}
|
|
||||||
getChanges :: ChangeChan -> IO [Change]
|
|
||||||
getChanges chan = runChangeChan $ do
|
|
||||||
c <- readTChan chan
|
|
||||||
go [c]
|
|
||||||
where
|
|
||||||
go l = do
|
|
||||||
v <- tryReadTChan chan
|
|
||||||
case v of
|
|
||||||
Nothing -> return l
|
|
||||||
Just c -> go (c:l)
|
|
||||||
|
|
||||||
{- Puts unhandled changes back into the channel.
|
|
||||||
- Note: Original order is not preserved. -}
|
|
||||||
refillChanges :: ChangeChan -> [Change] -> IO ()
|
|
||||||
refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
|
|
||||||
|
|
||||||
{- This thread makes git commits at appropriate times. -}
|
|
||||||
commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO ()
|
|
||||||
commitThread st changechan = forever $ do
|
|
||||||
-- First, a simple rate limiter.
|
|
||||||
threadDelay oneSecond
|
|
||||||
-- Next, wait until at least one change has been made.
|
|
||||||
cs <- getChanges changechan
|
|
||||||
-- Now see if now's a good time to commit.
|
|
||||||
time <- getCurrentTime
|
|
||||||
if shouldCommit time cs
|
|
||||||
then void $ tryIO $ runStateMVar st commitStaged
|
|
||||||
else refillChanges changechan cs
|
|
||||||
where
|
|
||||||
oneSecond = 1000000 -- microseconds
|
|
||||||
|
|
||||||
commitStaged :: Annex ()
|
|
||||||
commitStaged = do
|
|
||||||
Annex.Queue.flush
|
|
||||||
inRepo $ Git.Command.run "commit"
|
|
||||||
[ Param "--allow-empty-message"
|
|
||||||
, Param "-m", Param ""
|
|
||||||
-- Empty commits may be made if tree changes cancel
|
|
||||||
-- each other out, etc
|
|
||||||
, Param "--allow-empty"
|
|
||||||
-- Avoid running the usual git-annex pre-commit hook;
|
|
||||||
-- watch does the same symlink fixing, and we don't want
|
|
||||||
-- to deal with unlocked files in these commits.
|
|
||||||
, Param "--quiet"
|
|
||||||
]
|
|
||||||
|
|
||||||
{- Decide if now is a good time to make a commit.
|
|
||||||
- Note that the list of change times has an undefined order.
|
|
||||||
-
|
|
||||||
- Current strategy: If there have been 10 commits within the past second,
|
|
||||||
- a batch activity is taking place, so wait for later.
|
|
||||||
-}
|
|
||||||
shouldCommit :: UTCTime -> [Change] -> Bool
|
|
||||||
shouldCommit now changes
|
|
||||||
| len == 0 = False
|
|
||||||
| len > 10000 = True -- avoid bloating queue too much
|
|
||||||
| length (filter thisSecond changes) < 10 = True
|
|
||||||
| otherwise = False -- batch activity
|
|
||||||
where
|
|
||||||
len = length changes
|
|
||||||
thisSecond c = now `diffUTCTime` changeTime c <= 1
|
|
||||||
|
|
Loading…
Reference in a new issue