LiveUpdate for clusters

This commit is contained in:
Joey Hess 2024-08-24 10:12:05 -04:00
parent 18cd8bf43a
commit 84d1bb746b
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 31 additions and 18 deletions

View file

@ -5,7 +5,7 @@
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes, OverloadedStrings #-}
{-# LANGUAGE RankNTypes, OverloadedStrings, TupleSections #-}
module Annex.Cluster where
@ -19,6 +19,7 @@ import P2P.IO
import Annex.Proxy
import Annex.UUID
import Annex.BranchState
import Annex.RepoSize.LiveUpdate
import Logs.Location
import Logs.PreferredContent
import Types.Command
@ -108,10 +109,15 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
, proxyPUT = \af k -> do
locs <- S.fromList <$> loggedLocations k
let l = filter (flip S.notMember locs . Remote.uuid . remote) nodes
--- XXX FIXME TODO NoLiveUpdate should not be used
-- here. Doing a live update here is exactly why
-- live update is needed.
l' <- filterM (\n -> isPreferredContent NoLiveUpdate (Just (Remote.uuid (remote n))) mempty (Just k) af True) l
let checkpreferred n = do
let u = Just (Remote.uuid (remote n))
lu <- prepareLiveUpdate u k AddingKey
ifM (isPreferredContent lu u mempty (Just k) af True)
( return $ Just $ n
{ remoteLiveUpdate = lu }
, return Nothing
)
l' <- catMaybes <$> mapM checkpreferred l
-- PUT to no nodes doesn't work, so fall
-- back to all nodes.
return $ nonempty [l', l] nodes

View file

@ -365,6 +365,6 @@ canProxyForRemote rs myproxies myclusters remoteuuid =
mkProxyMethods :: ProxyMethods
mkProxyMethods = ProxyMethods
{ removedContent = \u k -> logChange NoLiveUpdate k u InfoMissing
, addedContent = \u k -> logChange NoLiveUpdate k u InfoPresent
{ removedContent = \lu u k -> logChange lu k u InfoMissing
, addedContent = \lu u k -> logChange lu k u InfoPresent
}

View file

@ -43,6 +43,7 @@ data RemoteSide = RemoteSide
, remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
, remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
, remoteSideId :: RemoteSideId
, remoteLiveUpdate :: LiveUpdate
}
instance Show RemoteSide where
@ -54,6 +55,7 @@ mkRemoteSide r remoteconnect = RemoteSide
<*> pure remoteconnect
<*> liftIO (atomically newEmptyTMVar)
<*> liftIO (RemoteSideId <$> newUnique)
<*> pure NoLiveUpdate
runRemoteSide :: RemoteSide -> Proto a -> Annex (Either ProtoFailure a)
runRemoteSide remoteside a =
@ -103,9 +105,9 @@ singleProxySelector r = ProxySelector
- all other actions that a proxy needs to do are provided
- here. -}
data ProxyMethods = ProxyMethods
{ removedContent :: UUID -> Key -> Annex ()
{ removedContent :: LiveUpdate -> UUID -> Key -> Annex ()
-- ^ called when content is removed from a repository
, addedContent :: UUID -> Key -> Annex ()
, addedContent :: LiveUpdate -> UUID -> Key -> Annex ()
-- ^ called when content is added to a repository
}
@ -443,7 +445,7 @@ proxyRequest proxydone proxyparams requestcomplete requestmessage protoerrhandle
_ -> Nothing
let v' = map join v
let us = concatMap snd $ catMaybes v'
mapM_ (\u -> removedContent (proxyMethods proxyparams) u k) us
mapM_ (\u -> removedContent (proxyMethods proxyparams) NoLiveUpdate u k) us
protoerrhandler requestcomplete $
client $ net $ sendMessage $
let nonplussed = all (== proxyUUID proxyparams) us
@ -511,13 +513,19 @@ proxyRequest proxydone proxyparams requestcomplete requestmessage protoerrhandle
requestcomplete ()
relayPUTRecord k remoteside SUCCESS = do
addedContent (proxyMethods proxyparams) (Remote.uuid (remote remoteside)) k
addedContent (proxyMethods proxyparams)
(remoteLiveUpdate remoteside)
(Remote.uuid (remote remoteside))
k
return $ Just [Remote.uuid (remote remoteside)]
relayPUTRecord k remoteside (SUCCESS_PLUS us) = do
let us' = (Remote.uuid (remote remoteside)) : us
forM_ us' $ \u ->
addedContent (proxyMethods proxyparams) u k
return $ Just us'
addedContent (proxyMethods proxyparams)
(remoteLiveUpdate remoteside)
(Remote.uuid (remote remoteside))
k
forM_ us $ \u ->
addedContent (proxyMethods proxyparams) NoLiveUpdate u k
return $ Just (Remote.uuid (remote remoteside) : us)
relayPUTRecord _ _ _ =
return Nothing

View file

@ -142,11 +142,10 @@ Planned schedule of work:
also be done by just repeatedly touching a file named with the processes's
pid in it, to avoid sqlite overhead.
* Check for TODO XXX markers
* Still implementing LiveUpdate. Check for TODO XXX markers
* Check all uses of NoLiveUpdate to see if a live update can be started and
performed there. There is one in Annex.Cluster in particular that needs a
live update.
performed there.
* The assistant is using NoLiveUpdate, but it should be posssible to plumb
a LiveUpdate through it from preferred content checking to location log