per-client inboxes for push messages
This will avoid losing any messages received from 1 client when a push involving another client is running. Additionally, the handling of push initiation is improved, it's no longer allowed to run multiples of the same type of push to the same client. Still stalls sometimes :(
This commit is contained in:
parent
df3203ec62
commit
9efde46cdd
4 changed files with 108 additions and 76 deletions
|
@ -1,21 +1,23 @@
|
||||||
{- git-annex assistant out of band network messager interface
|
{- git-annex assistant out of band network messager interface
|
||||||
-
|
-
|
||||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
- Copyright 2012-2013 Joey Hess <joey@kitenet.net>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
module Assistant.NetMessager where
|
module Assistant.NetMessager where
|
||||||
|
|
||||||
import Assistant.Common
|
import Assistant.Common
|
||||||
import Assistant.Types.NetMessager
|
import Assistant.Types.NetMessager
|
||||||
|
|
||||||
import Control.Concurrent
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.MSampleVar
|
import Control.Concurrent.MSampleVar
|
||||||
import Control.Exception as E
|
import Control.Exception as E
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
import qualified Data.DList as D
|
||||||
|
|
||||||
sendNetMessage :: NetMessage -> Assistant ()
|
sendNetMessage :: NetMessage -> Assistant ()
|
||||||
sendNetMessage m =
|
sendNetMessage m =
|
||||||
|
@ -73,60 +75,94 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
|
||||||
- relating to this push, while any messages relating to other pushes
|
- relating to this push, while any messages relating to other pushes
|
||||||
- on the same side go to netMessagesDeferred. Once the push finishes,
|
- on the same side go to netMessagesDeferred. Once the push finishes,
|
||||||
- those deferred messages will be fed to handledeferred for processing.
|
- those deferred messages will be fed to handledeferred for processing.
|
||||||
|
-
|
||||||
|
- If this is called when a push of the same side is running, it will
|
||||||
|
- block until that push completes, and then run.
|
||||||
-}
|
-}
|
||||||
runPush :: PushSide -> ClientID -> (NetMessage -> Assistant ()) -> Assistant a -> Assistant a
|
runPush :: PushSide -> ClientID -> Assistant a -> Assistant a
|
||||||
runPush side clientid handledeferred a = do
|
runPush side clientid a = do
|
||||||
nm <- getAssistant netMessager
|
nm <- getAssistant netMessager
|
||||||
let runningv = getSide side $ netMessagerPushRunning nm
|
let v = getSide side $ netMessagerPushRunning nm
|
||||||
let setup = void $ atomically $ swapTMVar runningv $ Just clientid
|
debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side]
|
||||||
let cleanup = atomically $ do
|
let setup = do
|
||||||
void $ swapTMVar runningv Nothing
|
debugmsg "preparing to run"
|
||||||
emptytchan (getSide side $ netMessagesPush nm)
|
atomically $ ifM (isNothing <$> tryReadTMVar v)
|
||||||
|
( putTMVar v clientid
|
||||||
|
, retry
|
||||||
|
)
|
||||||
|
debugmsg "started running"
|
||||||
|
let cleanup = do
|
||||||
|
debugmsg "finished running"
|
||||||
|
atomically $ takeTMVar v
|
||||||
r <- E.bracket_ setup cleanup <~> a
|
r <- E.bracket_ setup cleanup <~> a
|
||||||
(void . forkIO) <~> processdeferred nm
|
{- Empty the inbox, because stuff may have been left in it
|
||||||
|
- if the push failed. -}
|
||||||
|
emptyInbox clientid side
|
||||||
return r
|
return r
|
||||||
where
|
|
||||||
emptytchan c = maybe noop (const $ emptytchan c) =<< tryReadTChan c
|
|
||||||
processdeferred nm = do
|
|
||||||
s <- liftIO $ atomically $ swapTMVar (getSide side $ netMessagesPushDeferred nm) S.empty
|
|
||||||
mapM_ rundeferred (S.toList s)
|
|
||||||
rundeferred m = (void . (E.try :: (IO () -> IO (Either SomeException ()))))
|
|
||||||
<~> handledeferred m
|
|
||||||
|
|
||||||
{- While a push is running, matching push messages are put into
|
{- Stores messages for a push into the appropriate inbox.
|
||||||
- netMessagesPush, while others that involve the same side go to
|
|
||||||
- netMessagesPushDeferred.
|
|
||||||
-
|
-
|
||||||
- When no push is running involving the same side, returns False.
|
- To avoid overflow, only 1000 messages max are stored in any
|
||||||
|
- inbox, which should be far more than necessary.
|
||||||
-
|
-
|
||||||
- To avoid bloating memory, only messages that initiate pushes are
|
- TODO: If we have more than 100 inboxes for different clients,
|
||||||
- deferred.
|
- discard old ones that are not currently being used by any push.
|
||||||
-}
|
-}
|
||||||
queueNetPushMessage :: NetMessage -> Assistant Bool
|
storeInbox :: NetMessage -> Assistant ()
|
||||||
queueNetPushMessage m@(Pushing clientid stage) = do
|
storeInbox msg@(Pushing clientid stage) = do
|
||||||
nm <- getAssistant netMessager
|
inboxes <- getInboxes side
|
||||||
liftIO $ atomically $ do
|
stored <- liftIO $ atomically $ do
|
||||||
v <- readTMVar (getSide side $ netMessagerPushRunning nm)
|
m <- readTVar inboxes
|
||||||
case v of
|
let update = \v -> do
|
||||||
Nothing -> return False
|
writeTVar inboxes $
|
||||||
(Just runningclientid)
|
M.insertWith' const clientid v m
|
||||||
| isPushInitiation stage -> defer nm
|
return True
|
||||||
| runningclientid == clientid -> queue nm
|
case M.lookup clientid m of
|
||||||
| otherwise -> discard
|
Nothing -> update (1, tostore)
|
||||||
|
Just (sz, l)
|
||||||
|
| sz > 1000 -> return False
|
||||||
|
| otherwise ->
|
||||||
|
let !sz' = sz + 1
|
||||||
|
!l' = D.append l tostore
|
||||||
|
in update (sz', l')
|
||||||
|
if stored
|
||||||
|
then netMessagerDebug clientid ["stored", logNetMessage msg, "in", show side, "inbox"]
|
||||||
|
else netMessagerDebug clientid ["discarded", logNetMessage msg, "; ", show side, "inbox is full"]
|
||||||
where
|
where
|
||||||
side = pushDestinationSide stage
|
side = pushDestinationSide stage
|
||||||
queue nm = do
|
tostore = D.singleton msg
|
||||||
writeTChan (getSide side $ netMessagesPush nm) m
|
storeInbox _ = noop
|
||||||
return True
|
|
||||||
defer nm = do
|
|
||||||
let mv = getSide side $ netMessagesPushDeferred nm
|
|
||||||
s <- takeTMVar mv
|
|
||||||
putTMVar mv $ S.insert m s
|
|
||||||
return True
|
|
||||||
discard = return True
|
|
||||||
queueNetPushMessage _ = return False
|
|
||||||
|
|
||||||
waitNetPushMessage :: PushSide -> Assistant (NetMessage)
|
{- Gets the new message for a push from its inbox.
|
||||||
waitNetPushMessage side = (atomically . readTChan)
|
- Blocks until a message has been received. -}
|
||||||
<<~ (getSide side . netMessagesPush . netMessager)
|
waitInbox :: ClientID -> PushSide -> Assistant (NetMessage)
|
||||||
|
waitInbox clientid side = do
|
||||||
|
inboxes <- getInboxes side
|
||||||
|
liftIO $ atomically $ do
|
||||||
|
m <- readTVar inboxes
|
||||||
|
case M.lookup clientid m of
|
||||||
|
Nothing -> retry
|
||||||
|
Just (sz, dl)
|
||||||
|
| sz < 1 -> retry
|
||||||
|
| otherwise -> do
|
||||||
|
let msg = D.head dl
|
||||||
|
let dl' = D.tail dl
|
||||||
|
let !sz' = sz - 1
|
||||||
|
writeTVar inboxes $
|
||||||
|
M.insertWith' const clientid (sz', dl') m
|
||||||
|
return msg
|
||||||
|
|
||||||
|
emptyInbox :: ClientID -> PushSide -> Assistant ()
|
||||||
|
emptyInbox clientid side = do
|
||||||
|
inboxes <- getInboxes side
|
||||||
|
liftIO $ atomically $
|
||||||
|
modifyTVar' inboxes $
|
||||||
|
M.delete clientid
|
||||||
|
|
||||||
|
getInboxes :: PushSide -> Assistant Inboxes
|
||||||
|
getInboxes side =
|
||||||
|
getSide side . netMessagesInboxes <$> getAssistant netMessager
|
||||||
|
|
||||||
|
netMessagerDebug :: ClientID -> [String] -> Assistant ()
|
||||||
|
netMessagerDebug clientid l = debug $
|
||||||
|
"NetMessager" : l ++ [show $ logClientID clientid]
|
||||||
|
|
|
@ -108,11 +108,10 @@ xmppClient urlrenderer d creds =
|
||||||
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
|
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
|
||||||
handle _ (GotNetMessage m@(Pushing _ pushstage))
|
handle _ (GotNetMessage m@(Pushing _ pushstage))
|
||||||
| isPushNotice pushstage = inAssistant $ handlePushNotice m
|
| isPushNotice pushstage = inAssistant $ handlePushNotice m
|
||||||
| isPushInitiation pushstage = inAssistant $
|
| isPushInitiation pushstage = inAssistant $ do
|
||||||
unlessM (queueNetPushMessage m) $ do
|
let checker = checkCloudRepos urlrenderer
|
||||||
let checker = checkCloudRepos urlrenderer
|
void $ forkIO <~> handlePushInitiation checker m
|
||||||
void $ forkIO <~> handlePushInitiation checker m
|
| otherwise = void $ inAssistant $ storeInbox m
|
||||||
| otherwise = void $ inAssistant $ queueNetPushMessage m
|
|
||||||
handle _ (Ignorable _) = noop
|
handle _ (Ignorable _) = noop
|
||||||
handle _ (Unknown _) = noop
|
handle _ (Unknown _) = noop
|
||||||
handle _ (ProtocolError _) = noop
|
handle _ (ProtocolError _) = noop
|
||||||
|
|
|
@ -18,6 +18,7 @@ import Data.Text (Text)
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
import qualified Data.DList as D
|
||||||
|
|
||||||
{- Messages that can be sent out of band by a network messager. -}
|
{- Messages that can be sent out of band by a network messager. -}
|
||||||
data NetMessage
|
data NetMessage
|
||||||
|
@ -117,6 +118,8 @@ mkSideMap gen = do
|
||||||
getSide :: PushSide -> SideMap a -> a
|
getSide :: PushSide -> SideMap a -> a
|
||||||
getSide side m = m side
|
getSide side m = m side
|
||||||
|
|
||||||
|
type Inboxes = TVar (M.Map ClientID (Int, D.DList NetMessage))
|
||||||
|
|
||||||
data NetMessager = NetMessager
|
data NetMessager = NetMessager
|
||||||
-- outgoing messages
|
-- outgoing messages
|
||||||
{ netMessages :: TChan NetMessage
|
{ netMessages :: TChan NetMessage
|
||||||
|
@ -127,11 +130,11 @@ data NetMessager = NetMessager
|
||||||
-- write to this to restart the net messager
|
-- write to this to restart the net messager
|
||||||
, netMessagerRestart :: MSampleVar ()
|
, netMessagerRestart :: MSampleVar ()
|
||||||
-- only one side of a push can be running at a time
|
-- only one side of a push can be running at a time
|
||||||
, netMessagerPushRunning :: SideMap (TMVar (Maybe ClientID))
|
-- the TMVars are empty when nothing is running
|
||||||
-- incoming messages related to a running push
|
, netMessagerPushRunning :: SideMap (TMVar ClientID)
|
||||||
, netMessagesPush :: SideMap (TChan NetMessage)
|
-- incoming messages containing data for a push,
|
||||||
-- incoming push messages, deferred to be processed later
|
-- on a per-client and per-side basis
|
||||||
, netMessagesPushDeferred :: SideMap (TMVar (S.Set NetMessage))
|
, netMessagesInboxes :: SideMap Inboxes
|
||||||
}
|
}
|
||||||
|
|
||||||
newNetMessager :: IO NetMessager
|
newNetMessager :: IO NetMessager
|
||||||
|
@ -140,6 +143,5 @@ newNetMessager = NetMessager
|
||||||
<*> atomically (newTMVar M.empty)
|
<*> atomically (newTMVar M.empty)
|
||||||
<*> atomically (newTMVar M.empty)
|
<*> atomically (newTMVar M.empty)
|
||||||
<*> newEmptySV
|
<*> newEmptySV
|
||||||
<*> mkSideMap (newTMVar Nothing)
|
<*> mkSideMap newEmptyTMVar
|
||||||
<*> mkSideMap newTChan
|
<*> mkSideMap (newTVar M.empty)
|
||||||
<*> mkSideMap (newTMVar S.empty)
|
|
||||||
|
|
|
@ -99,8 +99,8 @@ makeXMPPGitRemote buddyname jid u = do
|
||||||
-
|
-
|
||||||
- We listen at the other end of the pipe and relay to and from XMPP.
|
- We listen at the other end of the pipe and relay to and from XMPP.
|
||||||
-}
|
-}
|
||||||
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> (NetMessage -> Assistant ()) -> Assistant Bool
|
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
|
||||||
xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
|
xmppPush cid gitpush = runPush SendPack cid $ do
|
||||||
u <- liftAnnex getUUID
|
u <- liftAnnex getUUID
|
||||||
sendNetMessage $ Pushing cid (StartingPush u)
|
sendNetMessage $ Pushing cid (StartingPush u)
|
||||||
|
|
||||||
|
@ -149,7 +149,7 @@ xmppPush cid gitpush handledeferred = runPush SendPack cid handledeferred $ do
|
||||||
SendPackOutput seqnum' b
|
SendPackOutput seqnum' b
|
||||||
toxmpp seqnum' inh
|
toxmpp seqnum' inh
|
||||||
|
|
||||||
fromxmpp outh controlh = withPushMessagesInSequence SendPack handle
|
fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle
|
||||||
where
|
where
|
||||||
handle (Just (Pushing _ (ReceivePackOutput _ b))) =
|
handle (Just (Pushing _ (ReceivePackOutput _ b))) =
|
||||||
liftIO $ writeChunk outh b
|
liftIO $ writeChunk outh b
|
||||||
|
@ -236,8 +236,8 @@ xmppGitRelay = do
|
||||||
|
|
||||||
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
|
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
|
||||||
- its exit status to XMPP. -}
|
- its exit status to XMPP. -}
|
||||||
xmppReceivePack :: ClientID -> (NetMessage -> Assistant ()) -> Assistant Bool
|
xmppReceivePack :: ClientID -> Assistant Bool
|
||||||
xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
|
xmppReceivePack cid = runPush ReceivePack cid $ do
|
||||||
repodir <- liftAnnex $ fromRepo repoPath
|
repodir <- liftAnnex $ fromRepo repoPath
|
||||||
let p = (proc "git" ["receive-pack", repodir])
|
let p = (proc "git" ["receive-pack", repodir])
|
||||||
{ std_in = CreatePipe
|
{ std_in = CreatePipe
|
||||||
|
@ -262,7 +262,7 @@ xmppReceivePack cid handledeferred = runPush ReceivePack cid handledeferred $ do
|
||||||
let seqnum' = succ seqnum
|
let seqnum' = succ seqnum
|
||||||
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
|
sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
|
||||||
relaytoxmpp seqnum' outh
|
relaytoxmpp seqnum' outh
|
||||||
relayfromxmpp inh = withPushMessagesInSequence ReceivePack handle
|
relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle
|
||||||
where
|
where
|
||||||
handle (Just (Pushing _ (SendPackOutput _ b))) =
|
handle (Just (Pushing _ (SendPackOutput _ b))) =
|
||||||
liftIO $ writeChunk inh b
|
liftIO $ writeChunk inh b
|
||||||
|
@ -301,15 +301,13 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
|
||||||
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
|
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
|
||||||
forM_ rs $ \r -> do
|
forM_ rs $ \r -> do
|
||||||
void $ alertWhile (syncAlert [r]) $
|
void $ alertWhile (syncAlert [r]) $
|
||||||
xmppPush cid
|
xmppPush cid (taggedPush u selfjid branch r)
|
||||||
(taggedPush u selfjid branch r)
|
|
||||||
(handleDeferred checkcloudrepos)
|
|
||||||
checkcloudrepos r
|
checkcloudrepos r
|
||||||
handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
|
handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
|
||||||
rs <- xmppRemotes cid theiruuid
|
rs <- xmppRemotes cid theiruuid
|
||||||
unless (null rs) $ do
|
unless (null rs) $ do
|
||||||
void $ alertWhile (syncAlert rs) $
|
void $ alertWhile (syncAlert rs) $
|
||||||
xmppReceivePack cid (handleDeferred checkcloudrepos)
|
xmppReceivePack cid
|
||||||
mapM_ checkcloudrepos rs
|
mapM_ checkcloudrepos rs
|
||||||
handlePushInitiation _ _ = noop
|
handlePushInitiation _ _ = noop
|
||||||
|
|
||||||
|
@ -320,9 +318,6 @@ handlePushNotice (Pushing cid (CanPush theiruuid)) =
|
||||||
sendNetMessage $ Pushing cid (PushRequest u)
|
sendNetMessage $ Pushing cid (PushRequest u)
|
||||||
handlePushNotice _ = noop
|
handlePushNotice _ = noop
|
||||||
|
|
||||||
handleDeferred :: (Remote -> Assistant ()) -> NetMessage -> Assistant ()
|
|
||||||
handleDeferred checkcloudrepos m = handlePushInitiation checkcloudrepos m
|
|
||||||
|
|
||||||
writeChunk :: Handle -> B.ByteString -> IO ()
|
writeChunk :: Handle -> B.ByteString -> IO ()
|
||||||
writeChunk h b = do
|
writeChunk h b = do
|
||||||
B.hPut h b
|
B.hPut h b
|
||||||
|
@ -335,11 +330,11 @@ writeChunk h b = do
|
||||||
- Does not currently reorder messages, but does ensure that any
|
- Does not currently reorder messages, but does ensure that any
|
||||||
- duplicate messages, or messages not in the sequence, are discarded.
|
- duplicate messages, or messages not in the sequence, are discarded.
|
||||||
-}
|
-}
|
||||||
withPushMessagesInSequence :: PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
|
withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
|
||||||
withPushMessagesInSequence side a = loop 0
|
withPushMessagesInSequence cid side a = loop 0
|
||||||
where
|
where
|
||||||
loop seqnum = do
|
loop seqnum = do
|
||||||
m <- timeout xmppTimeout <~> waitNetPushMessage side
|
m <- timeout xmppTimeout <~> waitInbox cid side
|
||||||
let go s = a m >> loop s
|
let go s = a m >> loop s
|
||||||
case extractSequence =<< m of
|
case extractSequence =<< m of
|
||||||
Just seqnum'
|
Just seqnum'
|
||||||
|
|
Loading…
Add table
Reference in a new issue