2024-06-10 22:01:36 +00:00
|
|
|
{- P2P protocol proxying
|
|
|
|
-
|
|
|
|
- Copyright 2024 Joey Hess <id@joeyh.name>
|
|
|
|
-
|
|
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
|
|
-}
|
|
|
|
|
2024-06-11 20:56:52 +00:00
|
|
|
{-# LANGUAGE RankNTypes, FlexibleContexts, ScopedTypeVariables #-}
|
2024-06-18 16:07:01 +00:00
|
|
|
{-# LANGUAGE BangPatterns #-}
|
2024-06-10 22:01:36 +00:00
|
|
|
|
|
|
|
module P2P.Proxy where
|
|
|
|
|
|
|
|
import Annex.Common
|
|
|
|
import P2P.Protocol
|
|
|
|
import P2P.IO
|
2024-06-18 16:07:01 +00:00
|
|
|
import Utility.Metered
|
2024-06-10 22:01:36 +00:00
|
|
|
|
2024-06-18 16:07:01 +00:00
|
|
|
import Data.Either
|
2024-06-17 18:14:08 +00:00
|
|
|
import Control.Concurrent.STM
|
2024-06-18 16:07:01 +00:00
|
|
|
import qualified Data.ByteString.Lazy as L
|
2024-06-17 16:44:08 +00:00
|
|
|
|
2024-06-17 18:14:08 +00:00
|
|
|
{- Action stored alongside an established connection to a remote;
 - closeRemoteSide runs it when the connection is to be closed. -}
type ProtoCloser = Annex ()
|
|
|
|
|
|
|
|
{- The client's end of the proxied conversation: the run state and
 - connection that get passed to runNetProto to talk to the client. -}
data ClientSide = ClientSide RunState P2PConnection
|
|
|
|
|
|
|
|
{- A remote that can be proxied to.
 -
 - runRemoteSide makes the connection the first time it is needed, and
 - keeps it in remoteTMVar so that later actions reuse it. -}
data RemoteSide = RemoteSide
    { remoteUUID :: UUID
    , remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
    -- ^ connects to the remote; Nothing when unable to connect
    , remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
    -- ^ empty until a connection has been made
    }
|
|
|
|
|
|
|
|
{- Builds a RemoteSide that is not connected yet; its TMVar starts out
 - empty. -}
mkRemoteSide :: UUID -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide
mkRemoteSide remoteuuid remoteconnect = do
    v <- liftIO $ atomically newEmptyTMVar
    return $ RemoteSide
        { remoteUUID = remoteuuid
        , remoteConnect = remoteconnect
        , remoteTMVar = v
        }
|
2024-06-12 15:37:14 +00:00
|
|
|
|
2024-06-17 19:51:10 +00:00
|
|
|
{- Runs a protocol action against the remote.
 -
 - A connection that is already in the TMVar is reused. Otherwise,
 - a connection is made, stored in the TMVar, and then used.
 - Gives up when the remote cannot be connected to. -}
runRemoteSide :: RemoteSide -> Proto a -> Annex (Either ProtoFailure a)
runRemoteSide remoteside a = do
    cached <- liftIO $ atomically $ tryReadTMVar tmvar
    case cached of
        Just (runst, conn, _closer) -> go runst conn
        Nothing -> do
            connected <- remoteConnect remoteside
            case connected of
                Nothing -> giveup "Unable to connect to remote."
                Just st@(runst, conn, _closer) -> do
                    liftIO $ atomically $ putTMVar tmvar st
                    go runst conn
  where
    tmvar = remoteTMVar remoteside
    go runst conn = liftIO $ runNetProto runst conn a
|
|
|
|
|
|
|
|
{- Closes the connection to the remote, if one has been made.
 -
 - The connection is taken out of the TMVar, rather than only read.
 - That way a closed connection is not left behind for runRemoteSide to
 - reuse (it will reconnect if the remote is used again), and the closer
 - is run at most once for each connection. -}
closeRemoteSide :: RemoteSide -> Annex ()
closeRemoteSide remoteside =
    liftIO (atomically $ tryTakeTMVar $ remoteTMVar remoteside) >>= \case
        Just (_, _, closer) -> closer
        Nothing -> return ()
|
|
|
|
|
2024-06-17 23:19:15 +00:00
|
|
|
{- Chooses which remote, or remotes, each top-level P2P protocol
 - action gets proxied to.
 - -}
data ProxySelector = ProxySelector
    { proxyCHECKPRESENT :: Key -> Annex (Maybe RemoteSide)
    -- ^ For a cluster, this assumes the proxy has up-to-date location
    -- logs. If it does not, the check might be proxied to a node that
    -- no longer has the content while some other node still does.
    -- (The location log access for GET has the same issue.)
    , proxyLOCKCONTENT :: Key -> Annex (Maybe RemoteSide)
    , proxyUNLOCKCONTENT :: Annex (Maybe RemoteSide)
    , proxyREMOVE :: Key -> Annex [RemoteSide]
    -- ^ remove from all of these remotes
    , proxyGET :: Key -> Annex (Maybe RemoteSide)
    , proxyPUT :: Key -> Annex [RemoteSide]
    -- ^ put to some/all of these remotes
    }
|
|
|
|
|
|
|
|
{- A ProxySelector that proxies every action to the one given remote. -}
singleProxySelector :: RemoteSide -> ProxySelector
singleProxySelector r = ProxySelector
    { proxyCHECKPRESENT = \_ -> one
    , proxyLOCKCONTENT = \_ -> one
    , proxyUNLOCKCONTENT = one
    , proxyREMOVE = \_ -> every
    , proxyGET = \_ -> one
    , proxyPUT = \_ -> every
    }
  where
    one = return (Just r)
    every = return [r]
|
|
|
|
|
2024-06-12 15:37:14 +00:00
|
|
|
{- This module is kept limited to P2P protocol actions. Everything
 - else that a proxy needs to do is supplied by the caller through
 - this record. -}
data ProxyMethods = ProxyMethods
    { removedContent :: UUID -> Key -> Annex ()
    -- ^ called when content is removed from a repository
    , addedContent :: UUID -> Key -> Annex ()
    -- ^ called when content is added to a repository
    }
|
2024-06-10 22:01:36 +00:00
|
|
|
|
2024-06-11 16:05:44 +00:00
|
|
|
{- Type of function that takes an error handler, which is
 - used to handle a ProtoFailure when receiving a message
 - from the client or remote.
 -
 - The error handler is given a continuation, and an action whose result
 - is either a ProtoFailure or a value of the type the continuation
 - accepts.
 -}
type ProtoErrorHandled r =
    (forall t. ((t -> Annex r) -> Annex (Either ProtoFailure t) -> Annex r)) -> Annex r
|
2024-06-11 14:20:11 +00:00
|
|
|
|
2024-06-11 16:05:44 +00:00
|
|
|
{- The first thing to run when proxying with a client, which has
 - already authenticated. Most clients will send a VERSION message,
 - but version 0 clients do not, and send some other message instead.
 -
 - The client will not send VERSION until it has seen AUTH_SUCCESS.
 - So that is sent, even though the connection with the remote has not
 - actually been brought up yet.
 -
 - The result is passed to the continuation by the error handler.
 -}
getClientProtocolVersion
    :: UUID
    -> ClientSide
    -> (Maybe (ProtocolVersion, Maybe Message) -> Annex r)
    -> ProtoErrorHandled r
getClientProtocolVersion remoteuuid (ClientSide clientrunst clientconn) cont protoerrhandler =
    protoerrhandler cont negotiate
  where
    negotiate = liftIO $ runNetProto clientrunst clientconn $
        getClientProtocolVersion' remoteuuid
|
2024-06-10 22:01:36 +00:00
|
|
|
|
2024-06-11 14:20:11 +00:00
|
|
|
{- Sends AUTH_SUCCESS to the client and reads its first message.
 -
 - Nothing is returned when no message was received. Otherwise the
 - result is the protocol version to use, along with the message when
 - it was something other than VERSION. -}
getClientProtocolVersion'
    :: UUID
    -> Proto (Maybe (ProtocolVersion, Maybe Message))
getClientProtocolVersion' remoteuuid = do
    net $ sendMessage (AUTH_SUCCESS remoteuuid)
    fmap clientversion <$> net receiveMessage
  where
    -- If the client sends a newer version than we understand,
    -- reduce it; we need to parse the protocol too.
    clientversion (VERSION v) = (min v maxProtocolVersion, Nothing)
    -- A client that did not send VERSION gets the default version,
    -- and its message is passed along to be handled later.
    clientversion othermsg = (defaultProtocolVersion, Just othermsg)
|
|
|
|
|
|
|
|
{- Proxy between the client and the remote. This picks up after
 - getClientProtocolVersion.
 -
 - Messages from the client are handled one at a time. The first
 - argument (proxydone) is run when the client or a remote hangs up,
 - and after a CONNECT has been relayed.
 -}
proxy
    :: Annex r
    -> ProxyMethods
    -> ServerMode
    -> ClientSide
    -> UUID
    -> ProxySelector
    -> ProtocolVersion
    -- ^ Protocol version being spoken between the proxy and the
    -- client. When there are multiple remotes, some may speak an
    -- earlier version.
    -> Maybe Message
    -- ^ non-VERSION message that was received from the client when
    -- negotiating protocol version, and has not been responded to yet
    -> ProtoErrorHandled r
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector (ProtocolVersion protocolversion) othermessage protoerrhandler = do
    case othermessage of
        -- The client sent VERSION, so reply with the version that
        -- will be used, and wait for its next message.
        Nothing -> protoerrhandler proxynextclientmessage $
            client $ net $ sendMessage $ VERSION $ ProtocolVersion protocolversion
        Just message -> proxyclientmessage (Just message)
  where
    client = liftIO . runNetProto clientrunst clientconn

    proxynextclientmessage () = protoerrhandler proxyclientmessage $
        client (net receiveMessage)

    -- Runs the action only when the server mode check allows it.
    -- Otherwise, the protocol action provided by the check is run
    -- with the client.
    servermodechecker c a = c servermode $ \case
        Nothing -> a
        Just notallowed ->
            protoerrhandler proxynextclientmessage $
                client notallowed

    proxyclientmessage Nothing = proxydone
    proxyclientmessage (Just message) = case message of
        -- For a cluster, the selector's choice assumes that the proxy
        -- has up-to-date location logs. If it does not, the node that
        -- is checked may no longer have content that some other node
        -- still has.
        CHECKPRESENT k -> proxyCHECKPRESENT proxyselector k >>= \case
            Just remoteside ->
                proxyresponse remoteside message
                    (const proxynextclientmessage)
            Nothing ->
                protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage FAILURE
        LOCKCONTENT k -> proxyLOCKCONTENT proxyselector k >>= \case
            Just remoteside ->
                proxyresponse remoteside message
                    (const proxynextclientmessage)
            Nothing ->
                protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage FAILURE
        UNLOCKCONTENT -> proxyUNLOCKCONTENT proxyselector >>= \case
            Just remoteside ->
                proxynoresponse remoteside message
                    proxynextclientmessage
            Nothing -> proxynextclientmessage ()
        REMOVE k -> do
            remotesides <- proxyREMOVE proxyselector k
            servermodechecker checkREMOVEServerMode $
                handleREMOVE remotesides k message
        GET _ _ k -> proxyGET proxyselector k >>= \case
            Just remoteside -> handleGET remoteside message
            Nothing ->
                protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage $
                        ERROR "content not present"
        PUT _ k -> do
            remotesides <- proxyPUT proxyselector k
            servermodechecker checkPUTServerMode $
                handlePUT remotesides k message
        -- These messages involve the git repository, not the
        -- annex. So they affect the git repository of the proxy,
        -- not the remote.
        CONNECT service ->
            servermodechecker (checkCONNECTServerMode service) $
                -- P2P protocol does not continue after
                -- relaying from git.
                protoerrhandler (\() -> proxydone) $
                    client $ net $ relayService service
        NOTIFYCHANGE -> protoerr
        -- Messages that the client should only send after one of
        -- the messages above.
        SUCCESS -> protoerr
        SUCCESS_PLUS _ -> protoerr
        FAILURE -> protoerr
        FAILURE_PLUS _ -> protoerr
        DATA _ -> protoerr
        VALIDITY _ -> protoerr
        -- If the client errors out, give up.
        ERROR msg -> giveup $ "client error: " ++ msg
        -- Messages that only the server should send.
        CONNECTDONE _ -> protoerr
        CHANGED _ -> protoerr
        AUTH_SUCCESS _ -> protoerr
        AUTH_FAILURE -> protoerr
        PUT_FROM _ -> protoerr
        ALREADY_HAVE -> protoerr
        ALREADY_HAVE_PLUS _ -> protoerr
        -- Early messages that the client should not send now.
        AUTH _ _ -> protoerr
        VERSION _ -> protoerr

    -- Send a message to the remote, send its response back to the
    -- client, and pass it to the continuation.
    proxyresponse remoteside message a =
        getresponse (runRemoteSide remoteside) message $ \resp ->
            protoerrhandler (a resp) $
                client $ net $ sendMessage resp

    -- Send a message to the remote, that it will not respond to.
    proxynoresponse remoteside message a =
        protoerrhandler a $
            runRemoteSide remoteside $ net $ sendMessage message

    -- Send a message to the endpoint and get back its response.
    getresponse endpoint message handleresp =
        protoerrhandler (withresp handleresp) $
            endpoint $ net $ do
                sendMessage message
                receiveMessage

    withresp a (Just resp) = a resp
    -- Whichever of the remote or client the message was read from
    -- hung up.
    withresp _ Nothing = proxydone

    -- Read a message from one party, send it to the other,
    -- and then pass the message to the continuation.
    relayonemessage from to cont =
        flip protoerrhandler (from $ net $ receiveMessage) $
            withresp $ \message ->
                protoerrhandler (cont message) $
                    to $ net $ sendMessage message

    -- Tell the client there was a protocol error, and give up.
    -- Whether the ERROR message could be sent is not checked.
    protoerr = do
        _ <- client $ net $ sendMessage (ERROR "protocol error")
        giveup "protocol error"

    handleREMOVE [] _ _ =
        -- When no places are provided to remove from,
        -- don't report a successful removal.
        protoerrhandler proxynextclientmessage $
            client $ net $ sendMessage FAILURE
    handleREMOVE remotesides k message = do
        -- Each remote's result is whether it reported success, and
        -- the UUIDs that content was removed from. A remote that
        -- fails, or that sends some other response, yields Nothing.
        v <- forM remotesides $ \r ->
            runRemoteSideOrSkipFailed r $ do
                net $ sendMessage message
                net receiveMessage >>= return . \case
                    Just SUCCESS ->
                        Just (True, [remoteUUID r])
                    Just (SUCCESS_PLUS us) ->
                        Just (True, remoteUUID r:us)
                    Just FAILURE ->
                        Just (False, [])
                    Just (FAILURE_PLUS us) ->
                        Just (False, us)
                    _ -> Nothing
        let v' = map join v
        let us = concatMap snd $ catMaybes v'
        mapM_ (\u -> removedContent proxymethods u k) us
        -- Success is only reported when every remote succeeded.
        -- The list of UUIDs is only sent when the client speaks
        -- at least protocol version 2.
        protoerrhandler proxynextclientmessage $
            client $ net $ sendMessage $
                if all (maybe False fst) v'
                    then if null us || protocolversion < 2
                        then SUCCESS
                        else SUCCESS_PLUS us
                    else if null us || protocolversion < 2
                        then FAILURE
                        else FAILURE_PLUS us

    handleGET remoteside message = getresponse (runRemoteSide remoteside) message $
        withDATA (relayGET remoteside)

    -- When the only remote is the one the client is addressing, its
    -- responses are relayed to the client as-is.
    handlePUT (remoteside:[]) k message
        | remoteUUID remoteside == remoteuuid =
            getresponse (runRemoteSide remoteside) message $ \resp -> case resp of
                ALREADY_HAVE -> protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage resp
                ALREADY_HAVE_PLUS _ -> protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage resp
                PUT_FROM _ ->
                    getresponse client resp $
                        withDATA (relayPUT remoteside k)
                _ -> protoerr
    -- With nowhere to put to, the client is told ALREADY_HAVE.
    handlePUT [] _ _ =
        protoerrhandler proxynextclientmessage $
            client $ net $ sendMessage ALREADY_HAVE
    handlePUT remotesides k message =
        handlePutMulti remotesides k message

    -- Passes the length from a DATA message, and the message itself,
    -- to the action. Any other message is a protocol error.
    withDATA a message@(DATA len) = a len message
    withDATA _ _ = protoerr

    relayGET remoteside len = relayDATAStart client $
        relayDATACore len (runRemoteSide remoteside) client $
            relayDATAFinish (runRemoteSide remoteside) client $
                relayonemessage client (runRemoteSide remoteside) $
                    const proxynextclientmessage

    relayPUT remoteside k len = relayDATAStart (runRemoteSide remoteside) $
        relayDATACore len client (runRemoteSide remoteside) $
            relayDATAFinish client (runRemoteSide remoteside) $
                relayonemessage (runRemoteSide remoteside) client finished
      where
        finished resp () = do
            void $ relayPUTRecord k remoteside resp
            proxynextclientmessage ()

    -- Given a remote's response to a PUT, records the content as added
    -- to each repository that it was stored in, and returns their
    -- UUIDs. Returns Nothing for any response that is not a success.
    relayPUTRecord k remoteside SUCCESS = do
        addedContent proxymethods (remoteUUID remoteside) k
        return $ Just [remoteUUID remoteside]
    relayPUTRecord k remoteside (SUCCESS_PLUS us) = do
        let us' = remoteUUID remoteside : us
        forM_ us' $ \u ->
            addedContent proxymethods u k
        return $ Just us'
    relayPUTRecord _ _ _ =
        return Nothing

    handlePutMulti remotesides k message = do
        -- Left () when the remote could not be communicated with,
        -- Right (Left remoteside) when it already has the content,
        -- Right (Right (remoteside, offset)) when it wants the
        -- content sent starting from the offset.
        let initiate remoteside = do
                resp <- runRemoteSide remoteside $ net $ do
                    sendMessage message
                    receiveMessage
                case resp of
                    Right (Just (PUT_FROM (Offset offset))) ->
                        return $ Right $
                            Right (remoteside, offset)
                    Right (Just ALREADY_HAVE) ->
                        return $ Right $ Left remoteside
                    -- Accepted the same as ALREADY_HAVE, as
                    -- handlePUT does for a single remote. The
                    -- additional UUIDs are not used.
                    Right (Just (ALREADY_HAVE_PLUS _)) ->
                        return $ Right $ Left remoteside
                    Right (Just _) -> protoerr
                    Right Nothing -> return (Left ())
                    Left _err -> return (Left ())
        let alreadyhave = \case
                Right (Left _) -> True
                _ -> False
        l <- forM remotesides initiate
        let responded = rights l
        let wanted = rights responded
        let reportalreadyhave = if protocolversion < 2
                then protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage ALREADY_HAVE
                else protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage $ ALREADY_HAVE_PLUS $
                        filter (/= remoteuuid) $
                            map remoteUUID (lefts responded)
        -- The second condition is when some remotes did not respond,
        -- and all the ones that did already have the content. There is
        -- then no remote to send the content to, and so no offset to
        -- ask the client to send from.
        if all alreadyhave l || (null wanted && not (null responded))
            then reportalreadyhave
            else if null responded
                -- no response from any remote
                then proxydone
                else do
                    -- wanted is not empty here, so minimum is safe
                    let minoffset = minimum (map snd wanted)
                    getresponse client (PUT_FROM (Offset minoffset)) $
                        withDATA (relayPUTMulti minoffset wanted k)

    relayPUTMulti minoffset remotes k (Len datalen) _ = do
        let totallen = datalen + minoffset
        -- Tell each remote how much data to expect, depending
        -- on the remote's offset. A remote that this fails for
        -- is closed, and is not sent any of the data.
        rs <- forM remotes $ \r@(remoteside, remoteoffset) ->
            runRemoteSideOrSkipFailed remoteside $ do
                net $ sendMessage $ DATA $ Len $
                    totallen - remoteoffset
                return r
        protoerrhandler (send (catMaybes rs) minoffset) $
            client $ net $ receiveBytes (Len datalen) nullMeterUpdate
      where
        chunksize = fromIntegral defaultChunkSize

        -- Stream the lazy bytestring out to the remotes in chunks.
        -- Only start sending to a remote once past its desired
        -- offset.
        --
        -- n is the offset of the start of the chunk, and n' is the
        -- offset just past its end.
        send rs n b = do
            let (chunk, b') = L.splitAt chunksize b
            let chunklen = fromIntegral (L.length chunk)
            let !n' = n + chunklen
            rs' <- forM rs $ \r@(remoteside, remoteoffset) ->
                if n >= remoteoffset
                    -- the whole chunk is wanted
                    then runRemoteSideOrSkipFailed remoteside $ do
                        net $ sendBytes (Len chunklen) chunk nullMeterUpdate
                        return r
                    else if n' > remoteoffset
                        -- The remote's offset falls inside
                        -- this chunk, so send only the part
                        -- of it from the offset on.
                        then do
                            let chunkoffset = remoteoffset - n
                            let subchunklen = chunklen - chunkoffset
                            let subchunk = L.drop (fromIntegral chunkoffset) chunk
                            runRemoteSideOrSkipFailed remoteside $ do
                                net $ sendBytes (Len subchunklen) subchunk nullMeterUpdate
                                return r
                        -- The chunk ends before the remote's
                        -- offset, so nothing is sent yet.
                        else return (Just r)
            if L.null b'
                then sent (catMaybes rs')
                else send (catMaybes rs') n' b'

        sent [] = proxydone
        sent rs = relayDATAFinishMulti k (map fst rs)

    -- Like runRemoteSide, but on failure the connection to the remote
    -- is closed and Nothing is returned.
    runRemoteSideOrSkipFailed remoteside a =
        runRemoteSide remoteside a >>= \case
            Right v -> return (Just v)
            Left _ -> do
                -- This connection to the remote is
                -- unrecoverable at this point, so close it.
                closeRemoteSide remoteside
                return Nothing

    -- Send the DATA message to x, and then continue with receive.
    relayDATAStart x receive message =
        protoerrhandler (\() -> receive) $
            x $ net $ sendMessage message

    -- Receive len bytes from x, send them to y, and continue with a.
    relayDATACore len x y a = protoerrhandler send $
        x $ net $ receiveBytes len nullMeterUpdate
      where
        send b = protoerrhandler a $
            y $ net $ sendBytes len b nullMeterUpdate

    relayDATAFinish x y sendsuccessfailure ()
        | protocolversion == 0 = sendsuccessfailure
        -- Protocol version 1 has a VALID or
        -- INVALID message after the data.
        | otherwise = relayonemessage x y (\_ () -> sendsuccessfailure)

    relayDATAFinishMulti k rs
        | protocolversion == 0 =
            finish $ net receiveMessage
        | otherwise =
            flip protoerrhandler (client $ net $ receiveMessage) $
                withresp $ \message ->
                    finish $ do
                        -- Relay VALID or INVALID message
                        -- only to remotes that support
                        -- protocol version 1.
                        net getProtocolVersion >>= \case
                            ProtocolVersion 0 -> return ()
                            _ -> net $ sendMessage message
                        net receiveMessage
      where
        -- Runs the action with each remote to get its response to
        -- the PUT, and records where the content was stored. The
        -- client is sent FAILURE when it was not stored anywhere.
        finish a = do
            storeduuids <- forM rs $ \r ->
                runRemoteSideOrSkipFailed r a >>= \case
                    Just (Just resp) ->
                        relayPUTRecord k r resp
                    _ -> return Nothing
            protoerrhandler proxynextclientmessage $
                client $ net $ sendMessage $
                    case concat (catMaybes storeduuids) of
                        [] -> FAILURE
                        us
                            | protocolversion < 2 -> SUCCESS
                            | otherwise -> SUCCESS_PLUS us
|
|
|
|
|