convert Watcher thread to Assistant monad

This is a nice win; much less code runs in Annex, so other threads have
more chances to run concurrently.

I do notice that renaming a file has gone from 1 to 2 commits. I think this
is due to the above improvement letting the committer run more frequently,
so it commits the rm first.
This commit is contained in:
Joey Hess 2012-10-29 09:55:40 -04:00
parent 4dbdc2b666
commit bad88e404a
4 changed files with 89 additions and 88 deletions

View file

@ -215,7 +215,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
#ifdef WITH_XMPP #ifdef WITH_XMPP
, assist $ pushNotifierThread st dstatus pushnotifier , assist $ pushNotifierThread st dstatus pushnotifier
#endif #endif
, watch $ watchThread st dstatus transferqueue changechan , watch $ watchThread
] ]
liftIO waitForTermination liftIO waitForTermination

View file

@ -8,7 +8,6 @@
module Assistant.Changes where module Assistant.Changes where
import Common.Annex import Common.Annex
import qualified Annex.Queue
import Types.KeySource import Types.KeySource
import Utility.TSet import Utility.TSet
@ -39,19 +38,15 @@ newChangeChan :: IO ChangeChan
newChangeChan = newTSet newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -} {- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change) madeChange :: FilePath -> ChangeType -> IO (Maybe Change)
madeChange f t = do madeChange f t = Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
-- Just in case the commit thread is not flushing the queue fast enough.
Annex.Queue.flushWhenFull
liftIO $ Just <$> (Change <$> getCurrentTime <*> pure f <*> pure t)
noChange :: Annex (Maybe Change) noChange :: IO (Maybe Change)
noChange = return Nothing noChange = return Nothing
{- Indicates an add needs to be done, but has not started yet. -} {- Indicates an add needs to be done, but has not started yet. -}
pendingAddChange :: FilePath -> Annex (Maybe Change) pendingAddChange :: FilePath -> IO (Maybe Change)
pendingAddChange f = pendingAddChange f = Just <$> (PendingAddChange <$> getCurrentTime <*> pure f)
liftIO $ Just <$> (PendingAddChange <$> getCurrentTime <*> pure f)
isPendingAddChange :: Change -> Bool isPendingAddChange :: Change -> Bool
isPendingAddChange (PendingAddChange {}) = True isPendingAddChange (PendingAddChange {}) = True

View file

@ -90,9 +90,5 @@ check = do
dstatus <- getAssistant daemonStatusHandle dstatus <- getAssistant daemonStatusHandle
liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg
addsymlink file s = do addsymlink file s = do
d <- getAssistant id Watcher.runHandler Watcher.onAddSymlink file s
liftIO $ Watcher.runHandler (threadName d)
(threadState d) (daemonStatusHandle d)
(transferQueue d) (changeChan d)
Watcher.onAddSymlink file s
insanity $ "found unstaged symlink: " ++ file insanity $ "found unstaged symlink: " ++ file

View file

@ -15,7 +15,6 @@ module Assistant.Threads.Watcher (
) where ) where
import Assistant.Common import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.Changes import Assistant.Changes
import Assistant.TransferQueue import Assistant.TransferQueue
@ -37,9 +36,6 @@ import Git.Types
import Data.Bits.Utils import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
thisThread :: ThreadName
thisThread = "Watcher"
checkCanWatch :: Annex () checkCanWatch :: Annex ()
checkCanWatch checkCanWatch
| canWatch = | canWatch =
@ -55,35 +51,42 @@ needLsof = error $ unlines
, "Be warned: This can corrupt data in the annex, and make fsck complain." , "Be warned: This can corrupt data in the annex, and make fsck complain."
] ]
watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread watchThread :: NamedThread
watchThread st dstatus transferqueue changechan = NamedThread thisThread $ liftIO $ do watchThread = NamedThread "Watcher" $ do
void $ watchDir "." ignored hooks startup startup <- asIO startupScan
brokendebug thisThread [ "watching", "."] addhook <- hook onAdd
where delhook <- hook onDel
startup = startupScan st dstatus addsymlinkhook <- hook onAddSymlink
hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a deldirhook <- hook onDelDir
hooks = mkWatchHooks errhook <- hook onErr
{ addHook = hook onAdd let hooks = mkWatchHooks
, delHook = hook onDel { addHook = addhook
, addSymlinkHook = hook onAddSymlink , delHook = delhook
, delDirHook = hook onDelDir , addSymlinkHook = addsymlinkhook
, errHook = hook onErr , delDirHook = deldirhook
, errHook = errhook
} }
void $ liftIO $ watchDir "." ignored hooks startup
debug [ "watching", "."]
where
hook a = Just <$> asIO2 (runHandler a)
{- Initial scartup scan. The action should return once the scan is complete. -} {- Initial scartup scan. The action should return once the scan is complete. -}
startupScan :: ThreadState -> DaemonStatusHandle -> IO a -> IO a startupScan :: IO a -> Assistant a
startupScan st dstatus scanner = do startupScan scanner = do
runThreadState st $ showAction "scanning" liftAnnex $ showAction "scanning"
alertWhile' dstatus startupScanAlert $ do dstatus <- getAssistant daemonStatusHandle
r <- scanner alertWhile' dstatus startupScanAlert <~> do
r <- liftIO $ scanner
-- Notice any files that were deleted before -- Notice any files that were deleted before
-- watching was started. -- watching was started.
runThreadState st $ do liftAnnex $ do
inRepo $ Git.Command.run "add" [Param "--update"] inRepo $ Git.Command.run "add" [Param "--update"]
showAction "started" showAction "started"
modifyDaemonStatus_ dstatus $ \s -> s { scanComplete = True } liftIO $ modifyDaemonStatus_ dstatus $
\s -> s { scanComplete = True }
return (True, r) return (True, r)
@ -95,52 +98,52 @@ ignored = ig . takeFileName
ig ".gitattributes" = True ig ".gitattributes" = True
ig _ = False ig _ = False
type Handler = ThreadName -> FilePath -> Maybe FileStatus -> DaemonStatusHandle -> TransferQueue -> Annex (Maybe Change) type Handler = FilePath -> Maybe FileStatus -> Assistant (Maybe Change)
{- Runs an action handler, inside the Annex monad, and if there was a {- Runs an action handler, and if there was a change, adds it to the ChangeChan.
- change, adds it to the ChangeChan.
- -
- Exceptions are ignored, otherwise a whole watcher thread could be crashed. - Exceptions are ignored, otherwise a whole watcher thread could be crashed.
-} -}
runHandler :: ThreadName -> ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> Handler -> FilePath -> Maybe FileStatus -> IO () runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant ()
runHandler threadname st dstatus transferqueue changechan handler file filestatus = void $ do runHandler handler file filestatus = void $ do
r <- tryIO go r <- tryIO <~> handler file filestatus
case r of case r of
Left e -> print e Left e -> liftIO $ print e
Right Nothing -> noop Right Nothing -> noop
Right (Just change) -> recordChange changechan change Right (Just change) -> do
where -- Just in case the commit thread is not
go = runThreadState st $ handler threadname file filestatus dstatus transferqueue -- flushing the queue fast enough.
liftAnnex $ Annex.Queue.flushWhenFull
flip recordChange change <<~ changeChan
onAdd :: Handler onAdd :: Handler
onAdd _ file filestatus _ _ onAdd file filestatus
| maybe False isRegularFile filestatus = pendingAddChange file | maybe False isRegularFile filestatus = liftIO $ pendingAddChange file
| otherwise = noChange | otherwise = liftIO $ noChange
{- A symlink might be an arbitrary symlink, which is just added. {- A symlink might be an arbitrary symlink, which is just added.
- Or, if it is a git-annex symlink, ensure it points to the content - Or, if it is a git-annex symlink, ensure it points to the content
- before adding it. - before adding it.
-} -}
onAddSymlink :: Handler onAddSymlink :: Handler
onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.lookupFile file onAddSymlink file filestatus = go =<< liftAnnex (Backend.lookupFile file)
where where
go (Just (key, _)) = do go (Just (key, _)) = do
link <- calcGitLink file key link <- liftAnnex $ calcGitLink file key
ifM ((==) link <$> liftIO (readSymbolicLink file)) ifM ((==) link <$> liftIO (readSymbolicLink file))
( do ( do
s <- liftIO $ getDaemonStatus dstatus s <- daemonStatus
checkcontent key s checkcontent key s
ensurestaged link s ensurestaged link s
, do , do
liftIO $ brokendebug threadname ["fix symlink", file]
liftIO $ removeFile file liftIO $ removeFile file
liftIO $ createSymbolicLink link file liftIO $ createSymbolicLink link file
checkcontent key =<< liftIO (getDaemonStatus dstatus) checkcontent key =<< daemonStatus
addlink link addlink link
) )
go Nothing = do -- other symlink go Nothing = do -- other symlink
link <- liftIO (readSymbolicLink file) link <- liftIO (readSymbolicLink file)
ensurestaged link =<< liftIO (getDaemonStatus dstatus) ensurestaged link =<< daemonStatus
{- This is often called on symlinks that are already {- This is often called on symlinks that are already
- staged correctly. A symlink may have been deleted - staged correctly. A symlink may have been deleted
@ -156,41 +159,47 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
ensurestaged link daemonstatus ensurestaged link daemonstatus
| scanComplete daemonstatus = addlink link | scanComplete daemonstatus = addlink link
| otherwise = case filestatus of | otherwise = case filestatus of
Just s Just s | changedrecently s -> liftIO noChange
| not (afterLastDaemonRun (statusChangeTime s) daemonstatus) -> noChange
_ -> addlink link _ -> addlink link
where
changedrecently s = not $
afterLastDaemonRun (statusChangeTime s) daemonstatus
{- For speed, tries to reuse the existing blob for symlink target. -} {- For speed, tries to reuse the existing blob for symlink target. -}
addlink link = do addlink link = do
liftIO $ brokendebug threadname ["add symlink", file] debug ["add symlink", file]
v <- catObjectDetails $ Ref $ ':':file liftAnnex $ do
case v of v <- catObjectDetails $ Ref $ ':':file
Just (currlink, sha) case v of
| s2w8 link == L.unpack currlink -> Just (currlink, sha)
| s2w8 link == L.unpack currlink ->
stageSymlink file sha
_ -> do
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha stageSymlink file sha
_ -> do liftIO $ madeChange file LinkChange
sha <- inRepo $
Git.HashObject.hashObject BlobObject link
stageSymlink file sha
madeChange file LinkChange
{- When a new link appears, or a link is changed, after the startup {- When a new link appears, or a link is changed, after the startup
- scan, handle getting or dropping the key's content. -} - scan, handle getting or dropping the key's content. -}
checkcontent key daemonstatus checkcontent key daemonstatus
| scanComplete daemonstatus = do | scanComplete daemonstatus = do
present <- inAnnex key present <- liftAnnex $ inAnnex key
unless present $ dstatus <- getAssistant daemonStatusHandle
queueTransfers Next transferqueue dstatus unless present $ do
key (Just file) Download transferqueue <- getAssistant transferQueue
handleDrops dstatus present key (Just file) liftAnnex $ queueTransfers Next transferqueue
dstatus key (Just file) Download
liftAnnex $ handleDrops dstatus present key (Just file)
| otherwise = noop | otherwise = noop
onDel :: Handler onDel :: Handler
onDel threadname file _ _dstatus _ = do onDel file _ = do
liftIO $ brokendebug threadname ["file deleted", file] debug ["file deleted", file]
Annex.Queue.addUpdateIndex =<< liftAnnex $
inRepo (Git.UpdateIndex.unstageFile file) Annex.Queue.addUpdateIndex =<<
madeChange file RmChange inRepo (Git.UpdateIndex.unstageFile file)
liftIO $ madeChange file RmChange
{- A directory has been deleted, or moved, so tell git to remove anything {- A directory has been deleted, or moved, so tell git to remove anything
- that was inside it from its cache. Since it could reappear at any time, - that was inside it from its cache. Since it could reappear at any time,
@ -200,18 +209,19 @@ onDel threadname file _ _dstatus _ = do
- command to get the recursive list of files in the directory, so rm is - command to get the recursive list of files in the directory, so rm is
- just as good. -} - just as good. -}
onDelDir :: Handler onDelDir :: Handler
onDelDir threadname dir _ _dstatus _ = do onDelDir dir _ = do
liftIO $ brokendebug threadname ["directory deleted", dir] debug ["directory deleted", dir]
Annex.Queue.addCommand "rm" liftAnnex $ Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir] [Params "--quiet -r --cached --ignore-unmatch --"] [dir]
madeChange dir RmDirChange liftIO $ madeChange dir RmDirChange
{- Called when there's an error with inotify or kqueue. -} {- Called when there's an error with inotify or kqueue. -}
onErr :: Handler onErr :: Handler
onErr _ msg _ dstatus _ = do onErr msg _ = do
warning msg liftAnnex $ warning msg
dstatus <- getAssistant daemonStatusHandle
void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg void $ liftIO $ addAlert dstatus $ warningAlert "watcher" msg
return Nothing liftIO noChange
{- Adds a symlink to the index, without ever accessing the actual symlink {- Adds a symlink to the index, without ever accessing the actual symlink
- on disk. This avoids a race if git add is used, where the symlink is - on disk. This avoids a race if git add is used, where the symlink is