This commit is contained in:
Joey Hess 2014-11-03 19:50:33 -04:00
parent 5360417436
commit d16382e99f
2 changed files with 23 additions and 27 deletions

View file

@ -11,7 +11,7 @@ import Common.Annex
import Types.StoreRetrieve import Types.StoreRetrieve
import Utility.Metered import Utility.Metered
import Remote.Helper.Special import Remote.Helper.Special
import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader) import Network.HTTP.Client (RequestBody(..), Response, responseStatus, responseBody, BodyReader, NeedsPopper)
import Network.HTTP.Types import Network.HTTP.Types
import qualified Data.ByteString.Lazy as L import qualified Data.ByteString.Lazy as L
@ -31,11 +31,14 @@ httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
httpBodyStorer src m = do httpBodyStorer src m = do
size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
let streamer sink = withMeteredFile src m $ \b -> do let streamer sink = withMeteredFile src m $ \b -> mkPopper b sink
mvar <- newMVar $ L.toChunks b
let getnextchunk = modifyMVar mvar $ pure . pop
sink getnextchunk
return $ RequestBodyStream (fromInteger size) streamer return $ RequestBodyStream (fromInteger size) streamer
mkPopper :: L.ByteString -> NeedsPopper () -> IO ()
mkPopper b sink = do
mvar <- newMVar $ L.toChunks b
let getnextchunk = modifyMVar mvar $ pure . pop
sink getnextchunk
where where
pop [] = ([], S.empty) pop [] = ([], S.empty)
pop (c:cs) = (cs, c) pop (c:cs) = (cs, c)

View file

@ -29,7 +29,6 @@ import Data.Conduit
#if MIN_VERSION_aws(0,10,6) #if MIN_VERSION_aws(0,10,6)
import qualified Data.Conduit.List as CL import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB import qualified Data.Conduit.Binary as CB
import Network.HTTP.Conduit (withManager)
#endif #endif
import Common.Annex import Common.Annex
@ -162,9 +161,9 @@ store r h = fileStorer $ \k f p -> do
Just partsz | partsz > 0 -> do Just partsz | partsz > 0 -> do
fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f) fsz <- fromIntegral . fileSize <$> liftIO (getFileStatus f)
if fsz > partsz if fsz > partsz
then multipartupload fsz partsz k f p then multipartupload partsz k f p
else singlepartupload k f p else singlepartupload k f p
Nothing -> singlepartupload k f p _ -> singlepartupload k f p
-- Store public URL to item in Internet Archive. -- Store public URL to item in Internet Archive.
when (isIA (hinfo h) && not (isChunkKey k)) $ when (isIA (hinfo h) && not (isChunkKey k)) $
setUrlPresent k (iaKeyUrl r k) setUrlPresent k (iaKeyUrl r k)
@ -173,7 +172,7 @@ store r h = fileStorer $ \k f p -> do
singlepartupload k f p = do singlepartupload k f p = do
rbody <- liftIO $ httpBodyStorer f p rbody <- liftIO $ httpBodyStorer f p
void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody void $ sendS3Handle h $ putObject h (bucketObject (hinfo h) k) rbody
multipartupload fsz partsz k f p = do multipartupload partsz k f p = do
#if MIN_VERSION_aws(0,10,6) #if MIN_VERSION_aws(0,10,6)
let info = hinfo h let info = hinfo h
let object = bucketObject info k let object = bucketObject info k
@ -188,28 +187,22 @@ store r h = fileStorer $ \k f p -> do
-- Send parts of the file, taking care to stream each part -- Send parts of the file, taking care to stream each part
-- w/o buffering in memory, since the parts can be large. -- w/o buffering in memory, since the parts can be large.
etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \h -> do etags <- bracketIO (openBinaryFile f ReadMode) hClose $ \fh -> do
let sendparts etags partnum = do let sendparts etags partnum = ifM (hIsEOF fh)
b <- liftIO $ hGetUntilMetered h (< partsz) p ( return (reverse etags)
if L.null b , do
then return (reverse etags) b <- liftIO $ hGetUntilMetered fh (< partsz) p
else do let body = RequestBodyStream (L.length b) (mkPopper b)
mvar <- newMVar $ L.toChunks b S3.UploadPartResponse _ etag <- sendS3Handle h $
let streamer sink = do S3.uploadPart (bucket info) object partnum uploadid body
let getnextchunk = modifyMVar mvar $ pure . pop sendparts (etag:etags) (partnum + 1)
sink getnextchunk )
let body = RequestBodyStreamChunked streamer sendparts [] 1
S3.UploadPartResponse _ etag <- sendS3Handle h $
S3.uploadPart (bucket info) object partnum uploadid body
sendparts (etag:etags) (partnum + 1)
sendparts [] 0 1
void $ sendS3Handle h $ S3.postCompleteMultipartUpload void $ sendS3Handle h $ S3.postCompleteMultipartUpload
(bucket info) object uploadid (zip [1..] etags) (bucket info) object uploadid (zip [1..] etags)
pop [] = ([], S.empty)
pop (c:cs) = (cs, c)
#else #else
warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ " vs filesize " ++ show fsz ++ "); built with too old a version of the aws library." warning $ "Cannot do multipart upload (partsize " ++ show partsz ++ "); built with too old a version of the aws library."
singlepartupload k f p singlepartupload k f p
#endif #endif