xmpp push control flow

It might even work, although nothing yet triggers XMPP pushes.

Also added a set of deferred push messages. Only one push can run at a
time, and unrelated push messages get deferred. The set will never grow
very large, because it only puts two types of messages in there, that
can only vary in the client doing the push.
This commit is contained in:
Joey Hess 2012-11-08 16:44:23 -04:00
parent 08916ef695
commit fb3b9412e4
6 changed files with 174 additions and 45 deletions

View file

@ -105,10 +105,10 @@
- BranchChanged (STM SampleVar)
- Changes to the git-annex branch are indicated by updating this
- SampleVar.
- NetMessager (STM TChan, SampleVar)
- Used to feed messages to the built-in XMPP client, and
- signal it when it needs to restart due to configuration or
- networking changes.
- NetMessager (STM TChan, TMVar, SampleVar)
- Used to feed messages to the built-in XMPP client, handle
- pushes, and signal it when it needs to restart due to configuration
- or networking changes.
- UrlRenderer (MVar)
- A Yesod route rendering function is stored here. This allows
- things that need to render Yesod routes to block until the webapp

View file

@ -10,8 +10,11 @@ module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
@ -26,3 +29,67 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
getPushRunning :: Assistant PushRunning
getPushRunning =
(atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager)
{- Runs an action that runs either the send or receive end of a push.
-
- While the push is running, netMessagesPush will get messages put into it
- relating to this push, while any messages relating to other pushes
- go to netMessagesDeferred. Once the push finishes, those deferred
- messages will be fed to handledeferred for processing.
-}
runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
runPush v handledeferred a = do
nm <- getAssistant netMessager
let pr = netMessagerPushRunning nm
let setup = void $ atomically $ swapTMVar pr v
let cleanup = atomically $ do
void $ swapTMVar pr NoPushRunning
emptytchan (netMessagesPush nm)
r <- E.bracket_ setup cleanup <~> a
(void . forkIO) <~> processdeferred nm
return r
where
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
processdeferred nm = do
s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty
mapM_ rundeferred (S.toList s)
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
<~> handledeferred m
{- While a push is running, matching push messages are put into
- netMessagesPush, while others go to netMessagesDeferredPush. To avoid
- bloating memory, only PushRequest and StartingPush messages are
- deferred.
-
- When no push is running, returns False.
-}
queueNetPushMessage :: NetMessage -> Assistant Bool
queueNetPushMessage m = do
nm <- getAssistant netMessager
liftIO $ atomically $ do
running <- readTMVar (netMessagerPushRunning nm)
case running of
NoPushRunning -> return False
SendPushRunning cid -> go nm cid
ReceivePushRunning cid -> go nm cid
where
go nm cid
| getClientID m == Just cid = do
writeTChan (netMessagesPush nm) m
return True
| otherwise = do
case m of
PushRequest _ -> defer nm
StartingPush _ -> defer nm
_ -> noop
return True
defer nm = do
s <- takeTMVar (netMessagesDeferredPush nm)
putTMVar (netMessagesDeferredPush nm) $ S.insert m s
waitNetPushMessage :: Assistant (NetMessage)
waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager)

View file

@ -93,15 +93,14 @@ xmppClient urlrenderer d = do
handle _ (PresenceMessage p) = void $ inAssistant $
updateBuddyList (updateBuddies p) <<~ buddyList
handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $
pull us
handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
handle selfjid (GotNetMessage (PushRequest c)) = error "TODO"
handle selfjid (GotNetMessage (StartingPush c)) = error "TODO"
handle selfjid (GotNetMessage (ReceivePackOutput c b)) = error "TODO"
handle selfjid (GotNetMessage (SendPackOutput c b)) = error "TODO"
handle selfjid (GotNetMessage (ReceivePackDone c code)) = error "TODO"
handle _ (GotNetMessage m@(PushRequest _)) = inAssistant $
unlessM (queueNetPushMessage m) $ void $ handlePush m
handle _ (GotNetMessage m@(StartingPush _)) = inAssistant $
unlessM (queueNetPushMessage m) $ void $ handlePush m
handle _ (GotNetMessage m) = void $ inAssistant $ queueNetPushMessage m
handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop

View file

@ -14,6 +14,7 @@ import Data.Text (Text)
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString)
import Data.Set as S
{- Messages that can be sent out of band by a network messager. -}
data NetMessage
@ -34,17 +35,41 @@ data NetMessage
| SendPackOutput ClientID ByteString
-- sent when git receive-pack exits, with its exit code
| ReceivePackDone ClientID ExitCode
deriving (Show)
deriving (Show, Eq, Ord)
{- Something used to identify a specific client to send the message to. -}
{- Something used to identify the client, or clients to send the message to. -}
type ClientID = Text
getClientID :: NetMessage -> Maybe ClientID
getClientID (NotifyPush _) = Nothing
getClientID QueryPresence = Nothing
getClientID (PairingNotification _ cid _) = Just cid
getClientID (PushRequest cid) = Just cid
getClientID (StartingPush cid) = Just cid
getClientID (ReceivePackOutput cid _) = Just cid
getClientID (SendPackOutput cid _) = Just cid
getClientID (ReceivePackDone cid _) = Just cid
data NetMessager = NetMessager
-- outgoing messages
{ netMessages :: TChan (NetMessage)
-- only one push can be running at a time, and this tracks it
, netMessagerPushRunning :: TMVar (PushRunning)
-- incoming messages relating to the currently running push
, netMessagesPush :: TChan (NetMessage)
-- incoming push messages that have been deferred to be processed later
, netMessagesDeferredPush :: TMVar (S.Set NetMessage)
-- write to this to restart the net messager
, netMessagerRestart :: MSampleVar ()
}
data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID
deriving (Eq)
newNetMessager :: IO NetMessager
newNetMessager = NetMessager
<$> atomically newTChan
<*> atomically (newTMVar NoPushRunning)
<*> atomically newTChan
<*> atomically (newTMVar S.empty)
<*> newEmptySV

View file

