added push notifier thread, currently a no-op

Hooked up everything that needs to notify on pushes. Note that
syncNewRemote does not notify. This is probably ok, and I'd need to thread
more state through to make it do so.

This is only set up to support a single push notification method; I didn't
use a NotificationBroadcaster. Partly because I don't yet know what info
about pushes needs to be communicated, so my data types are only
preliminary.
This commit is contained in:
Joey Hess 2012-10-24 13:35:43 -04:00
parent 6b6ce30b42
commit ae8a3ab31e
7 changed files with 100 additions and 50 deletions

View file

@ -69,7 +69,9 @@
- Thread 18: ConfigMonitor - Thread 18: ConfigMonitor
- Triggered by changes to the git-annex branch, checks for changed - Triggered by changes to the git-annex branch, checks for changed
- config files, and reloads configs. - config files, and reloads configs.
- Thread 19: WebApp - Thread 19: PushNotifier
- Notifies other repositories of pushes, using out of band signaling.
- Thread 20: WebApp
- Spawns more threads as necessary to handle clients. - Spawns more threads as necessary to handle clients.
- Displays the DaemonStatus. - Displays the DaemonStatus.
- -
@ -100,6 +102,11 @@
- ScanRemotes (STM TMVar) - ScanRemotes (STM TMVar)
- Remotes that have been disconnected, and should be scanned - Remotes that have been disconnected, and should be scanned
- are indicated by writing to this TMVar. - are indicated by writing to this TMVar.
- BranchChanged (STM SampleVar)
- Changes to the git-annex branch are indicated by updating this
- SampleVar.
- PushNotifier (STM SampleVar)
- After successful pushes, this SampleVar is updated.
- UrlRenderer (MVar) - UrlRenderer (MVar)
- A Yesod route rendering function is stored here. This allows - A Yesod route rendering function is stored here. This allows
- things that need to render Yesod routes to block until the webapp - things that need to render Yesod routes to block until the webapp
@ -133,6 +140,7 @@ import Assistant.Threads.NetWatcher
import Assistant.Threads.TransferScanner import Assistant.Threads.TransferScanner
import Assistant.Threads.TransferPoller import Assistant.Threads.TransferPoller
import Assistant.Threads.ConfigMonitor import Assistant.Threads.ConfigMonitor
import Assistant.Threads.PushNotifier
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
import Assistant.WebApp import Assistant.WebApp
import Assistant.Threads.WebApp import Assistant.Threads.WebApp
@ -180,6 +188,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
transferslots <- newTransferSlots transferslots <- newTransferSlots
scanremotes <- newScanRemoteMap scanremotes <- newScanRemoteMap
branchhandle <- newBranchChangeHandle branchhandle <- newBranchChangeHandle
pushnotifier <- newPushNotifier
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
urlrenderer <- newUrlRenderer urlrenderer <- newUrlRenderer
#endif #endif
@ -191,19 +200,20 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
, assist $ pairListenerThread st dstatus scanremotes urlrenderer , assist $ pairListenerThread st dstatus scanremotes urlrenderer
#endif #endif
#endif #endif
, assist $ pushThread st dstatus commitchan pushmap , assist $ pushThread st dstatus commitchan pushmap pushnotifier
, assist $ pushRetryThread st dstatus pushmap , assist $ pushRetryThread st dstatus pushmap pushnotifier
, assist $ mergeThread st dstatus transferqueue branchhandle , assist $ mergeThread st dstatus transferqueue branchhandle
, assist $ transferWatcherThread st dstatus transferqueue , assist $ transferWatcherThread st dstatus transferqueue
, assist $ transferPollerThread st dstatus , assist $ transferPollerThread st dstatus
, assist $ transfererThread st dstatus transferqueue transferslots , assist $ transfererThread st dstatus transferqueue transferslots
, assist $ daemonStatusThread st dstatus , assist $ daemonStatusThread st dstatus
, assist $ sanityCheckerThread st dstatus transferqueue changechan , assist $ sanityCheckerThread st dstatus transferqueue changechan
, assist $ mountWatcherThread st dstatus scanremotes , assist $ mountWatcherThread st dstatus scanremotes pushnotifier
, assist $ netWatcherThread st dstatus scanremotes , assist $ netWatcherThread st dstatus scanremotes pushnotifier
, assist $ netWatcherFallbackThread st dstatus scanremotes , assist $ netWatcherFallbackThread st dstatus scanremotes pushnotifier
, assist $ transferScannerThread st dstatus scanremotes transferqueue , assist $ transferScannerThread st dstatus scanremotes transferqueue
, assist $ configMonitorThread st dstatus branchhandle commitchan , assist $ configMonitorThread st dstatus branchhandle commitchan
, assist $ pushNotifierThread pushnotifier
, watch $ watchThread st dstatus transferqueue changechan , watch $ watchThread st dstatus transferqueue changechan
] ]
waitForTermination waitForTermination

