fix serveGet early handle close

Needed that waitv after all..
This commit is contained in:
Joey Hess 2024-07-11 09:55:17 -04:00
parent 2c13e6c165
commit 74c6175795
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
5 changed files with 59 additions and 46 deletions

View file

@ -59,8 +59,8 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do
let remoteconn = P2PConnection let remoteconn = P2PConnection
{ connRepo = Nothing { connRepo = Nothing
, connCheckAuth = const False , connCheckAuth = const False
, connIhdl = P2PHandleTMVar ihdl (Just iwaitv) , connIhdl = P2PHandleTMVar ihdl iwaitv
, connOhdl = P2PHandleTMVar ohdl (Just owaitv) , connOhdl = P2PHandleTMVar ohdl owaitv
, connIdent = ConnIdent (Just (Remote.name r)) , connIdent = ConnIdent (Just (Remote.name r))
} }
let closeremoteconn = do let closeremoteconn = do

View file

@ -163,7 +163,7 @@ testGet = do
burl <- liftIO $ parseBaseUrl "http://localhost:8080/" burl <- liftIO $ parseBaseUrl "http://localhost:8080/"
res <- liftIO $ clientGet (mkClientEnv mgr burl) res <- liftIO $ clientGet (mkClientEnv mgr burl)
(P2P.ProtocolVersion 3) (P2P.ProtocolVersion 3)
(B64Key (fromJust $ deserializeKey ("WORM-s3218-m1720641607--passwd" :: String))) (B64Key (fromJust $ deserializeKey ("SHA256E-s1048576000--e3b67ce72aa2571c799d6419e3e36828461ac1c78f8ef300c7f9c8ae671c517f" :: String)))
(B64UUID (toUUID ("cu" :: String))) (B64UUID (toUUID ("cu" :: String)))
(B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String))) (B64UUID (toUUID ("f11773f0-11e1-45b2-9805-06db16768efe" :: String)))
[] []

View file

@ -32,7 +32,6 @@ import Utility.Metered
import Servant import Servant
import Servant.Client.Streaming import Servant.Client.Streaming
import Servant.API
import qualified Servant.Types.SourceT as S import qualified Servant.Types.SourceT as S
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
@ -150,8 +149,7 @@ serveGet
-> Maybe Auth -> Maybe Auth
-> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
(runst, conn, releaseconn) <- conn <- getP2PConnection apiver st cu su bypass sec auth ReadAction
getP2PConnection apiver st cu su bypass sec auth ReadAction
bsv <- liftIO newEmptyTMVarIO bsv <- liftIO newEmptyTMVarIO
endv <- liftIO newEmptyTMVarIO endv <- liftIO newEmptyTMVarIO
validityv <- liftIO newEmptyTMVarIO validityv <- liftIO newEmptyTMVarIO
@ -160,15 +158,17 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
let storer _offset len = sendContentWith $ \bs -> do let storer _offset len = sendContentWith $ \bs -> do
liftIO $ atomically $ putTMVar bsv (len, bs) liftIO $ atomically $ putTMVar bsv (len, bs)
liftIO $ atomically $ takeTMVar endv liftIO $ atomically $ takeTMVar endv
liftIO $ signalFullyConsumedByteString $
connOhdl $ serverP2PConnection conn
return $ \v -> do return $ \v -> do
liftIO $ atomically $ putTMVar validityv v liftIO $ atomically $ putTMVar validityv v
return True return True
v <- enteringStage (TransferStage Upload) $ v <- enteringStage (TransferStage Upload) $
runFullProto runst conn $ runFullProto (clientRunState conn) (clientP2PConnection conn) $
void $ receiveContent Nothing nullMeterUpdate void $ receiveContent Nothing nullMeterUpdate
sizer storer getreq sizer storer getreq
return v return v
liftIO $ forkIO $ waitfinal endv finalv releaseconn annexworker void $ liftIO $ forkIO $ waitfinal endv finalv conn annexworker
(Len len, bs) <- liftIO $ atomically $ takeTMVar bsv (Len len, bs) <- liftIO $ atomically $ takeTMVar bsv
bv <- liftIO $ newMVar (L.toChunks bs) bv <- liftIO $ newMVar (L.toChunks bs)
let streamer = S.SourceT $ \s -> s =<< return let streamer = S.SourceT $ \s -> s =<< return
@ -206,7 +206,7 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
, pure mempty , pure mempty
) )
waitfinal endv finalv releaseconn annexworker = do waitfinal endv finalv conn annexworker = do
-- Wait for everything to be transferred before -- Wait for everything to be transferred before
-- stopping the annexworker. The validityv will usually -- stopping the annexworker. The validityv will usually
-- be written to at the end. If the client disconnects -- be written to at the end. If the client disconnects
@ -215,8 +215,8 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
-- Make sure the annexworker is not left blocked on endv -- Make sure the annexworker is not left blocked on endv
-- if the client disconnected early. -- if the client disconnected early.
liftIO $ atomically $ tryPutTMVar endv () liftIO $ atomically $ tryPutTMVar endv ()
void $ tryNonAsync $ wait annexworker void $ void $ tryNonAsync $ wait annexworker
void $ tryNonAsync releaseconn void $ tryNonAsync $ releaseP2PConnection conn
sizer = pure $ Len $ case startat of sizer = pure $ Len $ case startat of
Just (Offset o) -> fromIntegral o Just (Offset o) -> fromIntegral o
@ -301,8 +301,7 @@ serveCheckPresent
-> Handler CheckPresentResult -> Handler CheckPresentResult
serveCheckPresent st apiver (B64Key k) cu su bypass sec auth = do serveCheckPresent st apiver (B64Key k) cu su bypass sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth ReadAction res <- withP2PConnection apiver st cu su bypass sec auth ReadAction
$ \runst conn -> $ \conn -> liftIO $ proxyClientNetProto conn $ checkPresent k
liftIO $ runNetProto runst conn $ checkPresent k
case res of case res of
Right b -> return (CheckPresentResult b) Right b -> return (CheckPresentResult b)
Left err -> throwError $ err500 { errBody = encodeBL err } Left err -> throwError $ err500 { errBody = encodeBL err }
@ -354,8 +353,8 @@ serveRemove
-> Handler t -> Handler t
serveRemove st resultmangle apiver (B64Key k) cu su bypass sec auth = do serveRemove st resultmangle apiver (B64Key k) cu su bypass sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction
$ \runst conn -> $ \conn ->
liftIO $ runNetProto runst conn $ remove Nothing k liftIO $ proxyClientNetProto conn $ remove Nothing k
case res of case res of
(Right b, plusuuids) -> return $ resultmangle $ (Right b, plusuuids) -> return $ resultmangle $
RemoveResultPlus b (map B64UUID (fromMaybe [] plusuuids)) RemoveResultPlus b (map B64UUID (fromMaybe [] plusuuids))
@ -411,8 +410,8 @@ serveRemoveBefore
-> Handler RemoveResultPlus -> Handler RemoveResultPlus
serveRemoveBefore st apiver (B64Key k) cu su bypass (Timestamp ts) sec auth = do serveRemoveBefore st apiver (B64Key k) cu su bypass (Timestamp ts) sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction
$ \runst conn -> $ \conn ->
liftIO $ runNetProto runst conn $ liftIO $ proxyClientNetProto conn $
removeBeforeRemoteEndTime ts k removeBeforeRemoteEndTime ts k
case res of case res of
(Right b, plusuuids) -> return $ (Right b, plusuuids) -> return $
@ -464,8 +463,8 @@ serveGetTimestamp
-> Handler GetTimestampResult -> Handler GetTimestampResult
serveGetTimestamp st apiver cu su bypass sec auth = do serveGetTimestamp st apiver cu su bypass sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth ReadAction res <- withP2PConnection apiver st cu su bypass sec auth ReadAction
$ \runst conn -> $ \conn ->
liftIO $ runNetProto runst conn getTimestamp liftIO $ proxyClientNetProto conn getTimestamp
case res of case res of
Right ts -> return $ GetTimestampResult (Timestamp ts) Right ts -> return $ GetTimestampResult (Timestamp ts)
Left err -> throwError $ Left err -> throwError $

View file

