http p2p proxy is now largely working

This commit is contained in:
Joey Hess 2024-07-26 10:24:23 -04:00
parent b391756b32
commit cc1da2d516
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
6 changed files with 180 additions and 87 deletions

View file

@ -22,6 +22,7 @@ import Annex.Tmp
import Logs.Proxy import Logs.Proxy
import Logs.Cluster import Logs.Cluster
import Logs.UUID import Logs.UUID
import Logs.Location
import Utility.Tmp.Dir import Utility.Tmp.Dir
import Utility.Metered import Utility.Metered
@ -274,3 +275,9 @@ checkCanProxy' remoteuuid ouruuid = M.lookup ouruuid <$> getProxies >>= \case
Just desc -> return $ Left $ Just $ Just desc -> return $ Left $ Just $
"not configured to proxy for repository " ++ fromUUIDDesc desc "not configured to proxy for repository " ++ fromUUIDDesc desc
Nothing -> return $ Left Nothing Nothing -> return $ Left Nothing
mkProxyMethods :: ProxyMethods
mkProxyMethods = ProxyMethods
{ removedContent = \u k -> logChange k u InfoMissing
, addedContent = \u k -> logChange k u InfoPresent
}

View file

@ -16,7 +16,6 @@ import qualified Annex
import Annex.Proxy import Annex.Proxy
import Annex.UUID import Annex.UUID
import qualified CmdLine.GitAnnexShell.Checks as Checks import qualified CmdLine.GitAnnexShell.Checks as Checks
import Logs.Location
import Logs.Cluster import Logs.Cluster
import Annex.Cluster import Annex.Cluster
import qualified Remote import qualified Remote
@ -78,7 +77,7 @@ performProxy clientuuid servermode r = do
let errhandler = p2pErrHandler (closeRemoteSide remoteside) let errhandler = p2pErrHandler (closeRemoteSide remoteside)
proxystate <- liftIO mkProxyState proxystate <- liftIO mkProxyState
let proxyparams = ProxyParams let proxyparams = ProxyParams
{ proxyMethods = proxymethods { proxyMethods = mkProxyMethods
, proxyState = proxystate , proxyState = proxystate
, proxyServerMode = servermode , proxyServerMode = servermode
, proxyClientSide = clientside , proxyClientSide = clientside
@ -93,11 +92,6 @@ performProxy clientuuid servermode r = do
runproxy errhandler runproxy errhandler
withclientversion _ Nothing = p2pDone withclientversion _ Nothing = p2pDone
proxymethods = ProxyMethods
{ removedContent = \u k -> logChange k u InfoMissing
, addedContent = \u k -> logChange k u InfoPresent
}
performProxyCluster :: UUID -> ClusterUUID -> P2P.ServerMode -> CommandPerform performProxyCluster :: UUID -> ClusterUUID -> P2P.ServerMode -> CommandPerform
performProxyCluster clientuuid clusteruuid servermode = do performProxyCluster clientuuid clusteruuid servermode = do
clientside <- mkProxyClientSide clientuuid clientside <- mkProxyClientSide clientuuid

View file

@ -116,11 +116,14 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
validityv <- liftIO newEmptyTMVarIO validityv <- liftIO newEmptyTMVarIO
finalv <- liftIO newEmptyTMVarIO finalv <- liftIO newEmptyTMVarIO
annexworker <- liftIO $ async $ inAnnexWorker st $ do annexworker <- liftIO $ async $ inAnnexWorker st $ do
let storer _offset len = sendContentWith $ \bs -> do let storer _offset len = sendContentWith $ \bs -> liftIO $ do
liftIO $ atomically $ putTMVar bsv (len, bs) atomically $ putTMVar bsv (len, bs)
liftIO $ atomically $ takeTMVar endv atomically $ takeTMVar endv
liftIO $ signalFullyConsumedByteString $ case serverP2PConnection conn of
connOhdl $ serverP2PConnection conn Just c ->
signalFullyConsumedByteString $
connOhdl c
Nothing -> noop
return $ \v -> do return $ \v -> do
liftIO $ atomically $ putTMVar validityv v liftIO $ atomically $ putTMVar validityv v
return True return True

View file

@ -24,15 +24,17 @@ import Annex.UUID
import Types.NumCopies import Types.NumCopies
import Types.WorkerPool import Types.WorkerPool
import Annex.WorkerPool import Annex.WorkerPool
import Types.Cluster
import CmdLine.Action (startConcurrency) import CmdLine.Action (startConcurrency)
import Utility.ThreadScheduler import Utility.ThreadScheduler
import Utility.HumanTime import Utility.HumanTime
import Annex.Proxy import Annex.Proxy
import qualified P2P.Proxy as Proxy import qualified P2P.Proxy as Proxy
import Types.Cluster import qualified Types.Remote as Remote
import Servant import Servant
import qualified Data.Map as M import qualified Data.Map as M
import qualified Data.Set as S
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
@ -225,52 +227,28 @@ withP2PConnections workerpool a = do
Right (Left reason) -> return $ Left $ Right (Left reason) -> return $ Left $
ConnectionFailed $ ConnectionFailed $
fromMaybe "unknown uuid" reason fromMaybe "unknown uuid" reason
Right (Right (Right proxyremote)) -> do Right (Right (Right proxyremote)) ->
openProxyConnectionToRemote proxyremote openProxyConnectionToRemote workerpool
>>= proxyConnection relv connparams workerpool (connectionProtocolVersion connparams)
Right (Right (Left cluster)) -> do (connectionBypass connparams)
openProxyConnectionToCluster cluster proxyremote
>>= proxyConnection relv connparams workerpool >>= \case
Right conn -> proxyConnection relv connparams workerpool conn
Left ex -> return $ Left $
ConnectionFailed $ show ex
Right (Right (Left clusteruuid)) ->
undefined -- XXX todo
{-
openProxyConnectionToCluster clusteruuid
>>= proxyConnection clusteruuid relv connparams workerpool
-}
Left ex -> return $ Left $ Left ex -> return $ Left $
ConnectionFailed $ show ex ConnectionFailed $ show ex
proxyConnection
:: TMVar (IO ())
-> ConnectionParams
-> AnnexWorkerPool
-> ProxyConnection
-> IO (Either ConnectionProblem P2PConnectionPair)
proxyConnection relv connparams workerpool proxyconn =
-- XXX fixme mkP2PConnectionPair is not quite right for this
mkP2PConnectionPair connparams relv $ \serverrunst serverconn ->
inAnnexWorker' workerpool $ do
let proxyparams = undefined -- XXX
let remoteside = undefined -- XXX
let requestmessage = undefined -- XXX
let proxydone = return ()
let requestcomplete = \() -> return ()
let protoerrhandler = \a -> \case
Left err -> giveup err
Right v -> return v
Proxy.proxyRequest proxydone proxyparams requestcomplete requestmessage protoerrhandler
localConnection
:: TMVar (IO ())
-> ConnectionParams
-> AnnexWorkerPool
-> IO (Either ConnectionProblem P2PConnectionPair)
localConnection relv connparams workerpool =
mkP2PConnectionPair connparams relv $ \serverrunst serverconn ->
inAnnexWorker' workerpool $
void $ runFullProto serverrunst serverconn $
P2P.serveOneCommandAuthed
(connectionServerMode connparams)
(connectionServerUUID connparams)
data P2PConnectionPair = P2PConnectionPair data P2PConnectionPair = P2PConnectionPair
{ clientRunState :: RunState { clientRunState :: RunState
, clientP2PConnection :: P2PConnection , clientP2PConnection :: P2PConnection
, serverP2PConnection :: P2PConnection , serverP2PConnection :: Maybe P2PConnection
, releaseP2PConnection :: IO () , releaseP2PConnection :: IO ()
-- ^ Releases a P2P connection, which can be reused for other -- ^ Releases a P2P connection, which can be reused for other
-- requests. -- requests.
@ -279,12 +257,47 @@ data P2PConnectionPair = P2PConnectionPair
-- longer usable. -- longer usable.
} }
mkP2PConnectionPair localConnection
:: TMVar (IO ())
-> ConnectionParams
-> AnnexWorkerPool
-> IO (Either ConnectionProblem P2PConnectionPair)
localConnection relv connparams workerpool =
localP2PConnectionPair connparams relv $ \serverrunst serverconn ->
inAnnexWorker' workerpool $
void $ runFullProto serverrunst serverconn $
P2P.serveOneCommandAuthed
(connectionServerMode connparams)
(connectionServerUUID connparams)
localP2PConnectionPair
:: ConnectionParams :: ConnectionParams
-> TMVar (IO ()) -> TMVar (IO ())
-> (RunState -> P2PConnection -> IO (Either SomeException ())) -> (RunState -> P2PConnection -> IO (Either SomeException ()))
-> IO (Either ConnectionProblem P2PConnectionPair) -> IO (Either ConnectionProblem P2PConnectionPair)
mkP2PConnectionPair connparams relv startworker = do localP2PConnectionPair connparams relv startworker = do
(clientconn, serverconn) <- mkP2PConnectionPair connparams
("http client", "http server")
clientrunst <- mkClientRunState connparams
serverrunst <- mkServerRunState connparams
asyncworker <- async $
startworker serverrunst serverconn
let releaseconn = atomically $ void $ tryPutTMVar relv $
liftIO $ wait asyncworker
>>= either throwM return
return $ Right $ P2PConnectionPair
{ clientRunState = clientrunst
, clientP2PConnection = clientconn
, serverP2PConnection = Just serverconn
, releaseP2PConnection = releaseconn
, closeP2PConnection = releaseconn
}
mkP2PConnectionPair
:: ConnectionParams
-> (String, String)
-> IO (P2PConnection, P2PConnection)
mkP2PConnectionPair connparams (n1, n2) = do
hdl1 <- newEmptyTMVarIO hdl1 <- newEmptyTMVarIO
hdl2 <- newEmptyTMVarIO hdl2 <- newEmptyTMVarIO
wait1 <- newEmptyTMVarIO wait1 <- newEmptyTMVarIO
@ -297,37 +310,89 @@ mkP2PConnectionPair connparams relv startworker = do
if connectionWaitVar connparams if connectionWaitVar connparams
then Just wait2 then Just wait2
else Nothing else Nothing
let serverconn = P2PConnection Nothing
(const True) h1 h2
(ConnIdent (Just "http server"))
let clientconn = P2PConnection Nothing let clientconn = P2PConnection Nothing
(const True) h2 h1 (const True) h2 h1
(ConnIdent (Just "http client")) (ConnIdent (Just n1))
clientrunst <- mkclientrunst let serverconn = P2PConnection Nothing
serverrunst <- mkserverrunst (const True) h1 h2
(ConnIdent (Just n2))
return (clientconn, serverconn)
mkServerRunState :: ConnectionParams -> IO RunState
mkServerRunState connparams = do
prototvar <- newTVarIO $ connectionProtocolVersion connparams
mkRunState $ const $ Serving
(connectionClientUUID connparams)
Nothing
prototvar
mkClientRunState :: ConnectionParams -> IO RunState
mkClientRunState connparams = do
prototvar <- newTVarIO $ connectionProtocolVersion connparams
mkRunState $ const $ Client prototvar
proxyConnection
:: TMVar (IO ())
-> ConnectionParams
-> AnnexWorkerPool
-> ProxyConnection
-> IO (Either ConnectionProblem P2PConnectionPair)
proxyConnection relv connparams workerpool proxyconn = do
(clientconn, proxyfromclientconn) <- mkP2PConnectionPair connparams
("http client", "proxy")
clientrunst <- mkClientRunState connparams
proxyfromclientrunst <- mkClientRunState connparams
asyncworker <- async $ asyncworker <- async $
startworker serverrunst serverconn inAnnexWorker' workerpool $ do
let releaseconn = atomically $ void $ tryPutTMVar relv $ proxystate <- liftIO Proxy.mkProxyState
liftIO $ wait asyncworker concurrencyconfig <- Proxy.noConcurrencyConfig
>>= either throwM return -- TODO run remote protocol to get its version and
-- take minimum of that and connectionProtocolVersion
let protocolversion = connectionProtocolVersion connparams
let proxyparams = Proxy.ProxyParams
{ Proxy.proxyMethods = mkProxyMethods
, Proxy.proxyState = proxystate
, Proxy.proxyServerMode = connectionServerMode connparams
, Proxy.proxyClientSide = Proxy.ClientSide proxyfromclientrunst proxyfromclientconn
, Proxy.proxyUUID = proxyConnectionRemoteUUID proxyconn
, Proxy.proxySelector = Proxy.singleProxySelector $
proxyConnectionRemoteSide proxyconn
, Proxy.proxyConcurrencyConfig = concurrencyconfig
, Proxy.proxyProtocolVersion = protocolversion
}
let proxy mrequestmessage = case mrequestmessage of
Just requestmessage -> do
Proxy.proxyRequest proxydone proxyparams
requestcomplete requestmessage protoerrhandler
Nothing -> return ()
protoerrhandler proxy $
liftIO $ runNetProto proxyfromclientrunst proxyfromclientconn $
P2P.net P2P.receiveMessage
let releaseconn returntopool =
atomically $ void $ tryPutTMVar relv $
liftIO $ wait asyncworker
>>= either throwM return
return $ Right $ P2PConnectionPair return $ Right $ P2PConnectionPair
{ clientRunState = clientrunst { clientRunState = clientrunst
, clientP2PConnection = clientconn , clientP2PConnection = clientconn
, serverP2PConnection = serverconn , serverP2PConnection = Nothing
, releaseP2PConnection = releaseconn , releaseP2PConnection = releaseconn True
, closeP2PConnection = releaseconn , closeP2PConnection = releaseconn False
} }
where where
mkserverrunst = do protoerrhandler cont a = a >>= \case
prototvar <- newTVarIO $ connectionProtocolVersion connparams -- TODO protocol error, or client hung up, release the p2p
mkRunState $ const $ Serving -- connection
(connectionClientUUID connparams) Left err -> do
Nothing liftIO $ hPutStrLn stderr ("protoerrhandler: " ++ show err)
prototvar return ()
Right v -> do
mkclientrunst = do liftIO $ print "protoerrhandler returned"
prototvar <- newTVarIO $ connectionProtocolVersion connparams cont v
mkRunState $ const $ Client prototvar proxydone = return ()
requestcomplete () = return ()
data Locker = Locker data Locker = Locker
{ lockerThread :: Async () { lockerThread :: Async ()
@ -441,9 +506,27 @@ inAnnexWorker' poolv annexaction = do
return res return res
data ProxyConnection = ProxyConnection data ProxyConnection = ProxyConnection
{ proxyP2PConnectionPair :: P2PConnectionPair { proxyConnectionRemoteUUID :: UUID
, proxyConnectionRemoteSide :: Proxy.RemoteSide
} }
openProxyConnectionToRemote
:: AnnexWorkerPool
-> P2P.ProtocolVersion
-> [UUID]
-> Remote
-> IO (Either SomeException ProxyConnection)
openProxyConnectionToRemote workerpool protoversion bypass remote =
inAnnexWorker' workerpool (proxyRemoteSide protoversion bypass' remote) >>= \case
Left ex -> return (Left ex)
Right remoteside -> return $ Right $
ProxyConnection (Remote.uuid remote) remoteside
where
bypass' = P2P.Bypass (S.fromList bypass)
openProxyConnectionToCluster :: ClusterUUID -> IO ProxyConnection
openProxyConnectionToCluster cu = error "XXX" -- TODO
type ProxyConnectionPool = type ProxyConnectionPool =
M.Map (UUID, UUID, P2P.ProtocolVersion) [ProxyConnection] M.Map (UUID, UUID, P2P.ProtocolVersion) [ProxyConnection]
@ -469,9 +552,3 @@ getProxyConnectionFromPool proxypool connparams = do
, connectionClientUUID connparams , connectionClientUUID connparams
, connectionProtocolVersion connparams , connectionProtocolVersion connparams
) )
openProxyConnectionToRemote :: Remote -> IO ProxyConnection
openProxyConnectionToRemote remote = error "XXX" -- TODO
openProxyConnectionToCluster :: ClusterUUID -> IO ProxyConnection
openProxyConnectionToCluster cu = error "XXX" -- TODO

View file

@ -63,6 +63,7 @@ data ProtoFailure
= ProtoFailureMessage String = ProtoFailureMessage String
| ProtoFailureException SomeException | ProtoFailureException SomeException
| ProtoFailureIOError IOError | ProtoFailureIOError IOError
deriving (Show)
describeProtoFailure :: ProtoFailure -> String describeProtoFailure :: ProtoFailure -> String
describeProtoFailure (ProtoFailureMessage s) = s describeProtoFailure (ProtoFailureMessage s) = s

View file

@ -28,9 +28,18 @@ Planned schedule of work:
## work notes ## work notes
* Make http server support proxies and clusters. * http server proxying hangs when git-annex copy --to it, maybe other
times. Need to fully test.
Current status: laying the keystone * http server proxying needs to get the version negotiated with the proxied
remote and feed that into the proxy function.
* test http server proxying with special remotes
* http server proxying needs to reuse connections to special remotes,
keeping a pool of open ones. Question: How many to keep in the pool?
* Make http server support clusters.
* Support proxying to git remotes using annex+http urls. * Support proxying to git remotes using annex+http urls.
(Current documentation says proxying only works with ssh remotes, (Current documentation says proxying only works with ssh remotes,
@ -51,6 +60,8 @@ Planned schedule of work:
* Allow using annex+http urls in remote.name.annexUrl * Allow using annex+http urls in remote.name.annexUrl
* Make http server support proxying.
## items deferred until later for p2p protocol over http ## items deferred until later for p2p protocol over http
* `git-annex p2phttp` could support systemd socket activation. This would * `git-annex p2phttp` could support systemd socket activation. This would