142 lines
		
	
	
	
		
			3.9 KiB
			
		
	
	
	
		
			Haskell
		
	
	
	
	
	
			
		
		
	
	
			142 lines
		
	
	
	
		
			3.9 KiB
			
		
	
	
	
		
			Haskell
		
	
	
	
	
	
{- git-remote-daemon core
 -
 - Copyright 2014 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}
 | 
						|
 | 
						|
module RemoteDaemon.Core (runForeground) where
 | 
						|
 | 
						|
import qualified Annex
 | 
						|
import Common
 | 
						|
import Types.GitConfig
 | 
						|
import RemoteDaemon.Common
 | 
						|
import RemoteDaemon.Types
 | 
						|
import RemoteDaemon.Transport
 | 
						|
import qualified Git
 | 
						|
import qualified Git.Types as Git
 | 
						|
import qualified Git.CurrentRepo
 | 
						|
import Utility.SimpleProtocol
 | 
						|
import Config
 | 
						|
import Annex.Ssh
 | 
						|
 | 
						|
import Control.Concurrent
 | 
						|
import Control.Concurrent.Async
 | 
						|
import Control.Concurrent.STM
 | 
						|
import Network.URI
 | 
						|
import qualified Data.Map as M
 | 
						|
 | 
						|
-- Runs the daemon in the foreground, using the pair of handles returned
-- by dupIoHandles. Three threads run concurrently: one parses incoming
-- lines into messages for the controller, one writes out the messages
-- emitted by the transports, and one runs the controller itself.
runForeground :: IO ()
runForeground = do
	(inh, outh) <- dupIoHandles
	consumed <- newTChanIO :: IO (TChan Consumed)
	emitted <- newTChanIO :: IO (TChan Emitted)
	-- If any thread fails, the rest will be killed.
	-- The result of tryIO is discarded.
	void $ tryIO $
		readLoop inh consumed
			`concurrently` writeLoop outh emitted
			`concurrently` runController consumed emitted
  where
	-- Reads lines forever, queueing each parsed message on the
	-- channel. A line that does not parse is a protocol error.
	readLoop inh consumed = forever $
		hGetLine inh >>= \l -> case parseMessage l of
			Just cmd -> atomically $ writeTChan consumed cmd
			Nothing -> error $ "protocol error: " ++ l
	-- Takes messages off the channel forever, writing each one as a
	-- single line and flushing so it is sent right away.
	writeLoop outh emitted = forever $ do
		msg <- atomically $ readTChan emitted
		hPutStrLn outh $ unwords $ formatMessage msg
		hFlush outh
 | 
						|
 | 
						|
-- Maps each remote's Git.Repo to a pair of: the IO action that runs the
-- remote's transport, and the channel used to send Consumed messages to
-- that transport.
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)
 | 
						|
 | 
						|
-- Runs the transports, dispatching messages to them, and handling
-- the main control messages.
--
-- Messages are read from ichan; ochan is handed to each transport when
-- the remote map is generated.
runController :: TChan Consumed -> TChan Emitted -> IO ()
runController ichan ochan = do
	h <- genTransportHandle
	m <- genRemoteMap h ochan
	startrunning m
	go h False m
  where
	-- Main loop. Carries the current transport handle, whether the
	-- transports are currently paused, and the current remote map.
	go h paused m = dispatch =<< atomically (readTChan ichan)
	  where
		-- Regenerate the remote map. Remotes that are in both maps
		-- keep their existing entry, remotes that went away are
		-- told to STOP, and new remotes are started unless paused.
		dispatch RELOAD = do
			h' <- updateTransportHandle h
			m' <- genRemoteMap h' ochan
			let kept = M.intersection m m'
			let added = M.difference m' m
			let removed = M.difference m m'
			broadcast STOP removed
			unless paused $
				startrunning added
			go h' paused (M.union kept added)
		dispatch LOSTNET = do
			-- force close all cached ssh connections
			-- (done here so that if there are multiple
			-- ssh remotes, it's only done once)
			liftAnnex h forceSshCleanup
			broadcast LOSTNET m
			go h True m
		dispatch PAUSE = broadcast STOP m >> go h True m
		-- Transports are only restarted if they were paused.
		dispatch RESUME = do
			when paused $
				startrunning m
			go h False m
		dispatch STOP = exitSuccess
		-- All remaining messages are sent to all Transports,
		-- unless paused, in which case they are dropped.
		dispatch msg = do
			unless paused $
				broadcast msg m
			go h paused m

	-- Starts the transport of every remote in the map, each in its
	-- own async thread.
	startrunning m = forM_ (M.elems m) $ \(transport, c) -> do
		-- drain any old control messages from the channel
		-- to avoid confusing the transport with them
		atomically $ drain c
		void $ async transport

	-- Reads from the channel until it is empty, discarding everything.
	drain c = do
		v <- tryReadTChan c
		case v of
			Nothing -> noop
			Just _ -> drain c

	-- Writes the message to the channel of every remote in the map,
	-- in a single STM transaction.
	broadcast msg m = atomically $
		forM_ (M.elems m) $ \(_, c) -> writeTChan c msg
 | 
						|
 | 
						|
-- Generates a map with a transport for each supported remote in the git repo,
-- except those that have annex.sync = false
--
-- A remote is supported when its location is an url whose scheme has an
-- entry in remoteTransports. Each remote in the map gets its own newly
-- created channel of Consumed messages; all transports share ochan.
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
genRemoteMap h@(TransportHandle g _) ochan = do
	entries <- mapM mkentry (Git.remotes g)
	return $ M.fromList $ catMaybes entries
  where
	-- Only remotes with an url location can have a transport.
	mkentry r = case Git.location r of
		Git.Url u -> maybe (return Nothing) (setup r u) $
			M.lookup (uriScheme u) remoteTransports
		_ -> return Nothing

	-- Builds the map entry for a remote whose transport is known,
	-- or Nothing when the remote's config says not to sync with it.
	setup r u transport
		| remoteAnnexSync gc = do
			ichan <- newTChanIO :: IO (TChan Consumed)
			let runner = transport (RemoteRepo r gc) (RemoteURI u) h ichan ochan
			return $ Just (r, (runner, ichan))
		| otherwise = return Nothing
	  where
		gc = extractRemoteGitConfig r (Git.repoDescribe r)
 | 
						|
 | 
						|
-- Creates a TransportHandle for the current git repository: a new Annex
-- state is made for the repo and stored in an MVar, and the handle holds
-- both that MVar and the repo read back from the state.
genTransportHandle :: IO TransportHandle
genTransportHandle = do
	currentrepo <- Git.CurrentRepo.get
	st <- Annex.new currentrepo
	annexstate <- newMVar st
	g <- Annex.repo <$> readMVar annexstate
	return $ TransportHandle g annexstate
 | 
						|
 | 
						|
-- Runs reloadConfig in the handle's Annex state, and returns a handle
-- holding the repo as it is afterwards. The new handle shares the same
-- annexstate as the old one.
updateTransportHandle :: TransportHandle -> IO TransportHandle
updateTransportHandle h@(TransportHandle _ annexstate) =
	rebuild <$> liftAnnex h (reloadConfig >> Annex.fromRepo id)
  where
	rebuild g' = TransportHandle g' annexstate
 |