diff --git a/Annex.hs b/Annex.hs index e4d4251c01..4c666a5acc 100644 --- a/Annex.hs +++ b/Annex.hs @@ -43,7 +43,6 @@ import qualified Git import qualified Git.Config import qualified Git.Construct import Annex.Fixup -import Git.CatFile import Git.HashObject import Git.CheckAttr import Git.CheckIgnore @@ -68,6 +67,7 @@ import Types.CleanupActions import Types.AdjustedBranch import Types.WorkerPool import Types.IndexFiles +import Types.CatFileHandles import qualified Database.Keys.Handle as Keys import Utility.InodeCache import Utility.Url @@ -116,7 +116,7 @@ data AnnexState = AnnexState , daemon :: Bool , branchstate :: BranchState , repoqueue :: Maybe (Git.Queue.Queue Annex) - , catfilehandles :: M.Map FilePath CatFileHandle + , catfilehandles :: CatFileHandles , hashobjecthandle :: Maybe HashObjectHandle , checkattrhandle :: Maybe CheckAttrHandle , checkignorehandle :: Maybe CheckIgnoreHandle @@ -174,7 +174,7 @@ newState c r = do , daemon = False , branchstate = startBranchState , repoqueue = Nothing - , catfilehandles = M.empty + , catfilehandles = catFileHandlesNonConcurrent , hashobjecthandle = Nothing , checkattrhandle = Nothing , checkignorehandle = Nothing diff --git a/Annex/Branch.hs b/Annex/Branch.hs index 52e210c54b..ace4bb1719 100644 --- a/Annex/Branch.hs +++ b/Annex/Branch.hs @@ -418,8 +418,8 @@ mergeIndex :: JournalLocked -> [Git.Ref] -> Annex () mergeIndex jl branches = do prepareModifyIndex jl hashhandle <- hashObjectHandle - ch <- catFileHandle - inRepo $ \g -> Git.UnionMerge.mergeIndex hashhandle ch g branches + withCatFileHandle $ \ch -> + inRepo $ \g -> Git.UnionMerge.mergeIndex hashhandle ch g branches {- Removes any stale git lock file, to avoid git falling over when - updating the index. diff --git a/Annex/CatFile.hs b/Annex/CatFile.hs index 427d80db20..94c26b68bf 100644 --- a/Annex/CatFile.hs +++ b/Annex/CatFile.hs @@ -1,10 +1,12 @@ {- git cat-file interface, with handle automatically stored in the Annex monad - - - Copyright 2011-2019 Joey Hess + - Copyright 2011-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} +{-# LANGUAGE BangPatterns #-} + module Annex.CatFile ( catFile, catFileDetails, @@ -12,7 +14,7 @@ module Annex.CatFile ( catTree, catCommit, catObjectDetails, - catFileHandle, + withCatFileHandle, catObjectMetaData, catFileStop, catKey, @@ -27,6 +29,7 @@ module Annex.CatFile ( import qualified Data.ByteString.Lazy as L import qualified Data.Map as M import System.PosixCompat.Types +import Control.Concurrent.STM import Annex.Common import qualified Git @@ -39,64 +42,94 @@ import qualified Git.Ref import Annex.Link import Annex.CurrentBranch import Types.AdjustedBranch +import Types.CatFileHandles +import Utility.ResourcePool catFile :: Git.Branch -> RawFilePath -> Annex L.ByteString -catFile branch file = do - h <- catFileHandle +catFile branch file = withCatFileHandle $ \h -> liftIO $ Git.CatFile.catFile h branch file catFileDetails :: Git.Branch -> RawFilePath -> Annex (Maybe (L.ByteString, Sha, ObjectType)) -catFileDetails branch file = do - h <- catFileHandle +catFileDetails branch file = withCatFileHandle $ \h -> liftIO $ Git.CatFile.catFileDetails h branch file catObject :: Git.Ref -> Annex L.ByteString -catObject ref = do - h <- catFileHandle +catObject ref = withCatFileHandle $ \h -> liftIO $ Git.CatFile.catObject h ref catObjectMetaData :: Git.Ref -> Annex (Maybe (Sha, Integer, ObjectType)) -catObjectMetaData ref = do - h <- catFileHandle +catObjectMetaData ref = withCatFileHandle $ \h -> liftIO $ Git.CatFile.catObjectMetaData h ref catTree :: Git.Ref -> Annex [(FilePath, FileMode)] -catTree ref = do - h <- catFileHandle +catTree ref = withCatFileHandle $ \h -> liftIO $ Git.CatFile.catTree h ref catCommit :: Git.Ref -> Annex (Maybe Commit) -catCommit ref = do - h <- catFileHandle +catCommit ref = withCatFileHandle $ \h -> liftIO $ Git.CatFile.catCommit h ref catObjectDetails :: Git.Ref -> Annex (Maybe (L.ByteString, Sha, ObjectType)) -catObjectDetails ref = do - h <- catFileHandle +catObjectDetails ref = withCatFileHandle $ \h -> liftIO $ Git.CatFile.catObjectDetails h ref {- There can be multiple index files, and a different cat-file is needed - - for each. This is selected by setting GIT_INDEX_FILE in the gitEnv. -} -catFileHandle :: Annex Git.CatFile.CatFileHandle -catFileHandle = do - m <- Annex.getState Annex.catfilehandles + - for each. That is selected by setting GIT_INDEX_FILE in the gitEnv + - before running this. -} +withCatFileHandle :: (Git.CatFile.CatFileHandle -> Annex a) -> Annex a +withCatFileHandle a = do + cfh <- Annex.getState Annex.catfilehandles indexfile <- fromMaybe "" . maybe Nothing (lookup indexEnv) <$> fromRepo gitEnv - case M.lookup indexfile m of - Just h -> return h - Nothing -> do - h <- inRepo Git.CatFile.catFileStart - let m' = M.insert indexfile h m - Annex.changeState $ \s -> s { Annex.catfilehandles = m' } - return h + p <- case cfh of + CatFileHandlesNonConcurrent m -> case M.lookup indexfile m of + Just p -> return p + Nothing -> do + p <- mkResourcePoolNonConcurrent startcatfile + let !m' = M.insert indexfile p m + Annex.changeState $ \s -> s { Annex.catfilehandles = CatFileHandlesNonConcurrent m' } + return p + CatFileHandlesPool tm -> do + m <- liftIO $ atomically $ takeTMVar tm + case M.lookup indexfile m of + Just p -> do + liftIO $ atomically $ putTMVar tm m + return p + Nothing -> do + p <- mkResourcePool maxCatFiles + let !m' = M.insert indexfile p m + liftIO $ atomically $ putTMVar tm m' + return p + withResourcePool p startcatfile a + where + startcatfile = inRepo Git.CatFile.catFileStart + +{- A lot of git cat-file processes are unlikely to improve concurrency, + - because a query to them takes only a little bit of CPU, and tends to be + - bottlenecked on disk. Also, they each open a number of files, so + - using too many might run out of file handles. So, only start a maximum + - of 2. + - + - Note that each different index file gets its own pool of cat-files; + - this is the size of each pool. In all, 4 times this many cat-files + - may end up running. + -} +maxCatFiles :: Int +maxCatFiles = 2 {- Stops all running cat-files. Should only be run when it's known that - nothing is using the handles, eg at shutdown. -} catFileStop :: Annex () catFileStop = do - m <- Annex.withState $ pure . \s -> - (s { Annex.catfilehandles = M.empty }, Annex.catfilehandles s) - liftIO $ mapM_ Git.CatFile.catFileStop (M.elems m) + cfh <- Annex.getState Annex.catfilehandles + m <- case cfh of + CatFileHandlesNonConcurrent m -> do + Annex.changeState $ \s -> s { Annex.catfilehandles = CatFileHandlesNonConcurrent M.empty } + return m + CatFileHandlesPool tm -> + liftIO $ atomically $ swapTMVar tm M.empty + liftIO $ forM_ (M.elems m) $ \p -> + freeResourcePool p Git.CatFile.catFileStop {- From ref to a symlink or a pointer file, get the key. -} catKey :: Ref -> Annex (Maybe Key) diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs index f8d40273bc..64f7911734 100644 --- a/Annex/Concurrent.hs +++ b/Annex/Concurrent.hs @@ -1,6 +1,6 @@ {- git-annex concurrent state - - - Copyright 2015-2019 Joey Hess + - Copyright 2015-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -11,13 +11,29 @@ import Annex import Annex.Common import qualified Annex.Queue import Annex.Action +import Types.Concurrency import Types.WorkerPool +import Types.CatFileHandles import Remote.List import Control.Concurrent import Control.Concurrent.STM import qualified Data.Map as M +setConcurrency :: Concurrency -> Annex () +setConcurrency NonConcurrent = Annex.changeState $ \s -> s + { Annex.concurrency = NonConcurrent + } +setConcurrency c = do + cfh <- Annex.getState Annex.catfilehandles + cfh' <- case cfh of + CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool + CatFileHandlesPool _ -> pure cfh + Annex.changeState $ \s -> s + { Annex.concurrency = c + , Annex.catfilehandles = cfh' + } + {- Allows forking off a thread that uses a copy of the current AnnexState - to run an Annex action. - @@ -50,11 +66,17 @@ dupState = do _ <- remoteList st <- Annex.getState id - return $ st + -- Make sure that concurrency is enabled, if it was not already, + -- so the resource pools are set up. + st' <- case Annex.concurrency st of + NonConcurrent -> do + setConcurrency (Concurrent 1) + Annex.getState id + _ -> return st + return $ st' -- each thread has its own repoqueue { Annex.repoqueue = Nothing - -- avoid sharing eg, open file handles - , Annex.catfilehandles = M.empty + -- avoid sharing open file handles , Annex.checkattrhandle = Nothing , Annex.checkignorehandle = Nothing } diff --git a/CHANGELOG b/CHANGELOG index 5ad1e70a1a..c3c1b1abf9 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -15,6 +15,8 @@ git-annex (8.20200331) UNRELEASED; urgency=medium for unlocked files, which already worked for locked files. * Avoid repeatedly opening keys db when accessing a local git remote and -J is used. + * Avoid running a large number of git cat-file child processes when run + with a large -J value. -- Joey Hess Mon, 30 Mar 2020 15:58:34 -0400 diff --git a/CmdLine/Action.hs b/CmdLine/Action.hs index e72d095292..da1f7dd3ec 100644 --- a/CmdLine/Action.hs +++ b/CmdLine/Action.hs @@ -199,8 +199,7 @@ startConcurrency :: UsedStages -> Annex a -> Annex a startConcurrency usedstages a = do fromcmdline <- Annex.getState Annex.concurrency fromgitcfg <- annexJobs <$> Annex.getGitConfig - let usegitcfg = Annex.changeState $ - \c -> c { Annex.concurrency = fromgitcfg } + let usegitcfg = setConcurrency fromgitcfg case (fromcmdline, fromgitcfg) of (NonConcurrent, NonConcurrent) -> a (Concurrent n, _) -> diff --git a/CmdLine/GitAnnex/Options.hs b/CmdLine/GitAnnex/Options.hs index 4a0bcb8857..37e7acac08 100644 --- a/CmdLine/GitAnnex/Options.hs +++ b/CmdLine/GitAnnex/Options.hs @@ -36,6 +36,7 @@ import CmdLine.GlobalSetter import qualified Backend import qualified Types.Backend as Backend import Utility.HumanTime +import Annex.Concurrent -- Global options that are accepted by all git-annex sub-commands, -- although not always used. @@ -395,7 +396,7 @@ jsonProgressOption = -- action in `allowConcurrentOutput`. jobsOption :: [GlobalOption] jobsOption = - [ globalSetter set $ + [ globalSetter setConcurrency $ option (maybeReader parseConcurrency) ( long "jobs" <> short 'J' <> metavar (paramNumber `paramOr` "cpus") @@ -403,8 +404,6 @@ jobsOption = <> hidden ) ] - where - set v = Annex.changeState $ \s -> s { Annex.concurrency = v } timeLimitOption :: [GlobalOption] timeLimitOption = diff --git a/Types/CatFileHandles.hs b/Types/CatFileHandles.hs new file mode 100644 index 0000000000..2e98ae30b6 --- /dev/null +++ b/Types/CatFileHandles.hs @@ -0,0 +1,30 @@ +{- git-cat file handles pools + - + - Copyright 2020 Joey Hess + - + - Licensed under the GNU AGPL version 3 or higher. + -} + +module Types.CatFileHandles ( + CatFileHandles(..), + catFileHandlesNonConcurrent, + catFileHandlesPool, +) where + +import Control.Concurrent.STM +import qualified Data.Map as M + +import Utility.ResourcePool +import Git.CatFile (CatFileHandle) + +data CatFileHandles + = CatFileHandlesNonConcurrent CatMap + | CatFileHandlesPool (TMVar CatMap) + +type CatMap = M.Map FilePath (ResourcePool CatFileHandle) + +catFileHandlesNonConcurrent :: CatFileHandles +catFileHandlesNonConcurrent = CatFileHandlesNonConcurrent M.empty + +catFileHandlesPool :: IO CatFileHandles +catFileHandlesPool = CatFileHandlesPool <$> newTMVarIO M.empty diff --git a/Types/Concurrency.hs b/Types/Concurrency.hs index 3eab7b71e6..abae4a7729 100644 --- a/Types/Concurrency.hs +++ b/Types/Concurrency.hs @@ -11,6 +11,7 @@ import Utility.PartialPrelude -- the former specifies 1 job of each particular kind, but there can be -- more than one kind of job running concurrently. data Concurrency = NonConcurrent | Concurrent Int | ConcurrentPerCpu + deriving (Eq) parseConcurrency :: String -> Maybe Concurrency parseConcurrency "cpus" = Just ConcurrentPerCpu diff --git a/Utility/CoProcess.hs b/Utility/CoProcess.hs index 2bae40fbae..59c0064232 100644 --- a/Utility/CoProcess.hs +++ b/Utility/CoProcess.hs @@ -9,7 +9,8 @@ {-# LANGUAGE CPP #-} module Utility.CoProcess ( - CoProcessHandle, + CoProcessHandle(..), + CoProcessState(..), start, stop, query, diff --git a/Utility/ResourcePool.hs b/Utility/ResourcePool.hs new file mode 100644 index 0000000000..be9010e608 --- /dev/null +++ b/Utility/ResourcePool.hs @@ -0,0 +1,94 @@ +{- Resource pools. + - + - Copyright 2020 Joey Hess + - + - License: BSD-2-clause + -} + +{-# LANGUAGE BangPatterns #-} + +module Utility.ResourcePool ( + ResourcePool, + mkResourcePool, + mkResourcePoolNonConcurrent, + withResourcePool, + freeResourcePool, +) where + +import Common + +import Control.Concurrent.STM +import Control.Monad.IO.Class +import Data.Either + +data ResourcePool r + = ResourcePool Int (TVar Int) (TVar [r]) + | ResourcePoolNonConcurrent r + +{- Make a new resource pool, that can grow to contain the specified number + - of resources. -} +mkResourcePool :: MonadIO m => Int -> m (ResourcePool r) +mkResourcePool maxsz = liftIO $ + ResourcePool maxsz + <$> newTVarIO 0 + <*> newTVarIO [] + +{- When there will not be multiple threads that may + - may concurrently try to use it, using this is more + - efficient than mkResourcePool. + -} +mkResourcePoolNonConcurrent :: (MonadMask m, MonadIO m) => m r -> m (ResourcePool r) +mkResourcePoolNonConcurrent allocresource = + ResourcePoolNonConcurrent <$> allocresource + +{- Runs an action with a resource. + - + - If no free resource is available in the pool, + - will run the action the allocate a new resource if the pool's size + - allows. Or will block a resource becomes available to use. + - + - The resource is returned to the pool at the end. + -} +withResourcePool :: (MonadMask m, MonadIO m) => ResourcePool r -> m r -> (r -> m a) -> m a +withResourcePool (ResourcePoolNonConcurrent r) _ a = a r +withResourcePool (ResourcePool maxsz currsz p) allocresource a = + bracket setup cleanup a + where + setup = do + mr <- liftIO $ atomically $ do + l <- readTVar p + case l of + (r:rs) -> do + writeTVar p rs + return (Just r) + [] -> do + n <- readTVar currsz + if n < maxsz + then do + let !n' = succ n + writeTVar currsz n' + return Nothing + else retry + case mr of + Just r -> return r + Nothing -> allocresource + cleanup r = liftIO $ atomically $ modifyTVar' p (r:) + +{- Frees all resources in use in the pool, running the supplied action on + - each. (If any of the actions throw an exception, it will be rethrown + - after all the actions have completed.) + - + - The pool should not have any resources in use when this is called, + - and the pool should not be used again after calling this. + -} +freeResourcePool :: (MonadMask m, MonadIO m) => ResourcePool r -> (r -> m ()) -> m () +freeResourcePool (ResourcePoolNonConcurrent r) freeresource = freeresource r +freeResourcePool (ResourcePool _ currsz p) freeresource = do + rs <- liftIO $ atomically $ do + writeTVar currsz 0 + swapTVar p [] + res <- forM rs $ tryNonAsync . freeresource + case lefts res of + [] -> return () + (e:_) -> throwM e + diff --git a/doc/bugs/get_-J8_on_OSX_leads_to_git-annex__58___git__58___createProcess__58___runInteractiveProcess__58___pipe__58___resource_exhausted___40__Too_many_open_files__41__.mdwn b/doc/bugs/get_-J8_on_OSX_leads_to_git-annex__58___git__58___createProcess__58___runInteractiveProcess__58___pipe__58___resource_exhausted___40__Too_many_open_files__41__.mdwn index e416c89a8d..3e58b2cde4 100644 --- a/doc/bugs/get_-J8_on_OSX_leads_to_git-annex__58___git__58___createProcess__58___runInteractiveProcess__58___pipe__58___resource_exhausted___40__Too_many_open_files__41__.mdwn +++ b/doc/bugs/get_-J8_on_OSX_leads_to_git-annex__58___git__58___createProcess__58___runInteractiveProcess__58___pipe__58___resource_exhausted___40__Too_many_open_files__41__.mdwn @@ -50,5 +50,3 @@ I will try to get a chance to troubleshoot it more to provide possibly more deta [[!meta author=yoh]] [[!tag projects/datalad]] - -[[!tag moreinfo]] diff --git a/doc/bugs/get_-J8_on_OSX_leads_to_git-annex__58___git__58___createProcess__58___runInteractiveProcess__58___pipe__58___resource_exhausted___40__Too_many_open_files__41__/comment_10_61622483d4f1962f191fb6a791c6817d._comment b/doc/bugs/get_-J8_on_OSX_leads_to_git-annex__58___git__58___createProcess__58___runInteractiveProcess__58___pipe__58___resource_exhausted___40__Too_many_open_files__41__/comment_10_61622483d4f1962f191fb6a791c6817d._comment new file mode 100644 index 0000000000..3a12fad787 --- /dev/null +++ b/doc/bugs/get_-J8_on_OSX_leads_to_git-annex__58___git__58___createProcess__58___runInteractiveProcess__58___pipe__58___resource_exhausted___40__Too_many_open_files__41__/comment_10_61622483d4f1962f191fb6a791c6817d._comment @@ -0,0 +1,15 @@ +[[!comment format=mdwn + username="joey" + subject="""comment 10""" + date="2020-04-20T17:24:53Z" + content=""" +Implemented the cat-file pool. Capped at 2 cat-files of each distinct type, +so it will start a max of 8 no matter the -J level. + +(Although cat-file can also be run in those repositories so there will be +more then.) + +While testing, I noticed git-anenx drop -Jn starts n git check-attr +processes, so the same thing ought to be done with them. Leaving this bug open +for that, but I do think that the problem you reported should be fixed now. +"""]] diff --git a/git-annex.cabal b/git-annex.cabal index 1bb92a2220..5ff0b9e727 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -975,6 +975,7 @@ Executable git-annex Types.Backend Types.Benchmark Types.BranchState + Types.CatFileHandles Types.CleanupActions Types.Command Types.Concurrency @@ -1089,6 +1090,7 @@ Executable git-annex Utility.Process.Transcript Utility.QuickCheck Utility.RawFilePath + Utility.ResourcePool Utility.Rsync Utility.SafeCommand Utility.Scheduled