refactor XMPP client

This commit is contained in:
Joey Hess 2012-11-03 14:16:17 -04:00
parent 74385e3d38
commit a1228e27ed
12 changed files with 291 additions and 175 deletions

View file

@ -69,8 +69,8 @@
- Thread 18: ConfigMonitor
- Triggered by changes to the git-annex branch, checks for changed
- config files, and reloads configs.
- Thread 19: PushNotifier
- Notifies other repositories of pushes, using out of band signaling.
- Thread 19: XMPPClient
- Built-in XMPP client.
- Thread 20: WebApp
- Spawns more threads as necessary to handle clients.
- Displays the DaemonStatus.
@ -105,8 +105,10 @@
- BranchChanged (STM SampleVar)
- Changes to the git-annex branch are indicated by updating this
- SampleVar.
- PushNotifier (STM TChan)
- After successful pushes, this SampleVar is updated.
- NetMessagerControl (STM TChan, SampleVar)
- Used to feed messages to the built-in XMPP client, and
- signal it when it needs to restart due to configuration or
- networking changes.
- UrlRenderer (MVar)
- A Yesod route rendering function is stored here. This allows
- things that need to render Yesod routes to block until the webapp
@ -135,7 +137,7 @@ import Assistant.Threads.TransferScanner
import Assistant.Threads.TransferPoller
import Assistant.Threads.ConfigMonitor
#ifdef WITH_XMPP
import Assistant.Threads.PushNotifier
import Assistant.Threads.XMPPClient
#endif
#ifdef WITH_WEBAPP
import Assistant.WebApp
@ -204,7 +206,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
, assist $ transferScannerThread
, assist $ configMonitorThread
#ifdef WITH_XMPP
, assist $ pushNotifierThread
, assist $ xmppClientThread
#endif
, watch $ watchThread
]

View file

@ -35,6 +35,7 @@ import Assistant.Types.BranchChange
import Assistant.Types.Commits
import Assistant.Types.Changes
import Assistant.Types.Buddies
import Assistant.Types.NetMessager
newtype Assistant a = Assistant { mkAssistant :: ReaderT AssistantData IO a }
deriving (
@ -55,12 +56,12 @@ data AssistantData = AssistantData
, scanRemoteMap :: ScanRemoteMap
, transferQueue :: TransferQueue
, transferSlots :: TransferSlots
, pushNotifier :: PushNotifier
, failedPushMap :: FailedPushMap
, commitChan :: CommitChan
, changeChan :: ChangeChan
, branchChangeHandle :: BranchChangeHandle
, buddyList :: BuddyList
, netMessagerControl :: NetMessagerControl
}
newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData
@ -71,12 +72,12 @@ newAssistantData st dstatus = AssistantData
<*> newScanRemoteMap
<*> newTransferQueue
<*> newTransferSlots
<*> newPushNotifier
<*> newFailedPushMap
<*> newCommitChan
<*> newChangeChan
<*> newBranchChangeHandle
<*> newBuddyList
<*> newNetMessagerControl
runAssistant :: Assistant a -> AssistantData -> IO a
runAssistant a = runReaderT (mkAssistant a)

28
Assistant/NetMessager.hs Normal file
View file

@ -0,0 +1,28 @@
{- git-annex assistant out of band network messager interface
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
(atomically . flip writeTChan m) <<~ (netMessages . netMessagerControl)
waitNetMessage :: Assistant (NetMessage)
waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessagerControl)
notifyNetMessagerRestart :: Assistant ()
notifyNetMessagerRestart =
flip writeSV () <<~ (netMessagerRestart . netMessagerControl)
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessagerControl)

View file

@ -9,10 +9,8 @@ module Assistant.Pushes where
import Assistant.Common
import Assistant.Types.Pushes
import Utility.TSet
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.Time.Clock
import qualified Data.Map as M
@ -40,15 +38,3 @@ changeFailedPushMap a = do
store v m
| m == M.empty = noop
| otherwise = putTMVar v $! m
notifyPush :: [UUID] -> Assistant ()
notifyPush us = flip putTSet us <<~ (pushNotifierSuccesses . pushNotifier)
waitPush :: Assistant [UUID]
waitPush = getTSet <<~ (pushNotifierSuccesses . pushNotifier)
notifyRestart :: Assistant ()
notifyRestart = flip writeSV () <<~ (pushNotifierWaiter . pushNotifier)
waitRestart :: Assistant ()
waitRestart = readSV <<~ (pushNotifierWaiter . pushNotifier)

View file

@ -9,6 +9,8 @@ module Assistant.Sync where
import Assistant.Common
import Assistant.Pushes
import Assistant.NetMessager
import Assistant.Types.NetMessager
import Assistant.Alert
import Assistant.DaemonStatus
import Assistant.ScanRemotes
@ -102,7 +104,8 @@ pushToRemotes now notifypushes remotes = do
if null failed
then do
when notifypushes $
notifyPush (map Remote.uuid succeeded)
sendNetMessage $ NotifyPush $
map Remote.uuid succeeded
return True
else if shouldretry
then retry branch g u failed
@ -124,7 +127,8 @@ pushToRemotes now notifypushes remotes = do
inParallel (pushfallback g u branch) rs
updatemap succeeded failed
when (notifypushes && (not $ null succeeded)) $
notifyPush (map Remote.uuid succeeded)
sendNetMessage $ NotifyPush $
map Remote.uuid succeeded
return $ null failed
push g branch remote = Command.Sync.pushBranch remote branch g

View file

@ -12,7 +12,7 @@ module Assistant.Threads.NetWatcher where
import Assistant.Common
import Assistant.Sync
import Assistant.Pushes
import Assistant.NetMessager
import Utility.ThreadScheduler
import Remote.List
import qualified Types.Remote as Remote
@ -62,7 +62,7 @@ dbusThread = do
)
handleconn = do
debug ["detected network connection"]
notifyRestart
notifyNetMessagerRestart
handleConnection
onerr e _ = do
liftAnnex $

View file

