allow both one push and one receive-pack to run at the same time

Noticed that when pairing, sometimes both sides start to push, and the other
side sends a PushRequest, and the two deadlock, neither doing anything.
(Timeout eventually breaks this.) So, let both run at the same time.
This commit is contained in:
Joey Hess 2012-11-11 15:42:03 -04:00
parent b44e8bb4a5
commit 217eeede43
5 changed files with 87 additions and 64 deletions

View file

@ -33,69 +33,68 @@ notifyNetMessagerRestart =
waitNetMessagerRestart :: Assistant () waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager) waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
getPushRunning :: Assistant PushRunning {- Runs an action that runs either the send or receive side of a push.
getPushRunning =
(atomically . readTMVar) <<~ (netMessagerPushRunning . netMessager)
{- Runs an action that runs either the send or receive end of a push.
- -
- While the push is running, netMessagesPush will get messages put into it - While the push is running, netMessagesPush will get messages put into it
- relating to this push, while any messages relating to other pushes - relating to this push, while any messages relating to other pushes
- go to netMessagesDeferred. Once the push finishes, those deferred - on the same side go to netMessagesDeferred. Once the push finishes,
- messages will be fed to handledeferred for processing. - those deferred messages will be fed to handledeferred for processing.
-} -}
runPush :: PushRunning -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
runPush v handledeferred a = do runPush side clientid handledeferred a = do
nm <- getAssistant netMessager nm <- getAssistant netMessager
let pr = netMessagerPushRunning nm let runningv = getSide side $ netMessagerPushRunning nm
let setup = void $ atomically $ swapTMVar pr v let setup = void $ atomically $ swapTMVar runningv $ Just clientid
let cleanup = atomically $ do let cleanup = atomically $ do
void $ swapTMVar pr NoPushRunning void $ swapTMVar runningv Nothing
emptytchan (netMessagesPush nm) emptytchan (getSide side $ netMessagesPush nm)
r <- E.bracket_ setup cleanup <~> a r <- E.bracket_ setup cleanup <~> a
(void . forkIO) <~> processdeferred nm (void . forkIO) <~> processdeferred nm
return r return r
where where
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
processdeferred nm = do processdeferred nm = do
s <- liftIO $ atomically $ swapTMVar (netMessagesDeferredPush nm) S.empty s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
mapM_ rundeferred (S.toList s) mapM_ rundeferred (S.toList s)
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ())))) rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
<~> handledeferred m <~> handledeferred m
{- While a push is running, matching push messages are put into {- While a push is running, matching push messages are put into
- netMessagesPush, while others go to netMessagesDeferredPush. - netMessagesPush, while others that involve the same side go to
- netMessagesDeferredPush.
-
- When no push is running involving the same side, returns False.
-
- To avoid bloating memory, only messages that initiate pushes are - To avoid bloating memory, only messages that initiate pushes are
- deferred. - deferred.
-
- When no push is running, returns False.
-} -}
queueNetPushMessage :: NetMessage -> Assistant Bool queueNetPushMessage :: NetMessage -> Assistant Bool
queueNetPushMessage m = do queueNetPushMessage m@(Pushing clientid stage) = do
nm <- getAssistant netMessager nm <- getAssistant netMessager
liftIO $ atomically $ do liftIO $ atomically $ do
running <- readTMVar (netMessagerPushRunning nm) v <- readTMVar (getSide side $ netMessagerPushRunning nm)
case running of case v of
NoPushRunning -> return False Nothing -> return False
SendPushRunning runningcid -> do (Just runningclientid)
go nm m runningcid | runningclientid == clientid -> queue nm
return True | isPushInitiation stage -> defer nm
ReceivePushRunning runningcid -> do | otherwise -> discard
go nm m runningcid
return True
where where
go nm (Pushing cid stage) runningcid side = pushDestinationSide stage
| cid == runningcid = writeTChan (netMessagesPush nm) m queue nm = do
| isPushInitiation stage = defer nm writeTChan (getSide side $ netMessagesPush nm) m
| otherwise = noop return True
go _ _ _ = noop
defer nm = do defer nm = do
s <- takeTMVar (netMessagesDeferredPush nm) let mv = getSide side $ netMessagesPushDeferred nm
putTMVar (netMessagesDeferredPush nm) $ S.insert m s s <- takeTMVar mv
putTMVar mv $ S.insert m s
return True
discard = return True
queueNetPushMessage _ = return False
waitNetPushMessage :: Assistant (NetMessage) waitNetPushMessage :: PushSide -> Assistant (NetMessage)
waitNetPushMessage = (atomically . readTChan) <<~ (netMessagesPush . netMessager) waitNetPushMessage side = (atomically . readTChan)
<<~ (getSide side . netMessagesPush . netMessager)
{- Remotes using the XMPP transport have urls like xmpp::user@host -} {- Remotes using the XMPP transport have urls like xmpp::user@host -}
isXMPPRemote :: Remote -> Bool isXMPPRemote :: Remote -> Bool

