From fb3b9412e46688b4a22ac02099862f1a16eb8e8b Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 8 Nov 2012 16:44:23 -0400 Subject: [PATCH] xmpp push control flow It might even work, although nothing yet triggers XMPP pushes. Also added a set of deferred push messages. Only one push can run at a time, and unrelated push messages get deferred. The set will never grow very large, because it only puts two types of messages in there, that can only vary in the client doing the push. --- Assistant.hs | 8 ++-- Assistant/NetMessager.hs | 67 ++++++++++++++++++++++++++ Assistant/Threads/XMPPClient.hs | 13 +++-- Assistant/Types/NetMessager.hs | 29 +++++++++++- Assistant/XMPP/Git.hs | 84 ++++++++++++++++++++++----------- doc/design/assistant/xmpp.mdwn | 18 +++++-- 6 files changed, 174 insertions(+), 45 deletions(-) diff --git a/Assistant.hs b/Assistant.hs index 55641b90e4..5b3dd9cb9f 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -105,10 +105,10 @@ - BranchChanged (STM SampleVar) - Changes to the git-annex branch are indicated by updating this - SampleVar. - - NetMessager (STM TChan, SampleVar) - - Used to feed messages to the built-in XMPP client, and - - signal it when it needs to restart due to configuration or - - networking changes. + - NetMessager (STM TChan, TMVar, SampleVar) + - Used to feed messages to the built-in XMPP client, handle + - pushes, and signal it when it needs to restart due to configuration + - or networking changes. - UrlRenderer (MVar) - A Yesod route rendering function is stored here. This allows - things that need to render Yesod routes to block until the webapp diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 8895cb7dbe..8fac55c8a5 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -10,8 +10,11 @@ module Assistant.NetMessager where import Assistant.Common import Assistant.Types.NetMessager +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.MSampleVar +import Control.Exception as E +import qualified Data.Set as S sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -26,3 +29,67 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) + +getPushRunning :: Assistant PushRunning +getPushRunning = + (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager) + +{- Runs an action that runs either the send or receive end of a push. + - + - While the push is running, netMessagesPush will get messages put into it + - relating to this push, while any messages relating to other pushes + - go to netMessagesDeferred. Once the push finishes, those deferred + - messages will be fed to handledeferred for processing. + -} +runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a +runPush v handledeferred a = do + nm <- getAssistant netMessager + let pr = netMessagerPushRunning nm + let setup = void $ atomically $ swapTMVar pr v + let cleanup = atomically $ do + void $ swapTMVar pr NoPushRunning + emptytchan (netMessagesPush nm) + r <- E.bracket_ setup cleanup <~> a + (void . forkIO) <~> processdeferred nm + return r + where + emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c + processdeferred nm = do + s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty + mapM_ rundeferred (S.toList s) + rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) + <~> handledeferred m + +{- While a push is running, matching push messages are put into + - netMessagesPush, while others go to netMessagesDeferredPush. To avoid + - bloating memory, only PushRequest and StartingPush messages are + - deferred. + - + - When no push is running, returns False. + -} +queueNetPushMessage :: NetMessage -> Assistant Bool +queueNetPushMessage m = do + nm <- getAssistant netMessager + liftIO $ atomically $ do + running <- readTMVar (netMessagerPushRunning nm) + case running of + NoPushRunning -> return False + SendPushRunning cid -> go nm cid + ReceivePushRunning cid -> go nm cid + where + go nm cid + | getClientID m == Just cid = do + writeTChan (netMessagesPush nm) m + return True + | otherwise = do + case m of + PushRequest _ -> defer nm + StartingPush _ -> defer nm + _ -> noop + return True + defer nm = do + s <- takeTMVar (netMessagesDeferredPush nm) + putTMVar (netMessagesDeferredPush nm) $ S.insert m s + +waitNetPushMessage :: Assistant (NetMessage) +waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 1117c3c140..efdecb5870 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -93,15 +93,14 @@ xmppClient urlrenderer d = do handle _ (PresenceMessage p) = void $ inAssistant $ updateBuddyList (updateBuddies p) <<~ buddyList handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature - handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ - pull us + handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us handle selfjid (GotNetMessage (PairingNotification stage c u)) = maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) - handle selfjid (GotNetMessage (PushRequest c)) = error "TODO" - handle selfjid (GotNetMessage (StartingPush c)) = error "TODO" - handle selfjid (GotNetMessage (ReceivePackOutput c b)) = error "TODO" - handle selfjid (GotNetMessage (SendPackOutput c b)) = error "TODO" - handle selfjid (GotNetMessage (ReceivePackDone c code)) = error "TODO" + handle _ (GotNetMessage m@(PushRequest _)) = inAssistant $ + unlessM (queueNetPushMessage m) $ void $ handlePush m + handle _ (GotNetMessage m@(StartingPush _)) = inAssistant $ + unlessM (queueNetPushMessage m) $ void $ handlePush m + handle _ (GotNetMessage m) = void $ inAssistant $ queueNetPushMessage m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index 77f2759b34..6974cf57df 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -14,6 +14,7 @@ import Data.Text (Text) import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) +import Data.Set as S {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -34,17 +35,41 @@ data NetMessage | SendPackOutput ClientID ByteString -- sent when git receive-pack exits, with its exit code | ReceivePackDone ClientID ExitCode - deriving (Show) + deriving (Show, Eq, Ord) -{- Something used to identify a specific client to send the message to. -} +{- Something used to identify the client, or clients to send the message to. -} type ClientID = Text +getClientID :: NetMessage -> Maybe ClientID +getClientID (NotifyPush _) = Nothing +getClientID QueryPresence = Nothing +getClientID (PairingNotification _ cid _) = Just cid +getClientID (PushRequest cid) = Just cid +getClientID (StartingPush cid) = Just cid +getClientID (ReceivePackOutput cid _) = Just cid +getClientID (SendPackOutput cid _) = Just cid +getClientID (ReceivePackDone cid _) = Just cid + data NetMessager = NetMessager + -- outgoing messages { netMessages :: TChan (NetMessage) + -- only one push can be running at a time, and this tracks it + , netMessagerPushRunning :: TMVar (PushRunning) + -- incoming messages relating to the currently running push + , netMessagesPush :: TChan (NetMessage) + -- incoming push messages that have been deferred to be processed later + , netMessagesDeferredPush :: TMVar (S.Set NetMessage) + -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () } +data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID + deriving (Eq) + newNetMessager :: IO NetMessager newNetMessager = NetMessager <$> atomically newTChan + <*> atomically (newTMVar NoPushRunning) + <*> atomically newTChan + <*> atomically (newTMVar S.empty) <*> newEmptySV diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index 7c4509c51e..344f943275 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -20,6 +20,8 @@ import Annex.UUID import Config import Git import Git.Command +import qualified Git.Branch +import qualified Annex.Branch import Locations.UserConfig import qualified Types.Remote as Remote @@ -31,8 +33,8 @@ import System.Process (std_in, std_out, std_err) import Control.Concurrent import qualified Data.ByteString as B -configKey :: Remote -> ConfigKey -configKey r = remoteConfig (Remote.repo r) "xmppaddress" +configKey :: UnqualifiedConfigKey +configKey = "xmppaddress" finishXMPPPairing :: JID -> UUID -> Assistant () finishXMPPPairing jid u = void $ alertWhile alert $ @@ -53,13 +55,15 @@ makeXMPPGitRemote buddyname jid u = do liftAnnex $ do let r = Remote.repo remote storeUUID (remoteConfig r "uuid") u - setConfig (configKey remote) xmppaddress + setConfig (remoteConfig r configKey) xmppaddress syncNewRemote remote return True where xmppaddress = T.unpack $ formatJID $ baseJID jid -{- Pushes the named refs to the remote, over XMPP. +{- Pushes the named refs to the remote, over XMPP, communicating with a + - specific client that either requested the push, or responded to our + - StartingPush message. - - Strategy: Set GIT_SSH to run git-annex. By setting the remote url - to "xmppgit:dummy", "git-annex xmppgit" will be run locally by @@ -78,11 +82,9 @@ makeXMPPGitRemote buddyname jid u = do - - We listen at the other end of the pipe and relay to and from XMPP. -} -xmppPush :: Remote -> [Ref] -> Assistant Bool -xmppPush remote refs = error "TODO" - -xmppPush' :: ClientID -> Remote -> [Ref] -> Assistant Bool -xmppPush' cid remote refs = do +xmppPush :: ClientID -> Remote -> [Ref] -> Assistant Bool +xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do + sendNetMessage $ StartingPush cid program <- liftIO readProgramFile (Fd inf, writepush) <- liftIO createPipe @@ -107,30 +109,26 @@ xmppPush' cid remote refs = do liftIO $ hSetBuffering outh NoBuffering t1 <- forkIO <~> toxmpp inh - t2 <- forkIO <~> fromxmpp outh - t3 <- forkIO <~> controlxmpp controlh + t2 <- forkIO <~> fromxmpp outh controlh ok <- liftIO $ boolSystemEnv "git" (mainparams ++ gitCommandLine params g) (Just $ env ++ myenv) - liftIO $ mapM_ killThread [t1, t2, t3] + liftIO $ mapM_ killThread [t1, t2] return ok where toxmpp inh = forever $ do b <- liftIO $ B.hGetSome inh 1024 - when (B.null b) $ - liftIO $ killThread =<< myThreadId - sendNetMessage $ SendPackOutput cid b - error "TODO" - fromxmpp outh = forever $ do - -- TODO get b from xmpp - let b = undefined - liftIO $ B.hPut outh b - controlxmpp controlh = do - -- TODO wait for control message from xmpp - let exitcode = undefined :: Int - liftIO $ hPutStrLn controlh (show exitcode) - + if B.null b + then liftIO $ killThread =<< myThreadId + else sendNetMessage $ SendPackOutput cid b + fromxmpp outh controlh = forever $ do + m <- waitNetPushMessage + case m of + (ReceivePackOutput _ b) -> liftIO $ B.hPut outh b + (ReceivePackDone _ exitcode) -> do + liftIO $ hPutStrLn controlh (show exitcode) + _ -> noop relayIn :: String relayIn = "GIT_ANNEX_XMPPGIT_IN" @@ -176,7 +174,7 @@ xmppGitRelay = do {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating - its exit status to XMPP. -} xmppReceivePack :: ClientID -> Assistant Bool -xmppReceivePack cid = do +xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do feeder <- asIO1 toxmpp reader <- asIO1 fromxmpp sendexitcode <- asIO1 $ sendNetMessage . ReceivePackDone cid @@ -202,4 +200,36 @@ xmppReceivePack cid = do else do sendNetMessage $ ReceivePackOutput cid b toxmpp outh - fromxmpp _inh = error "TODO feed xmpp to inh" + fromxmpp inh = forever $ do + m <- waitNetPushMessage + case m of + (SendPackOutput _ b) -> liftIO $ B.hPut inh b + _ -> noop + +xmppRemotes :: ClientID -> Assistant [Remote] +xmppRemotes cid = case baseJID <$> parseJID cid of + Nothing -> return [] + Just jid -> do + rs <- syncRemotes <$> getDaemonStatus + let want = T.unpack $ formatJID jid + liftAnnex $ filterM (matching want) rs + where + matching want r = do + v <- getRemoteConfig (Remote.repo r) configKey "" + return $ v == want + +handleDeferred :: NetMessage -> Assistant () +handleDeferred = void . handlePush + +handlePush :: NetMessage -> Assistant Bool +handlePush (PushRequest cid) = do + rs <- xmppRemotes cid + current <- liftAnnex $ inRepo Git.Branch.current + let refs = catMaybes [current, Just Annex.Branch.fullname] + any id <$> (forM rs $ \r -> xmppPush cid r refs) +handlePush (StartingPush cid) = do + rs <- xmppRemotes cid + if null rs + then return False + else xmppReceivePack cid +handlePush _ = return False diff --git a/doc/design/assistant/xmpp.mdwn b/doc/design/assistant/xmpp.mdwn index a7370382e1..b39d155e14 100644 --- a/doc/design/assistant/xmpp.mdwn +++ b/doc/design/assistant/xmpp.mdwn @@ -56,30 +56,38 @@ For pairing, a chat message is sent, containing: ### git push over XMPP -To request that a peer push to us, a chat message can be sent: +To request that a remote push to us, a chat message can be sent. +The push request is typically sent directed at the account associated +with the remote, not to a specific client. So it can result in multiple +responses. + When a peer is ready to send a git push, it sends: +If that's a response to a pushrequest, it'll be directed at only the client +that requested the push. If a push request is being initiated, it'll be sent +to the account assicated with the remote. + The receiver runs `git receive-pack`, and sends back its output in -one or more chat messages: +one or more chat messages, directed to a specific client: 007b27ca394d26a05d9b6beefa1b07da456caa2157d7 refs/heads/git-annex report-status delete-refs side-band-64k quiet ofs-delta -The sender replies with the data from `git push` (which does not need -to actually be started until this point): +The sender replies with the data from `git push`, in +one or more chat messages, directed to the receiver: data When `git receive-pack` edits, the receiver indicates its exit -status: +status with a chat message, directed at the sender: