From dc5bf24823d78c59d06a9e855d157f04b80bb0a8 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 15 Nov 2024 14:19:02 -0400 Subject: [PATCH] use 80% less memory when importing from a versioned S3 bucket Same idea as commit eb714c107ba81d805458c770db8d4f22ae12a077, but even better, because a lot of the response is DeleteMarker, that can be garbage collected now. --- CHANGELOG | 2 ++ Remote/S3.hs | 74 +++++++++++++++++++++++++++------------------------- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index c80d47f2b0..254f914b59 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -13,6 +13,8 @@ git-annex (10.20241032) UNRELEASED; urgency=medium (Needs aws-0.24.3) * S3: Fix infinite loop and memory blowup when importing from an unversioned S3 bucket that is large enough to need pagination. + * S3: Use significantly less memory when importing from a + versioned S3 bucket. -- Joey Hess Mon, 11 Nov 2024 12:26:00 -0400 diff --git a/Remote/S3.hs b/Remote/S3.hs index 36cbedef50..423a254ce8 100644 --- a/Remote/S3.hs +++ b/Remote/S3.hs @@ -1,6 +1,6 @@ {- S3 remotes - - - Copyright 2011-2023 Joey Hess + - Copyright 2011-2024 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -27,6 +27,8 @@ import qualified Data.Set as S import qualified System.FilePath.Posix as Posix import Data.Char import Data.String +import Data.Maybe +import Data.Time.Clock import Network.Socket (HostName) import Network.HTTP.Conduit (Manager) import Network.HTTP.Client (responseStatus, responseBody, RequestBody(..)) @@ -36,7 +38,6 @@ import Control.Monad.Trans.Resource import Control.Monad.Catch import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TVar -import Data.Maybe import Annex.Common import Types.Remote @@ -581,7 +582,7 @@ listImportableContentsS3 hv r info c = | versioning info = do rsp <- sendS3Handle h $ S3.getBucketObjectVersions (bucket info) - continuelistversioned h [] rsp + continuelistversioned 0 h [] rsp | otherwise = do rsp <- sendS3Handle h $ (S3.getBucket (bucket info)) @@ -610,7 +611,22 @@ listImportableContentsS3 hv r info c = nomore = return $ mkImportableContentsUnversioned (reverse (extractunversioned rsp:l)) - + + continuelistversioned n h l rsp + | S3.gbovrIsTruncated rsp = do + rsp' <- sendS3Handle h $ + (S3.getBucketObjectVersions (bucket info)) + { S3.gbovKeyMarker = S3.gbovrNextKeyMarker rsp + , S3.gbovVersionIdMarker = S3.gbovrNextVersionIdMarker rsp + , S3.gbovPrefix = fileprefix + } + l' <- extractFromResourceT $ + extractversioned rsp + continuelistversioned (length l' + n) h (l':l) rsp' + | otherwise = return $ + mkImportableContentsVersioned + (reverse (extractversioned rsp:l)) + extractunversioned = mapMaybe extractunversioned' . S3.gbrContents extractunversioned' oi = do loc <- bucketImportLocation info $ @@ -618,21 +634,15 @@ listImportableContentsS3 hv r info c = let sz = S3.objectSize oi let cid = mkS3UnversionedContentIdentifier $ S3.objectETag oi return (loc, (cid, sz)) - - continuelistversioned h l rsp - | S3.gbovrIsTruncated rsp = do - let showme x = case x of - S3.DeleteMarker {} -> "delete" - v -> S3.oviKey v - rsp' <- sendS3Handle h $ - (S3.getBucketObjectVersions (bucket info)) - { S3.gbovKeyMarker = S3.gbovrNextKeyMarker rsp - , S3.gbovVersionIdMarker = S3.gbovrNextVersionIdMarker rsp - , S3.gbovPrefix = fileprefix - } - continuelistversioned h (rsp:l) rsp' - | otherwise = return $ - mkImportableContentsVersioned info (reverse (rsp:l)) + + extractversioned = mapMaybe extractversioned' . S3.gbovrContents + extractversioned' ovi@(S3.ObjectVersion {}) = do + loc <- bucketImportLocation info $ + T.unpack $ S3.oviKey ovi + let sz = S3.oviSize ovi + let cid = mkS3VersionedContentIdentifier' ovi + return ((loc, (cid, sz)), S3.oviLastModified ovi) + extractversioned' (S3.DeleteMarker {}) = Nothing mkImportableContentsUnversioned :: [[(ImportLocation, (ContentIdentifier, ByteSize))]] -> ImportableContents (ContentIdentifier, ByteSize) mkImportableContentsUnversioned l = ImportableContents @@ -640,48 +650,42 @@ mkImportableContentsUnversioned l = ImportableContents , importableHistory = [] } -mkImportableContentsVersioned :: S3Info -> [S3.GetBucketObjectVersionsResponse] -> ImportableContents (ContentIdentifier, ByteSize) -mkImportableContentsVersioned info = build . groupfiles +mkImportableContentsVersioned :: [[((ImportLocation, (ContentIdentifier, ByteSize)), UTCTime)]] -> ImportableContents (ContentIdentifier, ByteSize) +mkImportableContentsVersioned = build . groupfiles where + ovilastmodified = snd + loc = fst . fst + build [] = ImportableContents [] [] build l = let (l', v) = latestversion l in ImportableContents - { importableContents = mapMaybe extract v + { importableContents = map fst v , importableHistory = case build l' of ImportableContents [] [] -> [] h -> [h] } - - extract ovi@(S3.ObjectVersion {}) = do - loc <- bucketImportLocation info $ - T.unpack $ S3.oviKey ovi - let sz = S3.oviSize ovi - let cid = mkS3VersionedContentIdentifier' ovi - return (loc, (cid, sz)) - extract (S3.DeleteMarker {}) = Nothing -- group files so all versions of a file are in a sublist, -- with the newest first. S3 uses such an order, so it's just a -- matter of breaking up the response list into sublists. - groupfiles = groupBy (\a b -> S3.oviKey a == S3.oviKey b) - . concatMap S3.gbovrContents + groupfiles = groupBy (\a b -> loc a == loc b) . concat latestversion [] = ([], []) latestversion ([]:rest) = latestversion rest latestversion l@((first:_old):remainder) = - go (S3.oviLastModified first) [first] remainder + go (ovilastmodified first) [first] remainder where go mtime c [] = (removemostrecent mtime l, reverse c) go mtime c ([]:rest) = go mtime c rest go mtime c ((latest:_old):rest) = - let !mtime' = max mtime (S3.oviLastModified latest) + let !mtime' = max mtime (ovilastmodified latest) in go mtime' (latest:c) rest removemostrecent _ [] = [] removemostrecent mtime ([]:rest) = removemostrecent mtime rest removemostrecent mtime (i@(curr:old):rest) - | S3.oviLastModified curr == mtime = + | ovilastmodified curr == mtime = old : removemostrecent mtime rest | otherwise = i : removemostrecent mtime rest