WIP try sending using RequestBodyStreamChunked

May not work; if it does, this is gonna be the simplest way to get good
memory use and progress reporting.
Joey Hess 2014-11-03 19:18:46 -04:00
parent 628304136b
commit 5360417436

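For context: RequestBodyStreamChunked is a constructor of http-client's RequestBody type that sends the body with chunked transfer encoding, pulling one chunk at a time from an IO action, so the full body never has to sit in memory at once. Here is a minimal sketch of the pattern the diff below uses, with an MVar holding the pending chunks (chunkedBody, giver, and needspopper are illustrative names, not git-annex code):

    import Control.Concurrent.MVar (newMVar, modifyMVar)
    import qualified Data.ByteString as S
    import Network.HTTP.Client (RequestBody(..), GivesPopper)

    -- Wrap a list of strict chunks as a chunked streaming body.
    -- http-client calls the popper repeatedly; returning an empty
    -- ByteString signals end of stream, so at most one chunk needs
    -- to be live at a time.
    chunkedBody :: [S.ByteString] -> RequestBody
    chunkedBody chunks = RequestBodyStreamChunked giver
      where
        giver :: GivesPopper ()
        giver needspopper = do
            mvar <- newMVar chunks
            needspopper (modifyMVar mvar (return . pop))
        pop :: [S.ByteString] -> ([S.ByteString], S.ByteString)
        pop [] = ([], S.empty)   -- empty chunk = end of stream
        pop (c:cs) = (cs, c)

The streamer in the diff is exactly this shape, except that the chunk list for each part comes from hGetUntilMetered.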

@@ -27,7 +27,6 @@ import Control.Monad.Trans.Resource
 import Control.Monad.Catch
 import Data.Conduit
 #if MIN_VERSION_aws(0,10,6)
-import qualified Aws.S3.Commands.Multipart as Multipart
 import qualified Data.Conduit.List as CL
 import qualified Data.Conduit.Binary as CB
 import Network.HTTP.Conduit (withManager)
@@ -160,10 +159,10 @@ prepareS3 r info = resourcePrepare $ const $
 store :: Remote -> S3Handle -> Storer
 store r h = fileStorer $ \k f p -> do
 	case partSize (hinfo h) of
-		Just sz -> do
+		Just partsz | partsz > 0 -> do
 			fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f)
-			if fsz > sz
-				then multipartupload sz k f p
+			if fsz > partsz
+				then multipartupload fsz partsz k f p
 				else singlepartupload k f p
 		Nothing -> singlepartupload k f p
 	-- Store public URL to item in Internet Archive.
@@ -174,7 +173,7 @@ store r h = fileStorer $ \k f p -> do
 	singlepartupload k f p = do
 		rbody <- liftIO $ httpBodyStorer f p
 		void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody
-	multipartupload sz k f p = do
+	multipartupload fsz partsz k f p = do
 #if MIN_VERSION_aws(0,10,6)
 		let info = hinfo h
 		let object = bucketObject info k
@@ -187,19 +186,30 @@ store r h = fileStorer $ \k f p -> do
 			}
 		uploadid <- S3.imurUploadId <$> sendS3Handle h req
-		-- TODO: progress display
-		-- TODO: avoid needing tons of memory
-		-- https://github.com/aristidb/aws/issues/142
-		etags <- liftIO $ withManager $ \mgr ->
-			CB.sourceFile f
-				$= Multipart.chunkedConduit sz
-				$= Multipart.putConduit (hawscfg h) (hs3cfg h) mgr (bucket info) object uploadid
-				$$ CL.consume
+		-- Send parts of the file, taking care to stream each part
+		-- w/o buffering in memory, since the parts can be large.
+		etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \fh -> do
+			let sendparts etags partnum = do
+				b <- liftIO $ hGetUntilMetered fh (< partsz) p
+				if L.null b
+					then return (reverse etags)
+					else do
+						mvar <- liftIO $ newMVar $ L.toChunks b
+						let streamer sink = do
+							let getnextchunk = modifyMVar mvar $ pure . pop
+							sink getnextchunk
+						let body = RequestBodyStreamChunked streamer
+						S3.UploadPartResponse _ etag <- sendS3Handle h $
+							S3.uploadPart (bucket info) object partnum uploadid body
+						sendparts (etag:etags) (partnum + 1)
+			sendparts [] 1
 		void $ sendS3Handle h $ S3.postCompleteMultipartUpload
 			(bucket info) object uploadid (zip [1..] etags)
+	pop [] = ([], S.empty)
+	pop (c:cs) = (cs, c)
 #else
-		warning $ "Cannot do multipart upload (partsize " ++ show sz ++ "); built with too old a version of the aws library."
+		warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ " vs filesize " ++ show fsz ++ "); built with too old a version of the aws library."
 		singlepartupload k f p
 #endif
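The progress half of the commit message rides on hGetUntilMetered, which feeds the MeterUpdate callback p as each part is read off disk. A rough, self-contained stand-in for that helper (readPartMetered and its plain Integer callback are made up here; the real git-annex function takes a size predicate and a MeterUpdate):

    import qualified Data.ByteString as S
    import qualified Data.ByteString.Lazy as L
    import System.IO (Handle)

    -- Illustrative only: read up to maxbytes from a handle in 32 KiB
    -- pieces, reporting the running byte count after each read, and
    -- return a lazy ByteString whose chunks suit chunked streaming.
    readPartMetered :: Handle -> Integer -> (Integer -> IO ()) -> IO L.ByteString
    readPartMetered h maxbytes report = go 0 []
      where
        go sofar acc
            | sofar >= maxbytes = finish acc
            | otherwise = do
                c <- S.hGetSome h (fromIntegral (min 32768 (maxbytes - sofar)))
                if S.null c
                    then finish acc  -- EOF before the part filled
                    else do
                        let sofar' = sofar + fromIntegral (S.length c)
                        report sofar'
                        go sofar' (c : acc)
        finish = return . L.fromChunks . reverse

Since each part is capped at partsz and streamed back out through the popper, memory should stay bounded by roughly one part regardless of file size, with the meter advancing as bytes are read.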