2024-07-09 01:11:01 +00:00
|
|
|
{- P2P protocol over HTTP, server state
|
|
|
|
-
|
|
|
|
- https://git-annex.branchable.com/design/p2p_protocol_over_http/
|
|
|
|
-
|
|
|
|
- Copyright 2024 Joey Hess <id@joeyh.name>
|
|
|
|
-
|
|
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
|
|
-}
|
|
|
|
|
|
|
|
{-# LANGUAGE BangPatterns #-}
|
2024-07-10 13:13:01 +00:00
|
|
|
{-# LANGUAGE OverloadedStrings #-}
|
2024-07-10 20:06:39 +00:00
|
|
|
{-# LANGUAGE FlexibleContexts #-}
|
2024-07-09 01:11:01 +00:00
|
|
|
|
|
|
|
module P2P.Http.State where
|
|
|
|
|
|
|
|
import Annex.Common
|
2024-07-10 16:19:47 +00:00
|
|
|
import qualified Annex
|
2024-07-09 01:11:01 +00:00
|
|
|
import P2P.Http.Types
|
2024-07-09 13:08:42 +00:00
|
|
|
import qualified P2P.Protocol as P2P
|
2024-07-11 13:55:17 +00:00
|
|
|
import qualified P2P.IO as P2P
|
2024-07-09 17:37:55 +00:00
|
|
|
import P2P.IO
|
|
|
|
import P2P.Annex
|
|
|
|
import Annex.UUID
|
2024-07-10 16:19:47 +00:00
|
|
|
import Types.WorkerPool
|
|
|
|
import Annex.WorkerPool
|
|
|
|
import CmdLine.Action (startConcurrency)
|
2024-07-09 01:11:01 +00:00
|
|
|
|
2024-07-09 17:37:55 +00:00
|
|
|
import Servant
|
2024-07-09 01:11:01 +00:00
|
|
|
import qualified Data.Map as M
|
|
|
|
import Control.Concurrent.Async
|
|
|
|
import Control.Concurrent.STM
|
|
|
|
|
|
|
|
data P2PHttpServerState = P2PHttpServerState
|
2024-07-09 17:37:55 +00:00
|
|
|
{ acquireP2PConnection :: AcquireP2PConnection
|
2024-07-10 16:19:47 +00:00
|
|
|
, annexWorkerPool :: AnnexWorkerPool
|
2024-07-09 21:30:55 +00:00
|
|
|
, getServerMode :: GetServerMode
|
2024-07-09 17:37:55 +00:00
|
|
|
, openLocks :: TMVar (M.Map LockID Locker)
|
2024-07-09 01:11:01 +00:00
|
|
|
}
|
|
|
|
|
2024-07-10 16:19:47 +00:00
|
|
|
type AnnexWorkerPool = TMVar (WorkerPool (Annex.AnnexState, Annex.AnnexRead))
|
|
|
|
|
|
|
|
-- Nothing when the server is not allowed to serve any requests.
|
|
|
|
type GetServerMode = IsSecure -> Maybe Auth -> Maybe P2P.ServerMode
|
|
|
|
|
|
|
|
mkP2PHttpServerState :: AcquireP2PConnection -> AnnexWorkerPool -> GetServerMode -> IO P2PHttpServerState
|
|
|
|
mkP2PHttpServerState acquireconn annexworkerpool getservermode = P2PHttpServerState
|
2024-07-09 17:37:55 +00:00
|
|
|
<$> pure acquireconn
|
2024-07-10 16:19:47 +00:00
|
|
|
<*> pure annexworkerpool
|
2024-07-09 21:30:55 +00:00
|
|
|
<*> pure getservermode
|
2024-07-09 17:37:55 +00:00
|
|
|
<*> newTMVarIO mempty
|
2024-07-09 01:11:01 +00:00
|
|
|
|
2024-07-10 00:52:56 +00:00
|
|
|
data ActionClass = ReadAction | WriteAction | RemoveAction
|
|
|
|
deriving (Eq)
|
|
|
|
|
2024-07-09 17:37:55 +00:00
|
|
|
withP2PConnection
|
|
|
|
:: APIVersion v
|
|
|
|
=> v
|
|
|
|
-> P2PHttpServerState
|
2024-07-09 13:08:42 +00:00
|
|
|
-> B64UUID ClientSide
|
|
|
|
-> B64UUID ServerSide
|
|
|
|
-> [B64UUID Bypass]
|
2024-07-10 03:44:40 +00:00
|
|
|
-> IsSecure
|
2024-07-10 00:52:56 +00:00
|
|
|
-> Maybe Auth
|
|
|
|
-> ActionClass
|
2024-07-22 15:26:22 +00:00
|
|
|
-> (ConnectionParams -> ConnectionParams)
|
2024-07-11 13:55:17 +00:00
|
|
|
-> (P2PConnectionPair -> Handler (Either ProtoFailure a))
|
2024-07-09 17:37:55 +00:00
|
|
|
-> Handler a
|
2024-07-22 15:26:22 +00:00
|
|
|
withP2PConnection apiver st cu su bypass sec auth actionclass fconnparams connaction =
|
|
|
|
withP2PConnection' apiver st cu su bypass sec auth actionclass fconnparams connaction'
|
2024-07-10 20:06:39 +00:00
|
|
|
where
|
2024-07-11 13:55:17 +00:00
|
|
|
connaction' conn = connaction conn >>= \case
|
2024-07-10 20:06:39 +00:00
|
|
|
Right r -> return r
|
|
|
|
Left err -> throwError $
|
|
|
|
err500 { errBody = encodeBL (describeProtoFailure err) }
|
|
|
|
|
2024-07-22 15:26:22 +00:00
|
|
|
withP2PConnection'
|
|
|
|
:: APIVersion v
|
|
|
|
=> v
|
|
|
|
-> P2PHttpServerState
|
|
|
|
-> B64UUID ClientSide
|
|
|
|
-> B64UUID ServerSide
|
|
|
|
-> [B64UUID Bypass]
|
|
|
|
-> IsSecure
|
|
|
|
-> Maybe Auth
|
|
|
|
-> ActionClass
|
|
|
|
-> (ConnectionParams -> ConnectionParams)
|
|
|
|
-> (P2PConnectionPair -> Handler a)
|
|
|
|
-> Handler a
|
|
|
|
withP2PConnection' apiver st cu su bypass sec auth actionclass fconnparams connaction = do
|
|
|
|
conn <- getP2PConnection apiver st cu su bypass sec auth actionclass fconnparams
|
|
|
|
connaction conn
|
|
|
|
`finally` liftIO (releaseP2PConnection conn)
|
|
|
|
|
2024-07-10 20:06:39 +00:00
|
|
|
getP2PConnection
|
|
|
|
:: APIVersion v
|
|
|
|
=> v
|
|
|
|
-> P2PHttpServerState
|
|
|
|
-> B64UUID ClientSide
|
|
|
|
-> B64UUID ServerSide
|
|
|
|
-> [B64UUID Bypass]
|
|
|
|
-> IsSecure
|
|
|
|
-> Maybe Auth
|
|
|
|
-> ActionClass
|
2024-07-22 14:20:18 +00:00
|
|
|
-> (ConnectionParams -> ConnectionParams)
|
2024-07-11 13:55:17 +00:00
|
|
|
-> Handler P2PConnectionPair
|
2024-07-22 14:20:18 +00:00
|
|
|
getP2PConnection apiver st cu su bypass sec auth actionclass fconnparams =
|
2024-07-10 03:44:40 +00:00
|
|
|
case (getServerMode st sec auth, actionclass) of
|
2024-07-10 00:52:56 +00:00
|
|
|
(Just P2P.ServeReadWrite, _) -> go P2P.ServeReadWrite
|
|
|
|
(Just P2P.ServeAppendOnly, RemoveAction) -> throwError err403
|
|
|
|
(Just P2P.ServeAppendOnly, _) -> go P2P.ServeAppendOnly
|
|
|
|
(Just P2P.ServeReadOnly, ReadAction) -> go P2P.ServeReadOnly
|
|
|
|
(Just P2P.ServeReadOnly, _) -> throwError err403
|
2024-07-10 13:13:01 +00:00
|
|
|
(Nothing, _) -> throwError basicAuthRequired
|
2024-07-10 00:52:56 +00:00
|
|
|
where
|
|
|
|
go servermode = liftIO (acquireP2PConnection st cp) >>= \case
|
2024-07-09 17:37:55 +00:00
|
|
|
Left (ConnectionFailed err) ->
|
|
|
|
throwError err502 { errBody = encodeBL err }
|
|
|
|
Left TooManyConnections ->
|
|
|
|
throwError err503
|
2024-07-10 20:06:39 +00:00
|
|
|
Right v -> return v
|
2024-07-10 00:52:56 +00:00
|
|
|
where
|
2024-07-22 14:20:18 +00:00
|
|
|
cp = fconnparams $ ConnectionParams
|
2024-07-10 00:52:56 +00:00
|
|
|
{ connectionProtocolVersion = protocolVersion apiver
|
|
|
|
, connectionServerUUID = fromB64UUID su
|
|
|
|
, connectionClientUUID = fromB64UUID cu
|
|
|
|
, connectionBypass = map fromB64UUID bypass
|
|
|
|
, connectionServerMode = servermode
|
2024-07-22 14:20:18 +00:00
|
|
|
, connectionWaitVar = True
|
2024-07-10 00:52:56 +00:00
|
|
|
}
|
2024-07-10 13:13:01 +00:00
|
|
|
|
|
|
|
basicAuthRequired :: ServerError
|
|
|
|
basicAuthRequired = err401 { errHeaders = [(h, v)] }
|
|
|
|
where
|
|
|
|
h = "WWW-Authenticate"
|
|
|
|
v = "Basic realm=\"git-annex\", charset=\"UTF-8\""
|
2024-07-09 17:37:55 +00:00
|
|
|
|
|
|
|
data ConnectionParams = ConnectionParams
|
|
|
|
{ connectionProtocolVersion :: P2P.ProtocolVersion
|
|
|
|
, connectionServerUUID :: UUID
|
|
|
|
, connectionClientUUID :: UUID
|
|
|
|
, connectionBypass :: [UUID]
|
|
|
|
, connectionServerMode :: P2P.ServerMode
|
2024-07-22 14:20:18 +00:00
|
|
|
, connectionWaitVar :: Bool
|
2024-07-09 17:37:55 +00:00
|
|
|
}
|
|
|
|
deriving (Show, Eq, Ord)
|
|
|
|
|
|
|
|
data ConnectionProblem
|
|
|
|
= ConnectionFailed String
|
|
|
|
| TooManyConnections
|
|
|
|
deriving (Show, Eq)
|
|
|
|
|
2024-07-11 13:55:17 +00:00
|
|
|
data P2PConnectionPair = P2PConnectionPair
|
|
|
|
{ clientRunState :: RunState
|
|
|
|
, clientP2PConnection :: P2PConnection
|
|
|
|
, serverP2PConnection :: P2PConnection
|
|
|
|
, releaseP2PConnection :: IO ()
|
2024-07-22 16:30:30 +00:00
|
|
|
-- ^ Releases a P2P connection, which can be reused for other
|
|
|
|
-- requests.
|
|
|
|
, closeP2PConnection :: IO ()
|
|
|
|
-- ^ Closes a P2P connection, which is in a state where it is no
|
|
|
|
-- longer usable.
|
2024-07-11 13:55:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
proxyClientNetProto :: P2PConnectionPair -> P2P.Proto a -> IO (Either P2P.ProtoFailure a)
|
|
|
|
proxyClientNetProto conn = runNetProto
|
|
|
|
(clientRunState conn) (clientP2PConnection conn)
|
2024-07-09 17:37:55 +00:00
|
|
|
|
2024-07-11 13:55:17 +00:00
|
|
|
type AcquireP2PConnection
|
|
|
|
= ConnectionParams
|
|
|
|
-> IO (Either ConnectionProblem P2PConnectionPair)
|
2024-07-10 20:06:39 +00:00
|
|
|
|
|
|
|
{- Acquire P2P connections to the local repository. -}
|
2024-07-11 18:37:52 +00:00
|
|
|
withLocalP2PConnections :: AnnexWorkerPool -> (AcquireP2PConnection -> Annex a) -> Annex a
|
|
|
|
withLocalP2PConnections workerpool a = do
|
|
|
|
myuuid <- getUUID
|
2024-07-09 17:37:55 +00:00
|
|
|
reqv <- liftIO newEmptyTMVarIO
|
|
|
|
relv <- liftIO newEmptyTMVarIO
|
2024-07-11 18:37:52 +00:00
|
|
|
asyncservicer <- liftIO $ async $ servicer myuuid reqv relv
|
2024-07-09 17:37:55 +00:00
|
|
|
a (acquireconn reqv) `finally` join (liftIO (wait asyncservicer))
|
|
|
|
where
|
|
|
|
acquireconn reqv connparams = do
|
|
|
|
respvar <- newEmptyTMVarIO
|
2024-07-10 20:06:39 +00:00
|
|
|
atomically $ putTMVar reqv (connparams, respvar)
|
|
|
|
atomically $ takeTMVar respvar
|
2024-07-09 17:37:55 +00:00
|
|
|
|
2024-07-11 18:37:52 +00:00
|
|
|
servicer myuuid reqv relv = do
|
2024-07-09 17:37:55 +00:00
|
|
|
reqrel <- liftIO $
|
|
|
|
atomically $
|
|
|
|
(Right <$> takeTMVar reqv)
|
|
|
|
`orElse`
|
|
|
|
(Left <$> takeTMVar relv)
|
|
|
|
case reqrel of
|
|
|
|
Right (connparams, respvar) ->
|
2024-07-11 18:37:52 +00:00
|
|
|
servicereq myuuid relv connparams
|
|
|
|
>>= atomically . putTMVar respvar
|
2024-07-09 17:37:55 +00:00
|
|
|
Left releaseconn -> releaseconn
|
2024-07-11 18:37:52 +00:00
|
|
|
servicer myuuid reqv relv
|
2024-07-09 17:37:55 +00:00
|
|
|
|
2024-07-11 18:37:52 +00:00
|
|
|
servicereq myuuid relv connparams
|
|
|
|
| connectionServerUUID connparams /= myuuid =
|
|
|
|
return $ Left $ ConnectionFailed "unknown uuid"
|
|
|
|
| otherwise = mkP2PConnectionPair connparams relv $
|
|
|
|
\serverrunst serverconn -> inAnnexWorker' workerpool $
|
|
|
|
void $ runFullProto serverrunst serverconn $
|
|
|
|
P2P.serveOneCommandAuthed
|
|
|
|
(connectionServerMode connparams)
|
|
|
|
(connectionServerUUID connparams)
|
2024-07-09 17:37:55 +00:00
|
|
|
|
2024-07-11 18:37:52 +00:00
|
|
|
mkP2PConnectionPair
|
|
|
|
:: ConnectionParams
|
|
|
|
-> TMVar (IO ())
|
|
|
|
-> (RunState -> P2PConnection -> IO (Either SomeException ()))
|
|
|
|
-> IO (Either ConnectionProblem P2PConnectionPair)
|
|
|
|
mkP2PConnectionPair connparams relv startworker = do
|
|
|
|
hdl1 <- newEmptyTMVarIO
|
|
|
|
hdl2 <- newEmptyTMVarIO
|
|
|
|
wait1 <- newEmptyTMVarIO
|
|
|
|
wait2 <- newEmptyTMVarIO
|
2024-07-22 14:20:18 +00:00
|
|
|
let h1 = P2PHandleTMVar hdl1 $
|
|
|
|
if connectionWaitVar connparams
|
|
|
|
then Just wait1
|
|
|
|
else Nothing
|
|
|
|
let h2 = P2PHandleTMVar hdl2 $
|
|
|
|
if connectionWaitVar connparams
|
|
|
|
then Just wait2
|
|
|
|
else Nothing
|
2024-07-11 18:37:52 +00:00
|
|
|
let serverconn = P2PConnection Nothing
|
|
|
|
(const True) h1 h2
|
|
|
|
(ConnIdent (Just "http server"))
|
|
|
|
let clientconn = P2PConnection Nothing
|
|
|
|
(const True) h2 h1
|
|
|
|
(ConnIdent (Just "http client"))
|
|
|
|
clientrunst <- mkclientrunst
|
|
|
|
serverrunst <- mkserverrunst
|
|
|
|
asyncworker <- async $
|
|
|
|
startworker serverrunst serverconn
|
2024-07-22 16:30:30 +00:00
|
|
|
let releaseconn = atomically $ void $ tryPutTMVar relv $
|
2024-07-11 18:37:52 +00:00
|
|
|
liftIO $ wait asyncworker
|
|
|
|
>>= either throwM return
|
|
|
|
return $ Right $ P2PConnectionPair
|
|
|
|
{ clientRunState = clientrunst
|
|
|
|
, clientP2PConnection = clientconn
|
|
|
|
, serverP2PConnection = serverconn
|
|
|
|
, releaseP2PConnection = releaseconn
|
2024-07-22 16:30:30 +00:00
|
|
|
, closeP2PConnection = releaseconn
|
2024-07-11 18:37:52 +00:00
|
|
|
}
|
|
|
|
where
|
|
|
|
mkserverrunst = do
|
2024-07-09 17:37:55 +00:00
|
|
|
prototvar <- newTVarIO $ connectionProtocolVersion connparams
|
|
|
|
mkRunState $ const $ Serving
|
|
|
|
(connectionClientUUID connparams)
|
|
|
|
Nothing
|
|
|
|
prototvar
|
2024-07-11 11:46:52 +00:00
|
|
|
|
2024-07-11 18:37:52 +00:00
|
|
|
mkclientrunst = do
|
2024-07-11 11:46:52 +00:00
|
|
|
prototvar <- newTVarIO $ connectionProtocolVersion connparams
|
|
|
|
mkRunState $ const $ Client prototvar
|
2024-07-09 13:08:42 +00:00
|
|
|
|
2024-07-09 01:11:01 +00:00
|
|
|
data Locker = Locker
|
|
|
|
{ lockerThread :: Async ()
|
|
|
|
, lockerVar :: TMVar Bool
|
|
|
|
-- ^ Left empty until the thread has taken the lock
|
|
|
|
-- (or failed to do so), then True while the lock is held,
|
|
|
|
-- and setting to False causes the lock to be released.
|
|
|
|
}
|
|
|
|
|
|
|
|
mkLocker :: IO () -> IO () -> IO (Maybe (Locker, LockID))
|
|
|
|
mkLocker lock unlock = do
|
|
|
|
lv <- newEmptyTMVarIO
|
|
|
|
let setlocked = putTMVar lv
|
|
|
|
tid <- async $
|
|
|
|
tryNonAsync lock >>= \case
|
|
|
|
Left _ -> do
|
|
|
|
atomically $ setlocked False
|
|
|
|
unlock
|
|
|
|
Right () -> do
|
|
|
|
atomically $ setlocked True
|
|
|
|
atomically $ do
|
|
|
|
v <- takeTMVar lv
|
|
|
|
if v
|
|
|
|
then retry
|
|
|
|
else setlocked False
|
|
|
|
unlock
|
|
|
|
locksuccess <- atomically $ readTMVar lv
|
|
|
|
if locksuccess
|
|
|
|
then do
|
|
|
|
lckid <- B64UUID <$> genUUID
|
|
|
|
return (Just (Locker tid lv, lckid))
|
|
|
|
else do
|
|
|
|
wait tid
|
|
|
|
return Nothing
|
|
|
|
|
|
|
|
storeLock :: LockID -> Locker -> P2PHttpServerState -> IO ()
|
|
|
|
storeLock lckid locker st = atomically $ do
|
|
|
|
m <- takeTMVar (openLocks st)
|
|
|
|
let !m' = M.insert lckid locker m
|
|
|
|
putTMVar (openLocks st) m'
|
|
|
|
|
|
|
|
dropLock :: LockID -> P2PHttpServerState -> IO ()
|
|
|
|
dropLock lckid st = do
|
|
|
|
v <- atomically $ do
|
|
|
|
m <- takeTMVar (openLocks st)
|
|
|
|
let (mlocker, !m') =
|
|
|
|
M.updateLookupWithKey (\_ _ -> Nothing) lckid m
|
|
|
|
putTMVar (openLocks st) m'
|
|
|
|
case mlocker of
|
|
|
|
Nothing -> return Nothing
|
|
|
|
-- Signal to the locker's thread that it can release the lock.
|
|
|
|
Just locker -> do
|
|
|
|
_ <- swapTMVar (lockerVar locker) False
|
|
|
|
return (Just locker)
|
|
|
|
case v of
|
|
|
|
Nothing -> return ()
|
|
|
|
Just locker -> wait (lockerThread locker)
|
2024-07-10 16:19:47 +00:00
|
|
|
|
|
|
|
getAnnexWorkerPool :: (AnnexWorkerPool -> Annex a) -> Annex a
|
|
|
|
getAnnexWorkerPool a = startConcurrency transferStages $
|
|
|
|
Annex.getState Annex.workers >>= \case
|
|
|
|
Nothing -> giveup "Use -Jn or set annex.jobs to configure the number of worker threads."
|
|
|
|
Just wp -> a wp
|
|
|
|
|
|
|
|
inAnnexWorker :: P2PHttpServerState -> Annex a -> IO (Either SomeException a)
|
2024-07-11 18:37:52 +00:00
|
|
|
inAnnexWorker st = inAnnexWorker' (annexWorkerPool st)
|
|
|
|
|
|
|
|
inAnnexWorker' :: AnnexWorkerPool -> Annex a -> IO (Either SomeException a)
|
|
|
|
inAnnexWorker' poolv annexaction = do
|
|
|
|
(workerstrd, workerstage) <- atomically $ waitStartWorkerSlot poolv
|
2024-07-10 16:19:47 +00:00
|
|
|
resv <- newEmptyTMVarIO
|
|
|
|
aid <- async $ do
|
|
|
|
(res, strd) <- Annex.run workerstrd annexaction
|
|
|
|
atomically $ putTMVar resv res
|
|
|
|
return strd
|
|
|
|
atomically $ do
|
2024-07-11 18:37:52 +00:00
|
|
|
pool <- takeTMVar poolv
|
2024-07-10 16:19:47 +00:00
|
|
|
let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool
|
2024-07-11 18:37:52 +00:00
|
|
|
putTMVar poolv pool'
|
2024-07-10 16:19:47 +00:00
|
|
|
(res, workerstrd') <- waitCatch aid >>= \case
|
|
|
|
Right strd -> do
|
|
|
|
r <- atomically $ takeTMVar resv
|
|
|
|
return (Right r, strd)
|
|
|
|
Left err -> return (Left err, workerstrd)
|
|
|
|
atomically $ do
|
2024-07-11 18:37:52 +00:00
|
|
|
pool <- takeTMVar poolv
|
2024-07-10 16:19:47 +00:00
|
|
|
let !pool' = deactivateWorker pool aid workerstrd'
|
2024-07-11 18:37:52 +00:00
|
|
|
putTMVar poolv pool'
|
2024-07-10 16:19:47 +00:00
|
|
|
return res
|