p2phttp --directory implementation
Untested, but it compiles, so. Known problems: * --jobs is not available to startIO * Does not notice when new repositories are added to a directory. * Does not notice when repositories are removed from a directory.
This commit is contained in:
parent
6bdf4a85fb
commit
9f84dd82da
3 changed files with 90 additions and 28 deletions
|
@ -16,6 +16,10 @@ import P2P.Http.Server
|
||||||
import P2P.Http.Url
|
import P2P.Http.Url
|
||||||
import qualified P2P.Protocol as P2P
|
import qualified P2P.Protocol as P2P
|
||||||
import Utility.Env
|
import Utility.Env
|
||||||
|
import Annex.UUID
|
||||||
|
import qualified Git
|
||||||
|
import qualified Git.Construct
|
||||||
|
import qualified Annex
|
||||||
|
|
||||||
import Servant
|
import Servant
|
||||||
import qualified Network.Wai.Handler.Warp as Warp
|
import qualified Network.Wai.Handler.Warp as Warp
|
||||||
|
@ -25,10 +29,12 @@ import qualified Data.Map as M
|
||||||
import Data.String
|
import Data.String
|
||||||
|
|
||||||
cmd :: Command
|
cmd :: Command
|
||||||
cmd = noMessages $ withAnnexOptions [jobsOption] $
|
cmd = withAnnexOptions [jobsOption] $
|
||||||
command "p2phttp" SectionPlumbing
|
noMessages $ dontCheck repoExists $
|
||||||
"communicate in P2P protocol over http"
|
noRepo (startIO <$$> optParser) $
|
||||||
paramNothing (seek <$$> optParser)
|
command "p2phttp" SectionPlumbing
|
||||||
|
"communicate in P2P protocol over http"
|
||||||
|
paramNothing (startAnnex <$$> optParser)
|
||||||
|
|
||||||
data Options = Options
|
data Options = Options
|
||||||
{ portOption :: Maybe PortNumber
|
{ portOption :: Maybe PortNumber
|
||||||
|
@ -44,6 +50,7 @@ data Options = Options
|
||||||
, wideOpenOption :: Bool
|
, wideOpenOption :: Bool
|
||||||
, proxyConnectionsOption :: Maybe Integer
|
, proxyConnectionsOption :: Maybe Integer
|
||||||
, clusterJobsOption :: Maybe Int
|
, clusterJobsOption :: Maybe Int
|
||||||
|
, directoryOption :: [FilePath]
|
||||||
}
|
}
|
||||||
|
|
||||||
optParser :: CmdParamsDesc -> Parser Options
|
optParser :: CmdParamsDesc -> Parser Options
|
||||||
|
@ -100,22 +107,41 @@ optParser _ = Options
|
||||||
( long "clusterjobs" <> metavar paramNumber
|
( long "clusterjobs" <> metavar paramNumber
|
||||||
<> help "number of concurrent node accesses per connection"
|
<> help "number of concurrent node accesses per connection"
|
||||||
))
|
))
|
||||||
|
<*> many (strOption
|
||||||
|
( long "directory" <> metavar paramPath
|
||||||
|
<> help "serve repositories in subdirectories of a directory"
|
||||||
|
))
|
||||||
|
|
||||||
seek :: Options -> CommandSeek
|
startAnnex :: Options -> Annex ()
|
||||||
seek o = getAnnexWorkerPool $ \workerpool ->
|
startAnnex o
|
||||||
withP2PConnections workerpool
|
| null (directoryOption o) = ifM ((/=) NoUUID <$> getUUID)
|
||||||
(fromMaybe 1 $ proxyConnectionsOption o)
|
( do
|
||||||
(fromMaybe 1 $ clusterJobsOption o)
|
authenv <- liftIO getAuthEnv
|
||||||
(go workerpool)
|
st <- mkServerState o authenv
|
||||||
where
|
liftIO $ runServer o st
|
||||||
go workerpool servinguuids acquireconn = liftIO $ do
|
-- Run in a git repository that is not a git-annex repository.
|
||||||
|
, liftIO $ startIO o
|
||||||
|
)
|
||||||
|
| otherwise = liftIO $ startIO o
|
||||||
|
|
||||||
|
-- TODO --jobs option only available to startAnnex, not here, need
|
||||||
|
-- to parse it into Options for this command.
|
||||||
|
startIO :: Options -> IO ()
|
||||||
|
startIO o
|
||||||
|
| null (directoryOption o) =
|
||||||
|
giveup "Use the --directory option to specify which git-annex repositories to serve."
|
||||||
|
| otherwise = do
|
||||||
authenv <- getAuthEnv
|
authenv <- getAuthEnv
|
||||||
st <- mkPerRepoServerState acquireconn workerpool $
|
repos <- findRepos o
|
||||||
mkGetServerMode authenv o
|
sts <- forM repos $ \r -> do
|
||||||
let mst = P2PHttpServerState
|
strd <- Annex.new r
|
||||||
{ servedRepos = M.fromList $
|
Annex.eval strd $ mkServerState o authenv
|
||||||
zip servinguuids (repeat st)
|
runServer o (mconcat sts)
|
||||||
}
|
|
||||||
|
runServer :: Options -> P2PHttpServerState -> IO ()
|
||||||
|
runServer o mst = go `finally` serverShutdownCleanup mst
|
||||||
|
where
|
||||||
|
go = do
|
||||||
let settings = Warp.setPort port $ Warp.setHost host $
|
let settings = Warp.setPort port $ Warp.setHost host $
|
||||||
Warp.defaultSettings
|
Warp.defaultSettings
|
||||||
case (certFileOption o, privateKeyFileOption o) of
|
case (certFileOption o, privateKeyFileOption o) of
|
||||||
|
@ -125,7 +151,6 @@ seek o = getAnnexWorkerPool $ \workerpool ->
|
||||||
certfile (chainFileOption o) privatekeyfile
|
certfile (chainFileOption o) privatekeyfile
|
||||||
Warp.runTLS tlssettings settings (p2pHttpApp mst)
|
Warp.runTLS tlssettings settings (p2pHttpApp mst)
|
||||||
_ -> giveup "You must use both --certfile and --privatekeyfile options to enable HTTPS."
|
_ -> giveup "You must use both --certfile and --privatekeyfile options to enable HTTPS."
|
||||||
|
|
||||||
port = maybe
|
port = maybe
|
||||||
(fromIntegral defaultP2PHttpProtocolPort)
|
(fromIntegral defaultP2PHttpProtocolPort)
|
||||||
fromIntegral
|
fromIntegral
|
||||||
|
@ -135,6 +160,14 @@ seek o = getAnnexWorkerPool $ \workerpool ->
|
||||||
fromString
|
fromString
|
||||||
(bindOption o)
|
(bindOption o)
|
||||||
|
|
||||||
|
mkServerState :: Options -> M.Map Auth P2P.ServerMode -> Annex P2PHttpServerState
|
||||||
|
mkServerState o authenv =
|
||||||
|
getAnnexWorkerPool $
|
||||||
|
mkP2PHttpServerState
|
||||||
|
(mkGetServerMode authenv o)
|
||||||
|
(fromMaybe 1 $ proxyConnectionsOption o)
|
||||||
|
(fromMaybe 1 $ clusterJobsOption o)
|
||||||
|
|
||||||
mkGetServerMode :: M.Map Auth P2P.ServerMode -> Options -> GetServerMode
|
mkGetServerMode :: M.Map Auth P2P.ServerMode -> Options -> GetServerMode
|
||||||
mkGetServerMode _ o _ Nothing
|
mkGetServerMode _ o _ Nothing
|
||||||
| wideOpenOption o = ServerMode
|
| wideOpenOption o = ServerMode
|
||||||
|
@ -201,3 +234,11 @@ getAuthEnv = do
|
||||||
case M.lookup user permmap of
|
case M.lookup user permmap of
|
||||||
Nothing -> (auth, P2P.ServeReadWrite)
|
Nothing -> (auth, P2P.ServeReadWrite)
|
||||||
Just perms -> (auth, perms)
|
Just perms -> (auth, perms)
|
||||||
|
|
||||||
|
findRepos :: Options -> IO [Git.Repo]
|
||||||
|
findRepos o = do
|
||||||
|
files <- map toRawFilePath . concat
|
||||||
|
<$> mapM dirContents (directoryOption o)
|
||||||
|
map Git.Construct.newFrom . catMaybes
|
||||||
|
<$> mapM Git.Construct.checkForRepo files
|
||||||
|
|
||||||
|
|
|
@ -42,11 +42,28 @@ import qualified Data.Map.Strict as M
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
|
import qualified Data.Semigroup as Sem
|
||||||
|
import Prelude
|
||||||
|
|
||||||
data P2PHttpServerState = P2PHttpServerState
|
data P2PHttpServerState = P2PHttpServerState
|
||||||
{ servedRepos :: M.Map UUID PerRepoServerState
|
{ servedRepos :: M.Map UUID PerRepoServerState
|
||||||
|
, serverShutdownCleanup :: IO ()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
instance Monoid P2PHttpServerState where
|
||||||
|
mempty = P2PHttpServerState
|
||||||
|
{ servedRepos = mempty
|
||||||
|
, serverShutdownCleanup = noop
|
||||||
|
}
|
||||||
|
|
||||||
|
instance Sem.Semigroup P2PHttpServerState where
|
||||||
|
a <> b = P2PHttpServerState
|
||||||
|
{ servedRepos = servedRepos a <> servedRepos b
|
||||||
|
, serverShutdownCleanup = do
|
||||||
|
serverShutdownCleanup a
|
||||||
|
serverShutdownCleanup b
|
||||||
|
}
|
||||||
|
|
||||||
data PerRepoServerState = PerRepoServerState
|
data PerRepoServerState = PerRepoServerState
|
||||||
{ acquireP2PConnection :: AcquireP2PConnection
|
{ acquireP2PConnection :: AcquireP2PConnection
|
||||||
, annexWorkerPool :: AnnexWorkerPool
|
, annexWorkerPool :: AnnexWorkerPool
|
||||||
|
@ -213,13 +230,13 @@ type AcquireP2PConnection
|
||||||
= ConnectionParams
|
= ConnectionParams
|
||||||
-> IO (Either ConnectionProblem P2PConnectionPair)
|
-> IO (Either ConnectionProblem P2PConnectionPair)
|
||||||
|
|
||||||
withP2PConnections
|
mkP2PHttpServerState
|
||||||
:: AnnexWorkerPool
|
:: GetServerMode
|
||||||
-> ProxyConnectionPoolSize
|
-> ProxyConnectionPoolSize
|
||||||
-> ClusterConcurrency
|
-> ClusterConcurrency
|
||||||
-> ([UUID] -> AcquireP2PConnection -> Annex a)
|
-> AnnexWorkerPool
|
||||||
-> Annex a
|
-> Annex P2PHttpServerState
|
||||||
withP2PConnections workerpool proxyconnectionpoolsize clusterconcurrency a = do
|
mkP2PHttpServerState getservermode proxyconnectionpoolsize clusterconcurrency workerpool = do
|
||||||
enableInteractiveBranchAccess
|
enableInteractiveBranchAccess
|
||||||
myuuid <- getUUID
|
myuuid <- getUUID
|
||||||
myproxies <- M.lookup myuuid <$> getProxies
|
myproxies <- M.lookup myuuid <$> getProxies
|
||||||
|
@ -233,7 +250,11 @@ withP2PConnections workerpool proxyconnectionpoolsize clusterconcurrency a = do
|
||||||
liftIO $ atomically $ putTMVar endv ()
|
liftIO $ atomically $ putTMVar endv ()
|
||||||
liftIO $ wait asyncservicer
|
liftIO $ wait asyncservicer
|
||||||
let servinguuids = myuuid : map proxyRemoteUUID (maybe [] S.toList myproxies)
|
let servinguuids = myuuid : map proxyRemoteUUID (maybe [] S.toList myproxies)
|
||||||
a servinguuids (acquireconn reqv) `finally` endit
|
st <- liftIO $ mkPerRepoServerState (acquireconn reqv) workerpool getservermode
|
||||||
|
return $ P2PHttpServerState
|
||||||
|
{ servedRepos = M.fromList $ zip servinguuids (repeat st)
|
||||||
|
, serverShutdownCleanup = endit
|
||||||
|
}
|
||||||
where
|
where
|
||||||
acquireconn reqv connparams = do
|
acquireconn reqv connparams = do
|
||||||
respvar <- newEmptyTMVarIO
|
respvar <- newEmptyTMVarIO
|
||||||
|
|
|
@ -41,10 +41,10 @@ convenient way to download the content of any key, by using the path
|
||||||
|
|
||||||
* `--directory=path`
|
* `--directory=path`
|
||||||
|
|
||||||
Serve each git-annex repository found in a directory. This does not
|
Serve each git-annex repository found in immediate
|
||||||
recurse into subdirectories.
|
subdirectories of a directory.
|
||||||
|
|
||||||
This option can be provided more than once to serve serveral directories
|
This option can be provided more than once to serve several directories
|
||||||
full of git-annex repositories.
|
full of git-annex repositories.
|
||||||
|
|
||||||
New git-annex repositories can be added to the directory, and will be
|
New git-annex repositories can be added to the directory, and will be
|
||||||
|
|
Loading…
Reference in a new issue