reconnect XMPP when NetWatcher notices a change
This commit is contained in:
parent
9fc8257392
commit
2dc40ecbd1
6 changed files with 58 additions and 19 deletions
|
@ -11,6 +11,7 @@ import Common.Annex
|
||||||
import Utility.TSet
|
import Utility.TSet
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
|
import Control.Concurrent.MSampleVar
|
||||||
import Data.Time.Clock
|
import Data.Time.Clock
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
|
||||||
|
@ -18,8 +19,13 @@ import qualified Data.Map as M
|
||||||
type PushMap = M.Map Remote UTCTime
|
type PushMap = M.Map Remote UTCTime
|
||||||
type FailedPushMap = TMVar PushMap
|
type FailedPushMap = TMVar PushMap
|
||||||
|
|
||||||
{- Used to notify about successful pushes. -}
|
{- The TSet is recent, successful pushes that other remotes should be
|
||||||
newtype PushNotifier = PushNotifier (TSet UUID)
|
- notified about.
|
||||||
|
-
|
||||||
|
- The MSampleVar is written to when the PushNotifier thread should be
|
||||||
|
- restarted for some reason.
|
||||||
|
-}
|
||||||
|
data PushNotifier = PushNotifier (TSet UUID) (MSampleVar ())
|
||||||
|
|
||||||
{- The TMVar starts empty, and is left empty when there are no
|
{- The TMVar starts empty, and is left empty when there are no
|
||||||
- failed pushes. This way we can block until there are some failed pushes.
|
- failed pushes. This way we can block until there are some failed pushes.
|
||||||
|
@ -50,10 +56,18 @@ changeFailedPushMap v a = atomically $
|
||||||
| otherwise = putTMVar v $! m
|
| otherwise = putTMVar v $! m
|
||||||
|
|
||||||
newPushNotifier :: IO PushNotifier
|
newPushNotifier :: IO PushNotifier
|
||||||
newPushNotifier = PushNotifier <$> newTSet
|
newPushNotifier = PushNotifier
|
||||||
|
<$> newTSet
|
||||||
|
<*> newEmptySV
|
||||||
|
|
||||||
notifyPush :: [UUID] -> PushNotifier -> IO ()
|
notifyPush :: [UUID] -> PushNotifier -> IO ()
|
||||||
notifyPush us (PushNotifier s) = putTSet s us
|
notifyPush us (PushNotifier s _) = putTSet s us
|
||||||
|
|
||||||
waitPush :: PushNotifier -> IO [UUID]
|
waitPush :: PushNotifier -> IO [UUID]
|
||||||
waitPush (PushNotifier s) = getTSet s
|
waitPush (PushNotifier s _) = getTSet s
|
||||||
|
|
||||||
|
notifyRestart :: PushNotifier -> IO ()
|
||||||
|
notifyRestart (PushNotifier _ sv) = writeSV sv ()
|
||||||
|
|
||||||
|
waitRestart :: PushNotifier -> IO ()
|
||||||
|
waitRestart (PushNotifier _ sv) = readSV sv
|
||||||
|
|
|
@ -70,6 +70,7 @@ dbusThread st dstatus scanremotes pushnotifier =
|
||||||
)
|
)
|
||||||
handleconn = do
|
handleconn = do
|
||||||
debug thisThread ["detected network connection"]
|
debug thisThread ["detected network connection"]
|
||||||
|
notifyRestart pushnotifier
|
||||||
handleConnection st dstatus scanremotes pushnotifier
|
handleConnection st dstatus scanremotes pushnotifier
|
||||||
onerr e _ = do
|
onerr e _ = do
|
||||||
runThreadState st $
|
runThreadState st $
|
||||||
|
|
|
@ -28,11 +28,18 @@ import Data.Time.Clock
|
||||||
thisThread :: ThreadName
|
thisThread :: ThreadName
|
||||||
thisThread = "PushNotifier"
|
thisThread = "PushNotifier"
|
||||||
|
|
||||||
|
controllerThread :: PushNotifier -> IO () -> IO ()
|
||||||
|
controllerThread pushnotifier a = forever $ do
|
||||||
|
tid <- forkIO a
|
||||||
|
waitRestart pushnotifier
|
||||||
|
killThread tid
|
||||||
|
|
||||||
pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
|
pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
|
||||||
pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do
|
pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
|
||||||
|
controllerThread pushnotifier $ do
|
||||||
v <- runThreadState st $ getXMPPCreds
|
v <- runThreadState st $ getXMPPCreds
|
||||||
case v of
|
case v of
|
||||||
Nothing -> return () -- no creds? exit thread
|
Nothing -> noop
|
||||||
Just c -> loop c =<< getCurrentTime
|
Just c -> loop c =<< getCurrentTime
|
||||||
where
|
where
|
||||||
loop c starttime = do
|
loop c starttime = do
|
||||||
|
@ -53,7 +60,6 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do
|
||||||
threadDelaySeconds (Seconds 300)
|
threadDelaySeconds (Seconds 300)
|
||||||
loop c =<< getCurrentTime
|
loop c =<< getCurrentTime
|
||||||
|
|
||||||
|
|
||||||
sendnotifications = forever $ do
|
sendnotifications = forever $ do
|
||||||
us <- liftIO $ waitPush pushnotifier
|
us <- liftIO $ waitPush pushnotifier
|
||||||
let payload = [extendedAway, encodePushNotification us]
|
let payload = [extendedAway, encodePushNotification us]
|
||||||
|
|
|
@ -16,7 +16,7 @@ import Network
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
import Data.XML.Types
|
import Data.XML.Types
|
||||||
import Control.Exception as E
|
import Control.Exception (SomeException)
|
||||||
|
|
||||||
{- Everything we need to know to connect to an XMPP server. -}
|
{- Everything we need to know to connect to an XMPP server. -}
|
||||||
data XMPPCreds = XMPPCreds
|
data XMPPCreds = XMPPCreds
|
||||||
|
@ -53,7 +53,11 @@ connectXMPP' jid c a = go =<< lookupSRV srvrecord
|
||||||
a jid
|
a jid
|
||||||
ifM (isEmptyMVar mv) (go rest, return r)
|
ifM (isEmptyMVar mv) (go rest, return r)
|
||||||
|
|
||||||
run h p a' = E.try (runClientError (Server serverjid h p) jid (xmppUsername c) (xmppPassword c) (void a')) :: IO (Either SomeException ())
|
{- Async exceptions are let through so the XMPP thread can
|
||||||
|
- be killed. -}
|
||||||
|
run h p a' = tryNonAsync $
|
||||||
|
runClientError (Server serverjid h p) jid
|
||||||
|
(xmppUsername c) (xmppPassword c) (void a')
|
||||||
|
|
||||||
{- XMPP runClient, that throws errors rather than returning an Either -}
|
{- XMPP runClient, that throws errors rather than returning an Either -}
|
||||||
runClientError :: Server -> JID -> T.Text -> T.Text -> XMPP a -> IO a
|
runClientError :: Server -> JID -> T.Text -> T.Text -> XMPP a -> IO a
|
||||||
|
|
|
@ -9,6 +9,8 @@
|
||||||
|
|
||||||
module Utility.DBus where
|
module Utility.DBus where
|
||||||
|
|
||||||
|
import Utility.Exception
|
||||||
|
|
||||||
import DBus.Client
|
import DBus.Client
|
||||||
import DBus
|
import DBus
|
||||||
import Data.Maybe
|
import Data.Maybe
|
||||||
|
@ -70,10 +72,7 @@ persistentClient :: IO (Maybe Address) -> v -> (SomeException -> v -> IO v) -> (
|
||||||
persistentClient getaddr v onretry clientaction =
|
persistentClient getaddr v onretry clientaction =
|
||||||
{- runClient can fail with not just ClientError, but also other
|
{- runClient can fail with not just ClientError, but also other
|
||||||
- things, if dbus is not running. Let async exceptions through. -}
|
- things, if dbus is not running. Let async exceptions through. -}
|
||||||
runClient getaddr clientaction `E.catches`
|
runClient getaddr clientaction `catchNonAsync` retry
|
||||||
[ Handler (\ (e :: AsyncException) -> E.throw e)
|
|
||||||
, Handler (\ (e :: SomeException) -> retry e)
|
|
||||||
]
|
|
||||||
where
|
where
|
||||||
retry e = do
|
retry e = do
|
||||||
v' <- onretry e v
|
v' <- onretry e v
|
||||||
|
@ -81,5 +80,5 @@ persistentClient getaddr v onretry clientaction =
|
||||||
|
|
||||||
{- Catches only ClientError -}
|
{- Catches only ClientError -}
|
||||||
catchClientError :: IO () -> (ClientError -> IO ()) -> IO ()
|
catchClientError :: IO () -> (ClientError -> IO ()) -> IO ()
|
||||||
catchClientError io handler = do
|
catchClientError io handler =
|
||||||
either handler return =<< (E.try io :: IO (Either ClientError ()))
|
either handler return =<< (E.try io :: IO (Either ClientError ()))
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
{- Simple IO exception handling
|
{- Simple IO exception handling (and some more)
|
||||||
-
|
-
|
||||||
- Copyright 2011-2012 Joey Hess <joey@kitenet.net>
|
- Copyright 2011-2012 Joey Hess <joey@kitenet.net>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE ScopedTypeVariables #-}
|
||||||
|
|
||||||
module Utility.Exception where
|
module Utility.Exception where
|
||||||
|
|
||||||
import Prelude hiding (catch)
|
import Prelude hiding (catch)
|
||||||
|
@ -34,3 +36,16 @@ catchIO = catch
|
||||||
{- try specialized for IO errors only -}
|
{- try specialized for IO errors only -}
|
||||||
tryIO :: IO a -> IO (Either IOException a)
|
tryIO :: IO a -> IO (Either IOException a)
|
||||||
tryIO = try
|
tryIO = try
|
||||||
|
|
||||||
|
{- Catches all exceptions except for async exceptions.
|
||||||
|
- This is often better to use than catching them all, so that
|
||||||
|
- ThreadKilled and UserInterrupt get through.
|
||||||
|
-}
|
||||||
|
catchNonAsync :: IO a -> (SomeException -> IO a) -> IO a
|
||||||
|
catchNonAsync a onerr = a `catches`
|
||||||
|
[ Handler (\ (e :: AsyncException) -> throw e)
|
||||||
|
, Handler (\ (e :: SomeException) -> onerr e)
|
||||||
|
]
|
||||||
|
|
||||||
|
tryNonAsync :: IO a -> IO (Either SomeException a)
|
||||||
|
tryNonAsync a = (Right <$> a) `catchNonAsync` (return . Left)
|
||||||
|
|
Loading…
Reference in a new issue