added git-annex remotedaemon

So far, handling connecting to git-annex-shell notifychanges, and
pulling immediately when a change is pushed to a remote.

A little bit buggy (crashes after the first pull), but it already works!

This commit was sponsored by Mark Sheppard.
This commit is contained in:
Joey Hess 2014-04-06 19:06:03 -04:00
parent 6ca01f40cf
commit 43909723b3
11 changed files with 376 additions and 116 deletions

View file

@ -89,6 +89,7 @@ import qualified Command.WebApp
#ifdef WITH_XMPP
import qualified Command.XMPPGit
#endif
import qualified Command.RemoteDaemon
#endif
import qualified Command.Test
#ifdef WITH_TESTSUITE
@ -176,6 +177,7 @@ cmds = concat
#ifdef WITH_XMPP
, Command.XMPPGit.def
#endif
, Command.RemoteDaemon.def
#endif
, Command.Test.def
#ifdef WITH_TESTSUITE

View file

@ -13,7 +13,7 @@ import Utility.DirWatcher
import Utility.DirWatcher.Types
import qualified Git
import Git.Sha
import RemoteDaemon.EndPoint.GitAnnexShell.Types
import RemoteDaemon.Transport.Ssh.Types
import Control.Concurrent
import Control.Concurrent.Async

24
Command/RemoteDaemon.hs Normal file
View file

@ -0,0 +1,24 @@
{- git-annex command
-
- Copyright 2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Command.RemoteDaemon where
import Common.Annex
import Command
import RemoteDaemon.Core
def :: [Command]
def = [noCommit $ command "remotedaemon" paramNothing seek SectionPlumbing
"detects when remotes have changed, and fetches from them"]
seek :: CommandSeek
seek = withNothing start
start :: CommandStart
start = do
liftIO runForeground
stop

View file

@ -27,7 +27,7 @@ data RepoLocation
| LocalUnknown FilePath
| Url URI
| Unknown
deriving (Show, Eq)
deriving (Show, Eq, Ord)
data Repo = Repo
{ location :: RepoLocation
@ -41,7 +41,7 @@ data Repo = Repo
, gitEnv :: Maybe [(String, String)]
-- global options to pass to git when running git commands
, gitGlobalOpts :: [CommandParam]
} deriving (Show, Eq)
} deriving (Show, Eq, Ord)
type RemoteName = String

114
RemoteDaemon/Core.hs Normal file
View file

