This didn't work. In case I want to revisit, here's what I tried.
diff --git a/Annex/Proxy.hs b/Annex/Proxy.hs
index 48222872c1..e4e526d3dd 100644
--- a/Annex/Proxy.hs
+++ b/Annex/Proxy.hs
@@ -26,16 +26,21 @@ import Logs.UUID
import Logs.Location
import Utility.Tmp.Dir
import Utility.Metered
+import Utility.ThreadScheduler
+import Utility.OpenFd
import Git.Types
import qualified Database.Export as Export
import Control.Concurrent.STM
import Control.Concurrent.Async
+import Control.Concurrent.MVar
import qualified Data.ByteString as B
+import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as L
import qualified System.FilePath.ByteString as P
import qualified Data.Map as M
import qualified Data.Set as S
+import System.IO.Unsafe
proxyRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
proxyRemoteSide clientmaxversion bypass r
@@ -240,21 +245,99 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv mexportdb = go
writeVerifyChunk iv h b
storetofile iv h (n - fromIntegral (B.length b)) bs
- proxyget offset af k = withproxytmpfile k $ \tmpfile -> do
+ proxyget offset af k = withproxytmpfile k $ \tmpfile ->
+ let retrieve = tryNonAsync $ Remote.retrieveKeyFile r k af
+ (fromRawFilePath tmpfile) nullMeterUpdate vc
+ in case fromKey keySize k of
+ Just size | size > 0 -> do
+ cancelv <- liftIO newEmptyMVar
+ donev <- liftIO newEmptyMVar
+ streamer <- liftIO $ async $
+ streamdata offset tmpfile size cancelv donev
+ retrieve >>= \case
+ Right _ -> liftIO $ do
+ putMVar donev ()
+ wait streamer
+ Left err -> liftIO $ do
+ putMVar cancelv ()
+ wait streamer
+ propagateerror err
+ _ -> retrieve >>= \case
+ Right _ -> liftIO $ senddata offset tmpfile
+ Left err -> liftIO $ propagateerror err
+ where
-- Don't verify the content from the remote,
-- because the client will do its own verification.
- let vc = Remote.NoVerify
- tryNonAsync (Remote.retrieveKeyFile r k af (fromRawFilePath tmpfile) nullMeterUpdate vc) >>= \case
- Right _ -> liftIO $ senddata offset tmpfile
- Left err -> liftIO $ propagateerror err
+ vc = Remote.NoVerify
+ streamdata (Offset offset) f size cancelv donev = do
+ sendlen offset size
+ waitforfile
+ x <- tryNonAsync $ do
+ fd <- openFdWithMode f ReadOnly Nothing defaultFileFlags
+ h <- fdToHandle fd
+ hSeek h AbsoluteSeek offset
+ senddata' h (getcontents size)
+ case x of
+ Left err -> do
+ throwM err
+ Right res -> return res
+ where
+ -- The file doesn't exist at the start.
+ -- Wait for some data to be written to it as well,
+ -- in case an empty file is first created and then
+ -- overwritten. When there is an offset, wait for
+ -- the file to get that large. Note that this is not used
+ -- when the size is 0.
+ waitforfile = tryNonAsync (fromIntegral <$> getFileSize f) >>= \case
+ Right sz | sz > 0 && sz >= offset -> return ()
+ _ -> ifM (isEmptyMVar cancelv)
+ ( do
+ threadDelaySeconds (Seconds 1)
+ waitforfile
+ , do
+ return ()
+ )
+
+ getcontents n h = unsafeInterleaveIO $ do
+ isdone <- isEmptyMVar donev <||> isEmptyMVar cancelv
+ c <- BS.hGet h defaultChunkSize
+ let n' = n - fromIntegral (BS.length c)
+ let c' = L.fromChunks [BS.take (fromIntegral n) c]
+ if BS.null c
+ then if isdone
+ then return mempty
+ else do
+ -- Wait for more data to be
+ -- written to the file.
+ threadDelaySeconds (Seconds 1)
+ getcontents n h
+ else if n' > 0
+ then do
+ -- unsafeInterleaveIO causes
+ -- this to be deferred until
+ -- data is read from the lazy
+ -- ByteString.
+ cs <- getcontents n' h
+ return $ L.append c' cs
+ else return c'
+
senddata (Offset offset) f = do
size <- fromIntegral <$> getFileSize f
- let n = max 0 (size - offset)
- sendmessage $ DATA (Len n)
+ sendlen offset size
withBinaryFile (fromRawFilePath f) ReadMode $ \h -> do
hSeek h AbsoluteSeek offset
- sendbs =<< L.hGetContents h
+ senddata' h L.hGetContents
+
+ senddata' h getcontents = do
+ sendbs =<< getcontents h
-- Important to keep the handle open until
-- the client responds. The bytestring
-- could still be lazily streaming out to
@@ -272,6 +355,11 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv mexportdb = go
Just FAILURE -> return ()
Just _ -> giveup "protocol error"
Nothing -> return ()
+
+ sendlen offset size = do
+ let n = max 0 (size - offset)
+ sendmessage $ DATA (Len n)
+
{- Check if this repository can proxy for a specified remote uuid,
- and if so enable proxying for it. -}
Rejected the idea of automatically instantiating remotes for proxies-of-proxies.
That needs cycle protection, while the current behavior, which happened
for free, is that running git-annex updateproxy on the proxy can be used
to configure it, but only for topologies that actually exist.
Client side support for SUCCESS-PLUS and ALREADY-HAVE-PLUS
is complete, when a PUT stores to additional repositories
than the expected on, the location log is updated with the
additional UUIDs that contain the content.
Started implementing PUT fanout to multiple remotes for clusters.
It is untested, and I fear fencepost errors in the relative
offset calculations. And it is missing proxying for the protocol
after DATA.
Added to git-annex_proxies todo because this is something OpenNeuro
would need in order to use the git-annex proxy.
Sponsored-by: Dartmouth College's OpenNeuro project