use async to track and manage threads

This commit is contained in:
Joey Hess 2013-01-26 14:14:32 +11:00
parent 7fc6ebb765
commit 1713ed95f7
11 changed files with 77 additions and 61 deletions

View file

@ -154,11 +154,6 @@ import Assistant.Threads.XMPPClient
import Assistant.Environment import Assistant.Environment
import qualified Utility.Daemon import qualified Utility.Daemon
import Utility.LogFile import Utility.LogFile
import Utility.ThreadScheduler
import Control.Concurrent
type NamedThread = IO () -> IO (String, IO ())
stopDaemon :: Annex () stopDaemon :: Annex ()
stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
@ -197,11 +192,11 @@ startDaemon assistant foreground startbrowser = do
=<< newAssistantData st dstatus =<< newAssistantData st dstatus
go webappwaiter = do go webappwaiter = do
d <- getAssistant id
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
d <- getAssistant id
urlrenderer <- liftIO newUrlRenderer urlrenderer <- liftIO newUrlRenderer
#endif #endif
mapM_ (startthread d) mapM_ startthread
[ watch $ commitThread [ watch $ commitThread
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
, assist $ webAppThread d urlrenderer False Nothing webappwaiter , assist $ webAppThread d urlrenderer False Nothing webappwaiter
@ -229,11 +224,10 @@ startDaemon assistant foreground startbrowser = do
, watch $ watchThread , watch $ watchThread
] ]
liftIO waitForTermination waitNamedThreads
watch a = (True, a) watch a = (True, a)
assist a = (False, a) assist a = (False, a)
startthread d (watcher, t) startthread (watcher, t)
| watcher || assistant = void $ liftIO $ forkIO $ | watcher || assistant = startNamedThread t
runAssistant d $ runNamedThread t
| otherwise = noop | otherwise = noop

View file

@ -10,4 +10,3 @@ module Assistant.Common (module X) where
import Common.Annex as X import Common.Annex as X
import Assistant.Monad as X import Assistant.Monad as X
import Assistant.Types.DaemonStatus as X import Assistant.Types.DaemonStatus as X
import Assistant.Types.NamedThread as X

View file

