diff --git a/Assistant.hs b/Assistant.hs index 55641b90e4..5b3dd9cb9f 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -105,10 +105,10 @@ - BranchChanged (STM SampleVar) - Changes to the git-annex branch are indicated by updating this - SampleVar. - - NetMessager (STM TChan, SampleVar) - - Used to feed messages to the built-in XMPP client, and - - signal it when it needs to restart due to configuration or - - networking changes. + - NetMessager (STM TChan, TMVar, SampleVar) + - Used to feed messages to the built-in XMPP client, handle + - pushes, and signal it when it needs to restart due to configuration + - or networking changes. - UrlRenderer (MVar) - A Yesod route rendering function is stored here. This allows - things that need to render Yesod routes to block until the webapp diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index 8895cb7dbe..8fac55c8a5 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -10,8 +10,11 @@ module Assistant.NetMessager where import Assistant.Common import Assistant.Types.NetMessager +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.MSampleVar +import Control.Exception as E +import qualified Data.Set as S sendNetMessage :: NetMessage -> Assistant () sendNetMessage m = @@ -26,3 +29,67 @@ notifyNetMessagerRestart = waitNetMessagerRestart :: Assistant () waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) + +getPushRunning :: Assistant PushRunning +getPushRunning = + (atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager) + +{- Runs an action that runs either the send or receive end of a push. + - + - While the push is running, netMessagesPush will get messages put into it + - relating to this push, while any messages relating to other pushes + - go to netMessagesDeferred. Once the push finishes, those deferred + - messages will be fed to handledeferred for processing. + -} +runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a +runPush v handledeferred a = do + nm <- getAssistant netMessager + let pr = netMessagerPushRunning nm + let setup = void $ atomically $ swapTMVar pr v + let cleanup = atomically $ do + void $ swapTMVar pr NoPushRunning + emptytchan (netMessagesPush nm) + r <- E.bracket_ setup cleanup <~> a + (void . forkIO) <~> processdeferred nm + return r + where + emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c + processdeferred nm = do + s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty + mapM_ rundeferred (S.toList s) + rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) + <~> handledeferred m + +{- While a push is running, matching push messages are put into + - netMessagesPush, while others go to netMessagesDeferredPush. To avoid + - bloating memory, only PushRequest and StartingPush messages are + - deferred. + - + - When no push is running, returns False. + -} +queueNetPushMessage :: NetMessage -> Assistant Bool +queueNetPushMessage m = do + nm <- getAssistant netMessager + liftIO $ atomically $ do + running <- readTMVar (netMessagerPushRunning nm) + case running of + NoPushRunning -> return False + SendPushRunning cid -> go nm cid + ReceivePushRunning cid -> go nm cid + where + go nm cid + | getClientID m == Just cid = do + writeTChan (netMessagesPush nm) m + return True + | otherwise = do + case m of + PushRequest _ -> defer nm + StartingPush _ -> defer nm + _ -> noop + return True + defer nm = do + s <- takeTMVar (netMessagesDeferredPush nm) + putTMVar (netMessagesDeferredPush nm) $ S.insert m s + +waitNetPushMessage :: Assistant (NetMessage) +waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 1117c3c140..efdecb5870 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -93,15 +93,14 @@ xmppClient urlrenderer d = do handle _ (PresenceMessage p) = void $ inAssistant $ updateBuddyList (updateBuddies p) <<~ buddyList handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature - handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ - pull us + handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us handle selfjid (GotNetMessage (PairingNotification stage c u)) = maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) - handle selfjid (GotNetMessage (PushRequest c)) = error "TODO" - handle selfjid (GotNetMessage (StartingPush c)) = error "TODO" - handle selfjid (GotNetMessage (ReceivePackOutput c b)) = error "TODO" - handle selfjid (GotNetMessage (SendPackOutput c b)) = error "TODO" - handle selfjid (GotNetMessage (ReceivePackDone c code)) = error "TODO" + handle _ (GotNetMessage m@(PushRequest _)) = inAssistant $ + unlessM (queueNetPushMessage m) $ void $ handlePush m + handle _ (GotNetMessage m@(StartingPush _)) = inAssistant $ + unlessM (queueNetPushMessage m) $ void $ handlePush m + handle _ (GotNetMessage m) = void $ inAssistant $ queueNetPushMessage m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index 77f2759b34..6974cf57df 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -14,6 +14,7 @@ import Data.Text (Text) import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) +import Data.Set as S {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -34,17 +35,41 @@ data NetMessage | SendPackOutput ClientID ByteString -- sent when git receive-pack exits, with its exit code | ReceivePackDone ClientID ExitCode - deriving (Show) + deriving (Show, Eq, Ord) -{- Something used to identify a specific client to send the message to. -} +{- Something used to identify the client, or clients to send the message to. -} type ClientID = Text +getClientID :: NetMessage -> Maybe ClientID +getClientID (NotifyPush _) = Nothing +getClientID QueryPresence = Nothing +getClientID (PairingNotification _ cid _) = Just cid +getClientID (PushRequest cid) = Just cid +getClientID (StartingPush cid) = Just cid +getClientID (ReceivePackOutput cid _) = Just cid +getClientID (SendPackOutput cid _) = Just cid +getClientID (ReceivePackDone cid _) = Just cid + data NetMessager = NetMessager + -- outgoing messages { netMessages :: TChan (NetMessage) + -- only one push can be running at a time, and this tracks it + , netMessagerPushRunning :: TMVar (PushRunning) + -- incoming messages relating to the currently running push + , netMessagesPush :: TChan (NetMessage) + -- incoming push messages that have been deferred to be processed later + , netMessagesDeferredPush :: TMVar (S.Set NetMessage) + -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () } +data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID + deriving (Eq) + newNetMessager :: IO NetMessager newNetMessager = NetMessager <$> atomically newTChan + <*> atomically (newTMVar NoPushRunning) + <*> atomically newTChan + <*> atomically (newTMVar S.empty) <*> newEmptySV diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index 7c4509c51e..344f943275 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -20,6 +20,8 @@ import Annex.UUID import Config import Git import Git.Command +import qualified Git.Branch +import qualified Annex.Branch import Locations.UserConfig import qualified Types.Remote as Remote @@ -31,8 +33,8 @@ import System.Process (std_in, std_out, std_err) import Control.Concurrent import qualified Data.ByteString as B -configKey :: Remote -> ConfigKey -configKey r = remoteConfig (Remote.repo r) "xmppaddress" +configKey :: UnqualifiedConfigKey +configKey = "xmppaddress" finishXMPPPairing :: JID -> UUID -> Assistant () finishXMPPPairing jid u = void $ alertWhile alert $ @@ -53,13 +55,15 @@ makeXMPPGitRemote buddyname jid u = do liftAnnex $ do let r = Remote.repo remote storeUUID (remoteConfig r "uuid") u - setConfig (configKey remote) xmppaddress + setConfig (remoteConfig r configKey) xmppaddress syncNewRemote remote return True where xmppaddress = T.unpack $ formatJID $ baseJID jid -{- Pushes the named refs to the remote, over XMPP. +{- Pushes the named refs to the remote, over XMPP, communicating with a + - specific client that either requested the push, or responded to our + - StartingPush message. - - Strategy: Set GIT_SSH to run git-annex. By setting the remote url - to "xmppgit:dummy", "git-annex xmppgit" will be run locally by @@ -78,11 +82,9 @@ makeXMPPGitRemote buddyname jid u = do - - We listen at the other end of the pipe and relay to and from XMPP. -} -xmppPush :: Remote -> [Ref] -> Assistant Bool -xmppPush remote refs = error "TODO" - -xmppPush' :: ClientID -> Remote -> [Ref] -> Assistant Bool -xmppPush' cid remote refs = do +xmppPush :: ClientID -> Remote -> [Ref] -> Assistant Bool +xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do + sendNetMessage $ StartingPush cid program <- liftIO readProgramFile (Fd inf, writepush) <- liftIO createPipe @@ -107,30 +109,26 @@ xmppPush' cid remote refs = do liftIO $ hSetBuffering outh NoBuffering t1 <- forkIO <~> toxmpp inh - t2 <- forkIO <~> fromxmpp outh - t3 <- forkIO <~> controlxmpp controlh + t2 <- forkIO <~> fromxmpp outh controlh ok <- liftIO $ boolSystemEnv "git" (mainparams ++ gitCommandLine params g) (Just $ env ++ myenv) - liftIO $ mapM_ killThread [t1, t2, t3] + liftIO $ mapM_ killThread [t1, t2] return ok where toxmpp inh = forever $ do b <- liftIO $ B.hGetSome inh 1024 - when (B.null b) $ - liftIO $ killThread =<< myThreadId - sendNetMessage $ SendPackOutput cid b - error "TODO" - fromxmpp outh = forever $ do - -- TODO get b from xmpp - let b = undefined - liftIO $ B.hPut outh b - controlxmpp controlh = do - -- TODO wait for control message from xmpp - let exitcode = undefined :: Int - liftIO $ hPutStrLn controlh (show exitcode) - + if B.null b + then liftIO $ killThread =<< myThreadId + else sendNetMessage $ SendPackOutput cid b + fromxmpp outh controlh = forever $ do + m <- waitNetPushMessage + case m of + (ReceivePackOutput _ b) -> liftIO $ B.hPut outh b + (ReceivePackDone _ exitcode) -> do + liftIO $ hPutStrLn controlh (show exitcode) + _ -> noop relayIn :: String relayIn = "GIT_ANNEX_XMPPGIT_IN" @@ -176,7 +174,7 @@ xmppGitRelay = do {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating - its exit status to XMPP. -} xmppReceivePack :: ClientID -> Assistant Bool -xmppReceivePack cid = do +xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do feeder <- asIO1 toxmpp reader <- asIO1 fromxmpp sendexitcode <- asIO1 $ sendNetMessage . ReceivePackDone cid @@ -202,4 +200,36 @@ xmppReceivePack cid = do else do sendNetMessage $ ReceivePackOutput cid b toxmpp outh - fromxmpp _inh = error "TODO feed xmpp to inh" + fromxmpp inh = forever $ do + m <- waitNetPushMessage + case m of + (SendPackOutput _ b) -> liftIO $ B.hPut inh b + _ -> noop + +xmppRemotes :: ClientID -> Assistant [Remote] +xmppRemotes cid = case baseJID <$> parseJID cid of + Nothing -> return [] + Just jid -> do + rs <- syncRemotes <$> getDaemonStatus + let want = T.unpack $ formatJID jid + liftAnnex $ filterM (matching want) rs + where + matching want r = do + v <- getRemoteConfig (Remote.repo r) configKey "" + return $ v == want + +handleDeferred :: NetMessage -> Assistant () +handleDeferred = void . handlePush + +handlePush :: NetMessage -> Assistant Bool +handlePush (PushRequest cid) = do + rs <- xmppRemotes cid + current <- liftAnnex $ inRepo Git.Branch.current + let refs = catMaybes [current, Just Annex.Branch.fullname] + any id <$> (forM rs $ \r -> xmppPush cid r refs) +handlePush (StartingPush cid) = do + rs <- xmppRemotes cid + if null rs + then return False + else xmppReceivePack cid +handlePush _ = return False diff --git a/doc/design/assistant/xmpp.mdwn b/doc/design/assistant/xmpp.mdwn index a7370382e1..b39d155e14 100644 --- a/doc/design/assistant/xmpp.mdwn +++ b/doc/design/assistant/xmpp.mdwn @@ -56,30 +56,38 @@ For pairing, a chat message is sent, containing: ### git push over XMPP -To request that a peer push to us, a chat message can be sent: +To request that a remote push to us, a chat message can be sent. +The push request is typically sent directed at the account associated +with the remote, not to a specific client. So it can result in multiple +responses. + When a peer is ready to send a git push, it sends: +If that's a response to a pushrequest, it'll be directed at only the client +that requested the push. If a push request is being initiated, it'll be sent +to the account assicated with the remote. + The receiver runs `git receive-pack`, and sends back its output in -one or more chat messages: +one or more chat messages, directed to a specific client: 007b27ca394d26a05d9b6beefa1b07da456caa2157d7 refs/heads/git-annex report-status delete-refs side-band-64k quiet ofs-delta -The sender replies with the data from `git push` (which does not need -to actually be started until this point): +The sender replies with the data from `git push`, in +one or more chat messages, directed to the receiver: data When `git receive-pack` edits, the receiver indicates its exit -status: +status with a chat message, directed at the sender: