deepseq all things returned from ResourceT http
Potentially fixes https://git-annex.branchable.com/bugs/concurrent_git-annex-copy_to_s3_special_remote_fails/ although I don't know if it does. My thinking is, ResourceT may allocate a resource and then free it, and a unforced thunk to that resource could result in reading memory that has since been overwritten by something else, or in a SEGV, depending. While that seems kind of like a bug in ResourceT to me, if it is what's happening, this will avoid it. If it's not, this doesn't really hurt much since the values are all smallish. This commit was sponsored by Graham Spencer on Patreon.
This commit is contained in:
parent
e26960752c
commit
ddf963d019
4 changed files with 66 additions and 32 deletions
31
Remote/S3.hs
31
Remote/S3.hs
|
@ -36,6 +36,7 @@ import Data.IORef
|
|||
import System.Log.Logger
|
||||
import Control.Concurrent.STM (atomically)
|
||||
import Control.Concurrent.STM.TVar
|
||||
import Data.Maybe
|
||||
|
||||
import Annex.Common
|
||||
import Types.Remote
|
||||
|
@ -63,6 +64,7 @@ import Utility.Metered
|
|||
import Utility.DataUnits
|
||||
import Annex.Content
|
||||
import qualified Annex.Url as Url
|
||||
import Utility.Url (extractFromResourceT)
|
||||
import Annex.Url (getUrlOptions, withUrlOptions, UrlOptions(..))
|
||||
import Utility.Env
|
||||
|
||||
|
@ -343,12 +345,14 @@ storeHelper info h magic f object p = liftIO $ case partSize info of
|
|||
let req = (putObject info object rbody)
|
||||
{ S3.poContentType = encodeBS <$> contenttype }
|
||||
resp <- sendS3Handle h req
|
||||
let vid = mkS3VersionID object (S3.porVersionId resp)
|
||||
vid <- mkS3VersionID object
|
||||
<$> extractFromResourceT (S3.porVersionId resp)
|
||||
-- FIXME Actual aws version that supports this is not known,
|
||||
-- patch not merged yet.
|
||||
-- https://github.com/aristidb/aws/issues/258
|
||||
#if MIN_VERSION_aws(0,99,0)
|
||||
return (Just (S3.porETag resp), vid)
|
||||
etag <- extractFromResourceT (Just (S3.porETag resp))
|
||||
return (etag, vid)
|
||||
#else
|
||||
return (Nothing, vid)
|
||||
#endif
|
||||
|
@ -389,7 +393,9 @@ storeHelper info h magic f object p = liftIO $ case partSize info of
|
|||
|
||||
resp <- sendS3Handle h $ S3.postCompleteMultipartUpload
|
||||
(bucket info) object uploadid (zip [1..] etags)
|
||||
return (Just (S3.cmurETag resp), mkS3VersionID object (S3.cmurVersionId resp))
|
||||
etag <- extractFromResourceT (Just (S3.cmurETag resp))
|
||||
vid <- extractFromResourceT (S3.cmurVersionId resp)
|
||||
return (etag, mkS3VersionID object vid)
|
||||
getcontenttype = maybe (pure Nothing) (flip getMagicMimeType f) magic
|
||||
|
||||
{- Implemented as a fileRetriever, that uses conduit to stream the chunks
|
||||
|
@ -474,7 +480,7 @@ checkKeyHelper info h loc = checkKeyHelper' info h o limit
|
|||
checkKeyHelper' :: S3Info -> S3Handle -> S3.Object -> (S3.HeadObject -> S3.HeadObject) -> Annex Bool
|
||||
checkKeyHelper' info h o limit = liftIO $ runResourceT $ do
|
||||
rsp <- sendS3Handle h req
|
||||
return (isJust $ S3.horMetadata rsp)
|
||||
extractFromResourceT (isJust $ S3.horMetadata rsp)
|
||||
where
|
||||
req = limit $ S3.headObject (bucket info) o
|
||||
|
||||
|
@ -552,7 +558,8 @@ listImportableContentsS3 hv r info =
|
|||
Nothing -> do
|
||||
warning $ needS3Creds (uuid r)
|
||||
return Nothing
|
||||
Just h -> catchMaybeIO $ liftIO $ runResourceT $ startlist h
|
||||
Just h -> catchMaybeIO $ liftIO $ runResourceT $
|
||||
extractFromResourceT =<< startlist h
|
||||
where
|
||||
startlist h
|
||||
| versioning info = do
|
||||
|
@ -733,11 +740,12 @@ genBucket c gc u = do
|
|||
where
|
||||
go _ _ (Right True) = noop
|
||||
go info h _ = do
|
||||
v <- liftIO $ tryNonAsync $ runResourceT $
|
||||
sendS3Handle h (S3.getBucket $ bucket info)
|
||||
case v of
|
||||
Right _ -> noop
|
||||
Left _ -> do
|
||||
r <- liftIO $ tryNonAsync $ runResourceT $ do
|
||||
void $ sendS3Handle h (S3.getBucket $ bucket info)
|
||||
return True
|
||||
case r of
|
||||
Right True -> noop
|
||||
_ -> do
|
||||
showAction $ "creating bucket in " ++ datacenter
|
||||
void $ liftIO $ runResourceT $ sendS3Handle h $
|
||||
(S3.putBucket (bucket info))
|
||||
|
@ -787,8 +795,7 @@ checkUUIDFile c u info h = tryNonAsync $ liftIO $ runResourceT $ do
|
|||
Left _ -> return False
|
||||
Right r -> do
|
||||
v <- AWS.loadToMemory r
|
||||
let !ok = check v
|
||||
return ok
|
||||
extractFromResourceT (check v)
|
||||
where
|
||||
check (S3.GetObjectMemoryResponse _meta rsp) =
|
||||
responseStatus rsp == ok200 && responseBody rsp == uuidb
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
|
||||
module Types.Export (
|
||||
ExportLocation,
|
||||
mkExportLocation,
|
||||
|
@ -20,12 +22,16 @@ import Utility.Split
|
|||
import Utility.FileSystemEncoding
|
||||
|
||||
import qualified System.FilePath.Posix as Posix
|
||||
import GHC.Generics
|
||||
import Control.DeepSeq
|
||||
|
||||
-- A location on a remote that a key can be exported to.
|
||||
-- The RawFilePath will be relative to the top of the remote,
|
||||
-- and uses unix-style path separators.
|
||||
newtype ExportLocation = ExportLocation RawFilePath
|
||||
deriving (Show, Eq)
|
||||
deriving (Show, Eq, Generic)
|
||||
|
||||
instance NFData ExportLocation
|
||||
|
||||
mkExportLocation :: RawFilePath -> ExportLocation
|
||||
mkExportLocation = ExportLocation . toInternalGitPath
|
||||
|
|
|
@ -5,10 +5,14 @@
|
|||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE DeriveGeneric #-}
|
||||
|
||||
module Types.Import where
|
||||
|
||||
import qualified Data.ByteString as S
|
||||
import Data.Char
|
||||
import Control.DeepSeq
|
||||
import GHC.Generics
|
||||
|
||||
import Types.Export
|
||||
import Utility.QuickCheck
|
||||
|
@ -29,7 +33,9 @@ fromImportLocation = fromExportLocation
|
|||
- the repository. It should be reasonably short since it is stored in the
|
||||
- git-annex branch. -}
|
||||
newtype ContentIdentifier = ContentIdentifier S.ByteString
|
||||
deriving (Eq, Ord, Show)
|
||||
deriving (Eq, Ord, Show, Generic)
|
||||
|
||||
instance NFData ContentIdentifier
|
||||
|
||||
instance Arbitrary ContentIdentifier where
|
||||
-- Avoid non-ascii ContentIdentifiers because fully arbitrary
|
||||
|
@ -47,4 +53,6 @@ data ImportableContents info = ImportableContents
|
|||
-- files that are stored in them. This is equivilant to a git
|
||||
-- commit history.
|
||||
}
|
||||
deriving (Show)
|
||||
deriving (Show, Generic)
|
||||
|
||||
instance NFData info => NFData (ImportableContents info)
|
||||
|
|
|
@ -41,6 +41,7 @@ module Utility.Url (
|
|||
GetBasicAuth,
|
||||
noBasicAuth,
|
||||
applyBasicAuth',
|
||||
extractFromResourceT,
|
||||
) where
|
||||
|
||||
import Common
|
||||
|
@ -60,8 +61,10 @@ import qualified Data.ByteString as B
|
|||
import qualified Data.ByteString.UTF8 as B8
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified Data.Set as S
|
||||
import Control.Exception (throwIO)
|
||||
import Control.Exception (throwIO, evaluate)
|
||||
import Control.Monad.Trans.Resource
|
||||
import Control.Monad.IO.Class (MonadIO)
|
||||
import Control.DeepSeq
|
||||
import Network.HTTP.Conduit
|
||||
import Network.HTTP.Client
|
||||
import Network.HTTP.Simple (getResponseHeader)
|
||||
|
@ -269,12 +272,10 @@ getUrlInfo url uo = case parseURIRelaxed url of
|
|||
debugM "url" (show req')
|
||||
join $ runResourceT $ do
|
||||
resp <- http req' (httpManager uo)
|
||||
-- forces processing the response while
|
||||
-- within the runResourceT
|
||||
liftIO $ if responseStatus resp == ok200
|
||||
if responseStatus resp == ok200
|
||||
then do
|
||||
let !len = extractlen resp
|
||||
let !fn = extractfilename resp
|
||||
len <- extractFromResourceT (extractlen resp)
|
||||
fn <- extractFromResourceT (extractfilename resp)
|
||||
return $ found len fn
|
||||
else if responseStatus resp == unauthorized401
|
||||
then return $ getBasicAuth uo' (show (getUri req)) >>= \case
|
||||
|
@ -463,11 +464,13 @@ downloadConduit meterupdate req file uo =
|
|||
then do
|
||||
store zeroBytesProcessed WriteMode resp
|
||||
return (return ())
|
||||
else if responseStatus resp == unauthorized401
|
||||
else do
|
||||
rf <- extractFromResourceT (respfailure resp)
|
||||
if responseStatus resp == unauthorized401
|
||||
then return $ getBasicAuth uo (show (getUri req')) >>= \case
|
||||
Nothing -> respfailure resp
|
||||
Nothing -> giveup rf
|
||||
Just ba -> retryauthed ba
|
||||
else return $ respfailure resp
|
||||
else return $ giveup rf
|
||||
where
|
||||
req' = applyRequest uo $ req
|
||||
-- Override http-client's default decompression of gzip
|
||||
|
@ -498,11 +501,13 @@ downloadConduit meterupdate req file uo =
|
|||
then do
|
||||
store zeroBytesProcessed WriteMode resp
|
||||
return (return ())
|
||||
else if responseStatus resp == unauthorized401
|
||||
else do
|
||||
rf <- extractFromResourceT (respfailure resp)
|
||||
if responseStatus resp == unauthorized401
|
||||
then return $ getBasicAuth uo (show (getUri req'')) >>= \case
|
||||
Nothing -> respfailure resp
|
||||
Nothing -> giveup rf
|
||||
Just ba -> retryauthed ba
|
||||
else return $ respfailure resp
|
||||
else return $ giveup rf
|
||||
|
||||
alreadydownloaded sz s h = s == requestedRangeNotSatisfiable416
|
||||
&& case lookup hContentRange h of
|
||||
|
@ -521,7 +526,7 @@ downloadConduit meterupdate req file uo =
|
|||
store initialp mode resp =
|
||||
sinkResponseFile meterupdate initialp file mode resp
|
||||
|
||||
respfailure = giveup . B8.toString . statusMessage . responseStatus
|
||||
respfailure = B8.toString . statusMessage . responseStatus
|
||||
|
||||
retryauthed (ba, signalsuccess) = do
|
||||
r <- tryNonAsync $ downloadConduit
|
||||
|
@ -710,3 +715,11 @@ applyBasicAuth' :: BasicAuth -> Request -> Request
|
|||
applyBasicAuth' ba = applyBasicAuth
|
||||
(encodeBS (basicAuthUser ba))
|
||||
(encodeBS (basicAuthPassword ba))
|
||||
|
||||
{- Make sure whatever is returned is fully evaluated. Avoids any possible
|
||||
- issues with laziness deferring processing until a time when the resource
|
||||
- has been freed. -}
|
||||
extractFromResourceT :: (MonadIO m, NFData a) => a -> ResourceT m a
|
||||
extractFromResourceT v = do
|
||||
liftIO $ evaluate (rnf v)
|
||||
return v
|
||||
|
|
Loading…
Reference in a new issue