This didn't work. In case I want to revisit, here's what I tried.
diff --git a/Annex/Proxy.hs b/Annex/Proxy.hs
index 48222872c1..e4e526d3dd 100644
--- a/Annex/Proxy.hs
+++ b/Annex/Proxy.hs
@@ -26,16 +26,21 @@ import Logs.UUID
import Logs.Location
import Utility.Tmp.Dir
import Utility.Metered
+import Utility.ThreadScheduler
+import Utility.OpenFd
import Git.Types
import qualified Database.Export as Export
import Control.Concurrent.STM
import Control.Concurrent.Async
+import Control.Concurrent.MVar
import qualified Data.ByteString as B
+import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as L
import qualified System.FilePath.ByteString as P
import qualified Data.Map as M
import qualified Data.Set as S
+import System.IO.Unsafe
proxyRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
proxyRemoteSide clientmaxversion bypass r
@@ -240,21 +245,99 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv mexportdb = go
writeVerifyChunk iv h b
storetofile iv h (n - fromIntegral (B.length b)) bs
- proxyget offset af k = withproxytmpfile k $ \tmpfile -> do
+ proxyget offset af k = withproxytmpfile k $ \tmpfile ->
+ let retrieve = tryNonAsync $ Remote.retrieveKeyFile r k af
+ (fromRawFilePath tmpfile) nullMeterUpdate vc
+ in case fromKey keySize k of
+ Just size | size > 0 -> do
+ cancelv <- liftIO newEmptyMVar
+ donev <- liftIO newEmptyMVar
+ streamer <- liftIO $ async $
+ streamdata offset tmpfile size cancelv donev
+ retrieve >>= \case
+ Right _ -> liftIO $ do
+ putMVar donev ()
+ wait streamer
+ Left err -> liftIO $ do
+ putMVar cancelv ()
+ wait streamer
+ propagateerror err
+ _ -> retrieve >>= \case
+ Right _ -> liftIO $ senddata offset tmpfile
+ Left err -> liftIO $ propagateerror err
+ where
-- Don't verify the content from the remote,
-- because the client will do its own verification.
- let vc = Remote.NoVerify
- tryNonAsync (Remote.retrieveKeyFile r k af (fromRawFilePath tmpfile) nullMeterUpdate vc) >>= \case
- Right _ -> liftIO $ senddata offset tmpfile
- Left err -> liftIO $ propagateerror err
+ vc = Remote.NoVerify
+ streamdata (Offset offset) f size cancelv donev = do
+ sendlen offset size
+ waitforfile
+ x <- tryNonAsync $ do
+ fd <- openFdWithMode f ReadOnly Nothing defaultFileFlags
+ h <- fdToHandle fd
+ hSeek h AbsoluteSeek offset
+ senddata' h (getcontents size)
+ case x of
+ Left err -> do
+ throwM err
+ Right res -> return res
+ where
+ -- The file doesn't exist at the start.
+ -- Wait for some data to be written to it as well,
+ -- in case an empty file is first created and then
+ -- overwritten. When there is an offset, wait for
+ -- the file to get that large. Note that this is not used
+ -- when the size is 0.
+ waitforfile = tryNonAsync (fromIntegral <$> getFileSize f) >>= \case
+ Right sz | sz > 0 && sz >= offset -> return ()
+ _ -> ifM (isEmptyMVar cancelv)
+ ( do
+ threadDelaySeconds (Seconds 1)
+ waitforfile
+ , do
+ return ()
+ )
+
+ getcontents n h = unsafeInterleaveIO $ do
+ isdone <- isEmptyMVar donev <||> isEmptyMVar cancelv
+ c <- BS.hGet h defaultChunkSize
+ let n' = n - fromIntegral (BS.length c)
+ let c' = L.fromChunks [BS.take (fromIntegral n) c]
+ if BS.null c
+ then if isdone
+ then return mempty
+ else do
+ -- Wait for more data to be
+ -- written to the file.
+ threadDelaySeconds (Seconds 1)
+ getcontents n h
+ else if n' > 0
+ then do
+ -- unsafeInterleaveIO causes
+ -- this to be deferred until
+ -- data is read from the lazy
+ -- ByteString.
+ cs <- getcontents n' h
+ return $ L.append c' cs
+ else return c'
+
senddata (Offset offset) f = do
size <- fromIntegral <$> getFileSize f
- let n = max 0 (size - offset)
- sendmessage $ DATA (Len n)
+ sendlen offset size
withBinaryFile (fromRawFilePath f) ReadMode $ \h -> do
hSeek h AbsoluteSeek offset
- sendbs =<< L.hGetContents h
+ senddata' h L.hGetContents
+
+ senddata' h getcontents = do
+ sendbs =<< getcontents h
-- Important to keep the handle open until
-- the client responds. The bytestring
-- could still be lazily streaming out to
@@ -272,6 +355,11 @@ proxySpecialRemote protoversion r ihdl ohdl owaitv oclosedv mexportdb = go
Just FAILURE -> return ()
Just _ -> giveup "protocol error"
Nothing -> return ()
+
+ sendlen offset size = do
+ let n = max 0 (size - offset)
+ sendmessage $ DATA (Len n)
+
{- Check if this repository can proxy for a specified remote uuid,
- and if so enable proxying for it. -}
This all works fine. But it doesn't check repository sizes yet, and
without repository size checking, once a repository gets full, there
will be no other repository that will want its files.
Use of sha2 seems unncessary, probably alder2 or md5 or crc would have
been enough. Possibly just summing up the bytes of the key mod the number
of repositories would have sufficed. But sha2 is there, and probably
hardware accellerated. I doubt very much there is any security benefit
to using it though. If someone wants to construct a key that will be
balanced onto a given repository, sha2 is certianly not going to stop
them.
This removes versionedExport, which was only used by the S3 special
remote. Instead, versionedexport=yes is a common way for remotes to
indicate that they are versioned.
proxyRequest was treating UNLOCKCONTENT as a separate request.
That made it possible for there to be two different connections to the
proxied remote, with LOCKCONTENT being sent to one, and UNLOCKCONTENT
to the other one. A protocol error.
git-annex testremote now passes against a http proxied remote.
sendExactly will now be sure to evaluate the whole lazy ByteString.
In this case, the lazy ByteString was exactly the right lenth.
But, it seems that L.take caused it to not actually be fully evaluated.
In servePut, this manifested as gather never being fully evaluated,
which caused the hang.
Very, very subtle, and horrible bug. Clearly the use of lazy ByteString
(or really just laziness) is at fault, and it would be very worth moving
to conduit or whatever to avoid this.
But, it's buggy: the server hangs without processing the VALIDITY,
and I can't seem to work out why. As far as I can see, storefile
is getting as far as running the validitycheck, which is supposed to
read that, but never does.
This is especially strange because what seems like the same protocol
doesn't hang when servePut runs it. This made me think that it needed
to use inAnnexWorker to be more like servePut, but that didn't help.
Another small problem with this is that it does create an empty
.git/annex/tmp/ file for the key. Since this will usually be used in
combination with servePut, that doesn't seem worth worrying about much.
Made the data-length header required even for v0. This simplifies the
implementation, and doesn't preclude extra verification being done for
v0.
The connectionWaitVar is an ugly hack. In servePut, nothing waits
on the waitvar, and I could not find a good way to make anything wait on
it.
Base64 can include '/', and with UUIDs and keys both used in routes,
the encoding needs to avoid that. Use base64url everywhere in the HTTP
protocol for consistency.
The reason to use removeBeforeRemoteEndTime is twofold.
First, removeBefore sends two protocol commands. Currently, the HTTP
protocol runner only supports sending a single command per invocation.
Secondly, the http server gets a monotonic timestamp from the client. So
translating back to a POSIXTime would be annoying.
The timestamp flow with a proxy will be:
- client gets timestamp, which gets the monotonic timestamp from the
proxied remote via the proxy. The timestamp is currently not
proxied when there is a single proxy.
- client calls remove-before
- http server calls removeBeforeRemoteEndTime which sends REMOVE-BEFORE
to the proxied remote.
Websockets would work, but the problem with using them for this is that
each lockcontent call is a separate websocket connection. And that's an
actual TCP connection. One TCP connection per file dropped would be too
expensive. With http long polling, regular http pipelining can be used,
so it will reuse a TCP connection.
Unfortunately, at least with servant, bi-directional streams with long
polling don't result in true bidirectional full duplex communication.
Servant processes the whole client body stream before generating the server
body stream. I think it's entirely possible to do full bi-directional
communication over http, but it would need changes to servant.
And, there's no way for the client to tell if the server successfully
locked the content, since the server will keep processing the client
stream no matter what.:
So, added a new api endpoint, keeplocked. lockcontent will lock the key
for 10 minutes with retention lock, and then a call to keeplocked will
keep it locked for as long as needed. This does mean that there will
need to be a Map of locks by key, and I will probably want to add
some kind of lock identifier that lockcontent returns.
Added v2-v0 endpoints. These are tedious, but will be needed in order to
use the HTTP protocol to proxy to repositories with older git-annex,
where git-annex-shell will be speaking an older version of the protocol.
Changed GET to use 422 when the content is not present. 404 is needed to
detect when a protocol version is not supported.
Managed to avoid netstrings. Actually, using netstrings while streaming
lazy ByteString turns out to be very difficult. So instead, have a
header that specifies the expected amount of data, and then it can just
arrange to send a different amount of data if it needs to indicate
INVALID.
Also improved the interface for GET of a key.
This will be easy to implement with servant. It's also very efficient,
and fairly future-proof. Eg, could add another frame with other data.
This does make it a bit harder to use this protocol, but netstrings
probably take about 5 minutes to implement? Let's see...
import Text.Read
import Data.List
toNetString :: String -> String
toNetString s = show (length s) ++ ":" ++ s ++ ","
nextNetString :: String -> Maybe (String, String)
nextNetString s = case break (== ':') s of
([], _) -> Nothing
(sn, rest) -> do
n <- readMaybe sn
let (v, rest') = splitAt n (drop 1 rest)
return (v, drop 1 rest')
Ok, well, that took about 10 minutes ;-)