From 267a202e72b9dddf5c3950b97a5824f4e31bcc11 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 26 Jul 2024 15:25:15 -0400 Subject: [PATCH] clean up after http p2p proxy GET is interrupted There was an annex worker thread that did not get stopped. It was stuck in ReceiveMessage from the P2PHandleTMVar. Fixed by making P2PHandleTMVar closeable. In serveGet, releaseP2PConnection has to come first, else the annexworker may not shut down, if it's waiting to read from it. In proxyConnection, call closeRemoteSide in order to wait for the ssh process (for example). --- Annex/Proxy.hs | 6 ++++-- P2P/Http/Server.hs | 2 +- P2P/Http/State.hs | 43 ++++++++++++++++++++-------------------- P2P/IO.hs | 49 ++++++++++++++++++++++++++++++++-------------- 4 files changed, 60 insertions(+), 40 deletions(-) diff --git a/Annex/Proxy.hs b/Annex/Proxy.hs index 2bab026bac..ba46087451 100644 --- a/Annex/Proxy.hs +++ b/Annex/Proxy.hs @@ -60,14 +60,16 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do ohdl <- liftIO newEmptyTMVarIO iwaitv <- liftIO newEmptyTMVarIO owaitv <- liftIO newEmptyTMVarIO + iclosedv <- liftIO newEmptyTMVarIO + oclosedv <- liftIO newEmptyTMVarIO endv <- liftIO newEmptyTMVarIO worker <- liftIO . async =<< forkState (proxySpecialRemote protoversion r ihdl ohdl owaitv endv) let remoteconn = P2PConnection { connRepo = Nothing , connCheckAuth = const False - , connIhdl = P2PHandleTMVar ihdl (Just iwaitv) - , connOhdl = P2PHandleTMVar ohdl (Just owaitv) + , connIhdl = P2PHandleTMVar ihdl (Just iwaitv) iclosedv + , connOhdl = P2PHandleTMVar ohdl (Just owaitv) oclosedv , connIdent = ConnIdent (Just (Remote.name r)) } let closeremoteconn = do diff --git a/P2P/Http/Server.hs b/P2P/Http/Server.hs index 42e06d3ee7..26a7080d9b 100644 --- a/P2P/Http/Server.hs +++ b/P2P/Http/Server.hs @@ -194,8 +194,8 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do -- Make sure the annexworker is not left blocked on endv -- if the client disconnected early. void $ liftIO $ atomically $ tryPutTMVar endv () - void $ tryNonAsync $ wait annexworker void $ tryNonAsync $ releaseP2PConnection conn + void $ tryNonAsync $ wait annexworker sizer = pure $ Len $ case startat of Just (Offset o) -> fromIntegral o diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index 19ae0c29b2..6db1d2f93d 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -302,14 +302,14 @@ mkP2PConnectionPair connparams (n1, n2) = do hdl2 <- newEmptyTMVarIO wait1 <- newEmptyTMVarIO wait2 <- newEmptyTMVarIO - let h1 = P2PHandleTMVar hdl1 $ - if connectionWaitVar connparams - then Just wait1 - else Nothing - let h2 = P2PHandleTMVar hdl2 $ - if connectionWaitVar connparams - then Just wait2 - else Nothing + closed1 <- newEmptyTMVarIO + closed2 <- newEmptyTMVarIO + let h1 = P2PHandleTMVar hdl1 + (if connectionWaitVar connparams then Just wait1 else Nothing) + closed1 + let h2 = P2PHandleTMVar hdl2 + (if connectionWaitVar connparams then Just wait2 else Nothing) + closed2 let clientconn = P2PConnection Nothing (const True) h2 h1 (ConnIdent (Just n1)) @@ -366,10 +366,15 @@ proxyConnection relv connparams workerpool proxyconn = do liftIO $ runNetProto proxyfromclientrunst proxyfromclientconn $ P2P.net P2P.receiveMessage - let releaseconn returntopool = - atomically $ void $ tryPutTMVar relv $ - liftIO $ wait asyncworker - >>= either throwM return + let releaseconn returntopool = + atomically $ void $ tryPutTMVar relv $ do + r <- liftIO $ wait asyncworker + liftIO $ closeConnection proxyfromclientconn + liftIO $ closeConnection clientconn + inAnnexWorker' workerpool $ + Proxy.closeRemoteSide $ + proxyConnectionRemoteSide proxyconn + either throwM return r return $ Right $ P2PConnectionPair { clientRunState = clientrunst @@ -380,16 +385,10 @@ proxyConnection relv connparams workerpool proxyconn = do } where protoerrhandler cont a = a >>= \case - -- TODO protocol error, or client hung up, release the p2p - -- connection - Left err -> do - liftIO $ hPutStrLn stderr ("protoerrhandler: " ++ show err) - Proxy.closeRemoteSide $ proxyConnectionRemoteSide proxyconn - return () - Right v -> do - liftIO $ print "protoerrhandler returned" - Proxy.closeRemoteSide $ proxyConnectionRemoteSide proxyconn - cont v + Left err -> + Proxy.closeRemoteSide $ + proxyConnectionRemoteSide proxyconn + Right v -> cont v proxydone = return () requestcomplete () = return () diff --git a/P2P/IO.hs b/P2P/IO.hs index 75a323a0d9..9158c7b6d1 100644 --- a/P2P/IO.hs +++ b/P2P/IO.hs @@ -81,13 +81,17 @@ mkRunState mk = do data P2PHandle = P2PHandle Handle - | P2PHandleTMVar (TMVar (Either L.ByteString Message)) (Maybe (TMVar ())) + | P2PHandleTMVar + (TMVar (Either L.ByteString Message)) + (Maybe (TMVar ())) + (TMVar ()) signalFullyConsumedByteString :: P2PHandle -> IO () signalFullyConsumedByteString (P2PHandle _) = return () -signalFullyConsumedByteString (P2PHandleTMVar _ Nothing) = return () -signalFullyConsumedByteString (P2PHandleTMVar _ (Just waitv)) = +signalFullyConsumedByteString (P2PHandleTMVar _ Nothing _) = return () +signalFullyConsumedByteString (P2PHandleTMVar _ (Just waitv) closedv) = atomically $ putTMVar waitv () + `orElse` readTMVar closedv data P2PConnection = P2PConnection { connRepo :: Maybe Repo @@ -99,6 +103,7 @@ data P2PConnection = P2PConnection -- Identifier for a connection, only used for debugging. newtype ConnIdent = ConnIdent (Maybe String) + deriving (Show) data ClosableConnection conn = OpenConnection conn @@ -146,7 +151,8 @@ closeConnection conn = do closehandle (connOhdl conn) where closehandle (P2PHandle h) = hClose h - closehandle (P2PHandleTMVar _ _) = return () + closehandle (P2PHandleTMVar _ _ closedv) = + atomically $ void $ tryPutTMVar closedv () -- Serves the protocol on a unix socket. -- @@ -209,8 +215,9 @@ runNet runst conn runner f = case f of P2PHandle h -> tryNonAsync $ do hPutStrLn h $ unwords (formatMessage m) hFlush h - P2PHandleTMVar mv _ -> tryNonAsync $ do + P2PHandleTMVar mv _ closedv -> tryNonAsync $ atomically $ putTMVar mv (Right m) + `orElse` readTMVar closedv case v of Left e -> return $ Left $ ProtoFailureException e Right () -> runner next @@ -229,10 +236,13 @@ runNet runst conn runner f = case f of Right (Just l) -> case parseMessage l of Just m -> gotmessage m Nothing -> runner (next Nothing) - P2PHandleTMVar mv _ -> - liftIO (atomically (takeTMVar mv)) >>= \case - Right m -> gotmessage m - Left _b -> protoerr + P2PHandleTMVar mv _ closedv -> do + let recv = (Just <$> takeTMVar mv) + `orElse` (readTMVar closedv >> return Nothing) + liftIO (atomically recv) >>= \case + Just (Right m) -> gotmessage m + Just (Left _b) -> protoerr + Nothing -> runner (next Nothing) SendBytes len b p next -> case connOhdl conn of P2PHandle h -> do @@ -245,11 +255,16 @@ runNet runst conn runner f = case f of Right False -> return $ Left $ ProtoFailureMessage "short data write" Left e -> return $ Left $ ProtoFailureException e - P2PHandleTMVar mv waitv -> do + P2PHandleTMVar mv waitv closedv -> do liftIO $ atomically $ putTMVar mv (Left b) + `orElse` readTMVar closedv -- Wait for the whole bytestring to -- be processed. - liftIO $ maybe noop (atomically . takeTMVar) waitv + case waitv of + Nothing -> noop + Just v -> liftIO $ atomically $ + takeTMVar v + `orElse` readTMVar closedv runner next ReceiveBytes len p next -> case connIhdl conn of @@ -259,11 +274,15 @@ runNet runst conn runner f = case f of Right b -> runner (next b) Left e -> return $ Left $ ProtoFailureException e - P2PHandleTMVar mv _ -> - liftIO (atomically (takeTMVar mv)) >>= \case - Left b -> runner (next b) - Right _ -> return $ Left $ + P2PHandleTMVar mv _ closedv -> do + let recv = (Just <$> takeTMVar mv) + `orElse` (readTMVar closedv >> return Nothing) + liftIO (atomically recv) >>= \case + Just (Left b) -> runner (next b) + Just (Right _) -> return $ Left $ ProtoFailureMessage "protocol error" + Nothing -> return $ Left $ + ProtoFailureMessage "connection closed" CheckAuthToken _u t next -> do let authed = connCheckAuth conn t runner (next authed)