assistant: XMPP git pull and push requests are cached and sent when presence of a new client is detected.
Noticed that, At startup or network reconnect, git push messages were sent, often before presence info has been gathered, so were not sent to any buddies. To fix this, keep track of which buddies have seen such messages, and when new presence is received from a buddy that has not yet seen it, resend. This is done only for push initiation messages, so very little data needs to be stored.
This commit is contained in:
parent
d76e281de0
commit
c16adc25c4
5 changed files with 106 additions and 20 deletions
|
@ -15,6 +15,7 @@ import Control.Concurrent.STM
|
||||||
import Control.Concurrent.MSampleVar
|
import Control.Concurrent.MSampleVar
|
||||||
import Control.Exception as E
|
import Control.Exception as E
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
|
import qualified Data.Map as M
|
||||||
|
|
||||||
sendNetMessage :: NetMessage -> Assistant ()
|
sendNetMessage :: NetMessage -> Assistant ()
|
||||||
sendNetMessage m =
|
sendNetMessage m =
|
||||||
|
@ -30,6 +31,42 @@ notifyNetMessagerRestart =
|
||||||
waitNetMessagerRestart :: Assistant ()
|
waitNetMessagerRestart :: Assistant ()
|
||||||
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
|
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
|
||||||
|
|
||||||
|
{- Store an important NetMessage for a client, and if the same message was
|
||||||
|
- already sent, remove it from sentImportantNetMessages. -}
|
||||||
|
storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant ()
|
||||||
|
storeImportantNetMessage m client matchingclient = go <<~ netMessager
|
||||||
|
where
|
||||||
|
go nm = atomically $ do
|
||||||
|
q <- takeTMVar $ importantNetMessages nm
|
||||||
|
sent <- takeTMVar $ sentImportantNetMessages nm
|
||||||
|
putTMVar (importantNetMessages nm) $
|
||||||
|
M.alter (Just . maybe (S.singleton m) (S.insert m)) client q
|
||||||
|
putTMVar (sentImportantNetMessages nm) $
|
||||||
|
M.mapWithKey removematching sent
|
||||||
|
removematching someclient s
|
||||||
|
| matchingclient someclient = S.delete m s
|
||||||
|
| otherwise = s
|
||||||
|
|
||||||
|
{- Indicates that an important NetMessage has been sent to a client. -}
|
||||||
|
sentImportantNetMessage :: NetMessage -> ClientID -> Assistant ()
|
||||||
|
sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager)
|
||||||
|
where
|
||||||
|
go v = atomically $ do
|
||||||
|
sent <- takeTMVar v
|
||||||
|
putTMVar v $
|
||||||
|
M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent
|
||||||
|
|
||||||
|
{- Checks for important NetMessages that have been stored for a client, and
|
||||||
|
- sent to a client. Typically the same client for both, although
|
||||||
|
- a modified or more specific client may need to be used. -}
|
||||||
|
checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage)
|
||||||
|
checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
|
||||||
|
where
|
||||||
|
go nm = atomically $ do
|
||||||
|
stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm)
|
||||||
|
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
|
||||||
|
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
|
||||||
|
|
||||||
{- Runs an action that runs either the send or receive side of a push.
|
{- Runs an action that runs either the send or receive side of a push.
|
||||||
-
|
-
|
||||||
- While the push is running, netMessagesPush will get messages put into it
|
- While the push is running, netMessagesPush will get messages put into it
|
||||||
|
|
|
@ -35,7 +35,7 @@ import Data.Time.Clock
|
||||||
|
|
||||||
{- Whether to include verbose protocol dump in debug output. -}
|
{- Whether to include verbose protocol dump in debug output. -}
|
||||||
protocolDebug :: Bool
|
protocolDebug :: Bool
|
||||||
protocolDebug = True
|
protocolDebug = False
|
||||||
|
|
||||||
xmppClientThread :: UrlRenderer -> NamedThread
|
xmppClientThread :: UrlRenderer -> NamedThread
|
||||||
xmppClientThread urlrenderer = namedThread "XMPPClient" $
|
xmppClientThread urlrenderer = namedThread "XMPPClient" $
|
||||||
|
@ -97,10 +97,10 @@ xmppClient urlrenderer d creds =
|
||||||
inAssistant $ debug ["received:", show l]
|
inAssistant $ debug ["received:", show l]
|
||||||
mapM_ (handle selfjid) l
|
mapM_ (handle selfjid) l
|
||||||
|
|
||||||
handle _ (PresenceMessage p) = do
|
handle selfjid (PresenceMessage p) = do
|
||||||
|
|
||||||
void $ inAssistant $
|
void $ inAssistant $
|
||||||
updateBuddyList (updateBuddies p) <<~ buddyList
|
updateBuddyList (updateBuddies p) <<~ buddyList
|
||||||
|
resendImportantMessages selfjid p
|
||||||
handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
|
handle _ (GotNetMessage QueryPresence) = putStanza gitAnnexSignature
|
||||||
handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
|
handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
|
||||||
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
|
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
|
||||||
|
@ -114,6 +114,16 @@ xmppClient urlrenderer d creds =
|
||||||
handle _ (Unknown _) = noop
|
handle _ (Unknown _) = noop
|
||||||
handle _ (ProtocolError _) = noop
|
handle _ (ProtocolError _) = noop
|
||||||
|
|
||||||
|
resendImportantMessages selfjid (Presence { presenceFrom = Just jid }) = do
|
||||||
|
let c = formatJID jid
|
||||||
|
(stored, sent) <- inAssistant $
|
||||||
|
checkImportantNetMessages (formatJID (baseJID jid), c)
|
||||||
|
forM_ (S.toList $ S.difference stored sent) $ \msg -> do
|
||||||
|
inAssistant $ debug ["sending to new client:", show c, show msg]
|
||||||
|
a <- inAssistant $ convertNetMsg (readdressNetMessage msg c) selfjid
|
||||||
|
a
|
||||||
|
inAssistant $ sentImportantNetMessage msg c
|
||||||
|
resendImportantMessages _ _ = noop
|
||||||
|
|
||||||
data XMPPEvent
|
data XMPPEvent
|
||||||
= GotNetMessage NetMessage
|
= GotNetMessage NetMessage
|
||||||
|
@ -151,21 +161,27 @@ decodeStanza _ s = [Unknown s]
|
||||||
- Chat messages must be directed to specific clients, not a base
|
- Chat messages must be directed to specific clients, not a base
|
||||||
- account JID, due to git-annex clients using a negative presence priority.
|
- account JID, due to git-annex clients using a negative presence priority.
|
||||||
- PairingNotification messages are always directed at specific
|
- PairingNotification messages are always directed at specific
|
||||||
- clients, but Pushing messages are sometimes not, and need to be exploded.
|
- clients, but Pushing messages are sometimes not, and need to be exploded
|
||||||
|
- out to specific clients.
|
||||||
|
-
|
||||||
|
- Important messages, not directed at any specific client,
|
||||||
|
- are cached to be sent later when additional clients connect.
|
||||||
-}
|
-}
|
||||||
relayNetMessage :: JID -> Assistant (XMPP ())
|
relayNetMessage :: JID -> Assistant (XMPP ())
|
||||||
relayNetMessage selfjid = do
|
relayNetMessage selfjid = do
|
||||||
msg <- waitNetMessage
|
msg <- waitNetMessage
|
||||||
when protocolDebug $
|
when protocolDebug $
|
||||||
debug ["sending:", show msg]
|
debug ["sending:", show msg]
|
||||||
|
handleImportant msg
|
||||||
convert msg
|
convert msg
|
||||||
where
|
where
|
||||||
convert (NotifyPush us) = return $ putStanza $ pushNotification us
|
handleImportant msg = case parseJID =<< isImportantNetMessage msg of
|
||||||
convert QueryPresence = return $ putStanza presenceQuery
|
Just tojid
|
||||||
convert (PairingNotification stage c u) = withclient c $ \tojid -> do
|
| tojid == baseJID tojid ->
|
||||||
changeBuddyPairing tojid True
|
storeImportantNetMessage msg (formatJID tojid) $
|
||||||
return $ putStanza $ pairingNotification stage u tojid selfjid
|
\c -> (baseJID <$> parseJID c) == Just tojid
|
||||||
convert (Pushing c pushstage) = withclient c $ \tojid -> do
|
_ -> noop
|
||||||
|
convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid -> do
|
||||||
if tojid == baseJID tojid
|
if tojid == baseJID tojid
|
||||||
then do
|
then do
|
||||||
clients <- maybe [] (S.toList . buddyAssistants)
|
clients <- maybe [] (S.toList . buddyAssistants)
|
||||||
|
@ -175,12 +191,29 @@ relayNetMessage selfjid = do
|
||||||
return $ forM_ (clients) $ \(Client jid) ->
|
return $ forM_ (clients) $ \(Client jid) ->
|
||||||
putStanza $ pushMessage pushstage jid selfjid
|
putStanza $ pushMessage pushstage jid selfjid
|
||||||
else return $ putStanza $ pushMessage pushstage tojid selfjid
|
else return $ putStanza $ pushMessage pushstage tojid selfjid
|
||||||
|
convert msg = convertNetMsg msg selfjid
|
||||||
|
|
||||||
withclient c a = case parseJID c of
|
{- Converts a NetMessage to an XMPP action. -}
|
||||||
Nothing -> return noop
|
convertNetMsg :: NetMessage -> JID -> Assistant (XMPP ())
|
||||||
Just tojid
|
convertNetMsg msg selfjid = convert msg
|
||||||
| tojid == selfjid -> return noop
|
where
|
||||||
| otherwise -> a tojid
|
convert (NotifyPush us) = return $ putStanza $ pushNotification us
|
||||||
|
convert QueryPresence = return $ putStanza presenceQuery
|
||||||
|
convert (PairingNotification stage c u) = withOtherClient selfjid c $ \tojid -> do
|
||||||
|
changeBuddyPairing tojid True
|
||||||
|
return $ putStanza $ pairingNotification stage u tojid selfjid
|
||||||
|
convert (Pushing c pushstage) = withOtherClient selfjid c $ \tojid ->
|
||||||
|
return $ putStanza $ pushMessage pushstage tojid selfjid
|
||||||
|
|
||||||
|
withOtherClient :: JID -> ClientID -> (JID -> Assistant (XMPP ())) -> (Assistant (XMPP ()))
|
||||||
|
withOtherClient selfjid c a = case parseJID c of
|
||||||
|
Nothing -> return noop
|
||||||
|
Just tojid
|
||||||
|
| tojid == selfjid -> return noop
|
||||||
|
| otherwise -> a tojid
|
||||||
|
|
||||||
|
withClient :: ClientID -> (JID -> XMPP ()) -> XMPP ()
|
||||||
|
withClient c a = maybe noop a $ parseJID c
|
||||||
|
|
||||||
{- Runs a XMPP action in a separate thread, using a session to allow it
|
{- Runs a XMPP action in a separate thread, using a session to allow it
|
||||||
- to access the same XMPP client. -}
|
- to access the same XMPP client. -}
|
||||||
|
|
|
@ -15,6 +15,7 @@ import Control.Concurrent.STM
|
||||||
import Control.Concurrent.MSampleVar
|
import Control.Concurrent.MSampleVar
|
||||||
import Data.ByteString (ByteString)
|
import Data.ByteString (ByteString)
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
|
import qualified Data.Map as M
|
||||||
|
|
||||||
{- Messages that can be sent out of band by a network messager. -}
|
{- Messages that can be sent out of band by a network messager. -}
|
||||||
data NetMessage
|
data NetMessage
|
||||||
|
@ -47,6 +48,18 @@ data PushStage
|
||||||
| ReceivePackDone ExitCode
|
| ReceivePackDone ExitCode
|
||||||
deriving (Show, Eq, Ord)
|
deriving (Show, Eq, Ord)
|
||||||
|
|
||||||
|
{- NetMessages that are important (and small), and should be stored to be
|
||||||
|
- resent when new clients are seen. -}
|
||||||
|
isImportantNetMessage :: NetMessage -> Maybe ClientID
|
||||||
|
isImportantNetMessage (Pushing c CanPush) = Just c
|
||||||
|
isImportantNetMessage (Pushing c PushRequest) = Just c
|
||||||
|
isImportantNetMessage _ = Nothing
|
||||||
|
|
||||||
|
readdressNetMessage :: NetMessage -> ClientID -> NetMessage
|
||||||
|
readdressNetMessage (PairingNotification stage _ uuid) c = PairingNotification stage c uuid
|
||||||
|
readdressNetMessage (Pushing _ stage) c = Pushing c stage
|
||||||
|
readdressNetMessage m _ = m
|
||||||
|
|
||||||
{- Things that initiate either side of a push, but do not actually send data. -}
|
{- Things that initiate either side of a push, but do not actually send data. -}
|
||||||
isPushInitiation :: PushStage -> Bool
|
isPushInitiation :: PushStage -> Bool
|
||||||
isPushInitiation CanPush = True
|
isPushInitiation CanPush = True
|
||||||
|
@ -81,6 +94,10 @@ getSide side m = m side
|
||||||
data NetMessager = NetMessager
|
data NetMessager = NetMessager
|
||||||
-- outgoing messages
|
-- outgoing messages
|
||||||
{ netMessages :: TChan (NetMessage)
|
{ netMessages :: TChan (NetMessage)
|
||||||
|
-- important messages for each client
|
||||||
|
, importantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
|
||||||
|
-- important messages that are believed to have been sent to a client
|
||||||
|
, sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
|
||||||
-- write to this to restart the net messager
|
-- write to this to restart the net messager
|
||||||
, netMessagerRestart :: MSampleVar ()
|
, netMessagerRestart :: MSampleVar ()
|
||||||
-- only one side of a push can be running at a time
|
-- only one side of a push can be running at a time
|
||||||
|
@ -94,8 +111,9 @@ data NetMessager = NetMessager
|
||||||
newNetMessager :: IO NetMessager
|
newNetMessager :: IO NetMessager
|
||||||
newNetMessager = NetMessager
|
newNetMessager = NetMessager
|
||||||
<$> atomically newTChan
|
<$> atomically newTChan
|
||||||
|
<*> atomically (newTMVar M.empty)
|
||||||
|
<*> atomically (newTMVar M.empty)
|
||||||
<*> newEmptySV
|
<*> newEmptySV
|
||||||
<*> mkSideMap (newTMVar Nothing)
|
<*> mkSideMap (newTMVar Nothing)
|
||||||
<*> mkSideMap newTChan
|
<*> mkSideMap newTChan
|
||||||
<*> mkSideMap (newTMVar S.empty)
|
<*> mkSideMap (newTMVar S.empty)
|
||||||
where
|
|
||||||
|
|
2
debian/changelog
vendored
2
debian/changelog
vendored
|
@ -27,6 +27,8 @@ git-annex (4.20130228) UNRELEASED; urgency=low
|
||||||
may do undesired things with it.
|
may do undesired things with it.
|
||||||
* assistant: Get back in sync with XMPP remotes after network reconnection,
|
* assistant: Get back in sync with XMPP remotes after network reconnection,
|
||||||
and on startup.
|
and on startup.
|
||||||
|
* assistant: XMPP git pull and push requests are cached and sent when
|
||||||
|
presence of a new client is detected.
|
||||||
|
|
||||||
-- Joey Hess <joeyh@debian.org> Wed, 27 Feb 2013 23:20:40 -0400
|
-- Joey Hess <joeyh@debian.org> Wed, 27 Feb 2013 23:20:40 -0400
|
||||||
|
|
||||||
|
|
|
@ -14,10 +14,6 @@ who share a repository, that is stored in the [[cloud]].
|
||||||
push notification use of XMPP, since unknown UUIDs will just be ignored.
|
push notification use of XMPP, since unknown UUIDs will just be ignored.
|
||||||
But XMPP pairing and the pushes over XMPP assume that anyone you're
|
But XMPP pairing and the pushes over XMPP assume that anyone you're
|
||||||
paired with is intending to sync to your repository.
|
paired with is intending to sync to your repository.
|
||||||
* At startup or network reconnect, git push messages are sent, often before
|
|
||||||
presence info has been gathered, so are not sent to any buddies.
|
|
||||||
Need to cache the last sent CanPush and PushRequest NetMessages for each
|
|
||||||
buddy, and when a new client is seen, resend.
|
|
||||||
|
|
||||||
## design goals
|
## design goals
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue