add two long-running XMPP push threads, no more inversion of control

I hope this will be easier to reason about, and less buggy. It was
certianly easier to write!

An immediate benefit is that with a traversable queue of push requests to
select from, the threads can be a lot fairer about choosing which client to
service next.
This commit is contained in:
Joey Hess 2013-05-22 15:13:31 -04:00
parent 52371274f0
commit e2b67e0bc4
7 changed files with 166 additions and 107 deletions

View file

@ -37,6 +37,7 @@ import Assistant.Threads.PairListener
#endif #endif
#ifdef WITH_XMPP #ifdef WITH_XMPP
import Assistant.Threads.XMPPClient import Assistant.Threads.XMPPClient
import Assistant.Threads.XMPPPusher
#endif #endif
#else #else
#warning Building without the webapp. You probably need to install Yesod.. #warning Building without the webapp. You probably need to install Yesod..
@ -111,6 +112,8 @@ startDaemon assistant foreground listenhost startbrowser = do
#endif #endif
#ifdef WITH_XMPP #ifdef WITH_XMPP
, assist $ xmppClientThread urlrenderer , assist $ xmppClientThread urlrenderer
, assist $ xmppSendPackThread urlrenderer
, assist $ xmppReceivePackThread urlrenderer
#endif #endif
#endif #endif
, assist $ pushThread , assist $ pushThread

View file

