cat-file resource pool
Avoid running a large number of git cat-file child processes when run with a large -J value. This implementation takes care to avoid adding any overhead to git-annex when run without -J. When run with -J, there is a small bit of added overhead, to manipulate the resource pool. That optimisation added a fair bit of complexity.
This commit is contained in:
parent
87b7b0f202
commit
cee6b344b4
14 changed files with 243 additions and 47 deletions
6
Annex.hs
6
Annex.hs
|
@ -43,7 +43,6 @@ import qualified Git
|
||||||
import qualified Git.Config
|
import qualified Git.Config
|
||||||
import qualified Git.Construct
|
import qualified Git.Construct
|
||||||
import Annex.Fixup
|
import Annex.Fixup
|
||||||
import Git.CatFile
|
|
||||||
import Git.HashObject
|
import Git.HashObject
|
||||||
import Git.CheckAttr
|
import Git.CheckAttr
|
||||||
import Git.CheckIgnore
|
import Git.CheckIgnore
|
||||||
|
@ -68,6 +67,7 @@ import Types.CleanupActions
|
||||||
import Types.AdjustedBranch
|
import Types.AdjustedBranch
|
||||||
import Types.WorkerPool
|
import Types.WorkerPool
|
||||||
import Types.IndexFiles
|
import Types.IndexFiles
|
||||||
|
import Types.CatFileHandles
|
||||||
import qualified Database.Keys.Handle as Keys
|
import qualified Database.Keys.Handle as Keys
|
||||||
import Utility.InodeCache
|
import Utility.InodeCache
|
||||||
import Utility.Url
|
import Utility.Url
|
||||||
|
@ -116,7 +116,7 @@ data AnnexState = AnnexState
|
||||||
, daemon :: Bool
|
, daemon :: Bool
|
||||||
, branchstate :: BranchState
|
, branchstate :: BranchState
|
||||||
, repoqueue :: Maybe (Git.Queue.Queue Annex)
|
, repoqueue :: Maybe (Git.Queue.Queue Annex)
|
||||||
, catfilehandles :: M.Map FilePath CatFileHandle
|
, catfilehandles :: CatFileHandles
|
||||||
, hashobjecthandle :: Maybe HashObjectHandle
|
, hashobjecthandle :: Maybe HashObjectHandle
|
||||||
, checkattrhandle :: Maybe CheckAttrHandle
|
, checkattrhandle :: Maybe CheckAttrHandle
|
||||||
, checkignorehandle :: Maybe CheckIgnoreHandle
|
, checkignorehandle :: Maybe CheckIgnoreHandle
|
||||||
|
@ -174,7 +174,7 @@ newState c r = do
|
||||||
, daemon = False
|
, daemon = False
|
||||||
, branchstate = startBranchState
|
, branchstate = startBranchState
|
||||||
, repoqueue = Nothing
|
, repoqueue = Nothing
|
||||||
, catfilehandles = M.empty
|
, catfilehandles = catFileHandlesNonConcurrent
|
||||||
, hashobjecthandle = Nothing
|
, hashobjecthandle = Nothing
|
||||||
, checkattrhandle = Nothing
|
, checkattrhandle = Nothing
|
||||||
, checkignorehandle = Nothing
|
, checkignorehandle = Nothing
|
||||||
|
|
|
@ -418,8 +418,8 @@ mergeIndex :: JournalLocked -> [Git.Ref] -> Annex ()
|
||||||
mergeIndex jl branches = do
|
mergeIndex jl branches = do
|
||||||
prepareModifyIndex jl
|
prepareModifyIndex jl
|
||||||
hashhandle <- hashObjectHandle
|
hashhandle <- hashObjectHandle
|
||||||
ch <- catFileHandle
|
withCatFileHandle $ \ch ->
|
||||||
inRepo $ \g -> Git.UnionMerge.mergeIndex hashhandle ch g branches
|
inRepo $ \g -> Git.UnionMerge.mergeIndex hashhandle ch g branches
|
||||||
|
|
||||||
{- Removes any stale git lock file, to avoid git falling over when
|
{- Removes any stale git lock file, to avoid git falling over when
|
||||||
- updating the index.
|
- updating the index.
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
{- git cat-file interface, with handle automatically stored in the Annex monad
|
{- git cat-file interface, with handle automatically stored in the Annex monad
|
||||||
-
|
-
|
||||||
- Copyright 2011-2019 Joey Hess <id@joeyh.name>
|
- Copyright 2011-2020 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
module Annex.CatFile (
|
module Annex.CatFile (
|
||||||
catFile,
|
catFile,
|
||||||
catFileDetails,
|
catFileDetails,
|
||||||
|
@ -12,7 +14,7 @@ module Annex.CatFile (
|
||||||
catTree,
|
catTree,
|
||||||
catCommit,
|
catCommit,
|
||||||
catObjectDetails,
|
catObjectDetails,
|
||||||
catFileHandle,
|
withCatFileHandle,
|
||||||
catObjectMetaData,
|
catObjectMetaData,
|
||||||
catFileStop,
|
catFileStop,
|
||||||
catKey,
|
catKey,
|
||||||
|
@ -27,6 +29,7 @@ module Annex.CatFile (
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
import System.PosixCompat.Types
|
import System.PosixCompat.Types
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import qualified Git
|
import qualified Git
|
||||||
|
@ -39,64 +42,94 @@ import qualified Git.Ref
|
||||||
import Annex.Link
|
import Annex.Link
|
||||||
import Annex.CurrentBranch
|
import Annex.CurrentBranch
|
||||||
import Types.AdjustedBranch
|
import Types.AdjustedBranch
|
||||||
|
import Types.CatFileHandles
|
||||||
|
import Utility.ResourcePool
|
||||||
|
|
||||||
catFile :: Git.Branch -> RawFilePath -> Annex L.ByteString
|
catFile :: Git.Branch -> RawFilePath -> Annex L.ByteString
|
||||||
catFile branch file = do
|
catFile branch file = withCatFileHandle $ \h ->
|
||||||
h <- catFileHandle
|
|
||||||
liftIO $ Git.CatFile.catFile h branch file
|
liftIO $ Git.CatFile.catFile h branch file
|
||||||
|
|
||||||
catFileDetails :: Git.Branch -> RawFilePath -> Annex (Maybe (L.ByteString, Sha, ObjectType))
|
catFileDetails :: Git.Branch -> RawFilePath -> Annex (Maybe (L.ByteString, Sha, ObjectType))
|
||||||
catFileDetails branch file = do
|
catFileDetails branch file = withCatFileHandle $ \h ->
|
||||||
h <- catFileHandle
|
|
||||||
liftIO $ Git.CatFile.catFileDetails h branch file
|
liftIO $ Git.CatFile.catFileDetails h branch file
|
||||||
|
|
||||||
catObject :: Git.Ref -> Annex L.ByteString
|
catObject :: Git.Ref -> Annex L.ByteString
|
||||||
catObject ref = do
|
catObject ref = withCatFileHandle $ \h ->
|
||||||
h <- catFileHandle
|
|
||||||
liftIO $ Git.CatFile.catObject h ref
|
liftIO $ Git.CatFile.catObject h ref
|
||||||
|
|
||||||
catObjectMetaData :: Git.Ref -> Annex (Maybe (Sha, Integer, ObjectType))
|
catObjectMetaData :: Git.Ref -> Annex (Maybe (Sha, Integer, ObjectType))
|
||||||
catObjectMetaData ref = do
|
catObjectMetaData ref = withCatFileHandle $ \h ->
|
||||||
h <- catFileHandle
|
|
||||||
liftIO $ Git.CatFile.catObjectMetaData h ref
|
liftIO $ Git.CatFile.catObjectMetaData h ref
|
||||||
|
|
||||||
catTree :: Git.Ref -> Annex [(FilePath, FileMode)]
|
catTree :: Git.Ref -> Annex [(FilePath, FileMode)]
|
||||||
catTree ref = do
|
catTree ref = withCatFileHandle $ \h ->
|
||||||
h <- catFileHandle
|
|
||||||
liftIO $ Git.CatFile.catTree h ref
|
liftIO $ Git.CatFile.catTree h ref
|
||||||
|
|
||||||
catCommit :: Git.Ref -> Annex (Maybe Commit)
|
catCommit :: Git.Ref -> Annex (Maybe Commit)
|
||||||
catCommit ref = do
|
catCommit ref = withCatFileHandle $ \h ->
|
||||||
h <- catFileHandle
|
|
||||||
liftIO $ Git.CatFile.catCommit h ref
|
liftIO $ Git.CatFile.catCommit h ref
|
||||||
|
|
||||||
catObjectDetails :: Git.Ref -> Annex (Maybe (L.ByteString, Sha, ObjectType))
|
catObjectDetails :: Git.Ref -> Annex (Maybe (L.ByteString, Sha, ObjectType))
|
||||||
catObjectDetails ref = do
|
catObjectDetails ref = withCatFileHandle $ \h ->
|
||||||
h <- catFileHandle
|
|
||||||
liftIO $ Git.CatFile.catObjectDetails h ref
|
liftIO $ Git.CatFile.catObjectDetails h ref
|
||||||
|
|
||||||
{- There can be multiple index files, and a different cat-file is needed
|
{- There can be multiple index files, and a different cat-file is needed
|
||||||
- for each. This is selected by setting GIT_INDEX_FILE in the gitEnv. -}
|
- for each. That is selected by setting GIT_INDEX_FILE in the gitEnv
|
||||||
catFileHandle :: Annex Git.CatFile.CatFileHandle
|
- before running this. -}
|
||||||
catFileHandle = do
|
withCatFileHandle :: (Git.CatFile.CatFileHandle -> Annex a) -> Annex a
|
||||||
m <- Annex.getState Annex.catfilehandles
|
withCatFileHandle a = do
|
||||||
|
cfh <- Annex.getState Annex.catfilehandles
|
||||||
indexfile <- fromMaybe "" . maybe Nothing (lookup indexEnv)
|
indexfile <- fromMaybe "" . maybe Nothing (lookup indexEnv)
|
||||||
<$> fromRepo gitEnv
|
<$> fromRepo gitEnv
|
||||||
case M.lookup indexfile m of
|
p <- case cfh of
|
||||||
Just h -> return h
|
CatFileHandlesNonConcurrent m -> case M.lookup indexfile m of
|
||||||
Nothing -> do
|
Just p -> return p
|
||||||
h <- inRepo Git.CatFile.catFileStart
|
Nothing -> do
|
||||||
let m' = M.insert indexfile h m
|
p <- mkResourcePoolNonConcurrent startcatfile
|
||||||
Annex.changeState $ \s -> s { Annex.catfilehandles = m' }
|
let !m' = M.insert indexfile p m
|
||||||
return h
|
Annex.changeState $ \s -> s { Annex.catfilehandles = CatFileHandlesNonConcurrent m' }
|
||||||
|
return p
|
||||||
|
CatFileHandlesPool tm -> do
|
||||||
|
m <- liftIO $ atomically $ takeTMVar tm
|
||||||
|
case M.lookup indexfile m of
|
||||||
|
Just p -> do
|
||||||
|
liftIO $ atomically $ putTMVar tm m
|
||||||
|
return p
|
||||||
|
Nothing -> do
|
||||||
|
p <- mkResourcePool maxCatFiles
|
||||||
|
let !m' = M.insert indexfile p m
|
||||||
|
liftIO $ atomically $ putTMVar tm m'
|
||||||
|
return p
|
||||||
|
withResourcePool p startcatfile a
|
||||||
|
where
|
||||||
|
startcatfile = inRepo Git.CatFile.catFileStart
|
||||||
|
|
||||||
|
{- A lot of git cat-file processes are unlikely to improve concurrency,
|
||||||
|
- because a query to them takes only a little bit of CPU, and tends to be
|
||||||
|
- bottlenecked on disk. Also, they each open a number of files, so
|
||||||
|
- using too many might run out of file handles. So, only start a maximum
|
||||||
|
- of 2.
|
||||||
|
-
|
||||||
|
- Note that each different index file gets its own pool of cat-files;
|
||||||
|
- this is the size of each pool. In all, 4 times this many cat-files
|
||||||
|
- may end up running.
|
||||||
|
-}
|
||||||
|
maxCatFiles :: Int
|
||||||
|
maxCatFiles = 2
|
||||||
|
|
||||||
{- Stops all running cat-files. Should only be run when it's known that
|
{- Stops all running cat-files. Should only be run when it's known that
|
||||||
- nothing is using the handles, eg at shutdown. -}
|
- nothing is using the handles, eg at shutdown. -}
|
||||||
catFileStop :: Annex ()
|
catFileStop :: Annex ()
|
||||||
catFileStop = do
|
catFileStop = do
|
||||||
m <- Annex.withState $ pure . \s ->
|
cfh <- Annex.getState Annex.catfilehandles
|
||||||
(s { Annex.catfilehandles = M.empty }, Annex.catfilehandles s)
|
m <- case cfh of
|
||||||
liftIO $ mapM_ Git.CatFile.catFileStop (M.elems m)
|
CatFileHandlesNonConcurrent m -> do
|
||||||
|
Annex.changeState $ \s -> s { Annex.catfilehandles = CatFileHandlesNonConcurrent M.empty }
|
||||||
|
return m
|
||||||
|
CatFileHandlesPool tm ->
|
||||||
|
liftIO $ atomically $ swapTMVar tm M.empty
|
||||||
|
liftIO $ forM_ (M.elems m) $ \p ->
|
||||||
|
freeResourcePool p Git.CatFile.catFileStop
|
||||||
|
|
||||||
{- From ref to a symlink or a pointer file, get the key. -}
|
{- From ref to a symlink or a pointer file, get the key. -}
|
||||||
catKey :: Ref -> Annex (Maybe Key)
|
catKey :: Ref -> Annex (Maybe Key)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{- git-annex concurrent state
|
{- git-annex concurrent state
|
||||||
-
|
-
|
||||||
- Copyright 2015-2019 Joey Hess <id@joeyh.name>
|
- Copyright 2015-2020 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -11,13 +11,29 @@ import Annex
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import qualified Annex.Queue
|
import qualified Annex.Queue
|
||||||
import Annex.Action
|
import Annex.Action
|
||||||
|
import Types.Concurrency
|
||||||
import Types.WorkerPool
|
import Types.WorkerPool
|
||||||
|
import Types.CatFileHandles
|
||||||
import Remote.List
|
import Remote.List
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
|
||||||
|
setConcurrency :: Concurrency -> Annex ()
|
||||||
|
setConcurrency NonConcurrent = Annex.changeState $ \s -> s
|
||||||
|
{ Annex.concurrency = NonConcurrent
|
||||||
|
}
|
||||||
|
setConcurrency c = do
|
||||||
|
cfh <- Annex.getState Annex.catfilehandles
|
||||||
|
cfh' <- case cfh of
|
||||||
|
CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool
|
||||||
|
CatFileHandlesPool _ -> pure cfh
|
||||||
|
Annex.changeState $ \s -> s
|
||||||
|
{ Annex.concurrency = c
|
||||||
|
, Annex.catfilehandles = cfh'
|
||||||
|
}
|
||||||
|
|
||||||
{- Allows forking off a thread that uses a copy of the current AnnexState
|
{- Allows forking off a thread that uses a copy of the current AnnexState
|
||||||
- to run an Annex action.
|
- to run an Annex action.
|
||||||
-
|
-
|
||||||
|
@ -50,11 +66,17 @@ dupState = do
|
||||||
_ <- remoteList
|
_ <- remoteList
|
||||||
|
|
||||||
st <- Annex.getState id
|
st <- Annex.getState id
|
||||||
return $ st
|
-- Make sure that concurrency is enabled, if it was not already,
|
||||||
|
-- so the resource pools are set up.
|
||||||
|
st' <- case Annex.concurrency st of
|
||||||
|
NonConcurrent -> do
|
||||||
|
setConcurrency (Concurrent 1)
|
||||||
|
Annex.getState id
|
||||||
|
_ -> return st
|
||||||
|
return $ st'
|
||||||
-- each thread has its own repoqueue
|
-- each thread has its own repoqueue
|
||||||
{ Annex.repoqueue = Nothing
|
{ Annex.repoqueue = Nothing
|
||||||
-- avoid sharing eg, open file handles
|
-- avoid sharing open file handles
|
||||||
, Annex.catfilehandles = M.empty
|
|
||||||
, Annex.checkattrhandle = Nothing
|
, Annex.checkattrhandle = Nothing
|
||||||
, Annex.checkignorehandle = Nothing
|
, Annex.checkignorehandle = Nothing
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,8 @@ git-annex (8.20200331) UNRELEASED; urgency=medium
|
||||||
for unlocked files, which already worked for locked files.
|
for unlocked files, which already worked for locked files.
|
||||||
* Avoid repeatedly opening keys db when accessing a local git remote
|
* Avoid repeatedly opening keys db when accessing a local git remote
|
||||||
and -J is used.
|
and -J is used.
|
||||||
|
* Avoid running a large number of git cat-file child processes when run
|
||||||
|
with a large -J value.
|
||||||
|
|
||||||
-- Joey Hess <id@joeyh.name> Mon, 30 Mar 2020 15:58:34 -0400
|
-- Joey Hess <id@joeyh.name> Mon, 30 Mar 2020 15:58:34 -0400
|
||||||
|
|
||||||
|
|
|
@ -199,8 +199,7 @@ startConcurrency :: UsedStages -> Annex a -> Annex a
|
||||||
startConcurrency usedstages a = do
|
startConcurrency usedstages a = do
|
||||||
fromcmdline <- Annex.getState Annex.concurrency
|
fromcmdline <- Annex.getState Annex.concurrency
|
||||||
fromgitcfg <- annexJobs <$> Annex.getGitConfig
|
fromgitcfg <- annexJobs <$> Annex.getGitConfig
|
||||||
let usegitcfg = Annex.changeState $
|
let usegitcfg = setConcurrency fromgitcfg
|
||||||
\c -> c { Annex.concurrency = fromgitcfg }
|
|
||||||
case (fromcmdline, fromgitcfg) of
|
case (fromcmdline, fromgitcfg) of
|
||||||
(NonConcurrent, NonConcurrent) -> a
|
(NonConcurrent, NonConcurrent) -> a
|
||||||
(Concurrent n, _) ->
|
(Concurrent n, _) ->
|
||||||
|
|
|
@ -36,6 +36,7 @@ import CmdLine.GlobalSetter
|
||||||
import qualified Backend
|
import qualified Backend
|
||||||
import qualified Types.Backend as Backend
|
import qualified Types.Backend as Backend
|
||||||
import Utility.HumanTime
|
import Utility.HumanTime
|
||||||
|
import Annex.Concurrent
|
||||||
|
|
||||||
-- Global options that are accepted by all git-annex sub-commands,
|
-- Global options that are accepted by all git-annex sub-commands,
|
||||||
-- although not always used.
|
-- although not always used.
|
||||||
|
@ -395,7 +396,7 @@ jsonProgressOption =
|
||||||
-- action in `allowConcurrentOutput`.
|
-- action in `allowConcurrentOutput`.
|
||||||
jobsOption :: [GlobalOption]
|
jobsOption :: [GlobalOption]
|
||||||
jobsOption =
|
jobsOption =
|
||||||
[ globalSetter set $
|
[ globalSetter setConcurrency $
|
||||||
option (maybeReader parseConcurrency)
|
option (maybeReader parseConcurrency)
|
||||||
( long "jobs" <> short 'J'
|
( long "jobs" <> short 'J'
|
||||||
<> metavar (paramNumber `paramOr` "cpus")
|
<> metavar (paramNumber `paramOr` "cpus")
|
||||||
|
@ -403,8 +404,6 @@ jobsOption =
|
||||||
<> hidden
|
<> hidden
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
where
|
|
||||||
set v = Annex.changeState $ \s -> s { Annex.concurrency = v }
|
|
||||||
|
|
||||||
timeLimitOption :: [GlobalOption]
|
timeLimitOption :: [GlobalOption]
|
||||||
timeLimitOption =
|
timeLimitOption =
|
||||||
|
|
30
Types/CatFileHandles.hs
Normal file
30
Types/CatFileHandles.hs
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
{- git-cat file handles pools
|
||||||
|
-
|
||||||
|
- Copyright 2020 Joey Hess <id@joeyh.name>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
module Types.CatFileHandles (
|
||||||
|
CatFileHandles(..),
|
||||||
|
catFileHandlesNonConcurrent,
|
||||||
|
catFileHandlesPool,
|
||||||
|
) where
|
||||||
|
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
import qualified Data.Map as M
|
||||||
|
|
||||||
|
import Utility.ResourcePool
|
||||||
|
import Git.CatFile (CatFileHandle)
|
||||||
|
|
||||||
|
data CatFileHandles
|
||||||
|
= CatFileHandlesNonConcurrent CatMap
|
||||||
|
| CatFileHandlesPool (TMVar CatMap)
|
||||||
|
|
||||||
|
type CatMap = M.Map FilePath (ResourcePool CatFileHandle)
|
||||||
|
|
||||||
|
catFileHandlesNonConcurrent :: CatFileHandles
|
||||||
|
catFileHandlesNonConcurrent = CatFileHandlesNonConcurrent M.empty
|
||||||
|
|
||||||
|
catFileHandlesPool :: IO CatFileHandles
|
||||||
|
catFileHandlesPool = CatFileHandlesPool <$> newTMVarIO M.empty
|
|
@ -11,6 +11,7 @@ import Utility.PartialPrelude
|
||||||
-- the former specifies 1 job of each particular kind, but there can be
|
-- the former specifies 1 job of each particular kind, but there can be
|
||||||
-- more than one kind of job running concurrently.
|
-- more than one kind of job running concurrently.
|
||||||
data Concurrency = NonConcurrent | Concurrent Int | ConcurrentPerCpu
|
data Concurrency = NonConcurrent | Concurrent Int | ConcurrentPerCpu
|
||||||
|
deriving (Eq)
|
||||||
|
|
||||||
parseConcurrency :: String -> Maybe Concurrency
|
parseConcurrency :: String -> Maybe Concurrency
|
||||||
parseConcurrency "cpus" = Just ConcurrentPerCpu
|
parseConcurrency "cpus" = Just ConcurrentPerCpu
|
||||||
|
|
|
@ -9,7 +9,8 @@
|
||||||
{-# LANGUAGE CPP #-}
|
{-# LANGUAGE CPP #-}
|
||||||
|
|
||||||
module Utility.CoProcess (
|
module Utility.CoProcess (
|
||||||
CoProcessHandle,
|
CoProcessHandle(..),
|
||||||
|
CoProcessState(..),
|
||||||
start,
|
start,
|
||||||
stop,
|
stop,
|
||||||
query,
|
query,
|
||||||
|
|
94
Utility/ResourcePool.hs
Normal file
94
Utility/ResourcePool.hs
Normal file
|
@ -0,0 +1,94 @@
|
||||||
|
{- Resource pools.
|
||||||
|
-
|
||||||
|
- Copyright 2020 Joey Hess <id@joeyh.name>
|
||||||
|
-
|
||||||
|
- License: BSD-2-clause
|
||||||
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
|
module Utility.ResourcePool (
|
||||||
|
ResourcePool,
|
||||||
|
mkResourcePool,
|
||||||
|
mkResourcePoolNonConcurrent,
|
||||||
|
withResourcePool,
|
||||||
|
freeResourcePool,
|
||||||
|
) where
|
||||||
|
|
||||||
|
import Common
|
||||||
|
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
import Control.Monad.IO.Class
|
||||||
|
import Data.Either
|
||||||
|
|
||||||
|
data ResourcePool r
|
||||||
|
= ResourcePool Int (TVar Int) (TVar [r])
|
||||||
|
| ResourcePoolNonConcurrent r
|
||||||
|
|
||||||
|
{- Make a new resource pool, that can grow to contain the specified number
|
||||||
|
- of resources. -}
|
||||||
|
mkResourcePool :: MonadIO m => Int -> m (ResourcePool r)
|
||||||
|
mkResourcePool maxsz = liftIO $
|
||||||
|
ResourcePool maxsz
|
||||||
|
<$> newTVarIO 0
|
||||||
|
<*> newTVarIO []
|
||||||
|
|
||||||
|
{- When there will not be multiple threads that may
|
||||||
|
- may concurrently try to use it, using this is more
|
||||||
|
- efficient than mkResourcePool.
|
||||||
|
-}
|
||||||
|
mkResourcePoolNonConcurrent :: (MonadMask m, MonadIO m) => m r -> m (ResourcePool r)
|
||||||
|
mkResourcePoolNonConcurrent allocresource =
|
||||||
|
ResourcePoolNonConcurrent <$> allocresource
|
||||||
|
|
||||||
|
{- Runs an action with a resource.
|
||||||
|
-
|
||||||
|
- If no free resource is available in the pool,
|
||||||
|
- will run the action the allocate a new resource if the pool's size
|
||||||
|
- allows. Or will block a resource becomes available to use.
|
||||||
|
-
|
||||||
|
- The resource is returned to the pool at the end.
|
||||||
|
-}
|
||||||
|
withResourcePool :: (MonadMask m, MonadIO m) => ResourcePool r -> m r -> (r -> m a) -> m a
|
||||||
|
withResourcePool (ResourcePoolNonConcurrent r) _ a = a r
|
||||||
|
withResourcePool (ResourcePool maxsz currsz p) allocresource a =
|
||||||
|
bracket setup cleanup a
|
||||||
|
where
|
||||||
|
setup = do
|
||||||
|
mr <- liftIO $ atomically $ do
|
||||||
|
l <- readTVar p
|
||||||
|
case l of
|
||||||
|
(r:rs) -> do
|
||||||
|
writeTVar p rs
|
||||||
|
return (Just r)
|
||||||
|
[] -> do
|
||||||
|
n <- readTVar currsz
|
||||||
|
if n < maxsz
|
||||||
|
then do
|
||||||
|
let !n' = succ n
|
||||||
|
writeTVar currsz n'
|
||||||
|
return Nothing
|
||||||
|
else retry
|
||||||
|
case mr of
|
||||||
|
Just r -> return r
|
||||||
|
Nothing -> allocresource
|
||||||
|
cleanup r = liftIO $ atomically $ modifyTVar' p (r:)
|
||||||
|
|
||||||
|
{- Frees all resources in use in the pool, running the supplied action on
|
||||||
|
- each. (If any of the actions throw an exception, it will be rethrown
|
||||||
|
- after all the actions have completed.)
|
||||||
|
-
|
||||||
|
- The pool should not have any resources in use when this is called,
|
||||||
|
- and the pool should not be used again after calling this.
|
||||||
|
-}
|
||||||
|
freeResourcePool :: (MonadMask m, MonadIO m) => ResourcePool r -> (r -> m ()) -> m ()
|
||||||
|
freeResourcePool (ResourcePoolNonConcurrent r) freeresource = freeresource r
|
||||||
|
freeResourcePool (ResourcePool _ currsz p) freeresource = do
|
||||||
|
rs <- liftIO $ atomically $ do
|
||||||
|
writeTVar currsz 0
|
||||||
|
swapTVar p []
|
||||||
|
res <- forM rs $ tryNonAsync . freeresource
|
||||||
|
case lefts res of
|
||||||
|
[] -> return ()
|
||||||
|
(e:_) -> throwM e
|
||||||
|
|
|
@ -50,5 +50,3 @@ I will try to get a chance to troubleshoot it more to provide possibly more deta
|
||||||
|
|
||||||
[[!meta author=yoh]]
|
[[!meta author=yoh]]
|
||||||
[[!tag projects/datalad]]
|
[[!tag projects/datalad]]
|
||||||
|
|
||||||
[[!tag moreinfo]]
|
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
[[!comment format=mdwn
|
||||||
|
username="joey"
|
||||||
|
subject="""comment 10"""
|
||||||
|
date="2020-04-20T17:24:53Z"
|
||||||
|
content="""
|
||||||
|
Implemented the cat-file pool. Capped at 2 cat-files of each distinct type,
|
||||||
|
so it will start a max of 8 no matter the -J level.
|
||||||
|
|
||||||
|
(Although cat-file can also be run in those repositories so there will be
|
||||||
|
more then.)
|
||||||
|
|
||||||
|
While testing, I noticed git-anenx drop -Jn starts n git check-attr
|
||||||
|
processes, so the same thing ought to be done with them. Leaving this bug open
|
||||||
|
for that, but I do think that the problem you reported should be fixed now.
|
||||||
|
"""]]
|
|
@ -975,6 +975,7 @@ Executable git-annex
|
||||||
Types.Backend
|
Types.Backend
|
||||||
Types.Benchmark
|
Types.Benchmark
|
||||||
Types.BranchState
|
Types.BranchState
|
||||||
|
Types.CatFileHandles
|
||||||
Types.CleanupActions
|
Types.CleanupActions
|
||||||
Types.Command
|
Types.Command
|
||||||
Types.Concurrency
|
Types.Concurrency
|
||||||
|
@ -1089,6 +1090,7 @@ Executable git-annex
|
||||||
Utility.Process.Transcript
|
Utility.Process.Transcript
|
||||||
Utility.QuickCheck
|
Utility.QuickCheck
|
||||||
Utility.RawFilePath
|
Utility.RawFilePath
|
||||||
|
Utility.ResourcePool
|
||||||
Utility.Rsync
|
Utility.Rsync
|
||||||
Utility.SafeCommand
|
Utility.SafeCommand
|
||||||
Utility.Scheduled
|
Utility.Scheduled
|
||||||
|
|
Loading…
Add table
Reference in a new issue