Merge branch 'p2phttp-multi'

This commit is contained in:
Joey Hess 2024-11-21 15:16:06 -04:00
commit 757f93203a
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
6 changed files with 276 additions and 117 deletions

View file

@ -23,6 +23,8 @@ git-annex (10.20241032) UNRELEASED; urgency=medium
unnecessary duplicate password prompts. unnecessary duplicate password prompts.
* git-remote-annex: Require git version 2.31 or newer, since old * git-remote-annex: Require git version 2.31 or newer, since old
ones had a buggy git bundle command. ones had a buggy git bundle command.
* p2phttp: Added --directory option which serves multiple git-annex
repositories located inside a directory.
-- Joey Hess <id@joeyh.name> Mon, 11 Nov 2024 12:26:00 -0400 -- Joey Hess <id@joeyh.name> Mon, 11 Nov 2024 12:26:00 -0400

View file

@ -511,15 +511,19 @@ jsonProgressOption =
-- action in `allowConcurrentOutput`. -- action in `allowConcurrentOutput`.
jobsOption :: [AnnexOption] jobsOption :: [AnnexOption]
jobsOption = jobsOption =
[ annexOption (setAnnexState . setConcurrency . ConcurrencyCmdLine) $ [ annexOption (setAnnexState . setConcurrency . ConcurrencyCmdLine)
option (maybeReader parseConcurrency) jobsOptionParser
( long "jobs" <> short 'J'
<> metavar (paramNumber `paramOr` "cpus")
<> help "enable concurrent jobs"
<> hidden
)
] ]
jobsOptionParser :: Parser Concurrency
jobsOptionParser =
option (maybeReader parseConcurrency)
( long "jobs" <> short 'J'
<> metavar (paramNumber `paramOr` "cpus")
<> help "enable concurrent jobs"
<> hidden
)
timeLimitOption :: [AnnexOption] timeLimitOption :: [AnnexOption]
timeLimitOption = timeLimitOption =
[ annexOption settimelimit $ option (eitherReader parseDuration) [ annexOption settimelimit $ option (eitherReader parseDuration)

View file

@ -11,11 +11,16 @@
module Command.P2PHttp where module Command.P2PHttp where
import Command import Command hiding (jobsOption)
import P2P.Http.Server import P2P.Http.Server
import P2P.Http.Url import P2P.Http.Url
import qualified P2P.Protocol as P2P import qualified P2P.Protocol as P2P
import Utility.Env import Utility.Env
import Annex.UUID
import qualified Git
import qualified Git.Construct
import qualified Annex
import Types.Concurrency
import Servant import Servant
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
@ -23,12 +28,14 @@ import qualified Network.Wai.Handler.WarpTLS as Warp
import Network.Socket (PortNumber) import Network.Socket (PortNumber)
import qualified Data.Map as M import qualified Data.Map as M
import Data.String import Data.String
import Control.Concurrent.STM
cmd :: Command cmd :: Command
cmd = noMessages $ withAnnexOptions [jobsOption] $ cmd = noMessages $ dontCheck repoExists $
command "p2phttp" SectionPlumbing noRepo (startIO <$$> optParser) $
"communicate in P2P protocol over http" command "p2phttp" SectionPlumbing
paramNothing (seek <$$> optParser) "communicate in P2P protocol over http"
paramNothing (startAnnex <$$> optParser)
data Options = Options data Options = Options
{ portOption :: Maybe PortNumber { portOption :: Maybe PortNumber
@ -43,7 +50,9 @@ data Options = Options
, unauthNoLockingOption :: Bool , unauthNoLockingOption :: Bool
, wideOpenOption :: Bool , wideOpenOption :: Bool
, proxyConnectionsOption :: Maybe Integer , proxyConnectionsOption :: Maybe Integer
, jobsOption :: Maybe Concurrency
, clusterJobsOption :: Maybe Int , clusterJobsOption :: Maybe Int
, directoryOption :: [FilePath]
} }
optParser :: CmdParamsDesc -> Parser Options optParser :: CmdParamsDesc -> Parser Options
@ -96,32 +105,80 @@ optParser _ = Options
( long "proxyconnections" <> metavar paramNumber ( long "proxyconnections" <> metavar paramNumber
<> help "maximum number of idle connections when proxying" <> help "maximum number of idle connections when proxying"
)) ))
<*> optional jobsOptionParser
<*> optional (option auto <*> optional (option auto
( long "clusterjobs" <> metavar paramNumber ( long "clusterjobs" <> metavar paramNumber
<> help "number of concurrent node accesses per connection" <> help "number of concurrent node accesses per connection"
)) ))
<*> many (strOption
( long "directory" <> metavar paramPath
<> help "serve repositories in subdirectories of a directory"
))
seek :: Options -> CommandSeek startAnnex :: Options -> Annex ()
seek o = getAnnexWorkerPool $ \workerpool -> startAnnex o
withP2PConnections workerpool | null (directoryOption o) = ifM ((/=) NoUUID <$> getUUID)
(fromMaybe 1 $ proxyConnectionsOption o) ( do
(fromMaybe 1 $ clusterJobsOption o) authenv <- liftIO getAuthEnv
(go workerpool) st <- mkServerState o authenv
where liftIO $ runServer o st
go workerpool acquireconn = liftIO $ do -- Run in a git repository that is not a git-annex repository.
, liftIO $ startIO o
)
| otherwise = liftIO $ startIO o
startIO :: Options -> IO ()
startIO o
| null (directoryOption o) =
giveup "Use the --directory option to specify which git-annex repositories to serve."
| otherwise = do
authenv <- getAuthEnv authenv <- getAuthEnv
st <- mkP2PHttpServerState acquireconn workerpool $ st <- mkst authenv mempty
mkGetServerMode authenv o runServer o st
where
mkst authenv oldst = do
repos <- findRepos o
sts <- forM repos $ \r -> do
strd <- Annex.new r
Annex.eval strd (mkstannex authenv oldst)
return (mconcat sts)
{ updateRepos = updaterepos authenv
}
mkstannex authenv oldst = do
u <- getUUID
if u == NoUUID
then return mempty
else case M.lookup u (servedRepos oldst) of
Nothing -> mkServerState o authenv
Just old -> return $ P2PHttpServerState
{ servedRepos = M.singleton u old
, serverShutdownCleanup = mempty
, updateRepos = mempty
}
updaterepos authenv oldst = do
newst <- mkst authenv oldst
return $ newst
{ serverShutdownCleanup =
serverShutdownCleanup newst
<> serverShutdownCleanup oldst
}
runServer :: Options -> P2PHttpServerState -> IO ()
runServer o mst = go `finally` serverShutdownCleanup mst
where
go = do
let settings = Warp.setPort port $ Warp.setHost host $ let settings = Warp.setPort port $ Warp.setHost host $
Warp.defaultSettings Warp.defaultSettings
mstv <- newTMVarIO mst
case (certFileOption o, privateKeyFileOption o) of case (certFileOption o, privateKeyFileOption o) of
(Nothing, Nothing) -> Warp.runSettings settings (p2pHttpApp st) (Nothing, Nothing) -> Warp.runSettings settings (p2pHttpApp mstv)
(Just certfile, Just privatekeyfile) -> do (Just certfile, Just privatekeyfile) -> do
let tlssettings = Warp.tlsSettingsChain let tlssettings = Warp.tlsSettingsChain
certfile (chainFileOption o) privatekeyfile certfile (chainFileOption o) privatekeyfile
Warp.runTLS tlssettings settings (p2pHttpApp st) Warp.runTLS tlssettings settings (p2pHttpApp mstv)
_ -> giveup "You must use both --certfile and --privatekeyfile options to enable HTTPS." _ -> giveup "You must use both --certfile and --privatekeyfile options to enable HTTPS."
port = maybe port = maybe
(fromIntegral defaultP2PHttpProtocolPort) (fromIntegral defaultP2PHttpProtocolPort)
fromIntegral fromIntegral
@ -131,6 +188,15 @@ seek o = getAnnexWorkerPool $ \workerpool ->
fromString fromString
(bindOption o) (bindOption o)
mkServerState :: Options -> M.Map Auth P2P.ServerMode -> Annex P2PHttpServerState
mkServerState o authenv =
withAnnexWorkerPool (jobsOption o) $
mkP2PHttpServerState
(mkGetServerMode authenv o)
return
(fromMaybe 1 $ proxyConnectionsOption o)
(fromMaybe 1 $ clusterJobsOption o)
mkGetServerMode :: M.Map Auth P2P.ServerMode -> Options -> GetServerMode mkGetServerMode :: M.Map Auth P2P.ServerMode -> Options -> GetServerMode
mkGetServerMode _ o _ Nothing mkGetServerMode _ o _ Nothing
| wideOpenOption o = ServerMode | wideOpenOption o = ServerMode
@ -197,3 +263,11 @@ getAuthEnv = do
case M.lookup user permmap of case M.lookup user permmap of
Nothing -> (auth, P2P.ServeReadWrite) Nothing -> (auth, P2P.ServeReadWrite)
Just perms -> (auth, perms) Just perms -> (auth, perms)
findRepos :: Options -> IO [Git.Repo]
findRepos o = do
files <- map toRawFilePath . concat
<$> mapM dirContents (directoryOption o)
map Git.Construct.newFrom . catMaybes
<$> mapM Git.Construct.checkForRepo files