@ -14,7 +14,6 @@ import Assistant.Types.NetMessager
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.MSampleVar import Control.Concurrent.MSampleVar
import Control.Exception as E
import qualified Data.Set as S import qualified Data.Set as S
import qualified Data.Map as M import qualified Data.Map as M
import qualified Data.DList as D import qualified Data.DList as D
@ -69,73 +68,39 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm) sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm)
return (fromMaybe S.empty stored, fromMaybe S.empty sent) return (fromMaybe S.empty stored, fromMaybe S.empty sent)
{- Runs an action that runs either the send or receive side of a push. {- Queues a push initiation message in the queue for the appropriate
- Only one such action per side can run at a time. Other pushes, for - side of the push but only if there is not already an initiation message
- the same, or other clients, need to wait their turn. - from the same client in the queue. -}
- queuePushInitiation :: NetMessage -> Assistant ()
- Only one push is allowed to wait per client. queuePushInitiation msg@(Pushing clientid stage) = do
- There is no point in building up more. tv <- getPushInitiationQueue side
- liftIO $ atomically $ do
- While the push is running, netMessagesPush will get messages put into it r <- tryTakeTMVar tv
- relating to this push, while any messages relating to other pushes case r of
- on the same side go to netMessagesDeferred. Once the push finishes, Nothing -> putTMVar tv [msg]
- those deferred messages will be fed to handledeferred for processing. Just l -> do
-} let !l' = msg : filter differentclient l
runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool putTMVar tv l'
runPush side clientid a = do
debugmsg "preparing to run"
nm <- getAssistant netMessager
let setup = do
(canrun, release) <- atomically $ checkcanrun nm
if canrun
then atomically $ waittorun nm release
else return (False, noop)
let cleanup (_, release) = atomically release
go <- asIO1 $ \(run, _) ->
if run
then do
debugmsg "started running"
r <- a
debugmsg "finished running"
{- Empty the inbox, because stuff may have
- been left in it if the push failed. -}
emptyInbox clientid side
return r
else do
debugmsg "skipping running"
return False
r <- liftIO $ E.bracket setup cleanup go
return r
where where
debugmsg s = netMessagerDebug clientid [s, show side] side = pushDestinationSide stage
-- check that this is one of the two threads allowed differentclient (Pushing cid _) = cid /= clientid
-- to run at the same time, pushing to the same client differentclient _ = True
-- on the same side queuePushInitiation _ = noop
checkcanrun nm = do
let v = getSide side $ netMessagerPushThreadCount nm {- Waits for a push inititation message to be received, and runs
m <- readTVar v - function to select a message from the queue. -}
case M.lookup clientid m of waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage
Just count waitPushInitiation side selector = do
| count > 2 -> return (False, noop) tv <- getPushInitiationQueue side
_ -> do liftIO $ atomically $ do
writeTVar v $ q <- takeTMVar tv
M.insertWith' (+) clientid 1 m if null q
let release = modifyTVar' v $ then retry
M.insertWith' (-) clientid 1 else do
return (True, release) let (msg, !q') = selector q
-- block until this is the only thread performing unless (null q') $
-- a push on this side, to any client putTMVar tv q'
waittorun nm release = do return msg
let v = getSide side $ netMessagerPushRunning nm
ifM (isNothing <$> tryReadTMVar v)
( do
putTMVar v clientid
let release' = do
void $ takeTMVar v
release
return (True, release')
, retry
)
{- Stores messages for a push into the appropriate inbox. {- Stores messages for a push into the appropriate inbox.
- -
@ -198,7 +163,11 @@ emptyInbox clientid side = do
getInboxes :: PushSide -> Assistant Inboxes getInboxes :: PushSide -> Assistant Inboxes
getInboxes side = getInboxes side =
getSide side . netMessagesInboxes <$> getAssistant netMessager getSide side . netMessagerInboxes <$> getAssistant netMessager
getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage])
getPushInitiationQueue side =
getSide side . netMessagerPushInitiations <$> getAssistant netMessager
netMessagerDebug :: ClientID -> [String] -> Assistant () netMessagerDebug :: ClientID -> [String] -> Assistant ()
netMessagerDebug clientid l = debug $ netMessagerDebug clientid l = debug $

View file

@ -20,7 +20,6 @@ import qualified Remote
import Utility.ThreadScheduler import Utility.ThreadScheduler
import Assistant.WebApp (UrlRenderer) import Assistant.WebApp (UrlRenderer)
import Assistant.WebApp.Types hiding (liftAssistant) import Assistant.WebApp.Types hiding (liftAssistant)
import Assistant.WebApp.Configurators.XMPP (checkCloudRepos)
import Assistant.Alert import Assistant.Alert
import Assistant.Pairing import Assistant.Pairing
import Assistant.XMPP.Git import Assistant.XMPP.Git
@ -108,10 +107,8 @@ xmppClient urlrenderer d creds =
maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c)
handle _ (GotNetMessage m@(Pushing _ pushstage)) handle _ (GotNetMessage m@(Pushing _ pushstage))
| isPushNotice pushstage = inAssistant $ handlePushNotice m | isPushNotice pushstage = inAssistant $ handlePushNotice m
| isPushInitiation pushstage = inAssistant $ do | isPushInitiation pushstage = inAssistant $ queuePushInitiation m
let checker = checkCloudRepos urlrenderer | otherwise = inAssistant $ storeInbox m
void $ forkIO <~> handlePushInitiation checker m
| otherwise = void $ inAssistant $ storeInbox m
handle _ (Ignorable _) = noop handle _ (Ignorable _) = noop
handle _ (Unknown _) = noop handle _ (Unknown _) = noop
handle _ (ProtocolError _) = noop handle _ (ProtocolError _) = noop

View file

@ -0,0 +1,81 @@
{- git-annex XMPP pusher threads
-
- This is a pair of threads. One handles git send-pack,
- and the other git receive-pack. Each thread can be running at most
- one such operation at a time.
-
- Why not use a single thread? Consider two clients A and B.
- If both decide to run a receive-pack at the same time to the other,
- they would deadlock with only one thread. For larger numbers of
- clients, the two threads are also sufficient.
-
- Copyright 2013 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Threads.XMPPPusher where
import Assistant.Common
import Assistant.NetMessager
import Assistant.Types.NetMessager
import Assistant.WebApp (UrlRenderer)
import Assistant.WebApp.Configurators.XMPP (checkCloudRepos)
import Assistant.XMPP.Git
import Control.Exception as E
xmppSendPackThread :: UrlRenderer -> NamedThread
xmppSendPackThread = pusherThread "XMPPSendPack" SendPack
xmppReceivePackThread :: UrlRenderer -> NamedThread
xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack
pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread
pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing
where
go lastpushedto = do
msg <- waitPushInitiation side $ selectNextPush lastpushedto
debug ["started running push", logNetMessage msg]
runpush <- asIO $ runPush checker msg
r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID)))
let successful = case r of
Right (Just _) -> True
_ -> False
{- Empty the inbox, because stuff may have
- been left in it if the push failed. -}
let justpushedto = getclient msg
maybe noop (`emptyInbox` side) justpushedto
debug ["finished running push", logNetMessage msg, show successful]
go $ if successful then justpushedto else lastpushedto
checker = checkCloudRepos urlrenderer
getclient (Pushing cid _) = Just cid
getclient _ = Nothing
{- Select the next push to run from the queue.
- The queue cannot be empty!
-
- We prefer to select the most recently added push, because its requestor
- is more likely to still be connected.
-
- When passed the ID of a client we just pushed to, we prefer to not
- immediately push again to that same client. This avoids one client
- drowing out others. So pushes from the client we just pushed to are
- relocated to the beginning of the list, to be processed later.
-}
selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage])
selectNextPush _ (m:[]) = (m, []) -- common case
selectNextPush _ [] = error "selectNextPush: empty list"
selectNextPush lastpushedto l = go [] l
where
go (r:ejected) [] = (r, ejected)
go rejected (m:ms) = case m of
(Pushing clientid _)
| Just clientid /= lastpushedto -> (m, rejected ++ ms)
_ -> go (m:rejected) ms
go [] [] = undefined

View file

@ -11,15 +11,15 @@ import Common.Annex
import Assistant.Pairing import Assistant.Pairing
import Git.Types import Git.Types
import qualified Data.Text as T
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.DList as D
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.MSampleVar import Control.Concurrent.MSampleVar
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Char8 as B8
import Data.Text (Text) import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.DList as D
{- Messages that can be sent out of band by a network messager. -} {- Messages that can be sent out of band by a network messager. -}
data NetMessage data NetMessage
@ -130,15 +130,11 @@ data NetMessager = NetMessager
, sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) , sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage))
-- write to this to restart the net messager -- write to this to restart the net messager
, netMessagerRestart :: MSampleVar () , netMessagerRestart :: MSampleVar ()
-- only one side of a push can be running at a time -- queue of incoming messages that request the initiation of pushes
-- the TMVars are empty when nothing is running , netMessagerPushInitiations :: SideMap (TMVar [NetMessage])
, netMessagerPushRunning :: SideMap (TMVar ClientID) -- incoming messages containing data for a running
-- number of threads trying to push to the same client -- (or not yet started) push
-- at the same time (either running, or waiting to run) , netMessagerInboxes :: SideMap Inboxes
, netMessagerPushThreadCount :: SideMap (TVar (M.Map ClientID Int))
-- incoming messages containing data for a push,
-- on a per-client and per-side basis
, netMessagesInboxes :: SideMap Inboxes
} }
newNetMessager :: IO NetMessager newNetMessager :: IO NetMessager
@ -149,4 +145,3 @@ newNetMessager = NetMessager
<*> newEmptySV <*> newEmptySV
<*> mkSideMap newEmptyTMVar <*> mkSideMap newEmptyTMVar
<*> mkSideMap (newTVar M.empty) <*> mkSideMap (newTVar M.empty)
<*> mkSideMap (newTVar M.empty)

View file

@ -101,7 +101,7 @@ makeXMPPGitRemote buddyname jid u = do
- We listen at the other end of the pipe and relay to and from XMPP. - We listen at the other end of the pipe and relay to and from XMPP.
-} -}
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
xmppPush cid gitpush = runPush SendPack cid $ do xmppPush cid gitpush = do
u <- liftAnnex getUUID u <- liftAnnex getUUID
sendNetMessage $ Pushing cid (StartingPush u) sendNetMessage $ Pushing cid (StartingPush u)
@ -239,7 +239,7 @@ xmppGitRelay = do
{- Relays git receive-pack stdin and stdout via XMPP, as well as propigating {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating
- its exit status to XMPP. -} - its exit status to XMPP. -}
xmppReceivePack :: ClientID -> Assistant Bool xmppReceivePack :: ClientID -> Assistant Bool
xmppReceivePack cid = runPush ReceivePack cid $ do xmppReceivePack cid = do
repodir <- liftAnnex $ fromRepo repoPath repodir <- liftAnnex $ fromRepo repoPath
let p = (proc "git" ["receive-pack", repodir]) let p = (proc "git" ["receive-pack", repodir])
{ std_in = CreatePipe { std_in = CreatePipe
@ -288,11 +288,12 @@ xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
matching loc r = repoIsUrl r && repoLocation r == loc matching loc r = repoIsUrl r && repoLocation r == loc
knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um
handlePushInitiation :: (Remote -> Assistant ()) -> NetMessage -> Assistant () {- Returns the ClientID that it pushed to. -}
handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) = runPush :: (Remote -> Assistant ()) -> NetMessage -> Assistant (Maybe ClientID)
runPush checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
go =<< liftAnnex (inRepo Git.Branch.current) go =<< liftAnnex (inRepo Git.Branch.current)
where where
go Nothing = noop go Nothing = return Nothing
go (Just branch) = do go (Just branch) = do
rs <- xmppRemotes cid theiruuid rs <- xmppRemotes cid theiruuid
liftAnnex $ Annex.Branch.commit "update" liftAnnex $ Annex.Branch.commit "update"
@ -301,17 +302,24 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
<*> getUUID <*> getUUID
liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
forM_ rs $ \r -> do if null rs
void $ alertWhile (syncAlert [r]) $ then return Nothing
xmppPush cid (taggedPush u selfjid branch r) else do
checkcloudrepos r forM_ rs $ \r -> do
handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do void $ alertWhile (syncAlert [r]) $
xmppPush cid (taggedPush u selfjid branch r)
checkcloudrepos r
return $ Just cid
runPush checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
rs <- xmppRemotes cid theiruuid rs <- xmppRemotes cid theiruuid
unless (null rs) $ do if null rs
void $ alertWhile (syncAlert rs) $ then return Nothing
xmppReceivePack cid else do
mapM_ checkcloudrepos rs void $ alertWhile (syncAlert rs) $
handlePushInitiation _ _ = noop xmppReceivePack cid
mapM_ checkcloudrepos rs
return $ Just cid
runPush _ _ = return Nothing
{- Check if any of the shas that can be pushed are ones we do not {- Check if any of the shas that can be pushed are ones we do not
- have. - have.
@ -370,4 +378,3 @@ extractSequence :: NetMessage -> Maybe Int
extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
extractSequence _ = Nothing extractSequence _ = Nothing

View file

@ -23,7 +23,17 @@ newTList = newEmptyTMVar
{- Gets the contents of the TList. Blocks when empty. {- Gets the contents of the TList. Blocks when empty.
- TList is left empty. -} - TList is left empty. -}
getTList :: TList a -> STM [a] getTList :: TList a -> STM [a]
getTList tlist = D.toList <$> takeTMVar tlist getTList tlist = D.toList <$> getTDList tlist
getTDList :: TList a -> STM (D.DList a)
getTDList = takeTMVar
{- Replaces the contents of the TList. -}
setTList :: TList a -> [a] -> STM ()
setTList tlist = setTDList tlist . D.fromList
setTDList :: TList a -> D.DList a -> STM ()
setTDList tlist = modifyTList tlist . const
{- Takes anything currently in the TList, without blocking. {- Takes anything currently in the TList, without blocking.
- TList is left empty. -} - TList is left empty. -}
@ -54,6 +64,3 @@ snocTList tlist v = modifyTList tlist $ \dl -> D.snoc dl v
appendTList :: TList a -> [a] -> STM () appendTList :: TList a -> [a] -> STM ()
appendTList tlist l = modifyTList tlist $ \dl -> D.append dl (D.fromList l) appendTList tlist l = modifyTList tlist $ \dl -> D.append dl (D.fromList l)
setTList :: TList a -> [a] -> STM ()
setTList tlist l = modifyTList tlist $ const $ D.fromList l