initial implementation of XMPP push notifier (untested)
Lacking error handling, reconnection, credentials configuration, and doesn't actually do anything when it receives an incoming notification. Other than that, it might work! :)
This commit is contained in:
parent
21c27fed21
commit
32497feb2a
4 changed files with 124 additions and 17 deletions
|
@ -105,7 +105,7 @@
|
|||
- BranchChanged (STM SampleVar)
|
||||
- Changes to the git-annex branch are indicated by updating this
|
||||
- SampleVar.
|
||||
- PushNotifier (STM SampleVar)
|
||||
- PushNotifier (STM TChan)
|
||||
- After successful pushes, this SampleVar is updated.
|
||||
- UrlRenderer (MVar)
|
||||
- A Yesod route rendering function is stored here. This allows
|
||||
|
@ -216,7 +216,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
|
|||
, assist $ transferScannerThread st dstatus scanremotes transferqueue
|
||||
, assist $ configMonitorThread st dstatus branchhandle commitchan
|
||||
#ifdef WITH_XMPP
|
||||
, assist $ pushNotifierThread dstatus pushnotifier
|
||||
, assist $ pushNotifierThread st dstatus pushnotifier
|
||||
#endif
|
||||
, watch $ watchThread st dstatus transferqueue changechan
|
||||
]
|
||||
|
|
|
@ -8,9 +8,9 @@
|
|||
module Assistant.Pushes where
|
||||
|
||||
import Common.Annex
|
||||
import Utility.TSet
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.MSampleVar
|
||||
import Data.Time.Clock
|
||||
import qualified Data.Map as M
|
||||
|
||||
|
@ -19,7 +19,7 @@ type PushMap = M.Map Remote UTCTime
|
|||
type FailedPushMap = TMVar PushMap
|
||||
|
||||
{- Used to notify about successful pushes. -}
|
||||
newtype PushNotifier = PushNotifier (MSampleVar ())
|
||||
newtype PushNotifier = PushNotifier (TSet UUID)
|
||||
|
||||
{- The TMVar starts empty, and is left empty when there are no
|
||||
- failed pushes. This way we can block until there are some failed pushes.
|
||||
|
@ -50,10 +50,10 @@ changeFailedPushMap v a = atomically $
|
|||
| otherwise = putTMVar v $! m
|
||||
|
||||
newPushNotifier :: IO PushNotifier
|
||||
newPushNotifier = PushNotifier <$> newEmptySV
|
||||
newPushNotifier = PushNotifier <$> newTSet
|
||||
|
||||
notifyPush :: PushNotifier -> IO ()
|
||||
notifyPush (PushNotifier sv) = writeSV sv ()
|
||||
notifyPush :: [UUID] -> PushNotifier -> IO ()
|
||||
notifyPush us (PushNotifier s) = putTSet s us
|
||||
|
||||
waitPush :: PushNotifier -> IO ()
|
||||
waitPush (PushNotifier sv) = readSV sv
|
||||
waitPush :: PushNotifier -> IO [UUID]
|
||||
waitPush (PushNotifier s) = getTSet s
|
||||
|
|
|
@ -101,7 +101,7 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
|
|||
let ok = null failed
|
||||
if ok
|
||||
then do
|
||||
maybe noop notifyPush mpushnotifier
|
||||
maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier
|
||||
return ok
|
||||
else if shouldretry
|
||||
then retry branch g u failed
|
||||
|
@ -127,7 +127,7 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
|
|||
(succeeded, failed) <- inParallel (pushfallback g u branch) rs
|
||||
updatemap succeeded failed
|
||||
unless (null succeeded) $
|
||||
maybe noop notifyPush mpushnotifier
|
||||
maybe noop (notifyPush $ map Remote.uuid succeeded) mpushnotifier
|
||||
return $ null failed
|
||||
|
||||
push g branch remote = Command.Sync.pushBranch remote branch g
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
{- git-annex assistant push notification thread
|
||||
{- git-annex assistant push notification thread, using XMPP
|
||||
-
|
||||
- This handles both sending outgoing push notifications, and receiving
|
||||
- incoming push notifications.
|
||||
-
|
||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
||||
-
|
||||
|
@ -8,14 +11,118 @@
|
|||
module Assistant.Threads.PushNotifier where
|
||||
|
||||
import Assistant.Common
|
||||
import Assistant.ThreadedMonad
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.Pushes
|
||||
import qualified Remote
|
||||
|
||||
import Network.Protocol.XMPP
|
||||
import Network
|
||||
import Control.Concurrent
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Set as S
|
||||
import Utility.FileMode
|
||||
|
||||
thisThread :: ThreadName
|
||||
thisThread = "PushNotifier"
|
||||
|
||||
pushNotifierThread :: PushNotifier -> NamedThread
|
||||
pushNotifierThread pushnotifier = thread $ forever $ do
|
||||
waitPush pushnotifier
|
||||
-- TODO
|
||||
pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
|
||||
pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ do
|
||||
v <- runThreadState st $ getXMPPCreds
|
||||
case v of
|
||||
Nothing -> nocreds
|
||||
Just c -> case parseJID (xmppUsername c) of
|
||||
Nothing -> nocreds
|
||||
Just jid -> void $ client c jid
|
||||
where
|
||||
thread = NamedThread thisThread
|
||||
nocreds = do
|
||||
-- TODO alert
|
||||
return () -- exit thread
|
||||
|
||||
client c jid = runClient server jid (xmppUsername c) (xmppPassword c) $ do
|
||||
void $ bindJID jid
|
||||
void $ putStanza $ emptyPresence PresenceUnavailable
|
||||
s <- getSession
|
||||
_ <- liftIO $ forkIO $ void $ sendnotifications s
|
||||
receivenotifications
|
||||
where
|
||||
server = Server
|
||||
(JID Nothing (jidDomain jid) Nothing)
|
||||
(xmppHostname c)
|
||||
(PortNumber $ fromIntegral $ xmppPort c)
|
||||
|
||||
sendnotifications session = runXMPP session $ forever $ do
|
||||
us <- liftIO $ waitPush pushnotifier
|
||||
{- Toggle presence to send the notification. -}
|
||||
putStanza $ (emptyPresence PresenceAvailable)
|
||||
{ presenceID = Just $ encodePushNotification us }
|
||||
putStanza $ emptyPresence PresenceUnavailable
|
||||
|
||||
receivenotifications = forever $ do
|
||||
s <- getStanza
|
||||
case s of
|
||||
ReceivedPresence (Presence { presenceType = PresenceAvailable, presenceID = Just t }) ->
|
||||
maybe noop (liftIO . pull dstatus)
|
||||
(decodePushNotification t)
|
||||
_ -> noop
|
||||
|
||||
{- Everything we need to know to connect to an XMPP server. -}
|
||||
data XMPPCreds = XMPPCreds
|
||||
{ xmppUsername :: T.Text
|
||||
, xmppPassword :: T.Text
|
||||
, xmppHostname :: HostName
|
||||
, xmppPort :: Int
|
||||
}
|
||||
deriving (Read, Show)
|
||||
|
||||
getXMPPCreds :: Annex (Maybe XMPPCreds)
|
||||
getXMPPCreds = do
|
||||
f <- xmppCredsFile
|
||||
s <- liftIO $ catchMaybeIO $ readFile f
|
||||
return $ readish =<< s
|
||||
|
||||
setXMPPCreds :: XMPPCreds -> Annex ()
|
||||
setXMPPCreds creds = do
|
||||
f <- xmppCredsFile
|
||||
liftIO $ do
|
||||
h <- openFile f WriteMode
|
||||
modifyFileMode f $ removeModes
|
||||
[groupReadMode, otherReadMode]
|
||||
hPutStr h (show creds)
|
||||
hClose h
|
||||
|
||||
xmppCredsFile :: Annex FilePath
|
||||
xmppCredsFile = do
|
||||
dir <- fromRepo gitAnnexCredsDir
|
||||
return $ dir </> "notify-xmpp"
|
||||
|
||||
{- A push notification is encoded in the id field of an XMPP presence
|
||||
- notification, in the form: "git-annex-push:uuid[:uuid:...]
|
||||
-
|
||||
- Git repos can be pushed to that do not have a git-annex uuid; an empty
|
||||
- string is used for those.
|
||||
-}
|
||||
prefix :: T.Text
|
||||
prefix = T.pack "git-annex-push:"
|
||||
|
||||
delim :: T.Text
|
||||
delim = T.pack ":"
|
||||
|
||||
encodePushNotification :: [UUID] -> T.Text
|
||||
encodePushNotification us = T.concat
|
||||
[ prefix
|
||||
, T.intercalate delim $ map (T.pack . fromUUID) us
|
||||
]
|
||||
|
||||
decodePushNotification :: T.Text -> Maybe [UUID]
|
||||
decodePushNotification t = map (toUUID . T.unpack) . T.splitOn delim
|
||||
<$> T.stripPrefix prefix t
|
||||
|
||||
pull :: DaemonStatusHandle -> [UUID] -> IO ()
|
||||
pull _ [] = noop
|
||||
pull dstatus us = do
|
||||
rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus
|
||||
print ("TODO pull from", rs)
|
||||
where
|
||||
matching r = Remote.uuid r `S.member` s
|
||||
s = S.fromList us
|
||||
|
|
Loading…
Add table
Reference in a new issue