diff --git a/Command/P2PHttp.hs b/Command/P2PHttp.hs index b27a9adcac..53abf19fff 100644 --- a/Command/P2PHttp.hs +++ b/Command/P2PHttp.hs @@ -14,10 +14,13 @@ module Command.P2PHttp where import Command import P2P.Http +import qualified Network.Wai.Handler.Warp as Warp + cmd :: Command cmd = command "p2phttp" SectionPlumbing "communicate in P2P protocol over http" paramNothing (withParams seek) seek :: CmdParams -> CommandSeek -seek _ = liftIO $ P2P.Http.run +seek ["server"] = liftIO $ Warp.run 8080 p2pHttpApp +seek ["client"] = liftIO testClientLock diff --git a/P2P/Http.hs b/P2P/Http.hs index 6ecbecd857..2cc8f452a5 100644 --- a/P2P/Http.hs +++ b/P2P/Http.hs @@ -25,24 +25,16 @@ import Utility.MonotonicClock import Servant import Servant.Client.Streaming -import Servant.Client.Core.RunClient -import qualified Servant.Client.Core.Request as R import qualified Servant.Types.SourceT as S -import Servant.API.WebSocket -import qualified Network.WebSockets as Websocket -import qualified Network.WebSockets.Client as Websocket -import Network.Wai -import Network.Wai.Handler.Warp -import Network.HTTP.Client (newManager, defaultManagerSettings, path) +import Network.HTTP.Client (defaultManagerSettings, newManager) import qualified Data.Text as T import qualified Data.Text.Encoding as TE import qualified Data.ByteString as B import Text.Read (readMaybe) import Data.Aeson hiding (Key) -import Data.Maybe -import Data.Foldable -import Control.Monad.Reader import Control.DeepSeq +import Control.Concurrent +import Control.Concurrent.STM import GHC.Generics type P2PHttpAPI @@ -82,13 +74,17 @@ type P2PHttpAPI :<|> "git-annex" :> "v2" :> "lockcontent" :> LockContentAPI :<|> "git-annex" :> "v1" :> "lockcontent" :> LockContentAPI :<|> "git-annex" :> "v0" :> "lockcontent" :> LockContentAPI + :<|> "git-annex" :> "v3" :> "keeplocked" :> KeepLockedAPI + :<|> "git-annex" :> "v2" :> "keeplocked" :> KeepLockedAPI + :<|> "git-annex" :> "v1" :> "keeplocked" :> KeepLockedAPI + :<|> "git-annex" :> "v0" :> "keeplocked" :> KeepLockedAPI :<|> "git-annex" :> "key" :> CaptureKey :> GetAPI '[] p2pHttpAPI :: Proxy P2PHttpAPI p2pHttpAPI = Proxy -p2pHttp :: Application -p2pHttp = serve p2pHttpAPI serveP2pHttp +p2pHttpApp :: Application +p2pHttpApp = serve p2pHttpAPI serveP2pHttp serveP2pHttp :: Server P2PHttpAPI serveP2pHttp @@ -117,6 +113,10 @@ serveP2pHttp :<|> serveLockContent :<|> serveLockContent :<|> serveLockContent + :<|> serveKeepLocked + :<|> serveKeepLocked + :<|> serveKeepLocked + :<|> serveKeepLocked :<|> serveGet0 type GetAPI headers @@ -388,33 +388,29 @@ type LockContentAPI :> ClientUUID Required :> ServerUUID Required :> BypassUUIDs - :> WebSocket + :> Post '[JSON] LockResult serveLockContent :: B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] - -> Websocket.Connection - -> Handler () + -> Handler LockResult serveLockContent = undefined -data WebSocketClient = WebSocketClient R.Request - --- XXX this is enough to let servant-client work, but it's not yet --- possible to run a WebSocketClient. -instance RunClient m => HasClient m WebSocket where - type Client m WebSocket = WebSocketClient - clientWithRoute _pm Proxy req = WebSocketClient req - hoistClientMonad _ _ _ w = w - clientLockContent - :: B64Key + :: P2P.ProtocolVersion + -> B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] - -> WebSocketClient -clientLockContent = v3 + -> ClientM LockResult +clientLockContent (P2P.ProtocolVersion ver) = case ver of + 3 -> v3 + 2 -> v2 + 1 -> v1 + 0 -> v0 + _ -> error "unsupported protocol version" where _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> @@ -423,49 +419,113 @@ clientLockContent = v3 _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> - v3 :<|> _ = client p2pHttpAPI - -- XXX add other protocol versions + v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI ---XXX test code -query :: ClientM PutOffsetResultPlus -query = do - clientPutOffset (P2P.ProtocolVersion 3) - (B64Key (fromJust $ deserializeKey "WORM--foo")) - (B64UUID (toUUID ("client" :: String))) - (B64UUID (toUUID ("server" :: String))) - [] +type KeepLockedAPI + = KeyParam + :> ClientUUID Required + :> ServerUUID Required + :> BypassUUIDs + :> Header "Connection" ConnectionKeepAlive + :> Header "Keep-Alive" KeepAlive + :> StreamBody NewlineFraming JSON (SourceIO UnlockRequest) + :> Post '[JSON] LockResult ---XXX test code -query' :: WebSocketClient -query' = do - clientLockContent (B64Key (fromJust $ deserializeKey "WORM--foo")) - (B64UUID (toUUID ("client" :: String))) - (B64UUID (toUUID ("server" :: String))) - [] - -runWebSocketClient :: WebSocketClient -> Websocket.ClientApp a -> ClientM a -runWebSocketClient (WebSocketClient req) app = do - clientenv <- ask - let burl = baseUrl clientenv - let creq = defaultMakeClientRequest burl req - case baseUrlScheme burl of - Http -> liftIO $ Websocket.runClient - (baseUrlHost burl) - (baseUrlPort burl) - (decodeBS (path creq)) - app - Https -> error "TODO" -- XXX - ---XXX test code -run :: IO () -run = do - manager' <- newManager defaultManagerSettings - let WebSocketClient wscreq = query' - _ <- runClientM (runWebSocketClient query' wsapp) - (mkClientEnv manager' (BaseUrl Http "localhost" 8081 "")) - return () +serveKeepLocked + :: B64Key + -> B64UUID ClientSide + -> B64UUID ServerSide + -> [B64UUID Bypass] + -> Maybe ConnectionKeepAlive + -> Maybe KeepAlive + -> S.SourceT IO UnlockRequest + -> Handler LockResult +serveKeepLocked k cu su _ _ _ unlockrequeststream = do + _ <- liftIO $ S.unSourceT unlockrequeststream go + return (LockResult False) where - wsapp conn = Websocket.sendTextData conn ("hello, world" :: T.Text) + go S.Stop = do + print "lost connection to client, drop lock here" -- XXX TODO + go (S.Error err) = do + print ("Error", err) + print "error, drop lock here" -- XXX TODO + go (S.Skip s) = go s + go (S.Effect ms) = ms >>= go + go (S.Yield (UnlockRequest False) s) = go s + go (S.Yield (UnlockRequest True) _) = do + print ("got unlock request, drop lock here") -- XXX TODO + +clientKeepLocked + :: P2P.ProtocolVersion + -> B64Key + -> B64UUID ClientSide + -> B64UUID ServerSide + -> [B64UUID Bypass] + -> Maybe ConnectionKeepAlive + -> Maybe KeepAlive + -> S.SourceT IO UnlockRequest + -> ClientM LockResult +clientKeepLocked (P2P.ProtocolVersion ver) = case ver of + 3 -> v3 + 2 -> v2 + 1 -> v1 + 0 -> v0 + _ -> error "unsupported protocol version" + where + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> + _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> + _ :<|> _ :<|> _ :<|> _ :<|> + v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI + +clientKeepLocked' + :: ClientEnv + -> P2P.ProtocolVersion + -> B64Key + -> B64UUID ClientSide + -> B64UUID ServerSide + -> [B64UUID Bypass] + -> TMVar Bool + -> IO () +clientKeepLocked' clientenv protover key cu su bypass keeplocked = do + let cli = clientKeepLocked protover key cu su bypass + (Just connectionKeepAlive) (Just keepAlive) + (S.fromStepT unlocksender) + withClientM cli clientenv $ \case + Left err -> throwM err + Right (LockResult _) -> + liftIO $ print "end of lock connection to server" + where + unlocksender = + S.Yield (UnlockRequest False) $ S.Effect $ do + liftIO $ print "sent keep locked request" + return $ S.Effect $ do + stilllock <- liftIO $ atomically $ takeTMVar keeplocked + if stilllock + then return unlocksender + else do + liftIO $ print "sending unlock request" + return $ S.Yield (UnlockRequest True) S.Stop + +testClientLock = do + mgr <- newManager defaultManagerSettings + burl <- parseBaseUrl "http://localhost:8080/" + keeplocked <- newEmptyTMVarIO + _ <- forkIO $ do + print "running, press enter to drop lock" + _ <- getLine + atomically $ writeTMVar keeplocked False + clientKeepLocked' (mkClientEnv mgr burl) + (P2P.ProtocolVersion 3) + (B64Key (fromJust $ deserializeKey "WORM--foo")) + (B64UUID (toUUID ("cu" :: String))) + (B64UUID (toUUID ("su" :: String))) + [] + keeplocked type ClientUUID req = QueryParam' '[req] "clientuuid" (B64UUID ClientSide) @@ -533,6 +593,34 @@ newtype Offset = Offset P2P.Offset newtype Timestamp = Timestamp MonotonicTimestamp deriving (Show) +newtype LockResult = LockResult Bool + deriving (Show, Generic, NFData) + +newtype UnlockRequest = UnlockRequest Bool + deriving (Show, Generic, NFData) + +newtype ConnectionKeepAlive = ConnectionKeepAlive T.Text + +connectionKeepAlive :: ConnectionKeepAlive +connectionKeepAlive = ConnectionKeepAlive "Keep-Alive" + +newtype KeepAlive = KeepAlive T.Text + +keepAlive :: KeepAlive +keepAlive = KeepAlive "timeout=1200" + +instance ToHttpApiData ConnectionKeepAlive where + toUrlPiece (ConnectionKeepAlive t) = t + +instance FromHttpApiData ConnectionKeepAlive where + parseUrlPiece = Right . ConnectionKeepAlive + +instance ToHttpApiData KeepAlive where + toUrlPiece (KeepAlive t) = t + +instance FromHttpApiData KeepAlive where + parseUrlPiece = Right . KeepAlive + instance ToHttpApiData B64Key where toUrlPiece (B64Key k) = TE.decodeUtf8Lenient $ toB64 (serializeKey' k) @@ -668,6 +756,22 @@ instance FromJSON (B64UUID t) where _ -> mempty parseJSON _ = mempty +instance ToJSON LockResult where + toJSON (LockResult v) = object + ["locked" .= v] + +instance FromJSON LockResult where + parseJSON = withObject "LockResult" $ \v -> LockResult + <$> v .: "locked" + +instance ToJSON UnlockRequest where + toJSON (UnlockRequest v) = object + ["unlock" .= v] + +instance FromJSON UnlockRequest where + parseJSON = withObject "UnlockRequest" $ \v -> UnlockRequest + <$> v .: "unlock" + plusList :: [B64UUID Plus] -> [String] plusList = map (\(B64UUID u) -> fromUUID u) diff --git a/doc/design/p2p_protocol_over_http/draft1.mdwn b/doc/design/p2p_protocol_over_http/draft1.mdwn index 6841c66cc7..5e889d7587 100644 --- a/doc/design/p2p_protocol_over_http/draft1.mdwn +++ b/doc/design/p2p_protocol_over_http/draft1.mdwn @@ -183,22 +183,15 @@ Locks the content of a key on the server, preventing it from being removed. Example: > POST /git-annex/v3/lockcontent?key=SHA1--foo&clientuuid=79a5a1f4-07e8-11ef-873d-97f93ca91925&serveruuid=ecf6d4ca-07e8-11ef-8990-9b8c1f696bf6 HTTP/1.1 - [websocket protocol follows] - < SUCCESS - > UNLOCKCONTENT + < {"locked": true} There is one required additional parameter, `key`. -This request opens a websocket between the client and the server. -The server sends "SUCCESS" over the websocket once it has locked -the content. Or it sends "FAILURE" if it is unable to lock the content. +The server will return `{"locked": true}` if it was able to lock the key, +or `{"locked": false}` if it was not. -Once the server has sent "SUCCESS", the content remains locked -until the client sends "UNLOCKCONTENT" over the websocket. - -If the client disconnects without sending "UNLOCKCONTENT", or the web -server gets shut down before it can receive that, the content will remain -locked for at least 10 minutes from when the server sent "SUCCESS". +The key will remain locked for 10 minutes. But, usually `keeplocked` +is used to control the lifetime of the lock. (See below.) ### POST /git-annex/v2/lockcontent @@ -212,6 +205,52 @@ Identical to v3. Identical to v3. +### POST /git-annex/v3/keeplocked + +Controls the lifetime of a lock on a key that was earlier obtained +with `lockcontent`. + +Example: + + > POST /git-annex/v3/keeplocked?key=SHA1--foo&clientuuid=79a5a1f4-07e8-11ef-873d-97f93ca91925&serveruuid=ecf6d4ca-07e8-11ef-8990-9b8c1f696bf6 HTTP/1.1 + > Connection: Keep-Alive + > Keep-Alive: timeout=1200 + [some time later] + > {"unlock": true} + < {"locked": false} + +There is one required additional parameter, `key`. + +This uses long polling. So it's important to use +Connection and Keep-Alive headers. + +This keeps an active lock from expiring until the client sends +`{"unlock": true}`, and then it immediately unlocks it. + +The client can send `{"unlock": false}` any number of times first. +This has no effect, but may be useful to keep the connection alive. + +This must be called within ten minutes of `lockcontent`, otherwise +the lock will have already expired when this runs. Note that this +does not indicate if the lock expired, it always returns +`{"locked": false}`. + +If the connection is closed before the client sends `{"unlock": true}, +or even if the web server gets shut down, the content will remain +locked for 10 minutes from the time it was first locked. + +### POST /git-annex/v2/keeplocked + +Identical to v3. + +### POST /git-annex/v1/keeplocked + +Identical to v3. + +### POST /git-annex/v0/keeplocked + +Identical to v3. + ### POST /git-annex/v3/remove Remove a key's content from the server. diff --git a/git-annex.cabal b/git-annex.cabal index d647668062..f15270d35d 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -320,9 +320,7 @@ Executable git-annex servant, servant-server, servant-client, - servant-client-core, - servant-websockets, - websockets + servant-client-core CPP-Options: -DWITH_SERVANT Other-Modules: Command.P2PHttp