@ -20,6 +20,8 @@ import Annex.UUID
import Config
import Git
import Git.Command
import qualified Git.Branch
import qualified Annex.Branch
import Locations.UserConfig
import qualified Types.Remote as Remote
@ -31,8 +33,8 @@ import System.Process (std_in, std_out, std_err)
import Control.Concurrent
import qualified Data.ByteString as B
configKey :: Remote -> ConfigKey
configKey r = remoteConfig (Remote.repo r) "xmppaddress"
configKey :: UnqualifiedConfigKey
configKey = "xmppaddress"
finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $
@ -53,13 +55,15 @@ makeXMPPGitRemote buddyname jid u = do
liftAnnex $ do
let r = Remote.repo remote
storeUUID (remoteConfig r "uuid") u
setConfig (configKey remote) xmppaddress
setConfig (remoteConfig r configKey) xmppaddress
syncNewRemote remote
return True
where
xmppaddress = T.unpack $ formatJID $ baseJID jid
{- Pushes the named refs to the remote, over XMPP.
{- Pushes the named refs to the remote, over XMPP, communicating with a
- specific client that either requested the push, or responded to our
- StartingPush message.
-
- Strategy: Set GIT_SSH to run git-annex. By setting the remote url
- to "xmppgit:dummy", "git-annex xmppgit" will be run locally by
@ -78,11 +82,9 @@ makeXMPPGitRemote buddyname jid u = do
-
- We listen at the other end of the pipe and relay to and from XMPP.
-}
xmppPush :: Remote -> [Ref] -> Assistant Bool
xmppPush remote refs = error "TODO"
xmppPush' :: ClientID -> Remote -> [Ref] -> Assistant Bool
xmppPush' cid remote refs = do
xmppPush :: ClientID -> Remote -> [Ref] -> Assistant Bool
xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do
sendNetMessage $ StartingPush cid
program <- liftIO readProgramFile
(Fd inf, writepush) <- liftIO createPipe
@ -107,30 +109,26 @@ xmppPush' cid remote refs = do
liftIO $ hSetBuffering outh NoBuffering
t1 <- forkIO <~> toxmpp inh
t2 <- forkIO <~> fromxmpp outh
t3 <- forkIO <~> controlxmpp controlh
t2 <- forkIO <~> fromxmpp outh controlh
ok <- liftIO $ boolSystemEnv "git"
(mainparams ++ gitCommandLine params g)
(Just $ env ++ myenv)
liftIO $ mapM_ killThread [t1, t2, t3]
liftIO $ mapM_ killThread [t1, t2]
return ok
where
toxmpp inh = forever $ do
b <- liftIO $ B.hGetSome inh 1024
when (B.null b) $
liftIO $ killThread =<< myThreadId
sendNetMessage $ SendPackOutput cid b
error "TODO"
fromxmpp outh = forever $ do
-- TODO get b from xmpp
let b = undefined
liftIO $ B.hPut outh b
controlxmpp controlh = do
-- TODO wait for control message from xmpp
let exitcode = undefined :: Int
liftIO $ hPutStrLn controlh (show exitcode)
if B.null b
then liftIO $ killThread =<< myThreadId
else sendNetMessage $ SendPackOutput cid b
fromxmpp outh controlh = forever $ do
m <- waitNetPushMessage
case m of
(ReceivePackOutput _ b) -> liftIO $ B.hPut outh b
(ReceivePackDone _ exitcode) -> do
liftIO $ hPutStrLn controlh (show exitcode)
_ -> noop
relayIn :: String
relayIn = "GIT_ANNEX_XMPPGIT_IN"
@ -176,7 +174,7 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -}
xmppReceivePack :: ClientID -> Assistant Bool
xmppReceivePack cid = do
xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
feeder <- asIO1 toxmpp
reader <- asIO1 fromxmpp
sendexitcode <- asIO1 $ sendNetMessage . ReceivePackDone cid
@ -202,4 +200,36 @@ xmppReceivePack cid = do
else do
sendNetMessage $ ReceivePackOutput cid b
toxmpp outh
fromxmpp _inh = error "TODO feed xmpp to inh"
fromxmpp inh = forever $ do
m <- waitNetPushMessage
case m of
(SendPackOutput _ b) -> liftIO $ B.hPut inh b
_ -> noop
xmppRemotes :: ClientID -> Assistant [Remote]
xmppRemotes cid = case baseJID <$> parseJID cid of
Nothing -> return []
Just jid -> do
rs <- syncRemotes <$> getDaemonStatus
let want = T.unpack $ formatJID jid
liftAnnex $ filterM (matching want) rs
where
matching want r = do
v <- getRemoteConfig (Remote.repo r) configKey ""
return $ v == want
handleDeferred :: NetMessage -> Assistant ()
handleDeferred = void . handlePush
handlePush :: NetMessage -> Assistant Bool
handlePush (PushRequest cid) = do
rs <- xmppRemotes cid
current <- liftAnnex $ inRepo Git.Branch.current
let refs = catMaybes [current, Just Annex.Branch.fullname]
any id <$> (forM rs $ \r -> xmppPush cid r refs)
handlePush (StartingPush cid) = do
rs <- xmppRemotes cid
if null rs
then return False
else xmppReceivePack cid
handlePush _ = return False

View file

@ -56,30 +56,38 @@ For pairing, a chat message is sent, containing:
### git push over XMPP
To request that a peer push to us, a chat message can be sent:
To request that a remote push to us, a chat message can be sent.
<git-annex xmlns='git-annex' pushrequest="uuid" />
The push request is typically sent directed at the account associated
with the remote, not to a specific client. So it can result in multiple
responses.
When a peer is ready to send a git push, it sends:
<git-annex xmlns='git-annex' startingpush="uuid" />
If that's a response to a pushrequest, it'll be directed at only the client
that requested the push. If a push request is being initiated, it'll be sent
to the account assicated with the remote.
The receiver runs `git receive-pack`, and sends back its output in
one or more chat messages:
one or more chat messages, directed to a specific client:
<git-annex xmlns='git-annex' rp="">
007b27ca394d26a05d9b6beefa1b07da456caa2157d7 refs/heads/git-annex report-status delete-refs side-band-64k quiet ofs-delta
</git-annex>
The sender replies with the data from `git push` (which does not need
to actually be started until this point):
The sender replies with the data from `git push`, in
one or more chat messages, directed to the receiver:
<git-annex xmlns='git-annex' sp="">
data
</git-annex>
When `git receive-pack` edits, the receiver indicates its exit
status:
status with a chat message, directed at the sender:
<git-annex xmlns='git-annex' rpdone="0" />