View file

@ -99,7 +99,7 @@ xmppClient urlrenderer d = do
handle _ (GotNetMessage m@(Pushing _ pushstage)) handle _ (GotNetMessage m@(Pushing _ pushstage))
| isPushInitiation pushstage = inAssistant $ | isPushInitiation pushstage = inAssistant $
unlessM (queueNetPushMessage m) $ unlessM (queueNetPushMessage m) $
void $ forkIO <~> handlePushMessage m void $ forkIO <~> handlePushInitiation m
| otherwise = void $ inAssistant $ queueNetPushMessage m | otherwise = void $ inAssistant $ queueNetPushMessage m
handle _ (Ignorable _) = noop handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop handle _ (Unknown _) = noop

View file

@ -14,7 +14,7 @@ import Data.Text (Text)
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.MSampleVar import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Data.Set as S import qualified Data.Set as S
{- Messages that can be sent out of band by a network messager. -} {- Messages that can be sent out of band by a network messager. -}
data NetMessage data NetMessage
@ -47,32 +47,55 @@ data PushStage
| ReceivePackDone ExitCode | ReceivePackDone ExitCode
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
data PushRunning = NoPushRunning | SendPushRunning ClientID | ReceivePushRunning ClientID {- Things that initiate either side of a push, but do not actually send data. -}
deriving (Eq)
isPushInitiation :: PushStage -> Bool isPushInitiation :: PushStage -> Bool
isPushInitiation CanPush = True isPushInitiation CanPush = True
isPushInitiation PushRequest = True isPushInitiation PushRequest = True
isPushInitiation StartingPush = True isPushInitiation StartingPush = True
isPushInitiation _ = False isPushInitiation _ = False
data PushSide = SendPack | ReceivePack
deriving (Eq, Ord)
pushDestinationSide :: PushStage -> PushSide
pushDestinationSide CanPush = ReceivePack
pushDestinationSide PushRequest = SendPack
pushDestinationSide StartingPush = ReceivePack
pushDestinationSide (ReceivePackOutput _) = SendPack
pushDestinationSide (SendPackOutput _) = ReceivePack
pushDestinationSide (ReceivePackDone _) = SendPack
type SideMap a = PushSide -> a
mkSideMap :: STM a -> IO (SideMap a)
mkSideMap gen = do
(sp, rp) <- atomically $ (,) <$> gen <*> gen
return $ lookupside sp rp
where
lookupside sp _ SendPack = sp
lookupside _ rp ReceivePack = rp
getSide :: PushSide -> SideMap a -> a
getSide side m = m side
data NetMessager = NetMessager data NetMessager = NetMessager
-- outgoing messages -- outgoing messages
{ netMessages :: TChan (NetMessage) { netMessages :: TChan (NetMessage)
-- only one push can be running at a time, and this tracks it
, netMessagerPushRunning :: TMVar (PushRunning)
-- incoming messages relating to the currently running push
, netMessagesPush :: TChan (NetMessage)
-- incoming push messages that have been deferred to be processed later
, netMessagesDeferredPush :: TMVar (S.Set NetMessage)
-- write to this to restart the net messager -- write to this to restart the net messager
, netMessagerRestart :: MSampleVar () , netMessagerRestart :: MSampleVar ()
-- only one side of a push can be running at a time
, netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID))
-- incoming messages related to a running push
, netMessagesPush :: SideMap (TChan NetMessage)
-- incoming push messages, deferred to be processed later
, netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage))
} }
newNetMessager :: IO NetMessager newNetMessager :: IO NetMessager
newNetMessager = NetMessager newNetMessager = NetMessager
<$> atomically newTChan <$> atomically newTChan
<*> atomically (newTMVar NoPushRunning)
<*> atomically newTChan
<*> atomically (newTMVar S.empty)
<*> newEmptySV <*> newEmptySV
<*> mkSideMap (newTMVar Nothing)
<*> mkSideMap newTChan
<*> mkSideMap (newTMVar S.empty)
where

View file

