XMPP: Ignore duplicate messages received when pushing.

This commit is contained in:
Joey Hess 2013-05-20 21:58:59 -04:00
parent 69a1529c89
commit 230aed671f
3 changed files with 77 additions and 47 deletions

View file

@ -43,6 +43,22 @@ import System.Timeout
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.Map as M import qualified Data.Map as M
{- Largest chunk of data to send in a single XMPP message. -}
chunkSize :: Int
chunkSize = 4096
{- How long to wait for an expected message before assuming the other side
- has gone away and canceling a push.
-
- This needs to be long enough to allow a message of up to 2+ times
- chunkSize to propigate up to a XMPP server, perhaps across to another
- server, and back down to us. On the other hand, other XMPP pushes can be
- delayed for running until the timeout is reached, so it should not be
- excessive.
-}
xmppTimeout :: Int
xmppTimeout = 120000000 -- 120 seconds
finishXMPPPairing :: JID -> UUID -> Assistant () finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $ finishXMPPPairing jid u = void $ alertWhile alert $
makeXMPPGitRemote buddy (baseJID jid) u makeXMPPGitRemote buddy (baseJID jid) u
@ -132,24 +148,25 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
sendNetMessage $ Pushing cid $ sendNetMessage $ Pushing cid $
SendPackOutput seqnum' b SendPackOutput seqnum' b
toxmpp seqnum' inh toxmpp seqnum' inh
fromxmpp outh controlh = forever $ do
m <- timeout xmppTimeout <~> waitNetPushMessage SendPack fromxmpp outh controlh = withPushMessagesInSequence SendPack handle
case m of where
(Just (Pushing _ (ReceivePackOutput _ b))) -> handle (Just (Pushing _ (ReceivePackOutput _ b))) =
liftIO $ writeChunk outh b liftIO $ writeChunk outh b
(Just (Pushing _ (ReceivePackDone exitcode))) -> handle (Just (Pushing _ (ReceivePackDone exitcode))) =
liftIO $ do liftIO $ do
hPrint controlh exitcode hPrint controlh exitcode
hFlush controlh hFlush controlh
(Just _) -> noop handle (Just _) = noop
Nothing -> do handle Nothing = do
debug ["timeout waiting for git receive-pack output via XMPP"] debug ["timeout waiting for git receive-pack output via XMPP"]
-- Send a synthetic exit code to git-annex -- Send a synthetic exit code to git-annex
-- xmppgit, which will exit and cause git push -- xmppgit, which will exit and cause git push
-- to die. -- to die.
liftIO $ do liftIO $ do
hPrint controlh (ExitFailure 1) hPrint controlh (ExitFailure 1)
hFlush controlh hFlush controlh
installwrapper tmpdir = liftIO $ do installwrapper tmpdir = liftIO $ do
createDirectoryIfMissing True tmpdir createDirectoryIfMissing True tmpdir
let wrapper = tmpdir </> "git-remote-xmpp" let wrapper = tmpdir </> "git-remote-xmpp"
@ -159,6 +176,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
, "exec " ++ program ++ " xmppgit" , "exec " ++ program ++ " xmppgit"
] ]
modifyFileMode wrapper $ addModes executeModes modifyFileMode wrapper $ addModes executeModes
{- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp {- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp
- dir (ie, not on a crippled filesystem where we can't make - dir (ie, not on a crippled filesystem where we can't make
- the wrapper executable). -} - the wrapper executable). -}
@ -169,7 +187,6 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
tmp <- liftAnnex $ fromRepo gitAnnexTmpDir tmp <- liftAnnex $ fromRepo gitAnnexTmpDir
return $ tmp </> "xmppgit" return $ tmp </> "xmppgit"
Just d -> return $ d </> "xmppgit" Just d -> return $ d </> "xmppgit"
type EnvVar = String type EnvVar = String
@ -245,19 +262,17 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
let seqnum' = succ seqnum let seqnum' = succ seqnum
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
relaytoxmpp seqnum' outh relaytoxmpp seqnum' outh
relayfromxmpp inh = forever $ do relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle
m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack where
case m of handle (Just (Pushing _ (SendPackOutput _ b))) =
(Just (Pushing _ (SendPackOutput _ b))) -> liftIO $ writeChunk inh b
liftIO $ writeChunk inh b handle (Just _) = noop
(Just _) -> noop handle Nothing = do
Nothing -> do debug ["timeout waiting for git send-pack output via XMPP"]
debug ["timeout waiting for git send-pack output via XMPP"] -- closing the handle will make git receive-pack exit
-- closing the handle will make liftIO $ do
-- git receive-pack exit hClose inh
liftIO $ do killThread =<< myThreadId
hClose inh
killThread =<< myThreadId
xmppRemotes :: ClientID -> UUID -> Assistant [Remote] xmppRemotes :: ClientID -> UUID -> Assistant [Remote]
xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
@ -310,18 +325,33 @@ writeChunk h b = do
B.hPut h b B.hPut h b
hFlush h hFlush h
{- Largest chunk of data to send in a single XMPP message. -} {- Gets NetMessages for a PushSide, ensures they are in order,
chunkSize :: Int - and runs an action to handle each in turn. The action will be passed
chunkSize = 4096 - Nothing on timeout.
{- How long to wait for an expected message before assuming the other side
- has gone away and canceling a push.
- -
- This needs to be long enough to allow a message of up to 2+ times - Does not currently reorder messages, but does ensure that any
- chunkSize to propigate up to a XMPP server, perhaps across to another - duplicate messages, or messages not in the sequence, are discarded.
- server, and back down to us. On the other hand, other XMPP pushes can be
- delayed for running until the timeout is reached, so it should not be
- excessive.
-} -}
xmppTimeout :: Int withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
xmppTimeout = 120000000 -- 120 seconds withPushMessagesInSequence side a = loop 0
where
loop seqnum = do
m <- timeout xmppTimeout <~> waitNetPushMessage side
let go s = a m >> loop s
case extractSequence =<< m of
Just seqnum'
| seqnum' == seqnum + 1 -> go seqnum'
| seqnum' == 0 -> go seqnum
| seqnum' == seqnum -> do
debug ["ignoring duplicate sequence number", show seqnum]
loop seqnum
| otherwise -> do
debug ["ignoring out of order sequence number", show seqnum', "expected", show seqnum]
loop seqnum
Nothing -> go seqnum
extractSequence :: NetMessage -> Maybe Int
extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
extractSequence _ = Nothing

1
debian/changelog vendored
View file

@ -22,6 +22,7 @@ git-annex (4.20130517) UNRELEASED; urgency=low
* OSX: Fixed gpg included in dmg. * OSX: Fixed gpg included in dmg.
* Linux standalone: Back to being built with glibc 2.13 for maximum * Linux standalone: Back to being built with glibc 2.13 for maximum
portability. portability.
* XMPP: Ignore duplicate messages received when pushing.
-- Joey Hess <joeyh@debian.org> Fri, 17 May 2013 11:17:03 -0400 -- Joey Hess <joeyh@debian.org> Fri, 17 May 2013 11:17:03 -0400

View file

@ -96,8 +96,7 @@ one or more chat messages, directed to the receiver:
The value of rp and sp used to be empty, but now it's a sequence number. The value of rp and sp used to be empty, but now it's a sequence number.
This indicates the sequence of this packet, counting from 1. The receiver This indicates the sequence of this packet, counting from 1. The receiver
and sender each have their own sequence numbers. These sequence numbers and sender each have their own sequence numbers.
are not really used yet, but are available for debugging.
When `git receive-pack` exits, the receiver indicates its exit When `git receive-pack` exits, the receiver indicates its exit
status with a chat message, directed at the sender: status with a chat message, directed at the sender: