GET from proxied special remote
Working, but lots of room for improvement... Without streaming, so there is a delay before download begins as the file is retreived from the special remote. And when resuming it retrieves the whole file from the special remote *again*. Also, if the special remote throws an exception, currently it shows as "protocol error".
This commit is contained in:
parent
158d7bc933
commit
2e5af38f86
3 changed files with 86 additions and 20 deletions
|
@ -16,10 +16,15 @@ import qualified Types.Remote as Remote
|
||||||
import qualified Remote.Git
|
import qualified Remote.Git
|
||||||
import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection)
|
import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection)
|
||||||
import Annex.Concurrent
|
import Annex.Concurrent
|
||||||
|
import Annex.Verify
|
||||||
|
import Annex.Tmp
|
||||||
|
import Utility.Tmp.Dir
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
import qualified System.FilePath.ByteString as P
|
||||||
|
|
||||||
proxyRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
|
proxyRemoteSide :: ProtocolVersion -> Bypass -> Remote -> Annex RemoteSide
|
||||||
proxyRemoteSide clientmaxversion bypass r
|
proxyRemoteSide clientmaxversion bypass r
|
||||||
|
@ -75,30 +80,34 @@ proxySpecialRemote
|
||||||
-> Annex ()
|
-> Annex ()
|
||||||
proxySpecialRemote protoversion r ihdl ohdl endv = go
|
proxySpecialRemote protoversion r ihdl ohdl endv = go
|
||||||
where
|
where
|
||||||
go = receivemessage >>= \case
|
go :: Annex ()
|
||||||
|
go = liftIO receivemessage >>= \case
|
||||||
Just (CHECKPRESENT k) -> do
|
Just (CHECKPRESENT k) -> do
|
||||||
tryNonAsync (Remote.checkPresent r k) >>= \case
|
tryNonAsync (Remote.checkPresent r k) >>= \case
|
||||||
Right True -> sendmessage SUCCESS
|
Right True -> liftIO $ sendmessage SUCCESS
|
||||||
Right False -> sendmessage FAILURE
|
Right False -> liftIO $ sendmessage FAILURE
|
||||||
Left err -> propagateerror err
|
Left err -> liftIO $ propagateerror err
|
||||||
go
|
go
|
||||||
Just (LOCKCONTENT _) -> do
|
Just (LOCKCONTENT _) -> do
|
||||||
-- Special remotes do not support locking content.
|
-- Special remotes do not support locking content.
|
||||||
sendmessage FAILURE
|
liftIO $ sendmessage FAILURE
|
||||||
go
|
go
|
||||||
Just (REMOVE k) -> do
|
Just (REMOVE k) -> do
|
||||||
tryNonAsync (Remote.removeKey r k) >>= \case
|
tryNonAsync (Remote.removeKey r k) >>= \case
|
||||||
Right () -> sendmessage SUCCESS
|
Right () -> liftIO $ sendmessage SUCCESS
|
||||||
Left err -> propagateerror err
|
Left err -> liftIO $ propagateerror err
|
||||||
go
|
go
|
||||||
Just (PUT af k) -> giveup "TODO PUT" -- XXX
|
Just (PUT af k) -> giveup "TODO PUT" -- XXX
|
||||||
Just (GET offset af k) -> giveup "TODO GET" -- XXX
|
Just (GET offset (ProtoAssociatedFile af) k) -> do
|
||||||
|
proxyget offset af k
|
||||||
|
go
|
||||||
Just (BYPASS _) -> go
|
Just (BYPASS _) -> go
|
||||||
Just (CONNECT _) ->
|
Just (CONNECT _) ->
|
||||||
-- Not supported and the protocol ends here.
|
-- Not supported and the protocol ends here.
|
||||||
sendmessage $ CONNECTDONE (ExitFailure 1)
|
liftIO $ sendmessage $ CONNECTDONE (ExitFailure 1)
|
||||||
Just NOTIFYCHANGE -> do
|
Just NOTIFYCHANGE -> do
|
||||||
sendmessage (ERROR "NOTIFYCHANGE unsupported for a special remote")
|
liftIO $ sendmessage $
|
||||||
|
ERROR "NOTIFYCHANGE unsupported for a special remote"
|
||||||
go
|
go
|
||||||
Just _ -> giveup "protocol error"
|
Just _ -> giveup "protocol error"
|
||||||
Nothing -> return ()
|
Nothing -> return ()
|
||||||
|
@ -107,7 +116,7 @@ proxySpecialRemote protoversion r ihdl ohdl endv = go
|
||||||
liftIO $ atomically $
|
liftIO $ atomically $
|
||||||
(Right <$> takeTMVar ohdl)
|
(Right <$> takeTMVar ohdl)
|
||||||
`orElse`
|
`orElse`
|
||||||
(Left <$> takeTMVar endv)
|
(Left <$> readTMVar endv)
|
||||||
|
|
||||||
receivemessage = getnextmessageorend >>= \case
|
receivemessage = getnextmessageorend >>= \case
|
||||||
Right (Right m) -> return (Just m)
|
Right (Right m) -> return (Just m)
|
||||||
|
@ -117,8 +126,57 @@ proxySpecialRemote protoversion r ihdl ohdl endv = go
|
||||||
-- Left b -> return b
|
-- Left b -> return b
|
||||||
-- Right _m -> giveup "did not receive ByteString from P2P MVar"
|
-- Right _m -> giveup "did not receive ByteString from P2P MVar"
|
||||||
|
|
||||||
sendmessage m = liftIO $ atomically $ putTMVar ihdl (Right m)
|
sendmessage m = atomically $ putTMVar ihdl (Right m)
|
||||||
sendbytestring b = liftIO $ atomically $ putTMVar ihdl (Left b)
|
|
||||||
|
sendbytestring b = atomically $ putTMVar ihdl (Left b)
|
||||||
|
|
||||||
propagateerror err = sendmessage $ ERROR $
|
propagateerror err = sendmessage $ ERROR $
|
||||||
"proxied special remote reports: " ++ show err
|
"proxied special remote reports: " ++ show err
|
||||||
|
|
||||||
|
-- Not using gitAnnexTmpObjectLocation because there might be
|
||||||
|
-- several concurrent GET and PUTs of the same key being proxied
|
||||||
|
-- from this special remote or others, and each needs to happen
|
||||||
|
-- independently. Also, this key is not getting added into the
|
||||||
|
-- local annex objects.
|
||||||
|
withproxytmpfile k a = withOtherTmp $ \othertmpdir ->
|
||||||
|
withTmpDirIn (fromRawFilePath othertmpdir) "proxy" $ \tmpdir ->
|
||||||
|
a (toRawFilePath tmpdir P.</> keyFile k)
|
||||||
|
|
||||||
|
proxyget offset af k = withproxytmpfile k $ \tmpfile -> do
|
||||||
|
-- Don't verify the content from the remote,
|
||||||
|
-- because the client will do its own verification.
|
||||||
|
let vc = Remote.NoVerify
|
||||||
|
tryNonAsync (Remote.retrieveKeyFile r k af (fromRawFilePath tmpfile) nullMeterUpdate vc) >>= \case
|
||||||
|
Right v ->
|
||||||
|
ifM (verifyKeyContentPostRetrieval Remote.RetrievalAllKeysSecure vc v k tmpfile)
|
||||||
|
( liftIO $ senddata offset tmpfile
|
||||||
|
, liftIO $ sendmessage $
|
||||||
|
ERROR "verification of content failed"
|
||||||
|
)
|
||||||
|
Left err -> liftIO $ propagateerror err
|
||||||
|
|
||||||
|
senddata (Offset offset) f = do
|
||||||
|
size <- fromIntegral <$> getFileSize f
|
||||||
|
let n = max 0 (size - offset)
|
||||||
|
sendmessage $ DATA (Len n)
|
||||||
|
withBinaryFile (fromRawFilePath f) ReadMode $ \h -> do
|
||||||
|
hSeek h AbsoluteSeek offset
|
||||||
|
sendbs =<< L.hGetContents h
|
||||||
|
-- Important to keep the handle open until
|
||||||
|
-- the client responds. The bytestring
|
||||||
|
-- could still be lazily streaming out to
|
||||||
|
-- the client.
|
||||||
|
waitclientresponse
|
||||||
|
where
|
||||||
|
sendbs bs = do
|
||||||
|
sendbytestring bs
|
||||||
|
when (protoversion > ProtocolVersion 0) $
|
||||||
|
sendmessage (VALIDITY Valid)
|
||||||
|
|
||||||
|
waitclientresponse =
|
||||||
|
receivemessage >>= \case
|
||||||
|
Just SUCCESS -> return ()
|
||||||
|
Just FAILURE -> return ()
|
||||||
|
Just _ -> giveup "protocol error"
|
||||||
|
Nothing -> return ()
|
||||||
|
|
||||||
|
|
|
@ -241,10 +241,10 @@ runNet runst conn runner f = case f of
|
||||||
Right b -> runner (next b)
|
Right b -> runner (next b)
|
||||||
Left e -> return $ Left $
|
Left e -> return $ Left $
|
||||||
ProtoFailureException e
|
ProtoFailureException e
|
||||||
P2PHandleTMVar mv ->
|
P2PHandleTMVar mv ->
|
||||||
liftIO (atomically (takeTMVar mv)) >>= \case
|
liftIO (atomically (takeTMVar mv)) >>= \case
|
||||||
Left b -> runner (next b)
|
Left b -> runner (next b)
|
||||||
Right _m -> return $ Left $
|
Right _ -> return $ Left $
|
||||||
ProtoFailureMessage "protocol error"
|
ProtoFailureMessage "protocol error"
|
||||||
CheckAuthToken _u t next -> do
|
CheckAuthToken _u t next -> do
|
||||||
let authed = connCheckAuth conn t
|
let authed = connCheckAuth conn t
|
||||||
|
|
|
@ -24,13 +24,18 @@ Planned schedule of work:
|
||||||
|
|
||||||
For June's work on [[design/passthrough_proxy]], remaining todos:
|
For June's work on [[design/passthrough_proxy]], remaining todos:
|
||||||
|
|
||||||
* Since proxying to special remotes is not supported yet, and won't be for
|
* Resuming an interrupted download from proxied special remote makes the proxy
|
||||||
the first release, make it fail in a reasonable way.
|
re-download the whole content. It could instead keep some of the
|
||||||
|
object files around when the client does not send SUCCESS. This would
|
||||||
|
use more disk, but without streaming, proxying a special remote already
|
||||||
|
needs some disk. And it could minimize to eg, the last 2 or so.
|
||||||
|
|
||||||
- or -
|
* If GET from a proxied special remote sends an ERROR with a message
|
||||||
|
from the special remote, currently the user sees "protocol error".
|
||||||
|
|
||||||
* Proxying for special remotes.
|
* Implement PUT to proxied special remotes.
|
||||||
Including encryption and chunking. See design for issues.
|
|
||||||
|
* Streaming download from proxied special remotes. See design.
|
||||||
|
|
||||||
# items deferred until later for [[design/passthrough_proxy]]
|
# items deferred until later for [[design/passthrough_proxy]]
|
||||||
|
|
||||||
|
@ -124,3 +129,6 @@ For June's work on [[design/passthrough_proxy]], remaining todos:
|
||||||
|
|
||||||
* Proxied cluster nodes should have slightly higher cost than the cluster
|
* Proxied cluster nodes should have slightly higher cost than the cluster
|
||||||
gateway. (done)
|
gateway. (done)
|
||||||
|
|
||||||
|
* Basic support for proxying special remotes. (But not exporttree=yes ones
|
||||||
|
yet.) (done)
|
||||||
|
|
Loading…
Reference in a new issue