use S3 version ID for retrieval
Have to store the S3 object along with the version ID, so retrieval can use the same object. This commit was supported by the NSF-funded DataLad project.
This commit is contained in:
parent
794e9a7a44
commit
19dcff2b71
2 changed files with 60 additions and 31 deletions
85
Remote/S3.hs
85
Remote/S3.hs
|
@ -206,7 +206,7 @@ storeHelper info h f object p = case partSize info of
|
||||||
singlepartupload = do
|
singlepartupload = do
|
||||||
rbody <- liftIO $ httpBodyStorer f p
|
rbody <- liftIO $ httpBodyStorer f p
|
||||||
r <- sendS3Handle h $ putObject info object rbody
|
r <- sendS3Handle h $ putObject info object rbody
|
||||||
return (mkS3VersionID (S3.porVersionId r))
|
return (mkS3VersionID object (S3.porVersionId r))
|
||||||
multipartupload fsz partsz = do
|
multipartupload fsz partsz = do
|
||||||
#if MIN_VERSION_aws(0,10,6)
|
#if MIN_VERSION_aws(0,10,6)
|
||||||
let startreq = (S3.postInitiateMultipartUpload (bucket info) object)
|
let startreq = (S3.postInitiateMultipartUpload (bucket info) object)
|
||||||
|
@ -245,7 +245,7 @@ storeHelper info h f object p = case partSize info of
|
||||||
|
|
||||||
r <- sendS3Handle h $ S3.postCompleteMultipartUpload
|
r <- sendS3Handle h $ S3.postCompleteMultipartUpload
|
||||||
(bucket info) object uploadid (zip [1..] etags)
|
(bucket info) object uploadid (zip [1..] etags)
|
||||||
return (mkS3VersionID (S3.cmurVersionId r))
|
return (mkS3VersionID object (S3.cmurVersionId r))
|
||||||
#else
|
#else
|
||||||
warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ ") of large file (" ++ show fsz ++ "); built with too old a version of the aws library."
|
warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ ") of large file (" ++ show fsz ++ "); built with too old a version of the aws library."
|
||||||
singlepartupload
|
singlepartupload
|
||||||
|
@ -255,8 +255,9 @@ storeHelper info h f object p = case partSize info of
|
||||||
- out to the file. Would be better to implement a byteRetriever, but
|
- out to the file. Would be better to implement a byteRetriever, but
|
||||||
- that is difficult. -}
|
- that is difficult. -}
|
||||||
retrieve :: Remote -> S3Info -> Maybe S3Handle -> Retriever
|
retrieve :: Remote -> S3Info -> Maybe S3Handle -> Retriever
|
||||||
retrieve _ info (Just h) = fileRetriever $ \f k p ->
|
retrieve r info (Just h) = fileRetriever $ \f k p -> do
|
||||||
retrieveHelper info h (T.pack $ bucketObject info k) f p
|
loc <- getS3VersionID info (uuid r) k (T.pack $ bucketObject info k)
|
||||||
|
retrieveHelper info h loc f p
|
||||||
retrieve r info Nothing = case getpublicurl info of
|
retrieve r info Nothing = case getpublicurl info of
|
||||||
Nothing -> \_ _ _ -> do
|
Nothing -> \_ _ _ -> do
|
||||||
needS3Creds (uuid r)
|
needS3Creds (uuid r)
|
||||||
|
@ -265,9 +266,12 @@ retrieve r info Nothing = case getpublicurl info of
|
||||||
unlessM (downloadUrl k p [geturl $ bucketObject info k] f) $
|
unlessM (downloadUrl k p [geturl $ bucketObject info k] f) $
|
||||||
giveup "failed to download content"
|
giveup "failed to download content"
|
||||||
|
|
||||||
retrieveHelper :: S3Info -> S3Handle -> S3.Object -> FilePath -> MeterUpdate -> Annex ()
|
retrieveHelper :: S3Info -> S3Handle -> (Either S3.Object S3VersionID) -> FilePath -> MeterUpdate -> Annex ()
|
||||||
retrieveHelper info h object f p = liftIO $ runResourceT $ do
|
retrieveHelper info h loc f p = liftIO $ runResourceT $ do
|
||||||
let req = S3.getObject (bucket info) object
|
let req = case loc of
|
||||||
|
Left o -> S3.getObject (bucket info) o
|
||||||
|
Right (S3VersionID o vid) -> (S3.getObject (bucket info) o)
|
||||||
|
{ S3.goVersionId = Just (T.pack vid) }
|
||||||
S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle' h req
|
S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle' h req
|
||||||
Url.sinkResponseFile p zeroBytesProcessed f WriteMode rsp
|
Url.sinkResponseFile p zeroBytesProcessed f WriteMode rsp
|
||||||
|
|
||||||
|
@ -294,10 +298,11 @@ checkKey r info Nothing k = case getpublicurl info of
|
||||||
checkBoth (geturl $ bucketObject info k) (keySize k)
|
checkBoth (geturl $ bucketObject info k) (keySize k)
|
||||||
checkKey r info (Just h) k = do
|
checkKey r info (Just h) k = do
|
||||||
showChecking r
|
showChecking r
|
||||||
checkKeyHelper info h (T.pack $ bucketObject info k)
|
loc <- getS3VersionID info (uuid r) k (T.pack $ bucketObject info k)
|
||||||
|
checkKeyHelper info h loc
|
||||||
|
|
||||||
checkKeyHelper :: S3Info -> S3Handle -> S3.Object -> Annex Bool
|
checkKeyHelper :: S3Info -> S3Handle -> (Either S3.Object S3VersionID) -> Annex Bool
|
||||||
checkKeyHelper info h object = do
|
checkKeyHelper info h loc = do
|
||||||
#if MIN_VERSION_aws(0,10,0)
|
#if MIN_VERSION_aws(0,10,0)
|
||||||
rsp <- go
|
rsp <- go
|
||||||
return (isJust $ S3.horMetadata rsp)
|
return (isJust $ S3.horMetadata rsp)
|
||||||
|
@ -307,7 +312,11 @@ checkKeyHelper info h object = do
|
||||||
return True
|
return True
|
||||||
#endif
|
#endif
|
||||||
where
|
where
|
||||||
go = sendS3Handle h $ S3.headObject (bucket info) object
|
go = sendS3Handle h req
|
||||||
|
req = case loc of
|
||||||
|
Left o -> S3.headObject (bucket info) o
|
||||||
|
Right (S3VersionID o vid) -> (S3.headObject (bucket info) o)
|
||||||
|
{ S3.hoVersionId = Just (T.pack vid) }
|
||||||
|
|
||||||
#if ! MIN_VERSION_aws(0,10,0)
|
#if ! MIN_VERSION_aws(0,10,0)
|
||||||
{- Catch exception headObject returns when an object is not present
|
{- Catch exception headObject returns when an object is not present
|
||||||
|
@ -327,10 +336,9 @@ storeExportS3 u info (Just h) f k loc p =
|
||||||
catchNonAsync go (\e -> warning (show e) >> return False)
|
catchNonAsync go (\e -> warning (show e) >> return False)
|
||||||
where
|
where
|
||||||
go = do
|
go = do
|
||||||
storeHelper info h f (T.pack $ bucketExportLocation info loc) p
|
let o = T.pack $ bucketExportLocation info loc
|
||||||
>>= if versioning info
|
storeHelper info h f o p
|
||||||
then setS3VersionID u k
|
>>= setS3VersionID info u k
|
||||||
else const noop
|
|
||||||
return True
|
return True
|
||||||
storeExportS3 u _ Nothing _ _ _ _ = do
|
storeExportS3 u _ Nothing _ _ _ _ = do
|
||||||
needS3Creds u
|
needS3Creds u
|
||||||
|
@ -342,7 +350,7 @@ retrieveExportS3 u info mh _k loc f p =
|
||||||
where
|
where
|
||||||
go = case mh of
|
go = case mh of
|
||||||
Just h -> do
|
Just h -> do
|
||||||
retrieveHelper info h (T.pack exporturl) f p
|
retrieveHelper info h (Left (T.pack exporturl)) f p
|
||||||
return True
|
return True
|
||||||
Nothing -> case getpublicurl info of
|
Nothing -> case getpublicurl info of
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
|
@ -366,7 +374,7 @@ removeExportS3 u _ Nothing _ _ = do
|
||||||
|
|
||||||
checkPresentExportS3 :: UUID -> S3Info -> Maybe S3Handle -> Key -> ExportLocation -> Annex Bool
|
checkPresentExportS3 :: UUID -> S3Info -> Maybe S3Handle -> Key -> ExportLocation -> Annex Bool
|
||||||
checkPresentExportS3 _u info (Just h) _k loc =
|
checkPresentExportS3 _u info (Just h) _k loc =
|
||||||
checkKeyHelper info h (T.pack $ bucketExportLocation info loc)
|
checkKeyHelper info h (Left (T.pack $ bucketExportLocation info loc))
|
||||||
checkPresentExportS3 u info Nothing k loc = case getpublicurl info of
|
checkPresentExportS3 u info Nothing k loc = case getpublicurl info of
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
needS3Creds u
|
needS3Creds u
|
||||||
|
@ -732,26 +740,45 @@ getWebUrls info c k
|
||||||
(True, Just geturl) -> return [geturl $ bucketObject info k]
|
(True, Just geturl) -> return [geturl $ bucketObject info k]
|
||||||
_ -> return []
|
_ -> return []
|
||||||
|
|
||||||
newtype S3VersionID = S3VersionID String
|
data S3VersionID = S3VersionID S3.Object String
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
-- smart constructor
|
-- smart constructor
|
||||||
mkS3VersionID :: Maybe T.Text -> Maybe S3VersionID
|
mkS3VersionID :: S3.Object -> Maybe T.Text -> Maybe S3VersionID
|
||||||
mkS3VersionID = mkS3VersionID' . fmap T.unpack
|
mkS3VersionID o = mkS3VersionID' o . fmap T.unpack
|
||||||
|
|
||||||
mkS3VersionID' :: Maybe String -> Maybe S3VersionID
|
mkS3VersionID' :: S3.Object -> Maybe String -> Maybe S3VersionID
|
||||||
mkS3VersionID' Nothing = Nothing
|
mkS3VersionID' o (Just s)
|
||||||
mkS3VersionID' (Just s)
|
|
||||||
| null s = Nothing
|
| null s = Nothing
|
||||||
-- AWS documentation says a version ID is at most 1024 bytes long.
|
-- AWS documentation says a version ID is at most 1024 bytes long.
|
||||||
-- Since they are stored in the git-annex branch, prevent them from
|
-- Since they are stored in the git-annex branch, prevent them from
|
||||||
-- being very much larger than that.
|
-- being very much larger than that.
|
||||||
| length s < 2048 = Just (S3VersionID s)
|
| length s < 2048 = Just (S3VersionID o s)
|
||||||
| otherwise = Nothing
|
| otherwise = Nothing
|
||||||
|
mkS3VersionID' _ Nothing = Nothing
|
||||||
|
|
||||||
setS3VersionID :: UUID -> Key -> Maybe S3VersionID -> Annex ()
|
-- A S3 version ID is "url ready" so does not contain spaces,
|
||||||
setS3VersionID u k (Just (S3VersionID v)) = setRemoteState u k v
|
-- but an Object may contain spaces, so put it last.
|
||||||
setS3VersionID _ _ Nothing = noop
|
formatS3VersionID :: S3VersionID -> String
|
||||||
|
formatS3VersionID (S3VersionID o v) = v ++ ' ' : T.unpack o
|
||||||
|
|
||||||
getS3VersionID :: UUID -> Key -> Annex (Maybe S3VersionID)
|
parseS3VersionID :: String -> Maybe S3VersionID
|
||||||
getS3VersionID u k = mkS3VersionID' <$> getRemoteState u k
|
parseS3VersionID s =
|
||||||
|
let (v, o) = separate (== ' ') s
|
||||||
|
in mkS3VersionID' (T.pack o) (Just v)
|
||||||
|
|
||||||
|
setS3VersionID :: S3Info -> UUID -> Key -> Maybe S3VersionID -> Annex ()
|
||||||
|
setS3VersionID info u k vid
|
||||||
|
| versioning info = maybe noop (setS3VersionID' u k) vid
|
||||||
|
| otherwise = noop
|
||||||
|
|
||||||
|
setS3VersionID' :: UUID -> Key -> S3VersionID -> Annex ()
|
||||||
|
setS3VersionID' u k vid = setRemoteState u k (formatS3VersionID vid)
|
||||||
|
|
||||||
|
getS3VersionID :: S3Info -> UUID -> Key -> S3.Object -> Annex (Either S3.Object S3VersionID)
|
||||||
|
getS3VersionID info u k fallback
|
||||||
|
| versioning info = maybe (Left fallback) Right <$> getS3VersionID' u k
|
||||||
|
| otherwise = return (Left fallback)
|
||||||
|
|
||||||
|
getS3VersionID' :: UUID -> Key -> Annex (Maybe S3VersionID)
|
||||||
|
getS3VersionID' u k = maybe Nothing parseS3VersionID <$> getRemoteState u k
|
||||||
|
|
|
@ -63,9 +63,11 @@ done
|
||||||
Make S3 store version IDs for exported files in the per-remote log when so
|
Make S3 store version IDs for exported files in the per-remote log when so
|
||||||
configured. done
|
configured. done
|
||||||
|
|
||||||
Use version IDs when retrieving keys and for checkpresent.
|
Use version IDs when retrieving keys and for checkpresent. done
|
||||||
|
|
||||||
Can public urls be generated using version IDs?
|
Can public urls be generated using version IDs? The url has
|
||||||
|
"?versionId=" appended, but all the examples I've seen include S3
|
||||||
|
authorization headers.
|
||||||
|
|
||||||
When a file was deleted from an exported tree, and then put back
|
When a file was deleted from an exported tree, and then put back
|
||||||
in a later exported tree, it might get re-uploaded even though the content
|
in a later exported tree, it might get re-uploaded even though the content
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue