2014-08-07 23:32:23 +00:00
|
|
|
{- helpers for remotes using http
|
|
|
|
-
|
2015-01-21 16:50:09 +00:00
|
|
|
- Copyright 2014 Joey Hess <id@joeyh.name>
|
2014-08-07 23:32:23 +00:00
|
|
|
-
|
2019-03-13 19:48:14 +00:00
|
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
2014-08-07 23:32:23 +00:00
|
|
|
-}
|
|
|
|
|
2014-11-04 19:22:08 +00:00
|
|
|
{-# LANGUAGE BangPatterns #-}
|
|
|
|
|
2014-08-07 23:32:23 +00:00
|
|
|
module Remote.Helper.Http where
|
|
|
|
|
2016-01-20 20:36:33 +00:00
|
|
|
import Annex.Common
|
2014-08-07 23:32:23 +00:00
|
|
|
import Types.StoreRetrieve
|
|
|
|
import Remote.Helper.Special
|
2018-04-06 20:59:14 +00:00
|
|
|
import Utility.Metered
|
2014-08-07 23:32:23 +00:00
|
|
|
|
|
|
|
import qualified Data.ByteString.Lazy as L
|
|
|
|
import qualified Data.ByteString as S
|
|
|
|
import Control.Concurrent
|
2018-04-06 20:59:14 +00:00
|
|
|
import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
|
|
|
|
import Network.HTTP.Types
|
2014-08-07 23:32:23 +00:00
|
|
|
|
|
|
|
-- A storer that expects to be provided with a http RequestBody containing
|
|
|
|
-- the content to store.
|
|
|
|
--
|
|
|
|
-- Implemented as a fileStorer, so that the content can be streamed
|
|
|
|
-- from the file in constant space.
|
2020-05-13 18:03:00 +00:00
|
|
|
httpStorer :: (Key -> RequestBody -> Annex ()) -> Storer
|
2014-08-09 18:23:54 +00:00
|
|
|
httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
|
|
|
|
|
|
|
|
-- Reads the file and generates a streaming request body, that will update
|
|
|
|
-- the meter as it's sent.
|
|
|
|
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
|
|
|
|
httpBodyStorer src m = do
|
2020-11-05 15:26:34 +00:00
|
|
|
size <- getFileSize (toRawFilePath src)
|
2014-11-04 19:22:08 +00:00
|
|
|
let streamer sink = withMeteredFile src m $ \b -> byteStringPopper b sink
|
2014-08-09 18:23:54 +00:00
|
|
|
return $ RequestBodyStream (fromInteger size) streamer
|
2014-11-03 23:50:33 +00:00
|
|
|
|
2014-11-04 19:22:08 +00:00
|
|
|
byteStringPopper :: L.ByteString -> NeedsPopper () -> IO ()
|
|
|
|
byteStringPopper b sink = do
|
2014-11-03 23:50:33 +00:00
|
|
|
mvar <- newMVar $ L.toChunks b
|
2014-11-04 19:22:08 +00:00
|
|
|
let getnextchunk = modifyMVar mvar $ \v ->
|
|
|
|
case v of
|
|
|
|
[] -> return ([], S.empty)
|
|
|
|
(c:cs) -> return (cs, c)
|
|
|
|
sink getnextchunk
|
|
|
|
|
|
|
|
{- Makes a Popper that streams a given number of chunks of a given
|
|
|
|
- size from the handle, updating the meter as the chunks are read. -}
|
|
|
|
handlePopper :: Integer -> Int -> MeterUpdate -> Handle -> NeedsPopper () -> IO ()
|
|
|
|
handlePopper numchunks chunksize meterupdate h sink = do
|
|
|
|
mvar <- newMVar zeroBytesProcessed
|
|
|
|
let getnextchunk = do
|
|
|
|
sent <- takeMVar mvar
|
|
|
|
if sent >= target
|
|
|
|
then do
|
|
|
|
putMVar mvar sent
|
|
|
|
return S.empty
|
|
|
|
else do
|
|
|
|
b <- S.hGet h chunksize
|
|
|
|
let !sent' = addBytesProcessed sent chunksize
|
|
|
|
putMVar mvar sent'
|
|
|
|
meterupdate sent'
|
|
|
|
return b
|
2014-11-03 23:50:33 +00:00
|
|
|
sink getnextchunk
|
2014-08-07 23:32:23 +00:00
|
|
|
where
|
2014-11-04 19:22:08 +00:00
|
|
|
target = toBytesProcessed (numchunks * fromIntegral chunksize)
|
2014-08-07 23:32:23 +00:00
|
|
|
|
2014-08-08 17:40:55 +00:00
|
|
|
-- Reads the http body and stores it to the specified file, updating the
|
|
|
|
-- meter as it goes.
|
|
|
|
httpBodyRetriever :: FilePath -> MeterUpdate -> Response BodyReader -> IO ()
|
2014-08-08 21:17:36 +00:00
|
|
|
httpBodyRetriever dest meterupdate resp
|
2016-11-16 01:29:54 +00:00
|
|
|
| responseStatus resp /= ok200 = giveup $ show $ responseStatus resp
|
2018-04-06 20:59:14 +00:00
|
|
|
| otherwise = bracket (openBinaryFile dest WriteMode) hClose (go zeroBytesProcessed)
|
|
|
|
where
|
|
|
|
reader = responseBody resp
|
|
|
|
go sofar h = do
|
|
|
|
b <- reader
|
|
|
|
if S.null b
|
|
|
|
then return ()
|
|
|
|
else do
|
|
|
|
let sofar' = addBytesProcessed sofar $ S.length b
|
|
|
|
S.hPut h b
|
|
|
|
meterupdate sofar'
|
|
|
|
go sofar' h
|