p2phttp is almost working for checkpresent

The server is fully running annex actions, only the P2PConnection is
wrong, currently using stdio.
This commit is contained in:
Joey Hess 2024-07-09 13:37:55 -04:00
parent a3dd8b4bcb
commit edf8a3df2d
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 197 additions and 63 deletions

View file

@ -13,8 +13,13 @@ module Command.P2PHttp where
import Command import Command
import P2P.Http import P2P.Http
import qualified P2P.Protocol as P2P
import Annex.Url
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
import Servant.Client.Streaming
import Control.Concurrent
import Control.Concurrent.STM
cmd :: Command cmd :: Command
cmd = command "p2phttp" SectionPlumbing cmd = command "p2phttp" SectionPlumbing
@ -22,7 +27,35 @@ cmd = command "p2phttp" SectionPlumbing
paramNothing (withParams seek) paramNothing (withParams seek)
seek :: CmdParams -> CommandSeek seek :: CmdParams -> CommandSeek
seek ["server"] = liftIO $ do seek ["server"] = startConcurrency commandStages $
st <- mkP2PHttpServerState withLocalP2PConnections $ \acquireconn -> liftIO $ do
Warp.run 8080 (p2pHttpApp st) st <- mkP2PHttpServerState acquireconn
seek ["client"] = liftIO testClientLock Warp.run 8080 (p2pHttpApp st)
seek ["client"] = testCheckPresent
testKeepLocked = do
mgr <- httpManager <$> getUrlOptions
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
keeplocked <- liftIO newEmptyTMVarIO
_ <- liftIO $ forkIO $ do
print "running, press enter to drop lock"
_ <- getLine
atomically $ writeTMVar keeplocked False
liftIO $ clientKeepLocked (mkClientEnv mgr burl)
(P2P.ProtocolVersion 3)
(B64UUID (toUUID ("lck" :: String)))
(B64UUID (toUUID ("cu" :: String)))
(B64UUID (toUUID ("su" :: String)))
[]
keeplocked
testCheckPresent = do
mgr <- httpManager <$> getUrlOptions
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
res <- liftIO $ clientCheckPresent (mkClientEnv mgr burl)
(P2P.ProtocolVersion 3)
(B64Key (fromJust $ deserializeKey ("WORM--foo" :: String)))
(B64UUID (toUUID ("cu" :: String)))
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
[]
liftIO $ print res

View file

