improve handling of cluster nodes disconnecting

This commit is contained in:
Joey Hess 2024-06-25 14:10:06 -04:00
parent 5ede109ae5
commit 818030e4d3
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38

View file

@ -355,18 +355,18 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
handlePutMulti remotesides k message = do handlePutMulti remotesides k message = do
let initiate remoteside = do let initiate remoteside = do
resp <- runRemoteSide remoteside $ net $ do resp <- runRemoteSideOrSkipFailed remoteside $ net $ do
sendMessage message sendMessage message
receiveMessage receiveMessage
case resp of case resp of
Right (Just (PUT_FROM (Offset offset))) -> Just (Just (PUT_FROM (Offset offset))) ->
return $ Right $ return $ Right $
Right (remoteside, offset) Right (remoteside, offset)
Right (Just ALREADY_HAVE) -> Just (Just ALREADY_HAVE) ->
return $ Right $ Left remoteside return $ Right $ Left remoteside
Right (Just _) -> protoerr Just (Just _) -> protoerr
Right Nothing -> return (Left ()) Just Nothing -> return (Left ())
Left _err -> return (Left ()) Nothing -> return (Left ())
let alreadyhave = \case let alreadyhave = \case
Right (Left _) -> True Right (Left _) -> True
_ -> False _ -> False
@ -392,11 +392,12 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
let totallen = datalen + minoffset let totallen = datalen + minoffset
-- Tell each remote how much data to expect, depending -- Tell each remote how much data to expect, depending
-- on the remote's offset. -- on the remote's offset.
forM_ remotes $ \(remoteside, remoteoffset) -> rs <- forM remotes $ \remote@(remoteside, remoteoffset) ->
runRemoteSide remoteside $ runRemoteSideOrSkipFailed remoteside $ do
net $ sendMessage $ DATA $ Len $ net $ sendMessage $ DATA $ Len $
totallen - remoteoffset totallen - remoteoffset
protoerrhandler (send remotes minoffset) $ return remote
protoerrhandler (send (catMaybes rs) minoffset) $
client $ net $ receiveBytes (Len datalen) nullMeterUpdate client $ net $ receiveBytes (Len datalen) nullMeterUpdate
where where
chunksize = fromIntegral defaultChunkSize chunksize = fromIntegral defaultChunkSize