move protocol version stuff to the Net free monad

Needs to be in Net not Local, so that Net actions can take the protocol
version into account.

This commit was sponsored by an anonymous bitcoin donor.
This commit is contained in:
Joey Hess 2018-03-12 15:19:40 -04:00
parent c81768d425
commit 596af7cbc4
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
9 changed files with 61 additions and 52 deletions

View file

@ -57,6 +57,7 @@ connectService address port service = do
myuuid <- getUUID
g <- Annex.gitRepo
conn <- liftIO $ connectPeer g (TorAnnex address port)
liftIO $ runNetProto conn $ auth myuuid authtoken noop >>= \case
runst <- liftIO $ mkRunState Client
liftIO $ runNetProto runst conn $ auth myuuid authtoken noop >>= \case
Just _theiruuid -> connect service stdin stdout
Nothing -> giveup $ "authentication failed, perhaps you need to set " ++ p2pAuthTokenEnv

View file

@ -123,7 +123,8 @@ checkHiddenService = bracket setup cleanup go
, connIhdl = h
, connOhdl = h
}
void $ runNetProto conn $ P2P.serveAuth u
runst <- mkRunState Client
void $ runNetProto runst conn $ P2P.serveAuth u
hClose h
haslistener sockfile = catchBoolIO $ do

View file

@ -310,7 +310,8 @@ setupLink remotename (P2PAddressAuth addr authtoken) = do
Right conn -> do
u <- getUUID
let proto = P2P.auth u authtoken noop
go =<< liftIO (runNetProto conn proto)
runst <- liftIO $ mkRunState Client
go =<< liftIO (runNetProto runst conn proto)
where
go (Right (Just theiruuid)) = do
ok <- inRepo $ Git.Command.runBool

View file

@ -25,16 +25,6 @@ import Types.NumCopies
import Utility.Metered
import Control.Monad.Free
import Control.Concurrent.STM
data RunState
= Serving UUID (Maybe ChangedRefsHandle) (TVar ProtocolVersion)
| Client (TVar ProtocolVersion)
mkRunState :: (TVar ProtocolVersion -> RunState) -> IO RunState
mkRunState mk = do
tvar <- newTVarIO defaultProtocolVersion
return (mk tvar)
-- Full interpreter for Proto, that can receive and send objects.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either String a)
@ -42,7 +32,7 @@ runFullProto runst conn = go
where
go :: RunProto Annex
go (Pure v) = return (Right v)
go (Free (Net n)) = runNet conn go n
go (Free (Net n)) = runNet runst conn go n
go (Free (Local l)) = runLocal runst go l
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either String a)
@ -127,11 +117,6 @@ runLocal runst runner a = case a of
Left e -> return (Left (show e))
Right changedrefs -> runner (next changedrefs)
_ -> return $ Left "change notification not available"
SetProtocolVersion v next -> do
liftIO $ atomically $ writeTVar versiontvar v
runner next
GetProtocolVersion next ->
liftIO (readTVarIO versiontvar) >>= runner . next
where
transfer mk k af ta = case runst of
-- Update transfer logs when serving.
@ -164,6 +149,3 @@ runLocal runst runner a = case a of
liftIO $ hSeek h AbsoluteSeek o
b <- liftIO $ hGetContentsMetered h p'
runner (sender b)
versiontvar = case runst of
Serving _ _ tv -> tv
Client tv -> tv

View file

