git-annex/RemoteDaemon/Core.hs

179 lines
5 KiB
Haskell
Raw Normal View History

{- git-remote-daemon core
-
- Copyright 2014-2016 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
module RemoteDaemon.Core (runInteractive, runNonInteractive) where
import qualified Annex
import Common
import Types.GitConfig
import Config.DynamicConfig
import RemoteDaemon.Common
import RemoteDaemon.Types
import RemoteDaemon.Transport
import qualified Git
import qualified Git.Types as Git
import qualified Git.CurrentRepo
import qualified Git.Construct
import Utility.SimpleProtocol
import Utility.ThreadScheduler
import Config
import Annex.Ssh
import Annex.BranchState
import Types.Messages
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Network.URI
import qualified Data.Map as M
-- | Runs the daemon under the control of another process, using the
-- pair of handles provided by dupIoHandles: control messages are parsed
-- from lines read on the first handle, and every emitted message is
-- written as one line to the second.
runInteractive :: IO ()
runInteractive = do
    (readh, writeh) <- dupIoHandles
    ichan <- newTChanIO :: IO (TChan Consumed)
    ochan <- newTChanIO :: IO (TChan Emitted)
    -- If any thread fails, the rest will be killed.
    void $ tryIO $
        (feed readh ichan `concurrently` emit writeh ochan)
            `concurrently` runController ichan ochan
  where
    -- Parses each input line and queues the resulting command for the
    -- controller; a line that does not parse is a protocol error.
    feed inh chan = forever $ do
        line <- hGetLine inh
        maybe
            (giveup $ "protocol error: " ++ line)
            (atomically . writeTChan chan)
            (parseMessage line)
    -- Writes each emitted message as a single line, flushing after
    -- every message.
    emit outh chan = forever $ do
        out <- atomically $ readTChan chan
        hPutStrLn outh $ unwords $ formatMessage out
        hFlush outh
-- | Runs the daemon without a controlling process. The only command
-- ever fed to the controller is a periodic RELOAD, and everything the
-- controller emits is read and thrown away.
runNonInteractive :: IO ()
runNonInteractive = do
    ichan <- newTChanIO :: IO (TChan Consumed)
    ochan <- newTChanIO :: IO (TChan Emitted)
    void $ tryIO $
        (reloader ichan `concurrently` discard ochan)
            `concurrently` runController ichan ochan
  where
    reloadInterval = Seconds (60*60)
    -- Waits out the interval, then queues a RELOAD, forever.
    reloader chan = forever $
        threadDelaySeconds reloadInterval
            >> atomically (writeTChan chan RELOAD)
    -- Keeps the output channel drained, since nothing consumes it.
    discard chan = forever $ void $ atomically $ readTChan chan
-- | Maps each remote's repository to a pair of the IO action that runs
-- its transport and the channel on which control messages are sent to
-- that transport.
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)
-- Runs the transports, dispatching messages to them, and handling
-- the main control messages.
--
-- Every command read from the input channel is first forwarded to all
-- servers and then interpreted here. The loop carries the current
-- transport handle, whether the transports are paused, the map of
-- remotes, and the server channels.
runController :: TChan Consumed -> TChan Emitted -> IO ()
runController ichan ochan = do
    h0 <- genTransportHandle
    remotes0 <- genRemoteMap h0 ochan
    startAll remotes0
    servers <- mapM (launchServer h0) remoteServers
    loop h0 False remotes0 servers
  where
    loop h paused remotes servers = do
        cmd <- atomically $ readTChan ichan
        sendAll cmd servers
        case cmd of
            RELOAD -> do
                h' <- updateTransportHandle h
                fresh <- genRemoteMap h' ochan
                -- Remotes present both before and after keep their
                -- existing transport; vanished ones are stopped, and
                -- new ones are started unless paused.
                let kept = M.intersection remotes fresh
                    added = M.difference fresh remotes
                    removed = M.difference remotes fresh
                sendAll STOP (chansOf removed)
                unless paused $
                    startAll added
                loop h' paused (M.union kept added) servers
            LOSTNET -> do
                -- force close all cached ssh connections
                -- (done here so that if there are multiple
                -- ssh remotes, it's only done once)
                liftAnnex h forceSshCleanup
                sendAll LOSTNET (chansOf remotes)
                next True
            PAUSE -> sendAll STOP (chansOf remotes) >> next True
            RESUME -> when paused (startAll remotes) >> next False
            STOP -> exitSuccess
            -- All remaining messages are sent to
            -- all Transports.
            _ -> unless paused (sendAll cmd (chansOf remotes)) >> next paused
      where
        -- Continues with the same handle, remotes and servers.
        next paused' = loop h paused' remotes servers

    -- The control channels of all transports in a remote map.
    chansOf rm = map snd (M.elems rm)

    -- Starts a server in its own thread, returning the channel it is
    -- given for control messages.
    launchServer h server = do
        c <- newTChanIO
        _ <- async (server c h)
        return c

    startAll rm = mapM_ startOne (M.elems rm)

    startOne (transport, c) = do
        -- drain any old control messages from the channel
        -- to avoid confusing the transport with them
        atomically $ drain c
        _ <- async transport
        return ()

    drain c = do
        pending <- tryReadTChan c
        case pending of
            Nothing -> noop
            Just _ -> drain c

    -- Writes the message to every channel in a single transaction.
    sendAll msg cs = atomically $ mapM_ (\c -> writeTChan c msg) cs
-- Generates a map with a transport for each supported remote in the git repo,
-- except those that have annex.sync = false
--
-- A remote is supported when its location is an url whose scheme has
-- an entry in remoteTransports. Each included remote gets a freshly
-- created control channel, which is both handed to its transport and
-- stored in the map alongside it.
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
genRemoteMap h@(TransportHandle (LocalRepo g) _ _) ochan = do
    remotes <- Git.Construct.fromRemotes g
    entries <- mapM entry remotes
    return $ M.fromList $ catMaybes entries
  where
    entry r = do
        gc <- atomically $ extractRemoteGitConfig g (Git.repoDescribe r)
        case transportFor r of
            Nothing -> return Nothing
            Just (u, transport) -> do
                wanted <- getDynamicConfig (remoteAnnexSync gc)
                if wanted
                    then do
                        ichan <- newTChanIO :: IO (TChan Consumed)
                        let run = transport (RemoteRepo r gc) (RemoteURI u) h ichan ochan
                        return $ Just (r, (run, ichan))
                    else return Nothing

    -- The remote's url paired with the transport registered for its
    -- scheme, when there is one.
    transportFor r = case Git.location r of
        Git.Url u -> (,) u <$> M.lookup (uriScheme u) remoteTransports
        _ -> Nothing
-- | Builds a TransportHandle for the current git repository: a new
-- Annex state is created for it and kept in an MVar, output is set to
-- QuietOutput, and interactive branch access is enabled.
genTransportHandle :: IO TransportHandle
genTransportHandle = do
    repo <- Git.CurrentRepo.get
    (st, rd) <- Annex.new repo
    stvar <- newMVar st
    let h = TransportHandle (LocalRepo (Annex.repo st)) stvar rd
    liftAnnex h $
        Annex.setOutput QuietOutput >> enableInteractiveBranchAccess
    return h
-- | Reloads the configuration, and returns a handle that holds the git
-- repository as it is after the reload. The other two fields of the
-- handle are carried over unchanged.
updateTransportHandle :: TransportHandle -> IO TransportHandle
updateTransportHandle h@(TransportHandle _ annexstate rd) =
    rebuild <$> liftAnnex h (reloadConfig >> Annex.gitRepo)
  where
    rebuild g = TransportHandle (LocalRepo g) annexstate rd