diff --git a/Command/P2PHttp.hs b/Command/P2PHttp.hs index b3924d74be..29f8c54280 100644 --- a/Command/P2PHttp.hs +++ b/Command/P2PHttp.hs @@ -75,7 +75,7 @@ seek o = getAnnexWorkerPool $ \workerpool -> do liftIO $ putStrLn "test begins" testGet giveup "TEST DONE" - withLocalP2PConnections $ \acquireconn -> liftIO $ do + withLocalP2PConnections workerpool $ \acquireconn -> liftIO $ do authenv <- getAuthEnv st <- mkP2PHttpServerState acquireconn workerpool $ mkGetServerMode authenv o diff --git a/P2P/Http/State.hs b/P2P/Http/State.hs index 8e90bc3025..f886627130 100644 --- a/P2P/Http/State.hs +++ b/P2P/Http/State.hs @@ -21,7 +21,6 @@ import qualified P2P.IO as P2P import P2P.IO import P2P.Annex import Annex.UUID -import Annex.Concurrent import Types.WorkerPool import Annex.WorkerPool import CmdLine.Action (startConcurrency) @@ -146,15 +145,12 @@ type AcquireP2PConnection -> IO (Either ConnectionProblem P2PConnectionPair) {- Acquire P2P connections to the local repository. -} --- TODO need worker pool, this can only service a single request at --- a time. --- TODO proxies --- TODO clusters -withLocalP2PConnections :: (AcquireP2PConnection -> Annex a) -> Annex a -withLocalP2PConnections a = do +withLocalP2PConnections :: AnnexWorkerPool -> (AcquireP2PConnection -> Annex a) -> Annex a +withLocalP2PConnections workerpool a = do + myuuid <- getUUID reqv <- liftIO newEmptyTMVarIO relv <- liftIO newEmptyTMVarIO - asyncservicer <- liftIO . async =<< forkState (servicer reqv relv) + asyncservicer <- liftIO $ async $ servicer myuuid reqv relv a (acquireconn reqv) `finally` join (liftIO (wait asyncservicer)) where acquireconn reqv connparams = do @@ -162,7 +158,7 @@ withLocalP2PConnections a = do atomically $ putTMVar reqv (connparams, respvar) atomically $ takeTMVar respvar - servicer reqv relv = do + servicer myuuid reqv relv = do reqrel <- liftIO $ atomically $ (Right <$> takeTMVar reqv) @@ -170,54 +166,61 @@ withLocalP2PConnections a = do (Left <$> takeTMVar relv) case reqrel of Right (connparams, respvar) -> - servicereq relv connparams respvar + servicereq myuuid relv connparams + >>= atomically . putTMVar respvar Left releaseconn -> releaseconn - servicer reqv relv + servicer myuuid reqv relv - servicereq relv connparams respvar = do - myuuid <- getUUID - resp <- if connectionServerUUID connparams /= myuuid - then return $ Left $ ConnectionFailed "unknown uuid" - else do - hdl1 <- liftIO newEmptyTMVarIO - hdl2 <- liftIO newEmptyTMVarIO - wait1 <- liftIO newEmptyTMVarIO - wait2 <- liftIO newEmptyTMVarIO - let h1 = P2PHandleTMVar hdl1 wait1 - let h2 = P2PHandleTMVar hdl2 wait2 - let serverconn = P2PConnection Nothing - (const True) h1 h2 - (ConnIdent (Just "http server")) - let clientconn = P2PConnection Nothing - (const True) h2 h1 - (ConnIdent (Just "http client")) - clientrunst <- liftIO $ mkclientrunst connparams - serverrunst <- liftIO $ mkserverrunst connparams - let server = P2P.serveOneCommandAuthed - (connectionServerMode connparams) - (connectionServerUUID connparams) - let protorunner = void $ - runFullProto serverrunst serverconn server - asyncworker <- liftIO . async - =<< forkState protorunner - let releaseconn = atomically $ putTMVar relv $ - join (liftIO (wait asyncworker)) - return $ Right $ P2PConnectionPair - { clientRunState = clientrunst - , clientP2PConnection = clientconn - , serverP2PConnection = serverconn - , releaseP2PConnection = releaseconn - } - liftIO $ atomically $ putTMVar respvar resp + servicereq myuuid relv connparams + | connectionServerUUID connparams /= myuuid = + return $ Left $ ConnectionFailed "unknown uuid" + | otherwise = mkP2PConnectionPair connparams relv $ + \serverrunst serverconn -> inAnnexWorker' workerpool $ + void $ runFullProto serverrunst serverconn $ + P2P.serveOneCommandAuthed + (connectionServerMode connparams) + (connectionServerUUID connparams) - mkserverrunst connparams = do +mkP2PConnectionPair + :: ConnectionParams + -> TMVar (IO ()) + -> (RunState -> P2PConnection -> IO (Either SomeException ())) + -> IO (Either ConnectionProblem P2PConnectionPair) +mkP2PConnectionPair connparams relv startworker = do + hdl1 <- newEmptyTMVarIO + hdl2 <- newEmptyTMVarIO + wait1 <- newEmptyTMVarIO + wait2 <- newEmptyTMVarIO + let h1 = P2PHandleTMVar hdl1 wait1 + let h2 = P2PHandleTMVar hdl2 wait2 + let serverconn = P2PConnection Nothing + (const True) h1 h2 + (ConnIdent (Just "http server")) + let clientconn = P2PConnection Nothing + (const True) h2 h1 + (ConnIdent (Just "http client")) + clientrunst <- mkclientrunst + serverrunst <- mkserverrunst + asyncworker <- async $ + startworker serverrunst serverconn + let releaseconn = atomically $ putTMVar relv $ + liftIO $ wait asyncworker + >>= either throwM return + return $ Right $ P2PConnectionPair + { clientRunState = clientrunst + , clientP2PConnection = clientconn + , serverP2PConnection = serverconn + , releaseP2PConnection = releaseconn + } + where + mkserverrunst = do prototvar <- newTVarIO $ connectionProtocolVersion connparams mkRunState $ const $ Serving (connectionClientUUID connparams) Nothing prototvar - mkclientrunst connparams = do + mkclientrunst = do prototvar <- newTVarIO $ connectionProtocolVersion connparams mkRunState $ const $ Client prototvar @@ -285,25 +288,27 @@ getAnnexWorkerPool a = startConcurrency transferStages $ Just wp -> a wp inAnnexWorker :: P2PHttpServerState -> Annex a -> IO (Either SomeException a) -inAnnexWorker st annexaction = do - (workerstrd, workerstage) <- atomically $ - waitStartWorkerSlot (annexWorkerPool st) +inAnnexWorker st = inAnnexWorker' (annexWorkerPool st) + +inAnnexWorker' :: AnnexWorkerPool -> Annex a -> IO (Either SomeException a) +inAnnexWorker' poolv annexaction = do + (workerstrd, workerstage) <- atomically $ waitStartWorkerSlot poolv resv <- newEmptyTMVarIO aid <- async $ do (res, strd) <- Annex.run workerstrd annexaction atomically $ putTMVar resv res return strd atomically $ do - pool <- takeTMVar (annexWorkerPool st) + pool <- takeTMVar poolv let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool - putTMVar (annexWorkerPool st) pool' + putTMVar poolv pool' (res, workerstrd') <- waitCatch aid >>= \case Right strd -> do r <- atomically $ takeTMVar resv return (Right r, strd) Left err -> return (Left err, workerstrd) atomically $ do - pool <- takeTMVar (annexWorkerPool st) + pool <- takeTMVar poolv let !pool' = deactivateWorker pool aid workerstrd' - putTMVar (annexWorkerPool st) pool' + putTMVar poolv pool' return res diff --git a/doc/todo/git-annex_proxies.mdwn b/doc/todo/git-annex_proxies.mdwn index 3ee8d45e06..4d18be1605 100644 --- a/doc/todo/git-annex_proxies.mdwn +++ b/doc/todo/git-annex_proxies.mdwn @@ -40,8 +40,6 @@ Planned schedule of work: * Make Remote.Git use http client when annex.url is configured. -* withLocalP2PConnections could use a worker pool. - * Make http server support proxies and clusters. * Perhaps: Support cgi program that proxies over to a webserver