git-annex/P2P/Proxy.hs
Joey Hess 54dc1d6f6e
fix recording present on the proxy when proxying DATA-PRESENT
Apparently the protoerrhandler parameter never runs. Also the const
typo prevented the type checker from complaining that relayPUTRecord was
being called with 1 less parameter than needed.

So move the relayPUTRecord out of it.
2024-10-30 13:17:31 -04:00

740 lines
25 KiB
Haskell

{- P2P protocol proxying
-
- Copyright 2024 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, FlexibleContexts, ScopedTypeVariables #-}
{-# LANGUAGE BangPatterns #-}
module P2P.Proxy where
import Annex.Common
import qualified Annex
import P2P.Protocol
import P2P.IO
import Utility.Metered
import Utility.MonotonicClock
import Git.FilePath
import Types.Concurrency
import Annex.Concurrent
import qualified Remote
import Data.Either
import Control.Concurrent.STM
import Control.Concurrent.Async
import qualified Control.Concurrent.MSem as MSem
import qualified Data.ByteString.Lazy as L
import qualified Data.Set as S
import qualified Data.Map as M
import Data.Unique
import GHC.Conc
{- Action that closes a connection to a remote. closeRemoteSide runs it
 - when it discards the cached connection. -}
type ProtoCloser = Annex ()
{- The client end of a proxied session: the run state and connection
 - that protocol actions are run against to talk to the client. -}
data ClientSide = ClientSide RunState P2PConnection
{- Identifies a RemoteSide. Wraps a Unique, and can be compared and
 - ordered, so it is usable as a Map key. -}
newtype RemoteSideId = RemoteSideId Unique
deriving (Eq, Ord)
{- A remote that requests are proxied to, along with how to reach it. -}
data RemoteSide = RemoteSide
{ remote :: Remote
, remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
-- ^ opens a connection to the remote; Nothing when it cannot connect
, remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
-- ^ holds the open connection once runRemoteSide has made one
, remoteSideId :: RemoteSideId
, remoteLiveUpdate :: LiveUpdate
-- ^ passed to addedContent when a PUT to this remote succeeds
}
{- A RemoteSide is shown the same way as the Remote it wraps. -}
instance Show RemoteSide where
    show = show . remote
{- Makes a RemoteSide for a remote, given the action that connects to it.
 -
 - No connection is opened yet; the connection TMVar starts out empty.
 - The RemoteSide gets a fresh RemoteSideId and NoLiveUpdate.
 -}
mkRemoteSide :: Remote -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide
mkRemoteSide r remoteconnect = do
    connvar <- liftIO (atomically newEmptyTMVar)
    sideid <- liftIO (RemoteSideId <$> newUnique)
    return $ RemoteSide
        { remote = r
        , remoteConnect = remoteconnect
        , remoteTMVar = connvar
        , remoteSideId = sideid
        , remoteLiveUpdate = NoLiveUpdate
        }
{- Runs a protocol action against the remote.
 -
 - The connection cached in the RemoteSide is used when there is one.
 - Otherwise a connection is opened with remoteConnect and cached before
 - the action runs. Gives up when no connection can be made.
 -}
runRemoteSide :: RemoteSide -> Proto a -> Annex (Either ProtoFailure a)
runRemoteSide remoteside a = do
    cached <- liftIO $ atomically $ tryReadTMVar connvar
    case cached of
        Just (runst, conn, _closer) -> run runst conn
        Nothing -> do
            opened <- remoteConnect remoteside
            case opened of
                Nothing -> giveup "Unable to connect to remote."
                Just fresh@(runst, conn, _closer) -> do
                    liftIO $ atomically $ putTMVar connvar fresh
                    run runst conn
  where
    connvar = remoteTMVar remoteside
    run runst conn = liftIO $ runNetProto runst conn a
{- Closes the cached connection to the remote, if there is one.
 -
 - The connection is taken out of the TMVar, so a later runRemoteSide
 - will connect again.
 -}
closeRemoteSide :: RemoteSide -> Annex ()
closeRemoteSide remoteside = do
    cached <- liftIO $ atomically $ tryTakeTMVar (remoteTMVar remoteside)
    case cached of
        Nothing -> return ()
        Just (_, _, closer) -> closer
{- Selects which remotes to proxy to for each top-level P2P protocol
 - action. proxyRequest runs the matching field for each request it
 - handles.
 -}
data ProxySelector = ProxySelector
{ proxyCHECKPRESENT :: Key -> Annex (Maybe RemoteSide)
-- ^ when Nothing, FAILURE is sent to the client
, proxyLOCKCONTENT :: Key -> Annex (Maybe RemoteSide)
-- ^ when Nothing, FAILURE is sent to the client
, proxyREMOVE :: Key -> Annex [RemoteSide]
-- ^ remove from all of these remotes
, proxyGETTIMESTAMP :: Annex [RemoteSide]
-- ^ should send every remote that proxyREMOVE can
-- ever return for any key
, proxyGET :: Key -> Annex (Maybe RemoteSide)
-- ^ when Nothing, the client is sent empty, invalid data
, proxyPUT :: AssociatedFile -> Key -> Annex [RemoteSide]
-- ^ put to some/all of these remotes
}
{- A ProxySelector that sends every kind of request to the one remote,
 - whatever the key or associated file. -}
singleProxySelector :: RemoteSide -> ProxySelector
singleProxySelector r = ProxySelector
    { proxyCHECKPRESENT = \_ -> one
    , proxyLOCKCONTENT = \_ -> one
    , proxyREMOVE = \_ -> every
    , proxyGETTIMESTAMP = every
    , proxyGET = \_ -> one
    , proxyPUT = \_ _ -> every
    }
  where
    one = return (Just r)
    every = return [r]
{- To keep this module limited to P2P protocol actions,
 - all other actions that a proxy needs to do are provided
 - here. -}
data ProxyMethods = ProxyMethods
{ removedContent :: LiveUpdate -> UUID -> Key -> Annex ()
-- ^ called when content is removed from a repository
-- (REMOVE handling always passes NoLiveUpdate)
, addedContent :: LiveUpdate -> UUID -> Key -> Annex ()
-- ^ called when content is added to a repository
-- (the remote's remoteLiveUpdate is passed for the remote that
-- was stored to, NoLiveUpdate for other UUIDs it reports)
}
{- Type of function that takes an error handler, which is
 - used to handle a ProtoFailure when receiving a message
 - from the client or remote.
 -
 - The error handler is given a continuation and an action that yields
 - either a ProtoFailure or a value. Presumably it passes the value to
 - the continuation and deals with the ProtoFailure itself; the handler
 - is supplied by the caller, so confirm there.
 -}
type ProtoErrorHandled r =
(forall t. ((t -> Annex r) -> Annex (Either ProtoFailure t) -> Annex r)) -> Annex r
{- The first step of proxying with a client, which has already
 - authenticated.
 -
 - The client waits for AUTH_SUCCESS before it sends VERSION, so that is
 - sent here even though no connection to the remote has been brought
 - up yet.
 -
 - Most clients then send VERSION. A version 0 client sends some other
 - message instead, which is handed to the continuation to be handled
 - later.
 -}
getClientProtocolVersion
    :: UUID
    -> ClientSide
    -> (Maybe (ProtocolVersion, Maybe Message) -> Annex r)
    -> ProtoErrorHandled r
getClientProtocolVersion remoteuuid (ClientSide clientrunst clientconn) cont protoerrhandler =
    protoerrhandler cont negotiate
  where
    negotiate = liftIO $ runNetProto clientrunst clientconn $
        getClientProtocolVersion' remoteuuid
{- Sends AUTH_SUCCESS and reads the client's first message.
 -
 - Nothing is returned when no message was received. Otherwise the
 - result is the protocol version to use, and any message other than
 - VERSION that the client sent.
 -}
getClientProtocolVersion'
    :: UUID
    -> Proto (Maybe (ProtocolVersion, Maybe Message))
getClientProtocolVersion' remoteuuid = do
    net $ sendMessage (AUTH_SUCCESS remoteuuid)
    interpret <$> net receiveMessage
  where
    interpret Nothing = Nothing
    -- If the client sends a newer version than we
    -- understand, reduce it; we need to parse the
    -- protocol too.
    interpret (Just (VERSION v)) =
        Just (min v maxProtocolVersion, Nothing)
    interpret (Just othermsg) =
        Just (defaultProtocolVersion, Just othermsg)
{- Tells the client which protocol version was negotiated.
 -
 - A version 0 client did not send VERSION, so nothing is sent to it.
 - The other message that getClientProtocolVersion received from it is
 - passed on to the continuation instead.
 -}
sendClientProtocolVersion
    :: ClientSide
    -> Maybe Message
    -> ProtocolVersion
    -> (Maybe Message -> Annex r)
    -> ProtoErrorHandled r
sendClientProtocolVersion (ClientSide clientrunst clientconn) othermsg protocolversion cont protoerrhandler =
    maybe sendversion (const (cont othermsg)) othermsg
  where
    sendversion = protoerrhandler (\() -> cont Nothing) $
        liftIO $ runNetProto clientrunst clientconn $
            net $ sendMessage $ VERSION protocolversion
{- Gets the BYPASS message that a client speaking protocol version 2 or
 - newer may send immediately after VERSION.
 -
 - The continuation is given the Bypass (empty when none was sent),
 - and any other message that was received instead, to be handled later.
 -}
getClientBypass
    :: ClientSide
    -> ProtocolVersion
    -> Maybe Message
    -> ((Bypass, Maybe Message) -> Annex r)
    -> ProtoErrorHandled r
getClientBypass (ClientSide clientrunst clientconn) (ProtocolVersion protocolversion) othermsg cont protoerrhandler =
    case othermsg of
        -- Pass along non-BYPASS message from version 0 client.
        Just _ -> cont (nobypass, othermsg)
        Nothing
            | protocolversion < 2 -> cont (nobypass, Nothing)
            | otherwise -> protoerrhandler cont $
                liftIO $ runNetProto clientrunst clientconn $
                    interpret <$> net receiveMessage
  where
    nobypass = Bypass S.empty
    interpret (Just (BYPASS bypass)) = (bypass, Nothing)
    interpret (Just msg) = (nobypass, Just msg)
    interpret Nothing = (nobypass, Nothing)
{- Timestamps recorded by the most recent GETTIMESTAMP request. They are
 - read back when a REMOVE_BEFORE is proxied, to translate its timestamp
 - for each remote. -}
data ProxyState = ProxyState
{ proxyRemoteLatestTimestamps :: TVar (M.Map RemoteSideId MonotonicTimestamp)
-- ^ the timestamp each remote reported; emptied when there is a
-- single remote
, proxyRemoteLatestLocalTimestamp :: TVar (Maybe MonotonicTimestamp)
-- ^ local time taken after the remotes were asked; Nothing when
-- timestamps are not being translated
}
{- Makes a ProxyState in which no timestamps have been recorded yet. -}
mkProxyState :: IO ProxyState
mkProxyState = do
    remotetimes <- newTVarIO mempty
    localtime <- newTVarIO Nothing
    return (ProxyState remotetimes localtime)
{- Everything proxy and proxyRequest need to know about one proxied
 - session with a client. -}
data ProxyParams = ProxyParams
{ proxyMethods :: ProxyMethods
, proxyState :: ProxyState
, proxyServerMode :: ServerMode
-- ^ passed to the server mode checks that gate REMOVE, PUT and
-- CONNECT
, proxyClientSide :: ClientSide
, proxyUUID :: UUID
-- ^ compared with the UUIDs of remotes; left out of the UUID list
-- of ALREADY_HAVE_PLUS
, proxySelector :: ProxySelector
, proxyConcurrencyConfig :: ConcurrencyConfig
-- ^ used by forMC when talking to several remotes
, proxyClientProtocolVersion :: ProtocolVersion
-- ^ The remote(s) may speak an earlier version, or the same
-- version, but not a later version.
}
{- Proxies between the client and the remote. This picks up after
 - sendClientProtocolVersion.
 -
 - Requests are handled one at a time by proxyRequest. Each time one
 - has been handled, the next message is read from the client. When no
 - further message is received, the first parameter is run.
 -}
proxy
    :: Annex r
    -> ProxyParams
    -> Maybe Message
    -- ^ non-VERSION message that was received from the client when
    -- negotiating protocol version, and has not been responded to yet
    -> ProtoErrorHandled r
proxy proxydone proxyparams othermsg protoerrhandler =
    maybe (proxynextclientmessage ()) handlemessage othermsg
  where
    ClientSide clientrunst clientconn = proxyClientSide proxyparams

    handlemessage message = proxyRequest
        proxydone proxyparams proxynextclientmessage
        message protoerrhandler

    proxynextclientmessage () =
        protoerrhandler (maybe proxydone handlemessage) $
            liftIO $ runNetProto clientrunst clientconn $
                net receiveMessage
{- Handles proxying a single request between the client and remote.
 -
 - The first parameter is run when proxying should stop: when the
 - client or a remote hangs up, when no remote could be reached for a
 - PUT, and after relaying a git service for CONNECT.
 -
 - The request message selects the handler. ProxySelector picks the
 - remote(s); REMOVE, PUT and CONNECT are first checked against the
 - server mode. Messages that are not valid requests from a client
 - are answered with an ERROR, and then this gives up.
 -}
proxyRequest
:: Annex r
-> ProxyParams
-> (() -> Annex r) -- ^ called once the request has been handled
-> Message
-> ProtoErrorHandled r
proxyRequest proxydone proxyparams requestcomplete requestmessage protoerrhandler =
case requestmessage of
CHECKPRESENT k -> proxyCHECKPRESENT (proxySelector proxyparams) k >>= \case
Just remoteside ->
proxyresponse remoteside requestmessage
(const requestcomplete)
Nothing ->
protoerrhandler requestcomplete $
client $ net $ sendMessage FAILURE
LOCKCONTENT k -> proxyLOCKCONTENT (proxySelector proxyparams) k >>= \case
Just remoteside ->
handleLOCKCONTENT remoteside requestmessage
Nothing ->
protoerrhandler requestcomplete $
client $ net $ sendMessage FAILURE
REMOVE k -> do
remotesides <- proxyREMOVE (proxySelector proxyparams) k
servermodechecker checkREMOVEServerMode $
handleREMOVE remotesides k requestmessage
-- Handled the same as REMOVE; handleREMOVE deals with the
-- timestamp when it sends the message on to each remote.
REMOVE_BEFORE _ k -> do
remotesides <- proxyREMOVE (proxySelector proxyparams) k
servermodechecker checkREMOVEServerMode $
handleREMOVE remotesides k requestmessage
GETTIMESTAMP -> do
remotesides <- proxyGETTIMESTAMP (proxySelector proxyparams)
handleGETTIMESTAMP remotesides
GET _ _ k -> proxyGET (proxySelector proxyparams) k >>= \case
Just remoteside -> handleGET remoteside requestmessage
Nothing -> handleGETNoRemoteSide
PUT paf k -> do
af <- getassociatedfile paf
remotesides <- proxyPUT (proxySelector proxyparams) af k
servermodechecker checkPUTServerMode $
handlePUT remotesides k requestmessage
-- Nothing is sent in response to a BYPASS received here.
BYPASS _ -> requestcomplete ()
-- These messages involve the git repository, not the
-- annex. So they affect the git repository of the proxy,
-- not the remote.
CONNECT service ->
servermodechecker (checkCONNECTServerMode service) $
-- P2P protocol does not continue after
-- relaying from git.
protoerrhandler (\() -> proxydone) $
client $ net $ relayService service
NOTIFYCHANGE -> protoerr
-- Messages that the client should only send after one of
-- the messages above.
SUCCESS -> protoerr
SUCCESS_PLUS _ -> protoerr
FAILURE -> protoerr
FAILURE_PLUS _ -> protoerr
DATA _ -> protoerr
DATA_PRESENT -> protoerr
VALIDITY _ -> protoerr
UNLOCKCONTENT -> protoerr
-- If the client errors out, give up.
ERROR msg -> giveup $ "client error: " ++ msg
-- Messages that only the server should send.
CONNECTDONE _ -> protoerr
CHANGED _ -> protoerr
AUTH_SUCCESS _ -> protoerr
AUTH_FAILURE -> protoerr
PUT_FROM _ -> protoerr
ALREADY_HAVE -> protoerr
ALREADY_HAVE_PLUS _ -> protoerr
TIMESTAMP _ -> protoerr
-- Early messages that the client should not send now.
AUTH _ _ -> protoerr
VERSION _ -> protoerr
where
-- Runs a protocol action against the client.
client = liftIO . runNetProto clientrunst clientconn
ClientSide clientrunst clientconn = proxyClientSide proxyparams
-- Runs a server mode check. When the check allows the request, the
-- action is run. Otherwise the check provides a protocol action,
-- which is run against the client, completing the request.
servermodechecker c a = c (proxyServerMode proxyparams) decide
  where
    decide Nothing = a
    decide (Just notallowed) =
        protoerrhandler requestcomplete (client notallowed)
-- Sends a message to the remote and relays the remote's response back
-- to the client. Once the client has been sent the response, the
-- continuation is run with it.
proxyresponse remoteside message a =
    getresponse (runRemoteSide remoteside) message forward
  where
    forward resp = protoerrhandler (a resp) $
        client $ net $ sendMessage resp
-- Sends a message to the endpoint, waits for its response, and
-- passes the response on to the handler.
getresponse endpoint message handleresp =
    protoerrhandler (withresp handleresp) (endpoint exchange)
  where
    exchange = net $ sendMessage message >> receiveMessage
-- Passes a received message to the handler. When there is no message,
-- whichever of the remote or client it was being read from hung up,
-- so proxying is done.
withresp a resp = maybe proxydone a resp
-- Reads one message from the first party and sends it on to the
-- second. After it has been sent, the continuation is run with the
-- message.
relayonemessage from to cont =
    protoerrhandler (withresp forward) (from $ net receiveMessage)
  where
    forward message = protoerrhandler (cont message) $
        to $ net $ sendMessage message
-- Tells the client there was a protocol error, and gives up.
-- Whether the ERROR message could be sent is not checked.
protoerr = do
    void $ client $ net $ sendMessage (ERROR "protocol error")
    giveup "protocol error"
-- Relays LOCKCONTENT to the remote and its response to the client.
-- After SUCCESS, the client's next message is relayed to the remote
-- before the request completes. Any other response completes the
-- request right away.
handleLOCKCONTENT remoteside msg =
    proxyresponse remoteside msg afterresponse
  where
    afterresponse r () = case r of
        SUCCESS -> relayonemessage client
            (runRemoteSide remoteside)
            (\_ -> requestcomplete)
        _ -> requestcomplete ()
-- When there is a single remote, reply with its timestamp,
-- to avoid needing timestamp translation.
handleGETTIMESTAMP (remoteside:[]) = do
-- Clear what an earlier GETTIMESTAMP recorded. With no local
-- timestamp recorded, proxyTimestamp leaves timestamps unchanged.
liftIO $ atomically $ do
writeTVar (proxyRemoteLatestTimestamps (proxyState proxyparams))
mempty
writeTVar (proxyRemoteLatestLocalTimestamp (proxyState proxyparams))
Nothing
proxyresponse remoteside GETTIMESTAMP
(const requestcomplete)
-- When there are multiple remotes, reply with our local timestamp,
-- and do timestamp translation when sending REMOVE-FROM.
handleGETTIMESTAMP remotesides = do
-- Order of getting timestamps matters.
-- Getting the local time after the time of the remotes
-- means that if there is some delay in getting the time
-- from a remote, that is reflected in the local time,
-- and so reduces the allowed time.
--
-- mapMaybe join drops remotes that could not be talked to,
-- and remotes that replied with something other than TIMESTAMP.
remotetimes <- (M.fromList . mapMaybe join) <$> getremotetimes
localtime <- liftIO currentMonotonicTimestamp
-- Both are written in one transaction, so the remote timestamps
-- and the local timestamp always belong to the same request.
liftIO $ atomically $ do
writeTVar (proxyRemoteLatestTimestamps (proxyState proxyparams))
remotetimes
writeTVar (proxyRemoteLatestLocalTimestamp (proxyState proxyparams))
(Just localtime)
protoerrhandler requestcomplete $
client $ net $ sendMessage (TIMESTAMP localtime)
where
-- Asks each remote for its timestamp, keyed by remoteSideId.
getremotetimes = forMC (proxyConcurrencyConfig proxyparams) remotesides $ \r ->
runRemoteSideOrSkipFailed r $ do
net $ sendMessage GETTIMESTAMP
net receiveMessage >>= return . \case
Just (TIMESTAMP ts) ->
Just (remoteSideId r, ts)
_ -> Nothing
-- Translates a timestamp received from the client into the
-- timestamp to send to a remote.
--
-- The timestamp recorded for the remote is advanced by the amount
-- that the client's timestamp is past the local timestamp that was
-- recorded along with it.
proxyTimestamp ts r tsm = maybe ts translate
  -- (the Nothing case above: not proxying timestamps)
  where
    translate correspondinglocaltime =
        case M.lookup (remoteSideId r) tsm of
            Nothing -> ts -- not reached
            Just oldts -> oldts + (ts - correspondinglocaltime)
-- Removes the key from each of the remotes, and sends the client a
-- single combined response.
handleREMOVE [] _ _ =
-- When no places are provided to remove from,
-- don't report a successful removal.
protoerrhandler requestcomplete $
client $ net $ sendMessage FAILURE
handleREMOVE remotesides k message = do
-- Timestamps recorded by the last GETTIMESTAMP, needed to
-- translate the timestamp of a REMOVE_BEFORE for each remote.
tsm <- liftIO $ readTVarIO $
proxyRemoteLatestTimestamps (proxyState proxyparams)
oldlocaltime <- liftIO $ readTVarIO $
proxyRemoteLatestLocalTimestamp (proxyState proxyparams)
v <- forMC (proxyConcurrencyConfig proxyparams) remotesides $ \r ->
runRemoteSideOrSkipFailed r $ do
case message of
REMOVE_BEFORE ts _ -> do
-- A remote speaking a protocol version before 3 is
-- sent a plain REMOVE instead.
v <- net getProtocolVersion
if v < ProtocolVersion 3
then net $ sendMessage $
REMOVE k
else net $ sendMessage $
REMOVE_BEFORE (proxyTimestamp ts r tsm oldlocaltime) k
_ -> net $ sendMessage message
-- Each response becomes
-- ((succeeded, error message), UUIDs content was removed from),
-- or Nothing for any other response.
net receiveMessage >>= return . \case
Just SUCCESS ->
Just ((True, Nothing), [Remote.uuid (remote r)])
Just (SUCCESS_PLUS us) ->
Just ((True, Nothing), Remote.uuid (remote r):us)
Just FAILURE ->
Just ((False, Nothing), [])
Just (FAILURE_PLUS us) ->
Just ((False, Nothing), us)
Just (ERROR err) ->
Just ((False, Just err), [])
_ -> Nothing
-- Nothing now stands for both a remote that could not be talked
-- to and one that sent an unexpected response.
let v' = map join v
let us = concatMap snd $ catMaybes v'
-- Record every removal that was reported, including the ones that
-- came with a FAILURE_PLUS.
mapM_ (\u -> removedContent (proxyMethods proxyparams) NoLiveUpdate u k) us
protoerrhandler requestcomplete $
client $ net $ sendMessage $
-- The _PLUS forms are only used with a client that speaks
-- protocol version 2 or newer, and only when there is a
-- UUID to report other than proxyUUID.
let nonplussed = all (== proxyUUID proxyparams) us
|| proxyClientProtocolVersion proxyparams < ProtocolVersion 2
-- Success needs every remote to have succeeded; a Nothing
-- counts as a failure.
in if all (maybe False (fst . fst)) v'
then if nonplussed
then SUCCESS
else SUCCESS_PLUS us
else if nonplussed
-- Pass on the first error message that a remote sent,
-- if there was one.
then case mapMaybe (snd . fst) (catMaybes v') of
[] -> FAILURE
(err:_) -> ERROR err
else FAILURE_PLUS us
-- There is no remote to get the content from, so send the client an
-- empty DATA and indicate that it was invalid. A version 0 client is
-- not sent the VALIDITY message. The client's reply is read and
-- discarded.
handleGETNoRemoteSide = protoerrhandler requestcomplete (client emptyinvalid)
  where
    emptyinvalid = net $ do
        sendMessage $ DATA (Len 0)
        sendBytes (Len 0) mempty nullMeterUpdate
        when (proxyClientProtocolVersion proxyparams /= ProtocolVersion 0) $
            sendMessage (VALIDITY Invalid)
        void $ receiveMessage
-- Sends the GET to the remote. When it responds with DATA, the content
-- is relayed to the client. An ERROR from the remote is passed on to
-- the client. Any other response is a protocol error.
handleGET remoteside message =
    getresponse (runRemoteSide remoteside) message $ \resp -> case resp of
        DATA len -> relayGET remoteside len resp
        ERROR err -> protoerrhandler requestcomplete $
            client $ net $ sendMessage (ERROR err)
        _ -> protoerr
-- Handles a PUT to the selected remotes.
--
-- With a single remote whose UUID is the UUID being proxied for,
-- messages are relayed directly between the client and that remote.
-- (A single remote with some other UUID is handled by the last
-- equation.)
handlePUT [remoteside] k message
    | Remote.uuid (remote remoteside) == proxyUUID proxyparams =
        getresponse (runRemoteSide remoteside) message fromremote
  where
    toclient msg = protoerrhandler requestcomplete $
        client $ net $ sendMessage msg
    fromremote resp = case resp of
        ALREADY_HAVE -> toclient resp
        ALREADY_HAVE_PLUS _ -> toclient resp
        PUT_FROM _ -> getresponse client resp $
            withDATA
                (relayPUT remoteside k)
                (handlePut_DATA_PRESENT remoteside k)
        _ -> protoerr
-- With no remotes to put to, the client is told ALREADY_HAVE.
handlePUT [] _ _ =
    protoerrhandler requestcomplete $
        client $ net $ sendMessage ALREADY_HAVE
handlePUT remotesides k message =
    handlePutMulti remotesides k message
-- Dispatches on whether a message is DATA. A DATA message goes to the
-- first handler, along with its length. Any other message goes to
-- the second handler.
withDATA ondata other message = case message of
    DATA len -> ondata len message
    _ -> other message
-- Relays the content of a GET from the remote to the client: the DATA
-- message, the bytes, the message that follows the data (except with
-- a version 0 client), and then the client's reply back to the remote.
relayGET remoteside len = relayDATAStart client $
    relayDATACore len remoteend client $
        relayDATAFinish remoteend client $
            relayonemessage client remoteend $
                const requestcomplete
  where
    remoteend = runRemoteSide remoteside
-- Relays the content of a PUT from the client to the remote, the
-- mirror image of relayGET. The remote's final reply is relayed to
-- the client, and is used to record what was stored before the
-- request completes.
relayPUT remoteside k len = relayDATAStart remoteend $
    relayDATACore len client remoteend $
        relayDATAFinish client remoteend $
            relayonemessage remoteend client $ \resp () -> do
                void $ relayPUTRecord k remoteside resp
                requestcomplete ()
  where
    remoteend = runRemoteSide remoteside
-- Records that content was stored, based on a remote's reply to a PUT.
--
-- On SUCCESS or SUCCESS_PLUS, addedContent is called for the remote,
-- and then for each additional UUID the remote reported. Returns the
-- UUIDs the content was stored to, or Nothing for any other reply.
relayPUTRecord k remoteside resp = case resp of
    SUCCESS -> do
        recordremote
        return $ Just [remoteuuid]
    SUCCESS_PLUS us -> do
        recordremote
        forM_ us $ \u ->
            addedContent methods NoLiveUpdate u k
        return $ Just (remoteuuid : us)
    _ -> return Nothing
  where
    methods = proxyMethods proxyparams
    remoteuuid = Remote.uuid (remote remoteside)
    recordremote = addedContent methods
        (remoteLiveUpdate remoteside)
        remoteuuid
        k
-- Starts a PUT of the key to several remotes at once.
--
-- The client's PUT message is sent to every remote, and their replies
-- decide what happens next:
--
-- * No remote replied: the request cannot be handled, so proxying
--   is done.
-- * No remote asked for the content, and at least one replied
--   ALREADY_HAVE: the client is told the content is already present.
--   When it speaks protocol version 2 or newer, it is also told which
--   remotes have it. Remotes that failed to reply are skipped, the
--   same as they are when content does get sent.
-- * Otherwise, the client is asked to send from the smallest offset
--   that any remote asked for. Its reply is handled by relayPUTMulti
--   when it is DATA, and by handlePutMulti_DATA_PRESENT otherwise.
--
-- A remote that replies with any other message is a protocol error.
handlePutMulti remotesides k message = do
    l <- forMC (proxyConcurrencyConfig proxyparams) remotesides initiate
    let responded = rights l
    case (rights responded, lefts responded) of
        -- no response from any remote
        ([], []) -> proxydone
        -- There is nowhere to send the content. This includes the
        -- case where some remotes failed to reply and the rest
        -- already have it; going on to take the minimum offset of
        -- an empty list would throw an exception.
        ([], have) -> sendalreadyhave have
        (wanting, _) -> do
            -- wanting is not empty here, so minimum is safe
            let minoffset = minimum (map snd wanting)
            getresponse client (PUT_FROM (Offset minoffset)) $
                withDATA (relayPUTMulti minoffset wanting k)
                    (handlePutMulti_DATA_PRESENT wanting k)
  where
    -- Sends the PUT to one remote. The result is
    -- Left () when the remote could not be talked to or hung up,
    -- Right (Left remoteside) when it already has the content,
    -- Right (Right (remoteside, offset)) when it wants the content
    -- sent starting from the offset.
    initiate remoteside = do
        resp <- runRemoteSideOrSkipFailed remoteside $ net $ do
            sendMessage message
            receiveMessage
        case resp of
            Just (Just (PUT_FROM (Offset offset))) ->
                return $ Right $
                    Right (remoteside, offset)
            Just (Just ALREADY_HAVE) ->
                return $ Right $ Left remoteside
            Just (Just _) -> protoerr
            Just Nothing -> return (Left ())
            Nothing -> return (Left ())

    -- Tells the client that the content is already present.
    -- ALREADY_HAVE_PLUS needs protocol version 2, and leaves out
    -- the UUID that is being proxied for.
    sendalreadyhave have
        | proxyClientProtocolVersion proxyparams < ProtocolVersion 2 =
            protoerrhandler requestcomplete $
                client $ net $ sendMessage ALREADY_HAVE
        | otherwise =
            protoerrhandler requestcomplete $
                client $ net $ sendMessage $ ALREADY_HAVE_PLUS $
                    filter (/= proxyUUID proxyparams) $
                        map (Remote.uuid . remote) have
-- Relays the content that the client sends for a PUT to several
-- remotes, each of which may have asked for a different offset.
--
-- minoffset is the offset the client was asked to send from, and
-- datalen is the length of the data the client announced. Each of the
-- remotes is paired with the offset it asked for. The client's DATA
-- message itself (the last parameter) is not used.
relayPUTMulti minoffset remotes k (Len datalen) _ = do
-- Tell each remote how much data to expect, depending
-- on the remote's offset.
rs <- forMC (proxyConcurrencyConfig proxyparams) remotes $ \r@(remoteside, remoteoffset) ->
runRemoteSideOrSkipFailed remoteside $ do
net $ sendMessage $ DATA $ Len $
if remoteoffset > totallen
then 0
else totallen - remoteoffset
return r
-- Remotes that could not be sent the DATA message are dropped
-- before any content is sent.
protoerrhandler (send (catMaybes rs) minoffset) $
client $ net $ receiveBytes (Len datalen) nullMeterUpdate
where
-- The position where the client's data ends, counted on the same
-- scale as the offsets.
totallen = datalen + minoffset
chunksize = fromIntegral defaultChunkSize
-- Stream the lazy bytestring out to the remotes in chunks.
-- Only start sending to a remote once past its desired
-- offset.
--
-- n is the position of the start of b, on the same scale as the
-- offsets. rs are the remotes that are still being sent to.
send rs n b = do
let (chunk, b') = L.splitAt chunksize b
let chunklen = fromIntegral (L.length chunk)
-- Forced, so that no chain of additions builds up over the chunks.
let !n' = n + chunklen
rs' <- forMC (proxyConcurrencyConfig proxyparams) rs $ \r@(remoteside, remoteoffset) ->
-- The whole chunk is past the remote's offset.
if n >= remoteoffset
then runRemoteSideOrSkipFailed remoteside $ do
net $ sendBytes (Len chunklen) chunk nullMeterUpdate
return r
-- The remote's offset falls inside this chunk, so send only
-- the part of the chunk from the offset on.
else if (n' > remoteoffset)
then do
let chunkoffset = remoteoffset - n
let subchunklen = chunklen - chunkoffset
let subchunk = L.drop (fromIntegral chunkoffset) chunk
runRemoteSideOrSkipFailed remoteside $ do
net $ sendBytes (Len subchunklen) subchunk nullMeterUpdate
return r
-- The remote's offset has not been reached yet; keep the
-- remote without sending it anything.
else return (Just r)
if L.null b'
then do
-- If we didn't receive as much
-- data as expected, close
-- connections to all the remotes,
-- because they are still waiting
-- on the rest of the data.
when (n' /= totallen) $
mapM_ (closeRemoteSide . fst) rs
sent (catMaybes rs')
else send (catMaybes rs') n' b'
-- With no remote left that was sent everything, proxying is done.
sent [] = proxydone
sent rs = relayDATAFinishMulti k (map fst rs)
-- Runs a protocol action against the remote. When it fails, the
-- connection to the remote is closed and the result is Nothing, so
-- the caller can carry on with the other remotes.
runRemoteSideOrSkipFailed remoteside a =
    runRemoteSide remoteside a >>= either (const skip) (return . Just)
  where
    -- This connection to the remote is
    -- unrecoverable at this point, so close it.
    skip = do
        closeRemoteSide remoteside
        return Nothing
-- Sends the message that starts a data transfer to x, and then
-- goes on to receive the data.
relayDATAStart x receive message = protoerrhandler afterstart announce
  where
    announce = x $ net $ sendMessage message
    afterstart () = receive
-- Receives len bytes from x and sends them on to y. The continuation
-- is run after they have been sent.
relayDATACore len x y a =
    protoerrhandler forward (x $ net $ receiveBytes len nullMeterUpdate)
  where
    forward b = protoerrhandler a $
        y $ net $ sendBytes len b nullMeterUpdate
-- Finishes off a data transfer from x to y. With a version 0 client
-- there is nothing more to relay. Otherwise, one more message is
-- relayed from x to y before continuing.
relayDATAFinish x y sendsuccessfailure () =
    if proxyClientProtocolVersion proxyparams == ProtocolVersion 0
        then sendsuccessfailure
        -- Protocol version 1 has a VALID or
        -- INVALID message after the data.
        else relayonemessage x y (\_ () -> sendsuccessfailure)
-- Finishes a PUT to several remotes, once all the data has been sent
-- to them: collects each remote's reply, records what was stored,
-- and sends the client the combined result.
relayDATAFinishMulti k rs
-- A version 0 client sends nothing after the data, so only the
-- remotes' replies are read.
| proxyClientProtocolVersion proxyparams == ProtocolVersion 0 =
finish $ net receiveMessage
| otherwise =
-- The message that the client sends after the data is read once,
-- and then passed on to each remote.
flip protoerrhandler (client $ net receiveMessage) $
withresp $ \message ->
finish $ do
-- Relay VALID or INVALID message
-- only to remotes that support
-- protocol version 1.
net getProtocolVersion >>= \case
ProtocolVersion 0 -> return ()
_ -> net $ sendMessage message
net receiveMessage
where
-- Runs the action against every remote. A remote that fails,
-- or that sends no reply, counts as not having stored the content.
finish a = do
storeduuids <- forMC (proxyConcurrencyConfig proxyparams) rs $ \r ->
runRemoteSideOrSkipFailed r a >>= \case
Just (Just resp) ->
relayPUTRecord k r resp
_ -> return Nothing
relayDATAFinishMulti' storeduuids
-- Sends the client the result of a PUT to several remotes. It is
-- FAILURE when the content was not stored anywhere. Otherwise it is
-- SUCCESS, or with a client that speaks protocol version 2 or newer,
-- SUCCESS_PLUS with all the UUIDs the content was stored to.
relayDATAFinishMulti' storeduuids =
    protoerrhandler requestcomplete $
        client $ net $ sendMessage result
  where
    stored = concat (catMaybes storeduuids)
    result
        | null stored = FAILURE
        | proxyClientProtocolVersion proxyparams < ProtocolVersion 2 = SUCCESS
        | otherwise = SUCCESS_PLUS stored
-- Handles the client replying to PUT_FROM with DATA_PRESENT rather
-- than sending the content. DATA_PRESENT is sent on to each remote,
-- what each reports as stored is recorded, and the client is sent the
-- combined result. Any other message is a protocol error.
handlePutMulti_DATA_PRESENT remotes k msg = case msg of
    DATA_PRESENT -> do
        storeduuids <- forMC (proxyConcurrencyConfig proxyparams) remotes $ \(remoteside, _) ->
            runRemoteSideOrSkipFailed remoteside announce >>= \case
                Just (Just resp) -> relayPUTRecord k remoteside resp
                _ -> return Nothing
        relayDATAFinishMulti' storeduuids
    _ -> protoerr
  where
    announce = do
        net $ sendMessage DATA_PRESENT
        net receiveMessage
-- Handles the client replying to PUT_FROM with DATA_PRESENT when
-- there is a single remote. DATA_PRESENT is sent on to the remote.
-- What its response says was stored is recorded, and then the
-- response is sent to the client. Any other message is a protocol
-- error.
handlePut_DATA_PRESENT remoteside k msg = case msg of
    DATA_PRESENT ->
        getresponse (runRemoteSide remoteside) DATA_PRESENT record
    _ -> protoerr
  where
    record resp = do
        void $ relayPUTRecord k remoteside resp
        protoerrhandler requestcomplete $
            client $ net $ sendMessage resp
-- An associated file received from the P2P protocol is relative to
-- the top of the git repository. This process may be running with a
-- different cwd, so the file is converted using the repository.
getassociatedfile (ProtoAssociatedFile (AssociatedFile af)) = case af of
    Nothing -> return $ AssociatedFile Nothing
    Just f -> do
        f' <- fromRepo (fromTopFilePath (asTopFilePath f))
        return (AssociatedFile (Just f'))
{- The number of jobs forMC may use, and the semaphore that forMC's
 - concurrent actions wait on before they run. -}
data ConcurrencyConfig = ConcurrencyConfig Int (MSem.MSem Int)
{- A configuration with a single job; forMC runs actions sequentially
 - when given it. -}
noConcurrencyConfig :: Annex ConcurrencyConfig
noConcurrencyConfig = mkConcurrencyConfig 1
{- Makes a ConcurrencyConfig for the given number of jobs, with a new
 - semaphore that starts at that same number. -}
mkConcurrencyConfig :: Int -> Annex ConcurrencyConfig
mkConcurrencyConfig n = do
    msem <- liftIO (MSem.new n)
    return (ConcurrencyConfig n msem)
{- Makes a ConcurrencyConfig from the annexJobs git configuration.
 -
 - When it is concurrent, the number of capabilities is raised to the
 - number of jobs if it is lower, and the concurrency setting is
 - applied with setConcurrency.
 -}
concurrencyConfigJobs :: Annex ConcurrencyConfig
concurrencyConfigJobs = do
    jobs <- annexJobs <$> Annex.getGitConfig
    case jobs of
        NonConcurrent -> noConcurrencyConfig
        Concurrent n -> withjobs n
        ConcurrentPerCpu -> liftIO getNumProcessors >>= withjobs
  where
    withjobs n = do
        capabilities <- liftIO getNumCapabilities
        when (n > capabilities) $
            liftIO $ setNumCapabilities n
        setConcurrency (ConcurrencyGitConfig (Concurrent n))
        mkConcurrencyConfig n
{- Like forM, but may run the actions concurrently.
 -
 - A single item is always run directly. With fewer than 2 jobs
 - configured, the actions run one after another. Otherwise each action
 - is forked off with forkState, and waits on the semaphore before it
 - runs. The results are returned in the same order as the list.
 -}
forMC :: ConcurrencyConfig -> [a] -> (a -> Annex b) -> Annex [b]
forMC _ [x] a = (: []) <$> a x
forMC (ConcurrencyConfig n msem) xs a
    | n >= 2 = do
        runners <- forM xs (forkState . limited)
        sequence =<< liftIO (forConcurrently runners id)
    | otherwise = forM xs a
  where
    limited x = bracketIO
        (MSem.wait msem)
        (const $ MSem.signal msem)
        (const $ a x)