refactor sinkResponseFile and add downloadC
Remote.S3 and Remote.Helper.Http both had similar code to sink a http-conduit Response to a file; refactor out sinkResponseFile. downloadC downloads an url to a file using http-conduit, and supports resuming. Falls back to curl to handle urls that http-conduit does not support. This is not used yet, but the goal is to replace download with it. git-annex.cabal: conduit-extra was not actually used for a long time, remove the dep. conduit moves into the main dependency list, but since http-conduit was already in there, and it depends on conduit, that's not really adding a new build dep. This commit was supported by the NSF-funded DataLad project.
This commit is contained in:
parent
0e8564201e
commit
0f6775f1ff
6 changed files with 138 additions and 35 deletions
|
@ -15,6 +15,7 @@ import Utility.Metered
|
||||||
import Remote.Helper.Special
|
import Remote.Helper.Special
|
||||||
import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
|
import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
|
||||||
import Network.HTTP.Types
|
import Network.HTTP.Types
|
||||||
|
import Network.HTTP.Conduit
|
||||||
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
import qualified Data.ByteString as S
|
import qualified Data.ByteString as S
|
||||||
|
@ -71,15 +72,5 @@ handlePopper numchunks chunksize meterupdate h sink = do
|
||||||
httpBodyRetriever :: FilePath -> MeterUpdate -> Response BodyReader -> IO ()
|
httpBodyRetriever :: FilePath -> MeterUpdate -> Response BodyReader -> IO ()
|
||||||
httpBodyRetriever dest meterupdate resp
|
httpBodyRetriever dest meterupdate resp
|
||||||
| responseStatus resp /= ok200 = giveup $ show $ responseStatus resp
|
| responseStatus resp /= ok200 = giveup $ show $ responseStatus resp
|
||||||
| otherwise = bracket (openBinaryFile dest WriteMode) hClose (go zeroBytesProcessed)
|
| otherwise = runResourceT $
|
||||||
where
|
sinkResponseFile meterupdate zeroBytesProcessed dest WriteMode resp
|
||||||
reader = responseBody resp
|
|
||||||
go sofar h = do
|
|
||||||
b <- reader
|
|
||||||
if S.null b
|
|
||||||
then return ()
|
|
||||||
else do
|
|
||||||
let sofar' = addBytesProcessed sofar $ S.length b
|
|
||||||
S.hPut h b
|
|
||||||
meterupdate sofar'
|
|
||||||
go sofar' h
|
|
||||||
|
|
16
Remote/S3.hs
16
Remote/S3.hs
|
@ -47,6 +47,7 @@ import Creds
|
||||||
import Annex.UUID
|
import Annex.UUID
|
||||||
import Logs.Web
|
import Logs.Web
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
|
import qualified Utility.Url as Url
|
||||||
import Utility.DataUnits
|
import Utility.DataUnits
|
||||||
import Utility.FileSystemEncoding
|
import Utility.FileSystemEncoding
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
|
@ -259,22 +260,9 @@ retrieve r info Nothing = case getpublicurl info of
|
||||||
|
|
||||||
retrieveHelper :: S3Info -> S3Handle -> S3.Object -> FilePath -> MeterUpdate -> Annex ()
|
retrieveHelper :: S3Info -> S3Handle -> S3.Object -> FilePath -> MeterUpdate -> Annex ()
|
||||||
retrieveHelper info h object f p = liftIO $ runResourceT $ do
|
retrieveHelper info h object f p = liftIO $ runResourceT $ do
|
||||||
(fr, fh) <- allocate (openFile f WriteMode) hClose
|
|
||||||
let req = S3.getObject (bucket info) object
|
let req = S3.getObject (bucket info) object
|
||||||
S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle' h req
|
S3.GetObjectResponse { S3.gorResponse = rsp } <- sendS3Handle' h req
|
||||||
responseBody rsp $$+- sinkprogressfile fh p zeroBytesProcessed
|
Url.sinkResponseFile p zeroBytesProcessed f WriteMode rsp
|
||||||
release fr
|
|
||||||
where
|
|
||||||
sinkprogressfile fh meterupdate sofar = do
|
|
||||||
mbs <- await
|
|
||||||
case mbs of
|
|
||||||
Nothing -> return ()
|
|
||||||
Just bs -> do
|
|
||||||
let sofar' = addBytesProcessed sofar (S.length bs)
|
|
||||||
liftIO $ do
|
|
||||||
void $ meterupdate sofar'
|
|
||||||
S.hPut fh bs
|
|
||||||
sinkprogressfile fh meterupdate sofar'
|
|
||||||
|
|
||||||
retrieveCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool
|
retrieveCheap :: Key -> AssociatedFile -> FilePath -> Annex Bool
|
||||||
retrieveCheap _ _ _ = return False
|
retrieveCheap _ _ _ = return False
|
||||||
|
|
|
@ -326,7 +326,7 @@ setMeterTotalSize :: Meter -> Integer -> IO ()
|
||||||
setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just
|
setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just
|
||||||
|
|
||||||
-- | Updates the meter, displaying it if necessary.
|
-- | Updates the meter, displaying it if necessary.
|
||||||
updateMeter :: Meter -> BytesProcessed -> IO ()
|
updateMeter :: Meter -> MeterUpdate
|
||||||
updateMeter (Meter totalsizev sv bv displaymeter) new = do
|
updateMeter (Meter totalsizev sv bv displaymeter) new = do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
(old, before) <- swapMVar sv (new, now)
|
(old, before) <- swapMVar sv (new, now)
|
||||||
|
|
132
Utility/Url.hs
132
Utility/Url.hs
|
@ -26,6 +26,8 @@ module Utility.Url (
|
||||||
assumeUrlExists,
|
assumeUrlExists,
|
||||||
download,
|
download,
|
||||||
downloadQuiet,
|
downloadQuiet,
|
||||||
|
downloadC,
|
||||||
|
sinkResponseFile,
|
||||||
downloadPartial,
|
downloadPartial,
|
||||||
parseURIRelaxed,
|
parseURIRelaxed,
|
||||||
matchStatusCodeException,
|
matchStatusCodeException,
|
||||||
|
@ -34,6 +36,7 @@ module Utility.Url (
|
||||||
|
|
||||||
import Common
|
import Common
|
||||||
import Utility.Tmp.Dir
|
import Utility.Tmp.Dir
|
||||||
|
import Utility.Metered
|
||||||
import qualified BuildInfo
|
import qualified BuildInfo
|
||||||
|
|
||||||
import Network.URI
|
import Network.URI
|
||||||
|
@ -45,6 +48,7 @@ import qualified Data.ByteString.Lazy as L
|
||||||
import Control.Monad.Trans.Resource
|
import Control.Monad.Trans.Resource
|
||||||
import Network.HTTP.Conduit
|
import Network.HTTP.Conduit
|
||||||
import Network.HTTP.Client (brRead, withResponse)
|
import Network.HTTP.Client (brRead, withResponse)
|
||||||
|
import Data.Conduit
|
||||||
|
|
||||||
#if ! MIN_VERSION_http_client(0,5,0)
|
#if ! MIN_VERSION_http_client(0,5,0)
|
||||||
responseTimeoutNone :: Maybe Int
|
responseTimeoutNone :: Maybe Int
|
||||||
|
@ -312,6 +316,111 @@ download' quiet url file uo = do
|
||||||
| quiet = [Param s]
|
| quiet = [Param s]
|
||||||
| otherwise = []
|
| otherwise = []
|
||||||
|
|
||||||
|
{- Download a perhaps large file, with auto-resume of incomplete downloads.
|
||||||
|
-
|
||||||
|
- By default, conduit is used for the download, except for file: urls,
|
||||||
|
- which are copied. If the url scheme is not supported by conduit, falls
|
||||||
|
- back to using curl.
|
||||||
|
-}
|
||||||
|
downloadC :: MeterUpdate -> URLString -> FilePath -> UrlOptions -> IO Bool
|
||||||
|
downloadC meterupdate url file uo = go `catchNonAsync` (const $ return False)
|
||||||
|
where
|
||||||
|
go = case parseURIRelaxed url of
|
||||||
|
Just u -> case parseUrlConduit (show u) of
|
||||||
|
Just req -> catchJust
|
||||||
|
-- When http redirects to a protocol which
|
||||||
|
-- conduit does not support, it will throw
|
||||||
|
-- a StatusCodeException with found302.
|
||||||
|
(matchStatusCodeException (== found302))
|
||||||
|
(downloadconduit req)
|
||||||
|
(const downloadcurl)
|
||||||
|
Nothing
|
||||||
|
| uriScheme u == "file:" -> do
|
||||||
|
let src = unEscapeString (uriPath u)
|
||||||
|
withMeteredFile src meterupdate $
|
||||||
|
L.writeFile file
|
||||||
|
return True
|
||||||
|
| BuildInfo.curl -> downloadcurl
|
||||||
|
| otherwise -> return False
|
||||||
|
Nothing -> return False
|
||||||
|
|
||||||
|
downloadconduit req = catchMaybeIO (getFileSize file) >>= \case
|
||||||
|
Nothing -> runResourceT $ do
|
||||||
|
resp <- http req (httpManager uo)
|
||||||
|
if responseStatus resp == ok200
|
||||||
|
then store zeroBytesProcessed WriteMode resp
|
||||||
|
else return False
|
||||||
|
Just sz -> resumeconduit req sz
|
||||||
|
|
||||||
|
alreadydownloaded sz s h = s == requestedRangeNotSatisfiable416
|
||||||
|
&& case lookup hContentRange h of
|
||||||
|
-- This could be improved by fixing
|
||||||
|
-- https://github.com/aristidb/http-types/issues/87
|
||||||
|
Just crh -> crh == B8.fromString ("bytes */" ++ show sz)
|
||||||
|
Nothing -> False
|
||||||
|
|
||||||
|
-- Resume download from where a previous download was interrupted,
|
||||||
|
-- when supported by the http server. The server may also opt to
|
||||||
|
-- send the whole file rather than resuming.
|
||||||
|
resumeconduit req sz = catchJust
|
||||||
|
(matchStatusCodeHeadersException (alreadydownloaded sz))
|
||||||
|
dl
|
||||||
|
(const $ return True)
|
||||||
|
where
|
||||||
|
dl = runResourceT $ do
|
||||||
|
let req' = req { requestHeaders = resumeFromHeader sz : requestHeaders req }
|
||||||
|
resp <- http req' (httpManager uo)
|
||||||
|
liftIO $ print ("XXX", responseStatus resp)
|
||||||
|
if responseStatus resp == partialContent206
|
||||||
|
then store (BytesProcessed sz) AppendMode resp
|
||||||
|
else if responseStatus resp == ok200
|
||||||
|
then store zeroBytesProcessed WriteMode resp
|
||||||
|
else return False
|
||||||
|
|
||||||
|
store initialp mode resp = do
|
||||||
|
sinkResponseFile meterupdate initialp file mode resp
|
||||||
|
return True
|
||||||
|
|
||||||
|
downloadcurl = do
|
||||||
|
-- curl does not create destination file
|
||||||
|
-- if the url happens to be empty, so pre-create.
|
||||||
|
unlessM (doesFileExist file) $
|
||||||
|
writeFile file ""
|
||||||
|
let headerparams = map (\h -> Param $ "--header=" ++ h) (reqHeaders uo)
|
||||||
|
let opts =
|
||||||
|
[ Param "-sS"
|
||||||
|
, Param "-f"
|
||||||
|
, Param "-L"
|
||||||
|
, Param "-C", Param "-"
|
||||||
|
, Param "-o"
|
||||||
|
]
|
||||||
|
boolSystem "curl" $ addUserAgent uo $ concat
|
||||||
|
[ headerparams
|
||||||
|
, opts
|
||||||
|
, reqParams uo
|
||||||
|
, [File file, File url]
|
||||||
|
]
|
||||||
|
|
||||||
|
{- Sinks a Response's body to a file. The file can either be opened in
|
||||||
|
- WriteMode or AppendMode. Updates the meter as data is received.
|
||||||
|
-
|
||||||
|
- Note that the responseStatus is not checked by this function.
|
||||||
|
-}
|
||||||
|
sinkResponseFile :: MonadResource m => MeterUpdate -> BytesProcessed -> FilePath -> IOMode -> Response (ResumableSource m B8.ByteString) -> m ()
|
||||||
|
sinkResponseFile meterupdate initialp file mode resp = do
|
||||||
|
(fr, fh) <- allocate (openBinaryFile file mode) hClose
|
||||||
|
responseBody resp $$+- go initialp fh
|
||||||
|
release fr
|
||||||
|
where
|
||||||
|
go sofar fh = await >>= \case
|
||||||
|
Nothing -> return ()
|
||||||
|
Just bs -> do
|
||||||
|
let sofar' = addBytesProcessed sofar (B.length bs)
|
||||||
|
liftIO $ do
|
||||||
|
void $ meterupdate sofar'
|
||||||
|
B.hPut fh bs
|
||||||
|
go sofar' fh
|
||||||
|
|
||||||
{- Downloads at least the specified number of bytes from an url. -}
|
{- Downloads at least the specified number of bytes from an url. -}
|
||||||
downloadPartial :: URLString -> UrlOptions -> Int -> IO (Maybe L.ByteString)
|
downloadPartial :: URLString -> UrlOptions -> Int -> IO (Maybe L.ByteString)
|
||||||
downloadPartial url uo n = case parseURIRelaxed url of
|
downloadPartial url uo n = case parseURIRelaxed url of
|
||||||
|
@ -371,20 +480,29 @@ hAcceptEncoding = "Accept-Encoding"
|
||||||
hContentDisposition :: CI.CI B.ByteString
|
hContentDisposition :: CI.CI B.ByteString
|
||||||
hContentDisposition = "Content-Disposition"
|
hContentDisposition = "Content-Disposition"
|
||||||
|
|
||||||
|
hContentRange :: CI.CI B.ByteString
|
||||||
|
hContentRange = "Content-Range"
|
||||||
|
|
||||||
|
resumeFromHeader :: FileSize -> Header
|
||||||
|
resumeFromHeader sz = (hRange, renderByteRanges [ByteRangeFrom sz])
|
||||||
|
|
||||||
{- Use with eg:
|
{- Use with eg:
|
||||||
-
|
-
|
||||||
- > catchJust (matchStatusCodeException (== notFound404))
|
- > catchJust (matchStatusCodeException (== notFound404))
|
||||||
-}
|
-}
|
||||||
|
matchStatusCodeException :: (Status -> Bool) -> HttpException -> Maybe HttpException
|
||||||
|
matchStatusCodeException want = matchStatusCodeHeadersException (\s _h -> want s)
|
||||||
|
|
||||||
#if MIN_VERSION_http_client(0,5,0)
|
#if MIN_VERSION_http_client(0,5,0)
|
||||||
matchStatusCodeException :: (Status -> Bool) -> HttpException -> Maybe HttpException
|
matchStatusCodeHeadersException :: (Status -> ResponseHeaders -> Bool) -> HttpException -> Maybe HttpException
|
||||||
matchStatusCodeException want e@(HttpExceptionRequest _ (StatusCodeException r _))
|
matchStatusCodeHeadersException want e@(HttpExceptionRequest _ (StatusCodeException r _))
|
||||||
| want (responseStatus r) = Just e
|
| want (responseStatus r) (responseHeaders r) = Just e
|
||||||
| otherwise = Nothing
|
| otherwise = Nothing
|
||||||
matchStatusCodeException _ _ = Nothing
|
matchStatusCodeHeadersException _ _ = Nothing
|
||||||
#else
|
#else
|
||||||
matchStatusCodeException :: (Status -> Bool) -> HttpException -> Maybe HttpException
|
matchStatusCodeHeadersException :: (Status -> ResponseHeaders -> Bool) -> HttpException -> Maybe HttpException
|
||||||
matchStatusCodeException want e@(StatusCodeException s _ _)
|
matchStatusCodeHeadersException want e@(StatusCodeException s r _)
|
||||||
| want s = Just e
|
| want s r = Just e
|
||||||
| otherwise = Nothing
|
| otherwise = Nothing
|
||||||
matchStatusCodeException _ _ = Nothing
|
matchStatusCodeException _ _ = Nothing
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -32,3 +32,8 @@ to http-conduit which does not support it. Maybe require users to set
|
||||||
supports netrc?
|
supports netrc?
|
||||||
|
|
||||||
--[[Joey]]
|
--[[Joey]]
|
||||||
|
|
||||||
|
> Implemented Utility.Url.downloadC that is the (nontrivial)
|
||||||
|
> download a file with resume support using http-conduit.
|
||||||
|
> It falls back to curl to handle urls that http-conduit does not support.
|
||||||
|
> Now we only have to decide what to do about the above edge cases..
|
||||||
|
|
|
@ -343,6 +343,7 @@ Executable git-annex
|
||||||
http-client,
|
http-client,
|
||||||
http-types (>= 0.7),
|
http-types (>= 0.7),
|
||||||
http-conduit (>= 2.0),
|
http-conduit (>= 2.0),
|
||||||
|
conduit,
|
||||||
time,
|
time,
|
||||||
old-locale,
|
old-locale,
|
||||||
esqueleto,
|
esqueleto,
|
||||||
|
@ -409,7 +410,7 @@ Executable git-annex
|
||||||
Other-Modules: Utility.Touch.Old
|
Other-Modules: Utility.Touch.Old
|
||||||
|
|
||||||
if flag(S3)
|
if flag(S3)
|
||||||
Build-Depends: conduit, conduit-extra, aws (>= 0.9.2)
|
Build-Depends: aws (>= 0.9.2)
|
||||||
CPP-Options: -DWITH_S3
|
CPP-Options: -DWITH_S3
|
||||||
Other-Modules: Remote.S3
|
Other-Modules: Remote.S3
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue