p2p ssh connection pools

Much like Remote.P2P, there's a pool of connections to a peer, in order
to support concurrent operations.

Deals with old git-annex-ssh on the remote that does not support p2pstdio,
by only trying once to use it, and remembering if it's not supported.

Made p2pstdio send an AUTH_SUCCESS with its uuid, which serves the dual
purposes of something to detect to see that the connection is working,
and a way to verify that it's connected to the right uuid.
(There's a redundant uuid check since the uuid field is sent
by git_annex_shell, but I anticipate that being removed later when
the legacy git-annex-shell stuff gets removed.)

Not entirely happy with Remote.Git.runSsh's behavior
when the proto action fails. Running the fallback will work ok, but what
will we do when the fallbacks later get removed? It might be better to
try to reconnect, in case the connection got closed.

This commit was sponsored by Boyd Stephen Smith Jr. on Patreon.
This commit is contained in:
Joey Hess 2018-03-08 14:02:18 -04:00
parent 978078f0fe
commit c036a380b2
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
7 changed files with 150 additions and 16 deletions

View file

@ -11,13 +11,10 @@ import Command
import P2P.IO
import P2P.Annex
import qualified P2P.Protocol as P2P
import Git.Types
import qualified Annex
import Annex.UUID
import qualified CmdLine.GitAnnexShell.Checks as Checks
import qualified CmdLine.GitAnnexShell.Fields as Fields
import Utility.AuthToken
import Utility.Tmp.Dir
cmd :: Command
cmd = noMessages $ command "p2pstdio" SectionPlumbing
@ -38,7 +35,9 @@ start = do
Just u -> return (toUUID u)
myuuid <- getUUID
conn <- stdioP2PConnection <$> Annex.gitRepo
let server = P2P.serveAuthed servermode myuuid
let server = do
P2P.net $ P2P.sendMessage (P2P.AUTH_SUCCESS myuuid)
P2P.serveAuthed servermode myuuid
runFullProto (Serving theiruuid Nothing) conn server >>= \case
Right () -> next $ next $ return True
Left e -> giveup e

View file

@ -10,6 +10,7 @@
module P2P.IO
( RunProto
, P2PConnection(..)
, ClosableConnection(..)
, stdioP2PConnection
, connectPeer
, closeConnection
@ -51,6 +52,10 @@ data P2PConnection = P2PConnection
, connOhdl :: Handle
}
data ClosableConnection conn
= OpenConnection conn
| ClosedConnection
-- P2PConnection using stdio.
stdioP2PConnection :: Git.Repo -> P2PConnection
stdioP2PConnection g = P2PConnection

View file

@ -250,6 +250,10 @@ $(makeFree ''LocalF)
auth :: UUID -> AuthToken -> Proto (Maybe UUID)
auth myuuid t = do
net $ sendMessage (AUTH myuuid t)
postAuth
postAuth :: Proto (Maybe UUID)
postAuth = do
r <- net receiveMessage
case r of
AUTH_SUCCESS theiruuid -> return $ Just theiruuid

View file

@ -55,6 +55,9 @@ import qualified Remote.Helper.Ssh as Ssh
import qualified Remote.GCrypt
import qualified Remote.P2P
import P2P.Address
import qualified P2P.Protocol as P2P
import qualified P2P.Annex as P2P
import qualified P2P.IO as P2P
import Annex.Path
import Creds
import Messages.Progress
@ -729,10 +732,11 @@ mkCopier remotewanthardlink rsyncparams = do
, return copier
)
{- Normally the UUID is checked at startup, but annex-checkuuid config
- can prevent that. To avoid getting confused, a deferred
- check is done just before the repository is used. This returns False
- when the repository UUID is not as expected. -}
{- Normally the UUID of a local repository is checked at startup,
- but annex-checkuuid config can prevent that. To avoid getting
- confused, a deferred check is done just before the repository
- is used.
- This returns False when the repository UUID is not as expected. -}
type DeferredUUIDCheck = Annex Bool
mkDeferredUUIDCheck :: Git.Repo -> UUID -> RemoteGitConfig -> Annex DeferredUUIDCheck
@ -751,3 +755,32 @@ mkDeferredUUIDCheck r u gc
return ok
, liftIO $ readMVar v
)
-- Runs a P2P Proto action on a remote when it supports that,
-- otherwise the fallback action.
runSsh :: Remote -> Ssh.P2PSshConnectionPool -> P2P.Proto a -> Annex a -> Annex a
runSsh r connpool proto fallback =
Ssh.getP2PSshConnection r connpool >>= maybe fallback go
where
go c = do
(c', v) <- runSsh' proto c
case v of
Just res -> do
liftIO $ Ssh.storeP2PSshConnection connpool c'
return res
-- Running the proto failed, either due to a protocol
-- error or a network error, so discard the
-- connection, and run the fallback.
Nothing -> fallback
runSsh' :: P2P.Proto a -> Ssh.P2PSshConnection -> Annex (Ssh.P2PSshConnection, Maybe a)
runSsh' _ P2P.ClosedConnection = return (P2P.ClosedConnection, Nothing)
runSsh' a conn@(P2P.OpenConnection (c, _pid)) =
P2P.runFullProto P2P.Client c a >>= \case
Right r -> return (conn, Just r)
-- When runFullProto fails, the connection is no longer
-- usable, so close it.
Left e -> do
warning $ "Lost connection (" ++ e ++ ")"
conn' <- liftIO $ Ssh.closeP2PSshConnection conn
return (conn', Nothing)

View file

@ -1,6 +1,6 @@
{- git-annex remote access with ssh and git-annex-shell
-
- Copyright 2011-2013 Joey Hess <id@joeyh.name>
- Copyright 2011-2018 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
@ -23,6 +23,11 @@ import Utility.SshHost
import Types.Remote
import Types.Transfer
import Config
import qualified P2P.Protocol as P2P
import qualified P2P.IO as P2P
import Control.Concurrent.Async
import Control.Concurrent.STM
toRepo :: ConsumeStdin -> Git.Repo -> RemoteGitConfig -> SshCommand -> Annex (FilePath, [CommandParam])
toRepo cs r gc remotecmd = do
@ -91,9 +96,9 @@ onRemote cs r (with, errorval) command params fields = do
inAnnex :: Git.Repo -> Key -> Annex Bool
inAnnex r k = do
showChecking r
onRemote NoConsumeStdin r (check, cantCheck r) "inannex" [Param $ key2file k] []
onRemote NoConsumeStdin r (runcheck, cantCheck r) "inannex" [Param $ key2file k] []
where
check c p = dispatch =<< safeSystem c p
runcheck c p = dispatch =<< safeSystem c p
dispatch ExitSuccess = return True
dispatch (ExitFailure 1) = return False
dispatch _ = cantCheck r
@ -179,3 +184,89 @@ rsyncParams r direction = do
-- successfully locked.
contentLockedMarker :: String
contentLockedMarker = "OK"
-- A connection over ssh to git-annex shell speaking the P2P protocol.
type P2PSshConnection = P2P.ClosableConnection (P2P.P2PConnection, ProcessHandle)
closeP2PSshConnection :: P2PSshConnection -> IO P2PSshConnection
closeP2PSshConnection P2P.ClosedConnection = return P2P.ClosedConnection
closeP2PSshConnection (P2P.OpenConnection (conn, pid)) = do
P2P.closeConnection conn
void $ async $ waitForProcess pid
return P2P.ClosedConnection
-- Pool of connections over ssh to git-annex-shell p2pstdio.
type P2PSshConnectionPool = TVar (Maybe P2PSshConnectionPoolState)
data P2PSshConnectionPoolState
= P2PSshConnections [P2PSshConnection]
-- Remotes using an old version of git-annex-shell don't support P2P
| P2PSshUnsupported
mkP2PSshConnectionPool :: Annex P2PSshConnectionPool
mkP2PSshConnectionPool = liftIO $ newTVarIO Nothing
-- Takes a connection from the pool, if any are available, otherwise
-- tries to open a new one.
getP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection)
getP2PSshConnection r connpool = getexistingconn >>= \case
Nothing -> return Nothing
Just Nothing -> openP2PSshConnection r connpool
Just (Just c) -> return (Just c)
where
getexistingconn = liftIO $ atomically $ readTVar connpool >>= \case
Just P2PSshUnsupported -> return Nothing
Just (P2PSshConnections (c:cs)) -> do
writeTVar connpool (Just (P2PSshConnections cs))
return (Just (Just c))
Just (P2PSshConnections []) -> return (Just Nothing)
Nothing -> return (Just Nothing)
-- Add a connection to the pool, unless it's closed.
storeP2PSshConnection :: P2PSshConnectionPool -> P2PSshConnection -> IO ()
storeP2PSshConnection _ P2P.ClosedConnection = return ()
storeP2PSshConnection connpool conn = atomically $ modifyTVar' connpool $ \case
Just (P2PSshConnections cs) -> Just (P2PSshConnections (conn:cs))
_ -> Just (P2PSshConnections [conn])
-- Try to open a P2PSshConnection.
-- The new connection is not added to the pool, so it's available
-- for the caller to use.
-- If the remote does not support the P2P protocol, that's remembered in
-- the connection pool.
openP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection)
openP2PSshConnection r connpool =
git_annex_shell ConsumeStdin (repo r) "p2pstdio" [] [] >>= \case
Nothing -> do
liftIO $ rememberunsupported
return Nothing
Just (cmd, params) -> start cmd params
where
start cmd params = liftIO $ withNullHandle $ \nullh -> do
-- stderr is discarded because old versions of git-annex
-- shell always error
(Just from, Just to, Nothing, pid) <- createProcess $
(proc cmd (toCommand params))
{ std_in = CreatePipe
, std_out = CreatePipe
, std_err = UseHandle nullh
}
let conn = P2P.P2PConnection
{ P2P.connRepo = repo r
, P2P.connCheckAuth = const False
, P2P.connIhdl = from
, P2P.connOhdl = to
}
let c = P2P.OpenConnection (conn, pid)
-- When the connection is successful, the peer
-- will send an AUTH_SUCCESS with its uuid.
tryNonAsync (P2P.runNetProto conn $ P2P.postAuth) >>= \case
Right (Right (Just theiruuid)) | theiruuid == uuid r ->
return $ Just c
_ -> do
void $ closeP2PSshConnection c
rememberunsupported
return Nothing
rememberunsupported = atomically $
modifyTVar' connpool $
maybe (Just P2PSshUnsupported) Just

View file

@ -116,10 +116,8 @@ lock u addr connpool k callback =
go False = giveup "can't lock content"
go True = withVerifiedCopy LockedCopy u (return True) callback
-- | A connection to the peer.
data Connection
= OpenConnection P2PConnection
| ClosedConnection
-- | A connection to the peer, which can be closed.
type Connection = ClosableConnection P2PConnection
type ConnectionPool = TVar [Connection]

View file

@ -27,6 +27,7 @@ module Utility.Process (
withHandle,
withIOHandles,
withOEHandles,
withNullHandle,
withQuietOutput,
feedWithQuietOutput,
createProcess,
@ -213,13 +214,16 @@ withOEHandles creator p a = creator p' $ a . oeHandles
, std_err = CreatePipe
}
withNullHandle :: (Handle -> IO a) -> IO a
withNullHandle = withFile devNull WriteMode
-- | Forces the CreateProcessRunner to run quietly;
-- both stdout and stderr are discarded.
withQuietOutput
:: CreateProcessRunner
-> CreateProcess
-> IO ()
withQuietOutput creator p = withFile devNull WriteMode $ \nullh -> do
withQuietOutput creator p = withNullHandle $ \nullh -> do
let p' = p
{ std_out = UseHandle nullh
, std_err = UseHandle nullh