clean up after http p2p proxy GET is interrupted

There was an annex worker thread that did not get stopped.

It was stuck in ReceiveMessage from the P2PHandleTMVar.

Fixed by making P2PHandleTMVar closeable.

In serveGet, releaseP2PConnection has to come first, else the
annexworker may not shut down, if it's waiting to read from it.

In proxyConnection, call closeRemoteSide in order to wait for the ssh
process (for example).
This commit is contained in:
Joey Hess 2024-07-26 15:25:15 -04:00
parent 5ebbb31b36
commit 267a202e72
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 60 additions and 40 deletions

View file

@ -60,14 +60,16 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do
ohdl <- liftIO newEmptyTMVarIO
iwaitv <- liftIO newEmptyTMVarIO
owaitv <- liftIO newEmptyTMVarIO
iclosedv <- liftIO newEmptyTMVarIO
oclosedv <- liftIO newEmptyTMVarIO
endv <- liftIO newEmptyTMVarIO
worker <- liftIO . async =<< forkState
(proxySpecialRemote protoversion r ihdl ohdl owaitv endv)
let remoteconn = P2PConnection
{ connRepo = Nothing
, connCheckAuth = const False
, connIhdl = P2PHandleTMVar ihdl (Just iwaitv)
, connOhdl = P2PHandleTMVar ohdl (Just owaitv)
, connIhdl = P2PHandleTMVar ihdl (Just iwaitv) iclosedv
, connOhdl = P2PHandleTMVar ohdl (Just owaitv) oclosedv
, connIdent = ConnIdent (Just (Remote.name r))
}
let closeremoteconn = do

View file

@ -194,8 +194,8 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
-- Make sure the annexworker is not left blocked on endv
-- if the client disconnected early.
void $ liftIO $ atomically $ tryPutTMVar endv ()
void $ tryNonAsync $ wait annexworker
void $ tryNonAsync $ releaseP2PConnection conn
void $ tryNonAsync $ wait annexworker
sizer = pure $ Len $ case startat of
Just (Offset o) -> fromIntegral o

View file

@ -302,14 +302,14 @@ mkP2PConnectionPair connparams (n1, n2) = do
hdl2 <- newEmptyTMVarIO
wait1 <- newEmptyTMVarIO
wait2 <- newEmptyTMVarIO
let h1 = P2PHandleTMVar hdl1 $
if connectionWaitVar connparams
then Just wait1
else Nothing
let h2 = P2PHandleTMVar hdl2 $
if connectionWaitVar connparams
then Just wait2
else Nothing
closed1 <- newEmptyTMVarIO
closed2 <- newEmptyTMVarIO
let h1 = P2PHandleTMVar hdl1
(if connectionWaitVar connparams then Just wait1 else Nothing)
closed1
let h2 = P2PHandleTMVar hdl2
(if connectionWaitVar connparams then Just wait2 else Nothing)
closed2
let clientconn = P2PConnection Nothing
(const True) h2 h1
(ConnIdent (Just n1))
@ -366,10 +366,15 @@ proxyConnection relv connparams workerpool proxyconn = do
liftIO $ runNetProto proxyfromclientrunst proxyfromclientconn $
P2P.net P2P.receiveMessage
let releaseconn returntopool =
atomically $ void $ tryPutTMVar relv $
liftIO $ wait asyncworker
>>= either throwM return
let releaseconn returntopool =
atomically $ void $ tryPutTMVar relv $ do
r <- liftIO $ wait asyncworker
liftIO $ closeConnection proxyfromclientconn
liftIO $ closeConnection clientconn
inAnnexWorker' workerpool $
Proxy.closeRemoteSide $
proxyConnectionRemoteSide proxyconn
either throwM return r
return $ Right $ P2PConnectionPair
{ clientRunState = clientrunst
@ -380,16 +385,10 @@ proxyConnection relv connparams workerpool proxyconn = do
}
where
protoerrhandler cont a = a >>= \case
-- TODO protocol error, or client hung up, release the p2p
-- connection
Left err -> do
liftIO $ hPutStrLn stderr ("protoerrhandler: " ++ show err)
Proxy.closeRemoteSide $ proxyConnectionRemoteSide proxyconn
return ()
Right v -> do
liftIO $ print "protoerrhandler returned"
Proxy.closeRemoteSide $ proxyConnectionRemoteSide proxyconn
cont v
Left err ->
Proxy.closeRemoteSide $
proxyConnectionRemoteSide proxyconn
Right v -> cont v
proxydone = return ()
requestcomplete () = return ()

View file

@ -81,13 +81,17 @@ mkRunState mk = do
data P2PHandle
= P2PHandle Handle
| P2PHandleTMVar (TMVar (Either L.ByteString Message)) (Maybe (TMVar ()))
| P2PHandleTMVar
(TMVar (Either L.ByteString Message))
(Maybe (TMVar ()))
(TMVar ())
signalFullyConsumedByteString :: P2PHandle -> IO ()
signalFullyConsumedByteString (P2PHandle _) = return ()
signalFullyConsumedByteString (P2PHandleTMVar _ Nothing) = return ()
signalFullyConsumedByteString (P2PHandleTMVar _ (Just waitv)) =
signalFullyConsumedByteString (P2PHandleTMVar _ Nothing _) = return ()
signalFullyConsumedByteString (P2PHandleTMVar _ (Just waitv) closedv) =
atomically $ putTMVar waitv ()
`orElse` readTMVar closedv
data P2PConnection = P2PConnection
{ connRepo :: Maybe Repo
@ -99,6 +103,7 @@ data P2PConnection = P2PConnection
-- Identifier for a connection, only used for debugging.
newtype ConnIdent = ConnIdent (Maybe String)
deriving (Show)
data ClosableConnection conn
= OpenConnection conn
@ -146,7 +151,8 @@ closeConnection conn = do
closehandle (connOhdl conn)
where
closehandle (P2PHandle h) = hClose h
closehandle (P2PHandleTMVar _ _) = return ()
closehandle (P2PHandleTMVar _ _ closedv) =
atomically $ void $ tryPutTMVar closedv ()
-- Serves the protocol on a unix socket.
--
@ -209,8 +215,9 @@ runNet runst conn runner f = case f of
P2PHandle h -> tryNonAsync $ do
hPutStrLn h $ unwords (formatMessage m)
hFlush h
P2PHandleTMVar mv _ -> tryNonAsync $ do
P2PHandleTMVar mv _ closedv -> tryNonAsync $
atomically $ putTMVar mv (Right m)
`orElse` readTMVar closedv
case v of
Left e -> return $ Left $ ProtoFailureException e
Right () -> runner next
@ -229,10 +236,13 @@ runNet runst conn runner f = case f of
Right (Just l) -> case parseMessage l of
Just m -> gotmessage m
Nothing -> runner (next Nothing)
P2PHandleTMVar mv _ ->
liftIO (atomically (takeTMVar mv)) >>= \case
Right m -> gotmessage m
Left _b -> protoerr
P2PHandleTMVar mv _ closedv -> do
let recv = (Just <$> takeTMVar mv)
`orElse` (readTMVar closedv >> return Nothing)
liftIO (atomically recv) >>= \case
Just (Right m) -> gotmessage m
Just (Left _b) -> protoerr
Nothing -> runner (next Nothing)
SendBytes len b p next ->
case connOhdl conn of
P2PHandle h -> do
@ -245,11 +255,16 @@ runNet runst conn runner f = case f of
Right False -> return $ Left $
ProtoFailureMessage "short data write"
Left e -> return $ Left $ ProtoFailureException e
P2PHandleTMVar mv waitv -> do
P2PHandleTMVar mv waitv closedv -> do
liftIO $ atomically $ putTMVar mv (Left b)
`orElse` readTMVar closedv
-- Wait for the whole bytestring to
-- be processed.
liftIO $ maybe noop (atomically . takeTMVar) waitv
case waitv of
Nothing -> noop
Just v -> liftIO $ atomically $
takeTMVar v
`orElse` readTMVar closedv
runner next
ReceiveBytes len p next ->
case connIhdl conn of
@ -259,11 +274,15 @@ runNet runst conn runner f = case f of
Right b -> runner (next b)
Left e -> return $ Left $
ProtoFailureException e
P2PHandleTMVar mv _ ->
liftIO (atomically (takeTMVar mv)) >>= \case
Left b -> runner (next b)
Right _ -> return $ Left $
P2PHandleTMVar mv _ closedv -> do
let recv = (Just <$> takeTMVar mv)
`orElse` (readTMVar closedv >> return Nothing)
liftIO (atomically recv) >>= \case
Just (Left b) -> runner (next b)
Just (Right _) -> return $ Left $
ProtoFailureMessage "protocol error"
Nothing -> return $ Left $
ProtoFailureMessage "connection closed"
CheckAuthToken _u t next -> do
let authed = connCheckAuth conn t
runner (next authed)