convert lockcontent api to http long polling
Websockets would work, but the problem with using them for this is that each lockcontent call is a separate websocket connection. And that's an actual TCP connection. One TCP connection per file dropped would be too expensive. With http long polling, regular http pipelining can be used, so it will reuse a TCP connection. Unfortunately, at least with servant, bi-directional streams with long polling don't result in true bidirectional full duplex communication. Servant processes the whole client body stream before generating the server body stream. I think it's entirely possible to do full bi-directional communication over http, but it would need changes to servant. And, there's no way for the client to tell if the server successfully locked the content, since the server will keep processing the client stream no matter what.: So, added a new api endpoint, keeplocked. lockcontent will lock the key for 10 minutes with retention lock, and then a call to keeplocked will keep it locked for as long as needed. This does mean that there will need to be a Map of locks by key, and I will probably want to add some kind of lock identifier that lockcontent returns.
This commit is contained in:
parent
522700d1c4
commit
82d66ede5e
4 changed files with 228 additions and 84 deletions
|
@ -14,10 +14,13 @@ module Command.P2PHttp where
|
|||
import Command
|
||||
import P2P.Http
|
||||
|
||||
import qualified Network.Wai.Handler.Warp as Warp
|
||||
|
||||
cmd :: Command
|
||||
cmd = command "p2phttp" SectionPlumbing
|
||||
"communicate in P2P protocol over http"
|
||||
paramNothing (withParams seek)
|
||||
|
||||
seek :: CmdParams -> CommandSeek
|
||||
seek _ = liftIO $ P2P.Http.run
|
||||
seek ["server"] = liftIO $ Warp.run 8080 p2pHttpApp
|
||||
seek ["client"] = liftIO testClientLock
|
||||
|
|
240
P2P/Http.hs
240
P2P/Http.hs
|
@ -25,24 +25,16 @@ import Utility.MonotonicClock
|
|||
|
||||
import Servant
|
||||
import Servant.Client.Streaming
|
||||
import Servant.Client.Core.RunClient
|
||||
import qualified Servant.Client.Core.Request as R
|
||||
import qualified Servant.Types.SourceT as S
|
||||
import Servant.API.WebSocket
|
||||
import qualified Network.WebSockets as Websocket
|
||||
import qualified Network.WebSockets.Client as Websocket
|
||||
import Network.Wai
|
||||
import Network.Wai.Handler.Warp
|
||||
import Network.HTTP.Client (newManager, defaultManagerSettings, path)
|
||||
import Network.HTTP.Client (defaultManagerSettings, newManager)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.Encoding as TE
|
||||
import qualified Data.ByteString as B
|
||||
import Text.Read (readMaybe)
|
||||
import Data.Aeson hiding (Key)
|
||||
import Data.Maybe
|
||||
import Data.Foldable
|
||||
import Control.Monad.Reader
|
||||
import Control.DeepSeq
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.STM
|
||||
import GHC.Generics
|
||||
|
||||
type P2PHttpAPI
|
||||
|
@ -82,13 +74,17 @@ type P2PHttpAPI
|
|||
:<|> "git-annex" :> "v2" :> "lockcontent" :> LockContentAPI
|
||||
:<|> "git-annex" :> "v1" :> "lockcontent" :> LockContentAPI
|
||||
:<|> "git-annex" :> "v0" :> "lockcontent" :> LockContentAPI
|
||||
:<|> "git-annex" :> "v3" :> "keeplocked" :> KeepLockedAPI
|
||||
:<|> "git-annex" :> "v2" :> "keeplocked" :> KeepLockedAPI
|
||||
:<|> "git-annex" :> "v1" :> "keeplocked" :> KeepLockedAPI
|
||||
:<|> "git-annex" :> "v0" :> "keeplocked" :> KeepLockedAPI
|
||||
:<|> "git-annex" :> "key" :> CaptureKey :> GetAPI '[]
|
||||
|
||||
p2pHttpAPI :: Proxy P2PHttpAPI
|
||||
p2pHttpAPI = Proxy
|
||||
|
||||
p2pHttp :: Application
|
||||
p2pHttp = serve p2pHttpAPI serveP2pHttp
|
||||
p2pHttpApp :: Application
|
||||
p2pHttpApp = serve p2pHttpAPI serveP2pHttp
|
||||
|
||||
serveP2pHttp :: Server P2PHttpAPI
|
||||
serveP2pHttp
|
||||
|
@ -117,6 +113,10 @@ serveP2pHttp
|
|||
:<|> serveLockContent
|
||||
:<|> serveLockContent
|
||||
:<|> serveLockContent
|
||||
:<|> serveKeepLocked
|
||||
:<|> serveKeepLocked
|
||||
:<|> serveKeepLocked
|
||||
:<|> serveKeepLocked
|
||||
:<|> serveGet0
|
||||
|
||||
type GetAPI headers
|
||||
|
@ -388,33 +388,29 @@ type LockContentAPI
|
|||
:> ClientUUID Required
|
||||
:> ServerUUID Required
|
||||
:> BypassUUIDs
|
||||
:> WebSocket
|
||||
:> Post '[JSON] LockResult
|
||||
|
||||
serveLockContent
|
||||
:: B64Key
|
||||
-> B64UUID ClientSide
|
||||
-> B64UUID ServerSide
|
||||
-> [B64UUID Bypass]
|
||||
-> Websocket.Connection
|
||||
-> Handler ()
|
||||
-> Handler LockResult
|
||||
serveLockContent = undefined
|
||||
|
||||
data WebSocketClient = WebSocketClient R.Request
|
||||
|
||||
-- XXX this is enough to let servant-client work, but it's not yet
|
||||
-- possible to run a WebSocketClient.
|
||||
instance RunClient m => HasClient m WebSocket where
|
||||
type Client m WebSocket = WebSocketClient
|
||||
clientWithRoute _pm Proxy req = WebSocketClient req
|
||||
hoistClientMonad _ _ _ w = w
|
||||
|
||||
clientLockContent
|
||||
:: B64Key
|
||||
:: P2P.ProtocolVersion
|
||||
-> B64Key
|
||||
-> B64UUID ClientSide
|
||||
-> B64UUID ServerSide
|
||||
-> [B64UUID Bypass]
|
||||
-> WebSocketClient
|
||||
clientLockContent = v3
|
||||
-> ClientM LockResult
|
||||
clientLockContent (P2P.ProtocolVersion ver) = case ver of
|
||||
3 -> v3
|
||||
2 -> v2
|
||||
1 -> v1
|
||||
0 -> v0
|
||||
_ -> error "unsupported protocol version"
|
||||
where
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
|
@ -423,49 +419,113 @@ clientLockContent = v3
|
|||
_ :<|>
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
_ :<|> _ :<|> _ :<|>
|
||||
v3 :<|> _ = client p2pHttpAPI
|
||||
-- XXX add other protocol versions
|
||||
v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI
|
||||
|
||||
--XXX test code
|
||||
query :: ClientM PutOffsetResultPlus
|
||||
query = do
|
||||
clientPutOffset (P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey "WORM--foo"))
|
||||
(B64UUID (toUUID ("client" :: String)))
|
||||
(B64UUID (toUUID ("server" :: String)))
|
||||
[]
|
||||
type KeepLockedAPI
|
||||
= KeyParam
|
||||
:> ClientUUID Required
|
||||
:> ServerUUID Required
|
||||
:> BypassUUIDs
|
||||
:> Header "Connection" ConnectionKeepAlive
|
||||
:> Header "Keep-Alive" KeepAlive
|
||||
:> StreamBody NewlineFraming JSON (SourceIO UnlockRequest)
|
||||
:> Post '[JSON] LockResult
|
||||
|
||||
--XXX test code
|
||||
query' :: WebSocketClient
|
||||
query' = do
|
||||
clientLockContent (B64Key (fromJust $ deserializeKey "WORM--foo"))
|
||||
(B64UUID (toUUID ("client" :: String)))
|
||||
(B64UUID (toUUID ("server" :: String)))
|
||||
[]
|
||||
|
||||
runWebSocketClient :: WebSocketClient -> Websocket.ClientApp a -> ClientM a
|
||||
runWebSocketClient (WebSocketClient req) app = do
|
||||
clientenv <- ask
|
||||
let burl = baseUrl clientenv
|
||||
let creq = defaultMakeClientRequest burl req
|
||||
case baseUrlScheme burl of
|
||||
Http -> liftIO $ Websocket.runClient
|
||||
(baseUrlHost burl)
|
||||
(baseUrlPort burl)
|
||||
(decodeBS (path creq))
|
||||
app
|
||||
Https -> error "TODO" -- XXX
|
||||
|
||||
--XXX test code
|
||||
run :: IO ()
|
||||
run = do
|
||||
manager' <- newManager defaultManagerSettings
|
||||
let WebSocketClient wscreq = query'
|
||||
_ <- runClientM (runWebSocketClient query' wsapp)
|
||||
(mkClientEnv manager' (BaseUrl Http "localhost" 8081 ""))
|
||||
return ()
|
||||
serveKeepLocked
|
||||
:: B64Key
|
||||
-> B64UUID ClientSide
|
||||
-> B64UUID ServerSide
|
||||
-> [B64UUID Bypass]
|
||||
-> Maybe ConnectionKeepAlive
|
||||
-> Maybe KeepAlive
|
||||
-> S.SourceT IO UnlockRequest
|
||||
-> Handler LockResult
|
||||
serveKeepLocked k cu su _ _ _ unlockrequeststream = do
|
||||
_ <- liftIO $ S.unSourceT unlockrequeststream go
|
||||
return (LockResult False)
|
||||
where
|
||||
wsapp conn = Websocket.sendTextData conn ("hello, world" :: T.Text)
|
||||
go S.Stop = do
|
||||
print "lost connection to client, drop lock here" -- XXX TODO
|
||||
go (S.Error err) = do
|
||||
print ("Error", err)
|
||||
print "error, drop lock here" -- XXX TODO
|
||||
go (S.Skip s) = go s
|
||||
go (S.Effect ms) = ms >>= go
|
||||
go (S.Yield (UnlockRequest False) s) = go s
|
||||
go (S.Yield (UnlockRequest True) _) = do
|
||||
print ("got unlock request, drop lock here") -- XXX TODO
|
||||
|
||||
clientKeepLocked
|
||||
:: P2P.ProtocolVersion
|
||||
-> B64Key
|
||||
-> B64UUID ClientSide
|
||||
-> B64UUID ServerSide
|
||||
-> [B64UUID Bypass]
|
||||
-> Maybe ConnectionKeepAlive
|
||||
-> Maybe KeepAlive
|
||||
-> S.SourceT IO UnlockRequest
|
||||
-> ClientM LockResult
|
||||
clientKeepLocked (P2P.ProtocolVersion ver) = case ver of
|
||||
3 -> v3
|
||||
2 -> v2
|
||||
1 -> v1
|
||||
0 -> v0
|
||||
_ -> error "unsupported protocol version"
|
||||
where
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
_ :<|>
|
||||
_ :<|>
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
_ :<|> _ :<|> _ :<|>
|
||||
_ :<|> _ :<|> _ :<|> _ :<|>
|
||||
v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI
|
||||
|
||||
clientKeepLocked'
|
||||
:: ClientEnv
|
||||
-> P2P.ProtocolVersion
|
||||
-> B64Key
|
||||
-> B64UUID ClientSide
|
||||
-> B64UUID ServerSide
|
||||
-> [B64UUID Bypass]
|
||||
-> TMVar Bool
|
||||
-> IO ()
|
||||
clientKeepLocked' clientenv protover key cu su bypass keeplocked = do
|
||||
let cli = clientKeepLocked protover key cu su bypass
|
||||
(Just connectionKeepAlive) (Just keepAlive)
|
||||
(S.fromStepT unlocksender)
|
||||
withClientM cli clientenv $ \case
|
||||
Left err -> throwM err
|
||||
Right (LockResult _) ->
|
||||
liftIO $ print "end of lock connection to server"
|
||||
where
|
||||
unlocksender =
|
||||
S.Yield (UnlockRequest False) $ S.Effect $ do
|
||||
liftIO $ print "sent keep locked request"
|
||||
return $ S.Effect $ do
|
||||
stilllock <- liftIO $ atomically $ takeTMVar keeplocked
|
||||
if stilllock
|
||||
then return unlocksender
|
||||
else do
|
||||
liftIO $ print "sending unlock request"
|
||||
return $ S.Yield (UnlockRequest True) S.Stop
|
||||
|
||||
testClientLock = do
|
||||
mgr <- newManager defaultManagerSettings
|
||||
burl <- parseBaseUrl "http://localhost:8080/"
|
||||
keeplocked <- newEmptyTMVarIO
|
||||
_ <- forkIO $ do
|
||||
print "running, press enter to drop lock"
|
||||
_ <- getLine
|
||||
atomically $ writeTMVar keeplocked False
|
||||
clientKeepLocked' (mkClientEnv mgr burl)
|
||||
(P2P.ProtocolVersion 3)
|
||||
(B64Key (fromJust $ deserializeKey "WORM--foo"))
|
||||
(B64UUID (toUUID ("cu" :: String)))
|
||||
(B64UUID (toUUID ("su" :: String)))
|
||||
[]
|
||||
keeplocked
|
||||
|
||||
type ClientUUID req = QueryParam' '[req] "clientuuid" (B64UUID ClientSide)
|
||||
|
||||
|
@ -533,6 +593,34 @@ newtype Offset = Offset P2P.Offset
|
|||
newtype Timestamp = Timestamp MonotonicTimestamp
|
||||
deriving (Show)
|
||||
|
||||
newtype LockResult = LockResult Bool
|
||||
deriving (Show, Generic, NFData)
|
||||
|
||||
newtype UnlockRequest = UnlockRequest Bool
|
||||
deriving (Show, Generic, NFData)
|
||||
|
||||
newtype ConnectionKeepAlive = ConnectionKeepAlive T.Text
|
||||
|
||||
connectionKeepAlive :: ConnectionKeepAlive
|
||||
connectionKeepAlive = ConnectionKeepAlive "Keep-Alive"
|
||||
|
||||
newtype KeepAlive = KeepAlive T.Text
|
||||
|
||||
keepAlive :: KeepAlive
|
||||
keepAlive = KeepAlive "timeout=1200"
|
||||
|
||||
instance ToHttpApiData ConnectionKeepAlive where
|
||||
toUrlPiece (ConnectionKeepAlive t) = t
|
||||
|
||||
instance FromHttpApiData ConnectionKeepAlive where
|
||||
parseUrlPiece = Right . ConnectionKeepAlive
|
||||
|
||||
instance ToHttpApiData KeepAlive where
|
||||
toUrlPiece (KeepAlive t) = t
|
||||
|
||||
instance FromHttpApiData KeepAlive where
|
||||
parseUrlPiece = Right . KeepAlive
|
||||
|
||||
instance ToHttpApiData B64Key where
|
||||
toUrlPiece (B64Key k) = TE.decodeUtf8Lenient $
|
||||
toB64 (serializeKey' k)
|
||||
|
@ -668,6 +756,22 @@ instance FromJSON (B64UUID t) where
|
|||
_ -> mempty
|
||||
parseJSON _ = mempty
|
||||
|
||||
instance ToJSON LockResult where
|
||||
toJSON (LockResult v) = object
|
||||
["locked" .= v]
|
||||
|
||||
instance FromJSON LockResult where
|
||||
parseJSON = withObject "LockResult" $ \v -> LockResult
|
||||
<$> v .: "locked"
|
||||
|
||||
instance ToJSON UnlockRequest where
|
||||
toJSON (UnlockRequest v) = object
|
||||
["unlock" .= v]
|
||||
|
||||
instance FromJSON UnlockRequest where
|
||||
parseJSON = withObject "UnlockRequest" $ \v -> UnlockRequest
|
||||
<$> v .: "unlock"
|
||||
|
||||
plusList :: [B64UUID Plus] -> [String]
|
||||
plusList = map (\(B64UUID u) -> fromUUID u)
|
||||
|
||||
|
|
|
@ -183,22 +183,15 @@ Locks the content of a key on the server, preventing it from being removed.
|
|||
Example:
|
||||
|
||||
> POST /git-annex/v3/lockcontent?key=SHA1--foo&clientuuid=79a5a1f4-07e8-11ef-873d-97f93ca91925&serveruuid=ecf6d4ca-07e8-11ef-8990-9b8c1f696bf6 HTTP/1.1
|
||||
[websocket protocol follows]
|
||||
< SUCCESS
|
||||
> UNLOCKCONTENT
|
||||
< {"locked": true}
|
||||
|
||||
There is one required additional parameter, `key`.
|
||||
|
||||
This request opens a websocket between the client and the server.
|
||||
The server sends "SUCCESS" over the websocket once it has locked
|
||||
the content. Or it sends "FAILURE" if it is unable to lock the content.
|
||||
The server will return `{"locked": true}` if it was able to lock the key,
|
||||
or `{"locked": false}` if it was not.
|
||||
|
||||
Once the server has sent "SUCCESS", the content remains locked
|
||||
until the client sends "UNLOCKCONTENT" over the websocket.
|
||||
|
||||
If the client disconnects without sending "UNLOCKCONTENT", or the web
|
||||
server gets shut down before it can receive that, the content will remain
|
||||
locked for at least 10 minutes from when the server sent "SUCCESS".
|
||||
The key will remain locked for 10 minutes. But, usually `keeplocked`
|
||||
is used to control the lifetime of the lock. (See below.)
|
||||
|
||||
### POST /git-annex/v2/lockcontent
|
||||
|
||||
|
@ -212,6 +205,52 @@ Identical to v3.
|
|||
|
||||
Identical to v3.
|
||||
|
||||
### POST /git-annex/v3/keeplocked
|
||||
|
||||
Controls the lifetime of a lock on a key that was earlier obtained
|
||||
with `lockcontent`.
|
||||
|
||||
Example:
|
||||
|
||||
> POST /git-annex/v3/keeplocked?key=SHA1--foo&clientuuid=79a5a1f4-07e8-11ef-873d-97f93ca91925&serveruuid=ecf6d4ca-07e8-11ef-8990-9b8c1f696bf6 HTTP/1.1
|
||||
> Connection: Keep-Alive
|
||||
> Keep-Alive: timeout=1200
|
||||
[some time later]
|
||||
> {"unlock": true}
|
||||
< {"locked": false}
|
||||
|
||||
There is one required additional parameter, `key`.
|
||||
|
||||
This uses long polling. So it's important to use
|
||||
Connection and Keep-Alive headers.
|
||||
|
||||
This keeps an active lock from expiring until the client sends
|
||||
`{"unlock": true}`, and then it immediately unlocks it.
|
||||
|
||||
The client can send `{"unlock": false}` any number of times first.
|
||||
This has no effect, but may be useful to keep the connection alive.
|
||||
|
||||
This must be called within ten minutes of `lockcontent`, otherwise
|
||||
the lock will have already expired when this runs. Note that this
|
||||
does not indicate if the lock expired, it always returns
|
||||
`{"locked": false}`.
|
||||
|
||||
If the connection is closed before the client sends `{"unlock": true},
|
||||
or even if the web server gets shut down, the content will remain
|
||||
locked for 10 minutes from the time it was first locked.
|
||||
|
||||
### POST /git-annex/v2/keeplocked
|
||||
|
||||
Identical to v3.
|
||||
|
||||
### POST /git-annex/v1/keeplocked
|
||||
|
||||
Identical to v3.
|
||||
|
||||
### POST /git-annex/v0/keeplocked
|
||||
|
||||
Identical to v3.
|
||||
|
||||
### POST /git-annex/v3/remove
|
||||
|
||||
Remove a key's content from the server.
|
||||
|
|
|
@ -320,9 +320,7 @@ Executable git-annex
|
|||
servant,
|
||||
servant-server,
|
||||
servant-client,
|
||||
servant-client-core,
|
||||
servant-websockets,
|
||||
websockets
|
||||
servant-client-core
|
||||
CPP-Options: -DWITH_SERVANT
|
||||
Other-Modules:
|
||||
Command.P2PHttp
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue