avoid potentially unsafe use of runResourceT
Pushed the ResourceT out into larger code blocks, and made sure that the http result from a sendS3Handle call is processed inside the same ResourceT block. I don't think this fixes any bugs, but it allows getting rid of a scary comment. This commit was sponsored by Eric Drechsel on Patreon.
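For context, the pattern being guarded against is a response value outliving the ResourceT scope that manages its resources. Below is a minimal standalone sketch of that scoping rule, using only the resourcet package; it is not git-annex or aws-library code, and acquireResponse, unsafeUse, and safeUse are hypothetical stand-ins for a pureAws-style call and its callers.

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (ResourceT, runResourceT, allocate)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Stand-in for a streaming response tied to a ResourceT scope: the
-- release action runs as soon as the enclosing runResourceT finishes.
acquireResponse :: ResourceT IO (IORef (Maybe String))
acquireResponse = snd <$> allocate
	(newIORef (Just "response body"))   -- acquire
	(\ref -> writeIORef ref Nothing)    -- release when the scope ends

-- Unsafe shape: the response escapes the runResourceT block, so its
-- backing resource is already released by the time anyone reads it.
unsafeUse :: IO (IORef (Maybe String))
unsafeUse = runResourceT acquireResponse

-- Safe shape (the direction of this commit): consume the response
-- inside the same runResourceT block that produced it.
safeUse :: IO (Maybe String)
safeUse = runResourceT $ do
	ref <- acquireResponse
	liftIO $ readIORef ref

main :: IO ()
main = do
	print =<< (readIORef =<< unsafeUse)  -- Nothing: resource already gone
	print =<< safeUse                    -- Just "response body"

As the commit message notes, this likely was not an actual bug in practice; keeping the processing inside the same block simply removes the need for the warning comment.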
parent 9cebfd7002
commit 8eb66a5c40

1 changed file with 25 additions and 35 deletions:

Remote/S3.hs
@@ -191,23 +191,24 @@ store r info magic = fileStorer $ \k f p -> withS3Handle r $ \h -> do
 	return True
 
 storeHelper :: S3Info -> S3Handle -> Maybe Magic -> FilePath -> S3.Object -> MeterUpdate -> Annex (Maybe S3VersionID)
-storeHelper info h magic f object p = case partSize info of
+storeHelper info h magic f object p = liftIO $ case partSize info of
 	Just partsz | partsz > 0 -> do
-		fsz <- liftIO $ getFileSize f
+		fsz <- getFileSize f
 		if fsz > partsz
 			then multipartupload fsz partsz
 			else singlepartupload
 	_ -> singlepartupload
   where
-	singlepartupload = do
-		contenttype <- getcontenttype
+	singlepartupload = runResourceT $ do
+		contenttype <- liftIO getcontenttype
 		rbody <- liftIO $ httpBodyStorer f p
-		r <- sendS3Handle h $ (putObject info object rbody)
+		let req = (putObject info object rbody)
 			{ S3.poContentType = encodeBS <$> contenttype }
-		return (mkS3VersionID object (S3.porVersionId r))
-	multipartupload fsz partsz = do
+		vid <- S3.porVersionId <$> sendS3Handle h req
+		return (mkS3VersionID object vid)
+	multipartupload fsz partsz = runResourceT $ do
 #if MIN_VERSION_aws(0,16,0)
-		contenttype <- getcontenttype
+		contenttype <- liftIO getcontenttype
 		let startreq = (S3.postInitiateMultipartUpload (bucket info) object)
 			{ S3.imuStorageClass = Just (storageClass info)
 			, S3.imuMetadata = metaHeaders info
@@ -247,11 +248,10 @@ storeHelper info h magic f object p = case partSize info of
 			(bucket info) object uploadid (zip [1..] etags)
 		return (mkS3VersionID object (S3.cmurVersionId r))
 #else
-		warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ ") of large file (" ++ show fsz ++ "); built with too old a version of the aws library."
+		warningIO $ "Cannot do multipart upload (partsize " ++ show partsz ++ ") of large file (" ++ show fsz ++ "); built with too old a version of the aws library."
 		singlepartupload
 #endif
-	getcontenttype = liftIO $
-		maybe (pure Nothing) (flip getMagicMimeType f) magic
+	getcontenttype = maybe (pure Nothing) (flip getMagicMimeType f) magic
 
 {- Implemented as a fileRetriever, that uses conduit to stream the chunks
  - out to the file. Would be better to implement a byteRetriever, but
@@ -278,14 +278,14 @@ retrieveHelper info h loc f p = liftIO $ runResourceT $ do
 		Left o -> S3.getObject (bucket info) o
 		Right (S3VersionID o vid) -> (S3.getObject (bucket info) o)
 			{ S3.goVersionId = Just vid }
-	S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle' h req
+	S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle h req
 	Url.sinkResponseFile p zeroBytesProcessed f WriteMode rsp
 
 retrieveCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool
 retrieveCheap _ _ _ = return False
 
 remove :: Remote -> S3Info -> Remover
-remove r info k = withS3Handle r $ \h -> do
+remove r info k = withS3Handle r $ \h -> liftIO $ runResourceT $ do
 	res <- tryNonAsync $ sendS3Handle h $
 		S3.DeleteObject (T.pack $ bucketObject info k) (bucket info)
 	return $ either (const False) (const True) res
@@ -311,7 +311,7 @@ checkKey r c info k = withS3HandleMaybe r $ \case
 		anyM check us
 
 checkKeyHelper :: S3Info -> S3Handle -> (Either S3.Object S3VersionID) -> Annex Bool
-checkKeyHelper info h loc = do
+checkKeyHelper info h loc = liftIO $ runResourceT $ do
 #if MIN_VERSION_aws(0,10,0)
 	rsp <- go
 	return (isJust $ S3.horMetadata rsp)
@@ -377,7 +377,7 @@ removeExportS3 r info k loc = withS3HandleMaybe r $ \case
 		warning $ needS3Creds (uuid r)
 		return False
   where
-	go h = do
+	go h = liftIO $ runResourceT $ do
 		res <- tryNonAsync $ sendS3Handle h $
 			S3.DeleteObject (T.pack $ bucketExportLocation info loc) (bucket info)
 		return $ either (const False) (const True) res
@@ -401,7 +401,7 @@ renameExportS3 r info k src dest = withS3HandleMaybe r $ \case
 		warning $ needS3Creds (uuid r)
 		return False
   where
-	go h = do
+	go h = liftIO $ runResourceT $ do
 		let co = S3.copyObject (bucket info) dstobject
 			(S3.ObjectId (bucket info) srcobject Nothing)
 			S3.CopyMetadata
@@ -428,12 +428,13 @@ genBucket c gc u = do
   where
 	go _ _ (Right True) = noop
 	go info h _ = do
-		v <- tryNonAsync $ sendS3Handle h (S3.getBucket $ bucket info)
+		v <- liftIO $ tryNonAsync $ runResourceT $
+			sendS3Handle h (S3.getBucket $ bucket info)
 		case v of
 			Right _ -> noop
 			Left _ -> do
 				showAction $ "creating bucket in " ++ datacenter
-				void $ sendS3Handle h $
+				void $ liftIO $ runResourceT $ sendS3Handle h $
 					(S3.putBucket (bucket info))
 						{ S3.pbCannedAcl = acl info
 						, S3.pbLocationConstraint = locconstraint
@@ -469,7 +470,7 @@ writeUUIDFile c u info h = do
 		Right False -> do
 			warning "The bucket already exists, and its annex-uuid file indicates it is used by a different special remote."
 			giveup "Cannot reuse this bucket."
-		_ -> void $ sendS3Handle h mkobject
+		_ -> void $ liftIO $ runResourceT $ sendS3Handle h mkobject
   where
 	file = T.pack $ uuidFile c
 	uuidb = L.fromChunks [T.encodeUtf8 $ T.pack $ fromUUID u]
@@ -480,7 +481,7 @@ writeUUIDFile c u info h = do
  - and has the specified UUID already. -}
 checkUUIDFile :: RemoteConfig -> UUID -> S3Info -> S3Handle -> Annex (Either SomeException Bool)
 checkUUIDFile c u info h = tryNonAsync $ liftIO $ runResourceT $ do
-	resp <- tryS3 $ sendS3Handle' h (S3.getObject (bucket info) file)
+	resp <- tryS3 $ sendS3Handle h (S3.getObject (bucket info) file)
 	case resp of
 		Left _ -> return False
 		Right r -> do
@@ -506,25 +507,13 @@ data S3Handle = S3Handle
 	, hs3cfg :: S3.S3Configuration AWS.NormalQuery
 	}
 
-{- Sends a request to S3 and gets back the response.
- -
- - Note that pureAws's use of ResourceT is bypassed here;
- - the response should be fully processed while the S3Handle
- - is still open, eg within a call to withS3Handle.
- -}
+{- Sends a request to S3 and gets back the response. -}
 sendS3Handle
-	:: (AWS.Transaction req res, AWS.ServiceConfiguration req ~ S3.S3Configuration)
-	=> S3Handle
-	-> req
-	-> Annex res
-sendS3Handle h r = liftIO $ runResourceT $ sendS3Handle' h r
-
-sendS3Handle'
 	:: (AWS.Transaction r a, AWS.ServiceConfiguration r ~ S3.S3Configuration)
 	=> S3Handle
 	-> r
 	-> ResourceT IO a
-sendS3Handle' h r = AWS.pureAws (hawscfg h) (hs3cfg h) (hmanager h) r
+sendS3Handle h r = AWS.pureAws (hawscfg h) (hs3cfg h) (hmanager h) r
 
 withS3Handle :: Remote -> (S3Handle -> Annex a) -> Annex a
 withS3Handle r = withS3Handle' (config r) (gitconfig r) (uuid r)
@@ -877,7 +866,8 @@ enableBucketVersioning ss c _ _ = do
 #if MIN_VERSION_aws(0,22,0)
 	showAction "enabling bucket versioning"
 	withS3Handle' c gc u $ \h ->
-		void $ sendS3Handle h $ S3.putBucketVersioning b S3.VersioningEnabled
+		void $ liftIO $ runResourceT $ sendS3Handle h $
+			S3.putBucketVersioning b S3.VersioningEnabled
 #else
 	showLongNote $ unlines
 		[ "This version of git-annex cannot auto-enable S3 bucket versioning."