assistant: Added sequence numbers to XMPP git push packets. (Not yet used.)
For backwards compatability, "" is treated as "0" sequence number. --debug will show xmpp sequence numbers now, but they are not otherwise used.
This commit is contained in:
parent
92b1b8e9ab
commit
271a919d14
5 changed files with 45 additions and 27 deletions
|
@ -42,13 +42,18 @@ data PushStage
|
||||||
-- indicates that a push is starting
|
-- indicates that a push is starting
|
||||||
| StartingPush
|
| StartingPush
|
||||||
-- a chunk of output of git receive-pack
|
-- a chunk of output of git receive-pack
|
||||||
| ReceivePackOutput ByteString
|
| ReceivePackOutput SequenceNum ByteString
|
||||||
-- a chuck of output of git send-pack
|
-- a chuck of output of git send-pack
|
||||||
| SendPackOutput ByteString
|
| SendPackOutput SequenceNum ByteString
|
||||||
-- sent when git receive-pack exits, with its exit code
|
-- sent when git receive-pack exits, with its exit code
|
||||||
| ReceivePackDone ExitCode
|
| ReceivePackDone ExitCode
|
||||||
deriving (Show, Eq, Ord)
|
deriving (Show, Eq, Ord)
|
||||||
|
|
||||||
|
{- A sequence number. Incremented by one per packet in a sequence,
|
||||||
|
- starting with 1 for the first packet. 0 means sequence numbers are
|
||||||
|
- not being used. -}
|
||||||
|
type SequenceNum = Int
|
||||||
|
|
||||||
{- NetMessages that are important (and small), and should be stored to be
|
{- NetMessages that are important (and small), and should be stored to be
|
||||||
- resent when new clients are seen. -}
|
- resent when new clients are seen. -}
|
||||||
isImportantNetMessage :: NetMessage -> Maybe ClientID
|
isImportantNetMessage :: NetMessage -> Maybe ClientID
|
||||||
|
@ -64,8 +69,8 @@ readdressNetMessage m _ = m
|
||||||
{- Convert a NetMessage to something that can be logged. -}
|
{- Convert a NetMessage to something that can be logged. -}
|
||||||
sanitizeNetMessage :: NetMessage -> NetMessage
|
sanitizeNetMessage :: NetMessage -> NetMessage
|
||||||
sanitizeNetMessage (Pushing c stage) = Pushing c $ case stage of
|
sanitizeNetMessage (Pushing c stage) = Pushing c $ case stage of
|
||||||
ReceivePackOutput _ -> ReceivePackOutput elided
|
ReceivePackOutput n _ -> ReceivePackOutput n elided
|
||||||
SendPackOutput _ -> SendPackOutput elided
|
SendPackOutput n _ -> SendPackOutput n elided
|
||||||
s -> s
|
s -> s
|
||||||
where
|
where
|
||||||
elided = B8.pack "<elided>"
|
elided = B8.pack "<elided>"
|
||||||
|
@ -85,8 +90,8 @@ pushDestinationSide :: PushStage -> PushSide
|
||||||
pushDestinationSide CanPush = ReceivePack
|
pushDestinationSide CanPush = ReceivePack
|
||||||
pushDestinationSide PushRequest = SendPack
|
pushDestinationSide PushRequest = SendPack
|
||||||
pushDestinationSide StartingPush = ReceivePack
|
pushDestinationSide StartingPush = ReceivePack
|
||||||
pushDestinationSide (ReceivePackOutput _) = SendPack
|
pushDestinationSide (ReceivePackOutput _ _) = SendPack
|
||||||
pushDestinationSide (SendPackOutput _) = ReceivePack
|
pushDestinationSide (SendPackOutput _ _) = ReceivePack
|
||||||
pushDestinationSide (ReceivePackDone _) = SendPack
|
pushDestinationSide (ReceivePackDone _) = SendPack
|
||||||
|
|
||||||
type SideMap a = PushSide -> a
|
type SideMap a = PushSide -> a
|
||||||
|
|
|
@ -134,13 +134,13 @@ pushMessage = gitAnnexMessage . encode
|
||||||
encode CanPush = gitAnnexTag canPushAttr T.empty
|
encode CanPush = gitAnnexTag canPushAttr T.empty
|
||||||
encode PushRequest = gitAnnexTag pushRequestAttr T.empty
|
encode PushRequest = gitAnnexTag pushRequestAttr T.empty
|
||||||
encode StartingPush = gitAnnexTag startingPushAttr T.empty
|
encode StartingPush = gitAnnexTag startingPushAttr T.empty
|
||||||
encode (ReceivePackOutput b) =
|
encode (ReceivePackOutput n b) =
|
||||||
gitAnnexTagContent receivePackAttr T.empty $ encodeTagContent b
|
gitAnnexTagContent receivePackAttr (val n) $ encodeTagContent b
|
||||||
encode (SendPackOutput b) =
|
encode (SendPackOutput n b) =
|
||||||
gitAnnexTagContent sendPackAttr T.empty $ encodeTagContent b
|
gitAnnexTagContent sendPackAttr (val n) $ encodeTagContent b
|
||||||
encode (ReceivePackDone code) =
|
encode (ReceivePackDone code) =
|
||||||
gitAnnexTag receivePackDoneAttr $
|
gitAnnexTag receivePackDoneAttr $ val $ encodeExitCode code
|
||||||
T.pack $ show $ encodeExitCode code
|
val = T.pack . show
|
||||||
|
|
||||||
decodeMessage :: Message -> Maybe NetMessage
|
decodeMessage :: Message -> Maybe NetMessage
|
||||||
decodeMessage m = decode =<< gitAnnexTagInfo m
|
decodeMessage m = decode =<< gitAnnexTagInfo m
|
||||||
|
@ -160,10 +160,8 @@ decodeMessage m = decode =<< gitAnnexTagInfo m
|
||||||
, pushdecoder $ const $ Just CanPush
|
, pushdecoder $ const $ Just CanPush
|
||||||
, pushdecoder $ const $ Just PushRequest
|
, pushdecoder $ const $ Just PushRequest
|
||||||
, pushdecoder $ const $ Just StartingPush
|
, pushdecoder $ const $ Just StartingPush
|
||||||
, pushdecoder $
|
, pushdecoder $ gen ReceivePackOutput
|
||||||
fmap ReceivePackOutput . decodeTagContent . tagElement
|
, pushdecoder $ gen SendPackOutput
|
||||||
, pushdecoder $
|
|
||||||
fmap SendPackOutput . decodeTagContent . tagElement
|
|
||||||
, pushdecoder $
|
, pushdecoder $
|
||||||
fmap (ReceivePackDone . decodeExitCode) . readish .
|
fmap (ReceivePackDone . decodeExitCode) . readish .
|
||||||
T.unpack . tagValue
|
T.unpack . tagValue
|
||||||
|
@ -171,6 +169,10 @@ decodeMessage m = decode =<< gitAnnexTagInfo m
|
||||||
pushdecoder a m' i = Pushing
|
pushdecoder a m' i = Pushing
|
||||||
<$> (formatJID <$> messageFrom m')
|
<$> (formatJID <$> messageFrom m')
|
||||||
<*> a i
|
<*> a i
|
||||||
|
gen c i = do
|
||||||
|
packet <- decodeTagContent $ tagElement i
|
||||||
|
let sequence = fromMaybe 0 $ readish $ T.unpack $ tagValue i
|
||||||
|
return $ c sequence packet
|
||||||
|
|
||||||
decodeExitCode :: Int -> ExitCode
|
decodeExitCode :: Int -> ExitCode
|
||||||
decodeExitCode 0 = ExitSuccess
|
decodeExitCode 0 = ExitSuccess
|
||||||
|
|
|
@ -108,7 +108,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
|
||||||
outh <- liftIO $ fdToHandle writepush
|
outh <- liftIO $ fdToHandle writepush
|
||||||
controlh <- liftIO $ fdToHandle writecontrol
|
controlh <- liftIO $ fdToHandle writecontrol
|
||||||
|
|
||||||
t1 <- forkIO <~> toxmpp inh
|
t1 <- forkIO <~> toxmpp 0 inh
|
||||||
t2 <- forkIO <~> fromxmpp outh controlh
|
t2 <- forkIO <~> fromxmpp outh controlh
|
||||||
|
|
||||||
{- This can take a long time to run, so avoid running it in the
|
{- This can take a long time to run, so avoid running it in the
|
||||||
|
@ -122,15 +122,19 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
|
||||||
|
|
||||||
return r
|
return r
|
||||||
where
|
where
|
||||||
toxmpp inh = forever $ do
|
toxmpp seqnum inh = do
|
||||||
b <- liftIO $ B.hGetSome inh chunkSize
|
b <- liftIO $ B.hGetSome inh chunkSize
|
||||||
if B.null b
|
if B.null b
|
||||||
then liftIO $ killThread =<< myThreadId
|
then liftIO $ killThread =<< myThreadId
|
||||||
else sendNetMessage $ Pushing cid $ SendPackOutput b
|
else do
|
||||||
|
let seqnum' = succ seqnum
|
||||||
|
sendNetMessage $ Pushing cid $
|
||||||
|
SendPackOutput seqnum' b
|
||||||
|
toxmpp seqnum' inh
|
||||||
fromxmpp outh controlh = forever $ do
|
fromxmpp outh controlh = forever $ do
|
||||||
m <- timeout xmppTimeout <~> waitNetPushMessage SendPack
|
m <- timeout xmppTimeout <~> waitNetPushMessage SendPack
|
||||||
case m of
|
case m of
|
||||||
(Just (Pushing _ (ReceivePackOutput b))) ->
|
(Just (Pushing _ (ReceivePackOutput _ b))) ->
|
||||||
liftIO $ writeChunk outh b
|
liftIO $ writeChunk outh b
|
||||||
(Just (Pushing _ (ReceivePackDone exitcode))) ->
|
(Just (Pushing _ (ReceivePackDone exitcode))) ->
|
||||||
liftIO $ do
|
liftIO $ do
|
||||||
|
@ -213,7 +217,7 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
|
||||||
}
|
}
|
||||||
(Just inh, Just outh, _, pid) <- liftIO $ createProcess p
|
(Just inh, Just outh, _, pid) <- liftIO $ createProcess p
|
||||||
readertid <- forkIO <~> relayfromxmpp inh
|
readertid <- forkIO <~> relayfromxmpp inh
|
||||||
relaytoxmpp outh
|
relaytoxmpp 0 outh
|
||||||
code <- liftIO $ waitForProcess pid
|
code <- liftIO $ waitForProcess pid
|
||||||
void $ sendNetMessage $ Pushing cid $ ReceivePackDone code
|
void $ sendNetMessage $ Pushing cid $ ReceivePackDone code
|
||||||
liftIO $ do
|
liftIO $ do
|
||||||
|
@ -222,16 +226,17 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
|
||||||
hClose outh
|
hClose outh
|
||||||
return $ code == ExitSuccess
|
return $ code == ExitSuccess
|
||||||
where
|
where
|
||||||
relaytoxmpp outh = do
|
relaytoxmpp seqnum outh = do
|
||||||
b <- liftIO $ B.hGetSome outh chunkSize
|
b <- liftIO $ B.hGetSome outh chunkSize
|
||||||
-- empty is EOF, so exit
|
-- empty is EOF, so exit
|
||||||
unless (B.null b) $ do
|
unless (B.null b) $ do
|
||||||
sendNetMessage $ Pushing cid $ ReceivePackOutput b
|
let seqnum' = succ seqnum
|
||||||
relaytoxmpp outh
|
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
|
||||||
|
relaytoxmpp seqnum' outh
|
||||||
relayfromxmpp inh = forever $ do
|
relayfromxmpp inh = forever $ do
|
||||||
m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack
|
m <- timeout xmppTimeout <~> waitNetPushMessage ReceivePack
|
||||||
case m of
|
case m of
|
||||||
(Just (Pushing _ (SendPackOutput b))) ->
|
(Just (Pushing _ (SendPackOutput _ b))) ->
|
||||||
liftIO $ writeChunk inh b
|
liftIO $ writeChunk inh b
|
||||||
(Just _) -> noop
|
(Just _) -> noop
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
|
|
1
debian/changelog
vendored
1
debian/changelog
vendored
|
@ -18,6 +18,7 @@ git-annex (4.20130406) UNRELEASED; urgency=low
|
||||||
VERSION_FROM_CHANGELOG.
|
VERSION_FROM_CHANGELOG.
|
||||||
* webapp: Added animations.
|
* webapp: Added animations.
|
||||||
* assistant: Stop any transfers the assistant initiated on shutdown.
|
* assistant: Stop any transfers the assistant initiated on shutdown.
|
||||||
|
* assistant: Added sequence numbers to XMPP git push packets. (Not yet used.)
|
||||||
|
|
||||||
-- Joey Hess <joeyh@debian.org> Sat, 06 Apr 2013 15:24:15 -0400
|
-- Joey Hess <joeyh@debian.org> Sat, 06 Apr 2013 15:24:15 -0400
|
||||||
|
|
||||||
|
|
|
@ -88,17 +88,22 @@ When a peer is ready to send a git push, it sends:
|
||||||
The receiver runs `git receive-pack`, and sends back its output in
|
The receiver runs `git receive-pack`, and sends back its output in
|
||||||
one or more chat messages, directed to the client that is pushing:
|
one or more chat messages, directed to the client that is pushing:
|
||||||
|
|
||||||
<git-annex xmlns='git-annex' rp="">
|
<git-annex xmlns='git-annex' rp="N">
|
||||||
007b27ca394d26a05d9b6beefa1b07da456caa2157d7 refs/heads/git-annex report-status delete-refs side-band-64k quiet ofs-delta
|
007b27ca394d26a05d9b6beefa1b07da456caa2157d7 refs/heads/git-annex report-status delete-refs side-band-64k quiet ofs-delta
|
||||||
</git-annex>
|
</git-annex>
|
||||||
|
|
||||||
The sender replies with the data from `git push`, in
|
The sender replies with the data from `git push`, in
|
||||||
one or more chat messages, directed to the receiver:
|
one or more chat messages, directed to the receiver:
|
||||||
|
|
||||||
<git-annex xmlns='git-annex' sp="">
|
<git-annex xmlns='git-annex' sp="N">
|
||||||
data
|
data
|
||||||
</git-annex>
|
</git-annex>
|
||||||
|
|
||||||
|
The value of rp and sp used to be empty, but now it's a sequence number.
|
||||||
|
This indicates the sequence of this packet, counting from 1. The receiver
|
||||||
|
and sender each have their own sequence numbers. These sequence numbers
|
||||||
|
are not really used yet, but are available for debugging.
|
||||||
|
|
||||||
When `git receive-pack` exits, the receiver indicates its exit
|
When `git receive-pack` exits, the receiver indicates its exit
|
||||||
status with a chat message, directed at the sender:
|
status with a chat message, directed at the sender:
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue