avoid using STM while the MVar is held
I thought this might be a lock conflict that explains the deadlock when built with -threaded, but it seems not.. it still locks! It even locks without the committer thread. Indeed, it locks when running "git annex add"! -threaded is exposing some other problem. Still, this seems conceptually cleaner and did not add any inneficiencies. Also added some high-level documentation about the threads used.
This commit is contained in:
parent
f7dbcd58ff
commit
7f3934520a
1 changed files with 51 additions and 25 deletions
|
@ -1,13 +1,36 @@
|
||||||
{- git-annex command
|
{-# LANGUAGE CPP #-}
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
|
{- git-annex watch daemon
|
||||||
|
-
|
||||||
|
- Overview of threads and MVars, etc:
|
||||||
|
-
|
||||||
|
- Thread 1: Parent
|
||||||
|
- The initial thread run, double forks to background, starts other
|
||||||
|
- threads, and then stops, waiting for them to terminate.
|
||||||
|
- Thread 2: inotify
|
||||||
|
- Notices new files, and calls handlers for events, queuing changes.
|
||||||
|
- Thread 3: inotify internal
|
||||||
|
- Used by haskell inotify library to ensure inotify event buffer is
|
||||||
|
- kept drained.
|
||||||
|
- Thread 4: committer
|
||||||
|
- Waits for changes to occur, and runs the git queue to update its
|
||||||
|
- index, then commits.
|
||||||
|
-
|
||||||
|
- State MVar:
|
||||||
|
- The Annex state is stored here, which allows recuscitating the
|
||||||
|
- Annex monad in IO actions run by the inotify and committer
|
||||||
|
- threads. Thus, a single state is shared amoung the threads, and
|
||||||
|
- only one at a time can access it.
|
||||||
|
- ChangeChan STM TChan:
|
||||||
|
- Changes are indicated by writing to this channel. The committer
|
||||||
|
- reads from it.
|
||||||
-
|
-
|
||||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
{-# LANGUAGE CPP #-}
|
|
||||||
{-# LANGUAGE BangPatterns #-}
|
|
||||||
|
|
||||||
module Command.Watch where
|
module Command.Watch where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
|
@ -83,7 +106,7 @@ startDaemon True st = do
|
||||||
watch :: MVar Annex.AnnexState -> IO ()
|
watch :: MVar Annex.AnnexState -> IO ()
|
||||||
#if defined linux_HOST_OS
|
#if defined linux_HOST_OS
|
||||||
watch st = withINotify $ \i -> do
|
watch st = withINotify $ \i -> do
|
||||||
changechan <- atomically newTChan
|
changechan <- runChangeChan newTChan
|
||||||
let hook a = Just $ runHandler st changechan a
|
let hook a = Just $ runHandler st changechan a
|
||||||
let hooks = WatchHooks
|
let hooks = WatchHooks
|
||||||
{ addHook = hook onAdd
|
{ addHook = hook onAdd
|
||||||
|
@ -131,26 +154,38 @@ withStateMVar a = do
|
||||||
return r
|
return r
|
||||||
|
|
||||||
{- Runs an Annex action, using the state from the MVar. -}
|
{- Runs an Annex action, using the state from the MVar. -}
|
||||||
runStateMVar :: MVar Annex.AnnexState -> Annex () -> IO ()
|
runStateMVar :: MVar Annex.AnnexState -> Annex a -> IO a
|
||||||
runStateMVar mvar a = do
|
runStateMVar mvar a = do
|
||||||
|
liftIO $ putStrLn "takeMVar"
|
||||||
startstate <- takeMVar mvar
|
startstate <- takeMVar mvar
|
||||||
!newstate <- Annex.exec startstate a
|
!(r, newstate) <- Annex.run startstate a
|
||||||
|
liftIO $ putStrLn "putMVar"
|
||||||
putMVar mvar newstate
|
putMVar mvar newstate
|
||||||
|
return r
|
||||||
|
|
||||||
{- Runs an action handler, inside the Annex monad.
|
runChangeChan :: STM a -> IO a
|
||||||
|
runChangeChan = atomically
|
||||||
|
|
||||||
|
{- Runs an action handler, inside the Annex monad, and if there was a
|
||||||
|
- change, adds it to the ChangeChan.
|
||||||
-
|
-
|
||||||
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
|
- Exceptions are ignored, otherwise a whole watcher thread could be crashed.
|
||||||
-}
|
-}
|
||||||
runHandler :: MVar Annex.AnnexState -> ChangeChan -> Handler -> FilePath -> IO ()
|
runHandler :: MVar Annex.AnnexState -> ChangeChan -> Handler -> FilePath -> IO ()
|
||||||
runHandler st changechan handler file =
|
runHandler st changechan handler file = void $ do
|
||||||
either (putStrLn . show) return =<< tryIO (runStateMVar st go)
|
r <- tryIO (runStateMVar st $ handler file)
|
||||||
where
|
case r of
|
||||||
go = maybe noop (signalChange changechan) =<< handler file
|
Left e -> putStrLn $ show e
|
||||||
|
Right Nothing -> noop
|
||||||
|
Right (Just change) -> void $
|
||||||
|
runChangeChan $ writeTChan changechan change
|
||||||
|
|
||||||
{- Handlers call this when they made a change that needs to get committed. -}
|
{- Handlers call this when they made a change that needs to get committed. -}
|
||||||
madeChange :: FilePath -> String -> Annex (Maybe Change)
|
madeChange :: FilePath -> String -> Annex (Maybe Change)
|
||||||
madeChange file desc = liftIO $
|
madeChange file desc = do
|
||||||
Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
|
-- Just in case the commit thread is not flushing the queue fast enough.
|
||||||
|
Annex.Queue.flushWhenFull
|
||||||
|
liftIO $ Just <$> (Change <$> getCurrentTime <*> pure file <*> pure desc)
|
||||||
|
|
||||||
noChange :: Annex (Maybe Change)
|
noChange :: Annex (Maybe Change)
|
||||||
noChange = return Nothing
|
noChange = return Nothing
|
||||||
|
@ -243,19 +278,10 @@ stageSymlink file sha =
|
||||||
Annex.Queue.addUpdateIndex =<<
|
Annex.Queue.addUpdateIndex =<<
|
||||||
inRepo (Git.UpdateIndex.stageSymlink file sha)
|
inRepo (Git.UpdateIndex.stageSymlink file sha)
|
||||||
|
|
||||||
{- Signals that a change has been made, that needs to get committed. -}
|
|
||||||
signalChange :: ChangeChan -> Change -> Annex ()
|
|
||||||
signalChange chan change = do
|
|
||||||
liftIO $ atomically $ writeTChan chan change
|
|
||||||
|
|
||||||
-- Just in case the commit thread is not flushing
|
|
||||||
-- the queue fast enough.
|
|
||||||
Annex.Queue.flushWhenFull
|
|
||||||
|
|
||||||
{- Gets all unhandled changes.
|
{- Gets all unhandled changes.
|
||||||
- Blocks until at least one change is made. -}
|
- Blocks until at least one change is made. -}
|
||||||
getChanges :: ChangeChan -> IO [Change]
|
getChanges :: ChangeChan -> IO [Change]
|
||||||
getChanges chan = atomically $ do
|
getChanges chan = runChangeChan $ do
|
||||||
c <- readTChan chan
|
c <- readTChan chan
|
||||||
go [c]
|
go [c]
|
||||||
where
|
where
|
||||||
|
@ -268,7 +294,7 @@ getChanges chan = atomically $ do
|
||||||
{- Puts unhandled changes back into the channel.
|
{- Puts unhandled changes back into the channel.
|
||||||
- Note: Original order is not preserved. -}
|
- Note: Original order is not preserved. -}
|
||||||
refillChanges :: ChangeChan -> [Change] -> IO ()
|
refillChanges :: ChangeChan -> [Change] -> IO ()
|
||||||
refillChanges chan cs = atomically $ mapM_ (writeTChan chan) cs
|
refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
|
||||||
|
|
||||||
{- This thread makes git commits at appropriate times. -}
|
{- This thread makes git commits at appropriate times. -}
|
||||||
commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO ()
|
commitThread :: MVar Annex.AnnexState -> ChangeChan -> IO ()
|
||||||
|
|
Loading…
Reference in a new issue