@ -9,6 +9,8 @@
module P2P.IO
( RunProto
, RunState(..)
, mkRunState
, P2PConnection(..)
, ClosableConnection(..)
, stdioP2PConnection
@ -30,6 +32,8 @@ import Utility.SimpleProtocol
import Utility.Metered
import Utility.Tor
import Utility.FileMode
import Types.UUID
import Annex.ChangedRefs
import Control.Monad.Free
import Control.Monad.IO.Class
@ -37,6 +41,7 @@ import System.Exit (ExitCode(..))
import Network.Socket
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
import System.Log.Logger (debugM)
@ -45,6 +50,15 @@ import qualified Network.Socket as S
-- Type of interpreters of the Proto free monad.
type RunProto m = forall a. (MonadIO m, MonadMask m) => Proto a -> m (Either String a)
data RunState
= Serving UUID (Maybe ChangedRefsHandle) (TVar ProtocolVersion)
| Client (TVar ProtocolVersion)
mkRunState :: (TVar ProtocolVersion -> RunState) -> IO RunState
mkRunState mk = do
tvar <- newTVarIO defaultProtocolVersion
return (mk tvar)
data P2PConnection = P2PConnection
{ connRepo :: Repo
, connCheckAuth :: (AuthToken -> Bool)
@ -121,20 +135,20 @@ setupHandle s = do
-- This only runs Net actions. No Local actions will be run
-- (those need the Annex monad) -- if the interpreter reaches any,
-- it returns Nothing.
runNetProto :: P2PConnection -> Proto a -> IO (Either String a)
runNetProto conn = go
runNetProto :: RunState -> P2PConnection -> Proto a -> IO (Either String a)
runNetProto runst conn = go
where
go :: RunProto IO
go (Pure v) = return (Right v)
go (Free (Net n)) = runNet conn go n
go (Free (Net n)) = runNet runst conn go n
go (Free (Local _)) = return (Left "unexpected annex operation attempted")
-- Interpreter of the Net part of Proto.
--
-- An interpreter of Proto has to be provided, to handle the rest of Proto
-- actions.
runNet :: (MonadIO m, MonadMask m) => P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either String a)
runNet conn runner f = case f of
runNet :: (MonadIO m, MonadMask m) => RunState -> P2PConnection -> RunProto m -> NetF (Proto a) -> m (Either String a)
runNet runst conn runner f = case f of
SendMessage m next -> do
v <- liftIO $ tryNonAsync $ do
let l = unwords (formatMessage m)
@ -181,11 +195,19 @@ runNet conn runner f = case f of
case v of
Left e -> return (Left e)
Right () -> runner next
SetProtocolVersion v next -> do
liftIO $ atomically $ writeTVar versiontvar v
runner next
GetProtocolVersion next ->
liftIO (readTVarIO versiontvar) >>= runner . next
where
-- This is only used for running Net actions when relaying,
-- so it's ok to use runNetProto, despite it not supporting
-- all Proto actions.
runnerio = runNetProto conn
runnerio = runNetProto runst conn
versiontvar = case runst of
Serving _ _ tv -> tv
Client tv -> tv
debugMessage :: String -> Message -> IO ()
debugMessage prefix m = debugM "p2p" $

View file

@ -208,6 +208,9 @@ data NetF c
-- peer, while at the same time accepting input from the peer
-- which is sent the the second RelayHandle. Continues until
-- the peer sends an ExitCode.
| SetProtocolVersion ProtocolVersion c
--- ^ Called when a new protocol version has been negotiated.
| GetProtocolVersion (ProtocolVersion -> c)
deriving (Functor)
type Net = Free NetF
@ -255,9 +258,6 @@ data LocalF c
-- present, runs the protocol action with False.
| WaitRefChange (ChangedRefs -> c)
-- ^ Waits for one or more git refs to change and returns them.
| SetProtocolVersion ProtocolVersion c
--- ^ Called when a new protocol version has been negotiated.
| GetProtocolVersion (ProtocolVersion -> c)
deriving (Functor)
type Local = Free LocalF
@ -288,7 +288,7 @@ negotiateProtocolVersion preferredversion = do
net $ sendMessage (VERSION preferredversion)
r <- net receiveMessage
case r of
Just (VERSION v) -> local $ setProtocolVersion v
Just (VERSION v) -> net $ setProtocolVersion v
-- Old server doesn't know about the VERSION command.
Just (ERROR _) -> return ()
_ -> net $ sendMessage (ERROR "expected VERSION")
@ -403,7 +403,7 @@ serveAuthed servermode myuuid = void $ serverLoop handler
readonlyerror = net $ sendMessage (ERROR "this repository is read-only; write access denied")
handler (VERSION theirversion) = do
let v = min theirversion maxProtocolVersion
local $ setProtocolVersion v
net $ setProtocolVersion v
net $ sendMessage (VERSION v)
return ServerContinue
handler (LOCKCONTENT key) = do

View file

@ -245,11 +245,10 @@ openP2PSshConnection r connpool = do
return Nothing
Just (cmd, params) -> start cmd params
where
start cmd params = do
start cmd params = liftIO $ withNullHandle $ \nullh -> do
-- stderr is discarded because old versions of git-annex
-- shell always error
(Just from, Just to, Nothing, pid) <- liftIO $
withNullHandle $ \nullh -> createProcess $
(Just from, Just to, Nothing, pid) <- createProcess $
(proc cmd (toCommand params))
{ std_in = CreatePipe
, std_out = CreatePipe
@ -261,18 +260,18 @@ openP2PSshConnection r connpool = do
, P2P.connIhdl = to
, P2P.connOhdl = from
}
runst <- liftIO $ P2P.mkRunState P2P.Client
runst <- P2P.mkRunState P2P.Client
let c = P2P.OpenConnection (runst, conn, pid)
-- When the connection is successful, the remote
-- will send an AUTH_SUCCESS with its uuid.
let proto = P2P.postAuth $
P2P.negotiateProtocolVersion P2P.maxProtocolVersion
tryNonAsync (P2P.runFullProto runst conn proto) >>= \case
tryNonAsync (P2P.runNetProto runst conn proto) >>= \case
Right (Right (Just theiruuid)) | theiruuid == uuid r ->
return $ Just c
_ -> do
void $ liftIO $ closeP2PSshConnection c
liftIO rememberunsupported
void $ closeP2PSshConnection c
rememberunsupported
return Nothing
rememberunsupported = atomically $
modifyTVar' connpool $

View file

@ -148,7 +148,7 @@ openConnection u addr = do
--P2P.negotiateProtocolVersion P2P.maxProtocolVersion
return ()
runst <- liftIO $ mkRunState Client
res <- runFullProto runst conn proto
res <- liftIO $ runNetProto runst conn proto
case res of
Right (Just theiruuid)
| u == theiruuid -> return (OpenConnection (runst, conn))

View file

@ -115,7 +115,9 @@ serveClient th u r q = bracket setup cleanup start
, connIhdl = h
, connOhdl = h
}
v <- liftIO $ runNetProto conn $ P2P.serveAuth u
-- not really Client, but we don't know their uuid yet
runstauth <- liftIO $ mkRunState Client
v <- liftIO $ runNetProto runstauth conn $ P2P.serveAuth u
case v of
Right (Just theiruuid) -> authed conn theiruuid
Right Nothing -> liftIO $
@ -147,7 +149,8 @@ transport (RemoteRepo r gc) url@(RemoteURI uri) th ichan ochan =
myuuid <- liftAnnex th getUUID
authtoken <- fromMaybe nullAuthToken
<$> liftAnnex th (loadP2PRemoteAuthToken addr)
res <- runNetProto conn $ P2P.auth myuuid authtoken noop
runst <- mkRunState Client
res <- runNetProto runst conn $ P2P.auth myuuid authtoken noop
case res of
Right (Just theiruuid) -> do
expecteduuid <- liftAnnex th $ getRepoUUID r
@ -155,7 +158,7 @@ transport (RemoteRepo r gc) url@(RemoteURI uri) th ichan ochan =
then do
send (CONNECTED url)
status <- handlecontrol
`race` handlepeer conn
`race` handlepeer runst conn
send (DISCONNECTED url)
return $ either id id status
else return ConnectionStopping
@ -170,13 +173,13 @@ transport (RemoteRepo r gc) url@(RemoteURI uri) th ichan ochan =
LOSTNET -> return ConnectionStopping
_ -> handlecontrol
handlepeer conn = do
v <- runNetProto conn P2P.notifyChange
handlepeer runst conn = do
v <- runNetProto runst conn P2P.notifyChange
case v of
Right (Just (ChangedRefs shas)) -> do
whenM (checkShouldFetch gc th shas) $
fetch
handlepeer conn
handlepeer runst conn
_ -> return ConnectionClosed
fetch = do