From 3b37b9e53fa8bc16e99f12c00c6811a3e1c32ce6 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 11 Jul 2024 07:46:52 -0400 Subject: [PATCH] fix serveGet hang This came down to SendBytes waiting on the waitv. Nothing ever filled it. Only Annex.Proxy needs the waitv, and it handles filling it. So make it optional. --- Annex/Proxy.hs | 4 ++-- P2P/Annex.hs | 13 +++---------- P2P/Http.hs | 23 +++++++---------------- P2P/Http/State.hs | 19 +++++++++++-------- P2P/IO.hs | 17 ++++++++++------- doc/todo/git-annex_proxies.mdwn | 3 +-- 6 files changed, 34 insertions(+), 45 deletions(-) diff --git a/Annex/Proxy.hs b/Annex/Proxy.hs index 4563579ef2..ffa732a08d 100644 --- a/Annex/Proxy.hs +++ b/Annex/Proxy.hs @@ -59,8 +59,8 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do let remoteconn = P2PConnection { connRepo = Nothing , connCheckAuth = const False - , connIhdl = P2PHandleTMVar ihdl iwaitv - , connOhdl = P2PHandleTMVar ohdl owaitv + , connIhdl = P2PHandleTMVar ihdl (Just iwaitv) + , connOhdl = P2PHandleTMVar ohdl (Just owaitv) , connIdent = ConnIdent (Just (Remote.name r)) } let closeremoteconn = do diff --git a/P2P/Annex.hs b/P2P/Annex.hs index d922b28e50..59428db8c9 100644 --- a/P2P/Annex.hs +++ b/P2P/Annex.hs @@ -107,23 +107,16 @@ runLocal runst runner a = case a of ProtoFailureMessage "Transfer failed" let consumer' b ti = do validator <- consumer b - liftIO $ print "got validator" indicatetransferred ti - liftIO $ print "indicatetransferred ti done" return validator runner getb >>= \case Left e -> giveup $ describeProtoFailure e Right b -> checktransfer (\ti -> Right <$> consumer' b ti) fallback >>= \case Left e -> return (Left e) - Right validator -> do - liftIO $ print "running validity check" + Right validator -> runner validitycheck >>= \case - Right v -> do - liftIO $ print ("calling validator 1", v) - Right <$> validator v - _ -> do - liftIO $ print "calling validator nothing" - Right <$> validator Nothing + Right v -> Right <$> validator v + _ -> Right <$> validator Nothing case v of Left e -> return $ Left $ ProtoFailureException e Right (Left e) -> return $ Left e diff --git a/P2P/Http.hs b/P2P/Http.hs index 7e5092fdb4..c0728fde11 100644 --- a/P2P/Http.hs +++ b/P2P/Http.hs @@ -156,31 +156,24 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do endv <- liftIO newEmptyTMVarIO validityv <- liftIO newEmptyTMVarIO aid <- liftIO $ async $ inAnnexWorker st $ do - let consumer bs = do - liftIO $ atomically $ putTMVar bsv bs - liftIO $ print "consumer waiting for endv" + let storer _offset len = sendContentWith $ \bs -> do + liftIO $ atomically $ putTMVar bsv (len, bs) liftIO $ atomically $ takeTMVar endv - liftIO $ print "consumer took endv" return $ \v -> do - liftIO $ print "consumer put validityv" - liftIO $ atomically $ - putTMVar validityv v + liftIO $ atomically $ putTMVar validityv v return True - let storer _offset _len getdata checkvalidity = - sendContentWith consumer getdata checkvalidity enteringStage (TransferStage Upload) $ runFullProto runst conn $ void $ receiveContent Nothing nullMeterUpdate sizer storer getreq - bs <- liftIO $ atomically $ takeTMVar bsv + (Len len, bs) <- liftIO $ atomically $ takeTMVar bsv bv <- liftIO $ newMVar (L.toChunks bs) let streamer = S.SourceT $ \s -> s =<< return (stream (releaseconn, bv, endv, validityv, aid)) - return $ addHeader 111111 streamer + return $ addHeader len streamer where stream (releaseconn, bv, endv, validityv, aid) = - S.fromActionStep B.null $ do - print "chunk" + S.fromActionStep B.null $ modifyMVar bv $ nextchunk $ cleanup (releaseconn, endv, validityv, aid) @@ -194,11 +187,8 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do cleanup (releaseconn, endv, validityv, aid) = ifM (atomically $ isEmptyTMVar endv) ( do - print "at end" atomically $ putTMVar endv () - print "signaled end" validity <- atomically $ takeTMVar validityv - print ("got validity", validity) wait aid >>= \case Left ex -> throwM ex Right (Left err) -> error $ @@ -263,6 +253,7 @@ gatherbytestring x = do go (S.Effect ms) = do ms >>= go go (S.Yield v s) = do + liftIO $ print ("chunk", B.length v) LI.Chunk v <$> unsafeInterleaveIO (go s) clientGet' diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index d3e00dfa21..8e9cae3fa6 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -177,35 +177,38 @@ withLocalP2PConnections a = do else do hdl1 <- liftIO newEmptyTMVarIO hdl2 <- liftIO newEmptyTMVarIO - waitv1 <- liftIO newEmptyTMVarIO - waitv2 <- liftIO newEmptyTMVarIO - let h1 = P2PHandleTMVar hdl1 waitv1 - let h2 = P2PHandleTMVar hdl2 waitv2 + let h1 = P2PHandleTMVar hdl1 Nothing + let h2 = P2PHandleTMVar hdl2 Nothing let serverconn = P2PConnection Nothing (const True) h1 h2 (ConnIdent (Just "http server")) let clientconn = P2PConnection Nothing (const True) h2 h1 (ConnIdent (Just "http client")) - runst <- liftIO $ mkrunst connparams + clientrunst <- liftIO $ mkclientrunst connparams + serverrunst <- liftIO $ mkserverrunst connparams let server = P2P.serveOneCommandAuthed (connectionServerMode connparams) (connectionServerUUID connparams) let protorunner = void $ - runFullProto runst serverconn server + runFullProto serverrunst serverconn server asyncworker <- liftIO . async =<< forkState protorunner let releaseconn = atomically $ putTMVar relv $ join (liftIO (wait asyncworker)) - return $ Right (runst, clientconn, releaseconn) + return $ Right (clientrunst, clientconn, releaseconn) liftIO $ atomically $ putTMVar respvar resp - mkrunst connparams = do + mkserverrunst connparams = do prototvar <- newTVarIO $ connectionProtocolVersion connparams mkRunState $ const $ Serving (connectionClientUUID connparams) Nothing prototvar + + mkclientrunst connparams = do + prototvar <- newTVarIO $ connectionProtocolVersion connparams + mkRunState $ const $ Client prototvar data Locker = Locker { lockerThread :: Async () diff --git a/P2P/IO.hs b/P2P/IO.hs index cd725c6e9e..66a4c08fea 100644 --- a/P2P/IO.hs +++ b/P2P/IO.hs @@ -79,7 +79,7 @@ mkRunState mk = do data P2PHandle = P2PHandle Handle - | P2PHandleTMVar (TMVar (Either L.ByteString Message)) (TMVar ()) + | P2PHandleTMVar (TMVar (Either L.ByteString Message)) (Maybe (TMVar ())) data P2PConnection = P2PConnection { connRepo :: Maybe Repo @@ -217,7 +217,7 @@ runNet runst conn runner f = case f of Right () -> runner next ReceiveMessage next -> let protoerr = return $ Left $ - ProtoFailureMessage "protocol error 1" + ProtoFailureMessage "protocol error" gotmessage m = do liftIO $ debugMessage conn "P2P <" m runner (next (Just m)) @@ -246,11 +246,14 @@ runNet runst conn runner f = case f of Right False -> return $ Left $ ProtoFailureMessage "short data write" Left e -> return $ Left $ ProtoFailureException e - P2PHandleTMVar mv waitv -> do + P2PHandleTMVar mv mwaitv -> do liftIO $ atomically $ putTMVar mv (Left b) - -- Wait for the whole bytestring to be - -- processed. Necessary due to lazyiness. - liftIO $ atomically $ takeTMVar waitv + case mwaitv of + -- Wait for the whole bytestring to + -- be processed. + Just waitv -> liftIO $ atomically $ + takeTMVar waitv + Nothing -> return () runner next ReceiveBytes len p next -> case connIhdl conn of @@ -264,7 +267,7 @@ runNet runst conn runner f = case f of liftIO (atomically (takeTMVar mv)) >>= \case Left b -> runner (next b) Right _ -> return $ Left $ - ProtoFailureMessage "protocol error 2" + ProtoFailureMessage "protocol error" CheckAuthToken _u t next -> do let authed = connCheckAuth conn t runner (next authed) diff --git a/doc/todo/git-annex_proxies.mdwn b/doc/todo/git-annex_proxies.mdwn index 1cea7d8b28..c1d16bb339 100644 --- a/doc/todo/git-annex_proxies.mdwn +++ b/doc/todo/git-annex_proxies.mdwn @@ -31,8 +31,7 @@ Planned schedule of work: * http server and client are working, remaining server API endpoints need wiring up and testing. -* serveGet works as proof of concept, but is very buggy. - See commit 1e0f92a5a1ccf7ff4c51c67c27a826709a99301b +* serveGet needs to handle invalidation * I have a file `servant.hs` in the httpproto branch that works through some of the bytestring streaming issues.