From c2f612292ac8166ef6bc170947984bba531d5d70 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 2 Apr 2021 15:26:21 -0400 Subject: [PATCH] start splitting out readonly values from AnnexState Values in AnnexRead can be read more efficiently, without MVar overhead. Only a few things have been moved into there, and the performance increase so far is not likely to be noticable. This is groundwork for putting more stuff in there, particularly a value that indicates if debugging is enabled. The obvious next step is to change option parsing to not run in the Annex monad to set values in AnnexState, and instead return a pure value that gets stored in AnnexRead. --- Annex.hs | 169 +++++++++++++++++-------------- Annex/Action.hs | 2 +- Annex/Concurrent.hs | 7 +- Annex/Link.hs | 2 +- Annex/Ssh.hs | 2 +- Annex/Transfer.hs | 2 +- Annex/TransferrerPool.hs | 6 +- Annex/WorkerPool.hs | 12 +-- Assistant/TransferSlots.hs | 4 +- Assistant/Types/ThreadedMonad.hs | 10 +- CmdLine/Action.hs | 22 ++-- Command/TestRemote.hs | 6 +- Database/Keys.hs | 6 +- Remote/Git.hs | 20 ++-- RemoteDaemon/Common.hs | 10 +- RemoteDaemon/Core.hs | 13 +-- RemoteDaemon/Transport/GCrypt.hs | 2 +- RemoteDaemon/Transport/Ssh.hs | 4 +- RemoteDaemon/Transport/Tor.hs | 6 +- RemoteDaemon/Types.hs | 4 +- 20 files changed, 169 insertions(+), 140 deletions(-) diff --git a/Annex.hs b/Annex.hs index 02d4fa0354..e48011bba9 100644 --- a/Annex.hs +++ b/Annex.hs @@ -10,10 +10,12 @@ module Annex ( Annex, AnnexState(..), + AnnexRead(..), new, run, eval, makeRunner, + getRead, getState, changeState, withState, @@ -88,18 +90,18 @@ import qualified Data.Map.Strict as M import qualified Data.Set as S import Data.Time.Clock.POSIX -{- git-annex's monad is a ReaderT around an AnnexState stored in a MVar. - - The MVar is not exposed outside this module. +{- git-annex's monad is a ReaderT around an AnnexState stored in a MVar, + - and an AnnexRead. The MVar is not exposed outside this module. - - Note that when an Annex action fails and the exception is caught, - any changes the action has made to the AnnexState are retained, - due to the use of the MVar to store the state. -} -newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState) IO a } +newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState, AnnexRead) IO a } deriving ( Monad, MonadIO, - MonadReader (MVar AnnexState), + MonadReader (MVar AnnexState, AnnexRead), MonadCatch, MonadThrow, MonadMask, @@ -109,7 +111,34 @@ newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState) IO a } Alternative ) --- internal state storage +-- Values that can be read, but not modified by an Annex action. +data AnnexRead = AnnexRead + { activekeys :: TVar (M.Map Key ThreadId) + , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) + , keysdbhandle :: Keys.DbHandle + , sshstalecleaned :: TMVar Bool + , signalactions :: TVar (M.Map SignalAction (Int -> IO ())) + , transferrerpool :: TransferrerPool + } + +newAnnexRead :: IO AnnexRead +newAnnexRead = do + emptyactivekeys <- newTVarIO M.empty + emptyactiveremotes <- newMVar M.empty + kh <- Keys.newDbHandle + sc <- newTMVarIO False + si <- newTVarIO M.empty + tp <- newTransferrerPool + return $ AnnexRead + { activekeys = emptyactivekeys + , activeremotes = emptyactiveremotes + , keysdbhandle = kh + , sshstalecleaned = sc + , signalactions = si + , transferrerpool = tp + } + +-- Values that can change while running an Annex action. data AnnexState = AnnexState { repo :: Git.Repo , repoadjustment :: (Git.Repo -> IO Git.Repo) @@ -125,7 +154,6 @@ data AnnexState = AnnexState , fast :: Bool , daemon :: Bool , branchstate :: BranchState - , getvectorclock :: IO VectorClock , repoqueue :: Maybe (Git.Queue.Queue Annex) , catfilehandles :: CatFileHandles , hashobjecthandle :: Maybe HashObjectHandle @@ -147,11 +175,9 @@ data AnnexState = AnnexState , groupmap :: Maybe GroupMap , ciphers :: M.Map StorableCipher Cipher , lockcache :: LockCache - , sshstalecleaned :: TMVar Bool , flags :: M.Map String Bool , fields :: M.Map String String , cleanupactions :: M.Map CleanupAction (Annex ()) - , signalactions :: TVar (M.Map SignalAction (Int -> IO ())) , sentinalstatus :: Maybe SentinalStatus , useragent :: Maybe String , errcounter :: Integer @@ -160,26 +186,17 @@ data AnnexState = AnnexState , tempurls :: M.Map Key URLString , existinghooks :: M.Map Git.Hook.Hook Bool , desktopnotify :: DesktopNotify - , workers :: Maybe (TMVar (WorkerPool AnnexState)) - , activekeys :: TVar (M.Map Key ThreadId) - , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) - , keysdbhandle :: Keys.DbHandle + , workers :: Maybe (TMVar (WorkerPool (AnnexState, AnnexRead))) , cachedcurrentbranch :: (Maybe (Maybe Git.Branch, Maybe Adjustment)) , cachedgitenv :: Maybe (AltIndexFile, FilePath, [(String, String)]) , urloptions :: Maybe UrlOptions , insmudgecleanfilter :: Bool - , transferrerpool :: TransferrerPool + , getvectorclock :: IO VectorClock } -newState :: GitConfig -> Git.Repo -> IO AnnexState -newState c r = do - emptyactiveremotes <- newMVar M.empty - emptyactivekeys <- newTVarIO M.empty - si <- newTVarIO M.empty +newAnnexState :: GitConfig -> Git.Repo -> IO AnnexState +newAnnexState c r = do o <- newMessageState - sc <- newTMVarIO False - kh <- Keys.newDbHandle - tp <- newTransferrerPool vc <- startVectorClock return $ AnnexState { repo = r @@ -196,7 +213,6 @@ newState c r = do , fast = False , daemon = False , branchstate = startBranchState - , getvectorclock = vc , repoqueue = Nothing , catfilehandles = catFileHandlesNonConcurrent , hashobjecthandle = Nothing @@ -218,11 +234,9 @@ newState c r = do , groupmap = Nothing , ciphers = M.empty , lockcache = M.empty - , sshstalecleaned = sc , flags = M.empty , fields = M.empty , cleanupactions = M.empty - , signalactions = si , sentinalstatus = Nothing , useragent = Nothing , errcounter = 0 @@ -232,91 +246,95 @@ newState c r = do , existinghooks = M.empty , desktopnotify = mempty , workers = Nothing - , activekeys = emptyactivekeys - , activeremotes = emptyactiveremotes - , keysdbhandle = kh , cachedcurrentbranch = Nothing , cachedgitenv = Nothing , urloptions = Nothing , insmudgecleanfilter = False - , transferrerpool = tp + , getvectorclock = vc } {- Makes an Annex state object for the specified git repo. - Ensures the config is read, if it was not already, and performs - any necessary git repo fixups. -} -new :: Git.Repo -> IO AnnexState +new :: Git.Repo -> IO (AnnexState, AnnexRead) new r = do r' <- Git.Config.read r let c = extractGitConfig FromGitConfig r' - newState c =<< fixupRepo r' c + st <- newAnnexState c =<< fixupRepo r' c + rd <- newAnnexRead + return (st, rd) {- Performs an action in the Annex monad from a starting state, - returning a new state. -} -run :: AnnexState -> Annex a -> IO (a, AnnexState) -run s a = flip run' a =<< newMVar s +run :: (AnnexState, AnnexRead) -> Annex a -> IO (a, (AnnexState, AnnexRead)) +run (st, rd) a = do + mv <- newMVar st + run' mv rd a -run' :: MVar AnnexState -> Annex a -> IO (a, AnnexState) -run' mvar a = do - r <- runReaderT (runAnnex a) mvar - `onException` (flush =<< readMVar mvar) - s' <- takeMVar mvar - flush s' - return (r, s') +run' :: MVar AnnexState -> AnnexRead -> Annex a -> IO (a, (AnnexState, AnnexRead)) +run' mvar rd a = do + r <- runReaderT (runAnnex a) (mvar, rd) + `onException` (flush rd) + flush rd + st <- takeMVar mvar + return (r, (st, rd)) where flush = Keys.flushDbQueue . keysdbhandle {- Performs an action in the Annex monad from a starting state, - - and throws away the new state. -} -eval :: AnnexState -> Annex a -> IO a -eval s a = fst <$> run s a + - and throws away the changed state. -} +eval :: (AnnexState, AnnexRead) -> Annex a -> IO a +eval v a = fst <$> run v a {- Makes a runner action, that allows diving into IO and from inside - the IO action, running an Annex action. -} makeRunner :: Annex (Annex a -> IO a) makeRunner = do - mvar <- ask + (mvar, rd) <- ask return $ \a -> do - (r, s) <- run' mvar a + (r, (s, _rd)) <- run' mvar rd a putMVar mvar s return r +getRead :: (AnnexRead -> v) -> Annex v +getRead selector = selector . snd <$> ask + getState :: (AnnexState -> v) -> Annex v getState selector = do - mvar <- ask - s <- liftIO $ readMVar mvar - return $ selector s + mvar <- fst <$> ask + st <- liftIO $ readMVar mvar + return $ selector st changeState :: (AnnexState -> AnnexState) -> Annex () changeState modifier = do - mvar <- ask + mvar <- fst <$> ask liftIO $ modifyMVar_ mvar $ return . modifier withState :: (AnnexState -> IO (AnnexState, b)) -> Annex b withState modifier = do - mvar <- ask + mvar <- fst <$> ask liftIO $ modifyMVar mvar modifier {- Sets a flag to True -} setFlag :: String -> Annex () -setFlag flag = changeState $ \s -> - s { flags = M.insert flag True $ flags s } +setFlag flag = changeState $ \st -> + st { flags = M.insert flag True $ flags st } {- Sets a field to a value -} setField :: String -> String -> Annex () -setField field value = changeState $ \s -> - s { fields = M.insert field value $ fields s } +setField field value = changeState $ \st -> + st { fields = M.insert field value $ fields st } {- Adds a cleanup action to perform. -} addCleanupAction :: CleanupAction -> Annex () -> Annex () -addCleanupAction k a = changeState $ \s -> - s { cleanupactions = M.insert k a $ cleanupactions s } +addCleanupAction k a = changeState $ \st -> + st { cleanupactions = M.insert k a $ cleanupactions st } {- Sets the type of output to emit. -} setOutput :: OutputType -> Annex () -setOutput o = changeState $ \s -> - let m = output s - in s { output = m { outputType = adjustOutputType (outputType m) o } } +setOutput o = changeState $ \st -> + let m = output st + in st { output = m { outputType = adjustOutputType (outputType m) o } } {- Checks if a flag was set. -} getFlag :: String -> Annex Bool @@ -351,9 +369,9 @@ getGitConfig = getState gitconfig {- Overrides a GitConfig setting. The modification persists across - reloads of the repo's config. -} overrideGitConfig :: (GitConfig -> GitConfig) -> Annex () -overrideGitConfig f = changeState $ \s -> s - { gitconfigadjustment = gitconfigadjustment s . f - , gitconfig = f (gitconfig s) +overrideGitConfig f = changeState $ \st -> st + { gitconfigadjustment = gitconfigadjustment st . f + , gitconfig = f (gitconfig st) } {- Adds an adjustment to the Repo data. Adjustments persist across reloads @@ -364,7 +382,7 @@ overrideGitConfig f = changeState $ \s -> s -} adjustGitRepo :: (Git.Repo -> IO Git.Repo) -> Annex () adjustGitRepo a = do - changeState $ \s -> s { repoadjustment = \r -> repoadjustment s r >>= a } + changeState $ \st -> st { repoadjustment = \r -> repoadjustment st r >>= a } changeGitRepo =<< gitRepo {- Adds git config setting, like "foo=bar". It will be passed with -c @@ -375,7 +393,7 @@ addGitConfigOverride v = do adjustGitRepo $ \r -> Git.Config.store (encodeBS' v) Git.Config.ConfigList $ r { Git.gitGlobalOpts = go (Git.gitGlobalOpts r) } - changeState $ \s -> s { gitconfigoverride = v : gitconfigoverride s } + changeState $ \st -> st { gitconfigoverride = v : gitconfigoverride st } where -- Remove any prior occurrance of the setting to avoid -- building up many of them when the adjustment is run repeatedly, @@ -394,7 +412,7 @@ changeGitRepo r = do repoadjuster <- getState repoadjustment gitconfigadjuster <- getState gitconfigadjustment r' <- liftIO $ repoadjuster r - changeState $ \s -> s + changeState $ \st -> st { repo = r' , gitconfig = gitconfigadjuster $ extractGitConfig FromGitConfig r' @@ -414,8 +432,9 @@ getRemoteGitConfig r = do - state, as it will be thrown away. -} withCurrentState :: Annex a -> Annex (IO a) withCurrentState a = do - s <- getState id - return $ eval s a + (mvar, rd) <- ask + st <- liftIO $ readMVar mvar + return $ eval (st, rd) a {- It's not safe to use setCurrentDirectory in the Annex monad, - because the git repo paths are stored relative. @@ -426,20 +445,20 @@ changeDirectory d = do r <- liftIO . Git.adjustPath absPath =<< gitRepo liftIO $ setCurrentDirectory d r' <- liftIO $ Git.relPath r - changeState $ \s -> s { repo = r' } + changeState $ \st -> st { repo = r' } incError :: Annex () -incError = changeState $ \s -> - let !c = errcounter s + 1 - !s' = s { errcounter = c } - in s' +incError = changeState $ \st -> + let !c = errcounter st + 1 + !st' = st { errcounter = c } + in st' getGitRemotes :: Annex [Git.Repo] getGitRemotes = do - s <- getState id - case gitremotes s of + st <- getState id + case gitremotes st of Just rs -> return rs Nothing -> do - rs <- liftIO $ Git.Construct.fromRemotes (repo s) - changeState $ \s' -> s' { gitremotes = Just rs } + rs <- liftIO $ Git.Construct.fromRemotes (repo st) + changeState $ \st' -> st' { gitremotes = Just rs } return rs diff --git a/Annex/Action.hs b/Annex/Action.hs index f23a564194..c6f1c47583 100644 --- a/Annex/Action.hs +++ b/Annex/Action.hs @@ -52,7 +52,7 @@ verifiedAction a = tryNonAsync a >>= \case startup :: Annex () startup = do #ifndef mingw32_HOST_OS - av <- Annex.getState Annex.signalactions + av <- Annex.getRead Annex.signalactions let propagate sig = liftIO $ installhandleronce sig av propagate sigINT propagate sigQUIT diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs index 9314554322..f341e21d0d 100644 --- a/Annex/Concurrent.hs +++ b/Annex/Concurrent.hs @@ -55,9 +55,10 @@ setConcurrency' c f = do -} forkState :: Annex a -> Annex (IO (Annex a)) forkState a = do + rd <- Annex.getRead id st <- dupState return $ do - (ret, newst) <- run st a + (ret, (newst, _rd)) <- run (st, rd) a return $ do mergeState newst return ret @@ -90,7 +91,9 @@ dupState = do - Also closes various handles in it. -} mergeState :: AnnexState -> Annex () mergeState st = do - st' <- liftIO $ snd <$> run st stopNonConcurrentSafeCoProcesses + rd <- Annex.getRead id + st' <- liftIO $ (fst . snd) + <$> run (st, rd) stopNonConcurrentSafeCoProcesses forM_ (M.toList $ Annex.cleanupactions st') $ uncurry addCleanupAction Annex.Queue.mergeFrom st' diff --git a/Annex/Link.hs b/Annex/Link.hs index 5b840bf730..5853eb0003 100644 --- a/Annex/Link.hs +++ b/Annex/Link.hs @@ -204,7 +204,7 @@ restagePointerFile (Restage True) f orig = withTSDelta $ \tsd -> runner :: Git.Queue.InternalActionRunner Annex runner = Git.Queue.InternalActionRunner "restagePointerFile" $ \r l -> do liftIO . Database.Keys.Handle.flushDbQueue - =<< Annex.getState Annex.keysdbhandle + =<< Annex.getRead Annex.keysdbhandle realindex <- liftIO $ Git.Index.currentIndexFile r let lock = fromRawFilePath (Git.Index.indexFileLock realindex) lockindex = liftIO $ catchMaybeIO $ Git.LockFile.openLock' lock diff --git a/Annex/Ssh.hs b/Annex/Ssh.hs index 48578a4680..50d5248852 100644 --- a/Annex/Ssh.hs +++ b/Annex/Ssh.hs @@ -219,7 +219,7 @@ prepSocket socketfile sshhost sshparams = do -- from a previous git-annex run that was interrupted. -- This must run only once, before we have made any ssh connection, -- and any other prepSocket calls must block while it's run. - tv <- Annex.getState Annex.sshstalecleaned + tv <- Annex.getRead Annex.sshstalecleaned join $ liftIO $ atomically $ do cleaned <- takeTMVar tv if cleaned diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index bf7494eed3..76d93df00f 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -371,7 +371,7 @@ pickRemote l a = debugLocks $ go l =<< getConcurrency else gononconcurrent rs goconcurrent rs = do - mv <- Annex.getState Annex.activeremotes + mv <- Annex.getRead Annex.activeremotes active <- liftIO $ takeMVar mv let rs' = sortBy (lessActiveFirst active) rs goconcurrent' mv active rs' diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index c6fa40ca7a..7ccb49b8dd 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -50,9 +50,9 @@ mkRunTransferrer batchmaker = RunTransferrer withTransferrer :: (Transferrer -> Annex a) -> Annex a withTransferrer a = do rt <- mkRunTransferrer nonBatchCommandMaker - pool <- Annex.getState Annex.transferrerpool + pool <- Annex.getRead Annex.transferrerpool let nocheck = pure (pure True) - signalactonsvar <- Annex.getState Annex.signalactions + signalactonsvar <- Annex.getRead Annex.signalactions withTransferrer' False signalactonsvar nocheck rt pool a withTransferrer' @@ -279,7 +279,7 @@ killTransferrer t = do {- Stop all transferrers in the pool. -} emptyTransferrerPool :: Annex () emptyTransferrerPool = do - poolvar <- Annex.getState Annex.transferrerpool + poolvar <- Annex.getRead Annex.transferrerpool pool <- liftIO $ atomically $ swapTVar poolvar [] liftIO $ forM_ pool $ \case TransferrerPoolItem (Just t) _ -> transferrerShutdown t diff --git a/Annex/WorkerPool.hs b/Annex/WorkerPool.hs index 8d6ddcd835..9f2b7f872f 100644 --- a/Annex/WorkerPool.hs +++ b/Annex/WorkerPool.hs @@ -60,7 +60,7 @@ enteringInitialStage = Annex.getState Annex.workers >>= \case - in the pool than spareVals. That does not prevent other threads that call - this from using them though, so it's fine. -} -changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage) +changeStageTo :: ThreadId -> TMVar (WorkerPool t) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage) changeStageTo mytid tv getnewstage = liftIO $ replaceidle >>= maybe (return Nothing) @@ -99,11 +99,11 @@ changeStageTo mytid tv getnewstage = liftIO $ -- removes it from the pool, and returns its state. -- -- If the worker pool is not already allocated, returns Nothing. -waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage)) +waitStartWorkerSlot :: TMVar (WorkerPool t) -> STM (Maybe (t, WorkerStage)) waitStartWorkerSlot tv = do pool <- takeTMVar tv - st <- go pool - return $ Just (st, StartStage) + v <- go pool + return $ Just (v, StartStage) where go pool = case spareVals pool of [] -> retry @@ -112,10 +112,10 @@ waitStartWorkerSlot tv = do putTMVar tv =<< waitIdleWorkerSlot StartStage pool' return v -waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState) +waitIdleWorkerSlot :: WorkerStage -> WorkerPool t -> STM (WorkerPool t) waitIdleWorkerSlot wantstage = maybe retry return . getIdleWorkerSlot wantstage -getIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> Maybe (WorkerPool Annex.AnnexState) +getIdleWorkerSlot :: WorkerStage -> WorkerPool t -> Maybe (WorkerPool t) getIdleWorkerSlot wantstage pool = do l <- findidle [] (workerList pool) return $ pool { workerList = l } diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 4ff870b6c1..add60706d6 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -91,9 +91,9 @@ runTransferThread' mkcheck rt d run = go where go = catchPauseResume $ do p <- runAssistant d $ liftAnnex $ - Annex.getState Annex.transferrerpool + Annex.getRead Annex.transferrerpool signalactonsvar <- runAssistant d $ liftAnnex $ - Annex.getState Annex.signalactions + Annex.getRead Annex.signalactions withTransferrer' True signalactonsvar mkcheck rt p run pause = catchPauseResume $ runEvery (Seconds 86400) noop diff --git a/Assistant/Types/ThreadedMonad.hs b/Assistant/Types/ThreadedMonad.hs index aeb95a8e16..4c07317807 100644 --- a/Assistant/Types/ThreadedMonad.hs +++ b/Assistant/Types/ThreadedMonad.hs @@ -15,7 +15,7 @@ import Data.Tuple {- The Annex state is stored in a MVar, so that threaded actions can access - it. -} -type ThreadState = MVar Annex.AnnexState +type ThreadState = MVar (Annex.AnnexState, Annex.AnnexRead) {- Stores the Annex state in a MVar. - @@ -24,9 +24,10 @@ type ThreadState = MVar Annex.AnnexState withThreadState :: (ThreadState -> Annex a) -> Annex a withThreadState a = do state <- Annex.getState id - mvar <- liftIO $ newMVar state + rd <- Annex.getRead id + mvar <- liftIO $ newMVar (state, rd) r <- a mvar - newstate <- liftIO $ takeMVar mvar + newstate <- liftIO $ fst <$> takeMVar mvar Annex.changeState (const newstate) return r @@ -35,4 +36,5 @@ withThreadState a = do - This serializes calls by threads; only one thread can run in Annex at a - time. -} runThreadState :: ThreadState -> Annex a -> IO a -runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a +runThreadState mvar a = modifyMVar mvar $ \v -> swap <$> Annex.run v a + diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index 61a2935bd5..1e18490193 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -68,9 +68,10 @@ commandAction start = getConcurrency >>= \case Just tv -> liftIO (atomically (waitStartWorkerSlot tv)) >>= maybe runnonconcurrent (runconcurrent' tv) - runconcurrent' tv (workerst, workerstage) = do - aid <- liftIO $ async $ snd <$> Annex.run workerst - (concurrentjob workerst) + runconcurrent' tv (workerstrd, workerstage) = do + aid <- liftIO $ async $ snd + <$> Annex.run workerstrd + (concurrentjob (fst workerstrd)) liftIO $ atomically $ do pool <- takeTMVar tv let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool @@ -78,12 +79,12 @@ commandAction start = getConcurrency >>= \case void $ liftIO $ forkIO $ debugLocks $ do -- accountCommandAction will usually catch -- exceptions. Just in case, fall back to the - -- original workerst. - workerst' <- either (const workerst) id + -- original workerstrd. + workerstrd' <- either (const workerstrd) id <$> waitCatch aid atomically $ do pool <- takeTMVar tv - let !pool' = deactivateWorker pool aid workerst' + let !pool' = deactivateWorker pool aid workerstrd' putTMVar tv pool' concurrentjob workerst = start >>= \case @@ -133,12 +134,12 @@ finishCommandActions = Annex.getState Annex.workers >>= \case Nothing -> noop Just tv -> do Annex.changeState $ \s -> s { Annex.workers = Nothing } - sts <- liftIO $ atomically $ do + vs <- liftIO $ atomically $ do pool <- readTMVar tv if allIdle pool then return (spareVals pool) else retry - mapM_ mergeState sts + mapM_ (mergeState . fst) vs {- Waits for all worker threads that have been started so far to finish. -} waitForAllRunningCommandActions :: Annex () @@ -254,8 +255,9 @@ startConcurrency usedstages a = do Annex.changeState $ \s -> s { Annex.workers = Just tv } prepDupState st <- dupState + rd <- Annex.getRead id liftIO $ atomically $ putTMVar tv $ - allocateWorkerPool st (max n 1) usedstages + allocateWorkerPool (st, rd) (max n 1) usedstages -- Make sure that some expensive actions have been done before -- starting threads. This way the state has them already run, @@ -277,7 +279,7 @@ ensureOnlyActionOn k a = debugLocks $ go (Concurrent _) = goconcurrent go ConcurrentPerCpu = goconcurrent goconcurrent = do - tv <- Annex.getState Annex.activekeys + tv <- Annex.getRead Annex.activekeys bracket (setup tv) id (const a) setup tv = liftIO $ do mytid <- myThreadId diff --git a/Command/TestRemote.hs b/Command/TestRemote.hs index 21b38dc248..59a8d24784 100644 --- a/Command/TestRemote.hs +++ b/Command/TestRemote.hs @@ -102,7 +102,9 @@ start o = starting "testremote" (ActionItemOther (Just (testRemote o))) si $ do perform :: [Described (Annex (Maybe Remote))] -> Maybe Remote -> Annex (Maybe Remote) -> [Key] -> CommandPerform perform drs unavailr exportr ks = do - st <- liftIO . newTVarIO =<< Annex.getState id + st <- liftIO . newTVarIO =<< (,) + <$> Annex.getState id + <*> Annex.getRead id let tests = testGroup "Remote Tests" $ mkTestTrees (runTestCase st) drs @@ -198,7 +200,7 @@ data Described t = Described type RunAnnex = forall a. Annex a -> IO a -runTestCase :: TVar Annex.AnnexState -> RunAnnex +runTestCase :: TVar (Annex.AnnexState, Annex.AnnexRead) -> RunAnnex runTestCase stv a = do st <- atomically $ readTVar stv (r, st') <- Annex.run st $ do diff --git a/Database/Keys.hs b/Database/Keys.hs index 337f232a36..ebaee739a5 100644 --- a/Database/Keys.hs +++ b/Database/Keys.hs @@ -63,7 +63,7 @@ import qualified System.FilePath.ByteString as P -} runReader :: Monoid v => (SQL.ReadHandle -> Annex v) -> Annex v runReader a = do - h <- Annex.getState Annex.keysdbhandle + h <- Annex.getRead Annex.keysdbhandle withDbState h go where go DbUnavailable = return (mempty, DbUnavailable) @@ -87,7 +87,7 @@ runReaderIO a = runReader (liftIO . a) - The database is created if it doesn't exist yet. -} runWriter :: (SQL.WriteHandle -> Annex ()) -> Annex () runWriter a = do - h <- Annex.getState Annex.keysdbhandle + h <- Annex.getRead Annex.keysdbhandle withDbState h go where go st@(DbOpen qh) = do @@ -144,7 +144,7 @@ openDb createdb _ = catchPermissionDenied permerr $ withExclusiveLock gitAnnexKe - data to it. -} closeDb :: Annex () -closeDb = liftIO . closeDbHandle =<< Annex.getState Annex.keysdbhandle +closeDb = liftIO . closeDbHandle =<< Annex.getRead Annex.keysdbhandle addAssociatedFile :: Key -> TopFilePath -> Annex () addAssociatedFile k f = runWriterIO $ SQL.addAssociatedFile k f diff --git a/Remote/Git.hs b/Remote/Git.hs index b2f49593d8..fb637d856a 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -750,7 +750,7 @@ repairRemote r a = return $ do ensureInitialized a `finally` stopCoProcesses -data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar (Maybe Annex.AnnexState)) +data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar (Maybe (Annex.AnnexState, Annex.AnnexRead))) {- This can safely be called on a Repo that is not local, but of course - onLocal will not work if used with the result. -} @@ -775,20 +775,20 @@ onLocalRepo repo a = do onLocal' lra a onLocal' :: LocalRemoteAnnex -> Annex a -> Annex a -onLocal' (LocalRemoteAnnex repo v) a = liftIO (takeMVar v) >>= \case +onLocal' (LocalRemoteAnnex repo mv) a = liftIO (takeMVar mv) >>= \case Nothing -> do - st <- liftIO $ Annex.new repo - go (st, ensureInitialized >> a) - Just st -> go (st, a) + v <- liftIO $ Annex.new repo + go (v, ensureInitialized >> a) + Just v -> go (v, a) where - go (st, a') = do + go ((st, rd), a') = do curro <- Annex.getState Annex.output - let act = Annex.run (st { Annex.output = curro }) $ + let act = Annex.run (st { Annex.output = curro }, rd) $ a' `finally` stopCoProcesses - (ret, st') <- liftIO $ act `onException` cache st - liftIO $ cache st' + (ret, (st', _rd)) <- liftIO $ act `onException` cache (st, rd) + liftIO $ cache (st', rd) return ret - cache st = putMVar v (Just st) + cache = putMVar mv . Just {- Faster variant of onLocal. - diff --git a/RemoteDaemon/Common.hs b/RemoteDaemon/Common.hs index 1af54e18e8..e7d83f8b03 100644 --- a/RemoteDaemon/Common.hs +++ b/RemoteDaemon/Common.hs @@ -26,14 +26,14 @@ import Control.Concurrent -- since only one liftAnnex can be running at a time, across all -- transports. liftAnnex :: TransportHandle -> Annex a -> IO a -liftAnnex (TransportHandle _ annexstate) a = do - st <- takeMVar annexstate - (r, st') <- Annex.run st a - putMVar annexstate st' +liftAnnex (TransportHandle _ stmv rd) a = do + st <- takeMVar stmv + (r, (st', _rd)) <- Annex.run (st, rd) a + putMVar stmv st' return r inLocalRepo :: TransportHandle -> (Git.Repo -> IO a) -> IO a -inLocalRepo (TransportHandle (LocalRepo g) _) a = a g +inLocalRepo (TransportHandle (LocalRepo g) _ _) a = a g -- Check if some shas should be fetched from the remote, -- and presumably later merged. diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs index 054bce0aa7..ce29f7f7cc 100644 --- a/RemoteDaemon/Core.hs +++ b/RemoteDaemon/Core.hs @@ -139,7 +139,7 @@ runController ichan ochan = do -- Generates a map with a transport for each supported remote in the git repo, -- except those that have annex.sync = false genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap -genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do +genRemoteMap h@(TransportHandle (LocalRepo g) _ _) ochan = do rs <- Git.Construct.fromRemotes g M.fromList . catMaybes <$> mapM gen rs where @@ -161,17 +161,18 @@ genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do genTransportHandle :: IO TransportHandle genTransportHandle = do - annexstate <- newMVar =<< Annex.new =<< Git.CurrentRepo.get - g <- Annex.repo <$> readMVar annexstate - let h = TransportHandle (LocalRepo g) annexstate + (st, rd) <- Annex.new =<< Git.CurrentRepo.get + mvar <- newMVar st + let g = Annex.repo st + let h = TransportHandle (LocalRepo g) mvar rd liftAnnex h $ do Annex.setOutput QuietOutput enableInteractiveBranchAccess return h updateTransportHandle :: TransportHandle -> IO TransportHandle -updateTransportHandle h@(TransportHandle _g annexstate) = do +updateTransportHandle h@(TransportHandle _g st rd) = do g' <- liftAnnex h $ do reloadConfig Annex.gitRepo - return (TransportHandle (LocalRepo g') annexstate) + return (TransportHandle (LocalRepo g') st rd) diff --git a/RemoteDaemon/Transport/GCrypt.hs b/RemoteDaemon/Transport/GCrypt.hs index d88ce74fbd..746946638e 100644 --- a/RemoteDaemon/Transport/GCrypt.hs +++ b/RemoteDaemon/Transport/GCrypt.hs @@ -17,7 +17,7 @@ import Remote.GCrypt (accessShellConfig) import Annex.Ssh transport :: Transport -transport rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) _) ichan ochan +transport rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) _ _) ichan ochan | accessShellConfig gc = do r' <- encryptedRemote g r v <- liftAnnex h $ git_annex_shell ConsumeStdin r' "notifychanges" [] [] diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index fbfdc51a21..2926b34713 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -29,10 +29,10 @@ transport rr@(RemoteRepo r _) url h ichan ochan = do Just (cmd, params) -> transportUsingCmd cmd params rr url h ichan ochan transportUsingCmd :: FilePath -> [CommandParam] -> Transport -transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) s) ichan ochan = do +transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) st rd) ichan ochan = do -- enable ssh connection caching wherever inLocalRepo is called g' <- liftAnnex h $ sshOptionsTo r gc g - let transporthandle = TransportHandle (LocalRepo g') s + let transporthandle = TransportHandle (LocalRepo g') st rd transportUsingCmd' cmd params rr url transporthandle ichan ochan transportUsingCmd' :: FilePath -> [CommandParam] -> Transport diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs index 977a29112e..8f5ca5acdc 100644 --- a/RemoteDaemon/Transport/Tor.hs +++ b/RemoteDaemon/Transport/Tor.hs @@ -39,7 +39,7 @@ import System.Posix.User -- Run tor hidden service. server :: Server -server ichan th@(TransportHandle (LocalRepo r) _) = go +server ichan th@(TransportHandle (LocalRepo r) _ _) = go where go = checkstartservice >>= handlecontrol @@ -88,7 +88,7 @@ maxConnections :: Int maxConnections = 100 serveClient :: TransportHandle -> UUID -> Repo -> TBMQueue Handle -> IO () -serveClient th u r q = bracket setup cleanup start +serveClient th@(TransportHandle _ _ rd) u r q = bracket setup cleanup start where setup = do h <- atomically $ readTBMQueue q @@ -105,7 +105,7 @@ serveClient th u r q = bracket setup cleanup start -- Avoid doing any work in the liftAnnex, since only one -- can run at a time. st <- liftAnnex th dupState - ((), st') <- Annex.run st $ do + ((), (st', _rd)) <- Annex.run (st, rd) $ do -- Load auth tokens for every connection, to notice -- when the allowed set is changed. allowed <- loadP2PAuthTokens diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs index bccef41193..8b31ca16e9 100644 --- a/RemoteDaemon/Types.hs +++ b/RemoteDaemon/Types.hs @@ -35,12 +35,12 @@ type Server = TChan Consumed -> TransportHandle -> IO () data RemoteRepo = RemoteRepo Git.Repo RemoteGitConfig newtype LocalRepo = LocalRepo Git.Repo --- All Transports share a single AnnexState MVar +-- All Transports share a single AnnexState MVar and an AnnexRead. -- -- Different TransportHandles may have different versions of the LocalRepo. -- (For example, the ssh transport modifies it to enable ssh connection -- caching.) -data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState) +data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState) Annex.AnnexRead -- Messages that the daemon emits. data Emitted