View file

@ -45,10 +45,10 @@ import Control.Concurrent
import System.IO.Unsafe import System.IO.Unsafe
import Data.Either import Data.Either
p2pHttpApp :: P2PHttpServerState -> Application p2pHttpApp :: TMVar P2PHttpServerState -> Application
p2pHttpApp = serve p2pHttpAPI . serveP2pHttp p2pHttpApp = serve p2pHttpAPI . serveP2pHttp
serveP2pHttp :: P2PHttpServerState -> Server P2PHttpAPI serveP2pHttp :: TMVar P2PHttpServerState -> Server P2PHttpAPI
serveP2pHttp st serveP2pHttp st
= serveGet st = serveGet st
:<|> serveGet st :<|> serveGet st
@ -91,7 +91,7 @@ serveP2pHttp st
:<|> serveGetGeneric st :<|> serveGetGeneric st
serveGetGeneric serveGetGeneric
:: P2PHttpServerState :: TMVar P2PHttpServerState
-> B64UUID ServerSide -> B64UUID ServerSide
-> B64Key -> B64Key
-> Maybe (B64UUID ClientSide) -> Maybe (B64UUID ClientSide)
@ -109,7 +109,7 @@ serveGetGeneric st su@(B64UUID u) k mcu bypass =
serveGet serveGet
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
-> B64Key -> B64Key
@ -120,8 +120,8 @@ serveGet
-> IsSecure -> IsSecure
-> Maybe Auth -> Maybe Auth
-> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString)) -> Handler (Headers '[DataLengthHeader] (S.SourceT IO B.ByteString))
serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do serveGet mst su apiver (B64Key k) cu bypass baf startat sec auth = do
conn <- getP2PConnection apiver st cu su bypass sec auth ReadAction id (conn, st) <- getP2PConnection apiver mst cu su bypass sec auth ReadAction id
bsv <- liftIO newEmptyTMVarIO bsv <- liftIO newEmptyTMVarIO
endv <- liftIO newEmptyTMVarIO endv <- liftIO newEmptyTMVarIO
validityv <- liftIO newEmptyTMVarIO validityv <- liftIO newEmptyTMVarIO
@ -222,7 +222,7 @@ serveGet st su apiver (B64Key k) cu bypass baf startat sec auth = do
serveCheckPresent serveCheckPresent
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
-> B64Key -> B64Key
@ -233,14 +233,14 @@ serveCheckPresent
-> Handler CheckPresentResult -> Handler CheckPresentResult
serveCheckPresent st su apiver (B64Key k) cu bypass sec auth = do serveCheckPresent st su apiver (B64Key k) cu bypass sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth ReadAction id res <- withP2PConnection apiver st cu su bypass sec auth ReadAction id
$ \conn -> liftIO $ proxyClientNetProto conn $ checkPresent k $ \(conn, _) -> liftIO $ proxyClientNetProto conn $ checkPresent k
case res of case res of
Right b -> return (CheckPresentResult b) Right b -> return (CheckPresentResult b)
Left err -> throwError $ err500 { errBody = encodeBL err } Left err -> throwError $ err500 { errBody = encodeBL err }
serveRemove serveRemove
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> (RemoveResultPlus -> t) -> (RemoveResultPlus -> t)
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
@ -252,7 +252,7 @@ serveRemove
-> Handler t -> Handler t
serveRemove st resultmangle su apiver (B64Key k) cu bypass sec auth = do serveRemove st resultmangle su apiver (B64Key k) cu bypass sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction id res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction id
$ \conn -> $ \(conn, _) ->
liftIO $ proxyClientNetProto conn $ remove Nothing k liftIO $ proxyClientNetProto conn $ remove Nothing k
case res of case res of
(Right b, plusuuids) -> return $ resultmangle $ (Right b, plusuuids) -> return $ resultmangle $
@ -262,7 +262,7 @@ serveRemove st resultmangle su apiver (B64Key k) cu bypass sec auth = do
serveRemoveBefore serveRemoveBefore
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
-> B64Key -> B64Key
@ -274,7 +274,7 @@ serveRemoveBefore
-> Handler RemoveResultPlus -> Handler RemoveResultPlus
serveRemoveBefore st su apiver (B64Key k) cu bypass (Timestamp ts) sec auth = do serveRemoveBefore st su apiver (B64Key k) cu bypass (Timestamp ts) sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction id res <- withP2PConnection apiver st cu su bypass sec auth RemoveAction id
$ \conn -> $ \(conn, _) ->
liftIO $ proxyClientNetProto conn $ liftIO $ proxyClientNetProto conn $
removeBeforeRemoteEndTime ts k removeBeforeRemoteEndTime ts k
case res of case res of
@ -285,7 +285,7 @@ serveRemoveBefore st su apiver (B64Key k) cu bypass (Timestamp ts) sec auth = do
serveGetTimestamp serveGetTimestamp
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
-> B64UUID ClientSide -> B64UUID ClientSide
@ -295,7 +295,7 @@ serveGetTimestamp
-> Handler GetTimestampResult -> Handler GetTimestampResult
serveGetTimestamp st su apiver cu bypass sec auth = do serveGetTimestamp st su apiver cu bypass sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth ReadAction id res <- withP2PConnection apiver st cu su bypass sec auth ReadAction id
$ \conn -> $ \(conn, _) ->
liftIO $ proxyClientNetProto conn getTimestamp liftIO $ proxyClientNetProto conn getTimestamp
case res of case res of
Right ts -> return $ GetTimestampResult (Timestamp ts) Right ts -> return $ GetTimestampResult (Timestamp ts)
@ -304,7 +304,7 @@ serveGetTimestamp st su apiver cu bypass sec auth = do
servePut servePut
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> (PutResultPlus -> t) -> (PutResultPlus -> t)
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
@ -319,28 +319,28 @@ servePut
-> IsSecure -> IsSecure
-> Maybe Auth -> Maybe Auth
-> Handler t -> Handler t
servePut st resultmangle su apiver (Just True) _ k cu bypass baf _ _ sec auth = do servePut mst resultmangle su apiver (Just True) _ k cu bypass baf _ _ sec auth = do
res <- withP2PConnection' apiver st cu su bypass sec auth WriteAction res <- withP2PConnection' apiver mst cu su bypass sec auth WriteAction
(\cst -> cst { connectionWaitVar = False }) (liftIO . protoaction) (\cst -> cst { connectionWaitVar = False }) (liftIO . protoaction)
servePutResult resultmangle res servePutResult resultmangle res
where where
protoaction conn = servePutAction st conn k baf $ \_offset -> do protoaction conn = servePutAction conn k baf $ \_offset -> do
net $ sendMessage DATA_PRESENT net $ sendMessage DATA_PRESENT
checkSuccessPlus checkSuccessPlus
servePut st resultmangle su apiver _datapresent (DataLength len) k cu bypass baf moffset stream sec auth = do servePut mst resultmangle su apiver _datapresent (DataLength len) k cu bypass baf moffset stream sec auth = do
validityv <- liftIO newEmptyTMVarIO validityv <- liftIO newEmptyTMVarIO
let validitycheck = local $ runValidityCheck $ let validitycheck = local $ runValidityCheck $
liftIO $ atomically $ readTMVar validityv liftIO $ atomically $ readTMVar validityv
tooshortv <- liftIO newEmptyTMVarIO tooshortv <- liftIO newEmptyTMVarIO
content <- liftIO $ S.unSourceT stream (gather validityv tooshortv) content <- liftIO $ S.unSourceT stream (gather validityv tooshortv)
res <- withP2PConnection' apiver st cu su bypass sec auth WriteAction res <- withP2PConnection' apiver mst cu su bypass sec auth WriteAction
(\cst -> cst { connectionWaitVar = False }) $ \conn -> do (\cst -> cst { connectionWaitVar = False }) $ \(conn, st) -> do
liftIO $ void $ async $ checktooshort conn tooshortv liftIO $ void $ async $ checktooshort conn tooshortv
liftIO (protoaction conn content validitycheck) liftIO (protoaction conn st content validitycheck)
servePutResult resultmangle res servePutResult resultmangle res
where where
protoaction conn content validitycheck = protoaction conn st content validitycheck =
servePutAction st conn k baf $ \offset' -> servePutAction (conn, st) k baf $ \offset' ->
let offsetdelta = offset' - offset let offsetdelta = offset' - offset
in case compare offset' offset of in case compare offset' offset of
EQ -> sendContent' nullMeterUpdate (Len len) EQ -> sendContent' nullMeterUpdate (Len len)
@ -396,13 +396,12 @@ servePut st resultmangle su apiver _datapresent (DataLength len) k cu bypass baf
closeP2PConnection conn closeP2PConnection conn
servePutAction servePutAction
:: P2PHttpServerState :: (P2PConnectionPair, PerRepoServerState)
-> P2PConnectionPair
-> B64Key -> B64Key
-> Maybe B64FilePath -> Maybe B64FilePath
-> (P2P.Protocol.Offset -> Proto (Maybe [UUID])) -> (P2P.Protocol.Offset -> Proto (Maybe [UUID]))
-> IO (Either SomeException (Either ProtoFailure (Maybe [UUID]))) -> IO (Either SomeException (Either ProtoFailure (Maybe [UUID])))
servePutAction st conn (B64Key k) baf a = inAnnexWorker st $ servePutAction (conn, st) (B64Key k) baf a = inAnnexWorker st $
enteringStage (TransferStage Download) $ enteringStage (TransferStage Download) $
runFullProto (clientRunState conn) (clientP2PConnection conn) $ runFullProto (clientRunState conn) (clientP2PConnection conn) $
put' k af a put' k af a
@ -422,7 +421,7 @@ servePutResult resultmangle res = case res of
servePut' servePut'
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> (PutResultPlus -> t) -> (PutResultPlus -> t)
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
@ -440,7 +439,7 @@ servePut' st resultmangle su v = servePut st resultmangle su v Nothing
servePutOffset servePutOffset
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> (PutOffsetResultPlus -> t) -> (PutOffsetResultPlus -> t)
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
@ -452,7 +451,7 @@ servePutOffset
-> Handler t -> Handler t
servePutOffset st resultmangle su apiver (B64Key k) cu bypass sec auth = do servePutOffset st resultmangle su apiver (B64Key k) cu bypass sec auth = do
res <- withP2PConnection apiver st cu su bypass sec auth WriteAction res <- withP2PConnection apiver st cu su bypass sec auth WriteAction
(\cst -> cst { connectionWaitVar = False }) $ \conn -> (\cst -> cst { connectionWaitVar = False }) $ \(conn, _) ->
liftIO $ proxyClientNetProto conn $ getPutOffset k af liftIO $ proxyClientNetProto conn $ getPutOffset k af
case res of case res of
Right offset -> return $ resultmangle $ Right offset -> return $ resultmangle $
@ -464,7 +463,7 @@ servePutOffset st resultmangle su apiver (B64Key k) cu bypass sec auth = do
serveLockContent serveLockContent
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
-> B64Key -> B64Key
@ -473,8 +472,8 @@ serveLockContent
-> IsSecure -> IsSecure
-> Maybe Auth -> Maybe Auth
-> Handler LockResult -> Handler LockResult
serveLockContent st su apiver (B64Key k) cu bypass sec auth = do serveLockContent mst su apiver (B64Key k) cu bypass sec auth = do
conn <- getP2PConnection apiver st cu su bypass sec auth LockAction id (conn, st) <- getP2PConnection apiver mst cu su bypass sec auth LockAction id
let lock = do let lock = do
lockresv <- newEmptyTMVarIO lockresv <- newEmptyTMVarIO
unlockv <- newEmptyTMVarIO unlockv <- newEmptyTMVarIO
@ -501,7 +500,7 @@ serveLockContent st su apiver (B64Key k) cu bypass sec auth = do
serveKeepLocked serveKeepLocked
:: APIVersion v :: APIVersion v
=> P2PHttpServerState => TMVar P2PHttpServerState
-> B64UUID ServerSide -> B64UUID ServerSide
-> v -> v
-> LockID -> LockID
@ -513,15 +512,15 @@ serveKeepLocked
-> Maybe KeepAlive -> Maybe KeepAlive
-> S.SourceT IO UnlockRequest -> S.SourceT IO UnlockRequest
-> Handler LockResult -> Handler LockResult
serveKeepLocked st _su _apiver lckid _cu _bypass sec auth _ _ unlockrequeststream = do serveKeepLocked mst su _apiver lckid _cu _bypass sec auth _ _ unlockrequeststream = do
checkAuthActionClass st sec auth LockAction $ \_ -> do checkAuthActionClass mst su sec auth LockAction $ \st _ -> do
liftIO $ keepingLocked lckid st liftIO $ keepingLocked lckid st
_ <- liftIO $ S.unSourceT unlockrequeststream go _ <- liftIO $ S.unSourceT unlockrequeststream (go st)
return (LockResult False Nothing) return (LockResult False Nothing)
where where
go S.Stop = dropLock lckid st go st S.Stop = dropLock lckid st
go (S.Error _err) = dropLock lckid st go st (S.Error _err) = dropLock lckid st
go (S.Skip s) = go s go st (S.Skip s) = go st s
go (S.Effect ms) = ms >>= go go st (S.Effect ms) = ms >>= go st
go (S.Yield (UnlockRequest False) s) = go s go st (S.Yield (UnlockRequest False) s) = go st s
go (S.Yield (UnlockRequest True) _) = dropLock lckid st go st (S.Yield (UnlockRequest True) _) = dropLock lckid st

View file

@ -26,6 +26,8 @@ import Types.NumCopies
import Types.WorkerPool import Types.WorkerPool
import Annex.WorkerPool import Annex.WorkerPool
import Annex.BranchState import Annex.BranchState
import Annex.Concurrent
import Types.Concurrency
import Types.Cluster import Types.Cluster
import CmdLine.Action (startConcurrency) import CmdLine.Action (startConcurrency)
import Utility.ThreadScheduler import Utility.ThreadScheduler
@ -42,8 +44,37 @@ import qualified Data.Map.Strict as M
import qualified Data.Set as S import qualified Data.Set as S
import Control.Concurrent.Async import Control.Concurrent.Async
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import qualified Data.Semigroup as Sem
import Prelude
data P2PHttpServerState = P2PHttpServerState data P2PHttpServerState = P2PHttpServerState
{ servedRepos :: M.Map UUID PerRepoServerState
, serverShutdownCleanup :: IO ()
, updateRepos :: UpdateRepos
}
type UpdateRepos = P2PHttpServerState -> IO P2PHttpServerState
instance Monoid P2PHttpServerState where
mempty = P2PHttpServerState
{ servedRepos = mempty
, serverShutdownCleanup = noop
, updateRepos = const mempty
}
instance Sem.Semigroup P2PHttpServerState where
a <> b = P2PHttpServerState
{ servedRepos = servedRepos a <> servedRepos b
, serverShutdownCleanup = do
serverShutdownCleanup a
serverShutdownCleanup b
, updateRepos = \st -> do
a' <- updateRepos a st
b' <- updateRepos b st
return (a' <> b')
}
data PerRepoServerState = PerRepoServerState
{ acquireP2PConnection :: AcquireP2PConnection { acquireP2PConnection :: AcquireP2PConnection
, annexWorkerPool :: AnnexWorkerPool , annexWorkerPool :: AnnexWorkerPool
, getServerMode :: GetServerMode , getServerMode :: GetServerMode
@ -62,8 +93,8 @@ data ServerMode
} }
| CannotServeRequests | CannotServeRequests
mkP2PHttpServerState :: AcquireP2PConnection -> AnnexWorkerPool -> GetServerMode -> IO P2PHttpServerState mkPerRepoServerState :: AcquireP2PConnection -> AnnexWorkerPool -> GetServerMode -> IO PerRepoServerState
mkP2PHttpServerState acquireconn annexworkerpool getservermode = P2PHttpServerState mkPerRepoServerState acquireconn annexworkerpool getservermode = PerRepoServerState
<$> pure acquireconn <$> pure acquireconn
<*> pure annexworkerpool <*> pure annexworkerpool
<*> pure getservermode <*> pure getservermode
@ -75,7 +106,7 @@ data ActionClass = ReadAction | WriteAction | RemoveAction | LockAction
withP2PConnection withP2PConnection
:: APIVersion v :: APIVersion v
=> v => v
-> P2PHttpServerState -> TMVar P2PHttpServerState
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
@ -83,10 +114,10 @@ withP2PConnection
-> Maybe Auth -> Maybe Auth
-> ActionClass -> ActionClass
-> (ConnectionParams -> ConnectionParams) -> (ConnectionParams -> ConnectionParams)
-> (P2PConnectionPair -> Handler (Either ProtoFailure a)) -> ((P2PConnectionPair, PerRepoServerState) -> Handler (Either ProtoFailure a))
-> Handler a -> Handler a
withP2PConnection apiver st cu su bypass sec auth actionclass fconnparams connaction = withP2PConnection apiver mst cu su bypass sec auth actionclass fconnparams connaction =
withP2PConnection' apiver st cu su bypass sec auth actionclass fconnparams connaction' withP2PConnection' apiver mst cu su bypass sec auth actionclass fconnparams connaction'
where where
connaction' conn = connaction conn >>= \case connaction' conn = connaction conn >>= \case
Right r -> return r Right r -> return r
@ -96,7 +127,7 @@ withP2PConnection apiver st cu su bypass sec auth actionclass fconnparams connac
withP2PConnection' withP2PConnection'
:: APIVersion v :: APIVersion v
=> v => v
-> P2PHttpServerState -> TMVar P2PHttpServerState
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
@ -104,17 +135,17 @@ withP2PConnection'
-> Maybe Auth -> Maybe Auth
-> ActionClass -> ActionClass
-> (ConnectionParams -> ConnectionParams) -> (ConnectionParams -> ConnectionParams)
-> (P2PConnectionPair -> Handler a) -> ((P2PConnectionPair, PerRepoServerState) -> Handler a)
-> Handler a -> Handler a
withP2PConnection' apiver st cu su bypass sec auth actionclass fconnparams connaction = do withP2PConnection' apiver mst cu su bypass sec auth actionclass fconnparams connaction = do
conn <- getP2PConnection apiver st cu su bypass sec auth actionclass fconnparams (conn, st) <- getP2PConnection apiver mst cu su bypass sec auth actionclass fconnparams
connaction conn connaction (conn, st)
`finally` liftIO (releaseP2PConnection conn) `finally` liftIO (releaseP2PConnection conn)
getP2PConnection getP2PConnection
:: APIVersion v :: APIVersion v
=> v => v
-> P2PHttpServerState -> TMVar P2PHttpServerState
-> B64UUID ClientSide -> B64UUID ClientSide
-> B64UUID ServerSide -> B64UUID ServerSide
-> [B64UUID Bypass] -> [B64UUID Bypass]
@ -122,16 +153,16 @@ getP2PConnection
-> Maybe Auth -> Maybe Auth
-> ActionClass -> ActionClass
-> (ConnectionParams -> ConnectionParams) -> (ConnectionParams -> ConnectionParams)
-> Handler P2PConnectionPair -> Handler (P2PConnectionPair, PerRepoServerState)
getP2PConnection apiver st cu su bypass sec auth actionclass fconnparams = getP2PConnection apiver mst cu su bypass sec auth actionclass fconnparams =
checkAuthActionClass st sec auth actionclass go checkAuthActionClass mst su sec auth actionclass go
where where
go servermode = liftIO (acquireP2PConnection st cp) >>= \case go st servermode = liftIO (acquireP2PConnection st cp) >>= \case
Left (ConnectionFailed err) -> Left (ConnectionFailed err) ->
throwError err502 { errBody = encodeBL err } throwError err502 { errBody = encodeBL err }
Left TooManyConnections -> Left TooManyConnections ->
throwError err503 throwError err503
Right v -> return v Right v -> return (v, st)
where where
cp = fconnparams $ ConnectionParams cp = fconnparams $ ConnectionParams
{ connectionProtocolVersion = protocolVersion apiver { connectionProtocolVersion = protocolVersion apiver
@ -142,30 +173,51 @@ getP2PConnection apiver st cu su bypass sec auth actionclass fconnparams =
, connectionWaitVar = True , connectionWaitVar = True
} }
getPerRepoServerState :: TMVar P2PHttpServerState -> B64UUID ServerSide -> IO (Maybe PerRepoServerState)
getPerRepoServerState mstv su = do
mst <- atomically $ readTMVar mstv
case lookupst mst of
Just st -> return (Just st)
Nothing -> do
mst' <- atomically $ takeTMVar mstv
mst'' <- updateRepos mst' mst'
debug "P2P.Http" $
"Rescanned for repositories, now serving UUIDs: "
++ show (M.keys (servedRepos mst''))
atomically $ putTMVar mstv mst''
return $ lookupst mst''
where
lookupst mst = M.lookup (fromB64UUID su) (servedRepos mst)
checkAuthActionClass checkAuthActionClass
:: P2PHttpServerState :: TMVar P2PHttpServerState
-> B64UUID ServerSide
-> IsSecure -> IsSecure
-> Maybe Auth -> Maybe Auth
-> ActionClass -> ActionClass
-> (P2P.ServerMode -> Handler a) -> (PerRepoServerState -> P2P.ServerMode -> Handler a)
-> Handler a -> Handler a
checkAuthActionClass st sec auth actionclass go = checkAuthActionClass mstv su sec auth actionclass go =
case (sm, actionclass) of liftIO (getPerRepoServerState mstv su) >>= \case
Just st -> select st
Nothing -> throwError err404
where
select st = case (sm, actionclass) of
(ServerMode { serverMode = P2P.ServeReadWrite }, _) -> (ServerMode { serverMode = P2P.ServeReadWrite }, _) ->
go P2P.ServeReadWrite go st P2P.ServeReadWrite
(ServerMode { unauthenticatedLockingAllowed = True }, LockAction) -> (ServerMode { unauthenticatedLockingAllowed = True }, LockAction) ->
go P2P.ServeReadOnly go st P2P.ServeReadOnly
(ServerMode { serverMode = P2P.ServeAppendOnly }, RemoveAction) -> (ServerMode { serverMode = P2P.ServeAppendOnly }, RemoveAction) ->
throwError $ forbiddenWithoutAuth sm throwError $ forbiddenWithoutAuth sm
(ServerMode { serverMode = P2P.ServeAppendOnly }, _) -> (ServerMode { serverMode = P2P.ServeAppendOnly }, _) ->
go P2P.ServeAppendOnly go st P2P.ServeAppendOnly
(ServerMode { serverMode = P2P.ServeReadOnly }, ReadAction) -> (ServerMode { serverMode = P2P.ServeReadOnly }, ReadAction) ->
go P2P.ServeReadOnly go st P2P.ServeReadOnly
(ServerMode { serverMode = P2P.ServeReadOnly }, _) -> (ServerMode { serverMode = P2P.ServeReadOnly }, _) ->
throwError $ forbiddenWithoutAuth sm throwError $ forbiddenWithoutAuth sm
(CannotServeRequests, _) -> throwError basicAuthRequired (CannotServeRequests, _) -> throwError basicAuthRequired
where where
sm = getServerMode st sec auth sm = getServerMode st sec auth
forbiddenAction :: ServerError forbiddenAction :: ServerError
forbiddenAction = err403 forbiddenAction = err403
@ -204,13 +256,14 @@ type AcquireP2PConnection
= ConnectionParams = ConnectionParams
-> IO (Either ConnectionProblem P2PConnectionPair) -> IO (Either ConnectionProblem P2PConnectionPair)
withP2PConnections mkP2PHttpServerState
:: AnnexWorkerPool :: GetServerMode
-> UpdateRepos
-> ProxyConnectionPoolSize -> ProxyConnectionPoolSize
-> ClusterConcurrency -> ClusterConcurrency
-> (AcquireP2PConnection -> Annex a) -> AnnexWorkerPool
-> Annex a -> Annex P2PHttpServerState
withP2PConnections workerpool proxyconnectionpoolsize clusterconcurrency a = do mkP2PHttpServerState getservermode updaterepos proxyconnectionpoolsize clusterconcurrency workerpool = do
enableInteractiveBranchAccess enableInteractiveBranchAccess
myuuid <- getUUID myuuid <- getUUID
myproxies <- M.lookup myuuid <$> getProxies myproxies <- M.lookup myuuid <$> getProxies
@ -223,7 +276,13 @@ withP2PConnections workerpool proxyconnectionpoolsize clusterconcurrency a = do
let endit = do let endit = do
liftIO $ atomically $ putTMVar endv () liftIO $ atomically $ putTMVar endv ()
liftIO $ wait asyncservicer liftIO $ wait asyncservicer
a (acquireconn reqv) `finally` endit let servinguuids = myuuid : map proxyRemoteUUID (maybe [] S.toList myproxies)
st <- liftIO $ mkPerRepoServerState (acquireconn reqv) workerpool getservermode
return $ P2PHttpServerState
{ servedRepos = M.fromList $ zip servinguuids (repeat st)
, serverShutdownCleanup = endit
, updateRepos = updaterepos
}
where where
acquireconn reqv connparams = do acquireconn reqv connparams = do
respvar <- newEmptyTMVarIO respvar <- newEmptyTMVarIO
@ -487,13 +546,13 @@ mkLocker lock unlock = do
wait locktid wait locktid
return Nothing return Nothing
storeLock :: LockID -> Locker -> P2PHttpServerState -> IO () storeLock :: LockID -> Locker -> PerRepoServerState -> IO ()
storeLock lckid locker st = atomically $ do storeLock lckid locker st = atomically $ do
m <- takeTMVar (openLocks st) m <- takeTMVar (openLocks st)
let !m' = M.insert lckid locker m let !m' = M.insert lckid locker m
putTMVar (openLocks st) m' putTMVar (openLocks st) m'
keepingLocked :: LockID -> P2PHttpServerState -> IO () keepingLocked :: LockID -> PerRepoServerState -> IO ()
keepingLocked lckid st = do keepingLocked lckid st = do
m <- atomically $ readTMVar (openLocks st) m <- atomically $ readTMVar (openLocks st)
case M.lookup lckid m of case M.lookup lckid m of
@ -502,7 +561,7 @@ keepingLocked lckid st = do
atomically $ void $ atomically $ void $
tryPutTMVar (lockerTimeoutDisable locker) () tryPutTMVar (lockerTimeoutDisable locker) ()
dropLock :: LockID -> P2PHttpServerState -> IO () dropLock :: LockID -> PerRepoServerState -> IO ()
dropLock lckid st = do dropLock lckid st = do
v <- atomically $ do v <- atomically $ do
m <- takeTMVar (openLocks st) m <- takeTMVar (openLocks st)
@ -520,13 +579,15 @@ dropLock lckid st = do
Nothing -> return () Nothing -> return ()
Just locker -> wait (lockerThread locker) Just locker -> wait (lockerThread locker)
getAnnexWorkerPool :: (AnnexWorkerPool -> Annex a) -> Annex a withAnnexWorkerPool :: (Maybe Concurrency) -> (AnnexWorkerPool -> Annex a) -> Annex a
getAnnexWorkerPool a = startConcurrency transferStages $ withAnnexWorkerPool mc a = do
Annex.getState Annex.workers >>= \case maybe noop (setConcurrency . ConcurrencyCmdLine) mc
Nothing -> giveup "Use -Jn or set annex.jobs to configure the number of worker threads." startConcurrency transferStages $
Just wp -> a wp Annex.getState Annex.workers >>= \case
Nothing -> giveup "Use -Jn or set annex.jobs to configure the number of worker threads."
Just wp -> a wp
inAnnexWorker :: P2PHttpServerState -> Annex a -> IO (Either SomeException a) inAnnexWorker :: PerRepoServerState -> Annex a -> IO (Either SomeException a)
inAnnexWorker st = inAnnexWorker' (annexWorkerPool st) inAnnexWorker st = inAnnexWorker' (annexWorkerPool st)
inAnnexWorker' :: AnnexWorkerPool -> Annex a -> IO (Either SomeException a) inAnnexWorker' :: AnnexWorkerPool -> Annex a -> IO (Either SomeException a)

View file

@ -12,8 +12,12 @@ This is a HTTP server for the git-annex API.
It is the git-annex equivilant of git-http-backend(1), for serving It is the git-annex equivilant of git-http-backend(1), for serving
a repository over HTTP with write access for authenticated users. a repository over HTTP with write access for authenticated users.
This does not serve the git repository over HTTP, only the git-annex This does not serve a git repository over HTTP, only the git-annex
API. API.
By default, this serves the git-annex API for the git-annex repository
in the current working directory. It can also serve more than one
repository, see the `--directory` parameter.
Typically a remote will have `remote.name.url` set to a http url Typically a remote will have `remote.name.url` set to a http url
as usual, and `remote.name.annexUrl` set to an annex+http url such as as usual, and `remote.name.annexUrl` set to an annex+http url such as
@ -35,10 +39,25 @@ convenient way to download the content of any key, by using the path
# OPTIONS # OPTIONS
* `--directory=path`
Serve each git-annex repository found in immediate
subdirectories of a directory.
This option can be provided more than once to serve several directories
full of git-annex repositories.
New git-annex repositories can be added to the directory, and will be
noticed and served immediately. There is no need to restart the server.
When a git-annex repository is removed from the directory, the server
will stop serving it as well. This may not be immediate, as some files
in the deleted repository may still be open.
* `--jobs=N` `-JN` * `--jobs=N` `-JN`
This or annex.jobs must be set to configure the number of worker This or annex.jobs must be set to configure the number of worker
threads that serve connections to the webserver. threads, per repository served, that serve connections to the webserver.
Since the webserver itself also uses one of these threads, Since the webserver itself also uses one of these threads,
this needs to be set to 2 or more. this needs to be set to 2 or more.
@ -47,15 +66,15 @@ convenient way to download the content of any key, by using the path
* `--proxyconnections=N` * `--proxyconnections=N`
When this command is run in a repository that is configured to act as a When serving a repository that is configured to act as a proxy for some
proxy for some of its remotes, this is the maximum number of idle of its remotes, this is the maximum number of idle connections to keep
connections to keep open to proxied remotes. open to proxied remotes.
The default is 1. The default is 1.
* `--clusterjobs=N` * `--clusterjobs=N`
When this command is run in a repository that is a gateway for a cluster, When serving a repository that is a gateway for a cluster,
this is the number of concurrent jobs to use to access nodes of the this is the number of concurrent jobs to use to access nodes of the
cluster, per connection to the webserver. cluster, per connection to the webserver.