@ -19,10 +19,15 @@ module Assistant.Monad (
asIO, asIO,
asIO1, asIO1,
asIO2, asIO2,
NamedThread(..),
ThreadName,
debug,
notice
) where ) where
import "mtl" Control.Monad.Reader import "mtl" Control.Monad.Reader
import Control.Monad.Base (liftBase, MonadBase) import Control.Monad.Base (liftBase, MonadBase)
import System.Log.Logger
import Common.Annex import Common.Annex
import Assistant.Types.ThreadedMonad import Assistant.Types.ThreadedMonad
@ -37,6 +42,10 @@ import Assistant.Types.Changes
import Assistant.Types.Buddies import Assistant.Types.Buddies
import Assistant.Types.NetMessager import Assistant.Types.NetMessager
{- Information about a named thread that can be run. -}
data NamedThread = NamedThread ThreadName (Assistant ())
type ThreadName = String
newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a } newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a }
deriving ( deriving (
Monad, Monad,
@ -118,3 +127,14 @@ asIO2 a = do
{- Runs an IO action on a selected field of the AssistantData. -} {- Runs an IO action on a selected field of the AssistantData. -}
(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b (<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b
io <<~ v = reader v >>= liftIO . io io <<~ v = reader v >>= liftIO . io
debug :: [String] -> Assistant ()
debug = logaction debugM
notice :: [String] -> Assistant ()
notice = logaction noticeM
logaction :: (String -> String -> IO ()) -> [String] -> Assistant ()
logaction a ws = do
name <- getAssistant threadName
liftIO $ a name $ unwords $ (name ++ ":") : ws

View file

@ -7,19 +7,39 @@
module Assistant.NamedThread where module Assistant.NamedThread where
import Assistant.Common import Common.Annex
import Assistant.Types.DaemonStatus
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.Alert import Assistant.Alert
import Assistant.Monad
import qualified Control.Exception as E import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Map as M
runNamedThread :: NamedThread -> Assistant () {- Starts a named thread, if it's not already running.
runNamedThread (NamedThread name a) = do -
d <- getAssistant id - Named threads are run by a management thread, so if they crash
liftIO . go $ d { threadName = name } - an alert is displayed, allowing the thread to be restarted. -}
startNamedThread :: NamedThread -> Assistant ()
startNamedThread namedthread@(NamedThread name a) = do
m <- startedThreads <$> getDaemonStatus
case M.lookup name m of
Nothing -> start
Just aid ->
maybe noop (const start) =<< liftIO (poll aid)
where where
go d = do start = do
r <- E.try (runAssistant d a) :: IO (Either E.SomeException ()) d <- getAssistant id
aid <- liftIO $ runmanaged $ d { threadName = name }
modifyDaemonStatus_ $ \s -> s
{ startedThreads = M.insertWith' const name aid (startedThreads s) }
runmanaged d = do
aid <- async $ runAssistant d a
void $ forkIO $ manager d aid
return aid
manager d aid = do
r <- waitCatch aid
case r of case r of
Right _ -> noop Right _ -> noop
Left e -> do Left e -> do
@ -28,3 +48,10 @@ runNamedThread (NamedThread name a) = do
-- TODO click to restart -- TODO click to restart
runAssistant d $ void $ runAssistant d $ void $
addAlert $ warningAlert name msg addAlert $ warningAlert name msg
{- Waits for all named threads that have been started to finish. -}
waitNamedThreads :: Assistant ()
waitNamedThreads = do
m <- startedThreads <$> getDaemonStatus
liftIO $ mapM_ wait $ M.elems m

View file

@ -23,9 +23,6 @@ import qualified Annex.Branch
import qualified Data.Set as S import qualified Data.Set as S
thisThread :: ThreadName
thisThread = "ConfigMonitor"
{- This thread detects when configuration changes have been made to the {- This thread detects when configuration changes have been made to the
- git-annex branch and reloads cached configuration. - git-annex branch and reloads cached configuration.
- -

View file

@ -16,12 +16,15 @@ import Utility.NotificationBroadcaster
import Logs.Transfer import Logs.Transfer
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.Async
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import qualified Data.Map as M import qualified Data.Map as M
data DaemonStatus = DaemonStatus data DaemonStatus = DaemonStatus
-- All the named threads that comprise the daemon.
{ startedThreads :: M.Map String (Async ())
-- False when the daemon is performing its startup scan -- False when the daemon is performing its startup scan
{ scanComplete :: Bool , scanComplete :: Bool
-- Time when a previous process of the daemon was running ok -- Time when a previous process of the daemon was running ok
, lastRunning :: Maybe POSIXTime , lastRunning :: Maybe POSIXTime
-- True when the sanity checker is running -- True when the sanity checker is running
@ -58,7 +61,8 @@ type DaemonStatusHandle = TMVar DaemonStatus
newDaemonStatus :: IO DaemonStatus newDaemonStatus :: IO DaemonStatus
newDaemonStatus = DaemonStatus newDaemonStatus = DaemonStatus
<$> pure False <$> pure M.empty
<*> pure False
<*> pure Nothing <*> pure Nothing
<*> pure False <*> pure False
<*> pure Nothing <*> pure Nothing

View file

@ -1,32 +0,0 @@
{- git-annex assistant named threads.
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Types.NamedThread (
ThreadName,
NamedThread(..),
debug,
notice,
) where
import Common.Annex
import Assistant.Monad
import System.Log.Logger
type ThreadName = String
data NamedThread = NamedThread ThreadName (Assistant ())
debug :: [String] -> Assistant ()
debug = logaction debugM
notice :: [String] -> Assistant ()
notice = logaction noticeM
logaction :: (String -> String -> IO ()) -> [String] -> Assistant ()
logaction a ws = do
name <- getAssistant threadName
liftIO $ a name $ unwords $ (name ++ ":") : ws

View file

@ -109,10 +109,12 @@ firstRun = do
urlrenderer <- newUrlRenderer urlrenderer <- newUrlRenderer
v <- newEmptyMVar v <- newEmptyMVar
let callback a = Just $ a v let callback a = Just $ a v
void $ runAssistant d $ runNamedThread $ runAssistant d $ do
webAppThread d urlrenderer True startNamedThread $
(callback signaler) webAppThread d urlrenderer True
(callback mainthread) (callback signaler)
(callback mainthread)
waitNamedThreads
where where
signaler v = do signaler v = do
putMVar v "" putMVar v ""

1
debian/control vendored
View file

@ -44,6 +44,7 @@ Build-Depends:
libghc-network-protocol-xmpp-dev (>= 0.4.3-1+b1), libghc-network-protocol-xmpp-dev (>= 0.4.3-1+b1),
libghc-gnutls-dev (>= 0.1.4), libghc-gnutls-dev (>= 0.1.4),
libghc-xml-types-dev, libghc-xml-types-dev,
libghc-async-dev,
ikiwiki, ikiwiki,
perlmagick, perlmagick,
git, git,

View file

@ -46,6 +46,7 @@ quite a lot.
* [network-protocol-xmpp](http://hackage.haskell.org/package/network-protocol-xmpp) * [network-protocol-xmpp](http://hackage.haskell.org/package/network-protocol-xmpp)
* [dns](http://hackage.haskell.org/package/dns) * [dns](http://hackage.haskell.org/package/dns)
* [xml-types](http://hackage.haskell.org/package/xml-types) * [xml-types](http://hackage.haskell.org/package/xml-types)
* [async](http://hackage.haskell.org/package/async)
* Shell commands * Shell commands
* [git](http://git-scm.com/) * [git](http://git-scm.com/)
* [uuid](http://www.ossp.org/pkg/lib/uuid/) * [uuid](http://www.ossp.org/pkg/lib/uuid/)

View file

@ -75,6 +75,9 @@ Executable git-annex
if flag(WebDAV) if flag(WebDAV)
Build-Depends: DAV (>= 0.3), http-conduit, xml-conduit, http-types Build-Depends: DAV (>= 0.3), http-conduit, xml-conduit, http-types
CPP-Options: -DWITH_WEBDAV CPP-Options: -DWITH_WEBDAV
if flag(Assistant)
Build-Depends: async
if flag(Assistant) && ! os(windows) && ! os(solaris) if flag(Assistant) && ! os(windows) && ! os(solaris)
Build-Depends: stm >= 2.3 Build-Depends: stm >= 2.3