use closedv rather than separate endv

Doesn't fix any known problem, but this way if the connection does get
closed, it will notice.
This commit is contained in:
Joey Hess 2024-07-28 15:11:31 -04:00
parent 66679c9bb4
commit 18ed4e5b20
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38

View file

@ -62,9 +62,8 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do
owaitv <- liftIO newEmptyTMVarIO owaitv <- liftIO newEmptyTMVarIO
iclosedv <- liftIO newEmptyTMVarIO iclosedv <- liftIO newEmptyTMVarIO
oclosedv <- liftIO newEmptyTMVarIO oclosedv <- liftIO newEmptyTMVarIO
endv <- liftIO newEmptyTMVarIO
worker <- liftIO . async =<< forkState worker <- liftIO . async =<< forkState
(proxySpecialRemote protoversion r ihdl ohdl owaitv endv) (proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv)
let remoteconn = P2PConnection let remoteconn = P2PConnection
{ connRepo = Nothing { connRepo = Nothing
, connCheckAuth = const False , connCheckAuth = const False
@ -73,7 +72,7 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do
, connIdent = ConnIdent (Just (Remote.name r)) , connIdent = ConnIdent (Just (Remote.name r))
} }
let closeremoteconn = do let closeremoteconn = do
liftIO $ atomically $ putTMVar endv () liftIO $ atomically $ putTMVar oclosedv ()
join $ liftIO (wait worker) join $ liftIO (wait worker)
return $ Just return $ Just
( remoterunst ( remoterunst
@ -90,7 +89,7 @@ proxySpecialRemote
-> TMVar () -> TMVar ()
-> TMVar () -> TMVar ()
-> Annex () -> Annex ()
proxySpecialRemote protoversion r ihdl ohdl owaitv endv = go proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv = go
where where
go :: Annex () go :: Annex ()
go = liftIO receivemessage >>= \case go = liftIO receivemessage >>= \case
@ -126,20 +125,25 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv endv = go
Just _ -> giveup "protocol error" Just _ -> giveup "protocol error"
Nothing -> return () Nothing -> return ()
getnextmessageorend = receivemessage = liftIO (atomically recv) >>= \case
liftIO $ atomically $
(Right <$> takeTMVar ohdl)
`orElse`
(Left <$> readTMVar endv)
receivemessage = getnextmessageorend >>= \case
Right (Right m) -> return (Just m) Right (Right m) -> return (Just m)
Right (Left _b) -> giveup "unexpected ByteString received from P2P MVar" Right (Left _b) -> giveup "unexpected ByteString received from P2P MVar"
Left () -> return Nothing Left () -> return Nothing
where
recv =
(Right <$> takeTMVar ohdl)
`orElse`
(Left <$> readTMVar oclosedv)
receivebytestring = atomically (takeTMVar ohdl) >>= \case receivebytestring = atomically recv >>= \case
Left b -> return b Right (Left b) -> return b
Right _m -> giveup "did not receive ByteString from P2P MVar" Right (Right _m) -> giveup "did not receive ByteString from P2P MVar"
Left () -> giveup "connection closed"
where
recv =
(Right <$> takeTMVar ohdl)
`orElse`
(Left <$> readTMVar oclosedv)
sendmessage m = atomically $ putTMVar ihdl (Right m) sendmessage m = atomically $ putTMVar ihdl (Right m)
@ -169,7 +173,10 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv endv = go
liftIO $ L.writeFile (fromRawFilePath tmpfile) b liftIO $ L.writeFile (fromRawFilePath tmpfile) b
-- Signal that the whole bytestring -- Signal that the whole bytestring
-- has been received. -- has been received.
liftIO $ atomically $ putTMVar owaitv () liftIO $ atomically $
putTMVar owaitv ()
`orElse`
readTMVar oclosedv
if protoversion > ProtocolVersion 1 if protoversion > ProtocolVersion 1
then liftIO receivemessage >>= \case then liftIO receivemessage >>= \case
Just (VALIDITY Valid) -> Just (VALIDITY Valid) ->