diff --git a/Assistant.hs b/Assistant.hs index 36fef049d7..7e9f82449e 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -37,6 +37,7 @@ import Assistant.Threads.PairListener #endif #ifdef WITH_XMPP import Assistant.Threads.XMPPClient +import Assistant.Threads.XMPPPusher #endif #else #warning Building without the webapp. You probably need to install Yesod.. @@ -111,6 +112,8 @@ startDaemon assistant foreground listenhost startbrowser = do #endif #ifdef WITH_XMPP , assist $ xmppClientThread urlrenderer + , assist $ xmppSendPackThread urlrenderer + , assist $ xmppReceivePackThread urlrenderer #endif #endif , assist $ pushThread diff --git a/Assistant/NetMessager.hs b/Assistant/NetMessager.hs index fe96df56ff..2e786717d5 100644 --- a/Assistant/NetMessager.hs +++ b/Assistant/NetMessager.hs @@ -14,7 +14,6 @@ import Assistant.Types.NetMessager import Control.Concurrent.STM import Control.Concurrent.MSampleVar -import Control.Exception as E import qualified Data.Set as S import qualified Data.Map as M import qualified Data.DList as D @@ -69,73 +68,39 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager sent <- M.lookup sentclient <$> (readTMVar $ sentImportantNetMessages nm) return (fromMaybe S.empty stored, fromMaybe S.empty sent) -{- Runs an action that runs either the send or receive side of a push. - - Only one such action per side can run at a time. Other pushes, for - - the same, or other clients, need to wait their turn. - - - - Only one push is allowed to wait per client. - - There is no point in building up more. - - - - While the push is running, netMessagesPush will get messages put into it - - relating to this push, while any messages relating to other pushes - - on the same side go to netMessagesDeferred. Once the push finishes, - - those deferred messages will be fed to handledeferred for processing. - -} -runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool -runPush side clientid a = do - debugmsg "preparing to run" - nm <- getAssistant netMessager - let setup = do - (canrun, release) <- atomically $ checkcanrun nm - if canrun - then atomically $ waittorun nm release - else return (False, noop) - let cleanup (_, release) = atomically release - go <- asIO1 $ \(run, _) -> - if run - then do - debugmsg "started running" - r <- a - debugmsg "finished running" - {- Empty the inbox, because stuff may have - - been left in it if the push failed. -} - emptyInbox clientid side - return r - else do - debugmsg "skipping running" - return False - r <- liftIO $ E.bracket setup cleanup go - return r +{- Queues a push initiation message in the queue for the appropriate + - side of the push but only if there is not already an initiation message + - from the same client in the queue. -} +queuePushInitiation :: NetMessage -> Assistant () +queuePushInitiation msg@(Pushing clientid stage) = do + tv <- getPushInitiationQueue side + liftIO $ atomically $ do + r <- tryTakeTMVar tv + case r of + Nothing -> putTMVar tv [msg] + Just l -> do + let !l' = msg : filter differentclient l + putTMVar tv l' where - debugmsg s = netMessagerDebug clientid [s, show side] - -- check that this is one of the two threads allowed - -- to run at the same time, pushing to the same client - -- on the same side - checkcanrun nm = do - let v = getSide side $ netMessagerPushThreadCount nm - m <- readTVar v - case M.lookup clientid m of - Just count - | count > 2 -> return (False, noop) - _ -> do - writeTVar v $ - M.insertWith' (+) clientid 1 m - let release = modifyTVar' v $ - M.insertWith' (-) clientid 1 - return (True, release) - -- block until this is the only thread performing - -- a push on this side, to any client - waittorun nm release = do - let v = getSide side $ netMessagerPushRunning nm - ifM (isNothing <$> tryReadTMVar v) - ( do - putTMVar v clientid - let release' = do - void $ takeTMVar v - release - return (True, release') - , retry - ) + side = pushDestinationSide stage + differentclient (Pushing cid _) = cid /= clientid + differentclient _ = True +queuePushInitiation _ = noop + +{- Waits for a push inititation message to be received, and runs + - function to select a message from the queue. -} +waitPushInitiation :: PushSide -> ([NetMessage] -> (NetMessage, [NetMessage])) -> Assistant NetMessage +waitPushInitiation side selector = do + tv <- getPushInitiationQueue side + liftIO $ atomically $ do + q <- takeTMVar tv + if null q + then retry + else do + let (msg, !q') = selector q + unless (null q') $ + putTMVar tv q' + return msg {- Stores messages for a push into the appropriate inbox. - @@ -198,7 +163,11 @@ emptyInbox clientid side = do getInboxes :: PushSide -> Assistant Inboxes getInboxes side = - getSide side . netMessagesInboxes <$> getAssistant netMessager + getSide side . netMessagerInboxes <$> getAssistant netMessager + +getPushInitiationQueue :: PushSide -> Assistant (TMVar [NetMessage]) +getPushInitiationQueue side = + getSide side . netMessagerPushInitiations <$> getAssistant netMessager netMessagerDebug :: ClientID -> [String] -> Assistant () netMessagerDebug clientid l = debug $ diff --git a/Assistant/Threads/XMPPClient.hs b/Assistant/Threads/XMPPClient.hs index 929b4c8071..b90a8ca100 100644 --- a/Assistant/Threads/XMPPClient.hs +++ b/Assistant/Threads/XMPPClient.hs @@ -20,7 +20,6 @@ import qualified Remote import Utility.ThreadScheduler import Assistant.WebApp (UrlRenderer) import Assistant.WebApp.Types hiding (liftAssistant) -import Assistant.WebApp.Configurators.XMPP (checkCloudRepos) import Assistant.Alert import Assistant.Pairing import Assistant.XMPP.Git @@ -108,10 +107,8 @@ xmppClient urlrenderer d creds = maybe noop (inAssistant . pairMsgReceived urlrenderer stage u selfjid) (parseJID c) handle _ (GotNetMessage m@(Pushing _ pushstage)) | isPushNotice pushstage = inAssistant $ handlePushNotice m - | isPushInitiation pushstage = inAssistant $ do - let checker = checkCloudRepos urlrenderer - void $ forkIO <~> handlePushInitiation checker m - | otherwise = void $ inAssistant $ storeInbox m + | isPushInitiation pushstage = inAssistant $ queuePushInitiation m + | otherwise = inAssistant $ storeInbox m handle _ (Ignorable _) = noop handle _ (Unknown _) = noop handle _ (ProtocolError _) = noop diff --git a/Assistant/Threads/XMPPPusher.hs b/Assistant/Threads/XMPPPusher.hs new file mode 100644 index 0000000000..30c91c7f09 --- /dev/null +++ b/Assistant/Threads/XMPPPusher.hs @@ -0,0 +1,81 @@ +{- git-annex XMPP pusher threads + - + - This is a pair of threads. One handles git send-pack, + - and the other git receive-pack. Each thread can be running at most + - one such operation at a time. + - + - Why not use a single thread? Consider two clients A and B. + - If both decide to run a receive-pack at the same time to the other, + - they would deadlock with only one thread. For larger numbers of + - clients, the two threads are also sufficient. + - + - Copyright 2013 Joey Hess + - + - Licensed under the GNU GPL version 3 or higher. + -} + +module Assistant.Threads.XMPPPusher where + +import Assistant.Common +import Assistant.NetMessager +import Assistant.Types.NetMessager +import Assistant.WebApp (UrlRenderer) +import Assistant.WebApp.Configurators.XMPP (checkCloudRepos) +import Assistant.XMPP.Git + +import Control.Exception as E + +xmppSendPackThread :: UrlRenderer -> NamedThread +xmppSendPackThread = pusherThread "XMPPSendPack" SendPack + +xmppReceivePackThread :: UrlRenderer -> NamedThread +xmppReceivePackThread = pusherThread "XMPPReceivePack" ReceivePack + +pusherThread :: String -> PushSide -> UrlRenderer -> NamedThread +pusherThread threadname side urlrenderer = namedThread threadname $ go Nothing + where + go lastpushedto = do + msg <- waitPushInitiation side $ selectNextPush lastpushedto + debug ["started running push", logNetMessage msg] + + runpush <- asIO $ runPush checker msg + r <- liftIO (E.try runpush :: IO (Either SomeException (Maybe ClientID))) + let successful = case r of + Right (Just _) -> True + _ -> False + + {- Empty the inbox, because stuff may have + - been left in it if the push failed. -} + let justpushedto = getclient msg + maybe noop (`emptyInbox` side) justpushedto + + debug ["finished running push", logNetMessage msg, show successful] + go $ if successful then justpushedto else lastpushedto + + checker = checkCloudRepos urlrenderer + + getclient (Pushing cid _) = Just cid + getclient _ = Nothing + +{- Select the next push to run from the queue. + - The queue cannot be empty! + - + - We prefer to select the most recently added push, because its requestor + - is more likely to still be connected. + - + - When passed the ID of a client we just pushed to, we prefer to not + - immediately push again to that same client. This avoids one client + - drowing out others. So pushes from the client we just pushed to are + - relocated to the beginning of the list, to be processed later. + -} +selectNextPush :: Maybe ClientID -> [NetMessage] -> (NetMessage, [NetMessage]) +selectNextPush _ (m:[]) = (m, []) -- common case +selectNextPush _ [] = error "selectNextPush: empty list" +selectNextPush lastpushedto l = go [] l + where + go (r:ejected) [] = (r, ejected) + go rejected (m:ms) = case m of + (Pushing clientid _) + | Just clientid /= lastpushedto -> (m, rejected ++ ms) + _ -> go (m:rejected) ms + go [] [] = undefined diff --git a/Assistant/Types/NetMessager.hs b/Assistant/Types/NetMessager.hs index 525ff29f20..4b4e614a2b 100644 --- a/Assistant/Types/NetMessager.hs +++ b/Assistant/Types/NetMessager.hs @@ -11,15 +11,15 @@ import Common.Annex import Assistant.Pairing import Git.Types +import qualified Data.Text as T +import qualified Data.Set as S +import qualified Data.Map as M +import qualified Data.DList as D import Control.Concurrent.STM import Control.Concurrent.MSampleVar import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as B8 import Data.Text (Text) -import qualified Data.Text as T -import qualified Data.Set as S -import qualified Data.Map as M -import qualified Data.DList as D {- Messages that can be sent out of band by a network messager. -} data NetMessage @@ -130,15 +130,11 @@ data NetMessager = NetMessager , sentImportantNetMessages :: TMVar (M.Map ClientID (S.Set NetMessage)) -- write to this to restart the net messager , netMessagerRestart :: MSampleVar () - -- only one side of a push can be running at a time - -- the TMVars are empty when nothing is running - , netMessagerPushRunning :: SideMap (TMVar ClientID) - -- number of threads trying to push to the same client - -- at the same time (either running, or waiting to run) - , netMessagerPushThreadCount :: SideMap (TVar (M.Map ClientID Int)) - -- incoming messages containing data for a push, - -- on a per-client and per-side basis - , netMessagesInboxes :: SideMap Inboxes + -- queue of incoming messages that request the initiation of pushes + , netMessagerPushInitiations :: SideMap (TMVar [NetMessage]) + -- incoming messages containing data for a running + -- (or not yet started) push + , netMessagerInboxes :: SideMap Inboxes } newNetMessager :: IO NetMessager @@ -149,4 +145,3 @@ newNetMessager = NetMessager <*> newEmptySV <*> mkSideMap newEmptyTMVar <*> mkSideMap (newTVar M.empty) - <*> mkSideMap (newTVar M.empty) diff --git a/Assistant/XMPP/Git.hs b/Assistant/XMPP/Git.hs index 1e8ccca622..01585a711a 100644 --- a/Assistant/XMPP/Git.hs +++ b/Assistant/XMPP/Git.hs @@ -101,7 +101,7 @@ makeXMPPGitRemote buddyname jid u = do - We listen at the other end of the pipe and relay to and from XMPP. -} xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool -xmppPush cid gitpush = runPush SendPack cid $ do +xmppPush cid gitpush = do u <- liftAnnex getUUID sendNetMessage $ Pushing cid (StartingPush u) @@ -239,7 +239,7 @@ xmppGitRelay = do {- Relays git receive-pack stdin and stdout via XMPP, as well as propigating - its exit status to XMPP. -} xmppReceivePack :: ClientID -> Assistant Bool -xmppReceivePack cid = runPush ReceivePack cid $ do +xmppReceivePack cid = do repodir <- liftAnnex $ fromRepo repoPath let p = (proc "git" ["receive-pack", repodir]) { std_in = CreatePipe @@ -288,11 +288,12 @@ xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of matching loc r = repoIsUrl r && repoLocation r == loc knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um -handlePushInitiation :: (Remote -> Assistant ()) -> NetMessage -> Assistant () -handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) = +{- Returns the ClientID that it pushed to. -} +runPush :: (Remote -> Assistant ()) -> NetMessage -> Assistant (Maybe ClientID) +runPush checkcloudrepos (Pushing cid (PushRequest theiruuid)) = go =<< liftAnnex (inRepo Git.Branch.current) where - go Nothing = noop + go Nothing = return Nothing go (Just branch) = do rs <- xmppRemotes cid theiruuid liftAnnex $ Annex.Branch.commit "update" @@ -301,17 +302,24 @@ handlePushInitiation checkcloudrepos (Pushing cid (PushRequest theiruuid)) = <*> getUUID liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus - forM_ rs $ \r -> do - void $ alertWhile (syncAlert [r]) $ - xmppPush cid (taggedPush u selfjid branch r) - checkcloudrepos r -handlePushInitiation checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do + if null rs + then return Nothing + else do + forM_ rs $ \r -> do + void $ alertWhile (syncAlert [r]) $ + xmppPush cid (taggedPush u selfjid branch r) + checkcloudrepos r + return $ Just cid +runPush checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do rs <- xmppRemotes cid theiruuid - unless (null rs) $ do - void $ alertWhile (syncAlert rs) $ - xmppReceivePack cid - mapM_ checkcloudrepos rs -handlePushInitiation _ _ = noop + if null rs + then return Nothing + else do + void $ alertWhile (syncAlert rs) $ + xmppReceivePack cid + mapM_ checkcloudrepos rs + return $ Just cid +runPush _ _ = return Nothing {- Check if any of the shas that can be pushed are ones we do not - have. @@ -370,4 +378,3 @@ extractSequence :: NetMessage -> Maybe Int extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum extractSequence _ = Nothing - diff --git a/Utility/TList.hs b/Utility/TList.hs index 716f72017b..e4bb95498d 100644 --- a/Utility/TList.hs +++ b/Utility/TList.hs @@ -23,7 +23,17 @@ newTList = newEmptyTMVar {- Gets the contents of the TList. Blocks when empty. - TList is left empty. -} getTList :: TList a -> STM [a] -getTList tlist = D.toList <$> takeTMVar tlist +getTList tlist = D.toList <$> getTDList tlist + +getTDList :: TList a -> STM (D.DList a) +getTDList = takeTMVar + +{- Replaces the contents of the TList. -} +setTList :: TList a -> [a] -> STM () +setTList tlist = setTDList tlist . D.fromList + +setTDList :: TList a -> D.DList a -> STM () +setTDList tlist = modifyTList tlist . const {- Takes anything currently in the TList, without blocking. - TList is left empty. -} @@ -54,6 +64,3 @@ snocTList tlist v = modifyTList tlist $ \dl -> D.snoc dl v appendTList :: TList a -> [a] -> STM () appendTList tlist l = modifyTList tlist $ \dl -> D.append dl (D.fromList l) - -setTList :: TList a -> [a] -> STM () -setTList tlist l = modifyTList tlist $ const $ D.fromList l