separate data type for push stages
This improves type safety.
This commit is contained in:
parent
dedd2a407e
commit
81953c2131
6 changed files with 76 additions and 76 deletions
|
@ -74,17 +74,19 @@ queueNetPushMessage m = do
|
|||
running <- readTMVar (netMessagerPushRunning nm)
|
||||
case running of
|
||||
NoPushRunning -> return False
|
||||
SendPushRunning cid -> go nm cid
|
||||
ReceivePushRunning cid -> go nm cid
|
||||
SendPushRunning runningcid -> do
|
||||
go nm m runningcid
|
||||
return True
|
||||
ReceivePushRunning runningcid -> do
|
||||
go nm m runningcid
|
||||
return True
|
||||
where
|
||||
go nm cid
|
||||
| getClientID m == Just cid = do
|
||||
writeTChan (netMessagesPush nm) m
|
||||
return True
|
||||
| otherwise = do
|
||||
when (isPushInitiationMessage m) $
|
||||
defer nm
|
||||
return True
|
||||
go nm (Pushing cid stage) runningcid
|
||||
| cid == runningcid = writeTChan (netMessagesPush nm) m
|
||||
| isPushInitiation stage = defer nm
|
||||
| otherwise = noop
|
||||
go _ _ _ = noop
|
||||
|
||||
defer nm = do
|
||||
s <- takeTMVar (netMessagesDeferredPush nm)
|
||||
putTMVar (netMessagesDeferredPush nm) $ S.insert m s
|
||||
|
|
|
@ -98,9 +98,10 @@ pushToRemotes now notifypushes remotes = do
|
|||
<*> inRepo Git.Branch.current
|
||||
<*> getUUID
|
||||
let (xmppremotes, normalremotes) = partition isXMPPRemote remotes
|
||||
r <- go True branch g u normalremotes
|
||||
mapM_ (sendNetMessage . CanPush . getXMPPClientID) xmppremotes
|
||||
return r
|
||||
ret <- go True branch g u normalremotes
|
||||
forM_ xmppremotes $ \r ->
|
||||
sendNetMessage $ Pushing (getXMPPClientID r) CanPush
|
||||
return ret
|
||||
where
|
||||
go _ Nothing _ _ _ = return True -- no branch, so nothing to do
|
||||
go shouldretry (Just branch) g u rs = do
|
||||
|
|
|
@ -96,11 +96,11 @@ xmppClient urlrenderer d = do
|
|||
handle _ (GotNetMessage (NotifyPush us)) = void $ inAssistant $ pull us
|
||||
handle selfjid (GotNetMessage (PairingNotification stage c u)) =
|
||||
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
|
||||
handle _ (GotNetMessage pushmsg)
|
||||
| isPushInitiationMessage pushmsg = inAssistant $
|
||||
unlessM (queueNetPushMessage pushmsg) $
|
||||
void $ forkIO <~> handlePushMessage pushmsg
|
||||
| otherwise = void $ inAssistant $ queueNetPushMessage pushmsg
|
||||
handle _ (GotNetMessage m@(Pushing _ pushstage))
|
||||
| isPushInitiation pushstage = inAssistant $
|
||||
unlessM (queueNetPushMessage m) $
|
||||
void $ forkIO <~> handlePushMessage m
|
||||
| otherwise = void $ inAssistant $ queueNetPushMessage m
|
||||
handle _ (Ignorable _) = noop
|
||||
handle _ (Unknown _) = noop
|
||||
handle _ (ProtocolError _) = noop
|
||||
|
@ -158,12 +158,12 @@ relayNetMessage selfjid = convert =<< waitNetMessage
|
|||
convert (PairingNotification stage c u) = withclient c $ \tojid -> do
|
||||
changeBuddyPairing tojid True
|
||||
return $ putStanza $ pairingNotification stage u tojid selfjid
|
||||
convert (CanPush c) = sendclient c canPush
|
||||
convert (PushRequest c) = sendclient c pushRequest
|
||||
convert (StartingPush c) = sendclient c startingPush
|
||||
convert (ReceivePackOutput c b) = sendclient c $ receivePackOutput b
|
||||
convert (SendPackOutput c b) = sendclient c $ sendPackOutput b
|
||||
convert (ReceivePackDone c code) = sendclient c $ receivePackDone code
|
||||
convert (Pushing c CanPush) = sendclient c canPush
|
||||
convert (Pushing c PushRequest) = sendclient c pushRequest
|
||||
convert (Pushing c StartingPush) = sendclient c startingPush
|
||||
convert (Pushing c (ReceivePackOutput b)) = sendclient c $ receivePackOutput b
|
||||
convert (Pushing c (SendPackOutput b)) = sendclient c $ sendPackOutput b
|
||||
convert (Pushing c (ReceivePackDone code)) = sendclient c $ receivePackDone code
|
||||
|
||||
sendclient c construct = withclient c $ \tojid ->
|
||||
return $ putStanza $ construct tojid selfjid
|
||||
|
|
|
@ -25,39 +25,36 @@ data NetMessage
|
|||
-- notification about a stage in the pairing process,
|
||||
-- involving a client, and a UUID.
|
||||
| PairingNotification PairStage ClientID UUID
|
||||
-- indicates that we have data to push over the out of band network
|
||||
| CanPush ClientID
|
||||
-- request that a git push be sent over the out of band network
|
||||
| PushRequest ClientID
|
||||
-- indicates that a push is starting
|
||||
| StartingPush ClientID
|
||||
-- a chunk of output of git receive-pack
|
||||
| ReceivePackOutput ClientID ByteString
|
||||
-- a chuck of output of git send-pack
|
||||
| SendPackOutput ClientID ByteString
|
||||
-- sent when git receive-pack exits, with its exit code
|
||||
| ReceivePackDone ClientID ExitCode
|
||||
-- used for git push over the network messager
|
||||
| Pushing ClientID PushStage
|
||||
deriving (Show, Eq, Ord)
|
||||
|
||||
{- Something used to identify the client, or clients to send the message to. -}
|
||||
type ClientID = Text
|
||||
|
||||
getClientID :: NetMessage -> Maybe ClientID
|
||||
getClientID (NotifyPush _) = Nothing
|
||||
getClientID QueryPresence = Nothing
|
||||
getClientID (PairingNotification _ cid _) = Just cid
|
||||
getClientID (CanPush cid) = Just cid
|
||||
getClientID (PushRequest cid) = Just cid
|
||||
getClientID (StartingPush cid) = Just cid
|
||||
getClientID (ReceivePackOutput cid _) = Just cid
|
||||
getClientID (SendPackOutput cid _) = Just cid
|
||||
getClientID (ReceivePackDone cid _) = Just cid
|
||||
data PushStage
|
||||
-- indicates that we have data to push over the out of band network
|
||||
= CanPush
|
||||
-- request that a git push be sent over the out of band network
|
||||
| PushRequest
|
||||
-- indicates that a push is starting
|
||||
| StartingPush
|
||||
-- a chunk of output of git receive-pack
|
||||
| ReceivePackOutput ByteString
|
||||
-- a chuck of output of git send-pack
|
||||
| SendPackOutput ByteString
|
||||
-- sent when git receive-pack exits, with its exit code
|
||||
| ReceivePackDone ExitCode
|
||||
deriving (Show, Eq, Ord)
|
||||
|
||||
isPushInitiationMessage :: NetMessage -> Bool
|
||||
isPushInitiationMessage (CanPush _) = True
|
||||
isPushInitiationMessage (PushRequest _) = True
|
||||
isPushInitiationMessage (StartingPush _) = True
|
||||
isPushInitiationMessage _ = False
|
||||
data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID
|
||||
deriving (Eq)
|
||||
|
||||
isPushInitiation :: PushStage -> Bool
|
||||
isPushInitiation CanPush = True
|
||||
isPushInitiation PushRequest = True
|
||||
isPushInitiation StartingPush = True
|
||||
isPushInitiation _ = False
|
||||
|
||||
data NetMessager = NetMessager
|
||||
-- outgoing messages
|
||||
|
@ -72,9 +69,6 @@ data NetMessager = NetMessager
|
|||
, netMessagerRestart :: MSampleVar ()
|
||||
}
|
||||
|
||||
data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID
|
||||
deriving (Eq)
|
||||
|
||||
newNetMessager :: IO NetMessager
|
||||
newNetMessager = NetMessager
|
||||
<$> atomically newTChan
|
||||
|
|
|
@ -137,7 +137,7 @@ canPush :: JID -> JID -> Message
|
|||
canPush = gitAnnexMessage $ gitAnnexTag canPushAttr T.empty
|
||||
|
||||
decodeCanPush :: Message -> GitAnnexTagInfo -> Maybe NetMessage
|
||||
decodeCanPush m _ = CanPush <$> (formatJID <$> messageFrom m)
|
||||
decodeCanPush m _ = Pushing <$> (formatJID <$> messageFrom m) <*> pure CanPush
|
||||
|
||||
canPushAttr :: Name
|
||||
canPushAttr = "canpush"
|
||||
|
@ -146,7 +146,7 @@ pushRequest :: JID -> JID -> Message
|
|||
pushRequest = gitAnnexMessage $ gitAnnexTag pushRequestAttr T.empty
|
||||
|
||||
decodePushRequest :: Message -> GitAnnexTagInfo -> Maybe NetMessage
|
||||
decodePushRequest m _ = PushRequest <$> (formatJID <$> messageFrom m)
|
||||
decodePushRequest m _ = Pushing <$> (formatJID <$> messageFrom m) <*> pure PushRequest
|
||||
|
||||
pushRequestAttr :: Name
|
||||
pushRequestAttr = "pushrequest"
|
||||
|
@ -158,7 +158,7 @@ startingPushAttr :: Name
|
|||
startingPushAttr = "startingpush"
|
||||
|
||||
decodeStartingPush :: Message -> GitAnnexTagInfo -> Maybe NetMessage
|
||||
decodeStartingPush m _ = StartingPush <$> (formatJID <$> messageFrom m)
|
||||
decodeStartingPush m _ = Pushing <$> (formatJID <$> messageFrom m) <*> pure StartingPush
|
||||
|
||||
receivePackOutput :: ByteString -> JID -> JID -> Message
|
||||
receivePackOutput = gitAnnexMessage .
|
||||
|
@ -168,9 +168,9 @@ receivePackAttr :: Name
|
|||
receivePackAttr = "rp"
|
||||
|
||||
decodeReceivePackOutput :: Message -> GitAnnexTagInfo -> Maybe NetMessage
|
||||
decodeReceivePackOutput m i = ReceivePackOutput
|
||||
decodeReceivePackOutput m i = Pushing
|
||||
<$> (formatJID <$> messageFrom m)
|
||||
<*> decodeTagContent (tagElement i)
|
||||
<*> (ReceivePackOutput <$> decodeTagContent (tagElement i))
|
||||
|
||||
sendPackOutput :: ByteString -> JID -> JID -> Message
|
||||
sendPackOutput = gitAnnexMessage .
|
||||
|
@ -180,9 +180,9 @@ sendPackAttr :: Name
|
|||
sendPackAttr = "sp"
|
||||
|
||||
decodeSendPackOutput :: Message -> GitAnnexTagInfo -> Maybe NetMessage
|
||||
decodeSendPackOutput m i = SendPackOutput
|
||||
decodeSendPackOutput m i = Pushing
|
||||
<$> (formatJID <$> messageFrom m)
|
||||
<*> decodeTagContent (tagElement i)
|
||||
<*> (SendPackOutput <$> decodeTagContent (tagElement i))
|
||||
|
||||
receivePackDone :: ExitCode -> JID -> JID -> Message
|
||||
receivePackDone = gitAnnexMessage . gitAnnexTag receivePackDoneAttr . T.pack . show . toi
|
||||
|
@ -191,9 +191,9 @@ receivePackDone = gitAnnexMessage . gitAnnexTag receivePackDoneAttr . T.pack . s
|
|||
toi (ExitFailure i) = i
|
||||
|
||||
decodeReceivePackDone :: Message -> GitAnnexTagInfo -> Maybe NetMessage
|
||||
decodeReceivePackDone m i = ReceivePackDone
|
||||
decodeReceivePackDone m i = Pushing
|
||||
<$> (formatJID <$> messageFrom m)
|
||||
<*> (convert <$> readish (T.unpack $ tagValue i))
|
||||
<*> (ReceivePackDone . convert <$> readish (T.unpack $ tagValue i))
|
||||
where
|
||||
convert 0 = ExitSuccess
|
||||
convert n = ExitFailure n
|
||||
|
|
|
@ -74,7 +74,7 @@ makeXMPPGitRemote buddyname jid u = do
|
|||
-}
|
||||
xmppPush :: ClientID -> Remote -> [Ref] -> Assistant Bool
|
||||
xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do
|
||||
sendNetMessage $ StartingPush cid
|
||||
sendNetMessage $ Pushing cid StartingPush
|
||||
|
||||
(Fd inf, writepush) <- liftIO createPipe
|
||||
(readpush, Fd outf) <- liftIO createPipe
|
||||
|
@ -118,14 +118,16 @@ xmppPush cid remote refs = runPush (SendPushRunning cid) handleDeferred $ do
|
|||
b <- liftIO $ B.hGetSome inh chunkSize
|
||||
if B.null b
|
||||
then liftIO $ killThread =<< myThreadId
|
||||
else sendNetMessage $ SendPackOutput cid b
|
||||
else sendNetMessage $ Pushing cid $ SendPackOutput b
|
||||
fromxmpp outh controlh = forever $ do
|
||||
m <- waitNetPushMessage
|
||||
case m of
|
||||
(ReceivePackOutput _ b) -> liftIO $ writeChunk outh b
|
||||
(ReceivePackDone _ exitcode) -> liftIO $ do
|
||||
hPrint controlh exitcode
|
||||
hFlush controlh
|
||||
(Pushing _ (ReceivePackOutput b)) ->
|
||||
liftIO $ writeChunk outh b
|
||||
(Pushing _ (ReceivePackDone exitcode)) ->
|
||||
liftIO $ do
|
||||
hPrint controlh exitcode
|
||||
hFlush controlh
|
||||
_ -> noop
|
||||
installwrapper tmpdir = liftIO $ do
|
||||
createDirectoryIfMissing True tmpdir
|
||||
|
@ -197,7 +199,7 @@ xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
|
|||
readertid <- forkIO <~> relayfromxmpp inh
|
||||
relaytoxmpp outh
|
||||
code <- liftIO $ waitForProcess pid
|
||||
void $ sendNetMessage $ ReceivePackDone cid code
|
||||
void $ sendNetMessage $ Pushing cid $ ReceivePackDone code
|
||||
liftIO $ do
|
||||
killThread readertid
|
||||
hClose inh
|
||||
|
@ -208,12 +210,13 @@ xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
|
|||
b <- liftIO $ B.hGetSome outh chunkSize
|
||||
-- empty is EOF, so exit
|
||||
unless (B.null b) $ do
|
||||
sendNetMessage $ ReceivePackOutput cid b
|
||||
sendNetMessage $ Pushing cid $ ReceivePackOutput b
|
||||
relaytoxmpp outh
|
||||
relayfromxmpp inh = forever $ do
|
||||
m <- waitNetPushMessage
|
||||
case m of
|
||||
(SendPackOutput _ b) -> liftIO $ writeChunk inh b
|
||||
(Pushing _ (SendPackOutput b)) ->
|
||||
liftIO $ writeChunk inh b
|
||||
_ -> noop
|
||||
|
||||
xmppRemotes :: ClientID -> Assistant [Remote]
|
||||
|
@ -230,15 +233,15 @@ whenXMPPRemote :: ClientID -> Assistant () -> Assistant ()
|
|||
whenXMPPRemote cid = unlessM (null <$> xmppRemotes cid)
|
||||
|
||||
handlePushMessage :: NetMessage -> Assistant ()
|
||||
handlePushMessage (CanPush cid) = whenXMPPRemote cid $
|
||||
sendNetMessage $ PushRequest cid
|
||||
handlePushMessage (PushRequest cid) = do
|
||||
handlePushMessage (Pushing cid CanPush) = whenXMPPRemote cid $
|
||||
sendNetMessage $ Pushing cid PushRequest
|
||||
handlePushMessage (Pushing cid PushRequest) = do
|
||||
rs <- xmppRemotes cid
|
||||
current <- liftAnnex $ inRepo Git.Branch.current
|
||||
--let refs = catMaybes [current, Just Annex.Branch.fullname] -- TODO
|
||||
let refs = [Ref "master:refs/remotes/xmpp/newmaster"]
|
||||
forM_ rs $ \r -> xmppPush cid r refs
|
||||
handlePushMessage (StartingPush cid) = whenXMPPRemote cid $
|
||||
handlePushMessage (Pushing cid StartingPush) = whenXMPPRemote cid $
|
||||
void $ xmppReceivePack cid
|
||||
handlePushMessage _ = noop
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue