display errors when any named thread crashes
This commit is contained in:
parent
d11ded822c
commit
a00f1d26bc
18 changed files with 133 additions and 64 deletions
11
Assistant.hs
11
Assistant.hs
|
@ -110,6 +110,7 @@ import Assistant.Pushes
|
|||
import Assistant.ScanRemotes
|
||||
import Assistant.TransferQueue
|
||||
import Assistant.TransferSlots
|
||||
import Assistant.Threads.DaemonStatus
|
||||
import Assistant.Threads.Watcher
|
||||
import Assistant.Threads.Committer
|
||||
import Assistant.Threads.Pusher
|
||||
|
@ -132,6 +133,8 @@ import Utility.ThreadScheduler
|
|||
|
||||
import Control.Concurrent
|
||||
|
||||
type NamedThread = IO () -> IO (String, IO ())
|
||||
|
||||
stopDaemon :: Annex ()
|
||||
stopDaemon = liftIO . Utility.Daemon.stopDaemon =<< fromRepo gitAnnexPidFile
|
||||
|
||||
|
@ -162,7 +165,7 @@ startAssistant assistant daemonize webappwaiter = do
|
|||
transferqueue <- newTransferQueue
|
||||
transferslots <- newTransferSlots
|
||||
scanremotes <- newScanRemoteMap
|
||||
mapM_ startthread
|
||||
mapM_ (startthread dstatus)
|
||||
[ watch $ commitThread st changechan commitchan transferqueue dstatus
|
||||
#ifdef WITH_WEBAPP
|
||||
, assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots Nothing webappwaiter
|
||||
|
@ -177,12 +180,14 @@ startAssistant assistant daemonize webappwaiter = do
|
|||
, assist $ sanityCheckerThread st dstatus transferqueue changechan
|
||||
, assist $ mountWatcherThread st dstatus scanremotes
|
||||
, assist $ netWatcherThread st dstatus scanremotes
|
||||
, assist $ netWatcherFallbackThread st dstatus scanremotes
|
||||
, assist $ transferScannerThread st dstatus scanremotes transferqueue
|
||||
, watch $ watchThread st dstatus transferqueue changechan
|
||||
]
|
||||
waitForTermination
|
||||
watch a = (True, a)
|
||||
assist a = (False, a)
|
||||
startthread (watcher, a)
|
||||
| watcher || assistant = void $ forkIO a
|
||||
startthread dstatus (watcher, t)
|
||||
| watcher || assistant = void $ forkIO $
|
||||
runNamedThread dstatus t
|
||||
| otherwise = noop
|
||||
|
|
|
@ -8,14 +8,38 @@
|
|||
module Assistant.Common (
|
||||
module X,
|
||||
ThreadName,
|
||||
NamedThread(..),
|
||||
runNamedThread,
|
||||
debug
|
||||
) where
|
||||
|
||||
import Common.Annex as X
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.Alert
|
||||
|
||||
import System.Log.Logger
|
||||
import qualified Control.Exception as E
|
||||
|
||||
type ThreadName = String
|
||||
data NamedThread = NamedThread ThreadName (IO ())
|
||||
|
||||
debug :: ThreadName -> [String] -> IO ()
|
||||
debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws
|
||||
|
||||
runNamedThread :: DaemonStatusHandle -> NamedThread -> IO ()
|
||||
runNamedThread dstatus (NamedThread name a) = go
|
||||
where
|
||||
go = do
|
||||
r <- E.try a :: IO (Either E.SomeException ())
|
||||
case r of
|
||||
Right _ -> noop
|
||||
Left e -> do
|
||||
let msg = unwords
|
||||
[ name
|
||||
, "crashed:"
|
||||
, show e
|
||||
]
|
||||
hPutStrLn stderr msg
|
||||
-- TODO click to restart
|
||||
void $ addAlert dstatus $
|
||||
warningAlert name msg
|
||||
|
|
|
@ -8,9 +8,7 @@
|
|||
module Assistant.DaemonStatus where
|
||||
|
||||
import Common.Annex
|
||||
import Assistant.ThreadedMonad
|
||||
import Assistant.Alert
|
||||
import Utility.ThreadScheduler
|
||||
import Utility.TempFile
|
||||
import Utility.NotificationBroadcaster
|
||||
import Logs.Transfer
|
||||
|
@ -114,23 +112,6 @@ startDaemonStatus = do
|
|||
, knownRemotes = remotes
|
||||
}
|
||||
|
||||
{- This writes the daemon status to disk, when it changes, but no more
|
||||
- frequently than once every ten minutes.
|
||||
-}
|
||||
daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO ()
|
||||
daemonStatusThread st dstatus = do
|
||||
notifier <- newNotificationHandle
|
||||
=<< changeNotifier <$> getDaemonStatus dstatus
|
||||
checkpoint
|
||||
runEvery (Seconds tenMinutes) $ do
|
||||
waitNotification notifier
|
||||
checkpoint
|
||||
where
|
||||
checkpoint = do
|
||||
status <- getDaemonStatus dstatus
|
||||
file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile
|
||||
writeDaemonStatusFile file status
|
||||
|
||||
{- Don't just dump out the structure, because it will change over time,
|
||||
- and parts of it are not relevant. -}
|
||||
writeDaemonStatusFile :: FilePath -> DaemonStatus -> IO ()
|
||||
|
|
|
@ -42,7 +42,7 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $
|
|||
addScanRemotes scanremotes diverged rs
|
||||
return ok
|
||||
where
|
||||
(gitremotes, specialremotes) =
|
||||
(gitremotes, _specialremotes) =
|
||||
partition (Git.repoIsUrl . Remote.repo) rs
|
||||
sync (Just branch) = do
|
||||
diverged <- manualPull st (Just branch) gitremotes
|
||||
|
|
|
@ -36,8 +36,8 @@ thisThread :: ThreadName
|
|||
thisThread = "Committer"
|
||||
|
||||
{- This thread makes git commits at appropriate times. -}
|
||||
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> IO ()
|
||||
commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds 1) $ do
|
||||
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread
|
||||
commitThread st changechan commitchan transferqueue dstatus = thread $ runEvery (Seconds 1) $ do
|
||||
-- We already waited one second as a simple rate limiter.
|
||||
-- Next, wait until at least one change is available for
|
||||
-- processing.
|
||||
|
@ -61,6 +61,7 @@ commitThread st changechan commitchan transferqueue dstatus = runEvery (Seconds
|
|||
else refill readychanges
|
||||
else refill changes
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
refill [] = noop
|
||||
refill cs = do
|
||||
debug thisThread
|
||||
|
|
36
Assistant/Threads/DaemonStatus.hs
Normal file
36
Assistant/Threads/DaemonStatus.hs
Normal file
|
@ -0,0 +1,36 @@
|
|||
{- git-annex assistant daemon status thread
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.Threads.DaemonStatus where
|
||||
|
||||
import Assistant.Common
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.ThreadedMonad
|
||||
import Utility.ThreadScheduler
|
||||
import Utility.NotificationBroadcaster
|
||||
|
||||
thisThread :: ThreadName
|
||||
thisThread = "DaemonStatus"
|
||||
|
||||
{- This writes the daemon status to disk, when it changes, but no more
|
||||
- frequently than once every ten minutes.
|
||||
-}
|
||||
daemonStatusThread :: ThreadState -> DaemonStatusHandle -> NamedThread
|
||||
daemonStatusThread st dstatus = thread $ do
|
||||
notifier <- newNotificationHandle
|
||||
=<< changeNotifier <$> getDaemonStatus dstatus
|
||||
checkpoint
|
||||
runEvery (Seconds tenMinutes) $ do
|
||||
waitNotification notifier
|
||||
checkpoint
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
checkpoint = do
|
||||
status <- getDaemonStatus dstatus
|
||||
file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile
|
||||
writeDaemonStatusFile file status
|
||||
|
|
@ -22,8 +22,8 @@ thisThread = "Merger"
|
|||
{- This thread watches for changes to .git/refs/heads/synced/,
|
||||
- which indicate incoming pushes. It merges those pushes into the
|
||||
- currently checked out branch. -}
|
||||
mergeThread :: ThreadState -> IO ()
|
||||
mergeThread st = do
|
||||
mergeThread :: ThreadState -> NamedThread
|
||||
mergeThread st = thread $ do
|
||||
g <- runThreadState st $ fromRepo id
|
||||
let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
|
||||
createDirectoryIfMissing True dir
|
||||
|
@ -34,6 +34,8 @@ mergeThread st = do
|
|||
}
|
||||
void $ watchDir dir (const False) hooks id
|
||||
debug thisThread ["watching", dir]
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
|
||||
type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
|
||||
|
||||
|
|
|
@ -38,13 +38,15 @@ import Data.Word (Word32)
|
|||
thisThread :: ThreadName
|
||||
thisThread = "MountWatcher"
|
||||
|
||||
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
|
||||
mountWatcherThread st handle scanremotes =
|
||||
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
|
||||
mountWatcherThread st handle scanremotes = thread $
|
||||
#if WITH_DBUS
|
||||
dbusThread st handle scanremotes
|
||||
#else
|
||||
pollingThread st handle scanremotes
|
||||
#endif
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
|
||||
#if WITH_DBUS
|
||||
|
||||
|
|
|
@ -15,13 +15,11 @@ import Assistant.ThreadedMonad
|
|||
import Assistant.DaemonStatus
|
||||
import Assistant.ScanRemotes
|
||||
import Assistant.Sync
|
||||
import qualified Git
|
||||
import Utility.ThreadScheduler
|
||||
import Remote.List
|
||||
import qualified Types.Remote as Remote
|
||||
|
||||
import qualified Control.Exception as E
|
||||
import Control.Concurrent
|
||||
|
||||
#if WITH_DBUS
|
||||
import Utility.DBus
|
||||
|
@ -35,18 +33,27 @@ import Data.Word (Word32)
|
|||
thisThread :: ThreadName
|
||||
thisThread = "NetWatcher"
|
||||
|
||||
netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
|
||||
netWatcherThread st dstatus scanremotes = do
|
||||
netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
|
||||
netWatcherThread st dstatus scanremotes = thread $ do
|
||||
#if WITH_DBUS
|
||||
void $ forkIO $ dbusThread st dstatus scanremotes
|
||||
dbusThread st dstatus scanremotes
|
||||
#else
|
||||
noop
|
||||
#endif
|
||||
{- This is a fallback for when dbus cannot be used to detect
|
||||
- network connection changes, but it also ensures that
|
||||
- any networked remotes that may have not been routable for a
|
||||
- while (despite the local network staying up), are synced with
|
||||
- periodically. -}
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
|
||||
{- This is a fallback for when dbus cannot be used to detect
|
||||
- network connection changes, but it also ensures that
|
||||
- any networked remotes that may have not been routable for a
|
||||
- while (despite the local network staying up), are synced with
|
||||
- periodically. -}
|
||||
netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread
|
||||
netWatcherFallbackThread st dstatus scanremotes = thread $ do
|
||||
runEvery (Seconds 3600) $
|
||||
handleConnection st dstatus scanremotes
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
|
||||
#if WITH_DBUS
|
||||
|
||||
|
|
|
@ -24,8 +24,8 @@ thisThread :: ThreadName
|
|||
thisThread = "Pusher"
|
||||
|
||||
{- This thread retries pushes that failed before. -}
|
||||
pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> IO ()
|
||||
pushRetryThread st dstatus pushmap = runEvery (Seconds halfhour) $ do
|
||||
pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread
|
||||
pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do
|
||||
-- We already waited half an hour, now wait until there are failed
|
||||
-- pushes to retry.
|
||||
topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
|
||||
|
@ -40,10 +40,11 @@ pushRetryThread st dstatus pushmap = runEvery (Seconds halfhour) $ do
|
|||
pushToRemotes thisThread now st (Just pushmap) topush
|
||||
where
|
||||
halfhour = 1800
|
||||
thread = NamedThread thisThread
|
||||
|
||||
{- This thread pushes git commits out to remotes soon after they are made. -}
|
||||
pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> IO ()
|
||||
pushThread st dstatus commitchan pushmap = do
|
||||
pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread
|
||||
pushThread st dstatus commitchan pushmap = thread $ do
|
||||
runEvery (Seconds 2) $ do
|
||||
-- We already waited two seconds as a simple rate limiter.
|
||||
-- Next, wait until at least one commit has been made
|
||||
|
@ -64,11 +65,12 @@ pushThread st dstatus commitchan pushmap = do
|
|||
, "commits"
|
||||
]
|
||||
refillCommits commitchan commits
|
||||
where
|
||||
pushable r
|
||||
| Remote.specialRemote r = False
|
||||
| Remote.readonly r = False
|
||||
| otherwise = True
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
pushable r
|
||||
| Remote.specialRemote r = False
|
||||
| Remote.readonly r = False
|
||||
| otherwise = True
|
||||
|
||||
{- Decide if now is a good time to push to remotes.
|
||||
-
|
||||
|
|
|
@ -25,8 +25,8 @@ thisThread :: ThreadName
|
|||
thisThread = "SanityChecker"
|
||||
|
||||
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
|
||||
sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
|
||||
sanityCheckerThread st dstatus transferqueue changechan = forever $ do
|
||||
sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
|
||||
sanityCheckerThread st dstatus transferqueue changechan = thread $ forever $ do
|
||||
waitForNextCheck dstatus
|
||||
|
||||
debug thisThread ["starting sanity check"]
|
||||
|
@ -35,6 +35,7 @@ sanityCheckerThread st dstatus transferqueue changechan = forever $ do
|
|||
|
||||
debug thisThread ["sanity check complete"]
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
go = do
|
||||
modifyDaemonStatus_ dstatus $ \s -> s
|
||||
{ sanityCheckRunning = True }
|
||||
|
|
|
@ -21,8 +21,8 @@ thisThread = "TransferPoller"
|
|||
|
||||
{- This thread polls the status of ongoing transfers, determining how much
|
||||
- of each transfer is complete. -}
|
||||
transferPollerThread :: ThreadState -> DaemonStatusHandle -> IO ()
|
||||
transferPollerThread st dstatus = do
|
||||
transferPollerThread :: ThreadState -> DaemonStatusHandle -> NamedThread
|
||||
transferPollerThread st dstatus = thread $ do
|
||||
g <- runThreadState st $ fromRepo id
|
||||
tn <- newNotificationHandle =<<
|
||||
transferNotifier <$> getDaemonStatus dstatus
|
||||
|
@ -33,6 +33,7 @@ transferPollerThread st dstatus = do
|
|||
then waitNotification tn -- block until transfers running
|
||||
else mapM_ (poll g) $ M.toList ts
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
poll g (t, info)
|
||||
{- Downloads are polled by checking the size of the
|
||||
- temp file being used for the transfer. -}
|
||||
|
|
|
@ -31,11 +31,12 @@ thisThread = "TransferScanner"
|
|||
{- This thread waits until a remote needs to be scanned, to find transfers
|
||||
- that need to be made, to keep data in sync.
|
||||
-}
|
||||
transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
|
||||
transferScannerThread st dstatus scanremotes transferqueue = do
|
||||
transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread
|
||||
transferScannerThread st dstatus scanremotes transferqueue = thread $ do
|
||||
startupScan
|
||||
go S.empty
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
go scanned = do
|
||||
threadDelaySeconds (Seconds 2)
|
||||
(rs, infos) <- unzip <$> getScanRemote scanremotes
|
||||
|
|
|
@ -20,8 +20,8 @@ thisThread = "TransferWatcher"
|
|||
|
||||
{- This thread watches for changes to the gitAnnexTransferDir,
|
||||
- and updates the DaemonStatus's map of ongoing transfers. -}
|
||||
transferWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
|
||||
transferWatcherThread st dstatus = do
|
||||
transferWatcherThread :: ThreadState -> DaemonStatusHandle -> NamedThread
|
||||
transferWatcherThread st dstatus = thread $ do
|
||||
g <- runThreadState st $ fromRepo id
|
||||
let dir = gitAnnexTransferDir g
|
||||
createDirectoryIfMissing True dir
|
||||
|
@ -33,6 +33,8 @@ transferWatcherThread st dstatus = do
|
|||
}
|
||||
void $ watchDir dir (const False) hooks id
|
||||
debug thisThread ["watching for transfers"]
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
|
||||
type Handler = ThreadState -> DaemonStatusHandle -> FilePath -> Maybe FileStatus -> IO ()
|
||||
|
||||
|
|
|
@ -30,9 +30,10 @@ maxTransfers :: Int
|
|||
maxTransfers = 1
|
||||
|
||||
{- Dispatches transfers from the queue. -}
|
||||
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO ()
|
||||
transfererThread st dstatus transferqueue slots = go =<< readProgramFile
|
||||
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> NamedThread
|
||||
transfererThread st dstatus transferqueue slots = thread $ go =<< readProgramFile
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
go program = forever $ inTransferSlot dstatus slots $
|
||||
maybe (return Nothing) (uncurry $ startTransfer st dstatus program)
|
||||
=<< getNextTransfer transferqueue dstatus notrunning
|
||||
|
|
|
@ -56,8 +56,8 @@ needLsof = error $ unlines
|
|||
, "Be warned: This can corrupt data in the annex, and make fsck complain."
|
||||
]
|
||||
|
||||
watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO ()
|
||||
watchThread st dstatus transferqueue changechan = do
|
||||
watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
|
||||
watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do
|
||||
void $ watchDir "." ignored hooks startup
|
||||
debug thisThread [ "watching", "."]
|
||||
where
|
||||
|
|
|
@ -50,8 +50,8 @@ webAppThread
|
|||
-> TransferSlots
|
||||
-> Maybe (IO String)
|
||||
-> Maybe (Url -> FilePath -> IO ())
|
||||
-> IO ()
|
||||
webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun onstartup = do
|
||||
-> NamedThread
|
||||
webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun onstartup = thread $ do
|
||||
webapp <- WebApp
|
||||
<$> pure mst
|
||||
<*> pure dstatus
|
||||
|
@ -72,6 +72,7 @@ webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun on
|
|||
Nothing -> withTempFile "webapp.html" $ \tmpfile _ -> go port webapp tmpfile
|
||||
Just st -> go port webapp =<< runThreadState st (fromRepo gitAnnexHtmlShim)
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
getreldir Nothing = return Nothing
|
||||
getreldir (Just st) = Just <$>
|
||||
(relHome =<< absPath
|
||||
|
|
|
@ -10,6 +10,7 @@ module Command.WebApp where
|
|||
import Common.Annex
|
||||
import Command
|
||||
import Assistant
|
||||
import Assistant.Common
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.ScanRemotes
|
||||
import Assistant.TransferQueue
|
||||
|
@ -93,8 +94,9 @@ firstRun = do
|
|||
transferslots <- newTransferSlots
|
||||
v <- newEmptyMVar
|
||||
let callback a = Just $ a v
|
||||
webAppThread Nothing dstatus scanremotes transferqueue transferslots
|
||||
(callback signaler) (callback mainthread)
|
||||
void $ runNamedThread dstatus $
|
||||
webAppThread Nothing dstatus scanremotes transferqueue transferslots
|
||||
(callback signaler) (callback mainthread)
|
||||
where
|
||||
signaler v = do
|
||||
putMVar v ""
|
||||
|
|
Loading…
Reference in a new issue