listImportableContents filtering to wanted files
This could in theory allow importing subsets of files with less memory use. Rather than building up a big import list and then filtering it down to the smaller list of wanted files, support optionally filtering to wanted files up front.

So far the directory special remote implements it, and will probably use less memory, since dirContentsRecursiveSkipping streams lazily.

The implementation in Remote.S3 is incomplete and fails to compile; it is a bit of a mess, with ResourceT needing to run over Annex. Also, in Remote.S3, filtering is not done for old versions, and mkImportableContentsUnversioned now duplicates work that filterwanted does.
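The ImportWantedChecker interface itself is not part of this diff; judging from the call sites below, it presumably looks something like the following sketch. ImportLocation and ByteSize are stand-in definitions here only to keep the sketch self-contained, and smallFilesOnly is a hypothetical example checker, not git-annex code.

-- Stand-ins for git-annex's own types, only so this sketch is self-contained.
newtype ImportLocation = ImportLocation FilePath
type ByteSize = Integer

-- Result of checking whether a single file is wanted for import.
newtype ImportWantedChecked = ImportWantedChecked Bool

-- Checker threaded into listImportableContents implementations.
-- A remote calls it per file while enumerating, and skips files it
-- rejects, instead of building the full list and filtering afterwards.
type ImportWantedChecker m = ImportLocation -> ByteSize -> m ImportWantedChecked

-- Hypothetical example checker: only want files smaller than 100 MB.
smallFilesOnly :: Monad m => ImportWantedChecker m
smallFilesOnly _loc sz = pure (ImportWantedChecked (sz < 100 * 1000 * 1000))

When the remote enumerates lazily, as the directory special remote does via dirContentsRecursiveSkipping, unwanted files are dropped as they are seen and never accumulate in a list.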
parent d7ca716759
commit 41edf73789
7 changed files with 112 additions and 50 deletions
Remote/Adb.hs

@@ -294,8 +294,13 @@ renameExportM serial adir _k old new = do
 		, File newloc
 		]
 
-listImportableContentsM :: AndroidSerial -> AndroidPath -> ParsedRemoteConfig -> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
-listImportableContentsM serial adir c = adbfind >>= \case
+listImportableContentsM
+	:: AndroidSerial
+	-> AndroidPath
+	-> ParsedRemoteConfig
+	-> ImportWantedChecker Annex
+	-> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
+listImportableContentsM serial adir c _wanted = adbfind >>= \case
 	Just ls -> return $ Just $ ImportableContentsComplete $
 		ImportableContents (mapMaybe mk ls) []
 	Nothing -> giveup "adb find failed"
@@ -344,7 +349,7 @@ listImportableContentsM serial adir c = adbfind >>= \case
 		    cid = ContentIdentifier (encodeBS stat)
 		    loc = mkImportLocation $ toRawFilePath $
 			Posix.makeRelative (fromAndroidPath adir) fn
-		in Just (loc, (cid, sz))
+		in Just (ImportWantedChecked False, loc, (cid, sz))
 	mk _ = Nothing
 
 -- This does not guard against every possible race. As long as the adb
Remote/Borg.hs

@@ -168,8 +168,13 @@ checkAvailability :: BorgRepo -> Annex Availability
 checkAvailability borgrepo@(BorgRepo r) =
 	checkPathAvailability (borgLocal borgrepo) r
 
-listImportableContentsM :: UUID -> BorgRepo -> ParsedRemoteConfig -> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
-listImportableContentsM u borgrepo c = prompt $ do
+listImportableContentsM
+	:: UUID
+	-> BorgRepo
+	-> ParsedRemoteConfig
+	-> ImportWantedChecker Annex
+	-> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
+listImportableContentsM u borgrepo c _wanted = prompt $ do
 	imported <- getImported u
 	ls <- withborglist (locBorgRepo borgrepo) Nothing formatarchivelist $ \as ->
 		forM (filter (not . S.null) as) $ \archivename ->
Remote/Directory.hs

@@ -382,21 +382,28 @@ removeExportLocation topdir loc =
 			mkExportLocation loc'
 		in go (upFrom loc') =<< tryIO (removeDirectory p)
 
-listImportableContentsM :: IgnoreInodes -> RawFilePath -> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
-listImportableContentsM ii dir = liftIO $ do
-	l <- dirContentsRecursiveSkipping (const False) False (fromRawFilePath dir)
+listImportableContentsM
+	:: IgnoreInodes
+	-> RawFilePath
+	-> ImportWantedChecker Annex
+	-> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
+listImportableContentsM ii dir wanted = do
+	l <- liftIO $ dirContentsRecursiveSkipping (const False) False (fromRawFilePath dir)
 	l' <- mapM (go . toRawFilePath) l
 	return $ Just $ ImportableContentsComplete $
 		ImportableContents (catMaybes l') []
   where
 	go f = do
-		st <- R.getSymbolicLinkStatus f
-		mkContentIdentifier ii f st >>= \case
-			Nothing -> return Nothing
-			Just cid -> do
-				relf <- relPathDirToFile dir f
-				sz <- getFileSize' f st
-				return $ Just (mkImportLocation relf, (cid, sz))
+		relf <- liftIO $ relPathDirToFile dir f
+		st <- liftIO $ R.getSymbolicLinkStatus f
+		sz <- liftIO $ getFileSize' f st
+		let loc = mkImportLocation relf
+		wanted loc sz >>= \case
+			ImportWantedChecked False -> return Nothing
+			checked@(ImportWantedChecked True) ->
+				liftIO (mkContentIdentifier ii f st) >>= return . \case
+					Nothing -> Nothing
+					Just cid -> Just (checked, loc, (cid, sz))
 
 newtype IgnoreInodes = IgnoreInodes Bool
 
Remote/S3.hs

@@ -34,6 +34,7 @@ import Network.HTTP.Types
 import Network.URI
 import Control.Monad.Trans.Resource
 import Control.Monad.Catch
+import Control.Monad.Trans
 import Control.Concurrent.STM (atomically)
 import Control.Concurrent.STM.TVar
 import Data.Maybe
@@ -568,43 +569,68 @@ renameExportS3 hv r rs info k src dest = Just <$> go
 	srcobject = T.pack $ bucketExportLocation info src
 	dstobject = T.pack $ bucketExportLocation info dest
 
-listImportableContentsS3 :: S3HandleVar -> Remote -> S3Info -> ParsedRemoteConfig -> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
-listImportableContentsS3 hv r info c =
+listImportableContentsS3
+	:: S3HandleVar
+	-> Remote
+	-> S3Info
+	-> ParsedRemoteConfig
+	-> ImportWantedChecker Annex
+	-> Annex (Maybe (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize)))
+listImportableContentsS3 hv r info c wanted =
 	withS3Handle hv $ \case
 		Right h -> Just <$> go h
 		Left p -> giveupS3HandleProblem p (uuid r)
   where
+	go :: S3Handle -> Annex (ImportableContentsChunkable Annex (ContentIdentifier, ByteSize))
 	go h = do
-		ic <- liftIO $ runResourceT $ extractFromResourceT =<< startlist h
+		ic <- runResourceT $ extractFromResourceT =<< startlist h
 		return (ImportableContentsComplete ic)
 
 	fileprefix = T.pack <$> getRemoteConfigValue fileprefixField c
 
+	startlist :: S3Handle -> ResourceT Annex (ImportableContents (ContentIdentifier, ByteSize))
 	startlist h
 		| versioning info = do
-			rsp <- sendS3Handle h $
+			rsp <- lift $ sendS3Handle h $
 				S3.getBucketObjectVersions (bucket info)
 			continuelistversioned h [] rsp
 		| otherwise = do
-			rsp <- sendS3Handle h $
+			rsp <- lift $ sendS3Handle h $
 				(S3.getBucket (bucket info))
 					{ S3.gbPrefix = fileprefix }
 			continuelistunversioned h [] rsp
 
+	continuelistunversioned :: S3Handle -> [S3.GetBucketResponse] -> S3.GetBucketResponse -> ResourceT Annex (ImportableContents (ContentIdentifier, ByteSize))
 	continuelistunversioned h l rsp
 		| S3.gbrIsTruncated rsp = do
-			rsp' <- sendS3Handle h $
+			rsp' <- lift $ sendS3Handle h $
 				(S3.getBucket (bucket info))
 					{ S3.gbMarker = S3.gbrNextMarker rsp
 					, S3.gbPrefix = fileprefix
 					}
+			-- wantedrsp <- filterwanted rsp
 			continuelistunversioned h (rsp:l) rsp'
-		| otherwise = return $
-			mkImportableContentsUnversioned info (reverse (rsp:l))
+		| otherwise = do
+			--wantedrsp <- filterwanted rsp
+			return $
+				mkImportableContentsUnversioned info (reverse (rsp:l))
+
+	filterwanted rsp = filterwanted' [] (S3.gbrContents rsp)
+	filterwanted' c [] = pure c
+	filterwanted' c (oi:ois) =
+		case bucketImportLocation info $ T.unpack $ S3.objectKey oi of
+			Nothing -> filterwanted' c ois
+			Just loc -> do
+				let sz = S3.objectSize oi
+				let cid = mkS3UnversionedContentIdentifier $ S3.objectETag oi
+				wanted loc sz >>= \case
+					ImportWantedChecked False -> filterwanted' c ois
+					checked@(ImportWantedChecked True) ->
+						filterwanted' ((checked, loc, (cid, sz)):c) ois
 
 	continuelistversioned h l rsp
 		| S3.gbovrIsTruncated rsp = do
-			rsp' <- sendS3Handle h $
+			rsp' <- lift $ sendS3Handle h $
 				(S3.getBucketObjectVersions (bucket info))
 					{ S3.gbovKeyMarker = S3.gbovrNextKeyMarker rsp
 					, S3.gbovVersionIdMarker = S3.gbovrNextVersionIdMarker rsp
@@ -625,7 +651,7 @@ mkImportableContentsUnversioned info l = ImportableContents
 			T.unpack $ S3.objectKey oi
 		let sz = S3.objectSize oi
 		let cid = mkS3UnversionedContentIdentifier $ S3.objectETag oi
-		return (loc, (cid, sz))
+		return (ImportWantedChecked False, loc, (cid, sz))
 
 mkImportableContentsVersioned :: S3Info -> [S3.GetBucketObjectVersionsResponse] -> ImportableContents (ContentIdentifier, ByteSize)
 mkImportableContentsVersioned info = build . groupfiles
@@ -645,7 +671,7 @@ mkImportableContentsVersioned info = build . groupfiles
 			T.unpack $ S3.oviKey ovi
 		let sz = S3.oviSize ovi
 		let cid = mkS3VersionedContentIdentifier' ovi
-		return (loc, (cid, sz))
+		return (ImportWantedChecked False, loc, (cid, sz))
 	extract (S3.DeleteMarker {}) = Nothing
 
 	-- group files so all versions of a file are in a sublist,
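A note on the repeated change from sendS3Handle to lift $ sendS3Handle above: it follows from running ResourceT over Annex rather than over IO, so actions in the base monad have to be lifted into ResourceT. Below is a minimal standalone sketch of that pattern only, with IO standing in for Annex; the real Remote.S3 version does not compile yet, per the commit message.

import Control.Monad.Trans (lift)
import Control.Monad.Trans.Resource (ResourceT, runResourceT)

-- IO stands in for Annex here; only the lifting pattern is the point.
type Annex = IO

listBucket :: Annex [String]
listBucket = pure ["foo", "bar"]

listAll :: Annex [String]
listAll = runResourceT go
  where
	go :: ResourceT Annex [String]
	go = lift listBucket  -- base-monad action lifted into ResourceT Annex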