git-annex/RemoteDaemon/Core.hs
Joey Hess 1ce8367417 remotedaemon: Fix problem that could prevent ssh connections being made after two LOSTNET messages were received in a row
Perhaps due to two different network interfaces being brought down.

Since there is no reliable way to drain a Chan, I switched to STM TChan.
2015-01-15 15:37:53 -04:00

140 lines
3.9 KiB
Haskell

{- git-remote-daemon core
-
- Copyright 2014 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module RemoteDaemon.Core (runForeground) where
import qualified Annex
import Common
import Types.GitConfig
import RemoteDaemon.Common
import RemoteDaemon.Types
import RemoteDaemon.Transport
import qualified Git
import qualified Git.Types as Git
import qualified Git.CurrentRepo
import Utility.SimpleProtocol
import Config
import Annex.Ssh
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Network.URI
import qualified Data.Map as M
-- Entry point of the remote daemon when run in the foreground.
--
-- Three loops run concurrently:
--
--  * one reads lines from the input handle, parses each into a Consumed
--    message, and queues it for the controller (a line that does not
--    parse is a fatal protocol error);
--  * one takes Emitted messages off the output queue and writes each,
--    formatted, as a flushed line on the output handle;
--  * the controller itself (runController).
--
-- The combined action is run under tryIO and its result is discarded.
runForeground :: IO ()
runForeground = do
  (inh, outh) <- ioHandles
  consumed <- newTChanIO :: IO (TChan Consumed)
  emitted <- newTChanIO :: IO (TChan Emitted)
  let parseLoop = forever $
        hGetLine inh >>= \line ->
          maybe
            (error $ "protocol error: " ++ line)
            (atomically . writeTChan consumed)
            (parseMessage line)
      emitLoop = forever $ do
        out <- atomically $ readTChan emitted
        hPutStrLn outh $ unwords $ formatMessage out
        hFlush outh
      control = runController consumed emitted
  -- concurrently cancels the sibling action when one of them throws,
  -- so a failure in any of the three loops brings the others down too.
  void $ tryIO $
    concurrently (concurrently parseLoop emitLoop) control
-- Maps each git remote to a pair of: the IO action that runs the
-- remote's transport, and the channel on which Consumed control
-- messages are delivered to that transport.
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)
-- Runs the transports, dispatching messages to them, and handling
-- the main control messages.
--
-- Consumed messages are read from ichan one at a time. ochan is passed
-- to genRemoteMap each time the set of transports is (re)built. The
-- loop carries the current transport handle, a flag telling whether the
-- transports are currently paused, and the map of known remotes.
runController :: TChan Consumed -> TChan Emitted -> IO ()
runController ichan ochan = do
  initial <- genTransportHandle
  remotes0 <- genRemoteMap initial ochan
  launchAll remotes0
  loop initial False remotes0
  where
    loop th paused remotes = atomically (readTChan ichan) >>= dispatch
      where
        -- Rebuild the remote map from a refreshed handle. Remotes that
        -- are no longer present are told to STOP; remotes that are new
        -- are started unless paused; remotes present in both keep
        -- their existing transport and channel.
        dispatch RELOAD = do
          th' <- updateTransportHandle th
          fresh <- genRemoteMap th' ochan
          let kept = M.intersection remotes fresh
              added = M.difference fresh remotes
              removed = M.difference remotes fresh
          sendAll STOP removed
          unless paused $
            launchAll added
          loop th' paused (M.union kept added)
        dispatch LOSTNET = do
          -- Force close all cached ssh connections. Doing it here,
          -- in one place, means it happens only once even when
          -- there are multiple ssh remotes.
          liftAnnex th forceSshCleanup
          sendAll LOSTNET remotes
          loop th True remotes
        dispatch PAUSE = do
          sendAll STOP remotes
          loop th True remotes
        dispatch RESUME = do
          -- Only restart the transports if they were actually paused.
          when paused $
            launchAll remotes
          loop th False remotes
        dispatch STOP = exitSuccess
        -- Every remaining message is forwarded to all transports,
        -- or dropped when paused.
        dispatch other = do
          unless paused $
            sendAll other remotes
          loop th paused remotes

    -- Start the transport of every remote in the map.
    launchAll rs = mapM_ launch (M.elems rs)

    -- Old control messages still sitting in the channel are discarded
    -- first, so the freshly started transport is not confused by them.
    -- The transport then runs in its own async thread.
    launch (transport, chan) =
      atomically (flush chan) >> void (async transport)

    -- Read and throw away messages until the channel is empty.
    flush chan = do
      pending <- tryReadTChan chan
      case pending of
        Nothing -> noop
        Just _ -> flush chan

    -- Write one message to the channel of every remote in the map,
    -- all within a single STM transaction.
    sendAll msg rs =
      atomically $ mapM_ (\(_, chan) -> writeTChan chan msg) (M.elems rs)
-- Generates a map with a transport for each supported remote in the git repo,
-- except those that have annex.sync = false
--
-- A remote is supported when its location is a URL whose scheme has an
-- entry in remoteTransports. Each included remote gets its own newly
-- created channel for Consumed messages; the shared ochan is passed to
-- every transport.
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
genRemoteMap h@(TransportHandle g _) ochan = do
  entries <- mapM entryFor (Git.remotes g)
  return $ M.fromList $ catMaybes entries
  where
    -- Build the map entry for one remote, or Nothing when the remote
    -- is not a URL, has no matching transport, or is not synced.
    entryFor remote
      | Git.Url uri <- Git.location remote
      , Just mkTransport <- M.lookup (uriScheme uri) remoteTransports
      , syncEnabled remote = do
          chan <- newTChanIO :: IO (TChan Consumed)
          let run = mkTransport remote (RemoteURI uri) h chan ochan
          return $ Just (remote, (run, chan))
      | otherwise = return Nothing

    syncEnabled remote =
      remoteAnnexSync $
        extractRemoteGitConfig remote (Git.repoDescribe remote)
-- Creates the initial TransportHandle: a new Annex state is made for
-- the current git repository and stored in an MVar, and the handle
-- pairs that MVar with the repository read back out of the state.
genTransportHandle :: IO TransportHandle
genTransportHandle =
  Git.CurrentRepo.get >>= Annex.new >>= newMVar >>= \statevar -> do
    st <- readMVar statevar
    return $ TransportHandle (Annex.repo st) statevar
-- Produces an updated TransportHandle: reloadConfig is run in the Annex
-- monad of the given handle, then the repository is fetched from the
-- Annex state. The new handle reuses the same state MVar; the
-- repository held by the old handle is ignored.
updateTransportHandle :: TransportHandle -> IO TransportHandle
updateTransportHandle h@(TransportHandle _ statevar) =
  rebuild <$> liftAnnex h (reloadConfig >> Annex.fromRepo id)
  where
    rebuild repo = TransportHandle repo statevar