@ -1,124 +0,0 @@
{- git-annex assistant push notification thread, using XMPP
-
- This handles both sending outgoing push notifications, and receiving
- incoming push notifications.
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Threads.PushNotifier where
import Assistant.Common
import Assistant.XMPP
import Assistant.XMPP.Client
import Assistant.Pushes
import Assistant.Types.Buddies
import Assistant.XMPP.Buddies
import Assistant.Sync
import Assistant.DaemonStatus
import qualified Remote
import Utility.ThreadScheduler
import Network.Protocol.XMPP
import Control.Concurrent
import qualified Data.Set as S
import qualified Git.Branch
import Data.Time.Clock
pushNotifierThread :: NamedThread
pushNotifierThread = NamedThread "PushNotifier" $ do
iodebug <- asIO1 debug
iopull <- asIO1 pull
iowaitpush <- asIO waitPush
ioupdatebuddies <- asIO1 $ \p -> do
updateBuddyList (updateBuddies p) <<~ buddyList
debug =<< map show <$> getBuddyList <<~ buddyList
ioemptybuddies <- asIO $
updateBuddyList (const noBuddies) <<~ buddyList
ioclient <- asIO $
xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies
forever $ do
tid <- liftIO $ forkIO ioclient
waitRestart
liftIO $ killThread tid
xmppClient
:: (IO [UUID])
-> ([String] -> IO ())
-> ([UUID] -> IO ())
-> (Presence -> IO ())
-> IO ()
-> Assistant ()
xmppClient iowaitpush iodebug iopull ioupdatebuddies ioemptybuddies = do
v <- liftAnnex getXMPPCreds
case v of
Nothing -> noop
Just c -> liftIO $ loop c =<< getCurrentTime
where
loop c starttime = do
void $ connectXMPP c $ \jid -> do
fulljid <- bindJID jid
liftIO $ iodebug ["XMPP connected", show fulljid]
{- The buddy list starts empty each time the client
- connects, so that stale info is not retained. -}
liftIO ioemptybuddies
putStanza $ gitAnnexPresence gitAnnexSignature
s <- getSession
_ <- liftIO $ forkIO $ void $ runXMPP s $
receivenotifications fulljid
sendnotifications
now <- getCurrentTime
if diffUTCTime now starttime > 300
then do
iodebug ["XMPP connection lost; reconnecting"]
loop c now
else do
iodebug ["XMPP connection failed; will retry"]
threadDelaySeconds (Seconds 300)
loop c =<< getCurrentTime
sendnotifications = forever $ do
us <- liftIO iowaitpush
putStanza $ gitAnnexPresence $ encodePushNotification us
receivenotifications fulljid = forever $ do
s <- getStanza
liftIO $ iodebug ["received XMPP:", show s]
case s of
ReceivedPresence p@(Presence { presenceFrom = from })
| from == Just fulljid -> noop
| otherwise -> do
liftIO $ ioupdatebuddies p
when (isGitAnnexPresence p) $
liftIO $ iopull $ concat $ catMaybes $
map decodePushNotification $
presencePayloads p
_ -> noop
{- We only pull from one remote out of the set listed in the push
- notification, as an optimisation.
-
- Note that it might be possible (though very unlikely) for the push
- notification to take a while to be sent, and multiple pushes happen
- before it is sent, so it includes multiple remotes that were pushed
- to at different times.
-
- It could then be the case that the remote we choose had the earlier
- push sent to it, but then failed to get the later push, and so is not
- fully up-to-date. If that happens, the pushRetryThread will come along
- and retry the push, and we'll get another notification once it succeeds,
- and pull again. -}
pull :: [UUID] -> Assistant ()
pull [] = noop
pull us = do
rs <- filter matching . syncRemotes <$> getDaemonStatus
debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
pullone rs =<< liftAnnex (inRepo Git.Branch.current)
where
matching r = Remote.uuid r `S.member` s
s = S.fromList us
pullone [] _ = noop
pullone (r:rs) branch =
unlessM (all id . fst <$> manualPull branch [r]) $
pullone rs branch

View file

@ -0,0 +1,155 @@
{- git-annex XMPP client
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Threads.XMPPClient where
import Assistant.Common
import Assistant.XMPP
import Assistant.XMPP.Client
import Assistant.NetMessager
import Assistant.Types.NetMessager
import Assistant.Types.Buddies
import Assistant.XMPP.Buddies
import Assistant.Sync
import Assistant.DaemonStatus
import qualified Remote
import Utility.ThreadScheduler
import Network.Protocol.XMPP
import Control.Concurrent
import qualified Data.Set as S
import qualified Git.Branch
import Data.Time.Clock
xmppClientThread :: NamedThread
xmppClientThread = NamedThread "XMPPClient" $ do
iodebug <- asIO1 debug
iopull <- asIO1 pull
ioupdatebuddies <- asIO1 $ \p ->
updateBuddyList (updateBuddies p) <<~ buddyList
ioemptybuddies <- asIO $
updateBuddyList (const noBuddies) <<~ buddyList
iorelay <- asIO relayNetMessage
ioclientthread <- asIO $
go iorelay iodebug iopull ioupdatebuddies ioemptybuddies
restartableClient ioclientthread
where
go iorelay iodebug iopull ioupdatebuddies ioemptybuddies = do
v <- liftAnnex getXMPPCreds
case v of
Nothing -> noop
Just c -> liftIO $ loop c =<< getCurrentTime
where
debug' = void . liftIO . iodebug
{- When the client exits, it's restarted;
- if it keeps failing, back off to wait 5 minutes before
- trying it again. -}
loop c starttime = do
runclient c
now <- getCurrentTime
if diffUTCTime now starttime > 300
then do
void $ iodebug ["connection lost; reconnecting"]
loop c now
else do
void $ iodebug ["connection failed; will retry"]
threadDelaySeconds (Seconds 300)
loop c =<< getCurrentTime
runclient c = void $ connectXMPP c $ \jid -> do
fulljid <- bindJID jid
debug' ["connected", show fulljid]
{- The buddy list starts empty each time
- the client connects, so that stale info
- is not retained. -}
void $ liftIO ioemptybuddies
putStanza $ gitAnnexPresence gitAnnexSignature
xmppThread $ receivenotifications fulljid
forever $ do
a <- liftIO iorelay
a
receivenotifications fulljid = forever $ do
s <- getStanza
let v = decodeStanza fulljid s
debug' ["received:", show v]
case v of
PresenceMessage p -> void $ liftIO $ ioupdatebuddies p
PresenceQuery p -> do
void $ liftIO $ ioupdatebuddies p
putStanza $ gitAnnexPresence gitAnnexSignature
PushNotification us -> void $ liftIO $ iopull us
Ignorable _ -> noop
Unknown _ -> noop
{- Waits for a NetMessager message to be sent, and relays it to XMPP. -}
relayNetMessage :: Assistant (XMPP ())
relayNetMessage = convert <$> waitNetMessage
where
convert (NotifyPush us) =
putStanza $ gitAnnexPresence $ encodePushNotification us
data DecodedStanza
= PresenceMessage Presence
| PresenceQuery Presence
| PushNotification [UUID]
| Ignorable Presence
| Unknown ReceivedStanza
deriving Show
decodeStanza :: JID -> ReceivedStanza -> DecodedStanza
decodeStanza fulljid (ReceivedPresence p)
| presenceFrom p == Nothing = Ignorable p
| presenceFrom p == Just fulljid = Ignorable p
| isPresenceQuery p = PresenceQuery p
| null pushed = Ignorable p
| otherwise = PushNotification pushed
where
pushed = concat $ catMaybes $ map decodePushNotification $
presencePayloads p
decodeStanza _ s = Unknown s
{- Runs the client, handing restart events. -}
restartableClient :: IO () -> Assistant ()
restartableClient a = forever $ do
tid <- liftIO $ forkIO a
waitNetMessagerRestart
liftIO $ killThread tid
{- Runs a XMPP action in a separate thread, using a session to allow it
- to access the same XMPP client. -}
xmppThread :: XMPP () -> XMPP ()
xmppThread a = do
s <- getSession
void $ liftIO $ forkIO $
void $ runXMPP s a
{- We only pull from one remote out of the set listed in the push
- notification, as an optimisation.
-
- Note that it might be possible (though very unlikely) for the push
- notification to take a while to be sent, and multiple pushes happen
- before it is sent, so it includes multiple remotes that were pushed
- to at different times.
-
- It could then be the case that the remote we choose had the earlier
- push sent to it, but then failed to get the later push, and so is not
- fully up-to-date. If that happens, the pushRetryThread will come along
- and retry the push, and we'll get another notification once it succeeds,
- and pull again. -}
pull :: [UUID] -> Assistant ()
pull [] = noop
pull us = do
rs <- filter matching . syncRemotes <$> getDaemonStatus
debug $ "push notification for" : map (fromUUID . Remote.uuid ) rs
pullone rs =<< liftAnnex (inRepo Git.Branch.current)
where
matching r = Remote.uuid r `S.member` s
s = S.fromList us
pullone [] _ = noop
pullone (r:rs) branch =
unlessM (all id . fst <$> manualPull branch [r]) $
pullone rs branch

View file

@ -0,0 +1,31 @@
{- git-annex assistant out of band network messager types
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Types.NetMessager where
import Common.Annex
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
{- Messages that can be sent out of band by a network messager. -}
data NetMessage = NotifyPush [UUID]
{- Controls for the XMPP client.
-
- It can be fed XMPP messages to send.
-
- It can also be sent a signal when it should restart for some reason. -}
data NetMessagerControl = NetMessagerControl
{ netMessages :: TChan (NetMessage)
, netMessagerRestart :: MSampleVar ()
}
newNetMessagerControl :: IO NetMessagerControl
newNetMessagerControl = NetMessagerControl
<$> atomically newTChan
<*> newEmptySV

View file

@ -8,10 +8,8 @@
module Assistant.Types.Pushes where
import Common.Annex
import Utility.TSet
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.Time.Clock
import qualified Data.Map as M
@ -19,24 +17,8 @@ import qualified Data.Map as M
type PushMap = M.Map Remote UTCTime
type FailedPushMap = TMVar PushMap
{- The TSet is recent, successful pushes that other remotes should be
- notified about.
-
- The MSampleVar is written to when the PushNotifier thread should be
- restarted for some reason.
-}
data PushNotifier = PushNotifier
{ pushNotifierSuccesses :: TSet UUID
, pushNotifierWaiter :: MSampleVar ()
}
{- The TMVar starts empty, and is left empty when there are no
- failed pushes. This way we can block until there are some failed pushes.
-}
newFailedPushMap :: IO FailedPushMap
newFailedPushMap = atomically newEmptyTMVar
newPushNotifier :: IO PushNotifier
newPushNotifier = PushNotifier
<$> newTSet
<*> newEmptySV

View file

@ -23,7 +23,7 @@ import Assistant.Common
#ifdef WITH_XMPP
import Assistant.XMPP.Client
import Assistant.XMPP.Buddies
import Assistant.Pushes
import Assistant.NetMessager
import Utility.SRV
#endif
@ -80,7 +80,7 @@ getXMPPR' redirto = xmppPage $ do
where
storecreds creds = do
void $ runAnnex undefined $ setXMPPCreds creds
liftAssistant notifyRestart
liftAssistant notifyNetMessagerRestart
redirect redirto
#endif

View file

@ -7,7 +7,9 @@
module Assistant.XMPP where
import Common.Annex
import Assistant.Common
import Annex.UUID
import Assistant.Pairing
import Network.Protocol.XMPP
import qualified Data.Text as T
@ -21,24 +23,44 @@ gitAnnexPresence tag = (emptyPresence PresenceAvailable)
extendedAway = Element (Name (T.pack "show") Nothing Nothing) []
[NodeContent $ ContentText $ T.pack "xa"]
{- Does a presence contain a gitp-annex tag? -}
{- Does a presence contain a git-annex tag? -}
isGitAnnexPresence :: Presence -> Bool
isGitAnnexPresence p = any matchingtag (presencePayloads p)
where
matchingtag t = elementName t == gitAnnexTagName
isGitAnnexPresence p = any isGitAnnexTag (presencePayloads p)
{- Name of a git-annex tag, in our own XML namespace.
- (Not using a namespace URL to avoid unnecessary bloat.) -}
gitAnnexTagName :: Name
gitAnnexTagName = Name (T.pack "git-annex") (Just $ T.pack "git-annex") Nothing
isGitAnnexTag :: Element -> Bool
isGitAnnexTag t = elementName t == gitAnnexTagName
{- A git-annex tag, to let other clients know we're a git-annex client too. -}
gitAnnexSignature :: Element
gitAnnexSignature = Element gitAnnexTagName [] []
queryAttr :: Name
queryAttr = Name (T.pack "query") Nothing Nothing
pushAttr :: Name
pushAttr = Name (T.pack "push") Nothing Nothing
pairingAttr :: Name
pairingAttr = Name (T.pack "pairing") Nothing Nothing
isAttr :: Name -> (Name, [Content]) -> Bool
isAttr attr (k, _) = k == attr
getAttr :: Name -> [(Name, [Content])] -> Maybe String
getAttr wantattr attrs = content <$> headMaybe (filter (isAttr wantattr) attrs)
where
content (_name, cs) = T.unpack $ T.concat $ map unpack cs
unpack (ContentText t) = t
unpack (ContentEntity t) = t
uuidAttr :: Name
uuidAttr = Name (T.pack "uuid") Nothing Nothing
uuidSep :: T.Text
uuidSep = T.pack ","
@ -61,3 +83,32 @@ decodePushNotification (Element name attrs _nodes)
ispush (k, _) = k == pushAttr
fromContent (ContentText t) = t
fromContent (ContentEntity t) = t
{- A request for other git-annex clients to send presence. -}
presenceQuery :: Presence
presenceQuery = gitAnnexPresence $ Element gitAnnexTagName
[ (queryAttr, [ContentText T.empty]) ]
[]
isPresenceQuery :: Presence -> Bool
isPresenceQuery p = case filter isGitAnnexTag (presencePayloads p) of
[] -> False
((Element _name attrs _nodes):_) -> any (isAttr queryAttr) attrs
{- A notification about a stage of pairing. -}
pairingNotification :: PairStage -> Annex Presence
pairingNotification pairstage = do
u <- getUUID
return $ gitAnnexPresence $ Element gitAnnexTagName
[ (pairingAttr, [ContentText $ T.pack $ show pairstage])
, (uuidAttr, [ContentText $ T.pack $ fromUUID u])
]
[]
isPairingNotification :: Presence -> Maybe (PairStage, UUID)
isPairingNotification p = case filter isGitAnnexTag (presencePayloads p) of
[] -> Nothing
((Element _name attrs _nodes):_) ->
(,)
<$> (readish =<< getAttr pairingAttr attrs)
<*> (toUUID <$> getAttr uuidAttr attrs)