@ -17,6 +17,7 @@ import Annex.Common
import qualified Annex import qualified Annex
import P2P.Http.Types import P2P.Http.Types
import qualified P2P.Protocol as P2P import qualified P2P.Protocol as P2P
import qualified P2P.IO as P2P
import P2P.IO import P2P.IO
import P2P.Annex import P2P.Annex
import Annex.UUID import Annex.UUID
@ -62,15 +63,14 @@ withP2PConnection
-> IsSecure -> IsSecure
-> Maybe Auth -> Maybe Auth
-> ActionClass -> ActionClass
-> (RunState -> P2PConnection -> Handler (Either ProtoFailure a)) -> (P2PConnectionPair -> Handler (Either ProtoFailure a))
-> Handler a -> Handler a
withP2PConnection apiver st cu su bypass sec auth actionclass connaction = do withP2PConnection apiver st cu su bypass sec auth actionclass connaction = do
(runst, conn, releaseconn) <- conn <- getP2PConnection apiver st cu su bypass sec auth actionclass
getP2PConnection apiver st cu su bypass sec auth actionclass connaction' conn
connaction' runst conn `finally` liftIO (releaseP2PConnection conn)
`finally` liftIO releaseconn
where where
connaction' runst conn = connaction runst conn >>= \case connaction' conn = connaction conn >>= \case
Right r -> return r Right r -> return r
Left err -> throwError $ Left err -> throwError $
err500 { errBody = encodeBL (describeProtoFailure err) } err500 { errBody = encodeBL (describeProtoFailure err) }
@ -85,7 +85,7 @@ getP2PConnection
-> IsSecure -> IsSecure
-> Maybe Auth -> Maybe Auth
-> ActionClass -> ActionClass
-> Handler (RunState, P2PConnection, ReleaseP2PConnection) -> Handler P2PConnectionPair
getP2PConnection apiver st cu su bypass sec auth actionclass = getP2PConnection apiver st cu su bypass sec auth actionclass =
case (getServerMode st sec auth, actionclass) of case (getServerMode st sec auth, actionclass) of
(Just P2P.ServeReadWrite, _) -> go P2P.ServeReadWrite (Just P2P.ServeReadWrite, _) -> go P2P.ServeReadWrite
@ -130,16 +130,20 @@ data ConnectionProblem
| TooManyConnections | TooManyConnections
deriving (Show, Eq) deriving (Show, Eq)
type AcquireP2PConnection = data P2PConnectionPair = P2PConnectionPair
ConnectionParams -> IO { clientRunState :: RunState
( Either ConnectionProblem , clientP2PConnection :: P2PConnection
( RunState , serverP2PConnection :: P2PConnection
, P2PConnection , releaseP2PConnection :: IO ()
, ReleaseP2PConnection -- ^ release connection }
)
)
type ReleaseP2PConnection = IO () proxyClientNetProto :: P2PConnectionPair -> P2P.Proto a -> IO (Either P2P.ProtoFailure a)
proxyClientNetProto conn = runNetProto
(clientRunState conn) (clientP2PConnection conn)
type AcquireP2PConnection
= ConnectionParams
-> IO (Either ConnectionProblem P2PConnectionPair)
{- Acquire P2P connections to the local repository. -} {- Acquire P2P connections to the local repository. -}
-- TODO need worker pool, this can only service a single request at -- TODO need worker pool, this can only service a single request at
@ -177,8 +181,10 @@ withLocalP2PConnections a = do
else do else do
hdl1 <- liftIO newEmptyTMVarIO hdl1 <- liftIO newEmptyTMVarIO
hdl2 <- liftIO newEmptyTMVarIO hdl2 <- liftIO newEmptyTMVarIO
let h1 = P2PHandleTMVar hdl1 Nothing wait1 <- liftIO newEmptyTMVarIO
let h2 = P2PHandleTMVar hdl2 Nothing wait2 <- liftIO newEmptyTMVarIO
let h1 = P2PHandleTMVar hdl1 wait1
let h2 = P2PHandleTMVar hdl2 wait2
let serverconn = P2PConnection Nothing let serverconn = P2PConnection Nothing
(const True) h1 h2 (const True) h1 h2
(ConnIdent (Just "http server")) (ConnIdent (Just "http server"))
@ -196,7 +202,12 @@ withLocalP2PConnections a = do
=<< forkState protorunner =<< forkState protorunner
let releaseconn = atomically $ putTMVar relv $ let releaseconn = atomically $ putTMVar relv $
join (liftIO (wait asyncworker)) join (liftIO (wait asyncworker))
return $ Right (clientrunst, clientconn, releaseconn) return $ Right $ P2PConnectionPair
{ clientRunState = clientrunst
, clientP2PConnection = clientconn
, serverP2PConnection = serverconn
, releaseP2PConnection = releaseconn
}
liftIO $ atomically $ putTMVar respvar resp liftIO $ atomically $ putTMVar respvar resp
mkserverrunst connparams = do mkserverrunst connparams = do

View file

@ -25,6 +25,7 @@ module P2P.IO
, describeProtoFailure , describeProtoFailure
, runNetProto , runNetProto
, runNet , runNet
, signalFullyConsumedByteString
) where ) where
import Common import Common
@ -79,7 +80,12 @@ mkRunState mk = do
data P2PHandle data P2PHandle
= P2PHandle Handle = P2PHandle Handle
| P2PHandleTMVar (TMVar (Either L.ByteString Message)) (Maybe (TMVar ())) | P2PHandleTMVar (TMVar (Either L.ByteString Message)) (TMVar ())
signalFullyConsumedByteString :: P2PHandle -> IO ()
signalFullyConsumedByteString (P2PHandle _) = return ()
signalFullyConsumedByteString (P2PHandleTMVar _ waitv) =
atomically $ putTMVar waitv ()
data P2PConnection = P2PConnection data P2PConnection = P2PConnection
{ connRepo :: Maybe Repo { connRepo :: Maybe Repo
@ -246,14 +252,11 @@ runNet runst conn runner f = case f of
Right False -> return $ Left $ Right False -> return $ Left $
ProtoFailureMessage "short data write" ProtoFailureMessage "short data write"
Left e -> return $ Left $ ProtoFailureException e Left e -> return $ Left $ ProtoFailureException e
P2PHandleTMVar mv mwaitv -> do P2PHandleTMVar mv waitv -> do
liftIO $ atomically $ putTMVar mv (Left b) liftIO $ atomically $ putTMVar mv (Left b)
case mwaitv of
-- Wait for the whole bytestring to -- Wait for the whole bytestring to
-- be processed. -- be processed.
Just waitv -> liftIO $ atomically $ liftIO $ atomically $ takeTMVar waitv
takeTMVar waitv
Nothing -> return ()
runner next runner next
ReceiveBytes len p next -> ReceiveBytes len p next ->
case connIhdl conn of case connIhdl conn of