add NetWatcher thread

This deals with interruptions in network connectevity, by listening
for a new network interface coming up (using dbus to see when
network-manager or wicd do it), and forcing a rescan of
This commit is contained in:
Joey Hess 2012-08-21 19:58:53 -04:00
parent e3e8d32924
commit 5a68acb521
4 changed files with 190 additions and 17 deletions

View file

@ -52,11 +52,17 @@
- state about that remote, pulls from it, and queues a push to it, - state about that remote, pulls from it, and queues a push to it,
- as well as an update, and queues it onto the - as well as an update, and queues it onto the
- ConnectedRemoteChan - ConnectedRemoteChan
- Thread 14: TransferScanner - Thread 13: NetWatcher
- Deals with network connection interruptions, which would cause
- transfers to fail, and can be recovered from by waiting for a
- network connection, and syncing with all network remotes.
- Uses dbus to watch for network connections, or when dbus
- cannot be used, assumes there's been one every 30 minutes.
- Thread 15: TransferScanner
- Does potentially expensive checks to find data that needs to be - Does potentially expensive checks to find data that needs to be
- transferred from or to remotes, and queues Transfers. - transferred from or to remotes, and queues Transfers.
- Uses the ScanRemotes map. - Uses the ScanRemotes map.
- Thread 15: WebApp - Thread 16: WebApp
- Spawns more threads as necessary to handle clients. - Spawns more threads as necessary to handle clients.
- Displays the DaemonStatus. - Displays the DaemonStatus.
- -
@ -110,6 +116,7 @@ import Assistant.Threads.TransferWatcher
import Assistant.Threads.Transferrer import Assistant.Threads.Transferrer
import Assistant.Threads.SanityChecker import Assistant.Threads.SanityChecker
import Assistant.Threads.MountWatcher import Assistant.Threads.MountWatcher
import Assistant.Threads.NetWatcher
import Assistant.Threads.TransferScanner import Assistant.Threads.TransferScanner
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
import Assistant.Threads.WebApp import Assistant.Threads.WebApp
@ -165,6 +172,7 @@ startAssistant assistant daemonize webappwaiter = do
, assist $ daemonStatusThread st dstatus , assist $ daemonStatusThread st dstatus
, assist $ sanityCheckerThread st dstatus transferqueue changechan , assist $ sanityCheckerThread st dstatus transferqueue changechan
, assist $ mountWatcherThread st dstatus scanremotes , assist $ mountWatcherThread st dstatus scanremotes
, assist $ netWatcherThread st dstatus scanremotes
, assist $ transferScannerThread st dstatus scanremotes transferqueue , assist $ transferScannerThread st dstatus scanremotes transferqueue
, watch $ watchThread st dstatus transferqueue changechan , watch $ watchThread st dstatus transferqueue changechan
] ]

View file

@ -31,6 +31,7 @@ import qualified Data.Set as S
import Data.Time.Clock import Data.Time.Clock
#if WITH_DBUS #if WITH_DBUS
import Utility.DBus
import DBus.Client import DBus.Client
import DBus import DBus
import Data.Word (Word32) import Data.Word (Word32)
@ -78,20 +79,6 @@ dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
pollinstead pollinstead
pollinstead = pollingThread st dstatus scanremotes pollinstead = pollingThread st dstatus scanremotes
type ServiceName = String
listServiceNames :: Client -> IO [ServiceName]
listServiceNames client = do
reply <- callDBus client "ListNames" []
return $ fromMaybe [] $ fromVariant (methodReturnBody reply !! 0)
callDBus :: Client -> MemberName -> [Variant] -> IO MethodReturn
callDBus client name params = call_ client $
(methodCall "/org/freedesktop/DBus" "org.freedesktop.DBus" name)
{ methodCallDestination = Just "org.freedesktop.DBus"
, methodCallBody = params
}
{- Examine the list of services connected to dbus, to see if there {- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor mounts. If not, will attempt to start one. -} - are any we can use to monitor mounts. If not, will attempt to start one. -}
checkMountMonitor :: Client -> IO Bool checkMountMonitor :: Client -> IO Bool
@ -164,7 +151,7 @@ handleMount st dstatus scanremotes dir = do
let nonspecial = filter (Git.repoIsLocal . Remote.repo) rs let nonspecial = filter (Git.repoIsLocal . Remote.repo) rs
unless (null nonspecial) $ do unless (null nonspecial) $ do
void $ alertWhile dstatus (syncAlert nonspecial) $ do void $ alertWhile dstatus (syncAlert nonspecial) $ do
debug thisThread ["syncing with", show rs] debug thisThread ["syncing with", show nonspecial]
sync nonspecial =<< runThreadState st (inRepo Git.Branch.current) sync nonspecial =<< runThreadState st (inRepo Git.Branch.current)
addScanRemotes scanremotes nonspecial addScanRemotes scanremotes nonspecial
where where

