Sped up proxied downloads from special remotes, by streaming
Currently works for special remotes that don't use fileRetriever. Ones that do will download to another filename and rename it into place, defeating the streaming. This actually benchmarks slightly slower when getting a large file from a fast proxied special remote. However, when the proxied special remote is slow, it will be a big win.
This commit is contained in:
parent
76a1989a0e
commit
edaed18e4c
3 changed files with 110 additions and 32 deletions
117
Annex/Proxy.hs
117
Annex/Proxy.hs
|
@ -5,6 +5,8 @@
|
|||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE CPP #-}
|
||||
|
||||
module Annex.Proxy where
|
||||
|
||||
import Annex.Common
|
||||
|
@ -26,16 +28,23 @@ import Logs.UUID
|
|||
import Logs.Location
|
||||
import Utility.Tmp.Dir
|
||||
import Utility.Metered
|
||||
import Utility.ThreadScheduler
|
||||
import Git.Types
|
||||
import qualified Database.Export as Export
|
||||
#ifndef mingw32_HOST_OS
|
||||
import Utility.OpenFile
|
||||
#endif
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.MVar
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
import qualified System.FilePath.ByteString as P
|
||||
import qualified Data.Map as M
|
||||
import qualified Data.Set as S
|
||||
import System.IO.Unsafe
|
||||
|
||||
proxyRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
|
||||
proxyRemoteSide clientmaxversion bypass r
|
||||
|
@ -240,26 +249,101 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv mexportdb = go
|
|||
writeVerifyChunk iv h b
|
||||
storetofile iv h (n - fromIntegral (B.length b)) bs
|
||||
|
||||
proxyget offset af k = withproxytmpfile k $ \tmpfile -> do
|
||||
proxyget offset af k = withproxytmpfile k $ \tmpfile ->
|
||||
let retrieve = tryNonAsync $ Remote.retrieveKeyFile r k af
|
||||
(fromRawFilePath tmpfile) nullMeterUpdate vc
|
||||
in case fromKey keySize k of
|
||||
#ifndef mingw32_HOST_OS
|
||||
Just size | size > 0 -> do
|
||||
cancelv <- liftIO newEmptyMVar
|
||||
donev <- liftIO newEmptyMVar
|
||||
streamer <- liftIO $ async $
|
||||
streamdata offset tmpfile size cancelv donev
|
||||
retrieve >>= \case
|
||||
Right _ -> liftIO $ do
|
||||
putMVar donev ()
|
||||
wait streamer
|
||||
Left err -> liftIO $ do
|
||||
putMVar cancelv ()
|
||||
wait streamer
|
||||
propagateerror err
|
||||
#endif
|
||||
_ -> retrieve >>= \case
|
||||
Right _ -> liftIO $ senddata offset tmpfile
|
||||
Left err -> liftIO $ propagateerror err
|
||||
where
|
||||
-- Don't verify the content from the remote,
|
||||
-- because the client will do its own verification.
|
||||
let vc = Remote.NoVerify
|
||||
tryNonAsync (Remote.retrieveKeyFile r k af (fromRawFilePath tmpfile) nullMeterUpdate vc) >>= \case
|
||||
Right _ -> liftIO $ senddata offset tmpfile
|
||||
Left err -> liftIO $ propagateerror err
|
||||
|
||||
vc = Remote.NoVerify
|
||||
|
||||
#ifndef mingw32_HOST_OS
|
||||
streamdata (Offset offset) f size cancelv donev = do
|
||||
sendlen offset size
|
||||
waitforfile
|
||||
x <- tryNonAsync $ do
|
||||
h <- openFileBeingWritten f
|
||||
hSeek h AbsoluteSeek offset
|
||||
senddata' h (getcontents size)
|
||||
case x of
|
||||
Left err -> do
|
||||
hPutStrLn stderr (show err)
|
||||
throwM err
|
||||
Right res -> return res
|
||||
where
|
||||
-- The file doesn't exist at the start.
|
||||
-- Wait for some data to be written to it as well,
|
||||
-- in case an empty file is first created and then
|
||||
-- overwritten. When there is an offset, wait for
|
||||
-- the file to get that large. Note that this is not used
|
||||
-- when the size is 0.
|
||||
waitforfile = tryNonAsync (fromIntegral <$> getFileSize f) >>= \case
|
||||
Right sz | sz > 0 && sz >= offset -> return ()
|
||||
_ -> ifM (isEmptyMVar cancelv <&&> isEmptyMVar donev)
|
||||
( do
|
||||
threadDelaySeconds (Seconds 1)
|
||||
waitforfile
|
||||
, do
|
||||
return ()
|
||||
)
|
||||
|
||||
getcontents n h = unsafeInterleaveIO $ do
|
||||
isdone <- isEmptyMVar donev <||> isEmptyMVar cancelv
|
||||
c <- BS.hGet h defaultChunkSize
|
||||
let n' = n - fromIntegral (BS.length c)
|
||||
let c' = L.fromChunks [BS.take (fromIntegral n) c]
|
||||
if BS.null c
|
||||
then if isdone
|
||||
then return mempty
|
||||
else do
|
||||
-- Wait for more data to be
|
||||
-- written to the file.
|
||||
threadDelaySeconds (Seconds 1)
|
||||
getcontents n h
|
||||
else if n' > 0
|
||||
then do
|
||||
-- unsafeInterleaveIO causes
|
||||
-- this to be deferred until
|
||||
-- data is read from the lazy
|
||||
-- ByteString.
|
||||
cs <- getcontents n' h
|
||||
return $ L.append c' cs
|
||||
else return c'
|
||||
#endif
|
||||
|
||||
senddata (Offset offset) f = do
|
||||
size <- fromIntegral <$> getFileSize f
|
||||
let n = max 0 (size - offset)
|
||||
sendmessage $ DATA (Len n)
|
||||
sendlen offset size
|
||||
withBinaryFile (fromRawFilePath f) ReadMode $ \h -> do
|
||||
hSeek h AbsoluteSeek offset
|
||||
sendbs =<< L.hGetContents h
|
||||
-- Important to keep the handle open until
|
||||
-- the client responds. The bytestring
|
||||
-- could still be lazily streaming out to
|
||||
-- the client.
|
||||
waitclientresponse
|
||||
senddata' h L.hGetContents
|
||||
|
||||
senddata' h getcontents = do
|
||||
sendbs =<< getcontents h
|
||||
-- Important to keep the handle open until
|
||||
-- the client responds. The bytestring
|
||||
-- could still be lazily streaming out to
|
||||
-- the client.
|
||||
waitclientresponse
|
||||
where
|
||||
sendbs bs = do
|
||||
sendbytestring bs
|
||||
|
@ -272,6 +356,11 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv mexportdb = go
|
|||
Just FAILURE -> return ()
|
||||
Just _ -> giveup "protocol error"
|
||||
Nothing -> return ()
|
||||
|
||||
sendlen offset size = do
|
||||
let n = max 0 (size - offset)
|
||||
sendmessage $ DATA (Len n)
|
||||
|
||||
|
||||
{- Check if this repository can proxy for a specified remote uuid,
|
||||
- and if so enable proxying for it. -}
|
||||
|
|
|
@ -1,3 +1,9 @@
|
|||
git-annex (10.20240928) UNRELEASED; urgency=medium
|
||||
|
||||
* Sped up proxied downloads from special remotes, by streaming.
|
||||
|
||||
-- Joey Hess <id@joeyh.name> Tue, 15 Oct 2024 12:12:18 -0400
|
||||
|
||||
git-annex (10.20240927) upstream; urgency=medium
|
||||
|
||||
* Detect when a preferred content expression contains "not present",
|
||||
|
|
|
@ -30,28 +30,11 @@ Planned schedule of work:
|
|||
|
||||
* Currently working on streaming download via proxy from special remote.
|
||||
|
||||
* Tried implementing a background thread in the proxy that runs while
|
||||
retrieving a file, to stream it out as it comes in. That failed because
|
||||
reading from a file that the same process is writing to is prevented by
|
||||
locking in haskell. (Could be gotten around by using FD rather than Handle,
|
||||
but would need to read from the FD and use packCString to make a ByteString.)
|
||||
|
||||
But also, remotes using fileRetriever retrieve to the temp object file,
|
||||
* Remotes using fileRetriever retrieve to the temp object file,
|
||||
before it is renamed to the requested file. In the case of a proxy,
|
||||
that is a different file, and so it won't see the file until it's all
|
||||
been transferred and renamed.
|
||||
|
||||
* Could the P2P protocol be used as an alternate interface for a special
|
||||
remote? Would avoid needing temp files when proxying for special remotes,
|
||||
and would support resume from offset as well for special remotes for
|
||||
which that makes sense.
|
||||
|
||||
But this would need encryption and chunking to be implemented on top of
|
||||
the P2P protocol, and all special remotes rewritten, and a bridge for the
|
||||
current external special remote interface or rewrite all external special
|
||||
remotes. Probably not worth it to unify the two things like this, if the
|
||||
only benefit is streaming through a proxy.
|
||||
|
||||
## completed items for September's work on proving behavior of preferred content
|
||||
|
||||
* Static analysis to detect "not present", "not balanced", and similar
|
||||
|
|
Loading…
Add table
Reference in a new issue