diff --git a/Command/P2PHttp.hs b/Command/P2PHttp.hs index 6b0cb92ac6..5393d65b0d 100644 --- a/Command/P2PHttp.hs +++ b/Command/P2PHttp.hs @@ -13,8 +13,13 @@ module Command.P2PHttp where import Command import P2P.Http +import qualified P2P.Protocol as P2P +import Annex.Url import qualified Network.Wai.Handler.Warp as Warp +import Servant.Client.Streaming +import Control.Concurrent +import Control.Concurrent.STM cmd :: Command cmd = command "p2phttp" SectionPlumbing @@ -22,7 +27,35 @@ cmd = command "p2phttp" SectionPlumbing paramNothing (withParams seek) seek :: CmdParams -> CommandSeek -seek ["server"] = liftIO $ do - st <- mkP2PHttpServerState - Warp.run 8080 (p2pHttpApp st) -seek ["client"] = liftIO testClientLock +seek ["server"] = startConcurrency commandStages $ + withLocalP2PConnections $ \acquireconn -> liftIO $ do + st <- mkP2PHttpServerState acquireconn + Warp.run 8080 (p2pHttpApp st) +seek ["client"] = testCheckPresent + +testKeepLocked = do + mgr <- httpManager <$> getUrlOptions + burl <- liftIO $ parseBaseUrl "http://localhost:8080/" + keeplocked <- liftIO newEmptyTMVarIO + _ <- liftIO $ forkIO $ do + print "running, press enter to drop lock" + _ <- getLine + atomically $ writeTMVar keeplocked False + liftIO $ clientKeepLocked (mkClientEnv mgr burl) + (P2P.ProtocolVersion 3) + (B64UUID (toUUID ("lck" :: String))) + (B64UUID (toUUID ("cu" :: String))) + (B64UUID (toUUID ("su" :: String))) + [] + keeplocked + +testCheckPresent = do + mgr <- httpManager <$> getUrlOptions + burl <- liftIO $ parseBaseUrl "http://localhost:8080/" + res <- liftIO $ clientCheckPresent (mkClientEnv mgr burl) + (P2P.ProtocolVersion 3) + (B64Key (fromJust $ deserializeKey ("WORM--foo" :: String))) + (B64UUID (toUUID ("cu" :: String))) + (B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String))) + [] + liftIO $ print res diff --git a/P2P/Http.hs b/P2P/Http.hs index cf597b4d0f..f015f372e8 100644 --- a/P2P/Http.hs +++ b/P2P/Http.hs @@ -21,14 +21,13 @@ module P2P.Http ( import Annex.Common import P2P.Http.Types import P2P.Http.State -import qualified P2P.Protocol as P2P +import P2P.Protocol hiding (Offset, Bypass) +import P2P.IO import Servant import Servant.Client.Streaming import qualified Servant.Types.SourceT as S -import Network.HTTP.Client (defaultManagerSettings, newManager) import qualified Data.ByteString as B -import Control.Concurrent import Control.Concurrent.STM type P2PHttpAPI @@ -137,7 +136,7 @@ serveGet serveGet = undefined clientGet - :: P2P.ProtocolVersion + :: ProtocolVersion -> B64Key -> Maybe (B64UUID ClientSide) -> Maybe (B64UUID ServerSide) @@ -145,7 +144,7 @@ clientGet -> Maybe B64FilePath -> Maybe Offset -> ClientM (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -clientGet (P2P.ProtocolVersion ver) = case ver of +clientGet (ProtocolVersion ver) = case ver of 3 -> v3 V3 2 -> v2 V2 1 -> v1 V1 @@ -171,22 +170,21 @@ serveCheckPresent -> [B64UUID Bypass] -> Handler CheckPresentResult serveCheckPresent st apiver (B64Key k) cu su bypass = do - res <- liftIO $ inP2PConnection st cu su bypass $ P2P.checkPresent k + res <- withP2PConnection apiver st cu su bypass $ \runst conn -> + liftIO $ runNetProto runst conn $ checkPresent k case res of Right (Right b) -> return (CheckPresentResult b) - Right (Left err) -> - throwError $ err500 { errBody = encodeBL err } - Left err -> - throwError $ err500 { errBody = encodeBL err } + Right (Left err) -> throwError $ err500 { errBody = encodeBL err } + Left err -> throwError $ err500 { errBody = encodeBL (describeProtoFailure err) } -clientCheckPresent - :: P2P.ProtocolVersion +clientCheckPresent' + :: ProtocolVersion -> B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] -> ClientM CheckPresentResult -clientCheckPresent (P2P.ProtocolVersion ver) = case ver of +clientCheckPresent' (ProtocolVersion ver) = case ver of 3 -> v3 V3 2 -> v2 V2 1 -> v1 V1 @@ -196,6 +194,20 @@ clientCheckPresent (P2P.ProtocolVersion ver) = case ver of _ :<|> _ :<|> _ :<|> _ :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI +clientCheckPresent + :: ClientEnv + -> ProtocolVersion + -> B64Key + -> B64UUID ClientSide + -> B64UUID ServerSide + -> [B64UUID Bypass] + -> IO Bool +clientCheckPresent clientenv protover key cu su bypass = do + let cli = clientCheckPresent' protover key cu su bypass + withClientM cli clientenv $ \case + Left err -> throwM err + Right (CheckPresentResult res) -> return res + type RemoveAPI result = KeyParam :> ClientUUID Required @@ -216,13 +228,13 @@ serveRemove serveRemove = undefined clientRemove - :: P2P.ProtocolVersion + :: ProtocolVersion -> B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] -> ClientM RemoveResultPlus -clientRemove (P2P.ProtocolVersion ver) k cu su bypass = case ver of +clientRemove (ProtocolVersion ver) k cu su bypass = case ver of 3 -> v3 V3 k cu su bypass 2 -> v2 V2 k cu su bypass 1 -> plus <$> v1 V1 k cu su bypass @@ -254,14 +266,14 @@ serveRemoveBefore serveRemoveBefore = undefined clientRemoveBefore - :: P2P.ProtocolVersion + :: ProtocolVersion -> B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] -> Timestamp -> ClientM RemoveResult -clientRemoveBefore (P2P.ProtocolVersion ver) = case ver of +clientRemoveBefore (ProtocolVersion ver) = case ver of 3 -> v3 V3 _ -> error "unsupported protocol version" where @@ -288,12 +300,12 @@ serveGetTimestamp serveGetTimestamp = undefined clientGetTimestamp - :: P2P.ProtocolVersion + :: ProtocolVersion -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] -> ClientM GetTimestampResult -clientGetTimestamp (P2P.ProtocolVersion ver) = case ver of +clientGetTimestamp (ProtocolVersion ver) = case ver of 3 -> v3 V3 _ -> error "unsupported protocol version" where @@ -332,7 +344,7 @@ servePut servePut = undefined clientPut - :: P2P.ProtocolVersion + :: ProtocolVersion -> Maybe Integer -> B64Key -> B64UUID ClientSide @@ -343,7 +355,7 @@ clientPut -> DataLength -> S.SourceT IO B.ByteString -> ClientM PutResultPlus -clientPut (P2P.ProtocolVersion ver) sz k cu su bypass af o l src = case ver of +clientPut (ProtocolVersion ver) sz k cu su bypass af o l src = case ver of 3 -> v3 V3 sz k cu su bypass af o l src 2 -> v2 V2 sz k cu su bypass af o l src 1 -> plus <$> v1 V1 sz k cu su bypass af o l src @@ -377,13 +389,13 @@ servePutOffset servePutOffset = undefined clientPutOffset - :: P2P.ProtocolVersion + :: ProtocolVersion -> B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] -> ClientM PutOffsetResultPlus -clientPutOffset (P2P.ProtocolVersion ver) = case ver of +clientPutOffset (ProtocolVersion ver) = case ver of 3 -> v3 V3 2 -> v2 V2 _ -> error "unsupported protocol version" @@ -415,13 +427,13 @@ serveLockContent serveLockContent = undefined clientLockContent - :: P2P.ProtocolVersion + :: ProtocolVersion -> B64Key -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] -> ClientM LockResult -clientLockContent (P2P.ProtocolVersion ver) = case ver of +clientLockContent (ProtocolVersion ver) = case ver of 3 -> v3 V3 2 -> v2 V2 1 -> v1 V1 @@ -470,8 +482,8 @@ serveKeepLocked st apiver lckid cu su _ _ _ unlockrequeststream = do go (S.Yield (UnlockRequest False) s) = go s go (S.Yield (UnlockRequest True) _) = dropLock lckid st -clientKeepLocked - :: P2P.ProtocolVersion +clientKeepLocked' + :: ProtocolVersion -> LockID -> B64UUID ClientSide -> B64UUID ServerSide @@ -480,7 +492,7 @@ clientKeepLocked -> Maybe KeepAlive -> S.SourceT IO UnlockRequest -> ClientM LockResult -clientKeepLocked (P2P.ProtocolVersion ver) = case ver of +clientKeepLocked' (ProtocolVersion ver) = case ver of 3 -> v3 V3 2 -> v2 V2 1 -> v1 V1 @@ -497,17 +509,17 @@ clientKeepLocked (P2P.ProtocolVersion ver) = case ver of _ :<|> _ :<|> _ :<|> _ :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI -clientKeepLocked' +clientKeepLocked :: ClientEnv - -> P2P.ProtocolVersion + -> ProtocolVersion -> LockID -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] -> TMVar Bool -> IO () -clientKeepLocked' clientenv protover lckid cu su bypass keeplocked = do - let cli = clientKeepLocked protover lckid cu su bypass +clientKeepLocked clientenv protover lckid cu su bypass keeplocked = do + let cli = clientKeepLocked' protover lckid cu su bypass (Just connectionKeepAlive) (Just keepAlive) (S.fromStepT unlocksender) withClientM cli clientenv $ \case @@ -526,22 +538,6 @@ clientKeepLocked' clientenv protover lckid cu su bypass keeplocked = do liftIO $ print "sending unlock request" return $ S.Yield (UnlockRequest True) S.Stop -testClientLock = do - mgr <- newManager defaultManagerSettings - burl <- parseBaseUrl "http://localhost:8080/" - keeplocked <- newEmptyTMVarIO - _ <- forkIO $ do - print "running, press enter to drop lock" - _ <- getLine - atomically $ writeTMVar keeplocked False - clientKeepLocked' (mkClientEnv mgr burl) - (P2P.ProtocolVersion 3) - (B64UUID (toUUID ("lck" :: String))) - (B64UUID (toUUID ("cu" :: String))) - (B64UUID (toUUID ("su" :: String))) - [] - keeplocked - type PV3 = Capture "v3" V3 type PV2 = Capture "v2" V2 diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index 36ef235fc8..a6ab76f1fb 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -13,29 +13,134 @@ module P2P.Http.State where import Annex.Common import P2P.Http.Types -import Annex.UUID (genUUID) import qualified P2P.Protocol as P2P +import P2P.IO +import P2P.Annex +import Annex.UUID +import Annex.Concurrent +import Servant import qualified Data.Map as M import Control.Concurrent.Async import Control.Concurrent.STM data P2PHttpServerState = P2PHttpServerState - { openLocks :: TMVar (M.Map LockID Locker) + { acquireP2PConnection :: AcquireP2PConnection + , openLocks :: TMVar (M.Map LockID Locker) } -mkP2PHttpServerState :: IO P2PHttpServerState -mkP2PHttpServerState = P2PHttpServerState - <$> newTMVarIO mempty +mkP2PHttpServerState :: AcquireP2PConnection -> IO P2PHttpServerState +mkP2PHttpServerState acquireconn = P2PHttpServerState + <$> pure acquireconn + <*> newTMVarIO mempty -inP2PConnection - :: P2PHttpServerState +withP2PConnection + :: APIVersion v + => v + -> P2PHttpServerState -> B64UUID ClientSide -> B64UUID ServerSide -> [B64UUID Bypass] - -> P2P.Proto a - -> IO (Either String a) -inP2PConnection st cu su bypass a = undefined + -> (RunState -> P2PConnection -> Handler a) + -> Handler a +withP2PConnection apiver st cu su bypass connaction = do + liftIO (acquireP2PConnection st cp) >>= \case + Left (ConnectionFailed err) -> + throwError err502 { errBody = encodeBL err } + Left TooManyConnections -> + throwError err503 + Right (runst, conn, releaseconn) -> + connaction runst conn + `finally` liftIO releaseconn + where + cp = ConnectionParams + { connectionProtocolVersion = protocolVersion apiver + , connectionServerUUID = fromB64UUID su + , connectionClientUUID = fromB64UUID cu + , connectionBypass = map fromB64UUID bypass + , connectionServerMode = P2P.ServeReadWrite -- XXX auth + } + +data ConnectionParams = ConnectionParams + { connectionProtocolVersion :: P2P.ProtocolVersion + , connectionServerUUID :: UUID + , connectionClientUUID :: UUID + , connectionBypass :: [UUID] + , connectionServerMode :: P2P.ServerMode + } + deriving (Show, Eq, Ord) + +data ConnectionProblem + = ConnectionFailed String + | TooManyConnections + deriving (Show, Eq) + +type AcquireP2PConnection = + ConnectionParams -> IO + ( Either ConnectionProblem + ( RunState + , P2PConnection + , IO () -- ^ release connection + ) + ) + +{- Runs P2P actions in the local repository only. -} +-- TODO need worker pool, this can only service a single request at +-- a time. +-- TODO proxies +-- TODO clusters +withLocalP2PConnections :: (AcquireP2PConnection -> Annex a) -> Annex a +withLocalP2PConnections a = do + reqv <- liftIO newEmptyTMVarIO + relv <- liftIO newEmptyTMVarIO + asyncservicer <- liftIO . async =<< forkState (servicer reqv relv) + a (acquireconn reqv) `finally` join (liftIO (wait asyncservicer)) + where + acquireconn reqv connparams = do + respvar <- newEmptyTMVarIO + liftIO $ atomically $ putTMVar reqv (connparams, respvar) + liftIO $ atomically $ takeTMVar respvar + + servicer reqv relv = do + reqrel <- liftIO $ + atomically $ + (Right <$> takeTMVar reqv) + `orElse` + (Left <$> takeTMVar relv) + case reqrel of + Right (connparams, respvar) -> + servicereq relv connparams respvar + Left releaseconn -> releaseconn + servicer reqv relv + + servicereq relv connparams respvar = do + myuuid <- getUUID + resp <- if connectionServerUUID connparams /= myuuid + then return $ Left $ ConnectionFailed "unknown uuid" + else do + runst <- liftIO $ mkrunst connparams + -- TODO not this, need one with MVars. + let conn = stdioP2PConnection Nothing + -- TODO is this right? It needs to exit + -- when the client stops sending messages. + let server = P2P.serveAuthed + (connectionServerMode connparams) + (connectionServerUUID connparams) + let protorunner = void $ + runFullProto runst conn server + asyncworker <- liftIO . async + =<< forkState protorunner + let releaseconn = atomically $ putTMVar relv $ + join (liftIO (wait asyncworker)) + return $ Right (runst, conn, releaseconn) + liftIO $ atomically $ putTMVar respvar resp + + mkrunst connparams = do + prototvar <- newTVarIO $ connectionProtocolVersion connparams + mkRunState $ const $ Serving + (connectionClientUUID connparams) + Nothing + prototvar data Locker = Locker { lockerThread :: Async () diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index 9ec1451b1d..1225801a01 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -526,7 +526,7 @@ data ServerMode -- ^ Allow reading, and storing new objects, but not deleting objects. | ServeReadWrite -- ^ Full read and write access. - deriving (Eq, Ord) + deriving (Show, Eq, Ord) -- | Serve the protocol, with a peer that has authenticated. serveAuthed :: ServerMode -> UUID -> Proto ()