http server support for proxies, incomplete
Refactored git-annex-shell code so this can use checkCanProxy'. At this point all that remains is opening a proxy connection, and using a proxy connection.
This commit is contained in:
parent
0bdeafc2c4
commit
3d14e2cf58
4 changed files with 190 additions and 125 deletions
|
@ -8,16 +8,20 @@
|
|||
module Annex.Proxy where
|
||||
|
||||
import Annex.Common
|
||||
import P2P.Proxy
|
||||
import P2P.Protocol
|
||||
import P2P.IO
|
||||
import qualified Annex
|
||||
import qualified Remote
|
||||
import qualified Types.Remote as Remote
|
||||
import qualified Remote.Git
|
||||
import P2P.Proxy
|
||||
import P2P.Protocol
|
||||
import P2P.IO
|
||||
import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection)
|
||||
import Annex.Content
|
||||
import Annex.Concurrent
|
||||
import Annex.Tmp
|
||||
import Logs.Proxy
|
||||
import Logs.Cluster
|
||||
import Logs.UUID
|
||||
import Utility.Tmp.Dir
|
||||
import Utility.Metered
|
||||
|
||||
|
@ -25,6 +29,8 @@ import Control.Concurrent.STM
|
|||
import Control.Concurrent.Async
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified System.FilePath.ByteString as P
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Set as S
|
||||
|
||||
proxyRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
|
||||
proxyRemoteSide clientmaxversion bypass r
|
||||
|
@ -208,4 +214,63 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv endv = go
|
|||
Just FAILURE -> return ()
|
||||
Just _ -> giveup "protocol error P"
|
||||
Nothing -> return ()
|
||||
|
||||
|
||||
{- Check if this repository can proxy for a specified remote uuid,
|
||||
- and if so enable proxying for it. -}
|
||||
checkCanProxy :: UUID -> UUID -> Annex Bool
|
||||
checkCanProxy remoteuuid ouruuid = checkCanProxy' remoteuuid ouruuid >>= \case
|
||||
Right v -> do
|
||||
Annex.changeState $ \st -> st { Annex.proxyremote = Just v }
|
||||
return True
|
||||
Left Nothing -> return False
|
||||
Left (Just err) -> giveup err
|
||||
|
||||
checkCanProxy' :: UUID -> UUID -> Annex (Either (Maybe String) (Either ClusterUUID Remote))
|
||||
checkCanProxy' remoteuuid ouruuid = M.lookup ouruuid <$> getProxies >>= \case
|
||||
Nothing -> return (Left Nothing)
|
||||
-- This repository has (or had) proxying enabled. So it's
|
||||
-- ok to display error messages that talk about proxies.
|
||||
Just proxies ->
|
||||
case filter (\p -> proxyRemoteUUID p == remoteuuid) (S.toList proxies) of
|
||||
[] -> notconfigured
|
||||
ps -> case mkClusterUUID remoteuuid of
|
||||
Just cu -> proxyforcluster cu
|
||||
Nothing -> proxyfor ps
|
||||
where
|
||||
-- This repository may have multiple remotes that access the same
|
||||
-- repository. Proxy for the lowest cost one that is configured to
|
||||
-- be used as a proxy.
|
||||
proxyfor ps = do
|
||||
rs <- concat . Remote.byCost <$> Remote.remoteList
|
||||
myclusters <- annexClusters <$> Annex.getGitConfig
|
||||
let sameuuid r = Remote.uuid r == remoteuuid
|
||||
let samename r p = Remote.name r == proxyRemoteName p
|
||||
case headMaybe (filter (\r -> sameuuid r && proxyisconfigured rs myclusters r && any (samename r) ps) rs) of
|
||||
Nothing -> notconfigured
|
||||
Just r -> return (Right (Right r))
|
||||
|
||||
-- Only proxy for a remote when the git configuration
|
||||
-- allows it. This is important to prevent changes to
|
||||
-- the git-annex branch causing unexpected proxying for remotes.
|
||||
proxyisconfigured rs myclusters r
|
||||
| remoteAnnexProxy (Remote.gitconfig r) = True
|
||||
-- Proxy for remotes that are configured as cluster nodes.
|
||||
| any (`M.member` myclusters) (fromMaybe [] $ remoteAnnexClusterNode $ Remote.gitconfig r) = True
|
||||
-- Proxy for a remote when it is proxied by another remote
|
||||
-- which is itself configured as a cluster gateway.
|
||||
| otherwise = case remoteAnnexProxiedBy (Remote.gitconfig r) of
|
||||
Just proxyuuid -> not $ null $
|
||||
concatMap (remoteAnnexClusterGateway . Remote.gitconfig) $
|
||||
filter (\p -> Remote.uuid p == proxyuuid) rs
|
||||
Nothing -> False
|
||||
|
||||
proxyforcluster cu = do
|
||||
clusters <- getClusters
|
||||
if M.member cu (clusterUUIDs clusters)
|
||||
then return (Right (Left cu))
|
||||
else notconfigured
|
||||
|
||||
notconfigured = M.lookup remoteuuid <$> uuidDescMap >>= \case
|
||||
Just desc -> return $ Left $ Just $
|
||||
"not configured to proxy for repository " ++ fromUUIDDesc desc
|
||||
Nothing -> return $ Left Nothing
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
module CmdLine.GitAnnexShell where
|
||||
|
||||
import Annex.Common
|
||||
import qualified Annex
|
||||
import qualified Git.Construct
|
||||
import qualified Git.Config
|
||||
import CmdLine
|
||||
|
@ -20,11 +19,7 @@ import CmdLine.GitAnnexShell.Fields
|
|||
import Remote.GCrypt (getGCryptUUID)
|
||||
import P2P.Protocol (ServerMode(..))
|
||||
import Git.Types
|
||||
import qualified Types.Remote as R
|
||||
import Logs.Proxy
|
||||
import Logs.Cluster
|
||||
import Logs.UUID
|
||||
import Remote
|
||||
import Annex.Proxy
|
||||
|
||||
import qualified Command.ConfigList
|
||||
import qualified Command.NotifyChanges
|
||||
|
@ -36,7 +31,6 @@ import qualified Command.SendKey
|
|||
import qualified Command.DropKey
|
||||
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Set as S
|
||||
|
||||
cmdsMap :: M.Map ServerMode [Command]
|
||||
cmdsMap = M.fromList $ map mk
|
||||
|
@ -90,7 +84,7 @@ commonShellOptions =
|
|||
check u
|
||||
| u == toUUID expected = noop
|
||||
| otherwise =
|
||||
unlessM (checkProxy (toUUID expected) u) $
|
||||
unlessM (checkCanProxy (toUUID expected) u) $
|
||||
unexpectedUUID expected u
|
||||
|
||||
checkGCryptUUID expected = check =<< getGCryptUUID True =<< gitRepo
|
||||
|
@ -184,61 +178,3 @@ checkField (field, val)
|
|||
| field == fieldName remoteUUID = fieldCheck remoteUUID val
|
||||
| field == fieldName autoInit = fieldCheck autoInit val
|
||||
| otherwise = False
|
||||
|
||||
{- Check if this repository can proxy for a specified remote uuid,
|
||||
- and if so enable proxying for it. -}
|
||||
checkProxy :: UUID -> UUID -> Annex Bool
|
||||
checkProxy remoteuuid ouruuid = M.lookup ouruuid <$> getProxies >>= \case
|
||||
Nothing -> return False
|
||||
-- This repository has (or had) proxying enabled. So it's
|
||||
-- ok to display error messages that talk about proxies.
|
||||
Just proxies ->
|
||||
case filter (\p -> proxyRemoteUUID p == remoteuuid) (S.toList proxies) of
|
||||
[] -> notconfigured
|
||||
ps -> case mkClusterUUID remoteuuid of
|
||||
Just cu -> proxyforcluster cu
|
||||
Nothing -> proxyfor ps
|
||||
where
|
||||
-- This repository may have multiple remotes that access the same
|
||||
-- repository. Proxy for the lowest cost one that is configured to
|
||||
-- be used as a proxy.
|
||||
proxyfor ps = do
|
||||
rs <- concat . byCost <$> remoteList
|
||||
myclusters <- annexClusters <$> Annex.getGitConfig
|
||||
let sameuuid r = uuid r == remoteuuid
|
||||
let samename r p = name r == proxyRemoteName p
|
||||
case headMaybe (filter (\r -> sameuuid r && proxyisconfigured rs myclusters r && any (samename r) ps) rs) of
|
||||
Nothing -> notconfigured
|
||||
Just r -> do
|
||||
Annex.changeState $ \st ->
|
||||
st { Annex.proxyremote = Just (Right r) }
|
||||
return True
|
||||
|
||||
-- Only proxy for a remote when the git configuration
|
||||
-- allows it. This is important to prevent changes to
|
||||
-- the git-annex branch making git-annex-shell unexpectedly
|
||||
-- proxy for remotes.
|
||||
proxyisconfigured rs myclusters r
|
||||
| remoteAnnexProxy (R.gitconfig r) = True
|
||||
-- Proxy for remotes that are configured as cluster nodes.
|
||||
| any (`M.member` myclusters) (fromMaybe [] $ remoteAnnexClusterNode $ R.gitconfig r) = True
|
||||
-- Proxy for a remote when it is proxied by another remote
|
||||
-- which is itself configured as a cluster gateway.
|
||||
| otherwise = case remoteAnnexProxiedBy (R.gitconfig r) of
|
||||
Just proxyuuid -> not $ null $
|
||||
concatMap (remoteAnnexClusterGateway . R.gitconfig) $
|
||||
filter (\p -> R.uuid p == proxyuuid) rs
|
||||
Nothing -> False
|
||||
|
||||
proxyforcluster cu = do
|
||||
clusters <- getClusters
|
||||
if M.member cu (clusterUUIDs clusters)
|
||||
then do
|
||||
Annex.changeState $ \st ->
|
||||
st { Annex.proxyremote = Just (Left cu) }
|
||||
return True
|
||||
else notconfigured
|
||||
|
||||
notconfigured = M.lookup remoteuuid <$> uuidDescMap >>= \case
|
||||
Just desc -> giveup $ "not configured to proxy for repository " ++ fromUUIDDesc desc
|
||||
Nothing -> return False
|
||||
|
|
|
@ -87,7 +87,7 @@ optParser _ = Options
|
|||
|
||||
seek :: Options -> CommandSeek
|
||||
seek o = getAnnexWorkerPool $ \workerpool ->
|
||||
withLocalP2PConnections workerpool $ \acquireconn -> liftIO $ do
|
||||
withP2PConnections workerpool $ \acquireconn -> liftIO $ do
|
||||
authenv <- getAuthEnv
|
||||
st <- mkP2PHttpServerState acquireconn workerpool $
|
||||
mkGetServerMode authenv o
|
||||
|
|
|
@ -27,6 +27,8 @@ import Annex.WorkerPool
|
|||
import CmdLine.Action (startConcurrency)
|
||||
import Utility.ThreadScheduler
|
||||
import Utility.HumanTime
|
||||
import Annex.Proxy
|
||||
import Types.Cluster
|
||||
|
||||
import Servant
|
||||
import qualified Data.Map as M
|
||||
|
@ -162,6 +164,95 @@ data ConnectionProblem
|
|||
| TooManyConnections
|
||||
deriving (Show, Eq)
|
||||
|
||||
proxyClientNetProto :: P2PConnectionPair -> P2P.Proto a -> IO (Either P2P.ProtoFailure a)
|
||||
proxyClientNetProto conn = runNetProto
|
||||
(clientRunState conn) (clientP2PConnection conn)
|
||||
|
||||
type AcquireP2PConnection
|
||||
= ConnectionParams
|
||||
-> IO (Either ConnectionProblem P2PConnectionPair)
|
||||
|
||||
withP2PConnections :: AnnexWorkerPool -> (AcquireP2PConnection -> Annex a) -> Annex a
|
||||
withP2PConnections workerpool a = do
|
||||
myuuid <- getUUID
|
||||
reqv <- liftIO newEmptyTMVarIO
|
||||
relv <- liftIO newEmptyTMVarIO
|
||||
endv <- liftIO newEmptyTMVarIO
|
||||
proxypool <- liftIO $ newTMVarIO mempty
|
||||
asyncservicer <- liftIO $ async $
|
||||
servicer myuuid proxypool reqv relv endv
|
||||
let endit = do
|
||||
liftIO $ atomically $ putTMVar endv ()
|
||||
liftIO $ wait asyncservicer
|
||||
a (acquireconn reqv) `finally` endit
|
||||
where
|
||||
acquireconn reqv connparams = do
|
||||
respvar <- newEmptyTMVarIO
|
||||
atomically $ putTMVar reqv (connparams, respvar)
|
||||
atomically $ takeTMVar respvar
|
||||
|
||||
servicer myuuid proxypool reqv relv endv = do
|
||||
reqrel <- liftIO $
|
||||
atomically $
|
||||
(Right <$> takeTMVar reqv)
|
||||
`orElse`
|
||||
(Left . Right <$> takeTMVar relv)
|
||||
`orElse`
|
||||
(Left . Left <$> takeTMVar endv)
|
||||
case reqrel of
|
||||
Right (connparams, respvar) -> do
|
||||
servicereq myuuid proxypool relv connparams
|
||||
>>= atomically . putTMVar respvar
|
||||
servicer myuuid proxypool reqv relv endv
|
||||
Left (Right releaseconn) -> do
|
||||
releaseconn
|
||||
servicer myuuid proxypool reqv relv endv
|
||||
Left (Left ()) -> return ()
|
||||
|
||||
servicereq myuuid proxypool relv connparams
|
||||
| connectionServerUUID connparams == myuuid =
|
||||
localConnection relv connparams workerpool
|
||||
| otherwise =
|
||||
atomically (getProxyConnection proxypool connparams) >>= \case
|
||||
Just conn -> proxyConnection relv connparams proxypool conn
|
||||
Nothing -> checkcanproxy myuuid proxypool relv connparams
|
||||
|
||||
checkcanproxy myuuid proxypool relv connparams =
|
||||
inAnnexWorker' workerpool
|
||||
(checkCanProxy' (connectionServerUUID connparams) myuuid)
|
||||
>>= \case
|
||||
Right (Left reason) -> return $ Left $
|
||||
ConnectionFailed $
|
||||
fromMaybe "unknown uuid" reason
|
||||
Right (Right (Right proxyremote)) -> do
|
||||
openProxyConnectionToRemote proxyremote
|
||||
>>= proxyConnection relv connparams proxypool
|
||||
Right (Right (Left cluster)) -> do
|
||||
openProxyConnectionToCluster cluster
|
||||
>>= proxyConnection relv connparams proxypool
|
||||
Left ex -> return $ Left $
|
||||
ConnectionFailed $ show ex
|
||||
localConnection
|
||||
:: TMVar (IO ())
|
||||
-> ConnectionParams
|
||||
-> AnnexWorkerPool
|
||||
-> IO (Either ConnectionProblem P2PConnectionPair)
|
||||
localConnection relv connparams workerpool = mkP2PConnectionPair connparams relv $
|
||||
\serverrunst serverconn -> inAnnexWorker' workerpool $
|
||||
void $ runFullProto serverrunst serverconn $
|
||||
P2P.serveOneCommandAuthed
|
||||
(connectionServerMode connparams)
|
||||
(connectionServerUUID connparams)
|
||||
|
||||
proxyConnection
|
||||
:: TMVar (IO ())
|
||||
-> ConnectionParams
|
||||
-> TMVar (M.Map UUID [ProxyConnection])
|
||||
-> ProxyConnection
|
||||
-> IO (Either ConnectionProblem P2PConnectionPair)
|
||||
proxyConnection relv connparams proxypool conn = error "XXX" -- TODO
|
||||
|
||||
|
||||
data P2PConnectionPair = P2PConnectionPair
|
||||
{ clientRunState :: RunState
|
||||
, clientP2PConnection :: P2PConnection
|
||||
|
@ -174,60 +265,6 @@ data P2PConnectionPair = P2PConnectionPair
|
|||
-- longer usable.
|
||||
}
|
||||
|
||||
proxyClientNetProto :: P2PConnectionPair -> P2P.Proto a -> IO (Either P2P.ProtoFailure a)
|
||||
proxyClientNetProto conn = runNetProto
|
||||
(clientRunState conn) (clientP2PConnection conn)
|
||||
|
||||
type AcquireP2PConnection
|
||||
= ConnectionParams
|
||||
-> IO (Either ConnectionProblem P2PConnectionPair)
|
||||
|
||||
{- Acquire P2P connections to the local repository. -}
|
||||
withLocalP2PConnections :: AnnexWorkerPool -> (AcquireP2PConnection -> Annex a) -> Annex a
|
||||
withLocalP2PConnections workerpool a = do
|
||||
myuuid <- getUUID
|
||||
reqv <- liftIO newEmptyTMVarIO
|
||||
relv <- liftIO newEmptyTMVarIO
|
||||
endv <- liftIO newEmptyTMVarIO
|
||||
asyncservicer <- liftIO $ async $ servicer myuuid reqv relv endv
|
||||
let endit = do
|
||||
liftIO $ atomically $ putTMVar endv ()
|
||||
liftIO $ wait asyncservicer
|
||||
a (acquireconn reqv) `finally` endit
|
||||
where
|
||||
acquireconn reqv connparams = do
|
||||
respvar <- newEmptyTMVarIO
|
||||
atomically $ putTMVar reqv (connparams, respvar)
|
||||
atomically $ takeTMVar respvar
|
||||
|
||||
servicer myuuid reqv relv endv = do
|
||||
reqrel <- liftIO $
|
||||
atomically $
|
||||
(Right <$> takeTMVar reqv)
|
||||
`orElse`
|
||||
(Left . Right <$> takeTMVar relv)
|
||||
`orElse`
|
||||
(Left . Left <$> takeTMVar endv)
|
||||
case reqrel of
|
||||
Right (connparams, respvar) -> do
|
||||
servicereq myuuid relv connparams
|
||||
>>= atomically . putTMVar respvar
|
||||
servicer myuuid reqv relv endv
|
||||
Left (Right releaseconn) -> do
|
||||
releaseconn
|
||||
servicer myuuid reqv relv endv
|
||||
Left (Left ()) -> return ()
|
||||
|
||||
servicereq myuuid relv connparams
|
||||
| connectionServerUUID connparams /= myuuid =
|
||||
return $ Left $ ConnectionFailed "unknown uuid"
|
||||
| otherwise = mkP2PConnectionPair connparams relv $
|
||||
\serverrunst serverconn -> inAnnexWorker' workerpool $
|
||||
void $ runFullProto serverrunst serverconn $
|
||||
P2P.serveOneCommandAuthed
|
||||
(connectionServerMode connparams)
|
||||
(connectionServerUUID connparams)
|
||||
|
||||
mkP2PConnectionPair
|
||||
:: ConnectionParams
|
||||
-> TMVar (IO ())
|
||||
|
@ -388,3 +425,30 @@ inAnnexWorker' poolv annexaction = do
|
|||
let !pool' = deactivateWorker pool aid workerstrd'
|
||||
putTMVar poolv pool'
|
||||
return res
|
||||
|
||||
data ProxyConnection = ProxyConnection
|
||||
|
||||
getProxyConnection
|
||||
:: TMVar (M.Map UUID [ProxyConnection])
|
||||
-> ConnectionParams
|
||||
-> STM (Maybe ProxyConnection)
|
||||
getProxyConnection proxypool connparams = do
|
||||
m <- takeTMVar proxypool
|
||||
case M.lookup (connectionServerUUID connparams) m of
|
||||
Nothing -> do
|
||||
putTMVar proxypool m
|
||||
return Nothing
|
||||
Just [] -> do
|
||||
putTMVar proxypool $
|
||||
M.insert (connectionServerUUID connparams) [] m
|
||||
return Nothing
|
||||
Just (c:cs) -> do
|
||||
putTMVar proxypool $
|
||||
M.insert (connectionServerUUID connparams) cs m
|
||||
return (Just c)
|
||||
|
||||
openProxyConnectionToRemote :: Remote -> IO ProxyConnection
|
||||
openProxyConnectionToRemote remote = error "XXX" -- TODO
|
||||
|
||||
openProxyConnectionToCluster :: ClusterUUID -> IO ProxyConnection
|
||||
openProxyConnectionToCluster cu = error "XXX" -- TODO
|
||||
|
|
Loading…
Reference in a new issue