diff --git a/Annex/Proxy.hs b/Annex/Proxy.hs index 2bab026bac..ba46087451 100644 --- a/Annex/Proxy.hs +++ b/Annex/Proxy.hs @@ -60,14 +60,16 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do ohdl <- liftIO newEmptyTMVarIO iwaitv <- liftIO newEmptyTMVarIO owaitv <- liftIO newEmptyTMVarIO + iclosedv <- liftIO newEmptyTMVarIO + oclosedv <- liftIO newEmptyTMVarIO endv <- liftIO newEmptyTMVarIO worker <- liftIO . async =<< forkState (proxySpecialRemote protoversion r ihdl ohdl owaitv endv) let remoteconn = P2PConnection { connRepo = Nothing , connCheckAuth = const False - , connIhdl = P2PHandleTMVar ihdl (Just iwaitv) - , connOhdl = P2PHandleTMVar ohdl (Just owaitv) + , connIhdl = P2PHandleTMVar ihdl (Just iwaitv) iclosedv + , connOhdl = P2PHandleTMVar ohdl (Just owaitv) oclosedv , connIdent = ConnIdent (Just (Remote.name r)) } let closeremoteconn = do diff --git a/P2P/Http/Server.hs b/P2P/Http/Server.hs index 42e06d3ee7..26a7080d9b 100644 --- a/P2P/Http/Server.hs +++ b/P2P/Http/Server.hs @@ -194,8 +194,8 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do -- Make sure the annexworker is not left blocked on endv -- if the client disconnected early. void $ liftIO $ atomically $ tryPutTMVar endv () - void $ tryNonAsync $ wait annexworker void $ tryNonAsync $ releaseP2PConnection conn + void $ tryNonAsync $ wait annexworker sizer = pure $ Len $ case startat of Just (Offset o) -> fromIntegral o diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index 19ae0c29b2..6db1d2f93d 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -302,14 +302,14 @@ mkP2PConnectionPair connparams (n1, n2) = do hdl2 <- newEmptyTMVarIO wait1 <- newEmptyTMVarIO wait2 <- newEmptyTMVarIO - let h1 = P2PHandleTMVar hdl1 $ - if connectionWaitVar connparams - then Just wait1 - else Nothing - let h2 = P2PHandleTMVar hdl2 $ - if connectionWaitVar connparams - then Just wait2 - else Nothing + closed1 <- newEmptyTMVarIO + closed2 <- newEmptyTMVarIO + let h1 = P2PHandleTMVar hdl1 + (if connectionWaitVar connparams then Just wait1 else Nothing) + closed1 + let h2 = P2PHandleTMVar hdl2 + (if connectionWaitVar connparams then Just wait2 else Nothing) + closed2 let clientconn = P2PConnection Nothing (const True) h2 h1 (ConnIdent (Just n1)) @@ -366,10 +366,15 @@ proxyConnection relv connparams workerpool proxyconn = do liftIO $ runNetProto proxyfromclientrunst proxyfromclientconn $ P2P.net P2P.receiveMessage - let releaseconn returntopool = - atomically $ void $ tryPutTMVar relv $ - liftIO $ wait asyncworker - >>= either throwM return + let releaseconn returntopool = + atomically $ void $ tryPutTMVar relv $ do + r <- liftIO $ wait asyncworker + liftIO $ closeConnection proxyfromclientconn + liftIO $ closeConnection clientconn + inAnnexWorker' workerpool $ + Proxy.closeRemoteSide $ + proxyConnectionRemoteSide proxyconn + either throwM return r return $ Right $ P2PConnectionPair { clientRunState = clientrunst @@ -380,16 +385,10 @@ proxyConnection relv connparams workerpool proxyconn = do } where protoerrhandler cont a = a >>= \case - -- TODO protocol error, or client hung up, release the p2p - -- connection - Left err -> do - liftIO $ hPutStrLn stderr ("protoerrhandler: " ++ show err) - Proxy.closeRemoteSide $ proxyConnectionRemoteSide proxyconn - return () - Right v -> do - liftIO $ print "protoerrhandler returned" - Proxy.closeRemoteSide $ proxyConnectionRemoteSide proxyconn - cont v + Left err -> + Proxy.closeRemoteSide $ + proxyConnectionRemoteSide proxyconn + Right v -> cont v proxydone = return () requestcomplete () = return () diff --git a/P2P/IO.hs b/P2P/IO.hs index 75a323a0d9..9158c7b6d1 100644 --- a/P2P/IO.hs +++ b/P2P/IO.hs @@ -81,13 +81,17 @@ mkRunState mk = do data P2PHandle = P2PHandle Handle - | P2PHandleTMVar (TMVar (Either L.ByteString Message)) (Maybe (TMVar ())) + | P2PHandleTMVar + (TMVar (Either L.ByteString Message)) + (Maybe (TMVar ())) + (TMVar ()) signalFullyConsumedByteString :: P2PHandle -> IO () signalFullyConsumedByteString (P2PHandle _) = return () -signalFullyConsumedByteString (P2PHandleTMVar _ Nothing) = return () -signalFullyConsumedByteString (P2PHandleTMVar _ (Just waitv)) = +signalFullyConsumedByteString (P2PHandleTMVar _ Nothing _) = return () +signalFullyConsumedByteString (P2PHandleTMVar _ (Just waitv) closedv) = atomically $ putTMVar waitv () + `orElse` readTMVar closedv data P2PConnection = P2PConnection { connRepo :: Maybe Repo @@ -99,6 +103,7 @@ data P2PConnection = P2PConnection -- Identifier for a connection, only used for debugging. newtype ConnIdent = ConnIdent (Maybe String) + deriving (Show) data ClosableConnection conn = OpenConnection conn @@ -146,7 +151,8 @@ closeConnection conn = do closehandle (connOhdl conn) where closehandle (P2PHandle h) = hClose h - closehandle (P2PHandleTMVar _ _) = return () + closehandle (P2PHandleTMVar _ _ closedv) = + atomically $ void $ tryPutTMVar closedv () -- Serves the protocol on a unix socket. -- @@ -209,8 +215,9 @@ runNet runst conn runner f = case f of P2PHandle h -> tryNonAsync $ do hPutStrLn h $ unwords (formatMessage m) hFlush h - P2PHandleTMVar mv _ -> tryNonAsync $ do + P2PHandleTMVar mv _ closedv -> tryNonAsync $ atomically $ putTMVar mv (Right m) + `orElse` readTMVar closedv case v of Left e -> return $ Left $ ProtoFailureException e Right () -> runner next @@ -229,10 +236,13 @@ runNet runst conn runner f = case f of Right (Just l) -> case parseMessage l of Just m -> gotmessage m Nothing -> runner (next Nothing) - P2PHandleTMVar mv _ -> - liftIO (atomically (takeTMVar mv)) >>= \case - Right m -> gotmessage m - Left _b -> protoerr + P2PHandleTMVar mv _ closedv -> do + let recv = (Just <$> takeTMVar mv) + `orElse` (readTMVar closedv >> return Nothing) + liftIO (atomically recv) >>= \case + Just (Right m) -> gotmessage m + Just (Left _b) -> protoerr + Nothing -> runner (next Nothing) SendBytes len b p next -> case connOhdl conn of P2PHandle h -> do @@ -245,11 +255,16 @@ runNet runst conn runner f = case f of Right False -> return $ Left $ ProtoFailureMessage "short data write" Left e -> return $ Left $ ProtoFailureException e - P2PHandleTMVar mv waitv -> do + P2PHandleTMVar mv waitv closedv -> do liftIO $ atomically $ putTMVar mv (Left b) + `orElse` readTMVar closedv -- Wait for the whole bytestring to -- be processed. - liftIO $ maybe noop (atomically . takeTMVar) waitv + case waitv of + Nothing -> noop + Just v -> liftIO $ atomically $ + takeTMVar v + `orElse` readTMVar closedv runner next ReceiveBytes len p next -> case connIhdl conn of @@ -259,11 +274,15 @@ runNet runst conn runner f = case f of Right b -> runner (next b) Left e -> return $ Left $ ProtoFailureException e - P2PHandleTMVar mv _ -> - liftIO (atomically (takeTMVar mv)) >>= \case - Left b -> runner (next b) - Right _ -> return $ Left $ + P2PHandleTMVar mv _ closedv -> do + let recv = (Just <$> takeTMVar mv) + `orElse` (readTMVar closedv >> return Nothing) + liftIO (atomically recv) >>= \case + Just (Left b) -> runner (next b) + Just (Right _) -> return $ Left $ ProtoFailureMessage "protocol error" + Nothing -> return $ Left $ + ProtoFailureMessage "connection closed" CheckAuthToken _u t next -> do let authed = connCheckAuth conn t runner (next authed)