remotedaemon: git change detection over tor hidden service
This commit is contained in:
parent
f7687e0876
commit
2c907fff51
8 changed files with 116 additions and 46 deletions
|
@ -37,15 +37,18 @@ class FormatP2PAddress a where
|
|||
|
||||
instance FormatP2PAddress P2PAddress where
|
||||
formatP2PAddress (TorAnnex (OnionAddress onionaddr) onionport) =
|
||||
"tor-annex::" ++ onionaddr ++ ":" ++ show onionport
|
||||
torAnnexScheme ++ ":" ++ onionaddr ++ ":" ++ show onionport
|
||||
unformatP2PAddress s
|
||||
| "tor-annex::" `isPrefixOf` s = do
|
||||
| (torAnnexScheme ++ ":") `isPrefixOf` s = do
|
||||
let s' = dropWhile (== ':') $ dropWhile (/= ':') s
|
||||
let (onionaddr, ps) = separate (== ':') s'
|
||||
onionport <- readish ps
|
||||
return (TorAnnex (OnionAddress onionaddr) onionport)
|
||||
| otherwise = Nothing
|
||||
|
||||
torAnnexScheme :: String
|
||||
torAnnexScheme = "tor-annex:"
|
||||
|
||||
instance FormatP2PAddress P2PAddressAuth where
|
||||
formatP2PAddress (P2PAddressAuth addr authtoken) =
|
||||
formatP2PAddress addr ++ ":" ++ T.unpack (fromAuthToken authtoken)
|
||||
|
|
|
@ -441,6 +441,16 @@ sendSuccess :: Bool -> Proto ()
|
|||
sendSuccess True = net $ sendMessage SUCCESS
|
||||
sendSuccess False = net $ sendMessage FAILURE
|
||||
|
||||
notifyChange :: Proto (Maybe ChangedRefs)
|
||||
notifyChange = do
|
||||
net $ sendMessage NOTIFYCHANGE
|
||||
ack <- net receiveMessage
|
||||
case ack of
|
||||
CHANGED rs -> return (Just rs)
|
||||
_ -> do
|
||||
net $ sendMessage (ERROR "expected CHANGED")
|
||||
return Nothing
|
||||
|
||||
connect :: Service -> Handle -> Handle -> Proto ExitCode
|
||||
connect service hin hout = do
|
||||
net $ sendMessage (CONNECT service)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{- git-remote-daemon utilities
|
||||
-
|
||||
- Copyright 2014 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2014-2016 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
@ -9,6 +9,8 @@ module RemoteDaemon.Common
|
|||
( liftAnnex
|
||||
, inLocalRepo
|
||||
, checkNewShas
|
||||
, ConnectionStatus(..)
|
||||
, robustConnection
|
||||
) where
|
||||
|
||||
import qualified Annex
|
||||
|
@ -16,6 +18,7 @@ import Annex.Common
|
|||
import RemoteDaemon.Types
|
||||
import qualified Git
|
||||
import Annex.CatFile
|
||||
import Utility.ThreadScheduler
|
||||
|
||||
import Control.Concurrent
|
||||
|
||||
|
@ -40,3 +43,22 @@ checkNewShas transporthandle = check
|
|||
check [] = return True
|
||||
check (r:rs) = maybe (check rs) (const $ return False)
|
||||
=<< liftAnnex transporthandle (catObjectDetails r)
|
||||
|
||||
data ConnectionStatus = ConnectionStopping | ConnectionClosed
|
||||
|
||||
{- Make connection robust, retrying on error, with exponential backoff. -}
|
||||
robustConnection :: Int -> IO ConnectionStatus -> IO ()
|
||||
robustConnection backoff a =
|
||||
caught =<< a `catchNonAsync` (const $ return ConnectionClosed)
|
||||
where
|
||||
caught ConnectionStopping = return ()
|
||||
caught ConnectionClosed = do
|
||||
threadDelaySeconds (Seconds backoff)
|
||||
robustConnection increasedbackoff a
|
||||
|
||||
increasedbackoff
|
||||
| b2 > maxbackoff = maxbackoff
|
||||
| otherwise = b2
|
||||
where
|
||||
b2 = backoff * 2
|
||||
maxbackoff = 3600 -- one hour
|
||||
|
|
|
@ -12,6 +12,7 @@ import qualified RemoteDaemon.Transport.Ssh
|
|||
import qualified RemoteDaemon.Transport.GCrypt
|
||||
import qualified RemoteDaemon.Transport.Tor
|
||||
import qualified Git.GCrypt
|
||||
import P2P.Address (torAnnexScheme)
|
||||
|
||||
import qualified Data.Map as M
|
||||
|
||||
|
@ -22,6 +23,7 @@ remoteTransports :: M.Map TransportScheme Transport
|
|||
remoteTransports = M.fromList
|
||||
[ ("ssh:", RemoteDaemon.Transport.Ssh.transport)
|
||||
, (Git.GCrypt.urlScheme, RemoteDaemon.Transport.GCrypt.transport)
|
||||
, (torAnnexScheme, RemoteDaemon.Transport.Tor.transport)
|
||||
]
|
||||
|
||||
remoteServers :: [TransportHandle -> IO ()]
|
||||
|
|
|
@ -16,7 +16,6 @@ import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
|
|||
import Utility.SimpleProtocol
|
||||
import qualified Git
|
||||
import Git.Command
|
||||
import Utility.ThreadScheduler
|
||||
import Annex.ChangedRefs
|
||||
|
||||
import Control.Concurrent.STM
|
||||
|
@ -38,7 +37,7 @@ transportUsingCmd cmd params rr@(RemoteRepo r gc) url h@(TransportHandle (LocalR
|
|||
|
||||
transportUsingCmd' :: FilePath -> [CommandParam] -> Transport
|
||||
transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
|
||||
robustly 1 $ do
|
||||
robustConnection 1 $ do
|
||||
(Just toh, Just fromh, Just errh, pid) <-
|
||||
createProcess (proc cmd (toCommand params))
|
||||
{ std_in = CreatePipe
|
||||
|
@ -79,13 +78,13 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
|
|||
fetch
|
||||
handlestdout fromh
|
||||
-- avoid reconnect on protocol error
|
||||
Nothing -> return Stopping
|
||||
Nothing -> return ConnectionStopping
|
||||
|
||||
handlecontrol = do
|
||||
msg <- atomically $ readTChan ichan
|
||||
case msg of
|
||||
STOP -> return Stopping
|
||||
LOSTNET -> return Stopping
|
||||
STOP -> return ConnectionStopping
|
||||
LOSTNET -> return ConnectionStopping
|
||||
_ -> handlecontrol
|
||||
|
||||
-- Old versions of git-annex-shell that do not support
|
||||
|
@ -103,23 +102,5 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
|
|||
, "needs its git-annex upgraded"
|
||||
, "to 5.20140405 or newer"
|
||||
]
|
||||
return Stopping
|
||||
return ConnectionStopping
|
||||
else handlestderr errh
|
||||
|
||||
data Status = Stopping | ConnectionClosed
|
||||
|
||||
{- Make connection robustly, with exponential backoff on failure. -}
|
||||
robustly :: Int -> IO Status -> IO ()
|
||||
robustly backoff a = caught =<< catchDefaultIO ConnectionClosed a
|
||||
where
|
||||
caught Stopping = return ()
|
||||
caught ConnectionClosed = do
|
||||
threadDelaySeconds (Seconds backoff)
|
||||
robustly increasedbackoff a
|
||||
|
||||
increasedbackoff
|
||||
| b2 > maxbackoff = maxbackoff
|
||||
| otherwise = b2
|
||||
where
|
||||
b2 = backoff * 2
|
||||
maxbackoff = 3600 -- one hour
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
{- git-remote-daemon, tor hidden service transport
|
||||
{- git-remote-daemon, tor hidden service server and transport
|
||||
-
|
||||
- Copyright 2016 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module RemoteDaemon.Transport.Tor (server) where
|
||||
module RemoteDaemon.Transport.Tor (server, transport) where
|
||||
|
||||
import Common
|
||||
import qualified Annex
|
||||
|
@ -16,20 +16,23 @@ import RemoteDaemon.Common
|
|||
import Utility.Tor
|
||||
import Utility.FileMode
|
||||
import Utility.AuthToken
|
||||
import P2P.Protocol
|
||||
import P2P.Protocol as P2P
|
||||
import P2P.IO
|
||||
import P2P.Annex
|
||||
import P2P.Auth
|
||||
import P2P.Address
|
||||
import Annex.UUID
|
||||
import Types.UUID
|
||||
import Messages
|
||||
import Git
|
||||
import Git.Command
|
||||
|
||||
import System.PosixCompat.User
|
||||
import Network.Socket
|
||||
import Control.Concurrent
|
||||
import System.Log.Logger (debugM)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.Async
|
||||
import qualified Network.Socket as S
|
||||
|
||||
-- Run tor hidden service.
|
||||
server :: TransportHandle -> IO ()
|
||||
|
@ -44,17 +47,17 @@ server th@(TransportHandle (LocalRepo r) _) = do
|
|||
let ident = fromUUID u
|
||||
let sock = hiddenServiceSocketFile uid ident
|
||||
nukeFile sock
|
||||
soc <- socket AF_UNIX Stream defaultProtocol
|
||||
bind soc (SockAddrUnix sock)
|
||||
soc <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
|
||||
S.bind soc (S.SockAddrUnix sock)
|
||||
-- Allow everyone to read and write to the socket; tor is probably
|
||||
-- running as a different user. Connections have to authenticate
|
||||
-- to do anything, so it's fine that other local users can connect.
|
||||
modifyFileMode sock $ addModes
|
||||
[groupReadMode, groupWriteMode, otherReadMode, otherWriteMode]
|
||||
listen soc 2
|
||||
S.listen soc 2
|
||||
debugM "remotedaemon" "Tor hidden service running"
|
||||
forever $ do
|
||||
(conn, _) <- accept soc
|
||||
(conn, _) <- S.accept soc
|
||||
h <- setupHandle conn
|
||||
ok <- atomically $ ifM (isFullTBQueue q)
|
||||
( return False
|
||||
|
@ -97,7 +100,7 @@ serveClient th u r q = bracket setup cleanup start
|
|||
, connIhdl = h
|
||||
, connOhdl = h
|
||||
}
|
||||
v <- liftIO $ runNetProto conn $ serveAuth u
|
||||
v <- liftIO $ runNetProto conn $ P2P.serveAuth u
|
||||
case v of
|
||||
Right (Just theiruuid) -> authed conn theiruuid
|
||||
Right Nothing -> liftIO $
|
||||
|
@ -110,7 +113,57 @@ serveClient th u r q = bracket setup cleanup start
|
|||
authed conn theiruuid =
|
||||
bracket watchChangedRefs (liftIO . stopWatchingChangedRefs) $ \crh -> do
|
||||
v' <- runFullProto (Serving theiruuid crh) conn $
|
||||
serveAuthed u
|
||||
P2P.serveAuthed u
|
||||
case v' of
|
||||
Right () -> return ()
|
||||
Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e)
|
||||
|
||||
-- Connect to peer's tor hidden service.
|
||||
transport :: Transport
|
||||
transport (RemoteRepo r _) url@(RemoteURI uri) th ichan ochan =
|
||||
case unformatP2PAddress (show uri) of
|
||||
Nothing -> return ()
|
||||
Just addr -> robustConnection 1 $ do
|
||||
g <- liftAnnex th Annex.gitRepo
|
||||
bracket (connectPeer g addr) closeConnection (go addr)
|
||||
where
|
||||
go addr conn = do
|
||||
myuuid <- liftAnnex th getUUID
|
||||
authtoken <- fromMaybe nullAuthToken
|
||||
<$> liftAnnex th (loadP2PRemoteAuthToken addr)
|
||||
res <- runNetProto conn $
|
||||
P2P.auth myuuid authtoken
|
||||
case res of
|
||||
Right (Just theiruuid)
|
||||
| getUncachedUUID r == theiruuid -> do
|
||||
send (CONNECTED url)
|
||||
status <- handlecontrol
|
||||
`race` handlepeer conn
|
||||
send (DISCONNECTED url)
|
||||
return $ either id id status
|
||||
| otherwise -> return ConnectionStopping
|
||||
_ -> return ConnectionClosed
|
||||
|
||||
send msg = atomically $ writeTChan ochan msg
|
||||
|
||||
handlecontrol = do
|
||||
msg <- atomically $ readTChan ichan
|
||||
case msg of
|
||||
STOP -> return ConnectionStopping
|
||||
LOSTNET -> return ConnectionStopping
|
||||
_ -> handlecontrol
|
||||
|
||||
handlepeer conn = do
|
||||
v <- runNetProto conn P2P.notifyChange
|
||||
case v of
|
||||
Right (Just (ChangedRefs shas)) -> do
|
||||
whenM (checkNewShas th shas) $
|
||||
fetch
|
||||
handlepeer conn
|
||||
_ -> return ConnectionClosed
|
||||
|
||||
fetch = do
|
||||
send (SYNCING url)
|
||||
ok <- inLocalRepo th $
|
||||
runBool [Param "fetch", Param $ Git.repoDescribe r]
|
||||
send (DONESYNCING url ok)
|
||||
|
|
|
@ -9,8 +9,8 @@ git annex remotedaemon
|
|||
# DESCRIPTION
|
||||
|
||||
The remotedaemon provides persistent communication with remotes.
|
||||
This is useful to detect when remotes have received git pushes, so the
|
||||
changes can be promptly fetched and the local repository updated.
|
||||
It detects when git branches on remotes have changes, and fetches
|
||||
the changes from them.
|
||||
|
||||
The assistant runs the remotedaemon and communicates with it on
|
||||
stdio using a simple textual protocol.
|
||||
|
@ -19,12 +19,12 @@ Several types of remotes are supported:
|
|||
|
||||
For ssh remotes, the remotedaemon tries to maintain a connection to the
|
||||
remote git repository, and uses git-annex-shell notifychanges to detect
|
||||
when the remote git repository has changed, and fetch the changes from
|
||||
it. For this to work, the git remote must have [[git-annex-shell]](1)
|
||||
installed, with notifychanges support. The first version of git-annex-shell
|
||||
that supports it is 5.20140405.
|
||||
when the remote git repository has changed. For this to work, the git
|
||||
remote must have [[git-annex-shell]](1) installed, with notifychanges
|
||||
support. The first version of git-annex-shell that supports it is
|
||||
5.20140405.
|
||||
|
||||
For tor-annex remotes, the remotedaemon runs as a tor hidden service,
|
||||
For tor-annex remotes, the remotedaemon runs a tor hidden service,
|
||||
accepting connections from other nodes and serving up the contents of the
|
||||
repository. This is only done if you first run `git annex enable-tor`.
|
||||
Use `git annex p2p` to configure access to tor-annex remotes.
|
||||
|
|
|
@ -10,8 +10,7 @@ Current todo list:
|
|||
"Connection reset by peer"
|
||||
* Think about locking some more. What happens if the connection to the peer
|
||||
is dropped while we think we're locking content there from being dropped?
|
||||
* Make remotedaemon connect to tor peers, notice when their repos have
|
||||
changed, and pull, like it does for ssh peers.
|
||||
* test remotedaemon's change detection
|
||||
|
||||
Eventually:
|
||||
|
||||
|
|
Loading…
Reference in a new issue