use 80% less memory when importing from a versioned S3 bucket
Same idea as commit eb714c107b, but even better, because a lot of the
response is DeleteMarker, which can be garbage collected now.
This commit is contained in:
parent
eb714c107b
commit
dc5bf24823
2 changed files with 41 additions and 35 deletions
|
@ -13,6 +13,8 @@ git-annex (10.20241032) UNRELEASED; urgency=medium
|
||||||
(Needs aws-0.24.3)
|
(Needs aws-0.24.3)
|
||||||
* S3: Fix infinite loop and memory blowup when importing from an
|
* S3: Fix infinite loop and memory blowup when importing from an
|
||||||
unversioned S3 bucket that is large enough to need pagination.
|
unversioned S3 bucket that is large enough to need pagination.
|
||||||
|
* S3: Use significantly less memory when importing from a
|
||||||
|
versioned S3 bucket.
|
||||||
|
|
||||||
-- Joey Hess <id@joeyh.name> Mon, 11 Nov 2024 12:26:00 -0400
|
-- Joey Hess <id@joeyh.name> Mon, 11 Nov 2024 12:26:00 -0400
|
||||||
|
|
||||||
|
|
74
Remote/S3.hs
74
Remote/S3.hs
|
@ -1,6 +1,6 @@
|
||||||
{- S3 remotes
|
{- S3 remotes
|
||||||
-
|
-
|
||||||
- Copyright 2011-2023 Joey Hess <id@joeyh.name>
|
- Copyright 2011-2024 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -27,6 +27,8 @@ import qualified Data.Set as S
|
||||||
import qualified System.FilePath.Posix as Posix
|
import qualified System.FilePath.Posix as Posix
|
||||||
import Data.Char
|
import Data.Char
|
||||||
import Data.String
|
import Data.String
|
||||||
|
import Data.Maybe
|
||||||
|
import Data.Time.Clock
|
||||||
import Network.Socket (HostName)
|
import Network.Socket (HostName)
|
||||||
import Network.HTTP.Conduit (Manager)
|
import Network.HTTP.Conduit (Manager)
|
||||||
import Network.HTTP.Client (responseStatus, responseBody, RequestBody(..))
|
import Network.HTTP.Client (responseStatus, responseBody, RequestBody(..))
|
||||||
|
@ -36,7 +38,6 @@ import Control.Monad.Trans.Resource
|
||||||
import Control.Monad.Catch
|
import Control.Monad.Catch
|
||||||
import Control.Concurrent.STM (atomically)
|
import Control.Concurrent.STM (atomically)
|
||||||
import Control.Concurrent.STM.TVar
|
import Control.Concurrent.STM.TVar
|
||||||
import Data.Maybe
|
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
|
@ -581,7 +582,7 @@ listImportableContentsS3 hv r info c =
|
||||||
| versioning info = do
|
| versioning info = do
|
||||||
rsp <- sendS3Handle h $
|
rsp <- sendS3Handle h $
|
||||||
S3.getBucketObjectVersions (bucket info)
|
S3.getBucketObjectVersions (bucket info)
|
||||||
continuelistversioned h [] rsp
|
continuelistversioned 0 h [] rsp
|
||||||
| otherwise = do
|
| otherwise = do
|
||||||
rsp <- sendS3Handle h $
|
rsp <- sendS3Handle h $
|
||||||
(S3.getBucket (bucket info))
|
(S3.getBucket (bucket info))
|
||||||
|
@ -610,7 +611,22 @@ listImportableContentsS3 hv r info c =
|
||||||
nomore = return $
|
nomore = return $
|
||||||
mkImportableContentsUnversioned
|
mkImportableContentsUnversioned
|
||||||
(reverse (extractunversioned rsp:l))
|
(reverse (extractunversioned rsp:l))
|
||||||
|
|
||||||
|
continuelistversioned n h l rsp
|
||||||
|
| S3.gbovrIsTruncated rsp = do
|
||||||
|
rsp' <- sendS3Handle h $
|
||||||
|
(S3.getBucketObjectVersions (bucket info))
|
||||||
|
{ S3.gbovKeyMarker = S3.gbovrNextKeyMarker rsp
|
||||||
|
, S3.gbovVersionIdMarker = S3.gbovrNextVersionIdMarker rsp
|
||||||
|
, S3.gbovPrefix = fileprefix
|
||||||
|
}
|
||||||
|
l' <- extractFromResourceT $
|
||||||
|
extractversioned rsp
|
||||||
|
continuelistversioned (length l' + n) h (l':l) rsp'
|
||||||
|
| otherwise = return $
|
||||||
|
mkImportableContentsVersioned
|
||||||
|
(reverse (extractversioned rsp:l))
|
||||||
|
|
||||||
extractunversioned = mapMaybe extractunversioned' . S3.gbrContents
|
extractunversioned = mapMaybe extractunversioned' . S3.gbrContents
|
||||||
extractunversioned' oi = do
|
extractunversioned' oi = do
|
||||||
loc <- bucketImportLocation info $
|
loc <- bucketImportLocation info $
|
||||||
|
@ -618,21 +634,15 @@ listImportableContentsS3 hv r info c =
|
||||||
let sz = S3.objectSize oi
|
let sz = S3.objectSize oi
|
||||||
let cid = mkS3UnversionedContentIdentifier $ S3.objectETag oi
|
let cid = mkS3UnversionedContentIdentifier $ S3.objectETag oi
|
||||||
return (loc, (cid, sz))
|
return (loc, (cid, sz))
|
||||||
|
|
||||||
continuelistversioned h l rsp
|
extractversioned = mapMaybe extractversioned' . S3.gbovrContents
|
||||||
| S3.gbovrIsTruncated rsp = do
|
extractversioned' ovi@(S3.ObjectVersion {}) = do
|
||||||
let showme x = case x of
|
loc <- bucketImportLocation info $
|
||||||
S3.DeleteMarker {} -> "delete"
|
T.unpack $ S3.oviKey ovi
|
||||||
v -> S3.oviKey v
|
let sz = S3.oviSize ovi
|
||||||
rsp' <- sendS3Handle h $
|
let cid = mkS3VersionedContentIdentifier' ovi
|
||||||
(S3.getBucketObjectVersions (bucket info))
|
return ((loc, (cid, sz)), S3.oviLastModified ovi)
|
||||||
{ S3.gbovKeyMarker = S3.gbovrNextKeyMarker rsp
|
extractversioned' (S3.DeleteMarker {}) = Nothing
|
||||||
, S3.gbovVersionIdMarker = S3.gbovrNextVersionIdMarker rsp
|
|
||||||
, S3.gbovPrefix = fileprefix
|
|
||||||
}
|
|
||||||
continuelistversioned h (rsp:l) rsp'
|
|
||||||
| otherwise = return $
|
|
||||||
mkImportableContentsVersioned info (reverse (rsp:l))
|
|
||||||
|
|
||||||
mkImportableContentsUnversioned :: [[(ImportLocation, (ContentIdentifier, ByteSize))]] -> ImportableContents (ContentIdentifier, ByteSize)
|
mkImportableContentsUnversioned :: [[(ImportLocation, (ContentIdentifier, ByteSize))]] -> ImportableContents (ContentIdentifier, ByteSize)
|
||||||
mkImportableContentsUnversioned l = ImportableContents
|
mkImportableContentsUnversioned l = ImportableContents
|
||||||
|
@ -640,48 +650,42 @@ mkImportableContentsUnversioned l = ImportableContents
|
||||||
, importableHistory = []
|
, importableHistory = []
|
||||||
}
|
}
|
||||||
|
|
||||||
mkImportableContentsVersioned :: S3Info -> [S3.GetBucketObjectVersionsResponse] -> ImportableContents (ContentIdentifier, ByteSize)
|
mkImportableContentsVersioned :: [[((ImportLocation, (ContentIdentifier, ByteSize)), UTCTime)]] -> ImportableContents (ContentIdentifier, ByteSize)
|
||||||
mkImportableContentsVersioned info = build . groupfiles
|
mkImportableContentsVersioned = build . groupfiles
|
||||||
where
|
where
|
||||||
|
ovilastmodified = snd
|
||||||
|
loc = fst . fst
|
||||||
|
|
||||||
build [] = ImportableContents [] []
|
build [] = ImportableContents [] []
|
||||||
build l =
|
build l =
|
||||||
let (l', v) = latestversion l
|
let (l', v) = latestversion l
|
||||||
in ImportableContents
|
in ImportableContents
|
||||||
{ importableContents = mapMaybe extract v
|
{ importableContents = map fst v
|
||||||
, importableHistory = case build l' of
|
, importableHistory = case build l' of
|
||||||
ImportableContents [] [] -> []
|
ImportableContents [] [] -> []
|
||||||
h -> [h]
|
h -> [h]
|
||||||
}
|
}
|
||||||
|
|
||||||
extract ovi@(S3.ObjectVersion {}) = do
|
|
||||||
loc <- bucketImportLocation info $
|
|
||||||
T.unpack $ S3.oviKey ovi
|
|
||||||
let sz = S3.oviSize ovi
|
|
||||||
let cid = mkS3VersionedContentIdentifier' ovi
|
|
||||||
return (loc, (cid, sz))
|
|
||||||
extract (S3.DeleteMarker {}) = Nothing
|
|
||||||
|
|
||||||
-- group files so all versions of a file are in a sublist,
|
-- group files so all versions of a file are in a sublist,
|
||||||
-- with the newest first. S3 uses such an order, so it's just a
|
-- with the newest first. S3 uses such an order, so it's just a
|
||||||
-- matter of breaking up the response list into sublists.
|
-- matter of breaking up the response list into sublists.
|
||||||
groupfiles = groupBy (\a b -> S3.oviKey a == S3.oviKey b)
|
groupfiles = groupBy (\a b -> loc a == loc b) . concat
|
||||||
. concatMap S3.gbovrContents
|
|
||||||
|
|
||||||
latestversion [] = ([], [])
|
latestversion [] = ([], [])
|
||||||
latestversion ([]:rest) = latestversion rest
|
latestversion ([]:rest) = latestversion rest
|
||||||
latestversion l@((first:_old):remainder) =
|
latestversion l@((first:_old):remainder) =
|
||||||
go (S3.oviLastModified first) [first] remainder
|
go (ovilastmodified first) [first] remainder
|
||||||
where
|
where
|
||||||
go mtime c [] = (removemostrecent mtime l, reverse c)
|
go mtime c [] = (removemostrecent mtime l, reverse c)
|
||||||
go mtime c ([]:rest) = go mtime c rest
|
go mtime c ([]:rest) = go mtime c rest
|
||||||
go mtime c ((latest:_old):rest) =
|
go mtime c ((latest:_old):rest) =
|
||||||
let !mtime' = max mtime (S3.oviLastModified latest)
|
let !mtime' = max mtime (ovilastmodified latest)
|
||||||
in go mtime' (latest:c) rest
|
in go mtime' (latest:c) rest
|
||||||
|
|
||||||
removemostrecent _ [] = []
|
removemostrecent _ [] = []
|
||||||
removemostrecent mtime ([]:rest) = removemostrecent mtime rest
|
removemostrecent mtime ([]:rest) = removemostrecent mtime rest
|
||||||
removemostrecent mtime (i@(curr:old):rest)
|
removemostrecent mtime (i@(curr:old):rest)
|
||||||
| S3.oviLastModified curr == mtime =
|
| ovilastmodified curr == mtime =
|
||||||
old : removemostrecent mtime rest
|
old : removemostrecent mtime rest
|
||||||
| otherwise =
|
| otherwise =
|
||||||
i : removemostrecent mtime rest
|
i : removemostrecent mtime rest
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue