REMOVE-BEFORE and GETTIMESTAMP proxying
For clusters, the timestamps have to be translated, since each node can have its own idea about what time it is. To translate a timestamp, the proxy remembers what time it asked the node for a timestamp in GETTIMESTAMP, and applies the delta as an offset in REMOVE-BEFORE. This does mean that a remove from a cluster has to call GETTIMESTAMP on every node before dropping from nodes. Not very efficient. Although currently it tries to drop from every single node anyway, which is also not very efficient. I thought about caching the GETTIMESTAMP from the nodes on the first call. That would improve efficiency. But, since monotonic clocks on !Linux don't advance when the computer is suspended, consider what might happen if one node was suspended for a while, then came back. Its monotonic timestamp would end up behind where the proxying expects it to be. Would that result in removing when it shouldn't, or refusing to remove when it should? Have not thought it through. Either way, a cluster behaving strangly for an extended period of time because one of its nodes was briefly asleep doesn't seem like good behavior.
This commit is contained in:
parent
99b7a0cfe9
commit
f452bd448a
4 changed files with 97 additions and 5 deletions
|
@ -67,7 +67,8 @@ proxyCluster clusteruuid proxydone servermode clientside protoerrhandler = do
|
|||
(selectnode, closenodes) <- clusterProxySelector clusteruuid
|
||||
protocolversion bypassuuids
|
||||
concurrencyconfig <- getConcurrencyConfig
|
||||
proxy proxydone proxymethods servermode clientside
|
||||
proxystate <- liftIO mkProxyState
|
||||
proxy proxydone proxymethods proxystate servermode clientside
|
||||
(fromClusterUUID clusteruuid)
|
||||
selectnode concurrencyconfig protocolversion
|
||||
othermsg (protoerrhandler closenodes)
|
||||
|
@ -107,6 +108,7 @@ clusterProxySelector clusteruuid protocolversion (Bypass bypass) = do
|
|||
-- could be out of date, actually try to remove from every
|
||||
-- node.
|
||||
, proxyREMOVE = const (pure nodes)
|
||||
, proxyGETTIMESTAMP = pure nodes
|
||||
-- Content is not locked on the cluster as a whole,
|
||||
-- instead it can be locked on individual nodes that are
|
||||
-- proxied to the client.
|
||||
|
|
|
@ -76,7 +76,9 @@ performProxy clientuuid servermode r = do
|
|||
closeRemoteSide remoteside
|
||||
p2pDone
|
||||
let errhandler = p2pErrHandler (closeRemoteSide remoteside)
|
||||
let runproxy othermsg' = proxy closer proxymethods
|
||||
proxystate <- liftIO mkProxyState
|
||||
let runproxy othermsg' = proxy closer
|
||||
proxymethods proxystate
|
||||
servermode clientside
|
||||
(Remote.uuid r)
|
||||
(singleProxySelector remoteside)
|
||||
|
|
90
P2P/Proxy.hs
90
P2P/Proxy.hs
|
@ -15,6 +15,7 @@ import qualified Annex
|
|||
import P2P.Protocol
|
||||
import P2P.IO
|
||||
import Utility.Metered
|
||||
import Utility.MonotonicClock
|
||||
import Git.FilePath
|
||||
import Types.Concurrency
|
||||
import Annex.Concurrent
|
||||
|
@ -26,16 +27,22 @@ import Control.Concurrent.Async
|
|||
import qualified Control.Concurrent.MSem as MSem
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified Data.Set as S
|
||||
import qualified Data.Map as M
|
||||
import Data.Unique
|
||||
import GHC.Conc
|
||||
|
||||
type ProtoCloser = Annex ()
|
||||
|
||||
data ClientSide = ClientSide RunState P2PConnection
|
||||
|
||||
newtype RemoteSideId = RemoteSideId Unique
|
||||
deriving (Eq, Ord)
|
||||
|
||||
data RemoteSide = RemoteSide
|
||||
{ remote :: Remote
|
||||
, remoteConnect :: Annex (Maybe (RunState, P2PConnection, ProtoCloser))
|
||||
, remoteTMVar :: TMVar (RunState, P2PConnection, ProtoCloser)
|
||||
, remoteSideId :: RemoteSideId
|
||||
}
|
||||
|
||||
mkRemoteSide :: Remote -> Annex (Maybe (RunState, P2PConnection, ProtoCloser)) -> Annex RemoteSide
|
||||
|
@ -43,6 +50,7 @@ mkRemoteSide r remoteconnect = RemoteSide
|
|||
<$> pure r
|
||||
<*> pure remoteconnect
|
||||
<*> liftIO (atomically newEmptyTMVar)
|
||||
<*> liftIO (RemoteSideId <$> newUnique)
|
||||
|
||||
runRemoteSide :: RemoteSide -> Proto a -> Annex (Either ProtoFailure a)
|
||||
runRemoteSide remoteside a =
|
||||
|
@ -71,6 +79,9 @@ data ProxySelector = ProxySelector
|
|||
, proxyUNLOCKCONTENT :: Annex (Maybe RemoteSide)
|
||||
, proxyREMOVE :: Key -> Annex [RemoteSide]
|
||||
-- ^ remove from all of these remotes
|
||||
, proxyGETTIMESTAMP :: Annex [RemoteSide]
|
||||
-- ^ should send every remote that proxyREMOVE can
|
||||
-- ever return for any key
|
||||
, proxyGET :: Key -> Annex (Maybe RemoteSide)
|
||||
, proxyPUT :: AssociatedFile -> Key -> Annex [RemoteSide]
|
||||
-- ^ put to some/all of these remotes
|
||||
|
@ -82,6 +93,7 @@ singleProxySelector r = ProxySelector
|
|||
, proxyLOCKCONTENT = const (pure (Just r))
|
||||
, proxyUNLOCKCONTENT = pure (Just r)
|
||||
, proxyREMOVE = const (pure [r])
|
||||
, proxyGETTIMESTAMP = pure [r]
|
||||
, proxyGET = const (pure (Just r))
|
||||
, proxyPUT = const (const (pure [r]))
|
||||
}
|
||||
|
@ -178,12 +190,23 @@ getClientBypass _ _ (Just othermsg) cont _ =
|
|||
-- Pass along non-BYPASS message from version 0 client.
|
||||
cont (Bypass S.empty, (Just othermsg))
|
||||
|
||||
data ProxyState = ProxyState
|
||||
{ proxyRemoteLatestTimestamps :: TVar (M.Map RemoteSideId MonotonicTimestamp)
|
||||
, proxyRemoteLatestLocalTimestamp :: TVar (Maybe MonotonicTimestamp)
|
||||
}
|
||||
|
||||
mkProxyState :: IO ProxyState
|
||||
mkProxyState = ProxyState
|
||||
<$> newTVarIO mempty
|
||||
<*> newTVarIO Nothing
|
||||
|
||||
{- Proxy between the client and the remote. This picks up after
|
||||
- sendClientProtocolVersion.
|
||||
-}
|
||||
proxy
|
||||
:: Annex r
|
||||
-> ProxyMethods
|
||||
-> ProxyState
|
||||
-> ServerMode
|
||||
-> ClientSide
|
||||
-> UUID
|
||||
|
@ -197,7 +220,7 @@ proxy
|
|||
-- ^ non-VERSION message that was received from the client when
|
||||
-- negotiating protocol version, and has not been responded to yet
|
||||
-> ProtoErrorHandled r
|
||||
proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig (ProtocolVersion protocolversion) othermsg protoerrhandler = do
|
||||
proxy proxydone proxymethods proxystate servermode (ClientSide clientrunst clientconn) remoteuuid proxyselector concurrencyconfig (ProtocolVersion protocolversion) othermsg protoerrhandler = do
|
||||
case othermsg of
|
||||
Nothing -> proxynextclientmessage ()
|
||||
Just message -> proxyclientmessage (Just message)
|
||||
|
@ -238,6 +261,13 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
|||
remotesides <- proxyREMOVE proxyselector k
|
||||
servermodechecker checkREMOVEServerMode $
|
||||
handleREMOVE remotesides k message
|
||||
REMOVE_BEFORE _ k -> do
|
||||
remotesides <- proxyREMOVE proxyselector k
|
||||
servermodechecker checkREMOVEServerMode $
|
||||
handleREMOVE remotesides k message
|
||||
GETTIMESTAMP -> do
|
||||
remotesides <- proxyGETTIMESTAMP proxyselector
|
||||
handleGETTIMESTAMP remotesides
|
||||
GET _ _ k -> proxyGET proxyselector k >>= \case
|
||||
Just remoteside -> handleGET remoteside message
|
||||
Nothing ->
|
||||
|
@ -278,6 +308,7 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
|||
PUT_FROM _ -> protoerr
|
||||
ALREADY_HAVE -> protoerr
|
||||
ALREADY_HAVE_PLUS _ -> protoerr
|
||||
TIMESTAMP _ -> protoerr
|
||||
-- Early messages that the client should not send now.
|
||||
AUTH _ _ -> protoerr
|
||||
VERSION _ -> protoerr
|
||||
|
@ -318,15 +349,70 @@ proxy proxydone proxymethods servermode (ClientSide clientrunst clientconn) remo
|
|||
_ <- client $ net $ sendMessage (ERROR "protocol error X")
|
||||
giveup "protocol error M"
|
||||
|
||||
-- When there is a single remote, reply with its timestamp,
|
||||
-- to avoid needing timestamp translation.
|
||||
handleGETTIMESTAMP (remoteside:[]) = do
|
||||
liftIO $ hPutStrLn stderr "!!!! single remote side"
|
||||
liftIO $ atomically $ do
|
||||
writeTVar (proxyRemoteLatestTimestamps proxystate)
|
||||
mempty
|
||||
writeTVar (proxyRemoteLatestLocalTimestamp proxystate)
|
||||
Nothing
|
||||
proxyresponse remoteside GETTIMESTAMP
|
||||
(const proxynextclientmessage)
|
||||
-- When there are multiple remotes, reply with our local timestamp,
|
||||
-- and do timestamp translation when sending REMOVE-FROM.
|
||||
handleGETTIMESTAMP remotesides = do
|
||||
-- Order of getting timestamps matters.
|
||||
-- Getting the local time after the time of the remotes
|
||||
-- means that if there is some delay in getting the time
|
||||
-- from a remote, that is reflected in the local time,
|
||||
-- and so reduces the allowed time.
|
||||
remotetimes <- (M.fromList . mapMaybe join) <$> getremotetimes
|
||||
localtime <- liftIO currentMonotonicTimestamp
|
||||
liftIO $ atomically $ do
|
||||
writeTVar (proxyRemoteLatestTimestamps proxystate)
|
||||
remotetimes
|
||||
writeTVar (proxyRemoteLatestLocalTimestamp proxystate)
|
||||
(Just localtime)
|
||||
protoerrhandler proxynextclientmessage $
|
||||
client $ net $ sendMessage (TIMESTAMP localtime)
|
||||
where
|
||||
getremotetimes = forMC concurrencyconfig remotesides $ \r ->
|
||||
runRemoteSideOrSkipFailed r $ do
|
||||
net $ sendMessage GETTIMESTAMP
|
||||
net receiveMessage >>= return . \case
|
||||
Just (TIMESTAMP ts) ->
|
||||
Just (remoteSideId r, ts)
|
||||
_ -> Nothing
|
||||
|
||||
proxyTimestamp ts _ _ Nothing = ts -- not proxying timestamps
|
||||
proxyTimestamp ts r tsm (Just correspondinglocaltime) =
|
||||
case M.lookup (remoteSideId r) tsm of
|
||||
Just oldts -> oldts + (ts - correspondinglocaltime)
|
||||
Nothing -> ts -- not reached
|
||||
|
||||
handleREMOVE [] _ _ =
|
||||
-- When no places are provided to remove from,
|
||||
-- don't report a successful remote.
|
||||
protoerrhandler proxynextclientmessage $
|
||||
client $ net $ sendMessage FAILURE
|
||||
handleREMOVE remotesides k message = do
|
||||
tsm <- liftIO $ readTVarIO $
|
||||
proxyRemoteLatestTimestamps proxystate
|
||||
oldlocaltime <- liftIO $ readTVarIO $
|
||||
proxyRemoteLatestLocalTimestamp proxystate
|
||||
v <- forMC concurrencyconfig remotesides $ \r ->
|
||||
runRemoteSideOrSkipFailed r $ do
|
||||
net $ sendMessage message
|
||||
case message of
|
||||
REMOVE_BEFORE ts _ -> do
|
||||
v <- net getProtocolVersion
|
||||
if v < ProtocolVersion 3
|
||||
then net $ sendMessage $
|
||||
REMOVE k
|
||||
else net $ sendMessage $
|
||||
REMOVE_BEFORE (proxyTimestamp ts r tsm oldlocaltime) k
|
||||
_ -> net $ sendMessage message
|
||||
net receiveMessage >>= return . \case
|
||||
Just SUCCESS ->
|
||||
Just ((True, Nothing), [Remote.uuid (remote r)])
|
||||
|
|
|
@ -73,7 +73,7 @@ Then REMOVE Key Timestamp can have the timestamp adjusted when it's sent
|
|||
out to each client, by calling GETTIMESTAMP again and applying the offsets
|
||||
between the cluster's clock and each node's clock.
|
||||
|
||||
TODO
|
||||
> done
|
||||
|
||||
## future flag day
|
||||
|
||||
|
@ -94,3 +94,5 @@ those remotes. This problem is not likely enough to occur to seem worth
|
|||
that disruption.
|
||||
|
||||
A flag day might be worth doing in a couple of years though. --[[Joey]]
|
||||
|
||||
> [[done]] --[[Joey]]
|
||||
|
|
Loading…
Reference in a new issue