S3 export (untested)
It opens an HTTP connection per file exported, but then so does git annex copy --to s3.

Decided not to munge exported filenames for IA: there was too large a chance of the munging having confusing results. Instead, export of files not supported by IA, e.g. those with spaces in their name, will fail.

This commit was supported by the NSF-funded DataLad project.
parent a1b195d84c
commit 44cd5ae313

5 changed files with 121 additions and 64 deletions
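The filename decision can be sketched concretely. The helper below is hypothetical, not code from this commit: it only illustrates the rule the message describes, that Internet Archive filenames are limited to a subset of ASCII with no whitespace, and that a file failing such a check simply fails to export instead of having its name munged.

import Data.Char (isAscii, isSpace)

-- Hypothetical predicate (not part of this commit): does an export
-- location use only characters the Internet Archive accepts?
iaExportableFilename :: FilePath -> Bool
iaExportableFilename = all ok
  where
	ok c = isAscii c && not (isSpace c)

main :: IO ()
main = do
	print (iaExportableFilename "ok-file.txt")   -- True
	print (iaExportableFilename "has space.txt") -- False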
Remote/S3.hs (134 lines changed)
@@ -59,7 +59,7 @@ remote = RemoteType
 	, enumerate = const (findSpecialRemotes "s3")
 	, generate = gen
 	, setup = s3Setup
-	, exportSupported = exportUnsupported
+	, exportSupported = exportIsSupported
 	}
 
 gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote)
@@ -86,7 +86,13 @@ gen r u c gc = do
 	, lockContent = Nothing
 	, checkPresent = checkPresentDummy
 	, checkPresentCheap = False
-	, exportActions = exportUnsupported
+	, exportActions = ExportActions
+		{ storeExport = storeExportS3 this info
+		, retrieveExport = retrieveExportS3 this info
+		, removeExport = removeExportS3 this info
+		, checkPresentExport = checkPresentExportS3 this info
+		, renameExport = renameExportS3 this info
+		}
 	, whereisKey = Just (getWebUrls info)
 	, remoteFsck = Nothing
 	, repairRepo = Nothing
@@ -107,6 +113,7 @@ s3Setup :: SetupStage -> Maybe UUID -> Maybe CredPair -> RemoteConfig -> RemoteG
 s3Setup ss mu mcreds c gc = do
 	u <- maybe (liftIO genUUID) return mu
 	s3Setup' ss u mcreds c gc
 
+s3Setup' :: SetupStage -> UUID -> Maybe CredPair -> RemoteConfig -> RemoteGitConfig -> Annex (RemoteConfig, UUID)
 s3Setup' ss u mcreds c gc
 	| configIA c = archiveorg
@@ -170,25 +177,26 @@ prepareS3HandleMaybe r = resourcePrepare $ const $
 
 store :: Remote -> S3Info -> S3Handle -> Storer
 store _r info h = fileStorer $ \k f p -> do
-	case partSize info of
-		Just partsz | partsz > 0 -> do
-			fsz <- liftIO $ getFileSize f
-			if fsz > partsz
-				then multipartupload fsz partsz k f p
-				else singlepartupload k f p
-		_ -> singlepartupload k f p
+	storeHelper info h f (T.pack $ bucketObject info k) p
 	-- Store public URL to item in Internet Archive.
 	when (isIA info && not (isChunkKey k)) $
 		setUrlPresent webUUID k (iaPublicKeyUrl info k)
 	return True
-  where
-	singlepartupload k f p = do
-		rbody <- liftIO $ httpBodyStorer f p
-		void $ sendS3Handle h $ putObject info (T.pack $ bucketObject info k) rbody
-	multipartupload fsz partsz k f p = do
-#if MIN_VERSION_aws(0,10,6)
-		let object = T.pack (bucketObject info k)
+
+storeHelper :: S3Info -> S3Handle -> FilePath -> S3.Object -> MeterUpdate -> Annex ()
+storeHelper info h f object p = case partSize info of
+	Just partsz | partsz > 0 -> do
+		fsz <- liftIO $ getFileSize f
+		if fsz > partsz
+			then multipartupload fsz partsz
+			else singlepartupload
+	_ -> singlepartupload
+  where
+	singlepartupload = do
+		rbody <- liftIO $ httpBodyStorer f p
+		void $ sendS3Handle h $ putObject info object rbody
+	multipartupload fsz partsz = do
+#if MIN_VERSION_aws(0,10,6)
 		let startreq = (S3.postInitiateMultipartUpload (bucket info) object)
 			{ S3.imuStorageClass = Just (storageClass info)
 			, S3.imuMetadata = metaHeaders info
@@ -227,16 +235,27 @@ store _r info h = fileStorer $ \k f p -> do
 			(bucket info) object uploadid (zip [1..] etags)
 #else
 		warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ ") of large file (" ++ show fsz ++ "); built with too old a version of the aws library."
-		singlepartupload k f p
+		singlepartupload
 #endif
 
 {- Implemented as a fileRetriever, that uses conduit to stream the chunks
 - out to the file. Would be better to implement a byteRetriever, but
 - that is difficult. -}
 retrieve :: Remote -> S3Info -> Maybe S3Handle -> Retriever
-retrieve _ info (Just h) = fileRetriever $ \f k p -> liftIO $ runResourceT $ do
+retrieve _ info (Just h) = fileRetriever $ \f k p ->
+	retrieveHelper info h (T.pack $ bucketObject info k) f p
+retrieve r info Nothing = case getpublicurl info of
+	Nothing -> \_ _ _ -> do
+		warnMissingCredPairFor "S3" (AWS.creds $ uuid r)
+		return False
+	Just geturl -> fileRetriever $ \f k p ->
+		unlessM (downloadUrl k p [geturl k] f) $
+			giveup "failed to download content"
+
+retrieveHelper :: S3Info -> S3Handle -> S3.Object -> FilePath -> MeterUpdate -> Annex ()
+retrieveHelper info h object f p = liftIO $ runResourceT $ do
 	(fr, fh) <- allocate (openFile f WriteMode) hClose
-	let req = S3.getObject (bucket info) (T.pack $ bucketObject info k)
+	let req = S3.getObject (bucket info) object
 	S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle' h req
 	responseBody rsp $$+- sinkprogressfile fh p zeroBytesProcessed
 	release fr
@@ -251,13 +270,6 @@ retrieve _ info (Just h) = fileRetriever $ \f k p -> liftIO $ runResourceT $ do
 			void $ meterupdate sofar'
 			S.hPut fh bs
 			sinkprogressfile fh meterupdate sofar'
-retrieve r info Nothing = case getpublicurl info of
-	Nothing -> \_ _ _ -> do
-		warnMissingCredPairFor "S3" (AWS.creds $ uuid r)
-		return False
-	Just geturl -> fileRetriever $ \f k p ->
-		unlessM (downloadUrl k p [geturl k] f) $
-			giveup "failed to download content"
 
 retrieveCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool
 retrieveCheap _ _ _ = return False
@@ -276,8 +288,19 @@ remove info h k
 	return $ either (const False) (const True) res
 
 checkKey :: Remote -> S3Info -> Maybe S3Handle -> CheckPresent
+checkKey r info Nothing k = case getpublicurl info of
+	Nothing -> do
+		warnMissingCredPairFor "S3" (AWS.creds $ uuid r)
+		giveup "No S3 credentials configured"
+	Just geturl -> do
+		showChecking r
+		withUrlOptions $ checkBoth (geturl k) (keySize k)
 checkKey r info (Just h) k = do
 	showChecking r
+	checkKeyHelper info h (T.pack $ bucketObject info k)
+
+checkKeyHelper :: S3Info -> S3Handle -> S3.Object -> Annex Bool
+checkKeyHelper info h object = do
 #if MIN_VERSION_aws(0,10,0)
 	rsp <- go
 	return (isJust $ S3.horMetadata rsp)
@@ -287,8 +310,7 @@ checkKey r info (Just h) k = do
 	return True
 #endif
   where
-	go = sendS3Handle h $
-		S3.headObject (bucket info) (T.pack $ bucketObject info k)
+	go = sendS3Handle h $ S3.headObject (bucket info) object
 
 #if ! MIN_VERSION_aws(0,10,0)
 {- Catch exception headObject returns when an object is not present
@@ -303,13 +325,50 @@ checkKey r info (Just h) k = do
 	| otherwise = Nothing
 #endif
 
-checkKey r info Nothing k = case getpublicurl info of
-	Nothing -> do
-		warnMissingCredPairFor "S3" (AWS.creds $ uuid r)
-		giveup "No S3 credentials configured"
-	Just geturl -> do
-		showChecking r
-		withUrlOptions $ checkBoth (geturl k) (keySize k)
+storeExportS3 :: Remote -> S3Info -> FilePath -> Key -> ExportLocation -> MeterUpdate -> Annex Bool
+storeExportS3 r info f _k loc p =
+	catchNonAsync go (\e -> warning (show e) >> return False)
+  where
+	go = withS3Handle (config r) (gitconfig r) (uuid r) $ \h -> do
+		storeHelper info h f (T.pack $ bucketExportLocation info loc) p
+		return True
+
+retrieveExportS3 :: Remote -> S3Info -> Key -> ExportLocation -> FilePath -> MeterUpdate -> Annex Bool
+retrieveExportS3 r info _k loc f p =
+	catchNonAsync go (\e -> warning (show e) >> return False)
+  where
+	go = withS3Handle (config r) (gitconfig r) (uuid r) $ \h -> do
+		retrieveHelper info h (T.pack $ bucketExportLocation info loc) f p
+		return True
+
+removeExportS3 :: Remote -> S3Info -> Key -> ExportLocation -> Annex Bool
+removeExportS3 r info _k loc =
+	catchNonAsync go (\e -> warning (show e) >> return False)
+  where
+	go = withS3Handle (config r) (gitconfig r) (uuid r) $ \h -> do
+		res <- tryNonAsync $ sendS3Handle h $
+			S3.DeleteObject (T.pack $ bucketExportLocation info loc) (bucket info)
+		return $ either (const False) (const True) res
+
+checkPresentExportS3 :: Remote -> S3Info -> Key -> ExportLocation -> Annex Bool
+checkPresentExportS3 r info _k loc =
+	catchNonAsync go (\e -> warning (show e) >> return False)
+  where
+	go = withS3Handle (config r) (gitconfig r) (uuid r) $ \h -> do
+		checkKeyHelper info h (T.pack $ bucketExportLocation info loc)
+
+renameExportS3 :: Remote -> S3Info -> Key -> ExportLocation -> ExportLocation -> Annex Bool
+renameExportS3 r info _k src dest = catchNonAsync go (\e -> warning (show e) >> return False)
+  where
+	go = withS3Handle (config r) (gitconfig r) (uuid r) $ \h -> do
+		-- S3 has no move primitive; copy and delete.
+		void $ sendS3Handle h $ S3.copyObject (bucket info) dstobject
+			(S3.ObjectId (bucket info) srcobject Nothing)
+			S3.CopyMetadata
+		void $ sendS3Handle h $ S3.DeleteObject srcobject (bucket info)
+		return True
+	srcobject = T.pack $ bucketExportLocation info src
+	dstobject = T.pack $ bucketExportLocation info dest
 
 {- Generate the bucket if it does not already exist, including creating the
 - UUID file within the bucket.
@@ -474,6 +533,7 @@ data S3Info = S3Info
 	{ bucket :: S3.Bucket
 	, storageClass :: S3.StorageClass
 	, bucketObject :: Key -> String
+	, bucketExportLocation :: ExportLocation -> String
 	, metaHeaders :: [(T.Text, T.Text)]
 	, partSize :: Maybe Integer
 	, isIA :: Bool
@@ -491,6 +551,7 @@ extractS3Info c = do
 		{ bucket = b
 		, storageClass = getStorageClass c
 		, bucketObject = getBucketObject c
+		, bucketExportLocation = getBucketExportLocation c
 		, metaHeaders = getMetaHeaders c
 		, partSize = getPartSize c
 		, isIA = configIA c
@@ -554,6 +615,9 @@ getBucketObject c = munge . key2file
 	Just "ia" -> iaMunge $ getFilePrefix c ++ s
 	_ -> getFilePrefix c ++ s
 
+getBucketExportLocation :: RemoteConfig -> ExportLocation -> FilePath
+getBucketExportLocation c (ExportLocation loc) = getFilePrefix c ++ loc
+
 {- Internet Archive limits filenames to a subset of ascii,
 - with no whitespace. Other characters are xml entity
 - encoded. -}
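The retrieval side above streams chunks to disk with conduit (retrieveHelper feeding sinkprogressfile). Here is a self-contained sketch of that idea, under the assumptions that MeterUpdate can be simplified to a plain callback and that the modern ConduitT API is available; neither is the exact code in this commit.

import qualified Data.ByteString as S
import Data.Conduit
import Control.Monad.IO.Class (liftIO)
import System.IO

-- Simplified stand-in for git-annex's MeterUpdate (an assumption here).
type MeterUpdate = Integer -> IO ()

-- Write each incoming chunk to the handle, reporting the cumulative
-- byte count to the progress callback before each write.
sinkProgressFile :: Handle -> MeterUpdate -> ConduitT S.ByteString o IO ()
sinkProgressFile fh meterupdate = go 0
  where
	go sofar = await >>= maybe (return ()) (\bs -> do
		let sofar' = sofar + fromIntegral (S.length bs)
		liftIO $ meterupdate sofar'
		liftIO $ S.hPut fh bs
		go sofar')

main :: IO ()
main = withFile "out.tmp" WriteMode $ \fh ->
	runConduit $ yield (S.replicate 5 120) .| sinkProgressFile fh print

Threading the running byte count through each step keeps progress reporting in lockstep with what has actually been written, which is the same design the in-diff sinkprogressfile uses.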