diff --git a/Annex/Cluster.hs b/Annex/Cluster.hs index 5d8213e97e..599dc1c417 100644 --- a/Annex/Cluster.hs +++ b/Annex/Cluster.hs @@ -53,9 +53,11 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do -- nodes. let protocolversion = min maxProtocolVersion clientmaxversion selectnode <- clusterProxySelector clusteruuid protocolversion + concurrencyconfig <- getConcurrencyConfig proxy proxydone proxymethods servermode clientside (fromClusterUUID clusteruuid) - selectnode protocolversion othermsg protoerrhandler + selectnode concurrencyconfig protocolversion + othermsg protoerrhandler withclientversion Nothing = proxydone clusterProxySelector :: ClusterUUID -> ProtocolVersion -> Annex ProxySelector diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index b274065682..fbfc92272c 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -149,7 +149,7 @@ commandAction start = do showEndMessage startmsg False return False -{- Waits for all worker threads to finish and merges their AnnexStates +{- Waits for all worker thrneads to finish and merges their AnnexStates - back into the current Annex's state. -} finishCommandActions :: Annex () diff --git a/Command/P2PStdIO.hs b/Command/P2PStdIO.hs index a1b1459244..3724d4222a 100644 --- a/Command/P2PStdIO.hs +++ b/Command/P2PStdIO.hs @@ -74,9 +74,11 @@ performProxy clientuuid servermode remote = do let closer = do closeRemoteSide remoteside p2pDone + concurrencyconfig <- noConcurrencyConfig proxy closer proxymethods servermode clientside (Remote.uuid remote) (singleProxySelector remoteside) + concurrencyconfig protocolversion othermsg p2pErrHandler withclientversion _ Nothing = p2pDone diff --git a/P2P/Proxy.hs b/P2P/Proxy.hs index 184cb91175..3a569aed51 100644 --- a/P2P/Proxy.hs +++ b/P2P/Proxy.hs @@ -11,14 +11,20 @@ module P2P.Proxy where import Annex.Common +import qualified Annex import P2P.Protocol import P2P.IO import Utility.Metered import Git.FilePath +import Types.Concurrency +import Annex.Concurrent import Data.Either import Control.Concurrent.STM +import Control.Concurrent.Async +import qualified Control.Concurrent.MSem as MSem import qualified Data.ByteString.Lazy as L +import GHC.Conc type ProtoCloser = Annex () @@ -141,6 +147,7 @@ proxy -> ClientSide -> UUID -> ProxySelector + -> ConcurrencyConfig -> ProtocolVersion -- ^ Protocol version being spoken between the proxy and the -- client. When there are multiple remotes, some may speak an @@ -149,7 +156,7 @@ proxy -- ^ non-VERSION message that was received from the client when -- negotiating protocol version, and has not been responded to yet -> ProtoErrorHandled r -proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector (ProtocolVersion protocolversion) othermessage protoerrhandler = do +proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig (ProtocolVersion protocolversion) othermessage protoerrhandler = do case othermessage of Nothing -> protoerrhandler proxynextclientmessage $ client $ net $ sendMessage $ VERSION $ ProtocolVersion protocolversion @@ -276,7 +283,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo protoerrhandler proxynextclientmessage $ client $ net $ sendMessage FAILURE handleREMOVE remotesides k message = do - v <- forM remotesides $ \r -> + v <- forMC concurrencyconfig remotesides $ \r -> runRemoteSideOrSkipFailed r $ do net $ sendMessage message net receiveMessage >>= return . \case @@ -370,7 +377,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo let alreadyhave = \case Right (Left _) -> True _ -> False - l <- forM remotesides initiate + l <- forMC concurrencyconfig remotesides initiate if all alreadyhave l then if protocolversion < 2 then protoerrhandler proxynextclientmessage $ @@ -392,7 +399,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo let totallen = datalen + minoffset -- Tell each remote how much data to expect, depending -- on the remote's offset. - rs <- forM remotes $ \remote@(remoteside, remoteoffset) -> + rs <- forMC concurrencyconfig remotes $ \remote@(remoteside, remoteoffset) -> runRemoteSideOrSkipFailed remoteside $ do net $ sendMessage $ DATA $ Len $ totallen - remoteoffset @@ -409,7 +416,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo let (chunk, b') = L.splitAt chunksize b let chunklen = fromIntegral (L.length chunk) let !n' = n + chunklen - rs' <- forM rs $ \r@(remoteside, remoteoffset) -> + rs' <- forMC concurrencyconfig rs $ \r@(remoteside, remoteoffset) -> if n >= remoteoffset then runRemoteSideOrSkipFailed remoteside $ do net $ sendBytes (Len chunklen) chunk nullMeterUpdate @@ -471,7 +478,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo net receiveMessage where finish a = do - storeduuids <- forM rs $ \r -> + storeduuids <- forMC concurrencyconfig rs $ \r -> runRemoteSideOrSkipFailed r a >>= \case Just (Just resp) -> relayPUTRecord k r resp @@ -492,3 +499,37 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo <$> fromRepo (fromTopFilePath (asTopFilePath f)) getassociatedfile (ProtoAssociatedFile (AssociatedFile Nothing)) = return $ AssociatedFile Nothing + +data ConcurrencyConfig = ConcurrencyConfig Int (MSem.MSem Int) + +noConcurrencyConfig :: Annex ConcurrencyConfig +noConcurrencyConfig = liftIO $ ConcurrencyConfig 1 <$> MSem.new 1 + +getConcurrencyConfig :: Annex ConcurrencyConfig +getConcurrencyConfig = (annexJobs <$> Annex.getGitConfig) >>= \case + NonConcurrent -> noConcurrencyConfig + Concurrent n -> go n + ConcurrentPerCpu -> go =<< liftIO getNumProcessors + where + go n = do + c <- liftIO getNumCapabilities + when (n > c) $ + liftIO $ setNumCapabilities n + setConcurrency (ConcurrencyGitConfig (Concurrent n)) + msem <- liftIO $ MSem.new n + return (ConcurrencyConfig n msem) + +forMC :: ConcurrencyConfig -> [a] -> (a -> Annex b) -> Annex [b] +forMC _ (x:[]) a = do + r <- a x + return [r] +forMC (ConcurrencyConfig n msem) xs a + | n < 2 = forM xs a + | otherwise = do + runners <- forM xs $ \x -> + forkState $ bracketIO + (MSem.wait msem) + (const $ MSem.signal msem) + (const $ a x) + mapM id =<< liftIO (forConcurrently runners id) + diff --git a/doc/clusters.mdwn b/doc/clusters.mdwn index 8ad9e96443..bf79adbae8 100644 --- a/doc/clusters.mdwn +++ b/doc/clusters.mdwn @@ -83,6 +83,13 @@ in the git-annex branch. That tells other repositories about the cluster. Started proxying for node2 Started proxying for node3 +Operations that affect multiple nodes of a cluster can often be sped up by +configuring annex.jobs in the repository that will serve the cluster to +clients. In the example above, the nodes are all disk bound, so operating +on more than one at a time will likely be faster. + + $ git config annex.jobs cpus + ## preferred content of clusters The preferred content of the cluster can be configured. This tells @@ -94,8 +101,8 @@ to do the configuration in a repository that has the cluster as a remote. For example: - git-annex wanted bigserver-mycluster standard - git-annex group bigserver-mycluster archive + $ git-annex wanted bigserver-mycluster standard + $ git-annex group bigserver-mycluster archive By default, when a file is uploaded to a cluster, it is stored on every node of the cluster. To control which nodes to store to, the [[preferred_content]] of diff --git a/doc/todo/git-annex_proxies.mdwn b/doc/todo/git-annex_proxies.mdwn index 2666640fa4..e3491b57cc 100644 --- a/doc/todo/git-annex_proxies.mdwn +++ b/doc/todo/git-annex_proxies.mdwn @@ -31,8 +31,6 @@ For June's work on [[design/passthrough_proxy]], remaining todos: round-robin amoung remotes, and prefer to avoid using remotes that other git-annex processes are currently using. -* Support annex.jobs for clusters. - * Basic proxying to special remote support (non-streaming). * Support proxies-of-proxies better, eg foo-bar-baz. @@ -104,3 +102,6 @@ For June's work on [[design/passthrough_proxy]], remaining todos: * On upload to cluster, send to nodes where its preferred content, and not to other nodes. (done) + +* Support annex.jobs for clusters. (done) +