implemented serveGet and clientGet

Both are only at bare proof of concept stage. Still need to deal with
signaling validity and invalidity, and checking it.

And there's a bad bug: After -JN*2 requests, another request hangs!

So, I think it's failing to free up the Annex worker and end of request
lifetime.

Perhaps I need to use this:

https://docs.servant.dev/en/stable/cookbook/managed-resource/ManagedResource.html
This commit is contained in:
Joey Hess 2024-07-10 16:06:39 -04:00
parent f9b7ce7224
commit 1e0f92a5a1
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
5 changed files with 177 additions and 31 deletions

View file

@ -73,7 +73,7 @@ seek o = getAnnexWorkerPool $ \workerpool -> do
-- XXX remove this -- XXX remove this
when (isNothing (portOption o)) $ do when (isNothing (portOption o)) $ do
liftIO $ putStrLn "test begins" liftIO $ putStrLn "test begins"
testGetTimestamp testGet
giveup "TEST DONE" giveup "TEST DONE"
withLocalP2PConnections $ \acquireconn -> liftIO $ do withLocalP2PConnections $ \acquireconn -> liftIO $ do
authenv <- getAuthEnv authenv <- getAuthEnv
@ -158,6 +158,20 @@ testCheckPresent = do
Nothing Nothing
liftIO $ print res liftIO $ print res
testGet = do
mgr <- httpManager <$> getUrlOptions
burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
res <- liftIO $ clientGet (mkClientEnv mgr burl)
(P2P.ProtocolVersion 3)
(B64Key (fromJust $ deserializeKey ("WORM-s3218-m1720641607--passwd" :: String)))
(B64UUID (toUUID ("cu" :: String)))
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
[]
Nothing
Nothing
Nothing
liftIO $ print res
testRemove = do testRemove = do
mgr <- httpManager <$> getUrlOptions mgr <- httpManager <$> getUrlOptions
burl <- liftIO $ parseBaseUrl "http://localhost:8080/" burl <- liftIO $ parseBaseUrl "http://localhost:8080/"

View file

@ -101,6 +101,26 @@ runLocal runst runner a = case a of
Left e -> return $ Left $ ProtoFailureException e Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok) Right (Right ok) -> runner (next ok)
SendContentWith consumer getb validitycheck next -> do
v <- tryNonAsync $ do
let fallback = return $ Left $
ProtoFailureMessage "Transfer failed"
let consumer' b ti = do
validator <- consumer b
indicatetransferred ti
return validator
runner getb >>= \case
Left e -> giveup $ describeProtoFailure e
Right b -> checktransfer (\ti -> Right <$> consumer' b ti) fallback >>= \case
Left e -> return (Left e)
Right validator ->
runner validitycheck >>= \case
Right v -> Right <$> validator v
_ -> Right <$> validator Nothing
case v of
Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e
Right (Right ok) -> runner (next ok)
SetPresent k u next -> do SetPresent k u next -> do
v <- tryNonAsync $ logChange k u InfoPresent v <- tryNonAsync $ logChange k u InfoPresent
case v of case v of

View file

@ -10,6 +10,7 @@
{-# LANGUAGE DataKinds #-} {-# LANGUAGE DataKinds #-}
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
module P2P.Http ( module P2P.Http (
@ -23,6 +24,7 @@ import P2P.Http.Types
import P2P.Http.State import P2P.Http.State
import P2P.Protocol hiding (Offset, Bypass, auth) import P2P.Protocol hiding (Offset, Bypass, auth)
import P2P.IO import P2P.IO
import P2P.Annex
import Annex.WorkerPool import Annex.WorkerPool
import Types.WorkerPool import Types.WorkerPool
import Types.Direction import Types.Direction
@ -30,9 +32,15 @@ import Utility.Metered
import Servant import Servant
import Servant.Client.Streaming import Servant.Client.Streaming
import Servant.API
import qualified Servant.Types.SourceT as S import qualified Servant.Types.SourceT as S
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString.Lazy.Internal as LI
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.Async
import Control.Concurrent
import System.IO.Unsafe
type P2PHttpAPI type P2PHttpAPI
= "git-annex" :> PV3 :> "key" :> CaptureKey :> GetAPI = "git-annex" :> PV3 :> "key" :> CaptureKey :> GetAPI
@ -142,23 +150,66 @@ serveGet
-> Maybe Auth -> Maybe Auth
-> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth ReadAction (runst, conn, releaseconn) <-
$ \runst conn -> do getP2PConnection apiver st cu su bypass sec auth ReadAction
liftIO $ inAnnexWorker st $ bsv <- liftIO newEmptyTMVarIO
enteringStage (TransferStage Upload) $ do endv <- liftIO newEmptyTMVarIO
liftIO $ print "IN ANNEX WORKER!" validityv <- liftIO newEmptyTMVarIO
{- aid <- liftIO $ async $ inAnnexWorker st $ do
let storer offset len getdata checkvalidity = do let consumer bs = do
undefined -- FIXME liftIO $ atomically $ putTMVar bsv bs
-- XXX needs to run in annex monad to runFullProto liftIO $ atomically $ takeTMVar endv
liftIO $ runNetProto runst conn $ return $ \v -> do
receiveContent Nothing nullMeterUpdate liftIO $ atomically $
putTMVar validityv v
return True
let storer _offset _len getdata checkvalidity =
sendContentWith consumer getdata checkvalidity
enteringStage (TransferStage Upload) $
runFullProto runst conn $
void $ receiveContent Nothing nullMeterUpdate
sizer storer getreq sizer storer getreq
-} bs <- liftIO $ atomically $ takeTMVar bsv
undefined bv <- liftIO $ newMVar (L.toChunks bs)
undefined -- XXX fixme streaming out let streamer = S.SourceT $ \s -> s =<< return
(stream (releaseconn, bv, endv, validityv, aid))
return $ addHeader 111111 streamer
where where
sizer = Len $ case startat of stream (releaseconn, bv, endv, validityv, aid) =
S.fromActionStep B.null $ do
print "chunk"
modifyMVar bv $ \case
(b:bs) -> return (bs, b)
[] -> do
endbit <- cleanup (releaseconn, endv, validityv, aid)
return ([], endbit)
cleanup (releaseconn, endv, validityv, aid) =
ifM (atomically $ isEmptyTMVar endv)
( pure mempty
, do
atomically $ putTMVar endv ()
validity <- atomically $ takeTMVar validityv
print ("got validity", validity)
wait aid >>= \case
Left ex -> throwM ex
Right (Left err) -> error $
describeProtoFailure err
Right (Right ()) -> return ()
() <- releaseconn
-- When the key's content is invalid,
-- indicate that to the client by padding
-- the response, so it is not the same
-- length indicated by the DataLengthHeader.
return $ case validity of
Nothing -> mempty
Just Valid -> mempty
Just Invalid -> "XXXXXXX"
-- FIXME: need to count bytes and emit
-- something to make it invalid
)
sizer = pure $ Len $ case startat of
Just (Offset o) -> fromIntegral o Just (Offset o) -> fromIntegral o
Nothing -> 0 Nothing -> 0
@ -169,6 +220,43 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
Nothing -> Nothing Nothing -> Nothing
clientGet clientGet
:: ClientEnv
-> ProtocolVersion
-> B64Key
-> B64UUID ClientSide
-> B64UUID ServerSide
-> [B64UUID Bypass]
-> Maybe B64FilePath
-> Maybe Offset
-> Maybe Auth
-> IO ()
clientGet clientenv ver k cu su bypass af o auth =
withClientM (clientGet' ver k cu su bypass af o auth) clientenv $ \case
Left err -> throwM err
Right respheaders -> do
let dl = case lookupResponseHeader @DataLengthHeader' respheaders of
Header h -> h
_ -> error "missing data length header"
liftIO $ print ("datalength", dl :: Integer)
b <- S.unSourceT (getResponse respheaders) gatherbytestring
liftIO $ print "got it all, writing to file 'got'"
L.writeFile "got" b
gatherbytestring :: S.StepT IO B.ByteString -> IO L.ByteString
gatherbytestring x = do
l <- unsafeInterleaveIO $ go x
return l
where
go S.Stop = return LI.Empty
go (S.Error err) = error $ show ("ERROR", err)
go (S.Skip s) = do
go s
go (S.Effect ms) = do
ms >>= go
go (S.Yield v s) = do
LI.Chunk v <$> unsafeInterleaveIO (go s)
clientGet'
:: ProtocolVersion :: ProtocolVersion
-> B64Key -> B64Key
-> B64UUID ClientSide -> B64UUID ClientSide
@ -178,7 +266,7 @@ clientGet
-> Maybe Offset -> Maybe Offset
-> Maybe Auth -> Maybe Auth
-> ClientM (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -> ClientM (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
clientGet (ProtocolVersion ver) = case ver of clientGet' (ProtocolVersion ver) = case ver of
3 -> v3 V3 3 -> v3 V3
2 -> v2 V2 2 -> v2 V2
1 -> v1 V1 1 -> v1 V1
@ -647,7 +735,9 @@ type AssociatedFileParam = QueryParam "associatedfile" B64FilePath
type OffsetParam = QueryParam "offset" Offset type OffsetParam = QueryParam "offset" Offset
type DataLengthHeader = Header "X-git-annex-data-length" Integer type DataLengthHeader = Header DataLengthHeader' Integer
type DataLengthHeader' = "X-git-annex-data-length"
type LockIDParam = QueryParam' '[Required] "lockid" LockID type LockIDParam = QueryParam' '[Required] "lockid" LockID

View file

@ -9,6 +9,7 @@
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE BangPatterns #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
module P2P.Http.State where module P2P.Http.State where
@ -63,7 +64,29 @@ withP2PConnection
-> ActionClass -> ActionClass
-> (RunState -> P2PConnection -> Handler (Either ProtoFailure a)) -> (RunState -> P2PConnection -> Handler (Either ProtoFailure a))
-> Handler a -> Handler a
withP2PConnection apiver st cu su bypass sec auth actionclass connaction = withP2PConnection apiver st cu su bypass sec auth actionclass connaction = do
(runst, conn, releaseconn) <-
getP2PConnection apiver st cu su bypass sec auth actionclass
connaction' runst conn
`finally` liftIO releaseconn
where
connaction' runst conn = connaction runst conn >>= \case
Right r -> return r
Left err -> throwError $
err500 { errBody = encodeBL (describeProtoFailure err) }
getP2PConnection
:: APIVersion v
=> v
-> P2PHttpServerState
-> B64UUID ClientSide
-> B64UUID ServerSide
-> [B64UUID Bypass]
-> IsSecure
-> Maybe Auth
-> ActionClass
-> Handler (RunState, P2PConnection, ReleaseP2PConnection)
getP2PConnection apiver st cu su bypass sec auth actionclass =
case (getServerMode st sec auth, actionclass) of case (getServerMode st sec auth, actionclass) of
(Just P2P.ServeReadWrite, _) -> go P2P.ServeReadWrite (Just P2P.ServeReadWrite, _) -> go P2P.ServeReadWrite
(Just P2P.ServeAppendOnly, RemoveAction) -> throwError err403 (Just P2P.ServeAppendOnly, RemoveAction) -> throwError err403
@ -77,9 +100,7 @@ withP2PConnection apiver st cu su bypass sec auth actionclass connaction =
throwError err502 { errBody = encodeBL err } throwError err502 { errBody = encodeBL err }
Left TooManyConnections -> Left TooManyConnections ->
throwError err503 throwError err503
Right (runst, conn, releaseconn) -> Right v -> return v
connaction' runst conn
`finally` liftIO releaseconn
where where
cp = ConnectionParams cp = ConnectionParams
{ connectionProtocolVersion = protocolVersion apiver { connectionProtocolVersion = protocolVersion apiver
@ -88,11 +109,6 @@ withP2PConnection apiver st cu su bypass sec auth actionclass connaction =
, connectionBypass = map fromB64UUID bypass , connectionBypass = map fromB64UUID bypass
, connectionServerMode = servermode , connectionServerMode = servermode
} }
connaction' runst conn = connaction runst conn >>= \case
Right r -> return r
Left err -> throwError $
err500 { errBody = encodeBL (describeProtoFailure err) }
basicAuthRequired :: ServerError basicAuthRequired :: ServerError
basicAuthRequired = err401 { errHeaders = [(h, v)] } basicAuthRequired = err401 { errHeaders = [(h, v)] }
@ -119,11 +135,13 @@ type AcquireP2PConnection =
( Either ConnectionProblem ( Either ConnectionProblem
( RunState ( RunState
, P2PConnection , P2PConnection
, IO () -- ^ release connection , ReleaseP2PConnection -- ^ release connection
) )
) )
{- Runs P2P actions in the local repository only. -} type ReleaseP2PConnection = IO ()
{- Acquire P2P connections to the local repository. -}
-- TODO need worker pool, this can only service a single request at -- TODO need worker pool, this can only service a single request at
-- a time. -- a time.
-- TODO proxies -- TODO proxies
@ -137,8 +155,8 @@ withLocalP2PConnections a = do
where where
acquireconn reqv connparams = do acquireconn reqv connparams = do
respvar <- newEmptyTMVarIO respvar <- newEmptyTMVarIO
liftIO $ atomically $ putTMVar reqv (connparams, respvar) atomically $ putTMVar reqv (connparams, respvar)
liftIO $ atomically $ takeTMVar respvar atomically $ takeTMVar respvar
servicer reqv relv = do servicer reqv relv = do
reqrel <- liftIO $ reqrel <- liftIO $

View file

@ -310,6 +310,10 @@ data LocalF c
-- content been transferred. -- content been transferred.
| StoreContentTo FilePath (Maybe IncrementalVerifier) Offset Len (Proto L.ByteString) (Proto (Maybe Validity)) ((Bool, Verification) -> c) | StoreContentTo FilePath (Maybe IncrementalVerifier) Offset Len (Proto L.ByteString) (Proto (Maybe Validity)) ((Bool, Verification) -> c)
-- ^ Like StoreContent, but stores the content to a temp file. -- ^ Like StoreContent, but stores the content to a temp file.
| SendContentWith (L.ByteString -> Annex (Maybe Validity -> Annex Bool)) (Proto L.ByteString) (Proto (Maybe Validity)) (Bool -> c)
-- ^ Reads content from the Proto L.ByteString and sends it to the
-- callback. The callback must consume the whole lazy ByteString,
-- before it returns a validity checker.
| SetPresent Key UUID c | SetPresent Key UUID c
| CheckContentPresent Key (Bool -> c) | CheckContentPresent Key (Bool -> c)
-- ^ Checks if the whole content of the key is locally present. -- ^ Checks if the whole content of the key is locally present.