diff --git a/P2P/Address.hs b/P2P/Address.hs index 09ffc79735..1b1f66059e 100644 --- a/P2P/Address.hs +++ b/P2P/Address.hs @@ -37,15 +37,18 @@ class FormatP2PAddress a where instance FormatP2PAddress P2PAddress where formatP2PAddress (TorAnnex (OnionAddress onionaddr) onionport) = - "tor-annex::" ++ onionaddr ++ ":" ++ show onionport + torAnnexScheme ++ ":" ++ onionaddr ++ ":" ++ show onionport unformatP2PAddress s - | "tor-annex::" `isPrefixOf` s = do + | (torAnnexScheme ++ ":") `isPrefixOf` s = do let s' = dropWhile (== ':') $ dropWhile (/= ':') s let (onionaddr, ps) = separate (== ':') s' onionport <- readish ps return (TorAnnex (OnionAddress onionaddr) onionport) | otherwise = Nothing +torAnnexScheme :: String +torAnnexScheme = "tor-annex:" + instance FormatP2PAddress P2PAddressAuth where formatP2PAddress (P2PAddressAuth addr authtoken) = formatP2PAddress addr ++ ":" ++ T.unpack (fromAuthToken authtoken) diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index d8be3ff426..c3c362f37d 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -441,6 +441,16 @@ sendSuccess :: Bool -> Proto () sendSuccess True = net $ sendMessage SUCCESS sendSuccess False = net $ sendMessage FAILURE +notifyChange :: Proto (Maybe ChangedRefs) +notifyChange = do + net $ sendMessage NOTIFYCHANGE + ack <- net receiveMessage + case ack of + CHANGED rs -> return (Just rs) + _ -> do + net $ sendMessage (ERROR "expected CHANGED") + return Nothing + connect :: Service -> Handle -> Handle -> Proto ExitCode connect service hin hout = do net $ sendMessage (CONNECT service) diff --git a/RemoteDaemon/Common.hs b/RemoteDaemon/Common.hs index 982a84b439..711771f974 100644 --- a/RemoteDaemon/Common.hs +++ b/RemoteDaemon/Common.hs @@ -1,6 +1,6 @@ {- git-remote-daemon utilities - - - Copyright 2014 Joey Hess + - Copyright 2014-2016 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} @@ -9,6 +9,8 @@ module RemoteDaemon.Common ( liftAnnex , inLocalRepo , checkNewShas + , ConnectionStatus(..) + , robustConnection ) where import qualified Annex @@ -16,6 +18,7 @@ import Annex.Common import RemoteDaemon.Types import qualified Git import Annex.CatFile +import Utility.ThreadScheduler import Control.Concurrent @@ -40,3 +43,22 @@ checkNewShas transporthandle = check check [] = return True check (r:rs) = maybe (check rs) (const $ return False) =<< liftAnnex transporthandle (catObjectDetails r) + +data ConnectionStatus = ConnectionStopping | ConnectionClosed + +{- Make connection robust, retrying on error, with exponential backoff. -} +robustConnection :: Int -> IO ConnectionStatus -> IO () +robustConnection backoff a = + caught =<< a `catchNonAsync` (const $ return ConnectionClosed) + where + caught ConnectionStopping = return () + caught ConnectionClosed = do + threadDelaySeconds (Seconds backoff) + robustConnection increasedbackoff a + + increasedbackoff + | b2 > maxbackoff = maxbackoff + | otherwise = b2 + where + b2 = backoff * 2 + maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Transport.hs b/RemoteDaemon/Transport.hs index 6605012de3..053973424f 100644 --- a/RemoteDaemon/Transport.hs +++ b/RemoteDaemon/Transport.hs @@ -12,6 +12,7 @@ import qualified RemoteDaemon.Transport.Ssh import qualified RemoteDaemon.Transport.GCrypt import qualified RemoteDaemon.Transport.Tor import qualified Git.GCrypt +import P2P.Address (torAnnexScheme) import qualified Data.Map as M @@ -22,6 +23,7 @@ remoteTransports :: M.Map TransportScheme Transport remoteTransports = M.fromList [ ("ssh:", RemoteDaemon.Transport.Ssh.transport) , (Git.GCrypt.urlScheme, RemoteDaemon.Transport.GCrypt.transport) + , (torAnnexScheme, RemoteDaemon.Transport.Tor.transport) ] remoteServers :: [TransportHandle -> IO ()] diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 59502f8d3c..6f8e8323e8 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -16,7 +16,6 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote import Utility.SimpleProtocol import qualified Git import Git.Command -import Utility.ThreadScheduler import Annex.ChangedRefs import Control.Concurrent.STM @@ -38,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR transportUsingCmd' :: FilePath -> [CommandParam] -> Transport transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = - robustly 1 $ do + robustConnection 1 $ do (Just toh, Just fromh, Just errh, pid) <- createProcess (proc cmd (toCommand params)) { std_in = CreatePipe @@ -79,13 +78,13 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = fetch handlestdout fromh -- avoid reconnect on protocol error - Nothing -> return Stopping + Nothing -> return ConnectionStopping handlecontrol = do msg <- atomically $ readTChan ichan case msg of - STOP -> return Stopping - LOSTNET -> return Stopping + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping _ -> handlecontrol -- Old versions of git-annex-shell that do not support @@ -103,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan = , "needs its git-annex upgraded" , "to 5.20140405 or newer" ] - return Stopping + return ConnectionStopping else handlestderr errh - -data Status = Stopping | ConnectionClosed - -{- Make connection robustly, with exponential backoff on failure. -} -robustly :: Int -> IO Status -> IO () -robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a - where - caught Stopping = return () - caught ConnectionClosed = do - threadDelaySeconds (Seconds backoff) - robustly increasedbackoff a - - increasedbackoff - | b2 > maxbackoff = maxbackoff - | otherwise = b2 - where - b2 = backoff * 2 - maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Transport/Tor.hs b/RemoteDaemon/Transport/Tor.hs index 5ea06ac2c3..20320caddd 100644 --- a/RemoteDaemon/Transport/Tor.hs +++ b/RemoteDaemon/Transport/Tor.hs @@ -1,11 +1,11 @@ -{- git-remote-daemon, tor hidden service transport +{- git-remote-daemon, tor hidden service server and transport - - Copyright 2016 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} -module RemoteDaemon.Transport.Tor (server) where +module RemoteDaemon.Transport.Tor (server, transport) where import Common import qualified Annex @@ -16,20 +16,23 @@ import RemoteDaemon.Common import Utility.Tor import Utility.FileMode import Utility.AuthToken -import P2P.Protocol +import P2P.Protocol as P2P import P2P.IO import P2P.Annex import P2P.Auth +import P2P.Address import Annex.UUID import Types.UUID import Messages import Git +import Git.Command import System.PosixCompat.User -import Network.Socket import Control.Concurrent import System.Log.Logger (debugM) import Control.Concurrent.STM +import Control.Concurrent.Async +import qualified Network.Socket as S -- Run tor hidden service. server :: TransportHandle -> IO () @@ -44,17 +47,17 @@ server th@(TransportHandle (LocalRepo r) _) = do let ident = fromUUID u let sock = hiddenServiceSocketFile uid ident nukeFile sock - soc <- socket AF_UNIX Stream defaultProtocol - bind soc (SockAddrUnix sock) + soc <- S.socket S.AF_UNIX S.Stream S.defaultProtocol + S.bind soc (S.SockAddrUnix sock) -- Allow everyone to read and write to the socket; tor is probably -- running as a different user. Connections have to authenticate -- to do anything, so it's fine that other local users can connect. modifyFileMode sock $ addModes [groupReadMode, groupWriteMode, otherReadMode, otherWriteMode] - listen soc 2 + S.listen soc 2 debugM "remotedaemon" "Tor hidden service running" forever $ do - (conn, _) <- accept soc + (conn, _) <- S.accept soc h <- setupHandle conn ok <- atomically $ ifM (isFullTBQueue q) ( return False @@ -97,7 +100,7 @@ serveClient th u r q = bracket setup cleanup start , connIhdl = h , connOhdl = h } - v <- liftIO $ runNetProto conn $ serveAuth u + v <- liftIO $ runNetProto conn $ P2P.serveAuth u case v of Right (Just theiruuid) -> authed conn theiruuid Right Nothing -> liftIO $ @@ -110,7 +113,57 @@ serveClient th u r q = bracket setup cleanup start authed conn theiruuid = bracket watchChangedRefs (liftIO . stopWatchingChangedRefs) $ \crh -> do v' <- runFullProto (Serving theiruuid crh) conn $ - serveAuthed u + P2P.serveAuthed u case v' of Right () -> return () Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e) + +-- Connect to peer's tor hidden service. +transport :: Transport +transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan = + case unformatP2PAddress (show uri) of + Nothing -> return () + Just addr -> robustConnection 1 $ do + g <- liftAnnex th Annex.gitRepo + bracket (connectPeer g addr) closeConnection (go addr) + where + go addr conn = do + myuuid <- liftAnnex th getUUID + authtoken <- fromMaybe nullAuthToken + <$> liftAnnex th (loadP2PRemoteAuthToken addr) + res <- runNetProto conn $ + P2P.auth myuuid authtoken + case res of + Right (Just theiruuid) + | getUncachedUUID r == theiruuid -> do + send (CONNECTED url) + status <- handlecontrol + `race` handlepeer conn + send (DISCONNECTED url) + return $ either id id status + | otherwise -> return ConnectionStopping + _ -> return ConnectionClosed + + send msg = atomically $ writeTChan ochan msg + + handlecontrol = do + msg <- atomically $ readTChan ichan + case msg of + STOP -> return ConnectionStopping + LOSTNET -> return ConnectionStopping + _ -> handlecontrol + + handlepeer conn = do + v <- runNetProto conn P2P.notifyChange + case v of + Right (Just (ChangedRefs shas)) -> do + whenM (checkNewShas th shas) $ + fetch + handlepeer conn + _ -> return ConnectionClosed + + fetch = do + send (SYNCING url) + ok <- inLocalRepo th $ + runBool [Param "fetch", Param $ Git.repoDescribe r] + send (DONESYNCING url ok) diff --git a/doc/git-annex-remotedaemon.mdwn b/doc/git-annex-remotedaemon.mdwn index d4960c4ff5..b01002dc95 100644 --- a/doc/git-annex-remotedaemon.mdwn +++ b/doc/git-annex-remotedaemon.mdwn @@ -9,8 +9,8 @@ git annex remotedaemon # DESCRIPTION The remotedaemon provides persistent communication with remotes. -This is useful to detect when remotes have received git pushes, so the -changes can be promptly fetched and the local repository updated. +It detects when git branches on remotes have changes, and fetches +the changes from them. The assistant runs the remotedaemon and communicates with it on stdio using a simple textual protocol. @@ -19,12 +19,12 @@ Several types of remotes are supported: For ssh remotes, the remotedaemon tries to maintain a connection to the remote git repository, and uses git-annex-shell notifychanges to detect -when the remote git repository has changed, and fetch the changes from -it. For this to work, the git remote must have [[git-annex-shell]](1) -installed, with notifychanges support. The first version of git-annex-shell -that supports it is 5.20140405. +when the remote git repository has changed. For this to work, the git +remote must have [[git-annex-shell]](1) installed, with notifychanges +support. The first version of git-annex-shell that supports it is +5.20140405. -For tor-annex remotes, the remotedaemon runs as a tor hidden service, +For tor-annex remotes, the remotedaemon runs a tor hidden service, accepting connections from other nodes and serving up the contents of the repository. This is only done if you first run `git annex enable-tor`. Use `git annex p2p` to configure access to tor-annex remotes. diff --git a/doc/todo/tor.mdwn b/doc/todo/tor.mdwn index 0980fbaf5f..9bbc257ad4 100644 --- a/doc/todo/tor.mdwn +++ b/doc/todo/tor.mdwn @@ -10,8 +10,7 @@ Current todo list: "Connection reset by peer" * Think about locking some more. What happens if the connection to the peer is dropped while we think we're locking content there from being dropped? -* Make remotedaemon connect to tor peers, notice when their repos have - changed, and pull, like it does for ssh peers. +* test remotedaemon's change detection Eventually: