fix serveGet hang

This came down to SendBytes waiting on the waitv. Nothing ever filled
it.

Only Annex.Proxy needs the waitv, and it handles filling it. So make it
optional.
This commit is contained in:
Joey Hess 2024-07-11 07:46:52 -04:00
parent 80fb5445b5
commit 3b37b9e53f
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
6 changed files with 34 additions and 45 deletions

View file

@ -59,8 +59,8 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do
let remoteconn = P2PConnection let remoteconn = P2PConnection
{ connRepo = Nothing { connRepo = Nothing
, connCheckAuth = const False , connCheckAuth = const False
, connIhdl = P2PHandleTMVar ihdl iwaitv , connIhdl = P2PHandleTMVar ihdl (Just iwaitv)
, connOhdl = P2PHandleTMVar ohdl owaitv , connOhdl = P2PHandleTMVar ohdl (Just owaitv)
, connIdent = ConnIdent (Just (Remote.name r)) , connIdent = ConnIdent (Just (Remote.name r))
} }
let closeremoteconn = do let closeremoteconn = do

View file

@ -107,23 +107,16 @@ runLocal runst runner a = case a of
ProtoFailureMessage "Transfer failed" ProtoFailureMessage "Transfer failed"
let consumer' b ti = do let consumer' b ti = do
validator <- consumer b validator <- consumer b
liftIO $ print "got validator"
indicatetransferred ti indicatetransferred ti
liftIO $ print "indicatetransferred ti done"
return validator return validator
runner getb >>= \case runner getb >>= \case
Left e -> giveup $ describeProtoFailure e Left e -> giveup $ describeProtoFailure e
Right b -> checktransfer (\ti -> Right <$> consumer' b ti) fallback >>= \case Right b -> checktransfer (\ti -> Right <$> consumer' b ti) fallback >>= \case
Left e -> return (Left e) Left e -> return (Left e)
Right validator -> do Right validator ->
liftIO $ print "running validity check"
runner validitycheck >>= \case runner validitycheck >>= \case
Right v -> do Right v -> Right <$> validator v
liftIO $ print ("calling validator 1", v) _ -> Right <$> validator Nothing
Right <$> validator v
_ -> do
liftIO $ print "calling validator nothing"
Right <$> validator Nothing
case v of case v of
Left e -> return $ Left $ ProtoFailureException e Left e -> return $ Left $ ProtoFailureException e
Right (Left e) -> return $ Left e Right (Left e) -> return $ Left e

View file

