ImportableContentsChunkable
This improves the borg special remote memory usage, by letting it only load one archive's worth of filenames into memory at a time, and building up a larger tree out of the chunks. When a borg repository has many archives, git-annex could easily OOM before. Now, it will use only memory proportional to the number of annexed keys in an archive. Minor implementation wart: Each new chunk re-opens the content identifier database, and also a new vector clock is used for each chunk. This is a minor innefficiency only; the use of continuations makes it hard to avoid, although putting the database handle into a Reader monad would be one way to fix it. It may later be possible to extend the ImportableContentsChunkable interface to remotes that are not third-party populated. However, that would perhaps need an interface that does not use continuations. The ImportableContentsChunkable interface currently does not allow populating the top of the tree with anything other than subtrees. It would be easy to extend it to allow putting files in that tree, but borg doesn't need that so I left it out for now. Sponsored-by: Noam Kremen on Patreon
This commit is contained in:
parent
153f3600fb
commit
69f8e6c7c0
13 changed files with 286 additions and 92 deletions
224
Annex/Import.hs
224
Annex/Import.hs
|
@ -1,6 +1,6 @@
|
|||
{- git-annex import from remotes
|
||||
-
|
||||
- Copyright 2019-2020 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2019-2021 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
@ -98,7 +98,7 @@ buildImportCommit
|
|||
:: Remote
|
||||
-> ImportTreeConfig
|
||||
-> ImportCommitConfig
|
||||
-> ImportableContents (Either Sha Key)
|
||||
-> ImportableContentsChunkable Annex (Either Sha Key)
|
||||
-> Annex (Maybe Ref)
|
||||
buildImportCommit remote importtreeconfig importcommitconfig importable =
|
||||
case importCommitTracking importcommitconfig of
|
||||
|
@ -123,7 +123,7 @@ buildImportCommit remote importtreeconfig importcommitconfig importable =
|
|||
recordImportTree
|
||||
:: Remote
|
||||
-> ImportTreeConfig
|
||||
-> ImportableContents (Either Sha Key)
|
||||
-> ImportableContentsChunkable Annex (Either Sha Key)
|
||||
-> Annex (History Sha, Annex ())
|
||||
recordImportTree remote importtreeconfig importable = do
|
||||
imported@(History finaltree _) <- buildImportTrees basetree subdir importable
|
||||
|
@ -264,25 +264,75 @@ buildImportCommit' remote importcommitconfig mtrackingcommit imported@(History t
|
|||
buildImportTrees
|
||||
:: Ref
|
||||
-> Maybe TopFilePath
|
||||
-> ImportableContents (Either Sha Key)
|
||||
-> ImportableContentsChunkable Annex (Either Sha Key)
|
||||
-> Annex (History Sha)
|
||||
buildImportTrees basetree msubdir importable = History
|
||||
<$> (buildtree (importableContents importable) =<< Annex.gitRepo)
|
||||
<*> buildhistory
|
||||
buildImportTrees basetree msubdir (ImportableContentsComplete importable) = do
|
||||
repo <- Annex.gitRepo
|
||||
withMkTreeHandle repo $ buildImportTrees' basetree msubdir importable
|
||||
buildImportTrees basetree msubdir importable@(ImportableContentsChunked {}) = do
|
||||
repo <- Annex.gitRepo
|
||||
withMkTreeHandle repo $ \hdl ->
|
||||
History
|
||||
<$> go hdl
|
||||
<*> buildImportTreesHistory basetree msubdir
|
||||
(importableHistoryComplete importable) hdl
|
||||
where
|
||||
go hdl = do
|
||||
tree <- gochunks [] (importableContentsChunk importable) hdl
|
||||
importtree <- liftIO $ recordTree' hdl tree
|
||||
graftImportTree basetree msubdir importtree hdl
|
||||
|
||||
gochunks l c hdl = do
|
||||
let subdir = importChunkSubDir $ importableContentsSubDir c
|
||||
-- Full directory prefix where the sub tree is located.
|
||||
let fullprefix = asTopFilePath $ case msubdir of
|
||||
Nothing -> subdir
|
||||
Just d -> getTopFilePath d Posix.</> subdir
|
||||
Tree ts <- convertImportTree (Just fullprefix) $
|
||||
map (\(p, i) -> (mkImportLocation p, i))
|
||||
(importableContentsSubTree c)
|
||||
-- Record this subtree before getting next chunk, this
|
||||
-- avoids buffering all the chunks into memory.
|
||||
tc <- liftIO $ recordSubTree hdl $
|
||||
NewSubTree (asTopFilePath subdir) ts
|
||||
importableContentsNextChunk c >>= \case
|
||||
Nothing -> return (Tree (tc:l))
|
||||
Just c' -> gochunks (tc:l) c' hdl
|
||||
|
||||
buildImportTrees'
|
||||
:: Ref
|
||||
-> Maybe TopFilePath
|
||||
-> ImportableContents (Either Sha Key)
|
||||
-> MkTreeHandle
|
||||
-> Annex (History Sha)
|
||||
buildImportTrees' basetree msubdir importable hdl = History
|
||||
<$> buildImportTree basetree msubdir (importableContents importable) hdl
|
||||
<*> buildImportTreesHistory basetree msubdir (importableHistory importable) hdl
|
||||
|
||||
buildImportTree
|
||||
:: Ref
|
||||
-> Maybe TopFilePath
|
||||
-> [(ImportLocation, Either Sha Key)]
|
||||
-> MkTreeHandle
|
||||
-> Annex Sha
|
||||
buildImportTree basetree msubdir ls hdl = do
|
||||
importtree <- liftIO . recordTree' hdl =<< convertImportTree msubdir ls
|
||||
graftImportTree basetree msubdir importtree hdl
|
||||
|
||||
graftImportTree
|
||||
:: Ref
|
||||
-> Maybe TopFilePath
|
||||
-> Sha
|
||||
-> MkTreeHandle
|
||||
-> Annex Sha
|
||||
graftImportTree basetree msubdir tree hdl = case msubdir of
|
||||
Nothing -> return tree
|
||||
Just subdir -> inRepo $ \repo ->
|
||||
graftTree' tree subdir basetree repo hdl
|
||||
|
||||
convertImportTree :: Maybe TopFilePath -> [(ImportLocation, Either Sha Key)] -> Annex Tree
|
||||
convertImportTree msubdir ls = treeItemsToTree <$> mapM mktreeitem ls
|
||||
where
|
||||
buildhistory = S.fromList
|
||||
<$> mapM (buildImportTrees basetree msubdir)
|
||||
(importableHistory importable)
|
||||
|
||||
buildtree ls repo = withMkTreeHandle repo $ \hdl -> do
|
||||
importtree <- liftIO . recordTree' hdl
|
||||
. treeItemsToTree
|
||||
=<< mapM mktreeitem ls
|
||||
case msubdir of
|
||||
Nothing -> return importtree
|
||||
Just subdir -> liftIO $
|
||||
graftTree' importtree subdir basetree repo hdl
|
||||
|
||||
mktreeitem (loc, v) = case v of
|
||||
Right k -> do
|
||||
relf <- fromRepo $ fromTopFilePath topf
|
||||
|
@ -297,6 +347,15 @@ buildImportTrees basetree msubdir importable = History
|
|||
topf = asTopFilePath $
|
||||
maybe lf (\sd -> getTopFilePath sd P.</> lf) msubdir
|
||||
|
||||
buildImportTreesHistory
|
||||
:: Ref
|
||||
-> Maybe TopFilePath
|
||||
-> [ImportableContents (Either Sha Key)]
|
||||
-> MkTreeHandle
|
||||
-> Annex (S.Set (History Sha))
|
||||
buildImportTreesHistory basetree msubdir history hdl = S.fromList
|
||||
<$> mapM (\ic -> buildImportTrees' basetree msubdir ic hdl) history
|
||||
|
||||
canImportKeys :: Remote -> Bool -> Bool
|
||||
canImportKeys remote importcontent =
|
||||
importcontent || isJust (Remote.importKey ia)
|
||||
|
@ -324,8 +383,8 @@ importKeys
|
|||
-> ImportTreeConfig
|
||||
-> Bool
|
||||
-> Bool
|
||||
-> ImportableContents (ContentIdentifier, ByteSize)
|
||||
-> Annex (Maybe (ImportableContents (Either Sha Key)))
|
||||
-> ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)
|
||||
-> Annex (Maybe (ImportableContentsChunkable Annex (Either Sha Key)))
|
||||
importKeys remote importtreeconfig importcontent thirdpartypopulated importablecontents = do
|
||||
unless (canImportKeys remote importcontent) $
|
||||
giveup "This remote does not support importing without downloading content."
|
||||
|
@ -339,40 +398,82 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
|||
-- When concurrency is enabled, this set is needed to
|
||||
-- avoid two threads both importing the same content identifier.
|
||||
importing <- liftIO $ newTVarIO S.empty
|
||||
withExclusiveLock gitAnnexContentIdentifierLock $
|
||||
bracket CIDDb.openDb CIDDb.closeDb $ \db -> do
|
||||
CIDDb.needsUpdateFromLog db
|
||||
>>= maybe noop (CIDDb.updateFromLog db)
|
||||
(run (go False cidmap importing importablecontents db))
|
||||
withciddb $ \db -> do
|
||||
CIDDb.needsUpdateFromLog db
|
||||
>>= maybe noop (CIDDb.updateFromLog db)
|
||||
(prepclock (run cidmap importing db))
|
||||
where
|
||||
-- When not importing content, reuse the same vector
|
||||
-- clock for all state that's recorded. This can save
|
||||
-- a little bit of disk space. Individual file downloads
|
||||
-- while downloading take too long for this optimisation
|
||||
-- to be safe to do.
|
||||
run a
|
||||
prepclock a
|
||||
| importcontent = a
|
||||
| otherwise = reuseVectorClockWhile a
|
||||
|
||||
go oldversion cidmap importing (ImportableContents l h) db = do
|
||||
withciddb = withExclusiveLock gitAnnexContentIdentifierLock .
|
||||
bracket CIDDb.openDb CIDDb.closeDb
|
||||
|
||||
run cidmap importing db = do
|
||||
largematcher <- largeFilesMatcher
|
||||
case importablecontents of
|
||||
ImportableContentsComplete ic ->
|
||||
go False largematcher cidmap importing db ic >>= return . \case
|
||||
Nothing -> Nothing
|
||||
Just v -> Just $ ImportableContentsComplete v
|
||||
ImportableContentsChunked {} -> do
|
||||
c <- gochunked db (importableContentsChunk importablecontents)
|
||||
gohistory largematcher cidmap importing db (importableHistoryComplete importablecontents) >>= return . \case
|
||||
Nothing -> Nothing
|
||||
Just h -> Just $ ImportableContentsChunked
|
||||
{ importableContentsChunk = c
|
||||
, importableHistoryComplete = h
|
||||
}
|
||||
|
||||
go oldversion largematcher cidmap importing db (ImportableContents l h) = do
|
||||
jobs <- forM l $ \i ->
|
||||
if thirdpartypopulated
|
||||
then thirdpartypopulatedimport cidmap db i
|
||||
then Left <$> thirdpartypopulatedimport db i
|
||||
else startimport cidmap importing db i oldversion largematcher
|
||||
l' <- liftIO $ forM jobs $
|
||||
either pure (atomically . takeTMVar)
|
||||
if any isNothing l'
|
||||
then return Nothing
|
||||
else do
|
||||
h' <- mapM (\ic -> go True cidmap importing ic db) h
|
||||
if any isNothing h'
|
||||
then return Nothing
|
||||
else return $ Just $
|
||||
ImportableContents
|
||||
(catMaybes l')
|
||||
(catMaybes h')
|
||||
else gohistory largematcher cidmap importing db h >>= return . \case
|
||||
Nothing -> Nothing
|
||||
Just h' -> Just $ ImportableContents (catMaybes l') h'
|
||||
|
||||
gohistory largematcher cidmap importing db h = do
|
||||
h' <- mapM (go True largematcher cidmap importing db) h
|
||||
if any isNothing h'
|
||||
then return Nothing
|
||||
else return $ Just $ catMaybes h'
|
||||
|
||||
gochunked db c
|
||||
-- Downloading cannot be done when chunked, since only
|
||||
-- the first chunk is processed before returning.
|
||||
| importcontent = error "importKeys does not support downloading chunked import"
|
||||
-- Chunked import is currently only used by thirdpartypopulated
|
||||
-- remotes.
|
||||
| not thirdpartypopulated = error "importKeys does not support chunked import when not thirdpartypopulated"
|
||||
| otherwise = do
|
||||
l <- forM (importableContentsSubTree c) $ \(loc, i) -> do
|
||||
let loc' = importableContentsChunkFullLocation (importableContentsSubDir c) loc
|
||||
thirdpartypopulatedimport db (loc', i) >>= return . \case
|
||||
Just (_loc, k) -> Just (loc, k)
|
||||
Nothing -> Nothing
|
||||
return $ ImportableContentsChunk
|
||||
{ importableContentsSubDir = importableContentsSubDir c
|
||||
, importableContentsSubTree = catMaybes l
|
||||
, importableContentsNextChunk =
|
||||
importableContentsNextChunk c >>= \case
|
||||
Nothing -> return Nothing
|
||||
Just c' -> withciddb $ \db' ->
|
||||
prepclock $
|
||||
Just <$> gochunked db' c'
|
||||
}
|
||||
|
||||
waitstart importing cid = liftIO $ atomically $ do
|
||||
s <- readTVar importing
|
||||
if S.member cid s
|
||||
|
@ -418,19 +519,19 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
|||
importaction
|
||||
return (Right job)
|
||||
|
||||
thirdpartypopulatedimport cidmap db (loc, (cid, sz)) =
|
||||
thirdpartypopulatedimport db (loc, (cid, sz)) =
|
||||
case Remote.importKey ia of
|
||||
Nothing -> return $ Left Nothing
|
||||
Nothing -> return Nothing
|
||||
Just importkey ->
|
||||
tryNonAsync (importkey loc cid sz nullMeterUpdate) >>= \case
|
||||
Right (Just k) -> do
|
||||
recordcidkey cidmap db cid k
|
||||
logChange k (Remote.uuid remote) InfoPresent
|
||||
return $ Left $ Just (loc, Right k)
|
||||
Right Nothing -> return $ Left Nothing
|
||||
recordcidkey' db cid k
|
||||
logChange k (Remote.uuid remote) InfoPresent
|
||||
return $ Just (loc, Right k)
|
||||
Right Nothing -> return Nothing
|
||||
Left e -> do
|
||||
warning (show e)
|
||||
return $ Left Nothing
|
||||
return Nothing
|
||||
|
||||
importordownload cidmap db (loc, (cid, sz)) largematcher= do
|
||||
f <- locworktreefile loc
|
||||
|
@ -603,6 +704,8 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
|||
recordcidkey cidmap db cid k = do
|
||||
liftIO $ atomically $ modifyTVar' cidmap $
|
||||
M.insert cid k
|
||||
recordcidkey' db cid k
|
||||
recordcidkey' db cid k = do
|
||||
liftIO $ CIDDb.recordContentIdentifier db rs cid k
|
||||
CIDLog.recordContentIdentifier rs cid k
|
||||
|
||||
|
@ -675,18 +778,38 @@ makeImportMatcher r = load preferredContentKeylessTokens >>= \case
|
|||
- Throws exception if unable to contact the remote.
|
||||
- Returns Nothing when there is no change since last time.
|
||||
-}
|
||||
getImportableContents :: Remote -> ImportTreeConfig -> CheckGitIgnore -> FileMatcher Annex -> Annex (Maybe (ImportableContents (ContentIdentifier, ByteSize)))
|
||||
getImportableContents :: Remote -> ImportTreeConfig -> CheckGitIgnore -> FileMatcher Annex -> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
|
||||
getImportableContents r importtreeconfig ci matcher = do
|
||||
Remote.listImportableContents (Remote.importActions r) >>= \case
|
||||
Just importable -> do
|
||||
dbhandle <- Export.openDb (Remote.uuid r)
|
||||
Just <$> filterunwanted dbhandle importable
|
||||
Just (ImportableContentsComplete ic) -> do
|
||||
dbhandle <- opendbhandle
|
||||
Just . ImportableContentsComplete
|
||||
<$> filterunwanted dbhandle ic
|
||||
Just (c@(ImportableContentsChunked {})) -> do
|
||||
dbhandle <- opendbhandle
|
||||
Just <$> filterunwantedchunked dbhandle c
|
||||
Nothing -> return Nothing
|
||||
where
|
||||
filterunwanted dbhandle ic = ImportableContents
|
||||
<$> filterM (wanted dbhandle) (importableContents ic)
|
||||
<*> mapM (filterunwanted dbhandle) (importableHistory ic)
|
||||
|
||||
filterunwantedchunked dbhandle c = ImportableContentsChunked
|
||||
<$> filterunwantedchunk dbhandle (importableContentsChunk c)
|
||||
<*> mapM (filterunwanted dbhandle) (importableHistoryComplete c)
|
||||
|
||||
filterunwantedchunk dbhandle c = ImportableContentsChunk
|
||||
<$> pure (importableContentsSubDir c)
|
||||
<*> filterM (wantedunder dbhandle (importableContentsSubDir c))
|
||||
(importableContentsSubTree c)
|
||||
<*> pure (
|
||||
importableContentsNextChunk c >>= \case
|
||||
Nothing -> return Nothing
|
||||
Just c' -> Just <$> filterunwantedchunk dbhandle c'
|
||||
)
|
||||
|
||||
opendbhandle = Export.openDb (Remote.uuid r)
|
||||
|
||||
wanted dbhandle (loc, (_cid, sz))
|
||||
| ingitdir = pure False
|
||||
| otherwise =
|
||||
|
@ -697,6 +820,9 @@ getImportableContents r importtreeconfig ci matcher = do
|
|||
matches = matchesImportLocation matcher loc sz
|
||||
isknown = isKnownImportLocation dbhandle loc
|
||||
notignored = notIgnoredImportLocation importtreeconfig ci loc
|
||||
|
||||
wantedunder dbhandle root (loc, v) =
|
||||
wanted dbhandle (importableContentsChunkFullLocation root loc, v)
|
||||
|
||||
isKnownImportLocation :: Export.ExportHandle -> ImportLocation -> Annex Bool
|
||||
isKnownImportLocation dbhandle loc = liftIO $
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue