diff --git a/Command/P2PStdIO.hs b/Command/P2PStdIO.hs index f6e4ae0f0e..73c38b9063 100644 --- a/Command/P2PStdIO.hs +++ b/Command/P2PStdIO.hs @@ -11,13 +11,10 @@ import Command import P2P.IO import P2P.Annex import qualified P2P.Protocol as P2P -import Git.Types import qualified Annex import Annex.UUID import qualified CmdLine.GitAnnexShell.Checks as Checks import qualified CmdLine.GitAnnexShell.Fields as Fields -import Utility.AuthToken -import Utility.Tmp.Dir cmd :: Command cmd = noMessages $ command "p2pstdio" SectionPlumbing @@ -38,7 +35,9 @@ start = do Just u -> return (toUUID u) myuuid <- getUUID conn <- stdioP2PConnection <$> Annex.gitRepo - let server = P2P.serveAuthed servermode myuuid + let server = do + P2P.net $ P2P.sendMessage (P2P.AUTH_SUCCESS myuuid) + P2P.serveAuthed servermode myuuid runFullProto (Serving theiruuid Nothing) conn server >>= \case Right () -> next $ next $ return True Left e -> giveup e diff --git a/P2P/IO.hs b/P2P/IO.hs index 6cdc5b7d5b..8b532c7f4e 100644 --- a/P2P/IO.hs +++ b/P2P/IO.hs @@ -10,6 +10,7 @@ module P2P.IO ( RunProto , P2PConnection(..) + , ClosableConnection(..) , stdioP2PConnection , connectPeer , closeConnection @@ -51,6 +52,10 @@ data P2PConnection = P2PConnection , connOhdl :: Handle } +data ClosableConnection conn + = OpenConnection conn + | ClosedConnection + -- P2PConnection using stdio. stdioP2PConnection :: Git.Repo -> P2PConnection stdioP2PConnection g = P2PConnection diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index c750ae6ff1..4acbaadef8 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -250,6 +250,10 @@ $(makeFree ''LocalF) auth :: UUID -> AuthToken -> Proto (Maybe UUID) auth myuuid t = do net $ sendMessage (AUTH myuuid t) + postAuth + +postAuth :: Proto (Maybe UUID) +postAuth = do r <- net receiveMessage case r of AUTH_SUCCESS theiruuid -> return $ Just theiruuid diff --git a/Remote/Git.hs b/Remote/Git.hs index caa6774648..63cfdeae97 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -55,6 +55,9 @@ import qualified Remote.Helper.Ssh as Ssh import qualified Remote.GCrypt import qualified Remote.P2P import P2P.Address +import qualified P2P.Protocol as P2P +import qualified P2P.Annex as P2P +import qualified P2P.IO as P2P import Annex.Path import Creds import Messages.Progress @@ -729,10 +732,11 @@ mkCopier remotewanthardlink rsyncparams = do , return copier ) -{- Normally the UUID is checked at startup, but annex-checkuuid config - - can prevent that. To avoid getting confused, a deferred - - check is done just before the repository is used. This returns False - - when the repository UUID is not as expected. -} +{- Normally the UUID of a local repository is checked at startup, + - but annex-checkuuid config can prevent that. To avoid getting + - confused, a deferred check is done just before the repository + - is used. + - This returns False when the repository UUID is not as expected. -} type DeferredUUIDCheck = Annex Bool mkDeferredUUIDCheck :: Git.Repo -> UUID -> RemoteGitConfig -> Annex DeferredUUIDCheck @@ -751,3 +755,32 @@ mkDeferredUUIDCheck r u gc return ok , liftIO $ readMVar v ) + +-- Runs a P2P Proto action on a remote when it supports that, +-- otherwise the fallback action. +runSsh :: Remote -> Ssh.P2PSshConnectionPool -> P2P.Proto a -> Annex a -> Annex a +runSsh r connpool proto fallback = + Ssh.getP2PSshConnection r connpool >>= maybe fallback go + where + go c = do + (c', v) <- runSsh' proto c + case v of + Just res -> do + liftIO $ Ssh.storeP2PSshConnection connpool c' + return res + -- Running the proto failed, either due to a protocol + -- error or a network error, so discard the + -- connection, and run the fallback. + Nothing -> fallback + +runSsh' :: P2P.Proto a -> Ssh.P2PSshConnection -> Annex (Ssh.P2PSshConnection, Maybe a) +runSsh' _ P2P.ClosedConnection = return (P2P.ClosedConnection, Nothing) +runSsh' a conn@(P2P.OpenConnection (c, _pid)) = + P2P.runFullProto P2P.Client c a >>= \case + Right r -> return (conn, Just r) + -- When runFullProto fails, the connection is no longer + -- usable, so close it. + Left e -> do + warning $ "Lost connection (" ++ e ++ ")" + conn' <- liftIO $ Ssh.closeP2PSshConnection conn + return (conn', Nothing) diff --git a/Remote/Helper/Ssh.hs b/Remote/Helper/Ssh.hs index a4d91ab929..96c419dd4b 100644 --- a/Remote/Helper/Ssh.hs +++ b/Remote/Helper/Ssh.hs @@ -1,6 +1,6 @@ {- git-annex remote access with ssh and git-annex-shell - - - Copyright 2011-2013 Joey Hess + - Copyright 2011-2018 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} @@ -23,6 +23,11 @@ import Utility.SshHost import Types.Remote import Types.Transfer import Config +import qualified P2P.Protocol as P2P +import qualified P2P.IO as P2P + +import Control.Concurrent.Async +import Control.Concurrent.STM toRepo :: ConsumeStdin -> Git.Repo -> RemoteGitConfig -> SshCommand -> Annex (FilePath, [CommandParam]) toRepo cs r gc remotecmd = do @@ -91,9 +96,9 @@ onRemote cs r (with, errorval) command params fields = do inAnnex :: Git.Repo -> Key -> Annex Bool inAnnex r k = do showChecking r - onRemote NoConsumeStdin r (check, cantCheck r) "inannex" [Param $ key2file k] [] + onRemote NoConsumeStdin r (runcheck, cantCheck r) "inannex" [Param $ key2file k] [] where - check c p = dispatch =<< safeSystem c p + runcheck c p = dispatch =<< safeSystem c p dispatch ExitSuccess = return True dispatch (ExitFailure 1) = return False dispatch _ = cantCheck r @@ -179,3 +184,89 @@ rsyncParams r direction = do -- successfully locked. contentLockedMarker :: String contentLockedMarker = "OK" + +-- A connection over ssh to git-annex shell speaking the P2P protocol. +type P2PSshConnection = P2P.ClosableConnection (P2P.P2PConnection, ProcessHandle) + +closeP2PSshConnection :: P2PSshConnection -> IO P2PSshConnection +closeP2PSshConnection P2P.ClosedConnection = return P2P.ClosedConnection +closeP2PSshConnection (P2P.OpenConnection (conn, pid)) = do + P2P.closeConnection conn + void $ async $ waitForProcess pid + return P2P.ClosedConnection + +-- Pool of connections over ssh to git-annex-shell p2pstdio. +type P2PSshConnectionPool = TVar (Maybe P2PSshConnectionPoolState) + +data P2PSshConnectionPoolState + = P2PSshConnections [P2PSshConnection] + -- Remotes using an old version of git-annex-shell don't support P2P + | P2PSshUnsupported + +mkP2PSshConnectionPool :: Annex P2PSshConnectionPool +mkP2PSshConnectionPool = liftIO $ newTVarIO Nothing + +-- Takes a connection from the pool, if any are available, otherwise +-- tries to open a new one. +getP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection) +getP2PSshConnection r connpool = getexistingconn >>= \case + Nothing -> return Nothing + Just Nothing -> openP2PSshConnection r connpool + Just (Just c) -> return (Just c) + where + getexistingconn = liftIO $ atomically $ readTVar connpool >>= \case + Just P2PSshUnsupported -> return Nothing + Just (P2PSshConnections (c:cs)) -> do + writeTVar connpool (Just (P2PSshConnections cs)) + return (Just (Just c)) + Just (P2PSshConnections []) -> return (Just Nothing) + Nothing -> return (Just Nothing) + +-- Add a connection to the pool, unless it's closed. +storeP2PSshConnection :: P2PSshConnectionPool -> P2PSshConnection -> IO () +storeP2PSshConnection _ P2P.ClosedConnection = return () +storeP2PSshConnection connpool conn = atomically $ modifyTVar' connpool $ \case + Just (P2PSshConnections cs) -> Just (P2PSshConnections (conn:cs)) + _ -> Just (P2PSshConnections [conn]) + +-- Try to open a P2PSshConnection. +-- The new connection is not added to the pool, so it's available +-- for the caller to use. +-- If the remote does not support the P2P protocol, that's remembered in +-- the connection pool. +openP2PSshConnection :: Remote -> P2PSshConnectionPool -> Annex (Maybe P2PSshConnection) +openP2PSshConnection r connpool = + git_annex_shell ConsumeStdin (repo r) "p2pstdio" [] [] >>= \case + Nothing -> do + liftIO $ rememberunsupported + return Nothing + Just (cmd, params) -> start cmd params + where + start cmd params = liftIO $ withNullHandle $ \nullh -> do + -- stderr is discarded because old versions of git-annex + -- shell always error + (Just from, Just to, Nothing, pid) <- createProcess $ + (proc cmd (toCommand params)) + { std_in = CreatePipe + , std_out = CreatePipe + , std_err = UseHandle nullh + } + let conn = P2P.P2PConnection + { P2P.connRepo = repo r + , P2P.connCheckAuth = const False + , P2P.connIhdl = from + , P2P.connOhdl = to + } + let c = P2P.OpenConnection (conn, pid) + -- When the connection is successful, the peer + -- will send an AUTH_SUCCESS with its uuid. + tryNonAsync (P2P.runNetProto conn $ P2P.postAuth) >>= \case + Right (Right (Just theiruuid)) | theiruuid == uuid r -> + return $ Just c + _ -> do + void $ closeP2PSshConnection c + rememberunsupported + return Nothing + rememberunsupported = atomically $ + modifyTVar' connpool $ + maybe (Just P2PSshUnsupported) Just diff --git a/Remote/P2P.hs b/Remote/P2P.hs index 83ce258dee..cfed5c6044 100644 --- a/Remote/P2P.hs +++ b/Remote/P2P.hs @@ -116,10 +116,8 @@ lock u addr connpool k callback = go False = giveup "can't lock content" go True = withVerifiedCopy LockedCopy u (return True) callback --- | A connection to the peer. -data Connection - = OpenConnection P2PConnection - | ClosedConnection +-- | A connection to the peer, which can be closed. +type Connection = ClosableConnection P2PConnection type ConnectionPool = TVar [Connection] diff --git a/Utility/Process.hs b/Utility/Process.hs index ff454f7999..1807a13357 100644 --- a/Utility/Process.hs +++ b/Utility/Process.hs @@ -27,6 +27,7 @@ module Utility.Process ( withHandle, withIOHandles, withOEHandles, + withNullHandle, withQuietOutput, feedWithQuietOutput, createProcess, @@ -213,13 +214,16 @@ withOEHandles creator p a = creator p' $ a . oeHandles , std_err = CreatePipe } +withNullHandle :: (Handle -> IO a) -> IO a +withNullHandle = withFile devNull WriteMode + -- | Forces the CreateProcessRunner to run quietly; -- both stdout and stderr are discarded. withQuietOutput :: CreateProcessRunner -> CreateProcess -> IO () -withQuietOutput creator p = withFile devNull WriteMode $ \nullh -> do +withQuietOutput creator p = withNullHandle $ \nullh -> do let p' = p { std_out = UseHandle nullh , std_err = UseHandle nullh