View file

@ -0,0 +1,150 @@
{- git-annex assistant network connection watcher, using dbus
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
module Assistant.Threads.NetWatcher where
import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Threads.Pusher (pushToRemotes)
import Assistant.Alert
import qualified Git
import Utility.ThreadScheduler
import Remote.List
import qualified Types.Remote as Remote
import Assistant.Threads.Merger
import qualified Git.Branch
import qualified Control.Exception as E
import Data.Time.Clock
#if WITH_DBUS
import Utility.DBus
import DBus.Client
import DBus
import Data.Word (Word32)
#else
#warning Building without dbus support; will poll for network connection changes
#endif
thisThread :: ThreadName
thisThread = "NetWatcher"
netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
netWatcherThread st handle scanremotes =
#if WITH_DBUS
dbusThread st handle scanremotes
#else
pollingThread st handle scanremotes
#endif
#if WITH_DBUS
dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
dbusThread st dstatus scanremotes = E.catch (go =<< connectSystem) onerr
where
go client = ifM (checkNetMonitor client)
( do
listenNMConnections client handle
listenWicdConnections client handle
, do
runThreadState st $
warning "No known network monitor available through dbus; falling back to polling"
pollinstead
)
onerr :: E.SomeException -> IO ()
onerr e = do
runThreadState st $
warning $ "Failed to use dbus; falling back to polling (" ++ show e ++ ")"
pollinstead
pollinstead = pollingThread st dstatus scanremotes
handle = do
debug thisThread ["detected network connection"]
handleConnection st dstatus scanremotes
{- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor network connections. -}
checkNetMonitor :: Client -> IO Bool
checkNetMonitor client = do
running <- filter (`elem` [networkmanager, wicd])
<$> listServiceNames client
case running of
[] -> return False
(service:_) -> do
debug thisThread [ "Using running DBUS service"
, service
, "to monitor network connection events."
]
return True
where
networkmanager = "org.freedesktop.NetworkManager"
wicd = "org.wicd.daemon"
{- Listens for new NetworkManager connections. -}
listenNMConnections :: Client -> IO () -> IO ()
listenNMConnections client callback =
listen client matcher $ \event ->
when (Just True == anyM activeconnection (signalBody event)) $
callback
where
matcher = matchAny
{ matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active"
, matchMember = Just "PropertiesChanged"
}
nm_connection_activated = toVariant (2 :: Word32)
nm_state_key = toVariant ("State" :: String)
activeconnection v = do
m <- fromVariant v
vstate <- lookup nm_state_key $ dictionaryItems m
state <- fromVariant vstate
return $ state == nm_connection_activated
{- Listens for new Wicd connections. -}
listenWicdConnections :: Client -> IO () -> IO ()
listenWicdConnections client callback =
listen client matcher $ \event ->
when (any (== wicd_success) (signalBody event)) $
callback
where
matcher = matchAny
{ matchInterface = Just "org.wicd.daemon"
, matchMember = Just "ConnectResultsSent"
}
wicd_success = toVariant ("success" :: String)
#endif
pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
pollingThread st dstatus scanremotes = runEvery (Seconds 3600) $
handleConnection st dstatus scanremotes
handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
handleConnection st dstatus scanremotes = do
rs <- networkRemotes st
unless (null rs) $ do
let nonspecial = filter (Git.repoIsUrl . Remote.repo) rs
unless (null nonspecial) $ do
void $ alertWhile dstatus (syncAlert nonspecial) $ do
debug thisThread ["syncing with", show nonspecial]
sync nonspecial =<< runThreadState st (inRepo Git.Branch.current)
addScanRemotes scanremotes nonspecial
where
sync rs (Just branch) = do
runThreadState st $ manualPull (Just branch) rs
now <- getCurrentTime
pushToRemotes thisThread now st Nothing rs
sync _ _ = return True
{- Finds network remotes. -}
networkRemotes :: ThreadState -> IO [Remote]
networkRemotes st = runThreadState st $ do
rs <- remoteList
return $ filter (isNothing . Remote.path) rs

28
Utility/DBus.hs Normal file
View file

@ -0,0 +1,28 @@
{- DBus utilities
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE OverloadedStrings #-}
module Utility.DBus where
import DBus.Client
import DBus
import Data.Maybe
type ServiceName = String
listServiceNames :: Client -> IO [ServiceName]
listServiceNames client = do
reply <- callDBus client "ListNames" []
return $ fromMaybe [] $ fromVariant (methodReturnBody reply !! 0)
callDBus :: Client -> MemberName -> [Variant] -> IO MethodReturn
callDBus client name params = call_ client $
(methodCall "/org/freedesktop/DBus" "org.freedesktop.DBus" name)
{ methodCallDestination = Just "org.freedesktop.DBus"
, methodCallBody = params
}