diff --git a/Annex/Proxy.hs b/Annex/Proxy.hs index 193a01aa08..2bab026bac 100644 --- a/Annex/Proxy.hs +++ b/Annex/Proxy.hs @@ -22,6 +22,7 @@ import Annex.Tmp import Logs.Proxy import Logs.Cluster import Logs.UUID +import Logs.Location import Utility.Tmp.Dir import Utility.Metered @@ -274,3 +275,9 @@ checkCanProxy' remoteuuid ouruuid = M.lookup ouruuid <$> getProxies >>= \case Just desc -> return $ Left $ Just $ "not configured to proxy for repository " ++ fromUUIDDesc desc Nothing -> return $ Left Nothing + +mkProxyMethods :: ProxyMethods +mkProxyMethods = ProxyMethods + { removedContent = \u k -> logChange k u InfoMissing + , addedContent = \u k -> logChange k u InfoPresent + } diff --git a/Command/P2PStdIO.hs b/Command/P2PStdIO.hs index cf04478397..dfc4499601 100644 --- a/Command/P2PStdIO.hs +++ b/Command/P2PStdIO.hs @@ -16,7 +16,6 @@ import qualified Annex import Annex.Proxy import Annex.UUID import qualified CmdLine.GitAnnexShell.Checks as Checks -import Logs.Location import Logs.Cluster import Annex.Cluster import qualified Remote @@ -78,7 +77,7 @@ performProxy clientuuid servermode r = do let errhandler = p2pErrHandler (closeRemoteSide remoteside) proxystate <- liftIO mkProxyState let proxyparams = ProxyParams - { proxyMethods = proxymethods + { proxyMethods = mkProxyMethods , proxyState = proxystate , proxyServerMode = servermode , proxyClientSide = clientside @@ -92,11 +91,6 @@ performProxy clientuuid servermode r = do sendClientProtocolVersion clientside othermsg protocolversion runproxy errhandler withclientversion _ Nothing = p2pDone - - proxymethods = ProxyMethods - { removedContent = \u k -> logChange k u InfoMissing - , addedContent = \u k -> logChange k u InfoPresent - } performProxyCluster :: UUID -> ClusterUUID -> P2P.ServerMode -> CommandPerform performProxyCluster clientuuid clusteruuid servermode = do diff --git a/P2P/Http/Server.hs b/P2P/Http/Server.hs index a6c3cbde56..f6e77f9180 100644 --- a/P2P/Http/Server.hs +++ b/P2P/Http/Server.hs @@ -116,11 +116,14 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do validityv <- liftIO newEmptyTMVarIO finalv <- liftIO newEmptyTMVarIO annexworker <- liftIO $ async $ inAnnexWorker st $ do - let storer _offset len = sendContentWith $ \bs -> do - liftIO $ atomically $ putTMVar bsv (len, bs) - liftIO $ atomically $ takeTMVar endv - liftIO $ signalFullyConsumedByteString $ - connOhdl $ serverP2PConnection conn + let storer _offset len = sendContentWith $ \bs -> liftIO $ do + atomically $ putTMVar bsv (len, bs) + atomically $ takeTMVar endv + case serverP2PConnection conn of + Just c -> + signalFullyConsumedByteString $ + connOhdl c + Nothing -> noop return $ \v -> do liftIO $ atomically $ putTMVar validityv v return True diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index 31864876c1..569853f7a7 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -24,15 +24,17 @@ import Annex.UUID import Types.NumCopies import Types.WorkerPool import Annex.WorkerPool +import Types.Cluster import CmdLine.Action (startConcurrency) import Utility.ThreadScheduler import Utility.HumanTime import Annex.Proxy import qualified P2P.Proxy as Proxy -import Types.Cluster +import qualified Types.Remote as Remote import Servant import qualified Data.Map as M +import qualified Data.Set as S import Control.Concurrent.Async import Control.Concurrent.STM @@ -225,52 +227,28 @@ withP2PConnections workerpool a = do Right (Left reason) -> return $ Left $ ConnectionFailed $ fromMaybe "unknown uuid" reason - Right (Right (Right proxyremote)) -> do - openProxyConnectionToRemote proxyremote - >>= proxyConnection relv connparams workerpool - Right (Right (Left cluster)) -> do - openProxyConnectionToCluster cluster - >>= proxyConnection relv connparams workerpool + Right (Right (Right proxyremote)) -> + openProxyConnectionToRemote workerpool + (connectionProtocolVersion connparams) + (connectionBypass connparams) + proxyremote + >>= \case + Right conn -> proxyConnection relv connparams workerpool conn + Left ex -> return $ Left $ + ConnectionFailed $ show ex + Right (Right (Left clusteruuid)) -> + undefined -- XXX todo + {- + openProxyConnectionToCluster clusteruuid + >>= proxyConnection clusteruuid relv connparams workerpool + -} Left ex -> return $ Left $ ConnectionFailed $ show ex -proxyConnection - :: TMVar (IO ()) - -> ConnectionParams - -> AnnexWorkerPool - -> ProxyConnection - -> IO (Either ConnectionProblem P2PConnectionPair) -proxyConnection relv connparams workerpool proxyconn = - -- XXX fixme mkP2PConnectionPair is not quite right for this - mkP2PConnectionPair connparams relv $ \serverrunst serverconn -> - inAnnexWorker' workerpool $ do - let proxyparams = undefined -- XXX - let remoteside = undefined -- XXX - let requestmessage = undefined -- XXX - let proxydone = return () - let requestcomplete = \() -> return () - let protoerrhandler = \a -> \case - Left err -> giveup err - Right v -> return v - Proxy.proxyRequest proxydone proxyparams requestcomplete requestmessage protoerrhandler - -localConnection - :: TMVar (IO ()) - -> ConnectionParams - -> AnnexWorkerPool - -> IO (Either ConnectionProblem P2PConnectionPair) -localConnection relv connparams workerpool = - mkP2PConnectionPair connparams relv $ \serverrunst serverconn -> - inAnnexWorker' workerpool $ - void $ runFullProto serverrunst serverconn $ - P2P.serveOneCommandAuthed - (connectionServerMode connparams) - (connectionServerUUID connparams) - data P2PConnectionPair = P2PConnectionPair { clientRunState :: RunState , clientP2PConnection :: P2PConnection - , serverP2PConnection :: P2PConnection + , serverP2PConnection :: Maybe P2PConnection , releaseP2PConnection :: IO () -- ^ Releases a P2P connection, which can be reused for other -- requests. @@ -279,12 +257,47 @@ data P2PConnectionPair = P2PConnectionPair -- longer usable. } -mkP2PConnectionPair +localConnection + :: TMVar (IO ()) + -> ConnectionParams + -> AnnexWorkerPool + -> IO (Either ConnectionProblem P2PConnectionPair) +localConnection relv connparams workerpool = + localP2PConnectionPair connparams relv $ \serverrunst serverconn -> + inAnnexWorker' workerpool $ + void $ runFullProto serverrunst serverconn $ + P2P.serveOneCommandAuthed + (connectionServerMode connparams) + (connectionServerUUID connparams) + +localP2PConnectionPair :: ConnectionParams -> TMVar (IO ()) -> (RunState -> P2PConnection -> IO (Either SomeException ())) -> IO (Either ConnectionProblem P2PConnectionPair) -mkP2PConnectionPair connparams relv startworker = do +localP2PConnectionPair connparams relv startworker = do + (clientconn, serverconn) <- mkP2PConnectionPair connparams + ("http client", "http server") + clientrunst <- mkClientRunState connparams + serverrunst <- mkServerRunState connparams + asyncworker <- async $ + startworker serverrunst serverconn + let releaseconn = atomically $ void $ tryPutTMVar relv $ + liftIO $ wait asyncworker + >>= either throwM return + return $ Right $ P2PConnectionPair + { clientRunState = clientrunst + , clientP2PConnection = clientconn + , serverP2PConnection = Just serverconn + , releaseP2PConnection = releaseconn + , closeP2PConnection = releaseconn + } + +mkP2PConnectionPair + :: ConnectionParams + -> (String, String) + -> IO (P2PConnection, P2PConnection) +mkP2PConnectionPair connparams (n1, n2) = do hdl1 <- newEmptyTMVarIO hdl2 <- newEmptyTMVarIO wait1 <- newEmptyTMVarIO @@ -297,37 +310,89 @@ mkP2PConnectionPair connparams relv startworker = do if connectionWaitVar connparams then Just wait2 else Nothing - let serverconn = P2PConnection Nothing - (const True) h1 h2 - (ConnIdent (Just "http server")) let clientconn = P2PConnection Nothing (const True) h2 h1 - (ConnIdent (Just "http client")) - clientrunst <- mkclientrunst - serverrunst <- mkserverrunst + (ConnIdent (Just n1)) + let serverconn = P2PConnection Nothing + (const True) h1 h2 + (ConnIdent (Just n2)) + return (clientconn, serverconn) + +mkServerRunState :: ConnectionParams -> IO RunState +mkServerRunState connparams = do + prototvar <- newTVarIO $ connectionProtocolVersion connparams + mkRunState $ const $ Serving + (connectionClientUUID connparams) + Nothing + prototvar + +mkClientRunState :: ConnectionParams -> IO RunState +mkClientRunState connparams = do + prototvar <- newTVarIO $ connectionProtocolVersion connparams + mkRunState $ const $ Client prototvar + +proxyConnection + :: TMVar (IO ()) + -> ConnectionParams + -> AnnexWorkerPool + -> ProxyConnection + -> IO (Either ConnectionProblem P2PConnectionPair) +proxyConnection relv connparams workerpool proxyconn = do + (clientconn, proxyfromclientconn) <- mkP2PConnectionPair connparams + ("http client", "proxy") + clientrunst <- mkClientRunState connparams + proxyfromclientrunst <- mkClientRunState connparams asyncworker <- async $ - startworker serverrunst serverconn - let releaseconn = atomically $ void $ tryPutTMVar relv $ - liftIO $ wait asyncworker - >>= either throwM return + inAnnexWorker' workerpool $ do + proxystate <- liftIO Proxy.mkProxyState + concurrencyconfig <- Proxy.noConcurrencyConfig + -- TODO run remote protocol to get its version and + -- take minimum of that and connectionProtocolVersion + let protocolversion = connectionProtocolVersion connparams + let proxyparams = Proxy.ProxyParams + { Proxy.proxyMethods = mkProxyMethods + , Proxy.proxyState = proxystate + , Proxy.proxyServerMode = connectionServerMode connparams + , Proxy.proxyClientSide = Proxy.ClientSide proxyfromclientrunst proxyfromclientconn + , Proxy.proxyUUID = proxyConnectionRemoteUUID proxyconn + , Proxy.proxySelector = Proxy.singleProxySelector $ + proxyConnectionRemoteSide proxyconn + , Proxy.proxyConcurrencyConfig = concurrencyconfig + , Proxy.proxyProtocolVersion = protocolversion + } + let proxy mrequestmessage = case mrequestmessage of + Just requestmessage -> do + Proxy.proxyRequest proxydone proxyparams + requestcomplete requestmessage protoerrhandler + Nothing -> return () + protoerrhandler proxy $ + liftIO $ runNetProto proxyfromclientrunst proxyfromclientconn $ + P2P.net P2P.receiveMessage + + let releaseconn returntopool = + atomically $ void $ tryPutTMVar relv $ + liftIO $ wait asyncworker + >>= either throwM return + return $ Right $ P2PConnectionPair { clientRunState = clientrunst , clientP2PConnection = clientconn - , serverP2PConnection = serverconn - , releaseP2PConnection = releaseconn - , closeP2PConnection = releaseconn + , serverP2PConnection = Nothing + , releaseP2PConnection = releaseconn True + , closeP2PConnection = releaseconn False } where - mkserverrunst = do - prototvar <- newTVarIO $ connectionProtocolVersion connparams - mkRunState $ const $ Serving - (connectionClientUUID connparams) - Nothing - prototvar - - mkclientrunst = do - prototvar <- newTVarIO $ connectionProtocolVersion connparams - mkRunState $ const $ Client prototvar + protoerrhandler cont a = a >>= \case + -- TODO protocol error, or client hung up, release the p2p + -- connection + Left err -> do + liftIO $ hPutStrLn stderr ("protoerrhandler: " ++ show err) + return () + Right v -> do + liftIO $ print "protoerrhandler returned" + cont v + proxydone = return () + requestcomplete () = return () data Locker = Locker { lockerThread :: Async () @@ -441,9 +506,27 @@ inAnnexWorker' poolv annexaction = do return res data ProxyConnection = ProxyConnection - { proxyP2PConnectionPair :: P2PConnectionPair + { proxyConnectionRemoteUUID :: UUID + , proxyConnectionRemoteSide :: Proxy.RemoteSide } +openProxyConnectionToRemote + :: AnnexWorkerPool + -> P2P.ProtocolVersion + -> [UUID] + -> Remote + -> IO (Either SomeException ProxyConnection) +openProxyConnectionToRemote workerpool protoversion bypass remote = + inAnnexWorker' workerpool (proxyRemoteSide protoversion bypass' remote) >>= \case + Left ex -> return (Left ex) + Right remoteside -> return $ Right $ + ProxyConnection (Remote.uuid remote) remoteside + where + bypass' = P2P.Bypass (S.fromList bypass) + +openProxyConnectionToCluster :: ClusterUUID -> IO ProxyConnection +openProxyConnectionToCluster cu = error "XXX" -- TODO + type ProxyConnectionPool = M.Map (UUID, UUID, P2P.ProtocolVersion) [ProxyConnection] @@ -469,9 +552,3 @@ getProxyConnectionFromPool proxypool connparams = do , connectionClientUUID connparams , connectionProtocolVersion connparams ) - -openProxyConnectionToRemote :: Remote -> IO ProxyConnection -openProxyConnectionToRemote remote = error "XXX" -- TODO - -openProxyConnectionToCluster :: ClusterUUID -> IO ProxyConnection -openProxyConnectionToCluster cu = error "XXX" -- TODO diff --git a/P2P/IO.hs b/P2P/IO.hs index 54cea2a224..75a323a0d9 100644 --- a/P2P/IO.hs +++ b/P2P/IO.hs @@ -63,6 +63,7 @@ data ProtoFailure = ProtoFailureMessage String | ProtoFailureException SomeException | ProtoFailureIOError IOError + deriving (Show) describeProtoFailure :: ProtoFailure -> String describeProtoFailure (ProtoFailureMessage s) = s diff --git a/doc/todo/git-annex_proxies.mdwn b/doc/todo/git-annex_proxies.mdwn index 28d090073a..45575aaab2 100644 --- a/doc/todo/git-annex_proxies.mdwn +++ b/doc/todo/git-annex_proxies.mdwn @@ -28,9 +28,18 @@ Planned schedule of work: ## work notes -* Make http server support proxies and clusters. +* http server proxying hangs when git-annex copy --to it, maybe other + times. Need to fully test. - Current status: laying the keystone +* http server proxying needs to get the version negotiated with the proxied + remote and feed that into the proxy function. + +* test http server proxying with special remotes + +* http server proxying needs to reuse connections to special remotes, + keeping a pool of open ones. Question: How many to keep in the pool? + +* Make http server support clusters. * Support proxying to git remotes using annex+http urls. (Current documentation says proxying only works with ssh remotes, @@ -51,6 +60,8 @@ Planned schedule of work: * Allow using annex+http urls in remote.name.annexUrl +* Make http server support proxying. + ## items deferred until later for p2p protocol over http * `git-annex p2phttp` could support systemd socket activation. This would