hooked up XMPP git push send/receive (but not yet control flow)
This commit is contained in:
parent
17fd1bd919
commit
0238e4ba07
5 changed files with 95 additions and 49 deletions
|
@ -52,7 +52,7 @@ xmppClient urlrenderer d = do
|
|||
Just c -> retry (runclient c) =<< getCurrentTime
|
||||
where
|
||||
liftAssistant = runAssistant d
|
||||
xAssistant = liftIO . liftAssistant
|
||||
inAssistant = liftIO . liftAssistant
|
||||
|
||||
{- When the client exits, it's restarted;
|
||||
- if it keeps failing, back off to wait 5 minutes before
|
||||
|
@ -73,30 +73,35 @@ xmppClient urlrenderer d = do
|
|||
selfjid <- bindJID jid
|
||||
putStanza gitAnnexSignature
|
||||
|
||||
xAssistant $ debug ["connected", show selfjid]
|
||||
inAssistant $ debug ["connected", show selfjid]
|
||||
{- The buddy list starts empty each time
|
||||
- the client connects, so that stale info
|
||||
- is not retained. -}
|
||||
void $ xAssistant $
|
||||
void $ inAssistant $
|
||||
updateBuddyList (const noBuddies) <<~ buddyList
|
||||
|
||||
xmppThread $ receivenotifications selfjid
|
||||
forever $ do
|
||||
a <- xAssistant $ relayNetMessage selfjid
|
||||
a <- inAssistant $ relayNetMessage selfjid
|
||||
a
|
||||
|
||||
receivenotifications selfjid = forever $ do
|
||||
l <- decodeStanza selfjid <$> getStanza
|
||||
xAssistant $ debug ["received:", show l]
|
||||
inAssistant $ debug ["received:", show l]
|
||||
mapM_ (handle selfjid) l
|
||||
|
||||
handle _ (PresenceMessage p) = void $ xAssistant $
|
||||
handle _ (PresenceMessage p) = void $ inAssistant $
|
||||
updateBuddyList (updateBuddies p) <<~ buddyList
|
||||
handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
|
||||
handle _ (GotNetMessage (NotifyPush us)) = void $ xAssistant $
|
||||
handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $
|
||||
pull us
|
||||
handle selfjid (GotNetMessage (PairingNotification stage t u)) =
|
||||
maybe noop (xAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID t)
|
||||
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
|
||||
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
|
||||
handle selfjid (GotNetMessage (PushRequest c)) = error "TODO"
|
||||
handle selfjid (GotNetMessage (StartingPush c)) = error "TODO"
|
||||
handle selfjid (GotNetMessage (ReceivePackOutput c b)) = error "TODO"
|
||||
handle selfjid (GotNetMessage (SendPackOutput c b)) = error "TODO"
|
||||
handle selfjid (GotNetMessage (ReceivePackDone c code)) = error "TODO"
|
||||
handle _ (Ignorable _) = noop
|
||||
handle _ (Unknown _) = noop
|
||||
handle _ (ProtocolError _) = noop
|
||||
|
@ -117,7 +122,7 @@ decodeStanza selfjid s@(ReceivedPresence p)
|
|||
| presenceFrom p == Just selfjid = [Ignorable s]
|
||||
| otherwise = maybe [PresenceMessage p] decode (getGitAnnexAttrValue p)
|
||||
where
|
||||
decode (attr, v)
|
||||
decode (attr, v, _tag)
|
||||
| attr == pushAttr = impliedp $ GotNetMessage $ NotifyPush $
|
||||
decodePushNotification v
|
||||
| attr == queryAttr = impliedp $ GotNetMessage QueryPresence
|
||||
|
@ -131,10 +136,15 @@ decodeStanza selfjid s@(ReceivedMessage m)
|
|||
| messageType m == MessageError = [ProtocolError s]
|
||||
| otherwise = maybe [Unknown s] decode (getGitAnnexAttrValue m)
|
||||
where
|
||||
decode (attr, v)
|
||||
| attr == pairAttr =
|
||||
[maybe (Unknown s) GotNetMessage (decodePairingNotification v m)]
|
||||
decode (attr, v, tag)
|
||||
| attr == pairAttr = use $ decodePairingNotification v
|
||||
| attr == pushRequestAttr = use decodePushRequest
|
||||
| attr == startingPushAttr = use decodeStartingPush
|
||||
| attr == receivePackAttr = use $ decodeReceivePackOutput tag
|
||||
| attr == sendPackAttr = use $ decodeSendPackOutput tag
|
||||
| attr == receivePackDoneAttr = use $ decodeReceivePackDone v
|
||||
| otherwise = [Unknown s]
|
||||
use v = [maybe (Unknown s) GotNetMessage (v m)]
|
||||
decodeStanza _ s = [Unknown s]
|
||||
|
||||
{- Waits for a NetMessager message to be sent, and relays it to XMPP. -}
|
||||
|
@ -142,15 +152,23 @@ relayNetMessage :: JID -> Assistant (XMPP ())
|
|||
relayNetMessage selfjid = convert =<< waitNetMessage
|
||||
where
|
||||
convert (NotifyPush us) = return $ putStanza $ pushNotification us
|
||||
convert QueryPresence = return $ putStanza $ presenceQuery
|
||||
convert (PairingNotification stage t u) = case parseJID t of
|
||||
Nothing -> return $ noop
|
||||
Just tojid
|
||||
| tojid == selfjid -> return $ noop
|
||||
| otherwise -> do
|
||||
convert QueryPresence = return $ putStanza presenceQuery
|
||||
convert (PairingNotification stage c u) = withclient c $ \tojid -> do
|
||||
changeBuddyPairing tojid True
|
||||
return $ putStanza $
|
||||
pairingNotification stage u tojid selfjid
|
||||
return $ putStanza $ pairingNotification stage u tojid selfjid
|
||||
convert (PushRequest c) = sendclient c pushRequest
|
||||
convert (StartingPush c) = sendclient c startingPush
|
||||
convert (ReceivePackOutput c b) = sendclient c $ receivePackOutput b
|
||||
convert (SendPackOutput c b) = sendclient c $ sendPackOutput b
|
||||
convert (ReceivePackDone c code) = sendclient c $ receivePackDone code
|
||||
|
||||
sendclient c construct = withclient c $ \tojid ->
|
||||
return $ putStanza $ construct tojid selfjid
|
||||
withclient c a = case parseJID c of
|
||||
Nothing -> return noop
|
||||
Just tojid
|
||||
| tojid == selfjid -> return noop
|
||||
| otherwise -> a tojid
|
||||
|
||||
{- Runs a XMPP action in a separate thread, using a session to allow it
|
||||
- to access the same XMPP client. -}
|
||||
|
|
|
@ -22,20 +22,23 @@ data NetMessage
|
|||
-- requests other clients to inform us of their presence
|
||||
| QueryPresence
|
||||
-- notification about a stage in the pairing process,
|
||||
-- involving a client identified by the Text, and a UUID.
|
||||
| PairingNotification PairStage Text UUID
|
||||
-- involving a client, and a UUID.
|
||||
| PairingNotification PairStage ClientID UUID
|
||||
-- request that a git push be sent over the out of band network
|
||||
| PushRequest
|
||||
-- indicates that a PushRequest has been seen and a push is starting
|
||||
| StartingPush
|
||||
| PushRequest ClientID
|
||||
-- indicates that a push is starting
|
||||
| StartingPush ClientID
|
||||
-- a chunk of output of git receive-pack
|
||||
| ReceivePackOutput ByteString
|
||||
| ReceivePackOutput ClientID ByteString
|
||||
-- a chuck of output of git send-pack
|
||||
| SendPackOutput ByteString
|
||||
| SendPackOutput ClientID ByteString
|
||||
-- sent when git receive-pack exits, with its exit code
|
||||
| ReceivePackDone ExitCode
|
||||
| ReceivePackDone ClientID ExitCode
|
||||
deriving (Show)
|
||||
|
||||
{- Something used to identify a specific client to send the message to. -}
|
||||
type ClientID = Text
|
||||
|
||||
data NetMessagerControl = NetMessagerControl
|
||||
{ netMessages :: TChan (NetMessage)
|
||||
, netMessagerRestart :: MSampleVar ()
|
||||
|
|
|
@ -55,14 +55,16 @@ instance GitAnnexTaggable Presence where
|
|||
insertGitAnnexTag p elt = p { presencePayloads = extendedAway : elt : presencePayloads p }
|
||||
extractGitAnnexTag = headMaybe . filter isGitAnnexTag . presencePayloads
|
||||
|
||||
{- Gets the attr and its value value from a git-annex tag.
|
||||
{- Gets the attr and its value value from a git-annex tag, as well as the
|
||||
- tag.
|
||||
-
|
||||
- Each git-annex tag has a single attribute. -}
|
||||
getGitAnnexAttrValue :: GitAnnexTaggable a => a -> Maybe (Name, Text)
|
||||
getGitAnnexAttrValue :: GitAnnexTaggable a => a -> Maybe (Name, Text, Element)
|
||||
getGitAnnexAttrValue a = case extractGitAnnexTag a of
|
||||
Just (tag@(Element _ [(attr, _)] _)) -> (,)
|
||||
Just (tag@(Element _ [(attr, _)] _)) -> (,,)
|
||||
<$> pure attr
|
||||
<*> attributeText attr tag
|
||||
<*> pure tag
|
||||
_ -> Nothing
|
||||
|
||||
{- A presence with a git-annex tag in it. -}
|
||||
|
@ -120,17 +122,20 @@ encodePairingNotification pairstage u = T.unwords $ map T.pack
|
|||
]
|
||||
|
||||
decodePairingNotification :: Text -> Message -> Maybe NetMessage
|
||||
decodePairingNotification t msg = parse $ words $ T.unpack t
|
||||
decodePairingNotification t m = parse $ words $ T.unpack t
|
||||
where
|
||||
parse [stage, u] = PairingNotification
|
||||
<$> readish stage
|
||||
<*> (formatJID <$> messageFrom msg)
|
||||
<*> (formatJID <$> messageFrom m)
|
||||
<*> pure (toUUID u)
|
||||
parse _ = Nothing
|
||||
|
||||
pushRequest :: JID -> JID -> Message
|
||||
pushRequest = gitAnnexMessage $ gitAnnexTag pushRequestAttr T.empty
|
||||
|
||||
decodePushRequest :: Message -> Maybe NetMessage
|
||||
decodePushRequest m = PushRequest <$> (formatJID <$> messageFrom m)
|
||||
|
||||
pushRequestAttr :: Name
|
||||
pushRequestAttr = "pushrequest"
|
||||
|
||||
|
@ -140,6 +145,9 @@ startingPush = gitAnnexMessage $ gitAnnexTag startingPushAttr T.empty
|
|||
startingPushAttr :: Name
|
||||
startingPushAttr = "startingpush"
|
||||
|
||||
decodeStartingPush :: Message -> Maybe NetMessage
|
||||
decodeStartingPush m = StartingPush <$> (formatJID <$> messageFrom m)
|
||||
|
||||
receivePackOutput :: ByteString -> JID -> JID -> Message
|
||||
receivePackOutput = gitAnnexMessage .
|
||||
gitAnnexTagContent receivePackAttr T.empty . encodeTagContent
|
||||
|
@ -147,6 +155,11 @@ receivePackOutput = gitAnnexMessage .
|
|||
receivePackAttr :: Name
|
||||
receivePackAttr = "rp"
|
||||
|
||||
decodeReceivePackOutput :: Element -> Message -> Maybe NetMessage
|
||||
decodeReceivePackOutput t m = ReceivePackOutput
|
||||
<$> (formatJID <$> messageFrom m)
|
||||
<*> decodeTagContent t
|
||||
|
||||
sendPackOutput :: ByteString -> JID -> JID -> Message
|
||||
sendPackOutput = gitAnnexMessage .
|
||||
gitAnnexTagContent sendPackAttr T.empty . encodeTagContent
|
||||
|
@ -154,15 +167,21 @@ sendPackOutput = gitAnnexMessage .
|
|||
sendPackAttr :: Name
|
||||
sendPackAttr = "sp"
|
||||
|
||||
decodeSendPackOutput :: Element -> Message -> Maybe NetMessage
|
||||
decodeSendPackOutput t m = SendPackOutput
|
||||
<$> (formatJID <$> messageFrom m)
|
||||
<*> decodeTagContent t
|
||||
|
||||
receivePackDone :: ExitCode -> JID -> JID -> Message
|
||||
receivePackDone = gitAnnexMessage . gitAnnexTag receivePackAttr . T.pack . show . toi
|
||||
where
|
||||
toi (ExitSuccess) = 0
|
||||
toi (ExitFailure i) = i
|
||||
|
||||
decodeReceivePackDone :: Text -> ExitCode
|
||||
decodeReceivePackDone t = fromMaybe (ExitFailure 1) $
|
||||
fromi <$> readish (T.unpack t)
|
||||
decodeReceivePackDone :: Text -> Message -> Maybe NetMessage
|
||||
decodeReceivePackDone t m = ReceivePackDone
|
||||
<$> (formatJID <$> messageFrom m)
|
||||
<*> (fromi <$> readish (T.unpack t))
|
||||
where
|
||||
fromi 0 = ExitSuccess
|
||||
fromi i = ExitFailure i
|
||||
|
|
|
@ -8,6 +8,8 @@
|
|||
module Assistant.XMPP.Git where
|
||||
|
||||
import Assistant.Common
|
||||
import Assistant.NetMessager
|
||||
import Assistant.Types.NetMessager
|
||||
import Assistant.XMPP
|
||||
import Assistant.XMPP.Buddies
|
||||
import Assistant.DaemonStatus
|
||||
|
@ -77,7 +79,10 @@ makeXMPPGitRemote buddyname jid u = do
|
|||
- We listen at the other end of the pipe and relay to and from XMPP.
|
||||
-}
|
||||
xmppPush :: Remote -> [Ref] -> Assistant Bool
|
||||
xmppPush remote refs = do
|
||||
xmppPush remote refs = error "TODO"
|
||||
|
||||
xmppPush' :: ClientID -> Remote -> [Ref] -> Assistant Bool
|
||||
xmppPush' cid remote refs = do
|
||||
program <- liftIO readProgramFile
|
||||
|
||||
(Fd inf, writepush) <- liftIO createPipe
|
||||
|
@ -115,7 +120,7 @@ xmppPush remote refs = do
|
|||
b <- liftIO $ B.hGetSome inh 1024
|
||||
when (B.null b) $
|
||||
liftIO $ killThread =<< myThreadId
|
||||
-- TODO relay b to xmpp
|
||||
sendNetMessage $ SendPackOutput cid b
|
||||
error "TODO"
|
||||
fromxmpp outh = forever $ do
|
||||
-- TODO get b from xmpp
|
||||
|
@ -168,12 +173,13 @@ xmppGitRelay = do
|
|||
| otherwise -> ExitFailure n
|
||||
Nothing -> ExitFailure 1
|
||||
|
||||
{- Relays git receive-pack to and from XMPP, and propigates its exit status. -}
|
||||
xmppReceivePack :: Assistant Bool
|
||||
xmppReceivePack = do
|
||||
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
|
||||
- its exit status to XMPP. -}
|
||||
xmppReceivePack :: ClientID -> Assistant Bool
|
||||
xmppReceivePack cid = do
|
||||
feeder <- asIO1 toxmpp
|
||||
reader <- asIO1 fromxmpp
|
||||
controller <- asIO1 controlxmpp
|
||||
sendexitcode <- asIO1 $ sendNetMessage . ReceivePackDone cid
|
||||
repodir <- liftAnnex $ fromRepo repoPath
|
||||
let p = (proc "git" ["receive-pack", repodir])
|
||||
{ std_in = CreatePipe
|
||||
|
@ -185,7 +191,7 @@ xmppReceivePack = do
|
|||
feedertid <- forkIO $ feeder outh
|
||||
void $ reader inh
|
||||
code <- waitForProcess pid
|
||||
void $ controller code
|
||||
void $ sendexitcode code
|
||||
killThread feedertid
|
||||
return $ code == ExitSuccess
|
||||
where
|
||||
|
@ -194,7 +200,6 @@ xmppReceivePack = do
|
|||
if B.null b
|
||||
then return () -- EOF
|
||||
else do
|
||||
error "TODO feed b to xmpp"
|
||||
sendNetMessage $ ReceivePackOutput cid b
|
||||
toxmpp outh
|
||||
fromxmpp _inh = error "TODO feed xmpp to inh"
|
||||
controlxmpp _code = error "TODO propigate exit code"
|
||||
|
|
|
@ -58,11 +58,11 @@ For pairing, a chat message is sent, containing:
|
|||
|
||||
To request that a peer push to us, a chat message can be sent:
|
||||
|
||||
<git-annex xmlns='git-annex' pushrequest="" />
|
||||
<git-annex xmlns='git-annex' pushrequest="uuid" />
|
||||
|
||||
When a peer is ready to send a git push, it sends:
|
||||
|
||||
<git-annex xmlns='git-annex' startingpush="" />
|
||||
<git-annex xmlns='git-annex' startingpush="uuid" />
|
||||
|
||||
The receiver runs `git receive-pack`, and sends back its output in
|
||||
one or more chat messages:
|
||||
|
@ -71,7 +71,8 @@ one or more chat messages:
|
|||
007b27ca394d26a05d9b6beefa1b07da456caa2157d7 refs/heads/git-annex report-status delete-refs side-band-64k quiet ofs-delta
|
||||
</git-annex>
|
||||
|
||||
The sender replies with the data from `git push`:
|
||||
The sender replies with the data from `git push` (which does not need
|
||||
to actually be started until this point):
|
||||
|
||||
<git-annex xmlns='git-annex' sp="">
|
||||
data
|
||||
|
|
Loading…
Reference in a new issue