View file

@ -10,6 +10,7 @@ module Assistant.Pushes where
import Common.Annex import Common.Annex
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.Time.Clock import Data.Time.Clock
import qualified Data.Map as M import qualified Data.Map as M
@ -17,6 +18,9 @@ import qualified Data.Map as M
type PushMap = M.Map Remote UTCTime type PushMap = M.Map Remote UTCTime
type FailedPushMap = TMVar PushMap type FailedPushMap = TMVar PushMap
{- Used to notify about successful pushes. -}
newtype PushNotifier = PushNotifier (MSampleVar ())
{- The TMVar starts empty, and is left empty when there are no {- The TMVar starts empty, and is left empty when there are no
- failed pushes. This way we can block until there are some failed pushes. - failed pushes. This way we can block until there are some failed pushes.
-} -}
@ -44,3 +48,12 @@ changeFailedPushMap v a = atomically $
store m store m
| m == M.empty = noop | m == M.empty = noop
| otherwise = putTMVar v $! m | otherwise = putTMVar v $! m
newPushNotifier :: IO PushNotifier
newPushNotifier = PushNotifier <$> newEmptySV
notifyPush :: PushNotifier -> IO ()
notifyPush (PushNotifier sv) = writeSV sv ()
waitPush :: PushNotifier -> IO ()
waitPush (PushNotifier sv) = readSV sv

View file

@ -36,9 +36,9 @@ import Control.Concurrent
- the remotes have diverged from the local git-annex branch. Otherwise, - the remotes have diverged from the local git-annex branch. Otherwise,
- it's sufficient to requeue failed transfers. - it's sufficient to requeue failed transfers.
-} -}
reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Maybe PushNotifier -> [Remote] -> IO ()
reconnectRemotes _ _ _ _ [] = noop reconnectRemotes _ _ _ _ _ [] = noop
reconnectRemotes threadname st dstatus scanremotes rs = void $ reconnectRemotes threadname st dstatus scanremotes pushnotifier rs = void $
alertWhile dstatus (syncAlert rs) $ do alertWhile dstatus (syncAlert rs) $ do
(ok, diverged) <- sync (ok, diverged) <- sync
=<< runThreadState st (inRepo Git.Branch.current) =<< runThreadState st (inRepo Git.Branch.current)
@ -50,7 +50,7 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $
sync (Just branch) = do sync (Just branch) = do
diverged <- manualPull st (Just branch) gitremotes diverged <- manualPull st (Just branch) gitremotes
now <- getCurrentTime now <- getCurrentTime
ok <- pushToRemotes threadname now st Nothing gitremotes ok <- pushToRemotes threadname now st pushnotifier Nothing gitremotes
return (ok, diverged) return (ok, diverged)
{- No local branch exists yet, but we can try pulling. -} {- No local branch exists yet, but we can try pulling. -}
sync Nothing = do sync Nothing = do
@ -81,8 +81,8 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $
- them. While ugly, those branches are reserved for pushing by us, and - them. While ugly, those branches are reserved for pushing by us, and
- so our pushes will succeed. - so our pushes will succeed.
-} -}
pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe FailedPushMap -> [Remote] -> IO Bool pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> Maybe PushNotifier -> Maybe FailedPushMap -> [Remote] -> IO Bool
pushToRemotes threadname now st mpushmap remotes = do pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
(g, branch, u) <- runThreadState st $ (,,) (g, branch, u) <- runThreadState st $ (,,)
<$> gitRepo <$> gitRepo
<*> inRepo Git.Branch.current <*> inRepo Git.Branch.current
@ -100,7 +100,9 @@ pushToRemotes threadname now st mpushmap remotes = do
updatemap succeeded [] updatemap succeeded []
let ok = null failed let ok = null failed
if ok if ok
then return ok then do
maybe noop notifyPush mpushnotifier
return ok
else if shouldretry else if shouldretry
then retry branch g u failed then retry branch g u failed
else fallback branch g u failed else fallback branch g u failed
@ -124,6 +126,8 @@ pushToRemotes threadname now st mpushmap remotes = do
] ]
(succeeded, failed) <- inParallel (pushfallback g u branch) rs (succeeded, failed) <- inParallel (pushfallback g u branch) rs
updatemap succeeded failed updatemap succeeded failed
unless (null succeeded) $
maybe noop notifyPush mpushnotifier
return $ null failed return $ null failed
push g branch remote = Command.Sync.pushBranch remote branch g push g branch remote = Command.Sync.pushBranch remote branch g
@ -157,4 +161,4 @@ manualPull st currentbranch remotes = do
syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO () syncNewRemote :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Remote -> IO ()
syncNewRemote st dstatus scanremotes remote = do syncNewRemote st dstatus scanremotes remote = do
runThreadState st $ updateSyncRemotes dstatus runThreadState st $ updateSyncRemotes dstatus
void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes [remote] void $ forkIO $ reconnectRemotes "SyncRemote" st dstatus scanremotes Nothing [remote]

View file

@ -15,6 +15,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.ScanRemotes import Assistant.ScanRemotes
import Assistant.Sync import Assistant.Sync
import Assistant.Pushes
import qualified Annex import qualified Annex
import qualified Git import qualified Git
import Utility.ThreadScheduler import Utility.ThreadScheduler
@ -38,20 +39,20 @@ import qualified Control.Exception as E
thisThread :: ThreadName thisThread :: ThreadName
thisThread = "MountWatcher" thisThread = "MountWatcher"
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
mountWatcherThread st handle scanremotes = thread $ mountWatcherThread st handle scanremotes pushnotifier = thread $
#if WITH_DBUS #if WITH_DBUS
dbusThread st handle scanremotes dbusThread st handle scanremotes pushnotifier
#else #else
pollingThread st handle scanremotes pollingThread st handle scanremotes pushnotifier
#endif #endif
where where
thread = NamedThread thisThread thread = NamedThread thisThread
#if WITH_DBUS #if WITH_DBUS
dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSession) onerr
where where
go client = ifM (checkMountMonitor client) go client = ifM (checkMountMonitor client)
( do ( do
@ -64,7 +65,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
listen client matcher $ \_event -> do listen client matcher $ \_event -> do
nowmounted <- currentMountPoints nowmounted <- currentMountPoints
wasmounted <- swapMVar mvar nowmounted wasmounted <- swapMVar mvar nowmounted
handleMounts st dstatus scanremotes wasmounted nowmounted handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted
, do , do
runThreadState st $ runThreadState st $
warning "No known volume monitor available through dbus; falling back to mtab polling" warning "No known volume monitor available through dbus; falling back to mtab polling"
@ -75,7 +76,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
runThreadState st $ runThreadState st $
warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")" warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")"
pollinstead pollinstead
pollinstead = pollingThread st dstatus scanremotes pollinstead = pollingThread st dstatus scanremotes pushnotifier
{- Examine the list of services connected to dbus, to see if there {- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor mounts. If not, will attempt to start one. -} - are any we can use to monitor mounts. If not, will attempt to start one. -}
@ -137,24 +138,24 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback]
#endif #endif
pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
pollingThread st dstatus scanremotes = go =<< currentMountPoints pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints
where where
go wasmounted = do go wasmounted = do
threadDelaySeconds (Seconds 10) threadDelaySeconds (Seconds 10)
nowmounted <- currentMountPoints nowmounted <- currentMountPoints
handleMounts st dstatus scanremotes wasmounted nowmounted handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted
go nowmounted go nowmounted
handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO () handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO ()
handleMounts st dstatus scanremotes wasmounted nowmounted = handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted =
mapM_ (handleMount st dstatus scanremotes . mnt_dir) $ mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $
S.toList $ newMountPoints wasmounted nowmounted S.toList $ newMountPoints wasmounted nowmounted
handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> FilePath -> IO () handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO ()
handleMount st dstatus scanremotes dir = do handleMount st dstatus scanremotes pushnotifier dir = do
debug thisThread ["detected mount of", dir] debug thisThread ["detected mount of", dir]
reconnectRemotes thisThread st dstatus scanremotes reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< filter (Git.repoIsLocal . Remote.repo) =<< filter (Git.repoIsLocal . Remote.repo)
<$> remotesUnder st dstatus dir <$> remotesUnder st dstatus dir

View file

@ -15,6 +15,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.ScanRemotes import Assistant.ScanRemotes
import Assistant.Sync import Assistant.Sync
import Assistant.Pushes
import Utility.ThreadScheduler import Utility.ThreadScheduler
import Remote.List import Remote.List
import qualified Types.Remote as Remote import qualified Types.Remote as Remote
@ -32,12 +33,12 @@ import qualified Control.Exception as E
thisThread :: ThreadName thisThread :: ThreadName
thisThread = "NetWatcher" thisThread = "NetWatcher"
netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
#if WITH_DBUS #if WITH_DBUS
netWatcherThread st dstatus scanremotes = thread $ netWatcherThread st dstatus scanremotes pushnotifier = thread $
dbusThread st dstatus scanremotes dbusThread st dstatus scanremotes pushnotifier
#else #else
netWatcherThread _ _ _ = thread noop netWatcherThread _ _ _ _ = thread noop
#endif #endif
where where
thread = NamedThread thisThread thread = NamedThread thisThread
@ -47,17 +48,17 @@ netWatcherThread _ _ _ = thread noop
- any networked remotes that may have not been routable for a - any networked remotes that may have not been routable for a
- while (despite the local network staying up), are synced with - while (despite the local network staying up), are synced with
- periodically. -} - periodically. -}
netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> NamedThread netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
netWatcherFallbackThread st dstatus scanremotes = thread $ netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $
runEvery (Seconds 3600) $ runEvery (Seconds 3600) $
handleConnection st dstatus scanremotes handleConnection st dstatus scanremotes pushnotifier
where where
thread = NamedThread thisThread thread = NamedThread thisThread
#if WITH_DBUS #if WITH_DBUS
dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr dbusThread st dstatus scanremotes pushnotifier = E.catch (go =<< connectSystem) onerr
where where
go client = ifM (checkNetMonitor client) go client = ifM (checkNetMonitor client)
( do ( do
@ -72,7 +73,7 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr
warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")" warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")"
handle = do handle = do
debug thisThread ["detected network connection"] debug thisThread ["detected network connection"]
handleConnection st dstatus scanremotes handleConnection st dstatus scanremotes pushnotifier
{- Examine the list of services connected to dbus, to see if there {- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor network connections. -} - are any we can use to monitor network connections. -}
@ -126,9 +127,9 @@ listenWicdConnections client callback =
#endif #endif
handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO () handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
handleConnection st dstatus scanremotes = handleConnection st dstatus scanremotes pushnotifier =
reconnectRemotes thisThread st dstatus scanremotes reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< networkRemotes st =<< networkRemotes st
{- Finds network remotes. -} {- Finds network remotes. -}

View file

@ -0,0 +1,21 @@
{- git-annex assistant push notification thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Threads.PushNotifier where
import Assistant.Common
import Assistant.Pushes
thisThread :: ThreadName
thisThread = "PushNotifier"
pushNotifierThread :: PushNotifier -> NamedThread
pushNotifierThread pushnotifier = thread $ forever $ do
waitPush pushnotifier
-- TODO
where
thread = NamedThread thisThread

View file

@ -24,8 +24,8 @@ thisThread :: ThreadName
thisThread = "Pusher" thisThread = "Pusher"
{- This thread retries pushes that failed before. -} {- This thread retries pushes that failed before. -}
pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> NamedThread pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread
pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do
-- We already waited half an hour, now wait until there are failed -- We already waited half an hour, now wait until there are failed
-- pushes to retry. -- pushes to retry.
topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
@ -37,14 +37,14 @@ pushRetryThread st dstatus pushmap = thread $ runEvery (Seconds halfhour) $ do
] ]
now <- getCurrentTime now <- getCurrentTime
void $ alertWhile dstatus (pushRetryAlert topush) $ void $ alertWhile dstatus (pushRetryAlert topush) $
pushToRemotes thisThread now st (Just pushmap) topush pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) topush
where where
halfhour = 1800 halfhour = 1800
thread = NamedThread thisThread thread = NamedThread thisThread
{- This thread pushes git commits out to remotes soon after they are made. -} {- This thread pushes git commits out to remotes soon after they are made. -}
pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> NamedThread pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread
pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter. -- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made -- Next, wait until at least one commit has been made
commits <- getCommits commitchan commits <- getCommits commitchan
@ -56,7 +56,7 @@ pushThread st dstatus commitchan pushmap = thread $ runEvery (Seconds 2) $ do
<$> getDaemonStatus dstatus <$> getDaemonStatus dstatus
unless (null remotes) $ unless (null remotes) $
void $ alertWhile dstatus (pushAlert remotes) $ void $ alertWhile dstatus (pushAlert remotes) $
pushToRemotes thisThread now st (Just pushmap) remotes pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
else do else do
debug thisThread debug thisThread
[ "delaying push of" [ "delaying push of"