diff --git a/Annex.hs b/Annex.hs index 02d4fa0354..e48011bba9 100644 --- a/Annex.hs +++ b/Annex.hs @@ -10,10 +10,12 @@ module Annex ( Annex, AnnexState(..), + AnnexRead(..), new, run, eval, makeRunner, + getRead, getState, changeState, withState, @@ -88,18 +90,18 @@ import qualified Data.Map.Strict as M import qualified Data.Set as S import Data.Time.Clock.POSIX -{- git-annex's monad is a ReaderT around an AnnexState stored in a MVar. - - The MVar is not exposed outside this module. +{- git-annex's monad is a ReaderT around an AnnexState stored in a MVar, + - and an AnnexRead. The MVar is not exposed outside this module. - - Note that when an Annex action fails and the exception is caught, - any changes the action has made to the AnnexState are retained, - due to the use of the MVar to store the state. -} -newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState) IO a } +newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState, AnnexRead) IO a } deriving ( Monad, MonadIO, - MonadReader (MVar AnnexState), + MonadReader (MVar AnnexState, AnnexRead), MonadCatch, MonadThrow, MonadMask, @@ -109,7 +111,34 @@ newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState) IO a } Alternative ) --- internal state storage +-- Values that can be read, but not modified by an Annex action. +data AnnexRead = AnnexRead + { activekeys :: TVar (M.Map Key ThreadId) + , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) + , keysdbhandle :: Keys.DbHandle + , sshstalecleaned :: TMVar Bool + , signalactions :: TVar (M.Map SignalAction (Int -> IO ())) + , transferrerpool :: TransferrerPool + } + +newAnnexRead :: IO AnnexRead +newAnnexRead = do + emptyactivekeys <- newTVarIO M.empty + emptyactiveremotes <- newMVar M.empty + kh <- Keys.newDbHandle + sc <- newTMVarIO False + si <- newTVarIO M.empty + tp <- newTransferrerPool + return $ AnnexRead + { activekeys = emptyactivekeys + , activeremotes = emptyactiveremotes + , keysdbhandle = kh + , sshstalecleaned = sc + , signalactions = si + , transferrerpool = tp + } + +-- Values that can change while running an Annex action. data AnnexState = AnnexState { repo :: Git.Repo , repoadjustment :: (Git.Repo -> IO Git.Repo) @@ -125,7 +154,6 @@ data AnnexState = AnnexState , fast :: Bool , daemon :: Bool , branchstate :: BranchState - , getvectorclock :: IO VectorClock , repoqueue :: Maybe (Git.Queue.Queue Annex) , catfilehandles :: CatFileHandles , hashobjecthandle :: Maybe HashObjectHandle @@ -147,11 +175,9 @@ data AnnexState = AnnexState , groupmap :: Maybe GroupMap , ciphers :: M.Map StorableCipher Cipher , lockcache :: LockCache - , sshstalecleaned :: TMVar Bool , flags :: M.Map String Bool , fields :: M.Map String String , cleanupactions :: M.Map CleanupAction (Annex ()) - , signalactions :: TVar (M.Map SignalAction (Int -> IO ())) , sentinalstatus :: Maybe SentinalStatus , useragent :: Maybe String , errcounter :: Integer @@ -160,26 +186,17 @@ data AnnexState = AnnexState , tempurls :: M.Map Key URLString , existinghooks :: M.Map Git.Hook.Hook Bool , desktopnotify :: DesktopNotify - , workers :: Maybe (TMVar (WorkerPool AnnexState)) - , activekeys :: TVar (M.Map Key ThreadId) - , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) - , keysdbhandle :: Keys.DbHandle + , workers :: Maybe (TMVar (WorkerPool (AnnexState, AnnexRead))) , cachedcurrentbranch :: (Maybe (Maybe Git.Branch, Maybe Adjustment)) , cachedgitenv :: Maybe (AltIndexFile, FilePath, [(String, String)]) , urloptions :: Maybe UrlOptions , insmudgecleanfilter :: Bool - , transferrerpool :: TransferrerPool + , getvectorclock :: IO VectorClock } -newState :: GitConfig -> Git.Repo -> IO AnnexState -newState c r = do - emptyactiveremotes <- newMVar M.empty - emptyactivekeys <- newTVarIO M.empty - si <- newTVarIO M.empty +newAnnexState :: GitConfig -> Git.Repo -> IO AnnexState +newAnnexState c r = do o <- newMessageState - sc <- newTMVarIO False - kh <- Keys.newDbHandle - tp <- newTransferrerPool vc <- startVectorClock return $ AnnexState { repo = r @@ -196,7 +213,6 @@ newState c r = do , fast = False , daemon = False , branchstate = startBranchState - , getvectorclock = vc , repoqueue = Nothing , catfilehandles = catFileHandlesNonConcurrent , hashobjecthandle = Nothing @@ -218,11 +234,9 @@ newState c r = do , groupmap = Nothing , ciphers = M.empty , lockcache = M.empty - , sshstalecleaned = sc , flags = M.empty , fields = M.empty , cleanupactions = M.empty - , signalactions = si , sentinalstatus = Nothing , useragent = Nothing , errcounter = 0 @@ -232,91 +246,95 @@ newState c r = do , existinghooks = M.empty , desktopnotify = mempty , workers = Nothing - , activekeys = emptyactivekeys - , activeremotes = emptyactiveremotes - , keysdbhandle = kh , cachedcurrentbranch = Nothing , cachedgitenv = Nothing , urloptions = Nothing , insmudgecleanfilter = False - , transferrerpool = tp + , getvectorclock = vc } {- Makes an Annex state object for the specified git repo. - Ensures the config is read, if it was not already, and performs - any necessary git repo fixups. -} -new :: Git.Repo -> IO AnnexState +new :: Git.Repo -> IO (AnnexState, AnnexRead) new r = do r' <- Git.Config.read r let c = extractGitConfig FromGitConfig r' - newState c =<< fixupRepo r' c + st <- newAnnexState c =<< fixupRepo r' c + rd <- newAnnexRead + return (st, rd) {- Performs an action in the Annex monad from a starting state, - returning a new state. -} -run :: AnnexState -> Annex a -> IO (a, AnnexState) -run s a = flip run' a =<< newMVar s +run :: (AnnexState, AnnexRead) -> Annex a -> IO (a, (AnnexState, AnnexRead)) +run (st, rd) a = do + mv <- newMVar st + run' mv rd a -run' :: MVar AnnexState -> Annex a -> IO (a, AnnexState) -run' mvar a = do - r <- runReaderT (runAnnex a) mvar - `onException` (flush =<< readMVar mvar) - s' <- takeMVar mvar - flush s' - return (r, s') +run' :: MVar AnnexState -> AnnexRead -> Annex a -> IO (a, (AnnexState, AnnexRead)) +run' mvar rd a = do + r <- runReaderT (runAnnex a) (mvar, rd) + `onException` (flush rd) + flush rd + st <- takeMVar mvar + return (r, (st, rd)) where flush = Keys.flushDbQueue . keysdbhandle {- Performs an action in the Annex monad from a starting state, - - and throws away the new state. -} -eval :: AnnexState -> Annex a -> IO a -eval s a = fst <$> run s a + - and throws away the changed state. -} +eval :: (AnnexState, AnnexRead) -> Annex a -> IO a +eval v a = fst <$> run v a {- Makes a runner action, that allows diving into IO and from inside - the IO action, running an Annex action. -} makeRunner :: Annex (Annex a -> IO a) makeRunner = do - mvar <- ask + (mvar, rd) <- ask return $ \a -> do - (r, s) <- run' mvar a + (r, (s, _rd)) <- run' mvar rd a putMVar mvar s return r +getRead :: (AnnexRead -> v) -> Annex v +getRead selector = selector . snd <$> ask + getState :: (AnnexState -> v) -> Annex v getState selector = do - mvar <- ask - s <- liftIO $ readMVar mvar - return $ selector s + mvar <- fst <$> ask + st <- liftIO $ readMVar mvar + return $ selector st changeState :: (AnnexState -> AnnexState) -> Annex () changeState modifier = do - mvar <- ask + mvar <- fst <$> ask liftIO $ modifyMVar_ mvar $ return . modifier withState :: (AnnexState -> IO (AnnexState, b)) -> Annex b withState modifier = do - mvar <- ask + mvar <- fst <$> ask liftIO $ modifyMVar mvar modifier {- Sets a flag to True -} setFlag :: String -> Annex () -setFlag flag = changeState $ \s -> - s { flags = M.insert flag True $ flags s } +setFlag flag = changeState $ \st -> + st { flags = M.insert flag True $ flags st } {- Sets a field to a value -} setField :: String -> String -> Annex () -setField field value = changeState $ \s -> - s { fields = M.insert field value $ fields s } +setField field value = changeState $ \st -> + st { fields = M.insert field value $ fields st } {- Adds a cleanup action to perform. -} addCleanupAction :: CleanupAction -> Annex () -> Annex () -addCleanupAction k a = changeState $ \s -> - s { cleanupactions = M.insert k a $ cleanupactions s } +addCleanupAction k a = changeState $ \st -> + st { cleanupactions = M.insert k a $ cleanupactions st } {- Sets the type of output to emit. -} setOutput :: OutputType -> Annex () -setOutput o = changeState $ \s -> - let m = output s - in s { output = m { outputType = adjustOutputType (outputType m) o } } +setOutput o = changeState $ \st -> + let m = output st + in st { output = m { outputType = adjustOutputType (outputType m) o } } {- Checks if a flag was set. -} getFlag :: String -> Annex Bool @@ -351,9 +369,9 @@ getGitConfig = getState gitconfig {- Overrides a GitConfig setting. The modification persists across - reloads of the repo's config. -} overrideGitConfig :: (GitConfig -> GitConfig) -> Annex () -overrideGitConfig f = changeState $ \s -> s - { gitconfigadjustment = gitconfigadjustment s . f - , gitconfig = f (gitconfig s) +overrideGitConfig f = changeState $ \st -> st + { gitconfigadjustment = gitconfigadjustment st . f + , gitconfig = f (gitconfig st) } {- Adds an adjustment to the Repo data. Adjustments persist across reloads @@ -364,7 +382,7 @@ overrideGitConfig f = changeState $ \s -> s -} adjustGitRepo :: (Git.Repo -> IO Git.Repo) -> Annex () adjustGitRepo a = do - changeState $ \s -> s { repoadjustment = \r -> repoadjustment s r >>= a } + changeState $ \st -> st { repoadjustment = \r -> repoadjustment st r >>= a } changeGitRepo =<< gitRepo {- Adds git config setting, like "foo=bar". It will be passed with -c @@ -375,7 +393,7 @@ addGitConfigOverride v = do adjustGitRepo $ \r -> Git.Config.store (encodeBS' v) Git.Config.ConfigList $ r { Git.gitGlobalOpts = go (Git.gitGlobalOpts r) } - changeState $ \s -> s { gitconfigoverride = v : gitconfigoverride s } + changeState $ \st -> st { gitconfigoverride = v : gitconfigoverride st } where -- Remove any prior occurrance of the setting to avoid -- building up many of them when the adjustment is run repeatedly, @@ -394,7 +412,7 @@ changeGitRepo r = do repoadjuster <- getState repoadjustment gitconfigadjuster <- getState gitconfigadjustment r' <- liftIO $ repoadjuster r - changeState $ \s -> s + changeState $ \st -> st { repo = r' , gitconfig = gitconfigadjuster $ extractGitConfig FromGitConfig r' @@ -414,8 +432,9 @@ getRemoteGitConfig r = do - state, as it will be thrown away. -} withCurrentState :: Annex a -> Annex (IO a) withCurrentState a = do - s <- getState id - return $ eval s a + (mvar, rd) <- ask + st <- liftIO $ readMVar mvar + return $ eval (st, rd) a {- It's not safe to use setCurrentDirectory in the Annex monad, - because the git repo paths are stored relative. @@ -426,20 +445,20 @@ changeDirectory d = do r <- liftIO . Git.adjustPath absPath =<< gitRepo liftIO $ setCurrentDirectory d r' <- liftIO $ Git.relPath r - changeState $ \s -> s { repo = r' } + changeState $ \st -> st { repo = r' } incError :: Annex () -incError = changeState $ \s -> - let !c = errcounter s + 1 - !s' = s { errcounter = c } - in s' +incError = changeState $ \st -> + let !c = errcounter st + 1 + !st' = st { errcounter = c } + in st' getGitRemotes :: Annex [Git.Repo] getGitRemotes = do - s <- getState id - case gitremotes s of + st <- getState id + case gitremotes st of Just rs -> return rs Nothing -> do - rs <- liftIO $ Git.Construct.fromRemotes (repo s) - changeState $ \s' -> s' { gitremotes = Just rs } + rs <- liftIO $ Git.Construct.fromRemotes (repo st) + changeState $ \st' -> st' { gitremotes = Just rs } return rs diff --git a/Annex/Action.hs b/Annex/Action.hs index f23a564194..c6f1c47583 100644 --- a/Annex/Action.hs +++ b/Annex/Action.hs @@ -52,7 +52,7 @@ verifiedAction a = tryNonAsync a >>= \case startup :: Annex () startup = do #ifndef mingw32_HOST_OS - av <- Annex.getState Annex.signalactions + av <- Annex.getRead Annex.signalactions let propagate sig = liftIO $ installhandleronce sig av propagate sigINT propagate sigQUIT diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs index 9314554322..f341e21d0d 100644 --- a/Annex/Concurrent.hs +++ b/Annex/Concurrent.hs @@ -55,9 +55,10 @@ setConcurrency' c f = do -} forkState :: Annex a -> Annex (IO (Annex a)) forkState a = do + rd <- Annex.getRead id st <- dupState return $ do - (ret, newst) <- run st a + (ret, (newst, _rd)) <- run (st, rd) a return $ do mergeState newst return ret @@ -90,7 +91,9 @@ dupState = do - Also closes various handles in it. -} mergeState :: AnnexState -> Annex () mergeState st = do - st' <- liftIO $ snd <$> run st stopNonConcurrentSafeCoProcesses + rd <- Annex.getRead id + st' <- liftIO $ (fst . snd) + <$> run (st, rd) stopNonConcurrentSafeCoProcesses forM_ (M.toList $ Annex.cleanupactions st') $ uncurry addCleanupAction Annex.Queue.mergeFrom st' diff --git a/Annex/Link.hs b/Annex/Link.hs index 5b840bf730..5853eb0003 100644 --- a/Annex/Link.hs +++ b/Annex/Link.hs @@ -204,7 +204,7 @@ restagePointerFile (Restage True) f orig = withTSDelta $ \tsd -> runner :: Git.Queue.InternalActionRunner Annex runner = Git.Queue.InternalActionRunner "restagePointerFile" $ \r l -> do liftIO . Database.Keys.Handle.flushDbQueue - =<< Annex.getState Annex.keysdbhandle + =<< Annex.getRead Annex.keysdbhandle realindex <- liftIO $ Git.Index.currentIndexFile r let lock = fromRawFilePath (Git.Index.indexFileLock realindex) lockindex = liftIO $ catchMaybeIO $ Git.LockFile.openLock' lock diff --git a/Annex/Ssh.hs b/Annex/Ssh.hs index 48578a4680..50d5248852 100644 --- a/Annex/Ssh.hs +++ b/Annex/Ssh.hs @@ -219,7 +219,7 @@ prepSocket socketfile sshhost sshparams = do -- from a previous git-annex run that was interrupted. -- This must run only once, before we have made any ssh connection, -- and any other prepSocket calls must block while it's run. - tv <- Annex.getState Annex.sshstalecleaned + tv <- Annex.getRead Annex.sshstalecleaned join $ liftIO $ atomically $ do cleaned <- takeTMVar tv if cleaned diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index bf7494eed3..76d93df00f 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -371,7 +371,7 @@ pickRemote l a = debugLocks $ go l =<< getConcurrency else gononconcurrent rs goconcurrent rs = do - mv <- Annex.getState Annex.activeremotes + mv <- Annex.getRead Annex.activeremotes active <- liftIO $ takeMVar mv let rs' = sortBy (lessActiveFirst active) rs goconcurrent' mv active rs' diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index c6fa40ca7a..7ccb49b8dd 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -50,9 +50,9 @@ mkRunTransferrer batchmaker = RunTransferrer withTransferrer :: (Transferrer -> Annex a) -> Annex a withTransferrer a = do rt <- mkRunTransferrer nonBatchCommandMaker - pool <- Annex.getState Annex.transferrerpool + pool <- Annex.getRead Annex.transferrerpool let nocheck = pure (pure True) - signalactonsvar <- Annex.getState Annex.signalactions + signalactonsvar <- Annex.getRead Annex.signalactions withTransferrer' False signalactonsvar nocheck rt pool a withTransferrer' @@ -279,7 +279,7 @@ killTransferrer t = do {- Stop all transferrers in the pool. -} emptyTransferrerPool :: Annex () emptyTransferrerPool = do - poolvar <- Annex.getState Annex.transferrerpool + poolvar <- Annex.getRead Annex.transferrerpool pool <- liftIO $ atomically $ swapTVar poolvar [] liftIO $ forM_ pool $ \case TransferrerPoolItem (Just t) _ -> transferrerShutdown t diff --git a/Annex/WorkerPool.hs b/Annex/WorkerPool.hs index 8d6ddcd835..9f2b7f872f 100644 --- a/Annex/WorkerPool.hs +++ b/Annex/WorkerPool.hs @@ -60,7 +60,7 @@ enteringInitialStage = Annex.getState Annex.workers >>= \case - in the pool than spareVals. That does not prevent other threads that call - this from using them though, so it's fine. -} -changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage) +changeStageTo :: ThreadId -> TMVar (WorkerPool t) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage) changeStageTo mytid tv getnewstage = liftIO $ replaceidle >>= maybe (return Nothing) @@ -99,11 +99,11 @@ changeStageTo mytid tv getnewstage = liftIO $ -- removes it from the pool, and returns its state. -- -- If the worker pool is not already allocated, returns Nothing. -waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage)) +waitStartWorkerSlot :: TMVar (WorkerPool t) -> STM (Maybe (t, WorkerStage)) waitStartWorkerSlot tv = do pool <- takeTMVar tv - st <- go pool - return $ Just (st, StartStage) + v <- go pool + return $ Just (v, StartStage) where go pool = case spareVals pool of [] -> retry @@ -112,10 +112,10 @@ waitStartWorkerSlot tv = do putTMVar tv =<< waitIdleWorkerSlot StartStage pool' return v -waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState) +waitIdleWorkerSlot :: WorkerStage -> WorkerPool t -> STM (WorkerPool t) waitIdleWorkerSlot wantstage = maybe retry return . getIdleWorkerSlot wantstage -getIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> Maybe (WorkerPool Annex.AnnexState) +getIdleWorkerSlot :: WorkerStage -> WorkerPool t -> Maybe (WorkerPool t) getIdleWorkerSlot wantstage pool = do l <- findidle [] (workerList pool) return $ pool { workerList = l } diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 4ff870b6c1..add60706d6 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -91,9 +91,9 @@ runTransferThread' mkcheck rt d run = go where go = catchPauseResume $ do p <- runAssistant d $ liftAnnex $ - Annex.getState Annex.transferrerpool + Annex.getRead Annex.transferrerpool signalactonsvar <- runAssistant d $ liftAnnex $ - Annex.getState Annex.signalactions + Annex.getRead Annex.signalactions withTransferrer' True signalactonsvar mkcheck rt p run pause = catchPauseResume $ runEvery (Seconds 86400) noop diff --git a/Assistant/Types/ThreadedMonad.hs b/Assistant/Types/ThreadedMonad.hs index aeb95a8e16..4c07317807 100644 --- a/Assistant/Types/ThreadedMonad.hs +++ b/Assistant/Types/ThreadedMonad.hs @@ -15,7 +15,7 @@ import Data.Tuple {- The Annex state is stored in a MVar, so that threaded actions can access - it. -} -type ThreadState = MVar Annex.AnnexState +type ThreadState = MVar (Annex.AnnexState, Annex.AnnexRead) {- Stores the Annex state in a MVar. - @@ -24,9 +24,10 @@ type ThreadState = MVar Annex.AnnexState withThreadState :: (ThreadState -> Annex a) -> Annex a withThreadState a = do state <- Annex.getState id - mvar <- liftIO $ newMVar state + rd <- Annex.getRead id + mvar <- liftIO $ newMVar (state, rd) r <- a mvar - newstate <- liftIO $ takeMVar mvar + newstate <- liftIO $ fst <$> takeMVar mvar Annex.changeState (const newstate) return r @@ -35,4 +36,5 @@ withThreadState a = do - This serializes calls by threads; only one thread can run in Annex at a - time. -} runThreadState :: ThreadState -> Annex a -> IO a -runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a +runThreadState mvar a = modifyMVar mvar $ \v -> swap <$> Annex.run v a + diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index 61a2935bd5..1e18490193 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -68,9 +68,10 @@ commandAction start = getConcurrency >>= \case Just tv -> liftIO (atomically (waitStartWorkerSlot tv)) >>= maybe runnonconcurrent (runconcurrent' tv) - runconcurrent' tv (workerst, workerstage) = do - aid <- liftIO $ async $ snd <$> Annex.run workerst - (concurrentjob workerst) + runconcurrent' tv (workerstrd, workerstage) = do + aid <- liftIO $ async $ snd + <$> Annex.run workerstrd + (concurrentjob (fst workerstrd)) liftIO $ atomically $ do pool <- takeTMVar tv let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool @@ -78,12 +79,12 @@ commandAction start = getConcurrency >>= \case void $ liftIO $ forkIO $ debugLocks $ do -- accountCommandAction will usually catch -- exceptions. Just in case, fall back to the - -- original workerst. - workerst' <- either (const workerst) id + -- original workerstrd. + workerstrd' <- either (const workerstrd) id <$> waitCatch aid atomically $ do pool <- takeTMVar tv - let !pool' = deactivateWorker pool aid workerst' + let !pool' = deactivateWorker pool aid workerstrd' putTMVar tv pool' concurrentjob workerst = start >>= \case @@ -133,12 +134,12 @@ finishCommandActions = Annex.getState Annex.workers >>= \case Nothing -> noop Just tv -> do Annex.changeState $ \s -> s { Annex.workers = Nothing } - sts <- liftIO $ atomically $ do + vs <- liftIO $ atomically $ do pool <- readTMVar tv if allIdle pool then return (spareVals pool) else retry - mapM_ mergeState sts + mapM_ (mergeState . fst) vs {- Waits for all worker threads that have been started so far to finish. -} waitForAllRunningCommandActions :: Annex () @@ -254,8 +255,9 @@ startConcurrency usedstages a = do Annex.changeState $ \s -> s { Annex.workers = Just tv } prepDupState st <- dupState + rd <- Annex.getRead id liftIO $ atomically $ putTMVar tv $ - allocateWorkerPool st (max n 1) usedstages + allocateWorkerPool (st, rd) (max n 1) usedstages -- Make sure that some expensive actions have been done before -- starting threads. This way the state has them already run, @@ -277,7 +279,7 @@ ensureOnlyActionOn k a = debugLocks $ go (Concurrent _) = goconcurrent go ConcurrentPerCpu = goconcurrent goconcurrent = do - tv <- Annex.getState Annex.activekeys + tv <- Annex.getRead Annex.activekeys bracket (setup tv) id (const a) setup tv = liftIO $ do mytid <- myThreadId diff --git a/Command/TestRemote.hs b/Command/TestRemote.hs index 21b38dc248..59a8d24784 100644 --- a/Command/TestRemote.hs +++ b/Command/TestRemote.hs @@ -102,7 +102,9 @@ start o = starting "testremote" (ActionItemOther (Just (testRemote o))) si $ do perform :: [Described (Annex (Maybe Remote))] -> Maybe Remote -> Annex (Maybe Remote) -> [Key] -> CommandPerform perform drs unavailr exportr ks = do - st <- liftIO . newTVarIO =<< Annex.getState id + st <- liftIO . newTVarIO =<< (,) + <$> Annex.getState id + <*> Annex.getRead id let tests = testGroup "Remote Tests" $ mkTestTrees (runTestCase st) drs @@ -198,7 +200,7 @@ data Described t = Described type RunAnnex = forall a. Annex a -> IO a -runTestCase :: TVar Annex.AnnexState -> RunAnnex +runTestCase :: TVar (Annex.AnnexState, Annex.AnnexRead) -> RunAnnex runTestCase stv a = do st <- atomically $ readTVar stv (r, st') <- Annex.run st $ do diff --git a/Database/Keys.hs b/Database/Keys.hs index 337f232a36..ebaee739a5 100644 --- a/Database/Keys.hs +++ b/Database/Keys.hs @@ -63,7 +63,7 @@ import qualified System.FilePath.ByteString as P -} runReader :: Monoid v => (SQL.ReadHandle -> Annex v) -> Annex v runReader a = do - h <- Annex.getState Annex.keysdbhandle + h <- Annex.getRead Annex.keysdbhandle withDbState h go where go DbUnavailable = return (mempty, DbUnavailable) @@ -87,7 +87,7 @@ runReaderIO a = runReader (liftIO . a) - The database is created if it doesn't exist yet. -} runWriter :: (SQL.WriteHandle -> Annex ()) -> Annex () runWriter a = do - h <- Annex.getState Annex.keysdbhandle + h <- Annex.getRead Annex.keysdbhandle withDbState h go where go st@(DbOpen qh) = do @@ -144,7 +144,7 @@ openDb createdb _ = catchPermissionDenied permerr $ withExclusiveLock gitAnnexKe - data to it. -} closeDb :: Annex () -closeDb = liftIO . closeDbHandle =<< Annex.getState Annex.keysdbhandle +closeDb = liftIO . closeDbHandle =<< Annex.getRead Annex.keysdbhandle addAssociatedFile :: Key -> TopFilePath -> Annex () addAssociatedFile k f = runWriterIO $ SQL.addAssociatedFile k f diff --git a/Remote/Git.hs b/Remote/Git.hs index b2f49593d8..fb637d856a 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -750,7 +750,7 @@ repairRemote r a = return $ do ensureInitialized a `finally` stopCoProcesses -data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar (Maybe Annex.AnnexState)) +data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar (Maybe (Annex.AnnexState, Annex.AnnexRead))) {- This can safely be called on a Repo that is not local, but of course - onLocal will not work if used with the result. -} @@ -775,20 +775,20 @@ onLocalRepo repo a = do onLocal' lra a onLocal' :: LocalRemoteAnnex -> Annex a -> Annex a -onLocal' (LocalRemoteAnnex repo v) a = liftIO (takeMVar v) >>= \case +onLocal' (LocalRemoteAnnex repo mv) a = liftIO (takeMVar mv) >>= \case Nothing -> do - st <- liftIO $ Annex.new repo - go (st, ensureInitialized >> a) - Just st -> go (st, a) + v <- liftIO $ Annex.new repo + go (v, ensureInitialized >> a) + Just v -> go (v, a) where - go (st, a') = do + go ((st, rd), a') = do curro <- Annex.getState Annex.output - let act = Annex.run (st { Annex.output = curro }) $ + let act = Annex.run (st { Annex.output = curro }, rd) $ a' `finally` stopCoProcesses - (ret, st') <- liftIO $ act `onException` cache st - liftIO $ cache st' + (ret, (st', _rd)) <- liftIO $ act `onException` cache (st, rd) + liftIO $ cache (st', rd) return ret - cache st = putMVar v (Just st) + cache = putMVar mv . Just {- Faster variant of onLocal. - diff --git a/RemoteDaemon/Common.hs b/RemoteDaemon/Common.hs index 1af54e18e8..e7d83f8b03 100644 --- a/RemoteDaemon/Common.hs +++ b/RemoteDaemon/Common.hs @@ -26,14 +26,14 @@ import Control.Concurrent -- since only one liftAnnex can be running at a time, across all -- transports. liftAnnex :: TransportHandle -> Annex a -> IO a -liftAnnex (TransportHandle _ annexstate) a = do - st <- takeMVar annexstate - (r, st') <- Annex.run st a - putMVar annexstate st' +liftAnnex (TransportHandle _ stmv rd) a = do + st <- takeMVar stmv + (r, (st', _rd)) <- Annex.run (st, rd) a + putMVar stmv st' return r inLocalRepo :: TransportHandle -> (Git.Repo -> IO a) -> IO a -inLocalRepo (TransportHandle (LocalRepo g) _) a = a g +inLocalRepo (TransportHandle (LocalRepo g) _ _) a = a g -- Check if some shas should be fetched from the remote, -- and presumably later merged. diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs index 054bce0aa7..ce29f7f7cc 100644 --- a/RemoteDaemon/Core.hs +++ b/RemoteDaemon/Core.hs @@ -139,7 +139,7 @@ runController ichan ochan = do -- Generates a map with a transport for each supported remote in the git repo, -- except those that have annex.sync = false genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap -genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do +genRemoteMap h@(TransportHandle (LocalRepo g) _ _) ochan = do rs <- Git.Construct.fromRemotes g M.fromList . catMaybes <$> mapM gen rs where @@ -161,17 +161,18 @@ genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do genTransportHandle :: IO TransportHandle genTransportHandle = do - annexstate <- newMVar =<< Annex.new =<< Git.CurrentRepo.get - g <- Annex.repo <$> readMVar annexstate - let h = TransportHandle (LocalRepo g) annexstate + (st, rd) <- Annex.new =<< Git.CurrentRepo.get + mvar <- newMVar st + let g = Annex.repo st + let h = TransportHandle (LocalRepo g) mvar rd liftAnnex h $ do Annex.setOutput QuietOutput enableInteractiveBranchAccess return h updateTransportHandle :: TransportHandle -> IO TransportHandle -updateTransportHandle h@(TransportHandle _g annexstate) = do +updateTransportHandle h@(TransportHandle _g st rd) = do g' <- liftAnnex h $ do reloadConfig Annex.gitRepo - return (TransportHandle (LocalRepo g') annexstate) + return (TransportHandle (LocalRepo g') st rd) diff --git a/RemoteDaemon/Transport/GCrypt.hs b/RemoteDaemon/Transport/GCrypt.hs index d88ce74fbd..746946638e 100644 --- a/RemoteDaemon/Transport/GCrypt.hs +++ b/RemoteDaemon/Transport/GCrypt.hs @@ -17,7 +17,7 @@ import Remote.GCrypt (accessShellConfig) import Annex.Ssh transport :: Transport -transport rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) _) ichan ochan +transport rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) _ _) ichan ochan | accessShellConfig gc = do r' <- encryptedRemote g r v <- liftAnnex h $ git_annex_shell ConsumeStdin r' "notifychanges" [] [] diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index fbfdc51a21..2926b34713 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -29,10 +29,10 @@ transport rr@(RemoteRepo r _) url h ichan ochan = do Just (cmd, params) -> transportUsingCmd cmd params rr url h ichan ochan transportUsingCmd :: FilePath -> [CommandParam] -> Transport -transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) s) ichan ochan = do +transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) st rd) ichan ochan = do -- enable ssh connection caching wherever inLocalRepo is called g' <- liftAnnex h $ sshOptionsTo r gc g - let transporthandle = TransportHandle (LocalRepo g') s + let transporthandle = TransportHandle (LocalRepo g') st rd transportUsingCmd' cmd params rr url transporthandle ichan ochan transportUsingCmd' :: FilePath -> [CommandParam] -> Transport diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs index 977a29112e..8f5ca5acdc 100644 --- a/RemoteDaemon/Transport/Tor.hs +++ b/RemoteDaemon/Transport/Tor.hs @@ -39,7 +39,7 @@ import System.Posix.User -- Run tor hidden service. server :: Server -server ichan th@(TransportHandle (LocalRepo r) _) = go +server ichan th@(TransportHandle (LocalRepo r) _ _) = go where go = checkstartservice >>= handlecontrol @@ -88,7 +88,7 @@ maxConnections :: Int maxConnections = 100 serveClient :: TransportHandle -> UUID -> Repo -> TBMQueue Handle -> IO () -serveClient th u r q = bracket setup cleanup start +serveClient th@(TransportHandle _ _ rd) u r q = bracket setup cleanup start where setup = do h <- atomically $ readTBMQueue q @@ -105,7 +105,7 @@ serveClient th u r q = bracket setup cleanup start -- Avoid doing any work in the liftAnnex, since only one -- can run at a time. st <- liftAnnex th dupState - ((), st') <- Annex.run st $ do + ((), (st', _rd)) <- Annex.run (st, rd) $ do -- Load auth tokens for every connection, to notice -- when the allowed set is changed. allowed <- loadP2PAuthTokens diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs index bccef41193..8b31ca16e9 100644 --- a/RemoteDaemon/Types.hs +++ b/RemoteDaemon/Types.hs @@ -35,12 +35,12 @@ type Server = TChan Consumed -> TransportHandle -> IO () data RemoteRepo = RemoteRepo Git.Repo RemoteGitConfig newtype LocalRepo = LocalRepo Git.Repo --- All Transports share a single AnnexState MVar +-- All Transports share a single AnnexState MVar and an AnnexRead. -- -- Different TransportHandles may have different versions of the LocalRepo. -- (For example, the ssh transport modifies it to enable ssh connection -- caching.) -data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState) +data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState) Annex.AnnexRead -- Messages that the daemon emits. data Emitted