@ -21,14 +21,13 @@ module P2P.Http (
import Annex.Common import Annex.Common
import P2P.Http.Types import P2P.Http.Types
import P2P.Http.State import P2P.Http.State
import qualified P2P.Protocol as P2P import P2P.Protocol hiding (Offset, Bypass)
import P2P.IO
import Servant import Servant
import Servant.Client.Streaming import Servant.Client.Streaming
import qualified Servant.Types.SourceT as S import qualified Servant.Types.SourceT as S
import Network.HTTP.Client (defaultManagerSettings, newManager)
import qualified Data.ByteString as B import qualified Data.ByteString as B
import Control.Concurrent
import Control.Concurrent.STM import Control.Concurrent.STM
type P2PHttpAPI type P2PHttpAPI
@ -137,7 +136,7 @@ serveGet
serveGet = undefined serveGet = undefined
clientGet clientGet
:: P2P.ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> Maybe (B64UUID ClientSide) -> Maybe (B64UUID ClientSide)
-> Maybe (B64UUID ServerSide) -> Maybe (B64UUID ServerSide)
@ -145,7 +144,7 @@ clientGet
-> Maybe B64FilePath -> Maybe B64FilePath
-> Maybe Offset -> Maybe Offset
-> ClientM (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -> ClientM (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
clientGet (P2P.ProtocolVersion ver) = case ver of clientGet (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
2 -> v2 V2 2 -> v2 V2
1 -> v1 V1 1 -> v1 V1
@ -171,22 +170,21 @@ serveCheckPresent
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Handler CheckPresentResult -> Handler CheckPresentResult
serveCheckPresent st apiver (B64Key k) cu su bypass = do serveCheckPresent st apiver (B64Key k) cu su bypass = do
res <- liftIO $ inP2PConnection st cu su bypass $ P2P.checkPresent k res <- withP2PConnection apiver st cu su bypass $ \runst conn ->
liftIO $ runNetProto runst conn $ checkPresent k
case res of case res of
Right (Right b) -> return (CheckPresentResult b) Right (Right b) -> return (CheckPresentResult b)
Right (Left err) -> Right (Left err) -> throwError $ err500 { errBody = encodeBL err }
throwError $ err500 { errBody = encodeBL err } Left err -> throwError $ err500 { errBody = encodeBL (describeProtoFailure err) }
Left err ->
throwError $ err500 { errBody = encodeBL err }
clientCheckPresent clientCheckPresent'
:: P2P.ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> ClientM CheckPresentResult -> ClientM CheckPresentResult
clientCheckPresent (P2P.ProtocolVersion ver) = case ver of clientCheckPresent' (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
2 -> v2 V2 2 -> v2 V2
1 -> v1 V1 1 -> v1 V1
@ -196,6 +194,20 @@ clientCheckPresent (P2P.ProtocolVersion ver) = case ver of
_ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI
clientCheckPresent
:: ClientEnv
-> ProtocolVersion
-> B64Key
-> B64UUID ClientSide
-> B64UUID ServerSide
-> [B64UUID Bypass]
-> IO Bool
clientCheckPresent clientenv protover key cu su bypass = do
let cli = clientCheckPresent' protover key cu su bypass
withClientM cli clientenv $ \case
Left err -> throwM err
Right (CheckPresentResult res) -> return res
type RemoveAPI result type RemoveAPI result
= KeyParam = KeyParam
:> ClientUUID Required :> ClientUUID Required
@ -216,13 +228,13 @@ serveRemove
serveRemove = undefined serveRemove = undefined
clientRemove clientRemove
:: P2P.ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> ClientM RemoveResultPlus -> ClientM RemoveResultPlus
clientRemove (P2P.ProtocolVersion ver) k cu su bypass = case ver of clientRemove (ProtocolVersion ver) k cu su bypass = case ver of
3 -> v3 V3 k cu su bypass 3 -> v3 V3 k cu su bypass
2 -> v2 V2 k cu su bypass 2 -> v2 V2 k cu su bypass
1 -> plus <$> v1 V1 k cu su bypass 1 -> plus <$> v1 V1 k cu su bypass
@ -254,14 +266,14 @@ serveRemoveBefore
serveRemoveBefore = undefined serveRemoveBefore = undefined
clientRemoveBefore clientRemoveBefore
:: P2P.ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> Timestamp -> Timestamp
-> ClientM RemoveResult -> ClientM RemoveResult
clientRemoveBefore (P2P.ProtocolVersion ver) = case ver of clientRemoveBefore (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
_ -> error "unsupported protocol version" _ -> error "unsupported protocol version"
where where
@ -288,12 +300,12 @@ serveGetTimestamp
serveGetTimestamp = undefined serveGetTimestamp = undefined
clientGetTimestamp clientGetTimestamp
:: P2P.ProtocolVersion :: ProtocolVersion
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> ClientM GetTimestampResult -> ClientM GetTimestampResult
clientGetTimestamp (P2P.ProtocolVersion ver) = case ver of clientGetTimestamp (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
_ -> error "unsupported protocol version" _ -> error "unsupported protocol version"
where where
@ -332,7 +344,7 @@ servePut
servePut = undefined servePut = undefined
clientPut clientPut
:: P2P.ProtocolVersion :: ProtocolVersion
-> Maybe Integer -> Maybe Integer
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
@ -343,7 +355,7 @@ clientPut
-> DataLength -> DataLength
-> S.SourceT IO B.ByteString -> S.SourceT IO B.ByteString
-> ClientM PutResultPlus -> ClientM PutResultPlus
clientPut (P2P.ProtocolVersion ver) sz k cu su bypass af o l src = case ver of clientPut (ProtocolVersion ver) sz k cu su bypass af o l src = case ver of
3 -> v3 V3 sz k cu su bypass af o l src 3 -> v3 V3 sz k cu su bypass af o l src
2 -> v2 V2 sz k cu su bypass af o l src 2 -> v2 V2 sz k cu su bypass af o l src
1 -> plus <$> v1 V1 sz k cu su bypass af o l src 1 -> plus <$> v1 V1 sz k cu su bypass af o l src
@ -377,13 +389,13 @@ servePutOffset
servePutOffset = undefined servePutOffset = undefined
clientPutOffset clientPutOffset
:: P2P.ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> ClientM PutOffsetResultPlus -> ClientM PutOffsetResultPlus
clientPutOffset (P2P.ProtocolVersion ver) = case ver of clientPutOffset (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
2 -> v2 V2 2 -> v2 V2
_ -> error "unsupported protocol version" _ -> error "unsupported protocol version"
@ -415,13 +427,13 @@ serveLockContent
serveLockContent = undefined serveLockContent = undefined
clientLockContent clientLockContent
:: P2P.ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> ClientM LockResult -> ClientM LockResult
clientLockContent (P2P.ProtocolVersion ver) = case ver of clientLockContent (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
2 -> v2 V2 2 -> v2 V2
1 -> v1 V1 1 -> v1 V1
@ -470,8 +482,8 @@ serveKeepLocked st apiver lckid cu su _ _ _ unlockrequeststream = do
go (S.Yield (UnlockRequest False) s) = go s go (S.Yield (UnlockRequest False) s) = go s
go (S.Yield (UnlockRequest True) _) = dropLock lckid st go (S.Yield (UnlockRequest True) _) = dropLock lckid st
clientKeepLocked clientKeepLocked'
:: P2P.ProtocolVersion :: ProtocolVersion
-> LockID -> LockID
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
@ -480,7 +492,7 @@ clientKeepLocked
-> Maybe KeepAlive -> Maybe KeepAlive
-> S.SourceT IO UnlockRequest -> S.SourceT IO UnlockRequest
-> ClientM LockResult -> ClientM LockResult
clientKeepLocked (P2P.ProtocolVersion ver) = case ver of clientKeepLocked' (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
2 -> v2 V2 2 -> v2 V2
1 -> v1 V1 1 -> v1 V1
@ -497,17 +509,17 @@ clientKeepLocked (P2P.ProtocolVersion ver) = case ver of
_ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI
clientKeepLocked' clientKeepLocked
:: ClientEnv :: ClientEnv
-> P2P.ProtocolVersion -> ProtocolVersion
-> LockID -> LockID
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> TMVar Bool -> TMVar Bool
-> IO () -> IO ()
clientKeepLocked' clientenv protover lckid cu su bypass keeplocked = do clientKeepLocked clientenv protover lckid cu su bypass keeplocked = do
let cli = clientKeepLocked protover lckid cu su bypass let cli = clientKeepLocked' protover lckid cu su bypass
(Just connectionKeepAlive) (Just keepAlive) (Just connectionKeepAlive) (Just keepAlive)
(S.fromStepT unlocksender) (S.fromStepT unlocksender)
withClientM cli clientenv $ \case withClientM cli clientenv $ \case
@ -526,22 +538,6 @@ clientKeepLocked' clientenv protover lckid cu su bypass keeplocked = do
liftIO $ print "sending unlock request" liftIO $ print "sending unlock request"
return $ S.Yield (UnlockRequest True) S.Stop return $ S.Yield (UnlockRequest True) S.Stop
testClientLock = do
mgr <- newManager defaultManagerSettings
burl <- parseBaseUrl "http://localhost:8080/"
keeplocked <- newEmptyTMVarIO
_ <- forkIO $ do
print "running, press enter to drop lock"
_ <- getLine
atomically $ writeTMVar keeplocked False
clientKeepLocked' (mkClientEnv mgr burl)
(P2P.ProtocolVersion 3)
(B64UUID (toUUID ("lck" :: String)))
(B64UUID (toUUID ("cu" :: String)))
(B64UUID (toUUID ("su" :: String)))
[]
keeplocked
type PV3 = Capture "v3" V3 type PV3 = Capture "v3" V3
type PV2 = Capture "v2" V2 type PV2 = Capture "v2" V2

View file

@ -13,29 +13,134 @@ module P2P.Http.State where
import Annex.Common import Annex.Common
import P2P.Http.Types import P2P.Http.Types
import Annex.UUID (genUUID)
import qualified P2P.Protocol as P2P import qualified P2P.Protocol as P2P
import P2P.IO
import P2P.Annex
import Annex.UUID
import Annex.Concurrent
import Servant
import qualified Data.Map as M import qualified Data.Map as M
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
data P2PHttpServerState = P2PHttpServerState data P2PHttpServerState = P2PHttpServerState
{ openLocks :: TMVar (M.Map LockID Locker) { acquireP2PConnection :: AcquireP2PConnection
, openLocks :: TMVar (M.Map LockID Locker)
} }
mkP2PHttpServerState :: IO P2PHttpServerState mkP2PHttpServerState :: AcquireP2PConnection -> IO P2PHttpServerState
mkP2PHttpServerState = P2PHttpServerState mkP2PHttpServerState acquireconn = P2PHttpServerState
<$> newTMVarIO mempty <$> pure acquireconn
<*> newTMVarIO mempty
inP2PConnection withP2PConnection
:: P2PHttpServerState :: APIVersion v
=> v
-> P2PHttpServerState
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
-> P2P.Proto a -> (RunState -> P2PConnection -> Handler a)
-> IO (Either String a) -> Handler a
inP2PConnection st cu su bypass a = undefined withP2PConnection apiver st cu su bypass connaction = do
liftIO (acquireP2PConnection st cp) >>= \case
Left (ConnectionFailed err) ->
throwError err502 { errBody = encodeBL err }
Left TooManyConnections ->
throwError err503
Right (runst, conn, releaseconn) ->
connaction runst conn
`finally` liftIO releaseconn
where
cp = ConnectionParams
{ connectionProtocolVersion = protocolVersion apiver
, connectionServerUUID = fromB64UUID su
, connectionClientUUID = fromB64UUID cu
, connectionBypass = map fromB64UUID bypass
, connectionServerMode = P2P.ServeReadWrite -- XXX auth
}
data ConnectionParams = ConnectionParams
{ connectionProtocolVersion :: P2P.ProtocolVersion
, connectionServerUUID :: UUID
, connectionClientUUID :: UUID
, connectionBypass :: [UUID]
, connectionServerMode :: P2P.ServerMode
}
deriving (Show, Eq, Ord)
data ConnectionProblem
= ConnectionFailed String
| TooManyConnections
deriving (Show, Eq)
type AcquireP2PConnection =
ConnectionParams -> IO
( Either ConnectionProblem
( RunState
, P2PConnection
, IO () -- ^ release connection
)
)
{- Runs P2P actions in the local repository only. -}
-- TODO need worker pool, this can only service a single request at
-- a time.
-- TODO proxies
-- TODO clusters
withLocalP2PConnections :: (AcquireP2PConnection -> Annex a) -> Annex a
withLocalP2PConnections a = do
reqv <- liftIO newEmptyTMVarIO
relv <- liftIO newEmptyTMVarIO
asyncservicer <- liftIO . async =<< forkState (servicer reqv relv)
a (acquireconn reqv) `finally` join (liftIO (wait asyncservicer))
where
acquireconn reqv connparams = do
respvar <- newEmptyTMVarIO
liftIO $ atomically $ putTMVar reqv (connparams, respvar)
liftIO $ atomically $ takeTMVar respvar
servicer reqv relv = do
reqrel <- liftIO $
atomically $
(Right <$> takeTMVar reqv)
`orElse`
(Left <$> takeTMVar relv)
case reqrel of
Right (connparams, respvar) ->
servicereq relv connparams respvar
Left releaseconn -> releaseconn
servicer reqv relv
servicereq relv connparams respvar = do
myuuid <- getUUID
resp <- if connectionServerUUID connparams /= myuuid
then return $ Left $ ConnectionFailed "unknown uuid"
else do
runst <- liftIO $ mkrunst connparams
-- TODO not this, need one with MVars.
let conn = stdioP2PConnection Nothing
-- TODO is this right? It needs to exit
-- when the client stops sending messages.
let server = P2P.serveAuthed
(connectionServerMode connparams)
(connectionServerUUID connparams)
let protorunner = void $
runFullProto runst conn server
asyncworker <- liftIO . async
=<< forkState protorunner
let releaseconn = atomically $ putTMVar relv $
join (liftIO (wait asyncworker))
return $ Right (runst, conn, releaseconn)
liftIO $ atomically $ putTMVar respvar resp
mkrunst connparams = do
prototvar <- newTVarIO $ connectionProtocolVersion connparams
mkRunState $ const $ Serving
(connectionClientUUID connparams)
Nothing
prototvar
data Locker = Locker data Locker = Locker
{ lockerThread :: Async () { lockerThread :: Async ()

View file

@ -526,7 +526,7 @@ data ServerMode
-- ^ Allow reading, and storing new objects, but not deleting objects. -- ^ Allow reading, and storing new objects, but not deleting objects.
| ServeReadWrite | ServeReadWrite
-- ^ Full read and write access. -- ^ Full read and write access.
deriving (Eq, Ord) deriving (Show, Eq, Ord)
-- | Serve the protocol, with a peer that has authenticated. -- | Serve the protocol, with a peer that has authenticated.
serveAuthed :: ServerMode -> UUID -> Proto () serveAuthed :: ServerMode -> UUID -> Proto ()