PUT to proxied special remote working

Still needs some work.

The reason that the waitv is necessary is because without it,
runNet loops back around and reads the next protocol message. But it's
not finished reading the whole bytestring yet, and so it reads some part
of it.
This commit is contained in:
Joey Hess 2024-06-28 17:07:01 -04:00
parent 2e5af38f86
commit 711a5166e2
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 92 additions and 18 deletions

View file

@ -15,8 +15,8 @@ import qualified Remote
import qualified Types.Remote as Remote
import qualified Remote.Git
import Remote.Helper.Ssh (openP2PShellConnection', closeP2PShellConnection)
import Annex.Content
import Annex.Concurrent
import Annex.Verify
import Annex.Tmp
import Utility.Tmp.Dir
import Utility.Metered
@ -51,14 +51,16 @@ proxySpecialRemoteSide clientmaxversion r = mkRemoteSide r $ do
liftIO (newTVarIO protoversion)
ihdl <- liftIO newEmptyTMVarIO
ohdl <- liftIO newEmptyTMVarIO
iwaitv <- liftIO newEmptyTMVarIO
owaitv <- liftIO newEmptyTMVarIO
endv <- liftIO newEmptyTMVarIO
worker <- liftIO . async =<< forkState
(proxySpecialRemote protoversion r ihdl ohdl endv)
(proxySpecialRemote protoversion r ihdl ohdl owaitv endv)
let remoteconn = P2PConnection
{ connRepo = Nothing
, connCheckAuth = const False
, connIhdl = P2PHandleTMVar ihdl
, connOhdl = P2PHandleTMVar ohdl
, connIhdl = P2PHandleTMVar ihdl iwaitv
, connOhdl = P2PHandleTMVar ohdl owaitv
, connIdent = ConnIdent (Just (Remote.name r))
}
let closeremoteconn = do
@ -77,8 +79,9 @@ proxySpecialRemote
-> TMVar (Either L.ByteString Message)
-> TMVar (Either L.ByteString Message)
-> TMVar ()
-> TMVar ()
-> Annex ()
proxySpecialRemote protoversion r ihdl ohdl endv = go
proxySpecialRemote protoversion r ihdl ohdl owaitv endv = go
where
go :: Annex ()
go = liftIO receivemessage >>= \case
@ -97,7 +100,9 @@ proxySpecialRemote protoversion r ihdl ohdl endv = go
Right () -> liftIO $ sendmessage SUCCESS
Left err -> liftIO $ propagateerror err
go
Just (PUT af k) -> giveup "TODO PUT" -- XXX
Just (PUT (ProtoAssociatedFile af) k) -> do
proxyput af k
go
Just (GET offset (ProtoAssociatedFile af) k) -> do
proxyget offset af k
go
@ -122,9 +127,10 @@ proxySpecialRemote protoversion r ihdl ohdl endv = go
Right (Right m) -> return (Just m)
Right (Left _b) -> giveup "unexpected ByteString received from P2P MVar"
Left () -> return Nothing
--receivebytestring = liftIO (atomically $ takeTMVar ohdl) >>= \case
-- Left b -> return b
-- Right _m -> giveup "did not receive ByteString from P2P MVar"
receivebytestring = atomically (takeTMVar ohdl) >>= \case
Left b -> return b
Right _m -> giveup "did not receive ByteString from P2P MVar"
sendmessage m = atomically $ putTMVar ihdl (Right m)
@ -142,13 +148,66 @@ proxySpecialRemote protoversion r ihdl ohdl endv = go
withTmpDirIn (fromRawFilePath othertmpdir) "proxy" $ \tmpdir ->
a (toRawFilePath tmpdir P.</> keyFile k)
proxyput af k = do
-- In order to send to the special remote, the key will
-- need to be inserted into the object directory.
-- It will be dropped again afterwards. Unless it's already
-- present there.
ifM (inAnnex k)
( tryNonAsync (Remote.storeKey r k af nullMeterUpdate) >>= \case
Right () -> liftIO $ sendmessage ALREADY_HAVE
Left err -> liftIO $ propagateerror err
, do
liftIO $ sendmessage $ PUT_FROM (Offset 0)
ifM receivedata
( do
tryNonAsync (Remote.storeKey r k af nullMeterUpdate) >>= \case
Right () -> do
depopulateobjectfile
liftIO $ sendmessage SUCCESS
Left err -> do
depopulateobjectfile
liftIO $ propagateerror err
, liftIO $ sendmessage FAILURE
)
)
where
receivedata = withproxytmpfile k $ \tmpfile ->
liftIO receivemessage >>= \case
Just (DATA (Len _)) -> do
b <- liftIO receivebytestring
liftIO $ L.writeFile (fromRawFilePath tmpfile) b
-- Signal that the whole bytestring
-- has been stored.
liftIO $ atomically $ putTMVar owaitv ()
if protoversion > ProtocolVersion 1
then do
liftIO receivemessage >>= \case
Just (VALIDITY Valid) ->
populateobjectfile tmpfile
Just (VALIDITY Invalid) -> return False
_ -> giveup "protocol error"
else populateobjectfile tmpfile
_ -> giveup "protocol error"
populateobjectfile tmpfile =
getViaTmpFromDisk Remote.RetrievalAllKeysSecure Remote.DefaultVerify k af $ \dest -> do
unVerified $ do
liftIO $ renameFile
(fromRawFilePath tmpfile)
(fromRawFilePath dest)
return True
depopulateobjectfile = void $ tryNonAsync $
lockContentForRemoval k noop removeAnnex
proxyget offset af k = withproxytmpfile k $ \tmpfile -> do
-- Don't verify the content from the remote,
-- because the client will do its own verification.
let vc = Remote.NoVerify
tryNonAsync (Remote.retrieveKeyFile r k af (fromRawFilePath tmpfile) nullMeterUpdate vc) >>= \case
Right v ->
ifM (verifyKeyContentPostRetrieval Remote.RetrievalAllKeysSecure vc v k tmpfile)
ifM (verifyKeyContentPostRetrieval Remote.RetrievalVerifiableKeysSecure vc v k tmpfile)
( liftIO $ senddata offset tmpfile
, liftIO $ sendmessage $
ERROR "verification of content failed"

View file

@ -77,7 +77,7 @@ mkRunState mk = do
data P2PHandle
= P2PHandle Handle
| P2PHandleTMVar (TMVar (Either L.ByteString Message))
| P2PHandleTMVar (TMVar (Either L.ByteString Message)) (TMVar ())
data P2PConnection = P2PConnection
{ connRepo :: Maybe Repo
@ -122,7 +122,7 @@ closeConnection conn = do
closehandle (connOhdl conn)
where
closehandle (P2PHandle h) = hClose h
closehandle (P2PHandleTMVar _) = return ()
closehandle (P2PHandleTMVar _ _) = return ()
-- Serves the protocol on a unix socket.
--
@ -190,7 +190,7 @@ runNet runst conn runner f = case f of
P2PHandle h -> tryNonAsync $ do
hPutStrLn h $ unwords (formatMessage m)
hFlush h
P2PHandleTMVar mv ->
P2PHandleTMVar mv _ ->
ifM (atomically (tryPutTMVar mv (Right m)))
( return $ Right ()
, return $ Left $ toException $
@ -214,7 +214,7 @@ runNet runst conn runner f = case f of
Right (Just l) -> case parseMessage l of
Just m -> gotmessage m
Nothing -> runner (next Nothing)
P2PHandleTMVar mv ->
P2PHandleTMVar mv _ ->
liftIO (atomically (takeTMVar mv)) >>= \case
Right m -> gotmessage m
Left _b -> protoerr
@ -230,8 +230,11 @@ runNet runst conn runner f = case f of
Right False -> return $ Left $
ProtoFailureMessage "short data write"
Left e -> return $ Left $ ProtoFailureException e
P2PHandleTMVar mv -> do
P2PHandleTMVar mv waitv -> do
liftIO $ atomically $ putTMVar mv (Left b)
-- Wait for the whole bytestring to be
-- processed. Necessary due to lazyiness.
liftIO $ atomically $ takeTMVar waitv
runner next
ReceiveBytes len p next ->
case connIhdl conn of
@ -241,7 +244,7 @@ runNet runst conn runner f = case f of
Right b -> runner (next b)
Left e -> return $ Left $
ProtoFailureException e
P2PHandleTMVar mv ->
P2PHandleTMVar mv _ ->
liftIO (atomically (takeTMVar mv)) >>= \case
Left b -> runner (next b)
Right _ -> return $ Left $

View file

@ -149,7 +149,7 @@ spaces, since it's not the last token in the line. Use '%' to indicate
whitespace.)
The server may respond with ALREADY-HAVE if it already
had the conent of that key.
had the content of that key.
In protocol version 2, the server can optionally reply with
ALREADY-HAVE-PLUS. The subsequent list of UUIDs are additional

View file

@ -29,14 +29,26 @@ For June's work on [[design/passthrough_proxy]], remaining todos:
object files around when the client does not send SUCCESS. This would
use more disk, but without streaming, proxying a special remote already
needs some disk. And it could minimize to eg, the last 2 or so.
The design doc has some more thoughts about this.
* If GET from a proxied special remote sends an ERROR with a message
from the special remote, currently the user sees "protocol error".
* Implement PUT to proxied special remotes.
* convert Remote.storeKey to take the path of the object file to send.
It's too ugly that PUT to a proxied special remote currently has to
temporarily populate the proxy's annex object file. There are too many
ways that could lead to surprising behavior, like an interrupted PUT
leaving it populated, or simulantaneous PUTs.
* PUT to a proxied special remote, in the case where the proxy contains
the key, and the special remote is not accessible, sends back ERROR
rather than PUT-FROM or ALREADY-HAVE. Verify that the client processes
that ok and displays it to the user.
* Streaming download from proxied special remotes. See design.
* Check annex.diskreserve when proxying for special remotes.
# items deferred until later for [[design/passthrough_proxy]]
* Indirect uploads when proxying for special remote