use a record to reduce the huge number of parameters
This commit is contained in:
parent
7b56fe1350
commit
6ef6ad808f
3 changed files with 86 additions and 75 deletions
|
@ -68,10 +68,18 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
|
|||
protocolversion bypassuuids
|
||||
concurrencyconfig <- getConcurrencyConfig
|
||||
proxystate <- liftIO mkProxyState
|
||||
proxy proxydone proxymethods proxystate servermode clientside
|
||||
(fromClusterUUID clusteruuid)
|
||||
selectnode concurrencyconfig protocolversion
|
||||
othermsg (protoerrhandler closenodes)
|
||||
let proxyparams = ProxyParams
|
||||
{ proxyMethods = proxymethods
|
||||
, proxyState = proxystate
|
||||
, proxyServerMode = servermode
|
||||
, proxyClientSide = clientside
|
||||
, proxyUUID = fromClusterUUID clusteruuid
|
||||
, proxySelector = selectnode
|
||||
, proxyConcurrencyConfig = concurrencyconfig
|
||||
, proxyProtocolVersion = protocolversion
|
||||
}
|
||||
proxy proxydone proxyparams othermsg
|
||||
(protoerrhandler closenodes)
|
||||
|
||||
clusterProxySelector :: ClusterUUID -> ProtocolVersion -> Bypass -> Annex (ProxySelector, Annex ())
|
||||
clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||
|
|
|
@ -61,7 +61,7 @@ performLocal theiruuid servermode = do
|
|||
|
||||
performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform
|
||||
performProxy clientuuid servermode r = do
|
||||
clientside <- proxyClientSide clientuuid
|
||||
clientside <- mkProxyClientSide clientuuid
|
||||
getClientProtocolVersion (Remote.uuid r) clientside
|
||||
(withclientversion clientside)
|
||||
(p2pErrHandler noop)
|
||||
|
@ -77,13 +77,18 @@ performProxy clientuuid servermode r = do
|
|||
p2pDone
|
||||
let errhandler = p2pErrHandler (closeRemoteSide remoteside)
|
||||
proxystate <- liftIO mkProxyState
|
||||
let runproxy othermsg' = proxy closer
|
||||
proxymethods proxystate
|
||||
servermode clientside
|
||||
(Remote.uuid r)
|
||||
(singleProxySelector remoteside)
|
||||
concurrencyconfig
|
||||
protocolversion othermsg' errhandler
|
||||
let proxyparams = ProxyParams
|
||||
{ proxyMethods = proxymethods
|
||||
, proxyState = proxystate
|
||||
, proxyServerMode = servermode
|
||||
, proxyClientSide = clientside
|
||||
, proxyUUID = Remote.uuid r
|
||||
, proxySelector = singleProxySelector remoteside
|
||||
, proxyConcurrencyConfig = concurrencyconfig
|
||||
, proxyProtocolVersion = protocolversion
|
||||
}
|
||||
let runproxy othermsg' = proxy closer proxyparams
|
||||
othermsg' errhandler
|
||||
sendClientProtocolVersion clientside othermsg protocolversion
|
||||
runproxy errhandler
|
||||
withclientversion _ Nothing = p2pDone
|
||||
|
@ -95,11 +100,11 @@ performProxy clientuuid servermode r = do
|
|||
|
||||
performProxyCluster :: UUID -> ClusterUUID -> P2P.ServerMode -> CommandPerform
|
||||
performProxyCluster clientuuid clusteruuid servermode = do
|
||||
clientside <- proxyClientSide clientuuid
|
||||
clientside <- mkProxyClientSide clientuuid
|
||||
proxyCluster clusteruuid p2pDone servermode clientside p2pErrHandler
|
||||
|
||||
proxyClientSide :: UUID -> Annex ClientSide
|
||||
proxyClientSide clientuuid = do
|
||||
mkProxyClientSide :: UUID -> Annex ClientSide
|
||||
mkProxyClientSide clientuuid = do
|
||||
clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing)
|
||||
ClientSide clientrunst <$> liftIO (stdioP2PConnectionDupped Nothing)
|
||||
|
||||
|
|
118
P2P/Proxy.hs
118
P2P/Proxy.hs
|
@ -200,91 +200,87 @@ mkProxyState = ProxyState
|
|||
<$> newTVarIO mempty
|
||||
<*> newTVarIO Nothing
|
||||
|
||||
data ProxyParams = ProxyParams
|
||||
{ proxyMethods :: ProxyMethods
|
||||
, proxyState :: ProxyState
|
||||
, proxyServerMode :: ServerMode
|
||||
, proxyClientSide :: ClientSide
|
||||
, proxyUUID :: UUID
|
||||
, proxySelector :: ProxySelector
|
||||
, proxyConcurrencyConfig :: ConcurrencyConfig
|
||||
, proxyProtocolVersion :: ProtocolVersion
|
||||
-- ^ Protocol version being spoken between the proxy and the
|
||||
-- client. When there are multiple remotes, some may speak an
|
||||
-- earlier version.
|
||||
}
|
||||
|
||||
{- Proxy between the client and the remote. This picks up after
|
||||
- sendClientProtocolVersion.
|
||||
-}
|
||||
proxy
|
||||
:: Annex r
|
||||
-> ProxyMethods
|
||||
-> ProxyState
|
||||
-> ServerMode
|
||||
-> ClientSide
|
||||
-> UUID
|
||||
-> ProxySelector
|
||||
-> ConcurrencyConfig
|
||||
-> ProtocolVersion
|
||||
-- ^ Protocol version being spoken between the proxy and the
|
||||
-- client. When there are multiple remotes, some may speak an
|
||||
-- earlier version.
|
||||
-> ProxyParams
|
||||
-> Maybe Message
|
||||
-- ^ non-VERSION message that was received from the client when
|
||||
-- negotiating protocol version, and has not been responded to yet
|
||||
-> ProtoErrorHandled r
|
||||
proxy proxydone proxymethods proxystate servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig protocolversion othermsg protoerrhandler = do
|
||||
proxy proxydone proxyparams othermsg protoerrhandler = do
|
||||
case othermsg of
|
||||
Nothing -> proxynextclientmessage ()
|
||||
Just message -> proxyclientmessage (Just message)
|
||||
where
|
||||
client = liftIO . runNetProto clientrunst clientconn
|
||||
|
||||
proxyclientmessage Nothing = proxydone
|
||||
proxyclientmessage (Just message) = proxyRequest
|
||||
proxydone proxymethods proxystate servermode
|
||||
(ClientSide clientrunst clientconn) remoteuuid
|
||||
proxyselector concurrencyconfig protocolversion
|
||||
proxynextclientmessage message
|
||||
protoerrhandler
|
||||
proxydone proxyparams proxynextclientmessage
|
||||
message protoerrhandler
|
||||
|
||||
proxynextclientmessage () = protoerrhandler proxyclientmessage $
|
||||
client (net receiveMessage)
|
||||
|
||||
client = liftIO . runNetProto clientrunst clientconn
|
||||
|
||||
ClientSide clientrunst clientconn = proxyClientSide proxyparams
|
||||
|
||||
{- Handles proxying a single request between the client and remote. -}
|
||||
proxyRequest
|
||||
:: Annex r
|
||||
-> ProxyMethods
|
||||
-> ProxyState
|
||||
-> ServerMode
|
||||
-> ClientSide
|
||||
-> UUID
|
||||
-> ProxySelector
|
||||
-> ConcurrencyConfig
|
||||
-> ProtocolVersion
|
||||
-> ProxyParams
|
||||
-> (() -> Annex r) -- ^ called once the request has been handled
|
||||
-> Message
|
||||
-> ProtoErrorHandled r
|
||||
proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig (ProtocolVersion protocolversion) requestcomplete requestmessage protoerrhandler =
|
||||
proxyRequest proxydone proxyparams requestcomplete requestmessage protoerrhandler =
|
||||
case requestmessage of
|
||||
CHECKPRESENT k -> proxyCHECKPRESENT proxyselector k >>= \case
|
||||
CHECKPRESENT k -> proxyCHECKPRESENT (proxySelector proxyparams) k >>= \case
|
||||
Just remoteside ->
|
||||
proxyresponse remoteside requestmessage
|
||||
(const requestcomplete)
|
||||
Nothing ->
|
||||
protoerrhandler requestcomplete $
|
||||
client $ net $ sendMessage FAILURE
|
||||
LOCKCONTENT k -> proxyLOCKCONTENT proxyselector k >>= \case
|
||||
LOCKCONTENT k -> proxyLOCKCONTENT (proxySelector proxyparams) k >>= \case
|
||||
Just remoteside ->
|
||||
proxyresponse remoteside requestmessage
|
||||
(const requestcomplete)
|
||||
Nothing ->
|
||||
protoerrhandler requestcomplete $
|
||||
client $ net $ sendMessage FAILURE
|
||||
UNLOCKCONTENT -> proxyUNLOCKCONTENT proxyselector >>= \case
|
||||
UNLOCKCONTENT -> proxyUNLOCKCONTENT (proxySelector proxyparams) >>= \case
|
||||
Just remoteside ->
|
||||
proxynoresponse remoteside requestmessage
|
||||
requestcomplete
|
||||
Nothing -> requestcomplete ()
|
||||
REMOVE k -> do
|
||||
remotesides <- proxyREMOVE proxyselector k
|
||||
remotesides <- proxyREMOVE (proxySelector proxyparams) k
|
||||
servermodechecker checkREMOVEServerMode $
|
||||
handleREMOVE remotesides k requestmessage
|
||||
REMOVE_BEFORE _ k -> do
|
||||
remotesides <- proxyREMOVE proxyselector k
|
||||
remotesides <- proxyREMOVE (proxySelector proxyparams) k
|
||||
servermodechecker checkREMOVEServerMode $
|
||||
handleREMOVE remotesides k requestmessage
|
||||
GETTIMESTAMP -> do
|
||||
remotesides <- proxyGETTIMESTAMP proxyselector
|
||||
remotesides <- proxyGETTIMESTAMP (proxySelector proxyparams)
|
||||
handleGETTIMESTAMP remotesides
|
||||
GET _ _ k -> proxyGET proxyselector k >>= \case
|
||||
GET _ _ k -> proxyGET (proxySelector proxyparams) k >>= \case
|
||||
Just remoteside -> handleGET remoteside requestmessage
|
||||
Nothing ->
|
||||
protoerrhandler requestcomplete $
|
||||
|
@ -292,7 +288,7 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
ERROR "content not present"
|
||||
PUT paf k -> do
|
||||
af <- getassociatedfile paf
|
||||
remotesides <- proxyPUT proxyselector af k
|
||||
remotesides <- proxyPUT (proxySelector proxyparams) af k
|
||||
servermodechecker checkPUTServerMode $
|
||||
handlePUT remotesides k requestmessage
|
||||
BYPASS _ -> requestcomplete ()
|
||||
|
@ -330,8 +326,10 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
VERSION _ -> protoerr
|
||||
where
|
||||
client = liftIO . runNetProto clientrunst clientconn
|
||||
|
||||
ClientSide clientrunst clientconn = proxyClientSide proxyparams
|
||||
|
||||
servermodechecker c a = c servermode $ \case
|
||||
servermodechecker c a = c (proxyServerMode proxyparams) $ \case
|
||||
Nothing -> a
|
||||
Just notallowed ->
|
||||
protoerrhandler requestcomplete $
|
||||
|
@ -377,9 +375,9 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
-- to avoid needing timestamp translation.
|
||||
handleGETTIMESTAMP (remoteside:[]) = do
|
||||
liftIO $ atomically $ do
|
||||
writeTVar (proxyRemoteLatestTimestamps proxystate)
|
||||
writeTVar (proxyRemoteLatestTimestamps (proxyState proxyparams))
|
||||
mempty
|
||||
writeTVar (proxyRemoteLatestLocalTimestamp proxystate)
|
||||
writeTVar (proxyRemoteLatestLocalTimestamp (proxyState proxyparams))
|
||||
Nothing
|
||||
proxyresponse remoteside GETTIMESTAMP
|
||||
(const requestcomplete)
|
||||
|
@ -394,14 +392,14 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
remotetimes <- (M.fromList . mapMaybe join) <$> getremotetimes
|
||||
localtime <- liftIO currentMonotonicTimestamp
|
||||
liftIO $ atomically $ do
|
||||
writeTVar (proxyRemoteLatestTimestamps proxystate)
|
||||
writeTVar (proxyRemoteLatestTimestamps (proxyState proxyparams))
|
||||
remotetimes
|
||||
writeTVar (proxyRemoteLatestLocalTimestamp proxystate)
|
||||
writeTVar (proxyRemoteLatestLocalTimestamp (proxyState proxyparams))
|
||||
(Just localtime)
|
||||
protoerrhandler requestcomplete $
|
||||
client $ net $ sendMessage (TIMESTAMP localtime)
|
||||
where
|
||||
getremotetimes = forMC concurrencyconfig remotesides $ \r ->
|
||||
getremotetimes = forMC (proxyConcurrencyConfig proxyparams) remotesides $ \r ->
|
||||
runRemoteSideOrSkipFailed r $ do
|
||||
net $ sendMessage GETTIMESTAMP
|
||||
net receiveMessage >>= return . \case
|
||||
|
@ -422,10 +420,10 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
client $ net $ sendMessage FAILURE
|
||||
handleREMOVE remotesides k message = do
|
||||
tsm <- liftIO $ readTVarIO $
|
||||
proxyRemoteLatestTimestamps proxystate
|
||||
proxyRemoteLatestTimestamps (proxyState proxyparams)
|
||||
oldlocaltime <- liftIO $ readTVarIO $
|
||||
proxyRemoteLatestLocalTimestamp proxystate
|
||||
v <- forMC concurrencyconfig remotesides $ \r ->
|
||||
proxyRemoteLatestLocalTimestamp (proxyState proxyparams)
|
||||
v <- forMC (proxyConcurrencyConfig proxyparams) remotesides $ \r ->
|
||||
runRemoteSideOrSkipFailed r $ do
|
||||
case message of
|
||||
REMOVE_BEFORE ts _ -> do
|
||||
|
@ -450,11 +448,11 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
_ -> Nothing
|
||||
let v' = map join v
|
||||
let us = concatMap snd $ catMaybes v'
|
||||
mapM_ (\u -> removedContent proxymethods u k) us
|
||||
mapM_ (\u -> removedContent (proxyMethods proxyparams) u k) us
|
||||
protoerrhandler requestcomplete $
|
||||
client $ net $ sendMessage $
|
||||
let nonplussed = all (== remoteuuid) us
|
||||
|| protocolversion < 2
|
||||
let nonplussed = all (== proxyUUID proxyparams) us
|
||||
|| proxyProtocolVersion proxyparams < ProtocolVersion 2
|
||||
in if all (maybe False (fst . fst)) v'
|
||||
then if nonplussed
|
||||
then SUCCESS
|
||||
|
@ -472,7 +470,7 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
_ -> protoerr
|
||||
|
||||
handlePUT (remoteside:[]) k message
|
||||
| Remote.uuid (remote remoteside) == remoteuuid =
|
||||
| Remote.uuid (remote remoteside) == proxyUUID proxyparams =
|
||||
getresponse (runRemoteSide remoteside) message $ \resp -> case resp of
|
||||
ALREADY_HAVE -> protoerrhandler requestcomplete $
|
||||
client $ net $ sendMessage resp
|
||||
|
@ -509,12 +507,12 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
requestcomplete ()
|
||||
|
||||
relayPUTRecord k remoteside SUCCESS = do
|
||||
addedContent proxymethods (Remote.uuid (remote remoteside)) k
|
||||
addedContent (proxyMethods proxyparams) (Remote.uuid (remote remoteside)) k
|
||||
return $ Just [Remote.uuid (remote remoteside)]
|
||||
relayPUTRecord k remoteside (SUCCESS_PLUS us) = do
|
||||
let us' = (Remote.uuid (remote remoteside)) : us
|
||||
forM_ us' $ \u ->
|
||||
addedContent proxymethods u k
|
||||
addedContent (proxyMethods proxyparams) u k
|
||||
return $ Just us'
|
||||
relayPUTRecord _ _ _ =
|
||||
return Nothing
|
||||
|
@ -536,14 +534,14 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
let alreadyhave = \case
|
||||
Right (Left _) -> True
|
||||
_ -> False
|
||||
l <- forMC concurrencyconfig remotesides initiate
|
||||
l <- forMC (proxyConcurrencyConfig proxyparams) remotesides initiate
|
||||
if all alreadyhave l
|
||||
then if protocolversion < 2
|
||||
then if proxyProtocolVersion proxyparams < ProtocolVersion 2
|
||||
then protoerrhandler requestcomplete $
|
||||
client $ net $ sendMessage ALREADY_HAVE
|
||||
else protoerrhandler requestcomplete $
|
||||
client $ net $ sendMessage $ ALREADY_HAVE_PLUS $
|
||||
filter (/= remoteuuid) $
|
||||
filter (/= proxyUUID proxyparams) $
|
||||
map (Remote.uuid . remote) (lefts (rights l))
|
||||
else if null (rights l)
|
||||
-- no response from any remote
|
||||
|
@ -559,7 +557,7 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
let totallen = datalen + minoffset
|
||||
-- Tell each remote how much data to expect, depending
|
||||
-- on the remote's offset.
|
||||
rs <- forMC concurrencyconfig remotes $ \r@(remoteside, remoteoffset) ->
|
||||
rs <- forMC (proxyConcurrencyConfig proxyparams) remotes $ \r@(remoteside, remoteoffset) ->
|
||||
runRemoteSideOrSkipFailed remoteside $ do
|
||||
net $ sendMessage $ DATA $ Len $
|
||||
totallen - remoteoffset
|
||||
|
@ -576,7 +574,7 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
let (chunk, b') = L.splitAt chunksize b
|
||||
let chunklen = fromIntegral (L.length chunk)
|
||||
let !n' = n + chunklen
|
||||
rs' <- forMC concurrencyconfig rs $ \r@(remoteside, remoteoffset) ->
|
||||
rs' <- forMC (proxyConcurrencyConfig proxyparams) rs $ \r@(remoteside, remoteoffset) ->
|
||||
if n >= remoteoffset
|
||||
then runRemoteSideOrSkipFailed remoteside $ do
|
||||
net $ sendBytes (Len chunklen) chunk nullMeterUpdate
|
||||
|
@ -617,13 +615,13 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
y $ net $ sendBytes len b nullMeterUpdate
|
||||
|
||||
relayDATAFinish x y sendsuccessfailure ()
|
||||
| protocolversion == 0 = sendsuccessfailure
|
||||
| proxyProtocolVersion proxyparams == ProtocolVersion 0 = sendsuccessfailure
|
||||
-- Protocol version 1 has a VALID or
|
||||
-- INVALID message after the data.
|
||||
| otherwise = relayonemessage x y (\_ () -> sendsuccessfailure)
|
||||
|
||||
relayDATAFinishMulti k rs
|
||||
| protocolversion == 0 =
|
||||
| proxyProtocolVersion proxyparams == ProtocolVersion 0 =
|
||||
finish $ net receiveMessage
|
||||
| otherwise =
|
||||
flip protoerrhandler (client $ net $ receiveMessage) $
|
||||
|
@ -638,7 +636,7 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
net receiveMessage
|
||||
where
|
||||
finish a = do
|
||||
storeduuids <- forMC concurrencyconfig rs $ \r ->
|
||||
storeduuids <- forMC (proxyConcurrencyConfig proxyparams) rs $ \r ->
|
||||
runRemoteSideOrSkipFailed r a >>= \case
|
||||
Just (Just resp) ->
|
||||
relayPUTRecord k r resp
|
||||
|
@ -648,7 +646,7 @@ proxyRequest proxydone proxymethods proxystate servermode (ClientSide clientruns
|
|||
case concat (catMaybes storeduuids) of
|
||||
[] -> FAILURE
|
||||
us
|
||||
| protocolversion < 2 -> SUCCESS
|
||||
| proxyProtocolVersion proxyparams < ProtocolVersion 2 -> SUCCESS
|
||||
| otherwise -> SUCCESS_PLUS us
|
||||
|
||||
-- The associated file received from the P2P protocol
|
||||
|
|
Loading…
Reference in a new issue