git-annex/P2P/Proxy.hs
Joey Hess f18740699e
P2P protocol version 2, adding SUCCESS-PLUS and ALREADY-HAVE-PLUS
Client side support for SUCCESS-PLUS and ALREADY-HAVE-PLUS
is complete, when a PUT stores to additional repositories
than the expected one, the location log is updated with the
additional UUIDs that contain the content.

Started implementing PUT fanout to multiple remotes for clusters.
It is untested, and I fear fencepost errors in the relative
offset calculations. And it is missing proxying for the protocol
after DATA.
2024-06-18 16:21:40 -04:00

411 lines
14 KiB
Haskell

{- P2P protocol proxying
-
- Copyright 2024 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, FlexibleContexts, ScopedTypeVariables #-}
{-# LANGUAGE BangPatterns #-}
module P2P.Proxy where
import Annex.Common
import P2P.Protocol
import P2P.IO
import Utility.Metered
import Data.Either
import Control.Concurrent.STM
import qualified Data.ByteString.Lazy as L
{- Action that closes a connection to a remote. It is stored together
 - with the connection, and is what closeRemoteSide runs. -}
type ProtoCloser = Annex ()
{- The connection to the client that is being proxied for: the RunState
 - and P2PConnection that get passed to runNetProto in order to talk
 - to it. -}
data ClientSide = ClientSide RunState P2PConnection
{- A remote that can be proxied to. The connection to it is only
 - opened the first time runRemoteSide needs it. -}
data RemoteSide = RemoteSide
{ remoteUUID :: UUID
, remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
-- ^ opens a connection to the remote, Nothing when that is not
-- possible
, remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
-- ^ holds the connection once remoteConnect has opened it, so it can
-- be reused; empty until then
}
{- Builds a RemoteSide for a remote. No connection is made yet; the
 - provided action is kept for later use, and the TMVar that will hold
 - the connection starts out empty. -}
mkRemoteSide :: UUID -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide
mkRemoteSide remoteuuid remoteconnect = do
    connvar <- liftIO (atomically newEmptyTMVar)
    return $ RemoteSide
        { remoteUUID = remoteuuid
        , remoteConnect = remoteconnect
        , remoteTMVar = connvar
        }
{- Runs a P2P protocol action with the remote.
 -
 - A connection that was opened earlier is reused. Otherwise the remote
 - is connected to now, and the connection is stored for later calls.
 - Gives up when the remote cannot be connected to. -}
runRemoteSide :: RemoteSide -> Proto a -> Annex (Either ProtoFailure a)
runRemoteSide remoteside a = do
    cached <- liftIO $ atomically $ tryReadTMVar connvar
    case cached of
        Just (runst, conn, _closer) -> run runst conn
        Nothing -> do
            fresh <- remoteConnect remoteside
            case fresh of
                Nothing -> giveup "Unable to connect to remote."
                Just c@(runst, conn, _closer) -> do
                    liftIO $ atomically $ putTMVar connvar c
                    run runst conn
  where
    connvar = remoteTMVar remoteside
    run runst conn = liftIO $ runNetProto runst conn a
{- Closes the connection to the remote, if one has been opened.
 -
 - The connection is taken out of the TMVar, rather than only being
 - read from it. Otherwise the closed connection would stay cached:
 - a later runRemoteSide would try to reuse it, and closing again
 - would run the closer a second time. With the TMVar emptied,
 - closing twice is a no-op and runRemoteSide reconnects. -}
closeRemoteSide :: RemoteSide -> Annex ()
closeRemoteSide remoteside =
    liftIO (atomically $ tryTakeTMVar $ remoteTMVar remoteside) >>= \case
        Just (_, _, closer) -> closer
        Nothing -> return ()
{- Selects which remotes to proxy to for each of the top-level P2P
 - protocol actions that a client can request.
 -}
data ProxySelector = ProxySelector
{ proxyCHECKPRESENT :: Key -> Annex (Maybe RemoteSide)
-- ^ when Nothing, the proxy replies FAILURE to the client
, proxyLOCKCONTENT :: Key -> Annex (Maybe RemoteSide)
-- ^ when Nothing, the proxy replies FAILURE to the client
, proxyUNLOCKCONTENT :: Annex (Maybe RemoteSide)
-- ^ when Nothing, the UNLOCKCONTENT is not sent on to any remote
, proxyREMOVE :: Key -> Annex RemoteSide
, proxyGET :: Key -> Annex (Maybe RemoteSide)
-- ^ when Nothing, the proxy replies with an ERROR to the client
, proxyPUT :: Key -> Annex [RemoteSide]
-- ^ an empty list makes the proxy reply ALREADY_HAVE; more than one
-- remote makes the PUT fan out to all of them
}
{- A ProxySelector that sends every protocol action to the same
 - single remote. -}
singleProxySelector :: RemoteSide -> ProxySelector
singleProxySelector r = ProxySelector
    { proxyCHECKPRESENT = forkey
    , proxyLOCKCONTENT = forkey
    , proxyUNLOCKCONTENT = selected
    , proxyREMOVE = \_ -> return r
    , proxyGET = forkey
    , proxyPUT = \_ -> return [r]
    }
  where
    -- The remote is always selected, whatever the key is.
    selected = return (Just r)
    forkey _ = selected
{- To keep this module limited to P2P protocol actions,
 - all other actions that a proxy needs to do are provided
 - here. -}
data ProxyMethods = ProxyMethods
{ removedContent :: UUID -> Key -> Annex ()
-- ^ called when content is removed from a repository
-- (the proxy calls it after a remote responds SUCCESS to a REMOVE)
, addedContent :: UUID -> Key -> Annex ()
-- ^ called when content is added to a repository
-- (the proxy calls it for each UUID that a successful PUT stored
-- the content in)
}
{- Type of a function that takes an error handler, which is
 - used to handle a ProtoFailure when receiving a message
 - from the client or remote.
 -
 - The handler is given a continuation and a protocol action that may
 - fail. It is polymorphic in the type of value the action produces,
 - so the same handler can be used with any protocol action.
 -}
type ProtoErrorHandled r =
(forall t. ((t -> Annex r) -> Annex (Either ProtoFailure t) -> Annex r)) -> Annex r
{- The first step of proxying with a client, which has already
 - authenticated at this point.
 -
 - The client will not send VERSION until it has seen AUTH_SUCCESS,
 - so that is sent first, even though no connection to the remote
 - has been brought up yet.
 -
 - Most clients then send a VERSION message. Version 0 clients do not,
 - and send some other message instead, which is passed along to the
 - continuation so it can be responded to later. The continuation gets
 - Nothing when no message was received from the client.
 -}
getClientProtocolVersion
    :: UUID
    -> ClientSide
    -> (Maybe (ProtocolVersion, Maybe Message) -> Annex r)
    -> ProtoErrorHandled r
getClientProtocolVersion remoteuuid (ClientSide clientrunst clientconn) cont protoerrhandler =
    protoerrhandler cont negotiate
  where
    negotiate = liftIO $ runNetProto clientrunst clientconn $
        getClientProtocolVersion' remoteuuid
{- Sends AUTH_SUCCESS, and interprets the first message that comes
 - back as the protocol version negotiation. -}
getClientProtocolVersion'
    :: UUID
    -> Proto (Maybe (ProtocolVersion, Maybe Message))
getClientProtocolVersion' remoteuuid = do
    net $ sendMessage (AUTH_SUCCESS remoteuuid)
    negotiated <$> net receiveMessage
  where
    negotiated Nothing = Nothing
    -- If the client sends a newer version than we understand,
    -- reduce it; we need to parse the protocol too.
    negotiated (Just (VERSION v)) =
        Just (min v maxProtocolVersion, Nothing)
    -- Any other message is handed back, along with the default
    -- protocol version.
    negotiated (Just othermsg) =
        Just (defaultProtocolVersion, Just othermsg)
{- Proxy between the client and the remote. This picks up after
 - getClientProtocolVersion.
 -
 - Messages are read from the client one at a time. For each, the
 - ProxySelector picks the remote (or remotes) to proxy to, the message
 - is relayed, and the response is relayed back to the client. This
 - continues until the client hangs up, at which point the first
 - parameter is run.
 -
 - All protocol actions are run via the ProtoErrorHandled error handler,
 - so this is written in continuation passing style.
 -}
proxy
    :: Annex r
    -- ^ run when proxying is done
    -> ProxyMethods
    -> ServerMode
    -> ClientSide
    -> UUID
    -- ^ UUID that is left out of the list sent to the client in
    -- ALREADY_HAVE_PLUS (presumably the UUID that the client sees
    -- itself as connected to; confirm against callers)
    -> ProxySelector
    -> ProtocolVersion
    -> Maybe Message
    -- ^ non-VERSION message that was received from the client when
    -- negotiating protocol version, and has not been responded to yet
    -> ProtoErrorHandled r
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector (ProtocolVersion protocolversion) othermessage protoerrhandler = do
    case othermessage of
        -- The client sent VERSION, so respond with the negotiated
        -- version before reading its next message.
        Nothing -> protoerrhandler proxynextclientmessage $
            client $ net $ sendMessage $ VERSION $ ProtocolVersion protocolversion
        Just message -> proxyclientmessage (Just message)
  where
    client = liftIO . runNetProto clientrunst clientconn

    proxynextclientmessage () = protoerrhandler proxyclientmessage $
        client (net receiveMessage)

    -- Runs the action only when the server mode check allows it.
    -- Otherwise, runs the check's not allowed action with the client.
    servermodechecker c a = c servermode $ \case
        Nothing -> a
        Just notallowed ->
            protoerrhandler proxynextclientmessage $
                client notallowed

    -- The client hung up.
    proxyclientmessage Nothing = proxydone
    proxyclientmessage (Just message) = case message of
        CHECKPRESENT k -> proxyCHECKPRESENT proxyselector k >>= \case
            Just remoteside ->
                proxyresponse remoteside message
                    (const proxynextclientmessage)
            Nothing ->
                protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage FAILURE
        LOCKCONTENT k -> proxyLOCKCONTENT proxyselector k >>= \case
            Just remoteside ->
                proxyresponse remoteside message
                    (const proxynextclientmessage)
            Nothing ->
                protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage FAILURE
        UNLOCKCONTENT -> proxyUNLOCKCONTENT proxyselector >>= \case
            Just remoteside ->
                proxynoresponse remoteside message
                    proxynextclientmessage
            Nothing -> proxynextclientmessage ()
        REMOVE k -> do
            remoteside <- proxyREMOVE proxyselector k
            servermodechecker checkREMOVEServerMode $
                handleREMOVE remoteside k message
        GET _ _ k -> proxyGET proxyselector k >>= \case
            Just remoteside -> handleGET remoteside message
            Nothing ->
                protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage $
                        ERROR "content not present"
        PUT _ k -> do
            remotesides <- proxyPUT proxyselector k
            servermodechecker checkPUTServerMode $
                handlePUT remotesides k message
        -- These messages involve the git repository, not the
        -- annex. So they affect the git repository of the proxy,
        -- not the remote.
        CONNECT service ->
            servermodechecker (checkCONNECTServerMode service) $
                -- P2P protocol does not continue after
                -- relaying from git.
                protoerrhandler (\() -> proxydone) $
                    client $ net $ relayService service
        NOTIFYCHANGE -> protoerr
        -- Messages that the client should only send after one of
        -- the messages above.
        SUCCESS -> protoerr
        SUCCESS_PLUS _ -> protoerr
        FAILURE -> protoerr
        DATA _ -> protoerr
        VALIDITY _ -> protoerr
        -- If the client errors out, give up.
        ERROR msg -> giveup $ "client error: " ++ msg
        -- Messages that only the server should send.
        CONNECTDONE _ -> protoerr
        CHANGED _ -> protoerr
        AUTH_SUCCESS _ -> protoerr
        AUTH_FAILURE -> protoerr
        PUT_FROM _ -> protoerr
        ALREADY_HAVE -> protoerr
        ALREADY_HAVE_PLUS _ -> protoerr
        -- Early messages that the client should not send now.
        AUTH _ _ -> protoerr
        VERSION _ -> protoerr

    -- Send a message to the remote, send its response back to the
    -- client, and pass it to the continuation.
    proxyresponse remoteside message a =
        getresponse (runRemoteSide remoteside) message $ \resp ->
            protoerrhandler (a resp) $
                client $ net $ sendMessage resp

    -- Send a message to the remote, that it will not respond to.
    proxynoresponse remoteside message a =
        protoerrhandler a $
            runRemoteSide remoteside $ net $ sendMessage message

    -- Send a message to the endpoint and get back its response.
    getresponse endpoint message handleresp =
        protoerrhandler (withresp handleresp) $
            endpoint $ net $ do
                sendMessage message
                receiveMessage

    withresp a (Just resp) = a resp
    -- Whichever of the remote or client the message was read from
    -- hung up.
    withresp _ Nothing = proxydone

    -- Read a message from one party, send it to the other,
    -- and then pass the message to the continuation.
    relayonemessage from to cont =
        flip protoerrhandler (from $ net $ receiveMessage) $
            withresp $ \message ->
                protoerrhandler (cont message) $
                    to $ net $ sendMessage message

    -- Tell the client that there was a protocol error, and give up.
    protoerr = do
        _ <- client $ net $ sendMessage (ERROR "protocol error")
        giveup "protocol error"

    handleREMOVE remoteside k message =
        proxyresponse remoteside message $ \resp () -> do
            case resp of
                SUCCESS -> removedContent proxymethods
                    (remoteUUID remoteside) k
                _ -> return ()
            proxynextclientmessage ()

    handleGET remoteside message = getresponse (runRemoteSide remoteside) message $
        withDATA (relayGET remoteside)

    -- PUT to a single remote is relayed directly.
    handlePUT [remoteside] k message =
        getresponse (runRemoteSide remoteside) message $ \resp -> case resp of
            ALREADY_HAVE -> protoerrhandler proxynextclientmessage $
                client $ net $ sendMessage resp
            ALREADY_HAVE_PLUS _ -> protoerrhandler proxynextclientmessage $
                client $ net $ sendMessage resp
            PUT_FROM _ ->
                getresponse client resp $
                    withDATA (relayPUT remoteside k)
            _ -> protoerr
    -- No remotes were selected to PUT to.
    handlePUT [] _ _ =
        protoerrhandler proxynextclientmessage $
            client $ net $ sendMessage ALREADY_HAVE
    handlePUT remotesides k message =
        handlePutMulti remotesides k message

    -- Runs the continuation on a DATA message; any other message
    -- is a protocol error.
    withDATA a message@(DATA len) = a len message
    withDATA _ _ = protoerr

    -- Relays DATA from the remote to the client, and then the
    -- client's response back to the remote.
    relayGET remoteside len = relayDATAStart client $
        relayDATACore len (runRemoteSide remoteside) client $
            relayDATAFinish (runRemoteSide remoteside) client $
                relayonemessage client (runRemoteSide remoteside) $
                    const proxynextclientmessage

    -- Relays DATA from the client to the remote, and then the
    -- remote's response back to the client.
    relayPUT remoteside k len = relayDATAStart (runRemoteSide remoteside) $
        relayDATACore len client (runRemoteSide remoteside) $
            relayDATAFinish client (runRemoteSide remoteside) $
                relayonemessage (runRemoteSide remoteside) client finished
      where
        finished resp () = do
            case resp of
                SUCCESS -> addedContent proxymethods (remoteUUID remoteside) k
                SUCCESS_PLUS us ->
                    forM_ (remoteUUID remoteside:us) $ \u ->
                        addedContent proxymethods u k
                _ -> return ()
            proxynextclientmessage ()

    -- Sends the DATA message on to x, and then runs the receive
    -- action.
    relayDATAStart x receive message =
        protoerrhandler (\() -> receive) $
            x $ net $ sendMessage message

    -- Receives the data from x, and sends it on to y.
    relayDATACore len x y a = protoerrhandler send $
        x $ net $ receiveBytes len nullMeterUpdate
      where
        send b = protoerrhandler a $
            y $ net $ sendBytes len b nullMeterUpdate

    relayDATAFinish x y sendsuccessfailure ()
        | protocolversion == 0 = sendsuccessfailure
        -- Protocol versions 1 and up have a VALID or
        -- INVALID message after the data.
        | otherwise = relayonemessage x y (\_ () -> sendsuccessfailure)

    -- PUT to several remotes. The PUT is sent to each remote, and
    -- the data from the client is then fanned out to all the remotes
    -- that asked for it.
    --
    -- NOTE(review): a remote that responds ALREADY_HAVE_PLUS is
    -- treated as a protocol error here, unlike in handlePUT.
    handlePutMulti remotesides k message = do
        -- Left () when the remote did not respond,
        -- Right (Left remoteside) when it already has the content,
        -- Right (Right (remoteside, offset)) when it wants the data
        -- starting from the offset.
        let initiate remoteside = do
                resp <- runRemoteSide remoteside $ net $ do
                    sendMessage message
                    receiveMessage
                case resp of
                    Right (Just (PUT_FROM (Offset offset))) ->
                        return $ Right $
                            Right (remoteside, offset)
                    Right (Just ALREADY_HAVE) ->
                        return $ Right $ Left remoteside
                    Right (Just _) -> protoerr
                    Right Nothing -> return (Left ())
                    Left _err -> return (Left ())
        let alreadyhave = \case
                Right (Left _) -> True
                _ -> False
        -- Tells the client that the content is already present,
        -- listing the remotes that have it when the protocol
        -- version supports doing so.
        let sendalreadyhave results =
                protoerrhandler proxynextclientmessage $
                    client $ net $ sendMessage $
                        if protocolversion < 2
                            then ALREADY_HAVE
                            else ALREADY_HAVE_PLUS $
                                filter (/= remoteuuid) $
                                    map remoteUUID (lefts (rights results))
        l <- forM remotesides initiate
        let l' = rights (rights l)
        if all alreadyhave l
            then sendalreadyhave l
            else if null (rights l)
                -- no response from any remote
                then proxydone
                else if null l'
                    -- Some remotes did not respond, and all the
                    -- rest already have the content. There is
                    -- no remote to send the data to, and so no
                    -- minimum offset to calculate.
                    then sendalreadyhave l
                    else do
                        let minoffset = minimum (map snd l')
                        getresponse client (PUT_FROM (Offset minoffset)) $
                            withDATA (relayPUTMulti minoffset l' k)

    relayPUTMulti minoffset remotes k (Len datalen) _ = do
        let totallen = datalen + minoffset
        -- Tell each remote how much data to expect, depending
        -- on the remote's offset. A remote that this fails for
        -- is closed, and is not sent any of the data.
        remotes' <- fmap catMaybes $ forM remotes $ \r@(remoteside, remoteoffset) ->
            skipfailed r $ runRemoteSide remoteside $
                net $ sendMessage $ DATA $ Len $
                    totallen - remoteoffset
        protoerrhandler (send remotes' minoffset) $
            client $ net $ receiveBytes (Len datalen) nullMeterUpdate
      where
        chunksize = fromIntegral defaultChunkSize

        -- Stream the lazy bytestring out to the remotes in chunks.
        -- Only start sending to a remote once past its desired
        -- offset.
        --
        -- n is the offset of the start of the current chunk, and
        -- n' the offset just past its end.
        send rs n b = do
            let (chunk, b') = L.splitAt chunksize b
            let chunklen = fromIntegral (L.length chunk)
            let !n' = n + chunklen
            rs' <- forM rs $ \r@(remoteside, remoteoffset) ->
                if n >= remoteoffset
                    -- The whole chunk is wanted by the remote.
                    then skipfailed r $ runRemoteSide remoteside $
                        net $ sendBytes (Len chunklen) chunk nullMeterUpdate
                    else if n' > remoteoffset
                        -- The remote's offset falls inside this
                        -- chunk, so send only the part of it
                        -- from the offset on.
                        then do
                            let chunkoffset = remoteoffset - n
                            let subchunklen = chunklen - chunkoffset
                            let subchunk = L.drop (fromIntegral chunkoffset) chunk
                            skipfailed r $ runRemoteSide remoteside $
                                net $ sendBytes (Len subchunklen) subchunk nullMeterUpdate
                        -- The chunk is entirely before the
                        -- remote's offset; nothing to send yet.
                        else return (Just r)
            if L.null b'
                then sent (catMaybes rs')
                else send (catMaybes rs') n' b'

        sent [] = proxydone
        -- Proxying of the protocol after DATA is not implemented
        -- yet.
        sent _ = giveup "XXX" -- XXX

        -- Runs an action with a remote. When it fails, the remote
        -- is dropped by returning Nothing.
        skipfailed r@(remoteside, _) a = a >>= \case
            Right _ -> return (Just r)
            Left _ -> do
                -- This connection to the remote is
                -- unrecoverable at this point, so close it.
                closeRemoteSide remoteside
                return Nothing