fix memory leak
Unfortunately, I don't fully understand why it was leaking using the old method of a lazy bytestring. I just know that it was leaking, despite neither hGetUntilMetered nor byteStringPopper seeming to leak by themselves. The new method avoids the lazy bytestring, and simply reads chunks from the handle and streams them out to the http socket.
This commit is contained in:
parent
29871e320c
commit
fccdd61eec
2 changed files with 34 additions and 10 deletions
|
@ -5,6 +5,8 @@
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE BangPatterns #-}
|
||||||
|
|
||||||
module Remote.Helper.Http where
|
module Remote.Helper.Http where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
|
@ -31,17 +33,38 @@ httpStorer a = fileStorer $ \k f m -> a k =<< liftIO (httpBodyStorer f m)
|
||||||
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
|
httpBodyStorer :: FilePath -> MeterUpdate -> IO RequestBody
|
||||||
httpBodyStorer src m = do
|
httpBodyStorer src m = do
|
||||||
size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
|
size <- fromIntegral . fileSize <$> getFileStatus src :: IO Integer
|
||||||
let streamer sink = withMeteredFile src m $ \b -> mkPopper b sink
|
let streamer sink = withMeteredFile src m $ \b -> byteStringPopper b sink
|
||||||
return $ RequestBodyStream (fromInteger size) streamer
|
return $ RequestBodyStream (fromInteger size) streamer
|
||||||
|
|
||||||
mkPopper :: L.ByteString -> NeedsPopper () -> IO ()
|
byteStringPopper :: L.ByteString -> NeedsPopper () -> IO ()
|
||||||
mkPopper b sink = do
|
byteStringPopper b sink = do
|
||||||
mvar <- newMVar $ L.toChunks b
|
mvar <- newMVar $ L.toChunks b
|
||||||
let getnextchunk = modifyMVar mvar $ pure . pop
|
let getnextchunk = modifyMVar mvar $ \v ->
|
||||||
|
case v of
|
||||||
|
[] -> return ([], S.empty)
|
||||||
|
(c:cs) -> return (cs, c)
|
||||||
|
sink getnextchunk
|
||||||
|
|
||||||
|
{- Makes a Popper that streams a given number of chunks of a given
|
||||||
|
- size from the handle, updating the meter as the chunks are read. -}
|
||||||
|
handlePopper :: Integer -> Int -> MeterUpdate -> Handle -> NeedsPopper () -> IO ()
|
||||||
|
handlePopper numchunks chunksize meterupdate h sink = do
|
||||||
|
mvar <- newMVar zeroBytesProcessed
|
||||||
|
let getnextchunk = do
|
||||||
|
sent <- takeMVar mvar
|
||||||
|
if sent >= target
|
||||||
|
then do
|
||||||
|
putMVar mvar sent
|
||||||
|
return S.empty
|
||||||
|
else do
|
||||||
|
b <- S.hGet h chunksize
|
||||||
|
let !sent' = addBytesProcessed sent chunksize
|
||||||
|
putMVar mvar sent'
|
||||||
|
meterupdate sent'
|
||||||
|
return b
|
||||||
sink getnextchunk
|
sink getnextchunk
|
||||||
where
|
where
|
||||||
pop [] = ([], S.empty)
|
target = toBytesProcessed (numchunks * fromIntegral chunksize)
|
||||||
pop (c:cs) = (cs, c)
|
|
||||||
|
|
||||||
-- Reads the http body and stores it to the specified file, updating the
|
-- Reads the http body and stores it to the specified file, updating the
|
||||||
-- meter as it goes.
|
-- meter as it goes.
|
||||||
|
|
|
@ -198,10 +198,11 @@ store r h = fileStorer $ \k f p -> do
|
||||||
let sz = if fsz - pos < partsz'
|
let sz = if fsz - pos < partsz'
|
||||||
then fsz - pos
|
then fsz - pos
|
||||||
else partsz'
|
else partsz'
|
||||||
b <- liftIO $ hGetUntilMetered fh (< partsz') meter
|
let numchunks = ceiling (fromIntegral sz / defaultChunkSize)
|
||||||
let body = RequestBodyStream (fromIntegral sz) (mkPopper b)
|
let popper = handlePopper numchunks defaultChunkSize p fh
|
||||||
S3.UploadPartResponse _ etag <- sendS3Handle h $
|
let req = S3.uploadPart (bucket info) object partnum uploadid $
|
||||||
S3.uploadPart (bucket info) object partnum uploadid body
|
RequestBodyStream (fromIntegral sz) popper
|
||||||
|
S3.UploadPartResponse _ etag <- sendS3Handle h req
|
||||||
sendparts (offsetMeterUpdate meter (toBytesProcessed sz)) (etag:etags) (partnum + 1)
|
sendparts (offsetMeterUpdate meter (toBytesProcessed sz)) (etag:etags) (partnum + 1)
|
||||||
sendparts p [] 1
|
sendparts p [] 1
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue