support annex.jobs for clusters

This commit is contained in:
Joey Hess 2024-06-25 14:52:47 -04:00
parent 818030e4d3
commit cec2848e8a
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
6 changed files with 65 additions and 12 deletions

View file

@ -11,14 +11,20 @@
module P2P.Proxy where
import Annex.Common
import qualified Annex
import P2P.Protocol
import P2P.IO
import Utility.Metered
import Git.FilePath
import Types.Concurrency
import Annex.Concurrent
import Data.Either
import Control.Concurrent.STM
import Control.Concurrent.Async
import qualified Control.Concurrent.MSem as MSem
import qualified Data.ByteString.Lazy as L
import GHC.Conc
type ProtoCloser = Annex ()
@ -141,6 +147,7 @@ proxy
-> ClientSide
-> UUID
-> ProxySelector
-> ConcurrencyConfig
-> ProtocolVersion
-- ^ Protocol version being spoken between the proxy and the
-- client. When there are multiple remotes, some may speak an
@ -149,7 +156,7 @@ proxy
-- ^ non-VERSION message that was received from the client when
-- negotiating protocol version, and has not been responded to yet
-> ProtoErrorHandled r
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector (ProtocolVersion protocolversion) othermessage protoerrhandler = do
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig (ProtocolVersion protocolversion) othermessage protoerrhandler = do
case othermessage of
Nothing -> protoerrhandler proxynextclientmessage $
client $ net $ sendMessage $ VERSION $ ProtocolVersion protocolversion
@ -276,7 +283,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
protoerrhandler proxynextclientmessage $
client $ net $ sendMessage FAILURE
handleREMOVE remotesides k message = do
v <- forM remotesides $ \r ->
v <- forMC concurrencyconfig remotesides $ \r ->
runRemoteSideOrSkipFailed r $ do
net $ sendMessage message
net receiveMessage >>= return . \case
@ -370,7 +377,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
let alreadyhave = \case
Right (Left _) -> True
_ -> False
l <- forM remotesides initiate
l <- forMC concurrencyconfig remotesides initiate
if all alreadyhave l
then if protocolversion < 2
then protoerrhandler proxynextclientmessage $
@ -392,7 +399,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
let totallen = datalen + minoffset
-- Tell each remote how much data to expect, depending
-- on the remote's offset.
rs <- forM remotes $ \remote@(remoteside, remoteoffset) ->
rs <- forMC concurrencyconfig remotes $ \remote@(remoteside, remoteoffset) ->
runRemoteSideOrSkipFailed remoteside $ do
net $ sendMessage $ DATA $ Len $
totallen - remoteoffset
@ -409,7 +416,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
let (chunk, b') = L.splitAt chunksize b
let chunklen = fromIntegral (L.length chunk)
let !n' = n + chunklen
rs' <- forM rs $ \r@(remoteside, remoteoffset) ->
rs' <- forMC concurrencyconfig rs $ \r@(remoteside, remoteoffset) ->
if n >= remoteoffset
then runRemoteSideOrSkipFailed remoteside $ do
net $ sendBytes (Len chunklen) chunk nullMeterUpdate
@ -471,7 +478,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
net receiveMessage
where
finish a = do
storeduuids <- forM rs $ \r ->
storeduuids <- forMC concurrencyconfig rs $ \r ->
runRemoteSideOrSkipFailed r a >>= \case
Just (Just resp) ->
relayPUTRecord k r resp
@ -492,3 +499,37 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
<$> fromRepo (fromTopFilePath (asTopFilePath f))
getassociatedfile (ProtoAssociatedFile (AssociatedFile Nothing)) =
return $ AssociatedFile Nothing
data ConcurrencyConfig = ConcurrencyConfig Int (MSem.MSem Int)
noConcurrencyConfig :: Annex ConcurrencyConfig
noConcurrencyConfig = liftIO $ ConcurrencyConfig 1 <$> MSem.new 1
getConcurrencyConfig :: Annex ConcurrencyConfig
getConcurrencyConfig = (annexJobs <$> Annex.getGitConfig) >>= \case
NonConcurrent -> noConcurrencyConfig
Concurrent n -> go n
ConcurrentPerCpu -> go =<< liftIO getNumProcessors
where
go n = do
c <- liftIO getNumCapabilities
when (n > c) $
liftIO $ setNumCapabilities n
setConcurrency (ConcurrencyGitConfig (Concurrent n))
msem <- liftIO $ MSem.new n
return (ConcurrencyConfig n msem)
forMC :: ConcurrencyConfig -> [a] -> (a -> Annex b) -> Annex [b]
forMC _ (x:[]) a = do
r <- a x
return [r]
forMC (ConcurrencyConfig n msem) xs a
| n < 2 = forM xs a
| otherwise = do
runners <- forM xs $ \x ->
forkState $ bracketIO
(MSem.wait msem)
(const $ MSem.signal msem)
(const $ a x)
mapM id =<< liftIO (forConcurrently runners id)