git-annex/Assistant/NetMessager.hs
Joey Hess 14d96b8e06 XMPP: Be better at responding to CanPush messages when busy with something else.
Observed: With 2 xmpp clients, one would sometimes stop responding
to CanPush messages. Often it was in the middle of a receive-pack
of its own (or was waiting for a failed one to time out).

Now these are always immediately responded to, which is fine; the point
of CanPush is to find out if there's another client out there that's
interested in our push.

Also, in queueNetPushMessage, queue push initiation messages when
we're already running the side of the push they would initiate.
Before, these messages were sent into the netMessagesPush channel,
which was wrong. The xmpp send-pack and receive-pack code discarded
such messages.

This still doesn't make XMPP push 100% robust. In testing, I am seeing
it sometimes try to run two send-packs, or two receive-packs at once
to the same client (probably because the client sent two requests).

Also, I'm seeing rather a lot of cases where it stalls out until it
runs into the 120 second timeout and cancels a push.

And finally, there seems to be a bug in runPush. I have logs that
show it running its setup action, but never its cleanup action.
How is this possible given its use of E.bracket? Either some exception
is finding its way through, or the action somehow stalls forever.
When this happens, one of the 2 clients stops syncing.
2013-05-21 00:59:38 -04:00

132 lines
4.9 KiB
Haskell

{- git-annex assistant out of band network messager interface
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.NetMessager where
import Assistant.Common
import Assistant.Types.NetMessager
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S
import qualified Data.Map as M
sendNetMessage :: NetMessage -> Assistant ()
sendNetMessage m =
(atomically . flip writeTChan m) <<~ (netMessages . netMessager)
waitNetMessage :: Assistant (NetMessage)
waitNetMessage = (atomically . readTChan) <<~ (netMessages . netMessager)
notifyNetMessagerRestart :: Assistant ()
notifyNetMessagerRestart =
flip writeSV () <<~ (netMessagerRestart . netMessager)
waitNetMessagerRestart :: Assistant ()
waitNetMessagerRestart = readSV <<~ (netMessagerRestart . netMessager)
{- Store an important NetMessage for a client, and if the same message was
- already sent, remove it from sentImportantNetMessages. -}
storeImportantNetMessage :: NetMessage -> ClientID -> (ClientID -> Bool) -> Assistant ()
storeImportantNetMessage m client matchingclient = go <<~ netMessager
where
go nm = atomically $ do
q <- takeTMVar $ importantNetMessages nm
sent <- takeTMVar $ sentImportantNetMessages nm
putTMVar (importantNetMessages nm) $
M.alter (Just . maybe (S.singleton m) (S.insert m)) client q
putTMVar (sentImportantNetMessages nm) $
M.mapWithKey removematching sent
removematching someclient s
| matchingclient someclient = S.delete m s
| otherwise = s
{- Indicates that an important NetMessage has been sent to a client. -}
sentImportantNetMessage :: NetMessage -> ClientID -> Assistant ()
sentImportantNetMessage m client = go <<~ (sentImportantNetMessages . netMessager)
where
go v = atomically $ do
sent <- takeTMVar v
putTMVar v $
M.alter (Just . maybe (S.singleton m) (S.insert m)) client sent
{- Checks for important NetMessages that have been stored for a client, and
- sent to a client. Typically the same client for both, although
- a modified or more specific client may need to be used. -}
checkImportantNetMessages :: (ClientID, ClientID) -> Assistant (S.Set NetMessage, S.Set NetMessage)
checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
where
go nm = atomically $ do
stored <- M.lookup storedclient <$> (readTMVar $ importantNetMessages nm)
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
{- Runs an action that runs either the send or receive side of a push.
-
- While the push is running, netMessagesPush will get messages put into it
- relating to this push, while any messages relating to other pushes
- on the same side go to netMessagesDeferred. Once the push finishes,
- those deferred messages will be fed to handledeferred for processing.
-}
runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
runPush side clientid handledeferred a = do
nm <- getAssistant netMessager
let runningv = getSide side $ netMessagerPushRunning nm
let setup = void $ atomically $ swapTMVar runningv $ Just clientid
let cleanup = atomically $ do
void $ swapTMVar runningv Nothing
emptytchan (getSide side $ netMessagesPush nm)
r <- E.bracket_ setup cleanup <~> a
(void . forkIO) <~> processdeferred nm
return r
where
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
processdeferred nm = do
s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
mapM_ rundeferred (S.toList s)
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
<~> handledeferred m
{- While a push is running, matching push messages are put into
- netMessagesPush, while others that involve the same side go to
- netMessagesPushDeferred.
-
- When no push is running involving the same side, returns False.
-
- To avoid bloating memory, only messages that initiate pushes are
- deferred.
-}
queueNetPushMessage :: NetMessage -> Assistant Bool
queueNetPushMessage m@(Pushing clientid stage) = do
nm <- getAssistant netMessager
liftIO $ atomically $ do
v <- readTMVar (getSide side $ netMessagerPushRunning nm)
case v of
Nothing -> return False
(Just runningclientid)
| isPushInitiation stage -> defer nm
| runningclientid == clientid -> queue nm
| otherwise -> discard
where
side = pushDestinationSide stage
queue nm = do
writeTChan (getSide side $ netMessagesPush nm) m
return True
defer nm = do
let mv = getSide side $ netMessagesPushDeferred nm
s <- takeTMVar mv
putTMVar mv $ S.insert m s
return True
discard = return True
queueNetPushMessage _ = return False
waitNetPushMessage :: PushSide -> Assistant (NetMessage)
waitNetPushMessage side = (atomically . readTChan)
<<~ (getSide side . netMessagesPush . netMessager)