start splitting out readonly values from AnnexState
Values in AnnexRead can be read more efficiently, without MVar overhead. Only a few things have been moved into there, and the performance increase so far is not likely to be noticable. This is groundwork for putting more stuff in there, particularly a value that indicates if debugging is enabled. The obvious next step is to change option parsing to not run in the Annex monad to set values in AnnexState, and instead return a pure value that gets stored in AnnexRead.
This commit is contained in:
parent
3204f0bbaa
commit
c2f612292a
20 changed files with 169 additions and 140 deletions
169
Annex.hs
169
Annex.hs
|
@ -10,10 +10,12 @@
|
|||
module Annex (
|
||||
Annex,
|
||||
AnnexState(..),
|
||||
AnnexRead(..),
|
||||
new,
|
||||
run,
|
||||
eval,
|
||||
makeRunner,
|
||||
getRead,
|
||||
getState,
|
||||
changeState,
|
||||
withState,
|
||||
|
@ -88,18 +90,18 @@ import qualified Data.Map.Strict as M
|
|||
import qualified Data.Set as S
|
||||
import Data.Time.Clock.POSIX
|
||||
|
||||
{- git-annex's monad is a ReaderT around an AnnexState stored in a MVar.
|
||||
- The MVar is not exposed outside this module.
|
||||
{- git-annex's monad is a ReaderT around an AnnexState stored in a MVar,
|
||||
- and an AnnexRead. The MVar is not exposed outside this module.
|
||||
-
|
||||
- Note that when an Annex action fails and the exception is caught,
|
||||
- any changes the action has made to the AnnexState are retained,
|
||||
- due to the use of the MVar to store the state.
|
||||
-}
|
||||
newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState) IO a }
|
||||
newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState, AnnexRead) IO a }
|
||||
deriving (
|
||||
Monad,
|
||||
MonadIO,
|
||||
MonadReader (MVar AnnexState),
|
||||
MonadReader (MVar AnnexState, AnnexRead),
|
||||
MonadCatch,
|
||||
MonadThrow,
|
||||
MonadMask,
|
||||
|
@ -109,7 +111,34 @@ newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState) IO a }
|
|||
Alternative
|
||||
)
|
||||
|
||||
-- internal state storage
|
||||
-- Values that can be read, but not modified by an Annex action.
|
||||
data AnnexRead = AnnexRead
|
||||
{ activekeys :: TVar (M.Map Key ThreadId)
|
||||
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
|
||||
, keysdbhandle :: Keys.DbHandle
|
||||
, sshstalecleaned :: TMVar Bool
|
||||
, signalactions :: TVar (M.Map SignalAction (Int -> IO ()))
|
||||
, transferrerpool :: TransferrerPool
|
||||
}
|
||||
|
||||
newAnnexRead :: IO AnnexRead
|
||||
newAnnexRead = do
|
||||
emptyactivekeys <- newTVarIO M.empty
|
||||
emptyactiveremotes <- newMVar M.empty
|
||||
kh <- Keys.newDbHandle
|
||||
sc <- newTMVarIO False
|
||||
si <- newTVarIO M.empty
|
||||
tp <- newTransferrerPool
|
||||
return $ AnnexRead
|
||||
{ activekeys = emptyactivekeys
|
||||
, activeremotes = emptyactiveremotes
|
||||
, keysdbhandle = kh
|
||||
, sshstalecleaned = sc
|
||||
, signalactions = si
|
||||
, transferrerpool = tp
|
||||
}
|
||||
|
||||
-- Values that can change while running an Annex action.
|
||||
data AnnexState = AnnexState
|
||||
{ repo :: Git.Repo
|
||||
, repoadjustment :: (Git.Repo -> IO Git.Repo)
|
||||
|
@ -125,7 +154,6 @@ data AnnexState = AnnexState
|
|||
, fast :: Bool
|
||||
, daemon :: Bool
|
||||
, branchstate :: BranchState
|
||||
, getvectorclock :: IO VectorClock
|
||||
, repoqueue :: Maybe (Git.Queue.Queue Annex)
|
||||
, catfilehandles :: CatFileHandles
|
||||
, hashobjecthandle :: Maybe HashObjectHandle
|
||||
|
@ -147,11 +175,9 @@ data AnnexState = AnnexState
|
|||
, groupmap :: Maybe GroupMap
|
||||
, ciphers :: M.Map StorableCipher Cipher
|
||||
, lockcache :: LockCache
|
||||
, sshstalecleaned :: TMVar Bool
|
||||
, flags :: M.Map String Bool
|
||||
, fields :: M.Map String String
|
||||
, cleanupactions :: M.Map CleanupAction (Annex ())
|
||||
, signalactions :: TVar (M.Map SignalAction (Int -> IO ()))
|
||||
, sentinalstatus :: Maybe SentinalStatus
|
||||
, useragent :: Maybe String
|
||||
, errcounter :: Integer
|
||||
|
@ -160,26 +186,17 @@ data AnnexState = AnnexState
|
|||
, tempurls :: M.Map Key URLString
|
||||
, existinghooks :: M.Map Git.Hook.Hook Bool
|
||||
, desktopnotify :: DesktopNotify
|
||||
, workers :: Maybe (TMVar (WorkerPool AnnexState))
|
||||
, activekeys :: TVar (M.Map Key ThreadId)
|
||||
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
|
||||
, keysdbhandle :: Keys.DbHandle
|
||||
, workers :: Maybe (TMVar (WorkerPool (AnnexState, AnnexRead)))
|
||||
, cachedcurrentbranch :: (Maybe (Maybe Git.Branch, Maybe Adjustment))
|
||||
, cachedgitenv :: Maybe (AltIndexFile, FilePath, [(String, String)])
|
||||
, urloptions :: Maybe UrlOptions
|
||||
, insmudgecleanfilter :: Bool
|
||||
, transferrerpool :: TransferrerPool
|
||||
, getvectorclock :: IO VectorClock
|
||||
}
|
||||
|
||||
newState :: GitConfig -> Git.Repo -> IO AnnexState
|
||||
newState c r = do
|
||||
emptyactiveremotes <- newMVar M.empty
|
||||
emptyactivekeys <- newTVarIO M.empty
|
||||
si <- newTVarIO M.empty
|
||||
newAnnexState :: GitConfig -> Git.Repo -> IO AnnexState
|
||||
newAnnexState c r = do
|
||||
o <- newMessageState
|
||||
sc <- newTMVarIO False
|
||||
kh <- Keys.newDbHandle
|
||||
tp <- newTransferrerPool
|
||||
vc <- startVectorClock
|
||||
return $ AnnexState
|
||||
{ repo = r
|
||||
|
@ -196,7 +213,6 @@ newState c r = do
|
|||
, fast = False
|
||||
, daemon = False
|
||||
, branchstate = startBranchState
|
||||
, getvectorclock = vc
|
||||
, repoqueue = Nothing
|
||||
, catfilehandles = catFileHandlesNonConcurrent
|
||||
, hashobjecthandle = Nothing
|
||||
|
@ -218,11 +234,9 @@ newState c r = do
|
|||
, groupmap = Nothing
|
||||
, ciphers = M.empty
|
||||
, lockcache = M.empty
|
||||
, sshstalecleaned = sc
|
||||
, flags = M.empty
|
||||
, fields = M.empty
|
||||
, cleanupactions = M.empty
|
||||
, signalactions = si
|
||||
, sentinalstatus = Nothing
|
||||
, useragent = Nothing
|
||||
, errcounter = 0
|
||||
|
@ -232,91 +246,95 @@ newState c r = do
|
|||
, existinghooks = M.empty
|
||||
, desktopnotify = mempty
|
||||
, workers = Nothing
|
||||
, activekeys = emptyactivekeys
|
||||
, activeremotes = emptyactiveremotes
|
||||
, keysdbhandle = kh
|
||||
, cachedcurrentbranch = Nothing
|
||||
, cachedgitenv = Nothing
|
||||
, urloptions = Nothing
|
||||
, insmudgecleanfilter = False
|
||||
, transferrerpool = tp
|
||||
, getvectorclock = vc
|
||||
}
|
||||
|
||||
{- Makes an Annex state object for the specified git repo.
|
||||
- Ensures the config is read, if it was not already, and performs
|
||||
- any necessary git repo fixups. -}
|
||||
new :: Git.Repo -> IO AnnexState
|
||||
new :: Git.Repo -> IO (AnnexState, AnnexRead)
|
||||
new r = do
|
||||
r' <- Git.Config.read r
|
||||
let c = extractGitConfig FromGitConfig r'
|
||||
newState c =<< fixupRepo r' c
|
||||
st <- newAnnexState c =<< fixupRepo r' c
|
||||
rd <- newAnnexRead
|
||||
return (st, rd)
|
||||
|
||||
{- Performs an action in the Annex monad from a starting state,
|
||||
- returning a new state. -}
|
||||
run :: AnnexState -> Annex a -> IO (a, AnnexState)
|
||||
run s a = flip run' a =<< newMVar s
|
||||
run :: (AnnexState, AnnexRead) -> Annex a -> IO (a, (AnnexState, AnnexRead))
|
||||
run (st, rd) a = do
|
||||
mv <- newMVar st
|
||||
run' mv rd a
|
||||
|
||||
run' :: MVar AnnexState -> Annex a -> IO (a, AnnexState)
|
||||
run' mvar a = do
|
||||
r <- runReaderT (runAnnex a) mvar
|
||||
`onException` (flush =<< readMVar mvar)
|
||||
s' <- takeMVar mvar
|
||||
flush s'
|
||||
return (r, s')
|
||||
run' :: MVar AnnexState -> AnnexRead -> Annex a -> IO (a, (AnnexState, AnnexRead))
|
||||
run' mvar rd a = do
|
||||
r <- runReaderT (runAnnex a) (mvar, rd)
|
||||
`onException` (flush rd)
|
||||
flush rd
|
||||
st <- takeMVar mvar
|
||||
return (r, (st, rd))
|
||||
where
|
||||
flush = Keys.flushDbQueue . keysdbhandle
|
||||
|
||||
{- Performs an action in the Annex monad from a starting state,
|
||||
- and throws away the new state. -}
|
||||
eval :: AnnexState -> Annex a -> IO a
|
||||
eval s a = fst <$> run s a
|
||||
- and throws away the changed state. -}
|
||||
eval :: (AnnexState, AnnexRead) -> Annex a -> IO a
|
||||
eval v a = fst <$> run v a
|
||||
|
||||
{- Makes a runner action, that allows diving into IO and from inside
|
||||
- the IO action, running an Annex action. -}
|
||||
makeRunner :: Annex (Annex a -> IO a)
|
||||
makeRunner = do
|
||||
mvar <- ask
|
||||
(mvar, rd) <- ask
|
||||
return $ \a -> do
|
||||
(r, s) <- run' mvar a
|
||||
(r, (s, _rd)) <- run' mvar rd a
|
||||
putMVar mvar s
|
||||
return r
|
||||
|
||||
getRead :: (AnnexRead -> v) -> Annex v
|
||||
getRead selector = selector . snd <$> ask
|
||||
|
||||
getState :: (AnnexState -> v) -> Annex v
|
||||
getState selector = do
|
||||
mvar <- ask
|
||||
s <- liftIO $ readMVar mvar
|
||||
return $ selector s
|
||||
mvar <- fst <$> ask
|
||||
st <- liftIO $ readMVar mvar
|
||||
return $ selector st
|
||||
|
||||
changeState :: (AnnexState -> AnnexState) -> Annex ()
|
||||
changeState modifier = do
|
||||
mvar <- ask
|
||||
mvar <- fst <$> ask
|
||||
liftIO $ modifyMVar_ mvar $ return . modifier
|
||||
|
||||
withState :: (AnnexState -> IO (AnnexState, b)) -> Annex b
|
||||
withState modifier = do
|
||||
mvar <- ask
|
||||
mvar <- fst <$> ask
|
||||
liftIO $ modifyMVar mvar modifier
|
||||
|
||||
{- Sets a flag to True -}
|
||||
setFlag :: String -> Annex ()
|
||||
setFlag flag = changeState $ \s ->
|
||||
s { flags = M.insert flag True $ flags s }
|
||||
setFlag flag = changeState $ \st ->
|
||||
st { flags = M.insert flag True $ flags st }
|
||||
|
||||
{- Sets a field to a value -}
|
||||
setField :: String -> String -> Annex ()
|
||||
setField field value = changeState $ \s ->
|
||||
s { fields = M.insert field value $ fields s }
|
||||
setField field value = changeState $ \st ->
|
||||
st { fields = M.insert field value $ fields st }
|
||||
|
||||
{- Adds a cleanup action to perform. -}
|
||||
addCleanupAction :: CleanupAction -> Annex () -> Annex ()
|
||||
addCleanupAction k a = changeState $ \s ->
|
||||
s { cleanupactions = M.insert k a $ cleanupactions s }
|
||||
addCleanupAction k a = changeState $ \st ->
|
||||
st { cleanupactions = M.insert k a $ cleanupactions st }
|
||||
|
||||
{- Sets the type of output to emit. -}
|
||||
setOutput :: OutputType -> Annex ()
|
||||
setOutput o = changeState $ \s ->
|
||||
let m = output s
|
||||
in s { output = m { outputType = adjustOutputType (outputType m) o } }
|
||||
setOutput o = changeState $ \st ->
|
||||
let m = output st
|
||||
in st { output = m { outputType = adjustOutputType (outputType m) o } }
|
||||
|
||||
{- Checks if a flag was set. -}
|
||||
getFlag :: String -> Annex Bool
|
||||
|
@ -351,9 +369,9 @@ getGitConfig = getState gitconfig
|
|||
{- Overrides a GitConfig setting. The modification persists across
|
||||
- reloads of the repo's config. -}
|
||||
overrideGitConfig :: (GitConfig -> GitConfig) -> Annex ()
|
||||
overrideGitConfig f = changeState $ \s -> s
|
||||
{ gitconfigadjustment = gitconfigadjustment s . f
|
||||
, gitconfig = f (gitconfig s)
|
||||
overrideGitConfig f = changeState $ \st -> st
|
||||
{ gitconfigadjustment = gitconfigadjustment st . f
|
||||
, gitconfig = f (gitconfig st)
|
||||
}
|
||||
|
||||
{- Adds an adjustment to the Repo data. Adjustments persist across reloads
|
||||
|
@ -364,7 +382,7 @@ overrideGitConfig f = changeState $ \s -> s
|
|||
-}
|
||||
adjustGitRepo :: (Git.Repo -> IO Git.Repo) -> Annex ()
|
||||
adjustGitRepo a = do
|
||||
changeState $ \s -> s { repoadjustment = \r -> repoadjustment s r >>= a }
|
||||
changeState $ \st -> st { repoadjustment = \r -> repoadjustment st r >>= a }
|
||||
changeGitRepo =<< gitRepo
|
||||
|
||||
{- Adds git config setting, like "foo=bar". It will be passed with -c
|
||||
|
@ -375,7 +393,7 @@ addGitConfigOverride v = do
|
|||
adjustGitRepo $ \r ->
|
||||
Git.Config.store (encodeBS' v) Git.Config.ConfigList $
|
||||
r { Git.gitGlobalOpts = go (Git.gitGlobalOpts r) }
|
||||
changeState $ \s -> s { gitconfigoverride = v : gitconfigoverride s }
|
||||
changeState $ \st -> st { gitconfigoverride = v : gitconfigoverride st }
|
||||
where
|
||||
-- Remove any prior occurrance of the setting to avoid
|
||||
-- building up many of them when the adjustment is run repeatedly,
|
||||
|
@ -394,7 +412,7 @@ changeGitRepo r = do
|
|||
repoadjuster <- getState repoadjustment
|
||||
gitconfigadjuster <- getState gitconfigadjustment
|
||||
r' <- liftIO $ repoadjuster r
|
||||
changeState $ \s -> s
|
||||
changeState $ \st -> st
|
||||
{ repo = r'
|
||||
, gitconfig = gitconfigadjuster $
|
||||
extractGitConfig FromGitConfig r'
|
||||
|
@ -414,8 +432,9 @@ getRemoteGitConfig r = do
|
|||
- state, as it will be thrown away. -}
|
||||
withCurrentState :: Annex a -> Annex (IO a)
|
||||
withCurrentState a = do
|
||||
s <- getState id
|
||||
return $ eval s a
|
||||
(mvar, rd) <- ask
|
||||
st <- liftIO $ readMVar mvar
|
||||
return $ eval (st, rd) a
|
||||
|
||||
{- It's not safe to use setCurrentDirectory in the Annex monad,
|
||||
- because the git repo paths are stored relative.
|
||||
|
@ -426,20 +445,20 @@ changeDirectory d = do
|
|||
r <- liftIO . Git.adjustPath absPath =<< gitRepo
|
||||
liftIO $ setCurrentDirectory d
|
||||
r' <- liftIO $ Git.relPath r
|
||||
changeState $ \s -> s { repo = r' }
|
||||
changeState $ \st -> st { repo = r' }
|
||||
|
||||
incError :: Annex ()
|
||||
incError = changeState $ \s ->
|
||||
let !c = errcounter s + 1
|
||||
!s' = s { errcounter = c }
|
||||
in s'
|
||||
incError = changeState $ \st ->
|
||||
let !c = errcounter st + 1
|
||||
!st' = st { errcounter = c }
|
||||
in st'
|
||||
|
||||
getGitRemotes :: Annex [Git.Repo]
|
||||
getGitRemotes = do
|
||||
s <- getState id
|
||||
case gitremotes s of
|
||||
st <- getState id
|
||||
case gitremotes st of
|
||||
Just rs -> return rs
|
||||
Nothing -> do
|
||||
rs <- liftIO $ Git.Construct.fromRemotes (repo s)
|
||||
changeState $ \s' -> s' { gitremotes = Just rs }
|
||||
rs <- liftIO $ Git.Construct.fromRemotes (repo st)
|
||||
changeState $ \st' -> st' { gitremotes = Just rs }
|
||||
return rs
|
||||
|
|
|
@ -52,7 +52,7 @@ verifiedAction a = tryNonAsync a >>= \case
|
|||
startup :: Annex ()
|
||||
startup = do
|
||||
#ifndef mingw32_HOST_OS
|
||||
av <- Annex.getState Annex.signalactions
|
||||
av <- Annex.getRead Annex.signalactions
|
||||
let propagate sig = liftIO $ installhandleronce sig av
|
||||
propagate sigINT
|
||||
propagate sigQUIT
|
||||
|
|
|
@ -55,9 +55,10 @@ setConcurrency' c f = do
|
|||
-}
|
||||
forkState :: Annex a -> Annex (IO (Annex a))
|
||||
forkState a = do
|
||||
rd <- Annex.getRead id
|
||||
st <- dupState
|
||||
return $ do
|
||||
(ret, newst) <- run st a
|
||||
(ret, (newst, _rd)) <- run (st, rd) a
|
||||
return $ do
|
||||
mergeState newst
|
||||
return ret
|
||||
|
@ -90,7 +91,9 @@ dupState = do
|
|||
- Also closes various handles in it. -}
|
||||
mergeState :: AnnexState -> Annex ()
|
||||
mergeState st = do
|
||||
st' <- liftIO $ snd <$> run st stopNonConcurrentSafeCoProcesses
|
||||
rd <- Annex.getRead id
|
||||
st' <- liftIO $ (fst . snd)
|
||||
<$> run (st, rd) stopNonConcurrentSafeCoProcesses
|
||||
forM_ (M.toList $ Annex.cleanupactions st') $
|
||||
uncurry addCleanupAction
|
||||
Annex.Queue.mergeFrom st'
|
||||
|
|
|
@ -204,7 +204,7 @@ restagePointerFile (Restage True) f orig = withTSDelta $ \tsd ->
|
|||
runner :: Git.Queue.InternalActionRunner Annex
|
||||
runner = Git.Queue.InternalActionRunner "restagePointerFile" $ \r l -> do
|
||||
liftIO . Database.Keys.Handle.flushDbQueue
|
||||
=<< Annex.getState Annex.keysdbhandle
|
||||
=<< Annex.getRead Annex.keysdbhandle
|
||||
realindex <- liftIO $ Git.Index.currentIndexFile r
|
||||
let lock = fromRawFilePath (Git.Index.indexFileLock realindex)
|
||||
lockindex = liftIO $ catchMaybeIO $ Git.LockFile.openLock' lock
|
||||
|
|
|
@ -219,7 +219,7 @@ prepSocket socketfile sshhost sshparams = do
|
|||
-- from a previous git-annex run that was interrupted.
|
||||
-- This must run only once, before we have made any ssh connection,
|
||||
-- and any other prepSocket calls must block while it's run.
|
||||
tv <- Annex.getState Annex.sshstalecleaned
|
||||
tv <- Annex.getRead Annex.sshstalecleaned
|
||||
join $ liftIO $ atomically $ do
|
||||
cleaned <- takeTMVar tv
|
||||
if cleaned
|
||||
|
|
|
@ -371,7 +371,7 @@ pickRemote l a = debugLocks $ go l =<< getConcurrency
|
|||
else gononconcurrent rs
|
||||
|
||||
goconcurrent rs = do
|
||||
mv <- Annex.getState Annex.activeremotes
|
||||
mv <- Annex.getRead Annex.activeremotes
|
||||
active <- liftIO $ takeMVar mv
|
||||
let rs' = sortBy (lessActiveFirst active) rs
|
||||
goconcurrent' mv active rs'
|
||||
|
|
|
@ -50,9 +50,9 @@ mkRunTransferrer batchmaker = RunTransferrer
|
|||
withTransferrer :: (Transferrer -> Annex a) -> Annex a
|
||||
withTransferrer a = do
|
||||
rt <- mkRunTransferrer nonBatchCommandMaker
|
||||
pool <- Annex.getState Annex.transferrerpool
|
||||
pool <- Annex.getRead Annex.transferrerpool
|
||||
let nocheck = pure (pure True)
|
||||
signalactonsvar <- Annex.getState Annex.signalactions
|
||||
signalactonsvar <- Annex.getRead Annex.signalactions
|
||||
withTransferrer' False signalactonsvar nocheck rt pool a
|
||||
|
||||
withTransferrer'
|
||||
|
@ -279,7 +279,7 @@ killTransferrer t = do
|
|||
{- Stop all transferrers in the pool. -}
|
||||
emptyTransferrerPool :: Annex ()
|
||||
emptyTransferrerPool = do
|
||||
poolvar <- Annex.getState Annex.transferrerpool
|
||||
poolvar <- Annex.getRead Annex.transferrerpool
|
||||
pool <- liftIO $ atomically $ swapTVar poolvar []
|
||||
liftIO $ forM_ pool $ \case
|
||||
TransferrerPoolItem (Just t) _ -> transferrerShutdown t
|
||||
|
|
|
@ -60,7 +60,7 @@ enteringInitialStage = Annex.getState Annex.workers >>= \case
|
|||
- in the pool than spareVals. That does not prevent other threads that call
|
||||
- this from using them though, so it's fine.
|
||||
-}
|
||||
changeStageTo :: ThreadId -> TMVar (WorkerPool AnnexState) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage)
|
||||
changeStageTo :: ThreadId -> TMVar (WorkerPool t) -> (UsedStages -> WorkerStage) -> Annex (Maybe WorkerStage)
|
||||
changeStageTo mytid tv getnewstage = liftIO $
|
||||
replaceidle >>= maybe
|
||||
(return Nothing)
|
||||
|
@ -99,11 +99,11 @@ changeStageTo mytid tv getnewstage = liftIO $
|
|||
-- removes it from the pool, and returns its state.
|
||||
--
|
||||
-- If the worker pool is not already allocated, returns Nothing.
|
||||
waitStartWorkerSlot :: TMVar (WorkerPool Annex.AnnexState) -> STM (Maybe (Annex.AnnexState, WorkerStage))
|
||||
waitStartWorkerSlot :: TMVar (WorkerPool t) -> STM (Maybe (t, WorkerStage))
|
||||
waitStartWorkerSlot tv = do
|
||||
pool <- takeTMVar tv
|
||||
st <- go pool
|
||||
return $ Just (st, StartStage)
|
||||
v <- go pool
|
||||
return $ Just (v, StartStage)
|
||||
where
|
||||
go pool = case spareVals pool of
|
||||
[] -> retry
|
||||
|
@ -112,10 +112,10 @@ waitStartWorkerSlot tv = do
|
|||
putTMVar tv =<< waitIdleWorkerSlot StartStage pool'
|
||||
return v
|
||||
|
||||
waitIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> STM (WorkerPool Annex.AnnexState)
|
||||
waitIdleWorkerSlot :: WorkerStage -> WorkerPool t -> STM (WorkerPool t)
|
||||
waitIdleWorkerSlot wantstage = maybe retry return . getIdleWorkerSlot wantstage
|
||||
|
||||
getIdleWorkerSlot :: WorkerStage -> WorkerPool Annex.AnnexState -> Maybe (WorkerPool Annex.AnnexState)
|
||||
getIdleWorkerSlot :: WorkerStage -> WorkerPool t -> Maybe (WorkerPool t)
|
||||
getIdleWorkerSlot wantstage pool = do
|
||||
l <- findidle [] (workerList pool)
|
||||
return $ pool { workerList = l }
|
||||
|
|
|
@ -91,9 +91,9 @@ runTransferThread' mkcheck rt d run = go
|
|||
where
|
||||
go = catchPauseResume $ do
|
||||
p <- runAssistant d $ liftAnnex $
|
||||
Annex.getState Annex.transferrerpool
|
||||
Annex.getRead Annex.transferrerpool
|
||||
signalactonsvar <- runAssistant d $ liftAnnex $
|
||||
Annex.getState Annex.signalactions
|
||||
Annex.getRead Annex.signalactions
|
||||
withTransferrer' True signalactonsvar mkcheck rt p run
|
||||
pause = catchPauseResume $
|
||||
runEvery (Seconds 86400) noop
|
||||
|
|
|
@ -15,7 +15,7 @@ import Data.Tuple
|
|||
|
||||
{- The Annex state is stored in a MVar, so that threaded actions can access
|
||||
- it. -}
|
||||
type ThreadState = MVar Annex.AnnexState
|
||||
type ThreadState = MVar (Annex.AnnexState, Annex.AnnexRead)
|
||||
|
||||
{- Stores the Annex state in a MVar.
|
||||
-
|
||||
|
@ -24,9 +24,10 @@ type ThreadState = MVar Annex.AnnexState
|
|||
withThreadState :: (ThreadState -> Annex a) -> Annex a
|
||||
withThreadState a = do
|
||||
state <- Annex.getState id
|
||||
mvar <- liftIO $ newMVar state
|
||||
rd <- Annex.getRead id
|
||||
mvar <- liftIO $ newMVar (state, rd)
|
||||
r <- a mvar
|
||||
newstate <- liftIO $ takeMVar mvar
|
||||
newstate <- liftIO $ fst <$> takeMVar mvar
|
||||
Annex.changeState (const newstate)
|
||||
return r
|
||||
|
||||
|
@ -35,4 +36,5 @@ withThreadState a = do
|
|||
- This serializes calls by threads; only one thread can run in Annex at a
|
||||
- time. -}
|
||||
runThreadState :: ThreadState -> Annex a -> IO a
|
||||
runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
|
||||
runThreadState mvar a = modifyMVar mvar $ \v -> swap <$> Annex.run v a
|
||||
|
||||
|
|
|
@ -68,9 +68,10 @@ commandAction start = getConcurrency >>= \case
|
|||
Just tv ->
|
||||
liftIO (atomically (waitStartWorkerSlot tv)) >>=
|
||||
maybe runnonconcurrent (runconcurrent' tv)
|
||||
runconcurrent' tv (workerst, workerstage) = do
|
||||
aid <- liftIO $ async $ snd <$> Annex.run workerst
|
||||
(concurrentjob workerst)
|
||||
runconcurrent' tv (workerstrd, workerstage) = do
|
||||
aid <- liftIO $ async $ snd
|
||||
<$> Annex.run workerstrd
|
||||
(concurrentjob (fst workerstrd))
|
||||
liftIO $ atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let !pool' = addWorkerPool (ActiveWorker aid workerstage) pool
|
||||
|
@ -78,12 +79,12 @@ commandAction start = getConcurrency >>= \case
|
|||
void $ liftIO $ forkIO $ debugLocks $ do
|
||||
-- accountCommandAction will usually catch
|
||||
-- exceptions. Just in case, fall back to the
|
||||
-- original workerst.
|
||||
workerst' <- either (const workerst) id
|
||||
-- original workerstrd.
|
||||
workerstrd' <- either (const workerstrd) id
|
||||
<$> waitCatch aid
|
||||
atomically $ do
|
||||
pool <- takeTMVar tv
|
||||
let !pool' = deactivateWorker pool aid workerst'
|
||||
let !pool' = deactivateWorker pool aid workerstrd'
|
||||
putTMVar tv pool'
|
||||
|
||||
concurrentjob workerst = start >>= \case
|
||||
|
@ -133,12 +134,12 @@ finishCommandActions = Annex.getState Annex.workers >>= \case
|
|||
Nothing -> noop
|
||||
Just tv -> do
|
||||
Annex.changeState $ \s -> s { Annex.workers = Nothing }
|
||||
sts <- liftIO $ atomically $ do
|
||||
vs <- liftIO $ atomically $ do
|
||||
pool <- readTMVar tv
|
||||
if allIdle pool
|
||||
then return (spareVals pool)
|
||||
else retry
|
||||
mapM_ mergeState sts
|
||||
mapM_ (mergeState . fst) vs
|
||||
|
||||
{- Waits for all worker threads that have been started so far to finish. -}
|
||||
waitForAllRunningCommandActions :: Annex ()
|
||||
|
@ -254,8 +255,9 @@ startConcurrency usedstages a = do
|
|||
Annex.changeState $ \s -> s { Annex.workers = Just tv }
|
||||
prepDupState
|
||||
st <- dupState
|
||||
rd <- Annex.getRead id
|
||||
liftIO $ atomically $ putTMVar tv $
|
||||
allocateWorkerPool st (max n 1) usedstages
|
||||
allocateWorkerPool (st, rd) (max n 1) usedstages
|
||||
|
||||
-- Make sure that some expensive actions have been done before
|
||||
-- starting threads. This way the state has them already run,
|
||||
|
@ -277,7 +279,7 @@ ensureOnlyActionOn k a = debugLocks $
|
|||
go (Concurrent _) = goconcurrent
|
||||
go ConcurrentPerCpu = goconcurrent
|
||||
goconcurrent = do
|
||||
tv <- Annex.getState Annex.activekeys
|
||||
tv <- Annex.getRead Annex.activekeys
|
||||
bracket (setup tv) id (const a)
|
||||
setup tv = liftIO $ do
|
||||
mytid <- myThreadId
|
||||
|
|
|
@ -102,7 +102,9 @@ start o = starting "testremote" (ActionItemOther (Just (testRemote o))) si $ do
|
|||
|
||||
perform :: [Described (Annex (Maybe Remote))] -> Maybe Remote -> Annex (Maybe Remote) -> [Key] -> CommandPerform
|
||||
perform drs unavailr exportr ks = do
|
||||
st <- liftIO . newTVarIO =<< Annex.getState id
|
||||
st <- liftIO . newTVarIO =<< (,)
|
||||
<$> Annex.getState id
|
||||
<*> Annex.getRead id
|
||||
let tests = testGroup "Remote Tests" $ mkTestTrees
|
||||
(runTestCase st)
|
||||
drs
|
||||
|
@ -198,7 +200,7 @@ data Described t = Described
|
|||
|
||||
type RunAnnex = forall a. Annex a -> IO a
|
||||
|
||||
runTestCase :: TVar Annex.AnnexState -> RunAnnex
|
||||
runTestCase :: TVar (Annex.AnnexState, Annex.AnnexRead) -> RunAnnex
|
||||
runTestCase stv a = do
|
||||
st <- atomically $ readTVar stv
|
||||
(r, st') <- Annex.run st $ do
|
||||
|
|
|
@ -63,7 +63,7 @@ import qualified System.FilePath.ByteString as P
|
|||
-}
|
||||
runReader :: Monoid v => (SQL.ReadHandle -> Annex v) -> Annex v
|
||||
runReader a = do
|
||||
h <- Annex.getState Annex.keysdbhandle
|
||||
h <- Annex.getRead Annex.keysdbhandle
|
||||
withDbState h go
|
||||
where
|
||||
go DbUnavailable = return (mempty, DbUnavailable)
|
||||
|
@ -87,7 +87,7 @@ runReaderIO a = runReader (liftIO . a)
|
|||
- The database is created if it doesn't exist yet. -}
|
||||
runWriter :: (SQL.WriteHandle -> Annex ()) -> Annex ()
|
||||
runWriter a = do
|
||||
h <- Annex.getState Annex.keysdbhandle
|
||||
h <- Annex.getRead Annex.keysdbhandle
|
||||
withDbState h go
|
||||
where
|
||||
go st@(DbOpen qh) = do
|
||||
|
@ -144,7 +144,7 @@ openDb createdb _ = catchPermissionDenied permerr $ withExclusiveLock gitAnnexKe
|
|||
- data to it.
|
||||
-}
|
||||
closeDb :: Annex ()
|
||||
closeDb = liftIO . closeDbHandle =<< Annex.getState Annex.keysdbhandle
|
||||
closeDb = liftIO . closeDbHandle =<< Annex.getRead Annex.keysdbhandle
|
||||
|
||||
addAssociatedFile :: Key -> TopFilePath -> Annex ()
|
||||
addAssociatedFile k f = runWriterIO $ SQL.addAssociatedFile k f
|
||||
|
|
|
@ -750,7 +750,7 @@ repairRemote r a = return $ do
|
|||
ensureInitialized
|
||||
a `finally` stopCoProcesses
|
||||
|
||||
data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar (Maybe Annex.AnnexState))
|
||||
data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar (Maybe (Annex.AnnexState, Annex.AnnexRead)))
|
||||
|
||||
{- This can safely be called on a Repo that is not local, but of course
|
||||
- onLocal will not work if used with the result. -}
|
||||
|
@ -775,20 +775,20 @@ onLocalRepo repo a = do
|
|||
onLocal' lra a
|
||||
|
||||
onLocal' :: LocalRemoteAnnex -> Annex a -> Annex a
|
||||
onLocal' (LocalRemoteAnnex repo v) a = liftIO (takeMVar v) >>= \case
|
||||
onLocal' (LocalRemoteAnnex repo mv) a = liftIO (takeMVar mv) >>= \case
|
||||
Nothing -> do
|
||||
st <- liftIO $ Annex.new repo
|
||||
go (st, ensureInitialized >> a)
|
||||
Just st -> go (st, a)
|
||||
v <- liftIO $ Annex.new repo
|
||||
go (v, ensureInitialized >> a)
|
||||
Just v -> go (v, a)
|
||||
where
|
||||
go (st, a') = do
|
||||
go ((st, rd), a') = do
|
||||
curro <- Annex.getState Annex.output
|
||||
let act = Annex.run (st { Annex.output = curro }) $
|
||||
let act = Annex.run (st { Annex.output = curro }, rd) $
|
||||
a' `finally` stopCoProcesses
|
||||
(ret, st') <- liftIO $ act `onException` cache st
|
||||
liftIO $ cache st'
|
||||
(ret, (st', _rd)) <- liftIO $ act `onException` cache (st, rd)
|
||||
liftIO $ cache (st', rd)
|
||||
return ret
|
||||
cache st = putMVar v (Just st)
|
||||
cache = putMVar mv . Just
|
||||
|
||||
{- Faster variant of onLocal.
|
||||
-
|
||||
|
|
|
@ -26,14 +26,14 @@ import Control.Concurrent
|
|||
-- since only one liftAnnex can be running at a time, across all
|
||||
-- transports.
|
||||
liftAnnex :: TransportHandle -> Annex a -> IO a
|
||||
liftAnnex (TransportHandle _ annexstate) a = do
|
||||
st <- takeMVar annexstate
|
||||
(r, st') <- Annex.run st a
|
||||
putMVar annexstate st'
|
||||
liftAnnex (TransportHandle _ stmv rd) a = do
|
||||
st <- takeMVar stmv
|
||||
(r, (st', _rd)) <- Annex.run (st, rd) a
|
||||
putMVar stmv st'
|
||||
return r
|
||||
|
||||
inLocalRepo :: TransportHandle -> (Git.Repo -> IO a) -> IO a
|
||||
inLocalRepo (TransportHandle (LocalRepo g) _) a = a g
|
||||
inLocalRepo (TransportHandle (LocalRepo g) _ _) a = a g
|
||||
|
||||
-- Check if some shas should be fetched from the remote,
|
||||
-- and presumably later merged.
|
||||
|
|
|
@ -139,7 +139,7 @@ runController ichan ochan = do
|
|||
-- Generates a map with a transport for each supported remote in the git repo,
|
||||
-- except those that have annex.sync = false
|
||||
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
|
||||
genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do
|
||||
genRemoteMap h@(TransportHandle (LocalRepo g) _ _) ochan = do
|
||||
rs <- Git.Construct.fromRemotes g
|
||||
M.fromList . catMaybes <$> mapM gen rs
|
||||
where
|
||||
|
@ -161,17 +161,18 @@ genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do
|
|||
|
||||
genTransportHandle :: IO TransportHandle
|
||||
genTransportHandle = do
|
||||
annexstate <- newMVar =<< Annex.new =<< Git.CurrentRepo.get
|
||||
g <- Annex.repo <$> readMVar annexstate
|
||||
let h = TransportHandle (LocalRepo g) annexstate
|
||||
(st, rd) <- Annex.new =<< Git.CurrentRepo.get
|
||||
mvar <- newMVar st
|
||||
let g = Annex.repo st
|
||||
let h = TransportHandle (LocalRepo g) mvar rd
|
||||
liftAnnex h $ do
|
||||
Annex.setOutput QuietOutput
|
||||
enableInteractiveBranchAccess
|
||||
return h
|
||||
|
||||
updateTransportHandle :: TransportHandle -> IO TransportHandle
|
||||
updateTransportHandle h@(TransportHandle _g annexstate) = do
|
||||
updateTransportHandle h@(TransportHandle _g st rd) = do
|
||||
g' <- liftAnnex h $ do
|
||||
reloadConfig
|
||||
Annex.gitRepo
|
||||
return (TransportHandle (LocalRepo g') annexstate)
|
||||
return (TransportHandle (LocalRepo g') st rd)
|
||||
|
|
|
@ -17,7 +17,7 @@ import Remote.GCrypt (accessShellConfig)
|
|||
import Annex.Ssh
|
||||
|
||||
transport :: Transport
|
||||
transport rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) _) ichan ochan
|
||||
transport rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) _ _) ichan ochan
|
||||
| accessShellConfig gc = do
|
||||
r' <- encryptedRemote g r
|
||||
v <- liftAnnex h $ git_annex_shell ConsumeStdin r' "notifychanges" [] []
|
||||
|
|
|
@ -29,10 +29,10 @@ transport rr@(RemoteRepo r _) url h ichan ochan = do
|
|||
Just (cmd, params) -> transportUsingCmd cmd params rr url h ichan ochan
|
||||
|
||||
transportUsingCmd :: FilePath -> [CommandParam] -> Transport
|
||||
transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) s) ichan ochan = do
|
||||
transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalRepo g) st rd) ichan ochan = do
|
||||
-- enable ssh connection caching wherever inLocalRepo is called
|
||||
g' <- liftAnnex h $ sshOptionsTo r gc g
|
||||
let transporthandle = TransportHandle (LocalRepo g') s
|
||||
let transporthandle = TransportHandle (LocalRepo g') st rd
|
||||
transportUsingCmd' cmd params rr url transporthandle ichan ochan
|
||||
|
||||
transportUsingCmd' :: FilePath -> [CommandParam] -> Transport
|
||||
|
|
|
@ -39,7 +39,7 @@ import System.Posix.User
|
|||
|
||||
-- Run tor hidden service.
|
||||
server :: Server
|
||||
server ichan th@(TransportHandle (LocalRepo r) _) = go
|
||||
server ichan th@(TransportHandle (LocalRepo r) _ _) = go
|
||||
where
|
||||
go = checkstartservice >>= handlecontrol
|
||||
|
||||
|
@ -88,7 +88,7 @@ maxConnections :: Int
|
|||
maxConnections = 100
|
||||
|
||||
serveClient :: TransportHandle -> UUID -> Repo -> TBMQueue Handle -> IO ()
|
||||
serveClient th u r q = bracket setup cleanup start
|
||||
serveClient th@(TransportHandle _ _ rd) u r q = bracket setup cleanup start
|
||||
where
|
||||
setup = do
|
||||
h <- atomically $ readTBMQueue q
|
||||
|
@ -105,7 +105,7 @@ serveClient th u r q = bracket setup cleanup start
|
|||
-- Avoid doing any work in the liftAnnex, since only one
|
||||
-- can run at a time.
|
||||
st <- liftAnnex th dupState
|
||||
((), st') <- Annex.run st $ do
|
||||
((), (st', _rd)) <- Annex.run (st, rd) $ do
|
||||
-- Load auth tokens for every connection, to notice
|
||||
-- when the allowed set is changed.
|
||||
allowed <- loadP2PAuthTokens
|
||||
|
|
|
@ -35,12 +35,12 @@ type Server = TChan Consumed -> TransportHandle -> IO ()
|
|||
data RemoteRepo = RemoteRepo Git.Repo RemoteGitConfig
|
||||
newtype LocalRepo = LocalRepo Git.Repo
|
||||
|
||||
-- All Transports share a single AnnexState MVar
|
||||
-- All Transports share a single AnnexState MVar and an AnnexRead.
|
||||
--
|
||||
-- Different TransportHandles may have different versions of the LocalRepo.
|
||||
-- (For example, the ssh transport modifies it to enable ssh connection
|
||||
-- caching.)
|
||||
data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState)
|
||||
data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState) Annex.AnnexRead
|
||||
|
||||
-- Messages that the daemon emits.
|
||||
data Emitted
|
||||
|
|
Loading…
Reference in a new issue