update RepoSize database from git-annex branch incrementally
The use of catObjectStream is optimally fast. Although it might be possible to combine this with git-annex branch merge to avoid some redundant work. Benchmarking, a git-annex branch that had 100000 files changed took less than 1.88 seconds to run through this.
This commit is contained in:
parent
8239824d92
commit
d09a005f2b
9 changed files with 115 additions and 33 deletions
|
@ -17,12 +17,18 @@ import qualified Annex
|
||||||
import Annex.Branch (UnmergedBranches(..), getBranch)
|
import Annex.Branch (UnmergedBranches(..), getBranch)
|
||||||
import Types.RepoSize
|
import Types.RepoSize
|
||||||
import qualified Database.RepoSize as Db
|
import qualified Database.RepoSize as Db
|
||||||
|
import Logs
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import Logs.UUID
|
import Logs.UUID
|
||||||
import Git.Types (Sha)
|
import Git.Types (Sha)
|
||||||
|
import Git.FilePath
|
||||||
|
import Git.CatFile
|
||||||
|
import qualified Git.DiffTree as DiffTree
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import Control.Concurrent.Async
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
|
import qualified Data.Set as S
|
||||||
|
|
||||||
{- Gets the repo size map. Cached for speed. -}
|
{- Gets the repo size map. Cached for speed. -}
|
||||||
getRepoSizes :: Annex (M.Map UUID RepoSize)
|
getRepoSizes :: Annex (M.Map UUID RepoSize)
|
||||||
|
@ -49,10 +55,7 @@ calcRepoSizes rsv = bracket setup cleanup $ \h -> go h `onException` failed
|
||||||
currbranchsha <- getBranch
|
currbranchsha <- getBranch
|
||||||
if oldbranchsha == currbranchsha
|
if oldbranchsha == currbranchsha
|
||||||
then calcJournalledRepoSizes oldsizemap oldbranchsha
|
then calcJournalledRepoSizes oldsizemap oldbranchsha
|
||||||
else do
|
else incrementalupdate h oldsizemap oldbranchsha currbranchsha
|
||||||
-- XXX todo incremental update by diffing
|
|
||||||
-- from old to new branch.
|
|
||||||
calculatefromscratch h
|
|
||||||
liftIO $ putMVar rsv (Just sizemap)
|
liftIO $ putMVar rsv (Just sizemap)
|
||||||
return sizemap
|
return sizemap
|
||||||
|
|
||||||
|
@ -62,6 +65,11 @@ calcRepoSizes rsv = bracket setup cleanup $ \h -> go h `onException` failed
|
||||||
liftIO $ Db.setRepoSizes h sizemap branchsha
|
liftIO $ Db.setRepoSizes h sizemap branchsha
|
||||||
calcJournalledRepoSizes sizemap branchsha
|
calcJournalledRepoSizes sizemap branchsha
|
||||||
|
|
||||||
|
incrementalupdate h oldsizemap oldbranchsha currbranchsha = do
|
||||||
|
(sizemap, branchsha) <- diffBranchRepoSizes oldsizemap oldbranchsha currbranchsha
|
||||||
|
liftIO $ Db.setRepoSizes h sizemap branchsha
|
||||||
|
calcJournalledRepoSizes sizemap branchsha
|
||||||
|
|
||||||
setup = Db.openDb
|
setup = Db.openDb
|
||||||
|
|
||||||
cleanup = Db.closeDb
|
cleanup = Db.closeDb
|
||||||
|
@ -75,7 +83,7 @@ calcRepoSizes rsv = bracket setup cleanup $ \h -> go h `onException` failed
|
||||||
- branch commit that was used.
|
- branch commit that was used.
|
||||||
-
|
-
|
||||||
- The map includes the UUIDs of all known repositories, including
|
- The map includes the UUIDs of all known repositories, including
|
||||||
- repositories that are empty.
|
- repositories that are empty. But clusters are not included.
|
||||||
-
|
-
|
||||||
- Note that private repositories, which do not get recorded in
|
- Note that private repositories, which do not get recorded in
|
||||||
- the git-annex branch, will have 0 size. journalledRepoSizes
|
- the git-annex branch, will have 0 size. journalledRepoSizes
|
||||||
|
@ -100,8 +108,48 @@ calcJournalledRepoSizes
|
||||||
-> Sha
|
-> Sha
|
||||||
-> Annex (M.Map UUID RepoSize)
|
-> Annex (M.Map UUID RepoSize)
|
||||||
calcJournalledRepoSizes startmap branchsha =
|
calcJournalledRepoSizes startmap branchsha =
|
||||||
overLocationLogsJournal startmap branchsha accumsizes Nothing
|
overLocationLogsJournal startmap branchsha
|
||||||
|
(\k v m -> pure (accumRepoSizes k v m))
|
||||||
|
Nothing
|
||||||
|
|
||||||
|
{- Incremental update by diffing. -}
|
||||||
|
diffBranchRepoSizes :: M.Map UUID RepoSize -> Sha -> Sha -> Annex (M.Map UUID RepoSize, Sha)
|
||||||
|
diffBranchRepoSizes oldsizemap oldbranchsha newbranchsha = do
|
||||||
|
g <- Annex.gitRepo
|
||||||
|
catObjectStream g $ \feeder closer reader -> do
|
||||||
|
(l, cleanup) <- inRepo $
|
||||||
|
DiffTree.diffTreeRecursive oldbranchsha newbranchsha
|
||||||
|
feedtid <- liftIO $ async $ do
|
||||||
|
forM_ l $ feedpairs feeder
|
||||||
|
closer
|
||||||
|
newsizemap <- readpairs 500000 reader oldsizemap Nothing
|
||||||
|
liftIO $ wait feedtid
|
||||||
|
ifM (liftIO cleanup)
|
||||||
|
( return (newsizemap, newbranchsha)
|
||||||
|
, return (oldsizemap, oldbranchsha)
|
||||||
|
)
|
||||||
where
|
where
|
||||||
accumsizes k (newlocs, removedlocs) m = return $
|
feedpairs feeder ti =
|
||||||
let m' = foldl' (flip $ M.alter $ addKeyRepoSize k) m newlocs
|
let f = getTopFilePath (DiffTree.file ti)
|
||||||
in foldl' (flip $ M.alter $ removeKeyRepoSize k) m' removedlocs
|
in case extLogFileKey locationLogExt f of
|
||||||
|
Nothing -> noop
|
||||||
|
Just k -> do
|
||||||
|
feeder (k, DiffTree.srcsha ti)
|
||||||
|
feeder (k, DiffTree.dstsha ti)
|
||||||
|
|
||||||
|
readpairs n reader sizemap Nothing = liftIO reader >>= \case
|
||||||
|
Just (_k, oldcontent) -> readpairs n reader sizemap (Just oldcontent)
|
||||||
|
Nothing -> return sizemap
|
||||||
|
readpairs n reader sizemap (Just oldcontent) = liftIO reader >>= \case
|
||||||
|
Just (k, newcontent) ->
|
||||||
|
let prevlog = parselog oldcontent
|
||||||
|
currlog = parselog newcontent
|
||||||
|
newlocs = S.difference currlog prevlog
|
||||||
|
removedlocs = S.difference prevlog currlog
|
||||||
|
!sizemap' = accumRepoSizes k (newlocs, removedlocs) sizemap
|
||||||
|
in do
|
||||||
|
n' <- countdownToMessage n $
|
||||||
|
showSideAction "calculating repository sizes"
|
||||||
|
readpairs n' reader sizemap' Nothing
|
||||||
|
Nothing -> return sizemap
|
||||||
|
parselog = maybe mempty (S.fromList . parseLoggedLocationsWithoutClusters)
|
||||||
|
|
|
@ -16,6 +16,7 @@ import Logs.Presence.Pure
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
|
import qualified Data.Set as S
|
||||||
|
|
||||||
updateRepoSize :: UUID -> Key -> LogStatus -> Annex ()
|
updateRepoSize :: UUID -> Key -> LogStatus -> Annex ()
|
||||||
updateRepoSize u k s = do
|
updateRepoSize u k s = do
|
||||||
|
@ -46,3 +47,8 @@ removeKeyRepoSize k mrs = case mrs of
|
||||||
Nothing -> Nothing
|
Nothing -> Nothing
|
||||||
where
|
where
|
||||||
ksz = fromMaybe 0 $ fromKey keySize k
|
ksz = fromMaybe 0 $ fromKey keySize k
|
||||||
|
|
||||||
|
accumRepoSizes :: Key -> (S.Set UUID, S.Set UUID) -> M.Map UUID RepoSize -> M.Map UUID RepoSize
|
||||||
|
accumRepoSizes k (newlocs, removedlocs) sizemap =
|
||||||
|
let !sizemap' = foldl' (flip $ M.alter $ addKeyRepoSize k) sizemap newlocs
|
||||||
|
in foldl' (flip $ M.alter $ removeKeyRepoSize k) sizemap' removedlocs
|
||||||
|
|
|
@ -476,19 +476,11 @@ reconcileStaged dbisnew qh = ifM isBareRepo
|
||||||
dbwriter dbchanged n catreader = liftIO catreader >>= \case
|
dbwriter dbchanged n catreader = liftIO catreader >>= \case
|
||||||
Just (ka, content) -> do
|
Just (ka, content) -> do
|
||||||
changed <- ka (parseLinkTargetOrPointerLazy =<< content)
|
changed <- ka (parseLinkTargetOrPointerLazy =<< content)
|
||||||
!n' <- countdownToMessage n
|
n' <- countdownToMessage n $
|
||||||
|
showSideAction "scanning for annexed files"
|
||||||
dbwriter (dbchanged || changed) n' catreader
|
dbwriter (dbchanged || changed) n' catreader
|
||||||
Nothing -> return dbchanged
|
Nothing -> return dbchanged
|
||||||
|
|
||||||
-- When the diff is large, the scan can take a while,
|
|
||||||
-- so let the user know what's going on.
|
|
||||||
countdownToMessage n
|
|
||||||
| n < 1 = return 0
|
|
||||||
| n == 1 = do
|
|
||||||
showSideAction "scanning for annexed files"
|
|
||||||
return 0
|
|
||||||
| otherwise = return (pred n)
|
|
||||||
|
|
||||||
-- How large is large? Too large and there will be a long
|
-- How large is large? Too large and there will be a long
|
||||||
-- delay before the message is shown; too short and the message
|
-- delay before the message is shown; too short and the message
|
||||||
-- will clutter things up unnecessarily. It's uncommon for 1000
|
-- will clutter things up unnecessarily. It's uncommon for 1000
|
||||||
|
|
5
Logs.hs
5
Logs.hs
|
@ -179,7 +179,10 @@ migrationTreeGraftPoint = "migrate.tree"
|
||||||
{- The pathname of the location log file for a given key. -}
|
{- The pathname of the location log file for a given key. -}
|
||||||
locationLogFile :: GitConfig -> Key -> RawFilePath
|
locationLogFile :: GitConfig -> Key -> RawFilePath
|
||||||
locationLogFile config key =
|
locationLogFile config key =
|
||||||
branchHashDir config key P.</> keyFile key <> ".log"
|
branchHashDir config key P.</> keyFile key <> locationLogExt
|
||||||
|
|
||||||
|
locationLogExt :: S.ByteString
|
||||||
|
locationLogExt = ".log"
|
||||||
|
|
||||||
{- The filename of the url log for a given key. -}
|
{- The filename of the url log for a given key. -}
|
||||||
urlLogFile :: GitConfig -> Key -> RawFilePath
|
urlLogFile :: GitConfig -> Key -> RawFilePath
|
||||||
|
|
|
@ -35,6 +35,8 @@ module Logs.Location (
|
||||||
overLocationLogs,
|
overLocationLogs,
|
||||||
overLocationLogs',
|
overLocationLogs',
|
||||||
overLocationLogsJournal,
|
overLocationLogsJournal,
|
||||||
|
parseLoggedLocations,
|
||||||
|
parseLoggedLocationsWithoutClusters,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
|
@ -110,7 +112,10 @@ loggedLocationsHistorical = getLoggedLocations . historicalLogInfo
|
||||||
loggedLocationsRef :: Ref -> Annex [UUID]
|
loggedLocationsRef :: Ref -> Annex [UUID]
|
||||||
loggedLocationsRef ref = map (toUUID . fromLogInfo) . getLog <$> catObject ref
|
loggedLocationsRef ref = map (toUUID . fromLogInfo) . getLog <$> catObject ref
|
||||||
|
|
||||||
{- Parses the content of a log file and gets the locations in it. -}
|
{- Parses the content of a log file and gets the locations in it.
|
||||||
|
-
|
||||||
|
- Adds the UUIDs of any clusters whose nodes are in the list.
|
||||||
|
-}
|
||||||
parseLoggedLocations :: Clusters -> L.ByteString -> [UUID]
|
parseLoggedLocations :: Clusters -> L.ByteString -> [UUID]
|
||||||
parseLoggedLocations clusters =
|
parseLoggedLocations clusters =
|
||||||
addClusterUUIDs clusters . parseLoggedLocationsWithoutClusters
|
addClusterUUIDs clusters . parseLoggedLocationsWithoutClusters
|
||||||
|
@ -127,7 +132,6 @@ getLoggedLocations getter key = do
|
||||||
clusters <- getClusters
|
clusters <- getClusters
|
||||||
return $ addClusterUUIDs clusters locs
|
return $ addClusterUUIDs clusters locs
|
||||||
|
|
||||||
-- Add UUIDs of any clusters whose nodes are in the list.
|
|
||||||
addClusterUUIDs :: Clusters -> [UUID] -> [UUID]
|
addClusterUUIDs :: Clusters -> [UUID] -> [UUID]
|
||||||
addClusterUUIDs clusters locs
|
addClusterUUIDs clusters locs
|
||||||
| M.null clustermap = locs
|
| M.null clustermap = locs
|
||||||
|
|
17
Messages.hs
17
Messages.hs
|
@ -5,7 +5,7 @@
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, CPP #-}
|
{-# LANGUAGE OverloadedStrings, ScopedTypeVariables, BangPatterns, CPP #-}
|
||||||
|
|
||||||
module Messages (
|
module Messages (
|
||||||
showStartMessage,
|
showStartMessage,
|
||||||
|
@ -54,6 +54,7 @@ module Messages (
|
||||||
prompt,
|
prompt,
|
||||||
mkPrompter,
|
mkPrompter,
|
||||||
sanitizeTopLevelExceptionMessages,
|
sanitizeTopLevelExceptionMessages,
|
||||||
|
countdownToMessage,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
@ -364,3 +365,17 @@ sanitizeTopLevelExceptionMessages a = a `catches`
|
||||||
go e = do
|
go e = do
|
||||||
hPutStrLn stderr $ safeOutput $ toplevelMsg (show e)
|
hPutStrLn stderr $ safeOutput $ toplevelMsg (show e)
|
||||||
exitWith $ ExitFailure 1
|
exitWith $ ExitFailure 1
|
||||||
|
|
||||||
|
{- Used to only run an action that displays a message after the specified
|
||||||
|
- number of steps. This is useful when performing an action that can
|
||||||
|
- sometimes take a long time, but often does not.
|
||||||
|
-}
|
||||||
|
countdownToMessage :: Int -> Annex () -> Annex Int
|
||||||
|
countdownToMessage n showmsg
|
||||||
|
| n < 1 = return 0
|
||||||
|
| n == 1 = do
|
||||||
|
showmsg
|
||||||
|
return 0
|
||||||
|
| otherwise = do
|
||||||
|
let !n' = pred n
|
||||||
|
return n'
|
||||||
|
|
|
@ -30,19 +30,11 @@ Planned schedule of work:
|
||||||
|
|
||||||
## work notes
|
## work notes
|
||||||
|
|
||||||
* Implement [[track_free_space_in_repos_via_git-annex_branch]]:
|
* Concurrency issues with RepoSizes calculation and balanced content:
|
||||||
|
|
||||||
* updateRepoSizes incrementally when the git-annex branch sha in the
|
|
||||||
database is older than the current git-annex branch. Diff from old to
|
|
||||||
new branch to efficiently update.
|
|
||||||
|
|
||||||
Note ideas in above todo about doing this at git-annex branch merge
|
|
||||||
time to reuse the git diff done there.
|
|
||||||
|
|
||||||
* What if 2 concurrent threads are considering sending two different
|
* What if 2 concurrent threads are considering sending two different
|
||||||
keys to a repo at the same time. It can hold either but not both.
|
keys to a repo at the same time. It can hold either but not both.
|
||||||
It should avoid sending both in this situation. (Also discussed in
|
It should avoid sending both in this situation.
|
||||||
above todo)
|
|
||||||
|
|
||||||
* There can also be a race with 2 concurrent threads where one just
|
* There can also be a race with 2 concurrent threads where one just
|
||||||
finished sending to a repo, but has not yet updated the location log.
|
finished sending to a repo, but has not yet updated the location log.
|
||||||
|
@ -101,6 +93,7 @@ Planned schedule of work:
|
||||||
|
|
||||||
* Balanced preferred content basic implementation, including --rebalance
|
* Balanced preferred content basic implementation, including --rebalance
|
||||||
option.
|
option.
|
||||||
|
* Implemented [[track_free_space_in_repos_via_git-annex_branch]]
|
||||||
|
|
||||||
## completed items for August's work on git-annex proxy support for exporttre
|
## completed items for August's work on git-annex proxy support for exporttre
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
When git-annex merges a remote into the git-annex branch, it uses
|
||||||
|
a CatFileHandle, making a query get the contents of each file in the
|
||||||
|
diff. It would be faster for it to use catObjectStream.
|
||||||
|
[[!commit d010ab04be5a8d74fe85a2fa27a853784d1f9009]] saw a 2x-16x
|
||||||
|
improvement to a similar process.
|
||||||
|
|
||||||
|
Also, Database.ContentIdentifier.updateFromLog,
|
||||||
|
Database.ImportFeed.updateFromLog, and Annex.RepoSize.diffBranchRepoSizes
|
||||||
|
each do a similar diff and cat-file to update information cached from the
|
||||||
|
git-annex branch into a database. (diffBranchRepoSizes does use
|
||||||
|
catObjectStream, the others don't.)
|
||||||
|
|
||||||
|
It seems like it might be possible to
|
||||||
|
make merging the git-annex branch do these updates in passing, and reduce
|
||||||
|
the overhead of diff and cat-file 4x. --[[Joey]]
|
|
@ -92,6 +92,9 @@ merge time. Those are less expensive than diffing the location logs only
|
||||||
because the logs they diff are less often used, and the work is only
|
because the logs they diff are less often used, and the work is only
|
||||||
done when relevant commands are run.
|
done when relevant commands are run.
|
||||||
|
|
||||||
|
(Opened [[todo/optimise_git-annex_branch_merge_and_database_updates]]
|
||||||
|
about that possibility.)
|
||||||
|
|
||||||
## concurrency
|
## concurrency
|
||||||
|
|
||||||
Suppose a repository is almost full. Two concurrent threads or processes
|
Suppose a repository is almost full. Two concurrent threads or processes
|
||||||
|
@ -106,3 +109,6 @@ sizeOfDownloadsInProgress. It would be possible to make a
|
||||||
`sizeOfUploadsInProgressToRemote r` similarly.
|
`sizeOfUploadsInProgressToRemote r` similarly.
|
||||||
|
|
||||||
[[!tag projects/openneuro]]
|
[[!tag projects/openneuro]]
|
||||||
|
|
||||||
|
> Current status: This is implemented, but concurrency issues remain.
|
||||||
|
> --[[Joey]]
|
||||||
|
|
Loading…
Add table
Reference in a new issue