working PUT fanout to multiple remotes for clusters

Still need to check for fencepost errors on resume when
different nodes have different amounts of data.
This commit is contained in:
Joey Hess 2024-06-20 10:04:26 -04:00
parent 54307af8c0
commit ecab2e03b9
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38

View file

@ -62,7 +62,9 @@ data ProxySelector = ProxySelector
, proxyUNLOCKCONTENT :: Annex (Maybe RemoteSide) , proxyUNLOCKCONTENT :: Annex (Maybe RemoteSide)
, proxyREMOVE :: Key -> Annex RemoteSide , proxyREMOVE :: Key -> Annex RemoteSide
, proxyGET :: Key -> Annex (Maybe RemoteSide) , proxyGET :: Key -> Annex (Maybe RemoteSide)
-- ^ can get from any of these remotes
, proxyPUT :: Key -> Annex [RemoteSide] , proxyPUT :: Key -> Annex [RemoteSide]
-- ^ can put to some/all of these remotes
} }
singleProxySelector :: RemoteSide -> ProxySelector singleProxySelector :: RemoteSide -> ProxySelector
@ -139,6 +141,9 @@ proxy
-> UUID -> UUID
-> ProxySelector -> ProxySelector
-> ProtocolVersion -> ProtocolVersion
-- ^ Protocol version being spoken between the proxy and the
-- client. When there are multiple remotes, some may speak an
-- earlier version.
-> Maybe Message -> Maybe Message
-- ^ non-VERSION message that was received from the client when -- ^ non-VERSION message that was received from the client when
-- negotiating protocol version, and has not been responded to yet -- negotiating protocol version, and has not been responded to yet
@ -304,29 +309,19 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
relayonemessage (runRemoteSide remoteside) client finished relayonemessage (runRemoteSide remoteside) client finished
where where
finished resp () = do finished resp () = do
case resp of void $ relayPUTRecord k remoteside resp
SUCCESS -> addedContent proxymethods (remoteUUID remoteside) k
SUCCESS_PLUS us ->
forM_ (remoteUUID remoteside:us) $ \u ->
addedContent proxymethods u k
_ -> return ()
proxynextclientmessage () proxynextclientmessage ()
relayDATAStart x receive message = relayPUTRecord k remoteside SUCCESS = do
protoerrhandler (\() -> receive) $ addedContent proxymethods (remoteUUID remoteside) k
x $ net $ sendMessage message return $ Just [remoteUUID remoteside]
relayPUTRecord k remoteside (SUCCESS_PLUS us) = do
relayDATACore len x y a = protoerrhandler send $ let us' = remoteUUID remoteside : us
x $ net $ receiveBytes len nullMeterUpdate forM_ us' $ \u ->
where addedContent proxymethods u k
send b = protoerrhandler a $ return $ Just us'
y $ net $ sendBytes len b nullMeterUpdate relayPUTRecord _ _ _ =
return Nothing
relayDATAFinish x y sendsuccessfailure ()
| protocolversion == 0 = sendsuccessfailure
-- Protocol version 1 has a VALID or
-- INVALID message after the data.
| otherwise = relayonemessage x y (\_ () -> sendsuccessfailure)
handlePutMulti remotesides k message = do handlePutMulti remotesides k message = do
let initiate remoteside = do let initiate remoteside = do
@ -385,27 +380,77 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
let !n' = n + chunklen let !n' = n + chunklen
rs' <- forM rs $ \r@(remoteside, remoteoffset) -> rs' <- forM rs $ \r@(remoteside, remoteoffset) ->
if n >= remoteoffset if n >= remoteoffset
then skipfailed r $ runRemoteSide remoteside $ then runRemoteSideOrSkipFailed remoteside $ do
net $ sendBytes (Len chunklen) chunk nullMeterUpdate net $ sendBytes (Len chunklen) chunk nullMeterUpdate
return r
else if (n' <= remoteoffset) else if (n' <= remoteoffset)
then do then do
let chunkoffset = remoteoffset - n let chunkoffset = remoteoffset - n
let subchunklen = chunklen - chunkoffset let subchunklen = chunklen - chunkoffset
let subchunk = L.drop (fromIntegral chunkoffset) chunk let subchunk = L.drop (fromIntegral chunkoffset) chunk
skipfailed r $ runRemoteSide remoteside $ runRemoteSideOrSkipFailed remoteside $ do
net $ sendBytes (Len subchunklen) subchunk nullMeterUpdate net $ sendBytes (Len subchunklen) subchunk nullMeterUpdate
return r
else return (Just r) else return (Just r)
if L.null b' if L.null b'
then sent (catMaybes rs') then sent (catMaybes rs')
else send (catMaybes rs') n' b' else send (catMaybes rs') n' b'
sent [] = proxydone sent [] = proxydone
sent rs = giveup "XXX" -- XXX sent rs = relayDATAFinishMulti k (map fst rs)
skipfailed r@(remoteside, _) a = a >>= \case runRemoteSideOrSkipFailed remoteside a =
Right _ -> return (Just r) runRemoteSide remoteside a >>= \case
Right v -> return (Just v)
Left _ -> do Left _ -> do
-- This connection to the remote is -- This connection to the remote is
-- unrecoverable at this point, so close it. -- unrecoverable at this point, so close it.
closeRemoteSide remoteside closeRemoteSide remoteside
return Nothing return Nothing
relayDATAStart x receive message =
protoerrhandler (\() -> receive) $
x $ net $ sendMessage message
relayDATACore len x y a = protoerrhandler send $
x $ net $ receiveBytes len nullMeterUpdate
where
send b = protoerrhandler a $
y $ net $ sendBytes len b nullMeterUpdate
relayDATAFinish x y sendsuccessfailure ()
| protocolversion == 0 = sendsuccessfailure
-- Protocol version 1 has a VALID or
-- INVALID message after the data.
| otherwise = relayonemessage x y (\_ () -> sendsuccessfailure)
relayDATAFinishMulti k rs
| protocolversion == 0 =
finish $ net receiveMessage
| otherwise =
flip protoerrhandler (client $ net $ receiveMessage) $
withresp $ \message ->
finish $ do
-- Relay VALID or INVALID message
-- only to remotes that support
-- protocol version 1.
net getProtocolVersion >>= \case
ProtocolVersion 0 -> return ()
_ -> net $ sendMessage message
net receiveMessage
where
finish a = do
storeduuids <- forM rs $ \r ->
runRemoteSideOrSkipFailed r a >>= \case
Just (Just resp) ->
relayPUTRecord k r resp
_ -> return Nothing
protoerrhandler proxynextclientmessage $
client $ net $ sendMessage $
case concat (catMaybes storeduuids) of
[] -> FAILURE
(_u:[]) -> SUCCESS
us
| protocolversion < 2 -> SUCCESS
| otherwise -> SUCCESS_PLUS us