@ -0,0 +1,114 @@
{- git-remote-daemon core
-
- Copyright 2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module RemoteDaemon.Core (runForeground) where
import qualified Annex
import Common
import Types.GitConfig
import RemoteDaemon.Types
import RemoteDaemon.Transport
import qualified Git
import qualified Git.Types as Git
import qualified Git.CurrentRepo
import Utility.SimpleProtocol
import Control.Concurrent.Async
import Control.Concurrent.Chan
import Network.URI
import qualified Data.Map as M
runForeground :: IO ()
runForeground = do
ichan <- newChan :: IO (Chan Consumed)
ochan <- newChan :: IO (Chan Emitted)
void $ async $ controller ichan ochan
let reader = forever $ do
l <- getLine
case parseMessage l of
Nothing -> error $ "protocol error: " ++ l
Just cmd -> writeChan ichan cmd
let writer = forever $ do
msg <- readChan ochan
putStrLn $ unwords $ formatMessage msg
hFlush stdout
-- If the reader or writer fails, for example because stdin/stdout
-- gets closed, kill the other one, and throw an exception which
-- will take down the daemon.
void $ concurrently reader writer
type RemoteMap = M.Map Git.Repo (IO (), Chan Consumed)
-- Runs the transports, dispatching messages to them, and handling
-- the main control messages.
controller :: Chan Consumed -> Chan Emitted -> IO ()
controller ichan ochan = do
m <- getRemoteMap ochan
startrunning m
go False m
where
go paused m = do
cmd <- readChan ichan
case cmd of
RELOAD -> do
m' <- getRemoteMap ochan
let common = M.intersection m m'
let new = M.difference m' m
let old = M.difference m m'
stoprunning old
unless paused $
startrunning new
go paused (M.union common new)
PAUSE -> do
stoprunning m
go True m
RESUME -> do
when paused $
startrunning m
go False m
STOP -> exitSuccess
-- All remaining messages are sent to
-- all Transports.
msg -> do
unless paused $
forM_ chans (`writeChan` msg)
go paused m
where
chans = map snd (M.elems m)
startrunning m = forM_ (M.elems m) startrunning'
startrunning' (transport, _) = void $ async transport
-- Ask the transport nicely to stop.
stoprunning m = forM_ (M.elems m) stoprunning'
stoprunning' (_, c) = writeChan c STOP
getRemoteMap :: Chan Emitted -> IO RemoteMap
getRemoteMap ochan = do
annexstate <- Annex.new =<< Git.CurrentRepo.get
genRemoteMap annexstate ochan
-- Generates a map with a transport for each supported remote in the git repo,
-- except those that have annex.sync = false
genRemoteMap :: Annex.AnnexState -> Chan Emitted -> IO RemoteMap
genRemoteMap annexstate ochan = M.fromList . catMaybes <$> mapM gen rs
where
rs = Git.remotes (Annex.repo annexstate)
gen r = case Git.location r of
Git.Url u -> case M.lookup (uriScheme u) remoteTransports of
Just transport
| remoteAnnexSync (extractRemoteGitConfig r (Git.repoDescribe r)) -> do
ichan <- newChan :: IO (Chan Consumed)
return $ Just
( r
, (transport r (Git.repoDescribe r) annexstate ichan ochan, ichan)
)
_ -> return Nothing
_ -> return Nothing

21
RemoteDaemon/Transport.hs Normal file
View file

@ -0,0 +1,21 @@
{- git-remote-daemon transports
-
- Copyright 2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module RemoteDaemon.Transport where
import RemoteDaemon.Types
import qualified RemoteDaemon.Transport.Ssh
import qualified Data.Map as M
-- Corresponds to uriScheme
type TransportScheme = String
remoteTransports :: M.Map TransportScheme Transport
remoteTransports = M.fromList
[ ("ssh:", RemoteDaemon.Transport.Ssh.transport)
]

View file

@ -0,0 +1,75 @@
{- git-remote-daemon, git-annex-shell over ssh transport
-
- Copyright 2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module RemoteDaemon.Transport.Ssh (transport) where
import Common.Annex
import qualified Annex
import RemoteDaemon.Types
import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
import Remote.Helper.Ssh
import Utility.SimpleProtocol
import qualified Git
import Annex.CatFile
import Git.Command
import Control.Concurrent.Chan
import Control.Concurrent.Async
import System.Process (std_in, std_out)
transport :: Transport
transport r remotename annexstate ichan ochan = Annex.eval annexstate $ do
v <- git_annex_shell r "notifychanges" [] []
case v of
Nothing -> noop
Just (cmd, params) -> liftIO $ go cmd (toCommand params)
where
send msg = writeChan ochan (msg remotename)
go cmd params = do
(Just toh, Just fromh, _, pid) <- createProcess (proc cmd params)
{ std_in = CreatePipe
, std_out = CreatePipe
}
let shutdown = do
hClose toh
hClose fromh
void $ waitForProcess pid
send DISCONNECTED
let fromshell = forever $ do
l <- hGetLine fromh
case parseMessage l of
Just SshRemote.READY -> send CONNECTED
Just (SshRemote.CHANGED refs) ->
Annex.eval annexstate $
fetchNew remotename refs
Nothing -> shutdown
-- The only control message that matters is STOP.
--
-- Note that a CHANGED control message is not handled;
-- we don't push to the ssh remote. The assistant
-- and git-annex sync both handle pushes, so there's no
-- need to do it here.
let handlecontrol = forever $ do
msg <- readChan ichan
case msg of
STOP -> ioError (userError "done")
_ -> noop
-- Run both threads until one finishes.
void $ tryIO $ concurrently fromshell handlecontrol
shutdown
-- Check if any of the shas are actally new, to avoid unnecessary fetching.
fetchNew :: RemoteName -> [Git.Sha] -> Annex ()
fetchNew remotename = check
where
check [] = void $ inRepo $ runBool [Param "fetch", Param remotename]
check (r:rs) = maybe (check rs) (const noop)
=<< catObjectDetails r

View file

@ -1,4 +1,4 @@
{- git-remote-daemon, git-annex-shell endpoint, datatypes
{- git-remote-daemon, git-annex-shell notifychanges protocol types
-
- Copyright 2014 Joey Hess <joey@kitenet.net>
-
@ -8,7 +8,7 @@
{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module RemoteDaemon.EndPoint.GitAnnexShell.Types (
module RemoteDaemon.Transport.Ssh.Types (
Notification(..),
Proto.serialize,
Proto.deserialize,
@ -16,11 +16,11 @@ module RemoteDaemon.EndPoint.GitAnnexShell.Types (
) where
import qualified Utility.SimpleProtocol as Proto
import RemoteDaemon.Types (ShaList)
import RemoteDaemon.Types (RefList)
data Notification
= READY
| CHANGED ShaList
| CHANGED RefList
instance Proto.Sendable Notification where
formatMessage READY = ["READY"]

View file

@ -10,74 +10,78 @@
module RemoteDaemon.Types where
import qualified Annex
import qualified Git.Types as Git
import qualified Utility.SimpleProtocol as Proto
import Control.Concurrent
-- A Transport for a particular git remote consumes some messages
-- from a Chan, and emits others to another Chan.
type Transport = Git.Repo -> RemoteName -> Annex.AnnexState -> Chan Consumed -> Chan Emitted -> IO ()
-- Messages that the daemon emits.
data Emitted
= CONNECTED RemoteName
| DISCONNECTED RemoteName
| CHANGED RemoteName ShaList
| STATUS RemoteName UserMessage
| ERROR RemoteName UserMessage
| SYNCING RemoteName
| DONESYNCING RemoteName Bool
-- Messages that the deamon consumes.
data Consumed
= PAUSE
| RESUME
| PUSH RemoteName
| CHANGED RefList
| RELOAD
| STOP
type RemoteName = String
type UserMessage = String
type ShaList = [Git.Sha]
type RefList = [Git.Ref]
instance Proto.Sendable Emitted where
formatMessage (CONNECTED remote) =
["CONNECTED", Proto.serialize remote]
formatMessage (DISCONNECTED remote) =
["DISCONNECTED", Proto.serialize remote]
formatMessage (CHANGED remote shas) =
["CHANGED"
, Proto.serialize remote
, Proto.serialize shas
]
formatMessage (STATUS remote msg) =
["STATUS"
, Proto.serialize remote
, Proto.serialize msg
]
formatMessage (ERROR remote msg) =
["ERROR"
, Proto.serialize remote
, Proto.serialize msg
]
formatMessage (SYNCING remote) =
["SYNCING", Proto.serialize remote]
formatMessage (DONESYNCING remote status) =
["DONESYNCING", Proto.serialize remote, Proto.serialize status]
instance Proto.Sendable Consumed where
formatMessage PAUSE = ["PAUSE"]
formatMessage RESUME = ["RESUME"]
formatMessage (PUSH remote) = ["PUSH", Proto.serialize remote]
formatMessage (CHANGED refs) =["CHANGED", Proto.serialize refs]
formatMessage RELOAD = ["RELOAD"]
formatMessage STOP = ["STOP"]
instance Proto.Receivable Emitted where
parseCommand "CONNECTED" = Proto.parse1 CONNECTED
parseCommand "DISCONNECTED" = Proto.parse1 DISCONNECTED
parseCommand "CHANGED" = Proto.parse2 CHANGED
parseCommand "STATUS" = Proto.parse2 STATUS
parseCommand "ERROR" = Proto.parse2 ERROR
parseCommand "SYNCING" = Proto.parse1 SYNCING
parseCommand "DONESYNCING" = Proto.parse2 DONESYNCING
parseCommand _ = Proto.parseFail
instance Proto.Receivable Consumed where
parseCommand "PAUSE" = Proto.parse0 PAUSE
parseCommand "RESUME" = Proto.parse0 RESUME
parseCommand "PUSH" = Proto.parse1 PUSH
parseCommand "CHANGED" = Proto.parse1 CHANGED
parseCommand "RELOAD" = Proto.parse0 RELOAD
parseCommand "STOP" = Proto.parse0 STOP
parseCommand _ = Proto.parseFail
instance Proto.Serializable [Char] where
serialize = id
deserialize = Just
instance Proto.Serializable ShaList where
instance Proto.Serializable RefList where
serialize = unwords . map Git.fromRef
deserialize = Just . map Git.Ref . words
instance Proto.Serializable Bool where
serialize False = "0"
serialize True = "1"
deserialize "0" = Just False
deserialize "1" = Just True
deserialize _ = Nothing

View file

@ -37,15 +37,89 @@
# design
Let git-remote-daemon be the name. It runs in a repo and
either:
Let git-remote-daemon be the name. Or for git-annex,
`git annex remotedaemon`.
* forks to background and performs configured actions (ie, `git pull`)
* with --foreground, communicates over stdio
with its caller using a simple protocol (exiting when its caller closes its
stdin handle so it will stop when the assistant stops).
It runs in one of two ways:
It is configured entirely by .git/config.
1. Forked to background, using a named pipe for the control protocol.
2. With --foreground, the control protocol goes over stdio.
Either way, behavior is the same:
* Get a list of remotes to act on by looking at .git/config
* Automatically notices when a remote has changes to branches
matching remote.$name.fetch, and pulls them down to the appropriate
location.
* When the control protocol informs it about a new ref that's available,
it offers the ref to any interested remotes.
# control protocol
This is an asynchronous protocol. Ie, either side can send any message
at any time, and the other side does not send a reply.
It is line based and intended to be low volume and not used for large data.
TODO: Expand with commands for sending/receiving git-annex objects, and
progress during transfer.
TODO: Will probably need to add something for whatever pairing is done by
the webapp.
## emitted messages
* `CONNECTED $remote`
Sent when a connection has been made with a remote.
* `DISCONNECTED $remote`
Sent when connection with a remote has been lost.
* `SYNCING $remote`
Indicates that a pull or a push with a remote is in progress.
Always followed by DONESYNCING.
* `DONESYNCING $remote 1|0`
Indicates that syncing with a remote is done, and either succeeded
(1) or failed (0).
## consumed messages
* `PAUSE`
This indicates that the network connection has gone down,
or the user has requested a pause.
git-remote-daemon should close connections and idle.
Affects all remotes.
* `RESUME`
This indicates that the network connection has come back up, or the user
has asked it to run again. Start back up network connections.
Affects all remotes.
* `CHANGED ref ...`
Indicates that a ref is new or has changed. These can be offered to peers,
and peers that are interested in them can pull the content.
* `RELOAD`
Indicates that configs have changed. Daemon should reload .git/config
and/or restart.
* `STOP`
Shut down git-remote-daemon
(When using stdio, it also should shutdown when it reaches EOF on
stdin.)
# encryption & authentication
@ -65,76 +139,6 @@ For example, in telehash, each node has its own keypair, which is used
or authentication and encryption, and is all that's needed to route
messages to that node.
# stdio protocol
This is an asynchronous protocol. Ie, either side can send any message
at any time, and the other side does not send a reply.
It is line based and intended to be low volume and not used for large data.
TODO: Expand with commands for sending/receiving git-annex objects, and
progress during transfer.
TODO: Will probably need to add something for whatever pairing is done by
the webapp.
## emitted messages
* `CONNECTED $remote`
Send when a connection has been made with a remote.
* `DISCONNECTED $remote`
Send when connection with a remote has been lost.
* `CHANGED $remote $sha ...`
This indicates that refs in the named git remote have changed,
and indicates the new shas.
* `STATUS $remote $string`
A user-visible status message about a named remote.
* `ERROR $remote $string`
A user-visible error about a named remote.
(Can continue running past this point, for this or other remotes.)
## consumed messages
* `PAUSE`
This indicates that the network connection has gone down,
or the user has requested a pause.
git-remote-daemon should close connections and idle.
Affects all remotes.
* `RESUME`
This indicates that the network connection has come back up, or the user
has asked it to run again. Start back up network connections.
Affects all remotes.
* `PUSH $remote`
Requests that a git push be done with the remote over the network
transport when next possible. May be repeated many times before the push
finally happens.
* `RELOAD`
Indicates that configs have changed. Daemon should reload .git/config
and/or restart.
# send-pack and receive-pack
Done as the assistant does with XMPP currently. Does not involve
communication over the above stdio protocol.
# network level protocol
How do peers communicate with one another over the network?
@ -143,17 +147,29 @@ This seems to need to be network-layer dependant. Telehash will need
one design, and git-annex-shell on a central ssh server has a very different
(and much simpler) design.
## git-annex-shell
## ssh
Speak a subset of the stdio protocol between git-annex-shell and
git-remote-daemon, over ssh.
`git-annex-shell notifychanges` is run, and speaks a simple protocol
over stdio to inform when refs on the remote have changed.
Only thing that seems to be needed is CHANGED, actually!
No pushing is done for CHANGED, since git handles ssh natively.
TODO:
* It already detects changes and pulls, but it then dies with a protocol
error.
* Remote system might not be available. Find a smart way to detect it,
ideally w/o generating network traffic. One way might be to check
if the ssh connection caching control socket exists, for example.
* Remote system might be available, and connection get lost. Should
reconnect, but needs to avoid bad behavior (ie, constant reconnect
attempts.)
* Detect if old system had a too old git-annex-shell and avoid bad behavior
## telehash
TODO
## XMPP
## xmpp
Reuse [[assistant/xmpp]]

View file

@ -922,6 +922,10 @@ subdirectories).
There are several parameters, provided by Haskell's tasty test framework.
* `remotedaemon`
Detects when remotes have changed and fetches from them.
* `xmppgit`
This command is used internally to perform git pulls over XMPP.