@ -74,7 +74,7 @@ makeXMPPGitRemote buddyname jid u = do
- We listen at the other end of the pipe and relay to and from XMPP. - We listen at the other end of the pipe and relay to and from XMPP.
-} -}
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
xmppPush cid gitpush = runPush (SendPushRunning cid) handleDeferred $ do xmppPush cid gitpush = runPush SendPack cid handleDeferred $ do
sendNetMessage $ Pushing cid StartingPush sendNetMessage $ Pushing cid StartingPush
(Fd inf, writepush) <- liftIO createPipe (Fd inf, writepush) <- liftIO createPipe
@ -119,7 +119,7 @@ xmppPush cid gitpush = runPush (SendPushRunning cid) handleDeferred $ do
then liftIO $ killThread =<< myThreadId then liftIO $ killThread =<< myThreadId
else sendNetMessage $ Pushing cid $ SendPackOutput b else sendNetMessage $ Pushing cid $ SendPackOutput b
fromxmpp outh controlh = forever $ do fromxmpp outh controlh = forever $ do
m <- runTimeout xmppTimeout <~> waitNetPushMessage m <- runTimeout xmppTimeout <~> waitNetPushMessage SendPack
case m of case m of
(Right (Pushing _ (ReceivePackOutput b))) -> (Right (Pushing _ (ReceivePackOutput b))) ->
liftIO $ writeChunk outh b liftIO $ writeChunk outh b
@ -195,7 +195,7 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -} - its exit status to XMPP. -}
xmppReceivePack :: ClientID -> Assistant Bool xmppReceivePack :: ClientID -> Assistant Bool
xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do xmppReceivePack cid = runPush ReceivePack cid handleDeferred $ do
repodir <- liftAnnex $ fromRepo repoPath repodir <- liftAnnex $ fromRepo repoPath
let p = (proc "git" ["receive-pack", repodir]) let p = (proc "git" ["receive-pack", repodir])
{ std_in = CreatePipe { std_in = CreatePipe
@ -220,7 +220,7 @@ xmppReceivePack cid = runPush (ReceivePushRunning cid) handleDeferred $ do
sendNetMessage $ Pushing cid $ ReceivePackOutput b sendNetMessage $ Pushing cid $ ReceivePackOutput b
relaytoxmpp outh relaytoxmpp outh
relayfromxmpp inh = forever $ do relayfromxmpp inh = forever $ do
m <- runTimeout xmppTimeout <~> waitNetPushMessage m <- runTimeout xmppTimeout <~> waitNetPushMessage ReceivePack
case m of case m of
(Right (Pushing _ (SendPackOutput b))) -> (Right (Pushing _ (SendPackOutput b))) ->
liftIO $ writeChunk inh b liftIO $ writeChunk inh b
@ -246,12 +246,12 @@ xmppRemotes cid = case baseJID <$> parseJID cid of
whenXMPPRemote :: ClientID -> Assistant () -> Assistant () whenXMPPRemote :: ClientID -> Assistant () -> Assistant ()
whenXMPPRemote cid = unlessM (null <$> xmppRemotes cid) whenXMPPRemote cid = unlessM (null <$> xmppRemotes cid)
handlePushMessage :: NetMessage -> Assistant () handlePushInitiation :: NetMessage -> Assistant ()
handlePushMessage (Pushing cid CanPush) = handlePushInitiation (Pushing cid CanPush) =
whenXMPPRemote cid $ whenXMPPRemote cid $
sendNetMessage $ Pushing cid PushRequest sendNetMessage $ Pushing cid PushRequest
handlePushMessage (Pushing cid PushRequest) = handlePushInitiation (Pushing cid PushRequest) =
go =<< liftAnnex (inRepo Git.Branch.current) go =<< liftAnnex (inRepo Git.Branch.current)
where where
go Nothing = noop go Nothing = noop
@ -265,13 +265,13 @@ handlePushMessage (Pushing cid PushRequest) =
debug ["pushing to", show rs] debug ["pushing to", show rs]
forM_ rs $ \r -> xmppPush cid $ pushFallback u branch r forM_ rs $ \r -> xmppPush cid $ pushFallback u branch r
handlePushMessage (Pushing cid StartingPush) = handlePushInitiation (Pushing cid StartingPush) =
whenXMPPRemote cid $ whenXMPPRemote cid $
void $ xmppReceivePack cid void $ xmppReceivePack cid
handlePushMessage _ = noop handlePushInitiation _ = noop
handleDeferred :: NetMessage -> Assistant () handleDeferred :: NetMessage -> Assistant ()
handleDeferred = handlePushMessage handleDeferred = handlePushInitiation
writeChunk :: Handle -> B.ByteString -> IO () writeChunk :: Handle -> B.ByteString -> IO ()
writeChunk h b = do writeChunk h b = do

View file

@ -11,9 +11,10 @@ who share a repository, that is stored in the [[cloud]].
See <http://git-annex.branchable.com/design/assistant/blog/day_114__xmpp/#comment-aaba579f92cb452caf26ac53071a6788> See <http://git-annex.branchable.com/design/assistant/blog/day_114__xmpp/#comment-aaba579f92cb452caf26ac53071a6788>
* Assistant.Sync.manualPull doesn't handle XMPP remotes yet. * Assistant.Sync.manualPull doesn't handle XMPP remotes yet.
This is needed to handle getting back in sync after reconnection. This is needed to handle getting back in sync after reconnection.
* If a git push over XMPP is underway, and the remote client stops * When pairing, sometimes both sides start to push, and the other side
responding, it currently blocks other git pushes from starting, forever. sends a PushRequest, and the two deadlock, neither doing anything.
Need a timeout.. (Timeout eventually breaks this.)
Maybe should allow one push and one receive-pack at a time?
## design goals ## design goals