@ -156,31 +156,24 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
endv <- liftIO newEmptyTMVarIO endv <- liftIO newEmptyTMVarIO
validityv <- liftIO newEmptyTMVarIO validityv <- liftIO newEmptyTMVarIO
aid <- liftIO $ async $ inAnnexWorker st $ do aid <- liftIO $ async $ inAnnexWorker st $ do
let consumer bs = do let storer _offset len = sendContentWith $ \bs -> do
liftIO $ atomically $ putTMVar bsv bs liftIO $ atomically $ putTMVar bsv (len, bs)
liftIO $ print "consumer waiting for endv"
liftIO $ atomically $ takeTMVar endv liftIO $ atomically $ takeTMVar endv
liftIO $ print "consumer took endv"
return $ \v -> do return $ \v -> do
liftIO $ print "consumer put validityv" liftIO $ atomically $ putTMVar validityv v
liftIO $ atomically $
putTMVar validityv v
return True return True
let storer _offset _len getdata checkvalidity =
sendContentWith consumer getdata checkvalidity
enteringStage (TransferStage Upload) $ enteringStage (TransferStage Upload) $
runFullProto runst conn $ runFullProto runst conn $
void $ receiveContent Nothing nullMeterUpdate void $ receiveContent Nothing nullMeterUpdate
sizer storer getreq sizer storer getreq
bs <- liftIO $ atomically $ takeTMVar bsv (Len len, bs) <- liftIO $ atomically $ takeTMVar bsv
bv <- liftIO $ newMVar (L.toChunks bs) bv <- liftIO $ newMVar (L.toChunks bs)
let streamer = S.SourceT $ \s -> s =<< return let streamer = S.SourceT $ \s -> s =<< return
(stream (releaseconn, bv, endv, validityv, aid)) (stream (releaseconn, bv, endv, validityv, aid))
return $ addHeader 111111 streamer return $ addHeader len streamer
where where
stream (releaseconn, bv, endv, validityv, aid) = stream (releaseconn, bv, endv, validityv, aid) =
S.fromActionStep B.null $ do S.fromActionStep B.null $
print "chunk"
modifyMVar bv $ nextchunk $ modifyMVar bv $ nextchunk $
cleanup (releaseconn, endv, validityv, aid) cleanup (releaseconn, endv, validityv, aid)
@ -194,11 +187,8 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
cleanup (releaseconn, endv, validityv, aid) = cleanup (releaseconn, endv, validityv, aid) =
ifM (atomically $ isEmptyTMVar endv) ifM (atomically $ isEmptyTMVar endv)
( do ( do
print "at end"
atomically $ putTMVar endv () atomically $ putTMVar endv ()
print "signaled end"
validity <- atomically $ takeTMVar validityv validity <- atomically $ takeTMVar validityv
print ("got validity", validity)
wait aid >>= \case wait aid >>= \case
Left ex -> throwM ex Left ex -> throwM ex
Right (Left err) -> error $ Right (Left err) -> error $
@ -263,6 +253,7 @@ gatherbytestring x = do
go (S.Effect ms) = do go (S.Effect ms) = do
ms >>= go ms >>= go
go (S.Yield v s) = do go (S.Yield v s) = do
liftIO $ print ("chunk", B.length v)
LI.Chunk v <$> unsafeInterleaveIO (go s) LI.Chunk v <$> unsafeInterleaveIO (go s)
clientGet' clientGet'

View file

@ -177,35 +177,38 @@ withLocalP2PConnections a = do
else do else do
hdl1 <- liftIO newEmptyTMVarIO hdl1 <- liftIO newEmptyTMVarIO
hdl2 <- liftIO newEmptyTMVarIO hdl2 <- liftIO newEmptyTMVarIO
waitv1 <- liftIO newEmptyTMVarIO let h1 = P2PHandleTMVar hdl1 Nothing
waitv2 <- liftIO newEmptyTMVarIO let h2 = P2PHandleTMVar hdl2 Nothing
let h1 = P2PHandleTMVar hdl1 waitv1
let h2 = P2PHandleTMVar hdl2 waitv2
let serverconn = P2PConnection Nothing let serverconn = P2PConnection Nothing
(const True) h1 h2 (const True) h1 h2
(ConnIdent (Just "http server")) (ConnIdent (Just "http server"))
let clientconn = P2PConnection Nothing let clientconn = P2PConnection Nothing
(const True) h2 h1 (const True) h2 h1
(ConnIdent (Just "http client")) (ConnIdent (Just "http client"))
runst <- liftIO $ mkrunst connparams clientrunst <- liftIO $ mkclientrunst connparams
serverrunst <- liftIO $ mkserverrunst connparams
let server = P2P.serveOneCommandAuthed let server = P2P.serveOneCommandAuthed
(connectionServerMode connparams) (connectionServerMode connparams)
(connectionServerUUID connparams) (connectionServerUUID connparams)
let protorunner = void $ let protorunner = void $
runFullProto runst serverconn server runFullProto serverrunst serverconn server
asyncworker <- liftIO . async asyncworker <- liftIO . async
=<< forkState protorunner =<< forkState protorunner
let releaseconn = atomically $ putTMVar relv $ let releaseconn = atomically $ putTMVar relv $
join (liftIO (wait asyncworker)) join (liftIO (wait asyncworker))
return $ Right (runst, clientconn, releaseconn) return $ Right (clientrunst, clientconn, releaseconn)
liftIO $ atomically $ putTMVar respvar resp liftIO $ atomically $ putTMVar respvar resp
mkrunst connparams = do mkserverrunst connparams = do
prototvar <- newTVarIO $ connectionProtocolVersion connparams prototvar <- newTVarIO $ connectionProtocolVersion connparams
mkRunState $ const $ Serving mkRunState $ const $ Serving
(connectionClientUUID connparams) (connectionClientUUID connparams)
Nothing Nothing
prototvar prototvar
mkclientrunst connparams = do
prototvar <- newTVarIO $ connectionProtocolVersion connparams
mkRunState $ const $ Client prototvar
data Locker = Locker data Locker = Locker
{ lockerThread :: Async () { lockerThread :: Async ()

View file

@ -79,7 +79,7 @@ mkRunState mk = do
data P2PHandle data P2PHandle
= P2PHandle Handle = P2PHandle Handle
| P2PHandleTMVar (TMVar (Either L.ByteString Message)) (TMVar ()) | P2PHandleTMVar (TMVar (Either L.ByteString Message)) (Maybe (TMVar ()))
data P2PConnection = P2PConnection data P2PConnection = P2PConnection
{ connRepo :: Maybe Repo { connRepo :: Maybe Repo
@ -217,7 +217,7 @@ runNet runst conn runner f = case f of
Right () -> runner next Right () -> runner next
ReceiveMessage next -> ReceiveMessage next ->
let protoerr = return $ Left $ let protoerr = return $ Left $
ProtoFailureMessage "protocol error 1" ProtoFailureMessage "protocol error"
gotmessage m = do gotmessage m = do
liftIO $ debugMessage conn "P2P <" m liftIO $ debugMessage conn "P2P <" m
runner (next (Just m)) runner (next (Just m))
@ -246,11 +246,14 @@ runNet runst conn runner f = case f of
Right False -> return $ Left $ Right False -> return $ Left $
ProtoFailureMessage "short data write" ProtoFailureMessage "short data write"
Left e -> return $ Left $ ProtoFailureException e Left e -> return $ Left $ ProtoFailureException e
P2PHandleTMVar mv waitv -> do P2PHandleTMVar mv mwaitv -> do
liftIO $ atomically $ putTMVar mv (Left b) liftIO $ atomically $ putTMVar mv (Left b)
-- Wait for the whole bytestring to be case mwaitv of
-- processed. Necessary due to lazyiness. -- Wait for the whole bytestring to
liftIO $ atomically $ takeTMVar waitv -- be processed.
Just waitv -> liftIO $ atomically $
takeTMVar waitv
Nothing -> return ()
runner next runner next
ReceiveBytes len p next -> ReceiveBytes len p next ->
case connIhdl conn of case connIhdl conn of
@ -264,7 +267,7 @@ runNet runst conn runner f = case f of
liftIO (atomically (takeTMVar mv)) >>= \case liftIO (atomically (takeTMVar mv)) >>= \case
Left b -> runner (next b) Left b -> runner (next b)
Right _ -> return $ Left $ Right _ -> return $ Left $
ProtoFailureMessage "protocol error 2" ProtoFailureMessage "protocol error"
CheckAuthToken _u t next -> do CheckAuthToken _u t next -> do
let authed = connCheckAuth conn t let authed = connCheckAuth conn t
runner (next authed) runner (next authed)

View file

@ -31,8 +31,7 @@ Planned schedule of work:
* http server and client are working, remaining * http server and client are working, remaining
server API endpoints need wiring up and testing. server API endpoints need wiring up and testing.
* serveGet works as proof of concept, but is very buggy. * serveGet needs to handle invalidation
See commit 1e0f92a5a1ccf7ff4c51c67c27a826709a99301b
* I have a file `servant.hs` in the httpproto branch that works through some * I have a file `servant.hs` in the httpproto branch that works through some
of the bytestring streaming issues. of the bytestring streaming issues.