versioned import from S3 is working
Still some bugs and two stubbed methods to implement though.
This commit is contained in:
parent
833980c0bc
commit
2f79cb4b45
3 changed files with 70 additions and 23 deletions
|
@ -4,6 +4,14 @@ git-annex (7.20190323) UNRELEASED; urgency=medium
|
|||
to allow git-annex import of files from an Android device. This can be
|
||||
combined with exporttree=yes and git-annex export used to send changes
|
||||
back to the Android device.
|
||||
* S3 special remote supports being configured with importree=yes,
|
||||
to allow git-annex import of files from a S3 bucket. This can be
|
||||
combined with exporttree=yes and git-annex export used to send changes
|
||||
back to the S3 bucket.
|
||||
* S3: When versioning is enabled on a bucket, importing from it will
|
||||
import old versions of files that were written to the bucket as well
|
||||
as the current versions. A git history is synthesized to reflect the way
|
||||
the bucket changed over time.
|
||||
* Fix bug that caused importing from a special remote to repeatedly
|
||||
download unchanged files when multiple files in the remote have the same
|
||||
content.
|
||||
|
|
72
Remote/S3.hs
72
Remote/S3.hs
|
@ -116,7 +116,7 @@ gen r u c gc = do
|
|||
}
|
||||
, importActions = ImportActions
|
||||
{ listImportableContents = listImportableContentsS3 hdl this info
|
||||
, retrieveExportWithContentIdentifier = retrieveExportWithContentIdentifierS3 hdl info
|
||||
, retrieveExportWithContentIdentifier = retrieveExportWithContentIdentifierS3 hdl this info
|
||||
, storeExportWithContentIdentifier = storeExportWithContentIdentifierS3 hdl this info magic
|
||||
, removeExportWithContentIdentifier = removeExportWithContentIdentifierS3 hdl info
|
||||
, removeExportDirectoryWhenEmpty = Nothing
|
||||
|
@ -305,11 +305,14 @@ retrieve hv r c info = fileRetriever $ \f k p -> withS3Handle hv $ \case
|
|||
giveup "failed to download content"
|
||||
|
||||
retrieveHelper :: S3Info -> S3Handle -> (Either S3.Object S3VersionID) -> FilePath -> MeterUpdate -> Annex ()
|
||||
retrieveHelper info h loc f p = liftIO $ runResourceT $ do
|
||||
let req = case loc of
|
||||
retrieveHelper info h loc f p = retrieveHelper' h f p $
|
||||
case loc of
|
||||
Left o -> S3.getObject (bucket info) o
|
||||
Right (S3VersionID o vid) -> (S3.getObject (bucket info) o)
|
||||
{ S3.goVersionId = Just vid }
|
||||
|
||||
retrieveHelper' :: S3Handle -> FilePath -> MeterUpdate -> S3.GetObject -> Annex ()
|
||||
retrieveHelper' h f p req = liftIO $ runResourceT $ do
|
||||
S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle h req
|
||||
Url.sinkResponseFile p zeroBytesProcessed f WriteMode rsp
|
||||
|
||||
|
@ -443,12 +446,10 @@ listImportableContentsS3 hv r info =
|
|||
| versioning info = do
|
||||
rsp <- sendS3Handle h $
|
||||
S3.getBucketObjectVersions (bucket info)
|
||||
liftIO $ print rsp
|
||||
continuelistversioned h [] rsp
|
||||
| otherwise = do
|
||||
rsp <- sendS3Handle h $
|
||||
S3.getBucket (bucket info)
|
||||
liftIO $ print rsp
|
||||
continuelistunversioned h [] rsp
|
||||
|
||||
continuelistunversioned h l rsp
|
||||
|
@ -457,7 +458,6 @@ listImportableContentsS3 hv r info =
|
|||
(S3.getBucket (bucket info))
|
||||
{ S3.gbMarker = S3.gbrNextMarker rsp
|
||||
}
|
||||
liftIO $ print rsp'
|
||||
continuelistunversioned h (rsp:l) rsp'
|
||||
| otherwise = return $
|
||||
mkImportableContentsUnversioned info (reverse (rsp:l))
|
||||
|
@ -469,12 +469,9 @@ listImportableContentsS3 hv r info =
|
|||
{ S3.gbovKeyMarker = S3.gbovrNextKeyMarker rsp
|
||||
, S3.gbovVersionIdMarker = S3.gbovrNextVersionIdMarker rsp
|
||||
}
|
||||
liftIO $ print rsp
|
||||
continuelistversioned h (rsp:l) rsp'
|
||||
| otherwise = do
|
||||
let v = mkImportableContentsVersioned info (reverse (rsp:l))
|
||||
liftIO $ print v
|
||||
return v
|
||||
| otherwise = return $
|
||||
mkImportableContentsVersioned info (reverse (rsp:l))
|
||||
|
||||
mkImportableContentsUnversioned :: S3Info -> [S3.GetBucketResponse] -> ImportableContents (ContentIdentifier, ByteSize)
|
||||
mkImportableContentsUnversioned info l = ImportableContents
|
||||
|
@ -486,8 +483,7 @@ mkImportableContentsUnversioned info l = ImportableContents
|
|||
loc <- bucketImportLocation info $
|
||||
T.unpack $ S3.objectKey oi
|
||||
let sz = S3.objectSize oi
|
||||
let cid = mkS3UnversionedContentIdentifier $
|
||||
S3.objectETag oi
|
||||
let cid = mkS3UnversionedContentIdentifier $ S3.objectETag oi
|
||||
return (loc, (cid, sz))
|
||||
|
||||
mkImportableContentsVersioned :: S3Info -> [S3.GetBucketObjectVersionsResponse] -> ImportableContents (ContentIdentifier, ByteSize)
|
||||
|
@ -507,8 +503,7 @@ mkImportableContentsVersioned info = build . groupfiles
|
|||
loc <- bucketImportLocation info $
|
||||
T.unpack $ S3.oviKey ovi
|
||||
let sz = S3.oviSize ovi
|
||||
let cid = mkS3UnversionedContentIdentifier $
|
||||
S3.oviETag ovi
|
||||
let cid = mkS3VersionedContentIdentifier' ovi
|
||||
return (loc, (cid, sz))
|
||||
extract (S3.DeleteMarker {}) = Nothing
|
||||
|
||||
|
@ -537,8 +532,27 @@ mkImportableContentsVersioned info = build . groupfiles
|
|||
| otherwise =
|
||||
i : removemostrecent mtime rest
|
||||
|
||||
retrieveExportWithContentIdentifierS3 :: S3HandleVar -> S3Info -> ExportLocation -> ContentIdentifier -> FilePath -> Annex (Maybe Key) -> MeterUpdate -> Annex (Maybe Key)
|
||||
retrieveExportWithContentIdentifierS3 hv info loc cid dest mkkey p = undefined
|
||||
retrieveExportWithContentIdentifierS3 :: S3HandleVar -> Remote -> S3Info -> ExportLocation -> ContentIdentifier -> FilePath -> Annex (Maybe Key) -> MeterUpdate -> Annex (Maybe Key)
|
||||
retrieveExportWithContentIdentifierS3 hv r info loc cid dest mkkey p = withS3Handle hv $ \case
|
||||
Nothing -> do
|
||||
warning $ needS3Creds (uuid r)
|
||||
return Nothing
|
||||
Just h -> flip catchNonAsync (\e -> warning (show e) >> return Nothing) $ do
|
||||
rewritePreconditionException $ retrieveHelper' h dest p $
|
||||
limitGetToContentIdentifier cid $
|
||||
S3.getObject (bucket info) o
|
||||
mkkey
|
||||
where
|
||||
o = T.pack $ bucketExportLocation info loc
|
||||
|
||||
{- Catch exception getObject returns when a precondition is not met,
|
||||
- and replace with a more understandable message for the user. -}
|
||||
rewritePreconditionException :: Annex a -> Annex a
|
||||
rewritePreconditionException a = catchJust (Url.matchStatusCodeException want) a $
|
||||
const $ giveup "requested version of object is not available in S3 bucket"
|
||||
where
|
||||
want st = statusCode st == 412 &&
|
||||
statusMessage st == "Precondition Failed"
|
||||
|
||||
-- Does not check if content on S3 is safe to overwrite, because there
|
||||
-- is no atomic way to do so. When the bucket is versioned, this is
|
||||
|
@ -998,15 +1012,31 @@ mkS3VersionedContentIdentifier :: S3VersionID -> ContentIdentifier
|
|||
mkS3VersionedContentIdentifier (S3VersionID _ v) =
|
||||
ContentIdentifier $ T.encodeUtf8 v
|
||||
|
||||
-- S3 returns etags surrounded by double quotes, and the quotes are
|
||||
-- included here.
|
||||
mkS3VersionedContentIdentifier' :: S3.ObjectVersionInfo -> ContentIdentifier
|
||||
mkS3VersionedContentIdentifier' =
|
||||
ContentIdentifier . T.encodeUtf8 . S3.oviVersionId
|
||||
|
||||
-- S3 returns etags surrounded by double quotes, and the quotes may
|
||||
-- be included here.
|
||||
type S3Etag = T.Text
|
||||
|
||||
-- For an unversioned bucket, the S3Etag is instead used as the
|
||||
-- ContentIdentifier.
|
||||
-- ContentIdentifier. Prefixed by '#' since that cannot appear in a S3
|
||||
-- version id.
|
||||
mkS3UnversionedContentIdentifier :: S3Etag -> ContentIdentifier
|
||||
mkS3UnversionedContentIdentifier t =
|
||||
ContentIdentifier $ T.encodeUtf8 $ T.filter (/= '"') t
|
||||
ContentIdentifier $ T.encodeUtf8 $ "#" <> T.filter (/= '"') t
|
||||
|
||||
-- Makes a GetObject request be guaranteed to get the object version
|
||||
-- matching the ContentIdentifier, or fail.
|
||||
limitGetToContentIdentifier :: ContentIdentifier -> S3.GetObject -> S3.GetObject
|
||||
limitGetToContentIdentifier (ContentIdentifier v) req =
|
||||
let t = either mempty id (T.decodeUtf8' v)
|
||||
in case T.take 1 t of
|
||||
"#" ->
|
||||
let etag = T.drop 1 t
|
||||
in req { S3.goIfMatch = Just etag }
|
||||
_ -> req { S3.goVersionId = Just t }
|
||||
|
||||
setS3VersionID :: S3Info -> UUID -> Key -> Maybe S3VersionID -> Annex ()
|
||||
setS3VersionID info u k vid
|
||||
|
|
|
@ -13,8 +13,17 @@ and `git annex sync --content` can be configured to use it.
|
|||
|
||||
## remaining todo
|
||||
|
||||
* Currently only directory and adb special remotes support importing,
|
||||
at least S3 can also support it.
|
||||
* There's a bug in the way the import history is added to the
|
||||
previous history for the remote. This leads to an ever-growing history
|
||||
when importing from a versioned S3 remote,
|
||||
although the head of it does reflect the current state of the remote.
|
||||
|
||||
* Two undefs in Remote.S3 remain.
|
||||
|
||||
* Need to test S3 import from unversioned bucket.
|
||||
|
||||
* Need to test S3 import from versioned bucket, including handling
|
||||
of deletion of files.
|
||||
|
||||
* Write a tip or tips to document using this new feature.
|
||||
(Have one for adb now.)
|
||||
|
|
Loading…
Add table
Reference in a new issue