shut down RemoteSides cleanly
Before it just exited without actually shutting down the RemoteSides, when the client hung up.
This commit is contained in:
parent
c3a785204e
commit
62750f0102
3 changed files with 27 additions and 19 deletions
|
@ -35,11 +35,11 @@ proxyCluster
|
||||||
-> CommandPerform
|
-> CommandPerform
|
||||||
-> ServerMode
|
-> ServerMode
|
||||||
-> ClientSide
|
-> ClientSide
|
||||||
-> (forall a. ((a -> CommandPerform) -> Annex (Either ProtoFailure a) -> CommandPerform))
|
-> (forall a. Annex () -> ((a -> CommandPerform) -> Annex (Either ProtoFailure a) -> CommandPerform))
|
||||||
-> CommandPerform
|
-> CommandPerform
|
||||||
proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
|
proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
|
||||||
getClientProtocolVersion (fromClusterUUID clusteruuid) clientside
|
getClientProtocolVersion (fromClusterUUID clusteruuid) clientside
|
||||||
withclientversion protoerrhandler
|
withclientversion (protoerrhandler noop)
|
||||||
where
|
where
|
||||||
proxymethods = ProxyMethods
|
proxymethods = ProxyMethods
|
||||||
{ removedContent = \u k -> logChange k u InfoMissing
|
{ removedContent = \u k -> logChange k u InfoMissing
|
||||||
|
@ -56,22 +56,23 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
|
||||||
-- versions.
|
-- versions.
|
||||||
let protocolversion = min maxProtocolVersion clientmaxversion
|
let protocolversion = min maxProtocolVersion clientmaxversion
|
||||||
sendClientProtocolVersion clientside othermsg protocolversion
|
sendClientProtocolVersion clientside othermsg protocolversion
|
||||||
(getclientbypass protocolversion) protoerrhandler
|
(getclientbypass protocolversion) (protoerrhandler noop)
|
||||||
withclientversion Nothing = proxydone
|
withclientversion Nothing = proxydone
|
||||||
|
|
||||||
getclientbypass protocolversion othermsg =
|
getclientbypass protocolversion othermsg =
|
||||||
getClientBypass clientside protocolversion othermsg
|
getClientBypass clientside protocolversion othermsg
|
||||||
(withclientbypass protocolversion) protoerrhandler
|
(withclientbypass protocolversion) (protoerrhandler noop)
|
||||||
|
|
||||||
withclientbypass protocolversion (bypassuuids, othermsg) = do
|
withclientbypass protocolversion (bypassuuids, othermsg) = do
|
||||||
selectnode <- clusterProxySelector clusteruuid protocolversion bypassuuids
|
(selectnode, closenodes) <- clusterProxySelector clusteruuid
|
||||||
|
protocolversion bypassuuids
|
||||||
concurrencyconfig <- getConcurrencyConfig
|
concurrencyconfig <- getConcurrencyConfig
|
||||||
proxy proxydone proxymethods servermode clientside
|
proxy proxydone proxymethods servermode clientside
|
||||||
(fromClusterUUID clusteruuid)
|
(fromClusterUUID clusteruuid)
|
||||||
selectnode concurrencyconfig protocolversion
|
selectnode concurrencyconfig protocolversion
|
||||||
othermsg protoerrhandler
|
othermsg (protoerrhandler closenodes)
|
||||||
|
|
||||||
clusterProxySelector :: ClusterUUID -> ProtocolVersion -> Bypass -> Annex ProxySelector
|
clusterProxySelector :: ClusterUUID -> ProtocolVersion -> Bypass -> Annex (ProxySelector, Annex ())
|
||||||
clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
|
nodeuuids <- (fromMaybe S.empty . M.lookup clusteruuid . clusterUUIDs)
|
||||||
<$> getClusters
|
<$> getClusters
|
||||||
|
@ -85,8 +86,9 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
, "connecting to", show (map Remote.name clusterremotes)
|
, "connecting to", show (map Remote.name clusterremotes)
|
||||||
, "bypass", show (S.toList bypass)
|
, "bypass", show (S.toList bypass)
|
||||||
]
|
]
|
||||||
nodes <- mapM (proxySshRemoteSide protocolversion (Bypass bypass')) clusterremotes
|
nodes <- mapM (proxyRemoteSide protocolversion (Bypass bypass')) clusterremotes
|
||||||
return $ ProxySelector
|
let closenodes = mapM_ closeRemoteSide nodes
|
||||||
|
let proxyselector = ProxySelector
|
||||||
{ proxyCHECKPRESENT = nodecontaining nodes
|
{ proxyCHECKPRESENT = nodecontaining nodes
|
||||||
, proxyGET = nodecontaining nodes
|
, proxyGET = nodecontaining nodes
|
||||||
-- The key is sent to multiple nodes at the same time,
|
-- The key is sent to multiple nodes at the same time,
|
||||||
|
@ -111,6 +113,7 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
||||||
, proxyLOCKCONTENT = const (pure Nothing)
|
, proxyLOCKCONTENT = const (pure Nothing)
|
||||||
, proxyUNLOCKCONTENT = pure Nothing
|
, proxyUNLOCKCONTENT = pure Nothing
|
||||||
}
|
}
|
||||||
|
return (proxyselector, closenodes)
|
||||||
where
|
where
|
||||||
-- Nodes of the cluster have remote.name.annex-cluster-node
|
-- Nodes of the cluster have remote.name.annex-cluster-node
|
||||||
-- containing its name.
|
-- containing its name.
|
||||||
|
|
|
@ -57,32 +57,33 @@ performLocal theiruuid servermode = do
|
||||||
P2P.net $ P2P.sendMessage (P2P.AUTH_SUCCESS myuuid)
|
P2P.net $ P2P.sendMessage (P2P.AUTH_SUCCESS myuuid)
|
||||||
P2P.serveAuthed servermode myuuid
|
P2P.serveAuthed servermode myuuid
|
||||||
runst <- liftIO $ mkRunState $ Serving theiruuid Nothing
|
runst <- liftIO $ mkRunState $ Serving theiruuid Nothing
|
||||||
p2pErrHandler (const p2pDone) (runFullProto runst conn server)
|
p2pErrHandler noop (const p2pDone) (runFullProto runst conn server)
|
||||||
|
|
||||||
performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform
|
performProxy :: UUID -> P2P.ServerMode -> Remote -> CommandPerform
|
||||||
performProxy clientuuid servermode r = do
|
performProxy clientuuid servermode r = do
|
||||||
clientside <- proxyClientSide clientuuid
|
clientside <- proxyClientSide clientuuid
|
||||||
getClientProtocolVersion (Remote.uuid r) clientside
|
getClientProtocolVersion (Remote.uuid r) clientside
|
||||||
(withclientversion clientside)
|
(withclientversion clientside)
|
||||||
p2pErrHandler
|
(p2pErrHandler noop)
|
||||||
where
|
where
|
||||||
withclientversion clientside (Just (clientmaxversion, othermsg)) = do
|
withclientversion clientside (Just (clientmaxversion, othermsg)) = do
|
||||||
remoteside <- proxyRemoteSide clientmaxversion mempty r
|
remoteside <- proxyRemoteSide clientmaxversion mempty r
|
||||||
protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id
|
protocolversion <- either (const (min P2P.maxProtocolVersion clientmaxversion)) id
|
||||||
<$> runRemoteSide remoteside
|
<$> runRemoteSide remoteside
|
||||||
(P2P.net P2P.getProtocolVersion)
|
(P2P.net P2P.getProtocolVersion)
|
||||||
|
concurrencyconfig <- noConcurrencyConfig
|
||||||
let closer = do
|
let closer = do
|
||||||
closeRemoteSide remoteside
|
closeRemoteSide remoteside
|
||||||
p2pDone
|
p2pDone
|
||||||
concurrencyconfig <- noConcurrencyConfig
|
let errhandler = p2pErrHandler (closeRemoteSide remoteside)
|
||||||
let runproxy othermsg' = proxy closer proxymethods
|
let runproxy othermsg' = proxy closer proxymethods
|
||||||
servermode clientside
|
servermode clientside
|
||||||
(Remote.uuid r)
|
(Remote.uuid r)
|
||||||
(singleProxySelector remoteside)
|
(singleProxySelector remoteside)
|
||||||
concurrencyconfig
|
concurrencyconfig
|
||||||
protocolversion othermsg' p2pErrHandler
|
protocolversion othermsg' errhandler
|
||||||
sendClientProtocolVersion clientside othermsg protocolversion
|
sendClientProtocolVersion clientside othermsg protocolversion
|
||||||
runproxy p2pErrHandler
|
runproxy errhandler
|
||||||
withclientversion _ Nothing = p2pDone
|
withclientversion _ Nothing = p2pDone
|
||||||
|
|
||||||
proxymethods = ProxyMethods
|
proxymethods = ProxyMethods
|
||||||
|
@ -100,11 +101,15 @@ proxyClientSide clientuuid = do
|
||||||
clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing)
|
clientrunst <- liftIO (mkRunState $ Serving clientuuid Nothing)
|
||||||
return $ ClientSide clientrunst (stdioP2PConnection Nothing)
|
return $ ClientSide clientrunst (stdioP2PConnection Nothing)
|
||||||
|
|
||||||
p2pErrHandler :: (a -> CommandPerform) -> Annex (Either ProtoFailure a) -> CommandPerform
|
p2pErrHandler :: Annex () -> (a -> CommandPerform) -> Annex (Either ProtoFailure a) -> CommandPerform
|
||||||
p2pErrHandler cont a = a >>= \case
|
p2pErrHandler closeconn cont a = a >>= \case
|
||||||
-- Avoid displaying an error when the client hung up on us.
|
-- Avoid displaying an error when the client hung up on us.
|
||||||
Left (ProtoFailureIOError e) | isEOFError e -> p2pDone
|
Left (ProtoFailureIOError e) | isEOFError e -> do
|
||||||
Left e -> giveup (describeProtoFailure e)
|
closeconn
|
||||||
|
p2pDone
|
||||||
|
Left e -> do
|
||||||
|
closeconn
|
||||||
|
giveup (describeProtoFailure e)
|
||||||
Right v -> cont v
|
Right v -> cont v
|
||||||
|
|
||||||
p2pDone :: CommandPerform
|
p2pDone :: CommandPerform
|
||||||
|
|
|
@ -58,7 +58,7 @@ runRemoteSide remoteside a =
|
||||||
|
|
||||||
closeRemoteSide :: RemoteSide -> Annex ()
|
closeRemoteSide :: RemoteSide -> Annex ()
|
||||||
closeRemoteSide remoteside =
|
closeRemoteSide remoteside =
|
||||||
liftIO (atomically $ tryReadTMVar $ remoteTMVar remoteside) >>= \case
|
liftIO (atomically $ tryTakeTMVar $ remoteTMVar remoteside) >>= \case
|
||||||
Just (_, _, closer) -> closer
|
Just (_, _, closer) -> closer
|
||||||
Nothing -> return ()
|
Nothing -> return ()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue