WIP multipart S3 upload
I'm a little stuck on getting the list of etags of the parts. This seems to require taking the md5 of each part locally, which doesn't get along well with lazily streaming the part in from the file. It would need to read the file twice, or give up laziness and buffer a whole part in memory -- but parts might be quite large. This seems to be a limitation of the aws library's API: S3 returns an etag for each uploaded part, but the library does not expose it. I have filed a bug: https://github.com/aristidb/aws/issues/141
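A minimal sketch of the "read the file twice" workaround mentioned above: the part is streamed to S3 from the file as before, and its md5 is computed by re-reading the same byte range in small chunks, so a whole part never needs to be buffered. partMD5 is a hypothetical helper, not part of this commit, and assumes the cryptohash and base16-bytestring packages.

    import qualified Crypto.Hash.MD5 as MD5
    import qualified Data.ByteString as S
    import qualified Data.ByteString.Base16 as B16
    import System.IO

    -- Hex-encoded md5 of one byte range of a file, read in 64 KiB
    -- chunks so the part is never held in memory all at once.
    partMD5 :: FilePath -> Integer -> Integer -> IO S.ByteString
    partMD5 f offset len = withBinaryFile f ReadMode $ \h -> do
            hSeek h AbsoluteSeek offset
            go h MD5.init len
      where
            chunksize = 65536 :: Integer
            go hdl ctx n
                    | n <= 0 = return (B16.encode (MD5.finalize ctx))
                    | otherwise = do
                            b <- S.hGet hdl (fromIntegral (min n chunksize))
                            if S.null b
                                    then return (B16.encode (MD5.finalize ctx))
                                    else go hdl (MD5.update ctx b) (n - fromIntegral (S.length b))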
parent e535ff8fa4
commit 6e89d070bc
3 changed files with 49 additions and 6 deletions

Remote/S3.hs
@@ -40,6 +40,7 @@ import Creds
 import Annex.UUID
 import Logs.Web
 import Utility.Metered
+import Utility.DataUnits
 
 type BucketName = String
 
@@ -151,14 +152,46 @@ prepareS3 r info = resourcePrepare $ const $
 
 store :: Remote -> S3Handle -> Storer
 store r h = fileStorer $ \k f p -> do
-	rbody <- liftIO $ httpBodyStorer f p
-	void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody
+	case partSize (hinfo h) of
+		Just sz -> do
+			fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f)
+			if fsz > sz
+				then multipartupload sz k f p
+				else singlepartupload k f p
+		Nothing -> singlepartupload k f p
 
 	-- Store public URL to item in Internet Archive.
 	when (isIA (hinfo h) && not (isChunkKey k)) $
 		setUrlPresent k (iaKeyUrl r k)
 
 	return True
+  where
+	singlepartupload k f p = do
+		rbody <- liftIO $ httpBodyStorer f p
+		void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody
+	multipartupload sz k f p = do
+#if MIN_VERSION_aws(0,10,4)
+		let info = hinfo h
+		let object = bucketObject info k
+
+		uploadid <- S3.imurUploadId <$> sendS3Handle' h
+			(S3.postInitiateMultipartUpload (bucket info) object)
+				{ S3.imuStorageClass = Just (storageClass info)
+				, S3.imuMetadata = metaHeaders info
+				, S3.imuAutoMakeBucket = isIA info
+				, S3.imuExpires = Nothing -- TODO set some reasonable expiry
+				}
+
+		-- TODO open file, read each part of size sz (streaming
+		-- it); send part to S3, and get a list of etags of all
+		-- the parts
+
+		void $ sendS3Handle' h $
+			S3.postCompleteMultipartUpload (bucket info) object uploadid $
+				zip [1..] (map T.pack etags)
+#else
+		warning $ "Cannot do multipart upload (partsize " ++ show sz ++ "); built with too old a version of the aws library."
+		singlepartupload k f p
+#endif
 
 {- Implemented as a fileRetriever, that uses conduit to stream the chunks
  - out to the file. Would be better to implement a byteRetriever, but
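One possible shape for the loop the TODO above describes, sketched as a standalone helper rather than as part of this commit: walk the file in partsize steps, upload each slice as one numbered part, and collect the etags in order for postCompleteMultipartUpload. The sendpart callback is hypothetical; filling it in would need the aws library to expose the etag of each uploaded part, which it currently does not (see the bug linked in the commit message).

    import Control.Monad (forM)

    -- Upload a file of the given total size in parts of at most partsize
    -- bytes, returning the parts' etags in order. sendpart is given the
    -- part number, the offset into the file, and the length of that part.
    uploadParts :: Integer -> Integer -> (Integer -> Integer -> Integer -> IO String) -> IO [String]
    uploadParts filesize partsize sendpart =
            forM (zip [1 ..] offsets) $ \(n, off) ->
                    sendpart n off (min partsize (filesize - off))
      where
            offsets = takeWhile (< filesize) [0, partsize ..]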
@@ -373,6 +406,7 @@ data S3Info = S3Info
 	, storageClass :: S3.StorageClass
 	, bucketObject :: Key -> T.Text
 	, metaHeaders :: [(T.Text, T.Text)]
+	, partSize :: Maybe Integer
 	, isIA :: Bool
 	}
 
@@ -387,6 +421,7 @@ extractS3Info c = do
 		, storageClass = getStorageClass c
 		, bucketObject = T.pack . getBucketObject c
 		, metaHeaders = getMetaHeaders c
+		, partSize = getPartSize c
 		, isIA = configIA c
 		}
 
@@ -397,7 +432,10 @@ getStorageClass :: RemoteConfig -> S3.StorageClass
 getStorageClass c = case M.lookup "storageclass" c of
 	Just "REDUCED_REDUNDANCY" -> S3.ReducedRedundancy
 	_ -> S3.Standard
 
+getPartSize :: RemoteConfig -> Maybe Integer
+getPartSize c = readSize dataUnits =<< M.lookup "partsize" c
+
 getMetaHeaders :: RemoteConfig -> [(T.Text, T.Text)]
 getMetaHeaders = map munge . filter ismetaheader . M.assocs
   where
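Assuming readSize dataUnits accepts git-annex's usual size syntax, the new getPartSize would behave roughly as follows (illustrative values only, not output from this commit):

    -- getPartSize (M.fromList [("partsize", "1GiB")])  == Just 1073741824
    -- getPartSize (M.fromList [("partsize", "100MB")]) == Just 100000000
    -- getPartSize M.empty                              == Nothing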
(documentation for the S3 special remote)

@@ -21,6 +21,11 @@ the S3 remote.
 * `chunk` - Enables [[chunking]] when storing large files.
   `chunk=1MiB` is a good starting point for chunking.
 
+* `partsize` - Specifies the largest object to attempt to store in the
+  bucket. Multipart uploads will be used when storing larger objects.
+  This is not enabled by default, but can be enabled or changed at any
+  time. Setting `partsize=1GiB` is reasonable for S3.
+
 * `keyid` - Specifies the gpg key to use for [[encryption]].
 
 * `embedcreds` - Optional. Set to "yes" embed the login credentials inside
git-annex.cabal

@@ -159,7 +159,7 @@ Executable git-annex
   if flag(PatchedAWS)
     Build-Depends: aws (>= 0.9.2)
   else
-    Build-Depends: aws (>= 0.10.2)
+    Build-Depends: aws (>= 0.10.4)
   CPP-Options: -DWITH_S3
 
   if flag(WebDAV)