From 82d66ede5e6edc932f9ef92f7f37505332895ff5 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 8 Jul 2024 10:40:38 -0400 Subject: [PATCH] convert lockcontent api to http long polling Websockets would work, but the problem with using them for this is that each lockcontent call is a separate websocket connection. And that's an actual TCP connection. One TCP connection per file dropped would be too expensive. With http long polling, regular http pipelining can be used, so it will reuse a TCP connection. Unfortunately, at least with servant, bi-directional streams with long polling don't result in true bidirectional full duplex communication. Servant processes the whole client body stream before generating the server body stream. I think it's entirely possible to do full bi-directional communication over http, but it would need changes to servant. And, there's no way for the client to tell if the server successfully locked the content, since the server will keep processing the client stream no matter what.: So, added a new api endpoint, keeplocked. lockcontent will lock the key for 10 minutes with retention lock, and then a call to keeplocked will keep it locked for as long as needed. This does mean that there will need to be a Map of locks by key, and I will probably want to add some kind of lock identifier that lockcontent returns. --- Command/P2PHttp.hs | 5 +- P2P/Http.hs | 240 +++++++++++++----- doc/design/p2p_protocol_over_http/draft1.mdwn | 63 ++++- git-annex.cabal | 4 +- 4 files changed, 228 insertions(+), 84 deletions(-) diff --git a/Command/P2PHttp.hs b/Command/P2PHttp.hs index b27a9adcac..53abf19fff 100644 --- a/Command/P2PHttp.hs +++ b/Command/P2PHttp.hs @@ -14,10 +14,13 @@ module Command.P2PHttp where import Command import P2P.Http +import qualified Network.Wai.Handler.Warp as Warp + cmd :: Command cmd = command "p2phttp" SectionPlumbing "communicate in P2P protocol over http" paramNothing (withParams seek) seek :: CmdParams -> CommandSeek -seek _ = liftIO $ P2P.Http.run +seek ["server"] = liftIO $ Warp.run 8080 p2pHttpApp +seek ["client"] = liftIO testClientLock diff --git a/P2P/Http.hs b/P2P/Http.hs index 6ecbecd857..2cc8f452a5 100644 --- a/P2P/Http.hs +++ b/P2P/Http.hs @@ -25,24 +25,16 @@ import Utility.MonotonicClock import Servant import Servant.Client.Streaming -import Servant.Client.Core.RunClient -import qualified Servant.Client.Core.Request as R import qualified Servant.Types.SourceT as S -import Servant.API.WebSocket -import qualified Network.WebSockets as Websocket -import qualified Network.WebSockets.Client as Websocket -import Network.Wai -import Network.Wai.Handler.Warp -import Network.HTTP.Client (newManager, defaultManagerSettings, path) +import Network.HTTP.Client (defaultManagerSettings, newManager) import qualified Data.Text as T import qualified Data.Text.Encoding as TE import qualified Data.ByteString as B import Text.Read (readMaybe) import Data.Aeson hiding (Key) -import Data.Maybe -import Data.Foldable -import Control.Monad.Reader import Control.DeepSeq +import Control.Concurrent +import Control.Concurrent.STM import GHC.Generics type P2PHttpAPI @@ -82,13 +74,17 @@ type P2PHttpAPI :<|> "git-annex" :> "v2" :> "lockcontent" :> LockContentAPI :<|> "git-annex" :> "v1" :> "lockcontent" :> LockContentAPI :<|> "git-annex" :> "v0" :> "lockcontent" :> LockContentAPI + :<|> "git-annex" :> "v3" :> "keeplocked" :> KeepLockedAPI + :<|> "git-annex" :> "v2" :> "keeplocked" :> KeepLockedAPI + :<|> "git-annex" :> "v1" :> "keeplocked" :> KeepLockedAPI + :<|> "git-annex" :> "v0" :> "keeplocked" :> KeepLockedAPI :<|> "git-annex" :> "key" :> CaptureKey :> GetAPI '[] p2pHttpAPI :: Proxy P2PHttpAPI p2pHttpAPI = Proxy -p2pHttp :: Application -p2pHttp = serve p2pHttpAPI serveP2pHttp +p2pHttpApp :: Application +p2pHttpApp = serve p2pHttpAPI serveP2pHttp serveP2pHttp :: Server P2PHttpAPI serveP2pHttp @@ -117,6 +113,10 @@ serveP2pHttp :<|> serveLockContent :<|> serveLockContent :<|> serveLockContent + :<|> serveKeepLocked + :<|> serveKeepLocked + :<|> serveKeepLocked + :<|> serveKeepLocked :<|> serveGet0 type GetAPI headers @@ -388,33 +388,29 @@ type LockContentAPI :> ClientUUID Required :> ServerUUID Required :> BypassUUIDs - :> WebSocket + :> Post '[JSON] LockResult serveLockContent :: B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] - -> Websocket.Connection - -> Handler () + -> Handler LockResult serveLockContent = undefined -data WebSocketClient = WebSocketClient R.Request - --- XXX this is enough to let servant-client work, but it's not yet --- possible to run a WebSocketClient. -instance RunClient m => HasClient m WebSocket where - type Client m WebSocket = WebSocketClient - clientWithRoute _pm Proxy req = WebSocketClient req - hoistClientMonad _ _ _ w = w - clientLockContent - :: B64Key + :: P2P.ProtocolVersion + -> B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] - -> WebSocketClient -clientLockContent = v3 + -> ClientM LockResult +clientLockContent (P2P.ProtocolVersion ver) = case ver of + 3 -> v3 + 2 -> v2 + 1 -> v1 + 0 -> v0 + _ -> error "unsupported protocol version" where _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> @@ -423,49 +419,113 @@ clientLockContent = v3 _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> - v3 :<|> _ = client p2pHttpAPI - -- XXX add other protocol versions + v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI ---XXX test code -query :: ClientM PutOffsetResultPlus -query = do - clientPutOffset (P2P.ProtocolVersion 3) - (B64Key (fromJust $ deserializeKey "WORM--foo")) - (B64UUID (toUUID ("client" :: String))) - (B64UUID (toUUID ("server" :: String))) - [] +type KeepLockedAPI + = KeyParam + :> ClientUUID Required + :> ServerUUID Required + :> BypassUUIDs + :> Header "Connection" ConnectionKeepAlive + :> Header "Keep-Alive" KeepAlive + :> StreamBody NewlineFraming JSON (SourceIO UnlockRequest) + :> Post '[JSON] LockResult ---XXX test code -query' :: WebSocketClient -query' = do - clientLockContent (B64Key (fromJust $ deserializeKey "WORM--foo")) - (B64UUID (toUUID ("client" :: String))) - (B64UUID (toUUID ("server" :: String))) - [] - -runWebSocketClient :: WebSocketClient -> Websocket.ClientApp a -> ClientM a -runWebSocketClient (WebSocketClient req) app = do - clientenv <- ask - let burl = baseUrl clientenv - let creq = defaultMakeClientRequest burl req - case baseUrlScheme burl of - Http -> liftIO $ Websocket.runClient - (baseUrlHost burl) - (baseUrlPort burl) - (decodeBS (path creq)) - app - Https -> error "TODO" -- XXX - ---XXX test code -run :: IO () -run = do - manager' <- newManager defaultManagerSettings - let WebSocketClient wscreq = query' - _ <- runClientM (runWebSocketClient query' wsapp) - (mkClientEnv manager' (BaseUrl Http "localhost" 8081 "")) - return () +serveKeepLocked + :: B64Key + -> B64UUID ClientSide + -> B64UUID ServerSide + -> [B64UUID Bypass] + -> Maybe ConnectionKeepAlive + -> Maybe KeepAlive + -> S.SourceT IO UnlockRequest + -> Handler LockResult +serveKeepLocked k cu su _ _ _ unlockrequeststream = do + _ <- liftIO $ S.unSourceT unlockrequeststream go + return (LockResult False) where - wsapp conn = Websocket.sendTextData conn ("hello, world" :: T.Text) + go S.Stop = do + print "lost connection to client, drop lock here" -- XXX TODO + go (S.Error err) = do + print ("Error", err) + print "error, drop lock here" -- XXX TODO + go (S.Skip s) = go s + go (S.Effect ms) = ms >>= go + go (S.Yield (UnlockRequest False) s) = go s + go (S.Yield (UnlockRequest True) _) = do + print ("got unlock request, drop lock here") -- XXX TODO + +clientKeepLocked + :: P2P.ProtocolVersion + -> B64Key + -> B64UUID ClientSide + -> B64UUID ServerSide + -> [B64UUID Bypass] + -> Maybe ConnectionKeepAlive + -> Maybe KeepAlive + -> S.SourceT IO UnlockRequest + -> ClientM LockResult +clientKeepLocked (P2P.ProtocolVersion ver) = case ver of + 3 -> v3 + 2 -> v2 + 1 -> v1 + 0 -> v0 + _ -> error "unsupported protocol version" + where + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> + _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI + +clientKeepLocked' + :: ClientEnv + -> P2P.ProtocolVersion + -> B64Key + -> B64UUID ClientSide + -> B64UUID ServerSide + -> [B64UUID Bypass] + -> TMVar Bool + -> IO () +clientKeepLocked' clientenv protover key cu su bypass keeplocked = do + let cli = clientKeepLocked protover key cu su bypass + (Just connectionKeepAlive) (Just keepAlive) + (S.fromStepT unlocksender) + withClientM cli clientenv $ \case + Left err -> throwM err + Right (LockResult _) -> + liftIO $ print "end of lock connection to server" + where + unlocksender = + S.Yield (UnlockRequest False) $ S.Effect $ do + liftIO $ print "sent keep locked request" + return $ S.Effect $ do + stilllock <- liftIO $ atomically $ takeTMVar keeplocked + if stilllock + then return unlocksender + else do + liftIO $ print "sending unlock request" + return $ S.Yield (UnlockRequest True) S.Stop + +testClientLock = do + mgr <- newManager defaultManagerSettings + burl <- parseBaseUrl "http://localhost:8080/" + keeplocked <- newEmptyTMVarIO + _ <- forkIO $ do + print "running, press enter to drop lock" + _ <- getLine + atomically $ writeTMVar keeplocked False + clientKeepLocked' (mkClientEnv mgr burl) + (P2P.ProtocolVersion 3) + (B64Key (fromJust $ deserializeKey "WORM--foo")) + (B64UUID (toUUID ("cu" :: String))) + (B64UUID (toUUID ("su" :: String))) + [] + keeplocked type ClientUUID req = QueryParam' '[req] "clientuuid" (B64UUID ClientSide) @@ -533,6 +593,34 @@ newtype Offset = Offset P2P.Offset newtype Timestamp = Timestamp MonotonicTimestamp deriving (Show) +newtype LockResult = LockResult Bool + deriving (Show, Generic, NFData) + +newtype UnlockRequest = UnlockRequest Bool + deriving (Show, Generic, NFData) + +newtype ConnectionKeepAlive = ConnectionKeepAlive T.Text + +connectionKeepAlive :: ConnectionKeepAlive +connectionKeepAlive = ConnectionKeepAlive "Keep-Alive" + +newtype KeepAlive = KeepAlive T.Text + +keepAlive :: KeepAlive +keepAlive = KeepAlive "timeout=1200" + +instance ToHttpApiData ConnectionKeepAlive where + toUrlPiece (ConnectionKeepAlive t) = t + +instance FromHttpApiData ConnectionKeepAlive where + parseUrlPiece = Right . ConnectionKeepAlive + +instance ToHttpApiData KeepAlive where + toUrlPiece (KeepAlive t) = t + +instance FromHttpApiData KeepAlive where + parseUrlPiece = Right . KeepAlive + instance ToHttpApiData B64Key where toUrlPiece (B64Key k) = TE.decodeUtf8Lenient $ toB64 (serializeKey' k) @@ -668,6 +756,22 @@ instance FromJSON (B64UUID t) where _ -> mempty parseJSON _ = mempty +instance ToJSON LockResult where + toJSON (LockResult v) = object + ["locked" .= v] + +instance FromJSON LockResult where + parseJSON = withObject "LockResult" $ \v -> LockResult + <$> v .: "locked" + +instance ToJSON UnlockRequest where + toJSON (UnlockRequest v) = object + ["unlock" .= v] + +instance FromJSON UnlockRequest where + parseJSON = withObject "UnlockRequest" $ \v -> UnlockRequest + <$> v .: "unlock" + plusList :: [B64UUID Plus] -> [String] plusList = map (\(B64UUID u) -> fromUUID u) diff --git a/doc/design/p2p_protocol_over_http/draft1.mdwn b/doc/design/p2p_protocol_over_http/draft1.mdwn index 6841c66cc7..5e889d7587 100644 --- a/doc/design/p2p_protocol_over_http/draft1.mdwn +++ b/doc/design/p2p_protocol_over_http/draft1.mdwn @@ -183,22 +183,15 @@ Locks the content of a key on the server, preventing it from being removed. Example: > POST /git-annex/v3/lockcontent?key=SHA1--foo&clientuuid=79a5a1f4-07e8-11ef-873d-97f93ca91925&serveruuid=ecf6d4ca-07e8-11ef-8990-9b8c1f696bf6 HTTP/1.1 - [websocket protocol follows] - < SUCCESS - > UNLOCKCONTENT + < {"locked": true} There is one required additional parameter, `key`. -This request opens a websocket between the client and the server. -The server sends "SUCCESS" over the websocket once it has locked -the content. Or it sends "FAILURE" if it is unable to lock the content. +The server will return `{"locked": true}` if it was able to lock the key, +or `{"locked": false}` if it was not. -Once the server has sent "SUCCESS", the content remains locked -until the client sends "UNLOCKCONTENT" over the websocket. - -If the client disconnects without sending "UNLOCKCONTENT", or the web -server gets shut down before it can receive that, the content will remain -locked for at least 10 minutes from when the server sent "SUCCESS". +The key will remain locked for 10 minutes. But, usually `keeplocked` +is used to control the lifetime of the lock. (See below.) ### POST /git-annex/v2/lockcontent @@ -212,6 +205,52 @@ Identical to v3. Identical to v3. +### POST /git-annex/v3/keeplocked + +Controls the lifetime of a lock on a key that was earlier obtained +with `lockcontent`. + +Example: + + > POST /git-annex/v3/keeplocked?key=SHA1--foo&clientuuid=79a5a1f4-07e8-11ef-873d-97f93ca91925&serveruuid=ecf6d4ca-07e8-11ef-8990-9b8c1f696bf6 HTTP/1.1 + > Connection: Keep-Alive + > Keep-Alive: timeout=1200 + [some time later] + > {"unlock": true} + < {"locked": false} + +There is one required additional parameter, `key`. + +This uses long polling. So it's important to use +Connection and Keep-Alive headers. + +This keeps an active lock from expiring until the client sends +`{"unlock": true}`, and then it immediately unlocks it. + +The client can send `{"unlock": false}` any number of times first. +This has no effect, but may be useful to keep the connection alive. + +This must be called within ten minutes of `lockcontent`, otherwise +the lock will have already expired when this runs. Note that this +does not indicate if the lock expired, it always returns +`{"locked": false}`. + +If the connection is closed before the client sends `{"unlock": true}, +or even if the web server gets shut down, the content will remain +locked for 10 minutes from the time it was first locked. + +### POST /git-annex/v2/keeplocked + +Identical to v3. + +### POST /git-annex/v1/keeplocked + +Identical to v3. + +### POST /git-annex/v0/keeplocked + +Identical to v3. + ### POST /git-annex/v3/remove Remove a key's content from the server. diff --git a/git-annex.cabal b/git-annex.cabal index d647668062..f15270d35d 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -320,9 +320,7 @@ Executable git-annex servant, servant-server, servant-client, - servant-client-core, - servant-websockets, - websockets + servant-client-core CPP-Options: -DWITH_SERVANT Other-Modules: Command.P2PHttp