share single BranchState amoung all threads
This fixes a problem when git-annex testremote is run against a cluster
accessed via the http server. Annex.Cluster uses the location log
to find nodes that contain a key when checking if the key is present or getting
it. Just after a key was stored to a cluster node, reading the location log
was not getting the UUID of that node.
Apparently the Annex action that wrote to the location log, and the one
that read from it were run with two different Annex states. The http server
does use several different Annex threads.
BranchState was part of the AnnexState, and so two threads could have
different BranchStates.
Moved BranchState to the AnnexRead, so all threads will see the common state.
This might possibly impact performance. If one thread is writing changes to the
branch, and another thread is reading from the branch, the writing thread will
now invalidate the BranchState's cache, which will cause the reading thread to
need to do extra work. But correctness is surely more important. If did is
found to have impacted performance, it could probably be dealt with by doing
smarter BranchState cache invalidation.
Another way this might impact performance is that the BranchState has a small
cache. If several threads were reading from the branch and relying on the value
they just read still being in the case, now a cache miss will be more likely.
Increasing the BranchState cache to the number of jobs might be a good
idea to amelorate that. But the cache is currently an innefficient list,
so making it large would need changes to the data types.
(Commit 4304f1b6ae
dealt with a follow-on
effect of the bug fixed here.)
This commit is contained in:
parent
4304f1b6ae
commit
770aac97a7
4 changed files with 29 additions and 16 deletions
9
Annex.hs
9
Annex.hs
|
@ -115,7 +115,8 @@ newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState, AnnexRead) IO a
|
||||||
|
|
||||||
-- Values that can be read, but not modified by an Annex action.
|
-- Values that can be read, but not modified by an Annex action.
|
||||||
data AnnexRead = AnnexRead
|
data AnnexRead = AnnexRead
|
||||||
{ activekeys :: TVar (M.Map Key ThreadId)
|
{ branchstate :: MVar BranchState
|
||||||
|
, activekeys :: TVar (M.Map Key ThreadId)
|
||||||
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
|
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
|
||||||
, keysdbhandle :: Keys.DbHandle
|
, keysdbhandle :: Keys.DbHandle
|
||||||
, sshstalecleaned :: TMVar Bool
|
, sshstalecleaned :: TMVar Bool
|
||||||
|
@ -137,6 +138,7 @@ data AnnexRead = AnnexRead
|
||||||
|
|
||||||
newAnnexRead :: GitConfig -> IO AnnexRead
|
newAnnexRead :: GitConfig -> IO AnnexRead
|
||||||
newAnnexRead c = do
|
newAnnexRead c = do
|
||||||
|
bs <- newMVar startBranchState
|
||||||
emptyactivekeys <- newTVarIO M.empty
|
emptyactivekeys <- newTVarIO M.empty
|
||||||
emptyactiveremotes <- newMVar M.empty
|
emptyactiveremotes <- newMVar M.empty
|
||||||
kh <- Keys.newDbHandle
|
kh <- Keys.newDbHandle
|
||||||
|
@ -146,7 +148,8 @@ newAnnexRead c = do
|
||||||
cm <- newTMVarIO M.empty
|
cm <- newTMVarIO M.empty
|
||||||
cc <- newTMVarIO (CredentialCache M.empty)
|
cc <- newTMVarIO (CredentialCache M.empty)
|
||||||
return $ AnnexRead
|
return $ AnnexRead
|
||||||
{ activekeys = emptyactivekeys
|
{ branchstate = bs
|
||||||
|
, activekeys = emptyactivekeys
|
||||||
, activeremotes = emptyactiveremotes
|
, activeremotes = emptyactiveremotes
|
||||||
, keysdbhandle = kh
|
, keysdbhandle = kh
|
||||||
, sshstalecleaned = sc
|
, sshstalecleaned = sc
|
||||||
|
@ -180,7 +183,6 @@ data AnnexState = AnnexState
|
||||||
, output :: MessageState
|
, output :: MessageState
|
||||||
, concurrency :: ConcurrencySetting
|
, concurrency :: ConcurrencySetting
|
||||||
, daemon :: Bool
|
, daemon :: Bool
|
||||||
, branchstate :: BranchState
|
|
||||||
, repoqueue :: Maybe (Git.Queue.Queue Annex)
|
, repoqueue :: Maybe (Git.Queue.Queue Annex)
|
||||||
, catfilehandles :: CatFileHandles
|
, catfilehandles :: CatFileHandles
|
||||||
, hashobjecthandle :: Maybe (ResourcePool HashObjectHandle)
|
, hashobjecthandle :: Maybe (ResourcePool HashObjectHandle)
|
||||||
|
@ -235,7 +237,6 @@ newAnnexState c r = do
|
||||||
, output = o
|
, output = o
|
||||||
, concurrency = ConcurrencyCmdLine NonConcurrent
|
, concurrency = ConcurrencyCmdLine NonConcurrent
|
||||||
, daemon = False
|
, daemon = False
|
||||||
, branchstate = startBranchState
|
|
||||||
, repoqueue = Nothing
|
, repoqueue = Nothing
|
||||||
, catfilehandles = catFileHandlesNonConcurrent
|
, catfilehandles = catFileHandlesNonConcurrent
|
||||||
, hashobjecthandle = Nothing
|
, hashobjecthandle = Nothing
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
-
|
-
|
||||||
- Runtime state about the git-annex branch, and a small cache.
|
- Runtime state about the git-annex branch, and a small cache.
|
||||||
-
|
-
|
||||||
- Copyright 2011-2022 Joey Hess <id@joeyh.name>
|
- Copyright 2011-2024 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -16,14 +16,18 @@ import qualified Annex
|
||||||
import Logs
|
import Logs
|
||||||
import qualified Git
|
import qualified Git
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
||||||
getState :: Annex BranchState
|
getState :: Annex BranchState
|
||||||
getState = Annex.getState Annex.branchstate
|
getState = do
|
||||||
|
v <- Annex.getRead Annex.branchstate
|
||||||
|
liftIO $ readMVar v
|
||||||
|
|
||||||
changeState :: (BranchState -> BranchState) -> Annex ()
|
changeState :: (BranchState -> BranchState) -> Annex ()
|
||||||
changeState changer = Annex.changeState $ \s ->
|
changeState changer = do
|
||||||
s { Annex.branchstate = changer (Annex.branchstate s) }
|
v <- Annex.getRead Annex.branchstate
|
||||||
|
liftIO $ modifyMVar_ v $ return . changer
|
||||||
|
|
||||||
{- Runs an action to check that the index file exists, if it's not been
|
{- Runs an action to check that the index file exists, if it's not been
|
||||||
- checked before in this run of git-annex. -}
|
- checked before in this run of git-annex. -}
|
||||||
|
|
|
@ -181,11 +181,13 @@ data GetPrivate = GetPrivate Bool
|
||||||
getJournalFileStale :: GetPrivate -> RawFilePath -> Annex JournalledContent
|
getJournalFileStale :: GetPrivate -> RawFilePath -> Annex JournalledContent
|
||||||
getJournalFileStale (GetPrivate getprivate) file = do
|
getJournalFileStale (GetPrivate getprivate) file = do
|
||||||
st <- Annex.getState id
|
st <- Annex.getState id
|
||||||
|
let repo = Annex.repo st
|
||||||
|
bs <- getState
|
||||||
liftIO $
|
liftIO $
|
||||||
if getprivate && privateUUIDsKnown' st
|
if getprivate && privateUUIDsKnown' st
|
||||||
then do
|
then do
|
||||||
x <- getfrom (gitAnnexJournalDir (Annex.branchstate st) (Annex.repo st))
|
x <- getfrom (gitAnnexJournalDir bs repo)
|
||||||
getfrom (gitAnnexPrivateJournalDir (Annex.branchstate st) (Annex.repo st)) >>= \case
|
getfrom (gitAnnexPrivateJournalDir bs repo) >>= \case
|
||||||
Nothing -> return $ case x of
|
Nothing -> return $ case x of
|
||||||
Nothing -> NoJournalledContent
|
Nothing -> NoJournalledContent
|
||||||
Just b -> JournalledContent b
|
Just b -> JournalledContent b
|
||||||
|
@ -195,7 +197,7 @@ getJournalFileStale (GetPrivate getprivate) file = do
|
||||||
-- happens in a merge of two
|
-- happens in a merge of two
|
||||||
-- git-annex branches.
|
-- git-annex branches.
|
||||||
Just x' -> x' <> y
|
Just x' -> x' <> y
|
||||||
else getfrom (gitAnnexJournalDir (Annex.branchstate st) (Annex.repo st)) >>= return . \case
|
else getfrom (gitAnnexJournalDir bs repo) >>= return . \case
|
||||||
Nothing -> NoJournalledContent
|
Nothing -> NoJournalledContent
|
||||||
Just b -> JournalledContent b
|
Just b -> JournalledContent b
|
||||||
where
|
where
|
||||||
|
@ -223,8 +225,9 @@ discardIncompleteAppend v
|
||||||
- journal is staged as it is run. -}
|
- journal is staged as it is run. -}
|
||||||
getJournalledFilesStale :: (BranchState -> Git.Repo -> RawFilePath) -> Annex [RawFilePath]
|
getJournalledFilesStale :: (BranchState -> Git.Repo -> RawFilePath) -> Annex [RawFilePath]
|
||||||
getJournalledFilesStale getjournaldir = do
|
getJournalledFilesStale getjournaldir = do
|
||||||
st <- Annex.getState id
|
bs <- getState
|
||||||
let d = getjournaldir (Annex.branchstate st) (Annex.repo st)
|
repo <- Annex.gitRepo
|
||||||
|
let d = getjournaldir bs repo
|
||||||
fs <- liftIO $ catchDefaultIO [] $
|
fs <- liftIO $ catchDefaultIO [] $
|
||||||
getDirectoryContents (fromRawFilePath d)
|
getDirectoryContents (fromRawFilePath d)
|
||||||
return $ filter (`notElem` [".", ".."]) $
|
return $ filter (`notElem` [".", ".."]) $
|
||||||
|
@ -233,8 +236,9 @@ getJournalledFilesStale getjournaldir = do
|
||||||
{- Directory handle open on a journal directory. -}
|
{- Directory handle open on a journal directory. -}
|
||||||
withJournalHandle :: (BranchState -> Git.Repo -> RawFilePath) -> (DirectoryHandle -> IO a) -> Annex a
|
withJournalHandle :: (BranchState -> Git.Repo -> RawFilePath) -> (DirectoryHandle -> IO a) -> Annex a
|
||||||
withJournalHandle getjournaldir a = do
|
withJournalHandle getjournaldir a = do
|
||||||
st <- Annex.getState id
|
bs <- getState
|
||||||
let d = getjournaldir (Annex.branchstate st) (Annex.repo st)
|
repo <- Annex.gitRepo
|
||||||
|
let d = getjournaldir bs repo
|
||||||
bracket (opendir d) (liftIO . closeDirectory) (liftIO . a)
|
bracket (opendir d) (liftIO . closeDirectory) (liftIO . a)
|
||||||
where
|
where
|
||||||
-- avoid overhead of creating the journal directory when it already
|
-- avoid overhead of creating the journal directory when it already
|
||||||
|
|
|
@ -30,7 +30,11 @@ Planned schedule of work:
|
||||||
|
|
||||||
* http proxying for a local git remote seems to probably not work
|
* http proxying for a local git remote seems to probably not work
|
||||||
|
|
||||||
* git-annex testremote cluster
|
* An interrupted `git-annex copy --to` a cluster via the http server,
|
||||||
|
when repeated, fails. The http server outputs "transfer already in
|
||||||
|
progress, or unable to take transfer lock". Apparently a second
|
||||||
|
connection gets opened to the cluster, because the first connection
|
||||||
|
never got shut down.
|
||||||
|
|
||||||
## completed items for July's work on p2p protocol over http
|
## completed items for July's work on p2p protocol over http
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue