2b66492d6e
And for tab completion, by not unnessessarily statting paths to remotes, which used to cause eg, spin-up of removable drives. Got rid of the remotes member of Git.Repo. This was a bit painful. Remote.Git modifies the list of remotes as it reads their configs, so still need a persistent list of remotes. So, put it in as Annex.gitremotes. It's only populated by getGitRemotes, so commands like examinekey that don't care about remotes won't do so. This commit was sponsored by Jake Vosloo on Patreon.
174 lines
5 KiB
Haskell
174 lines
5 KiB
Haskell
{- git-remote-daemon core
|
|
-
|
|
- Copyright 2014-2016 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
-}
|
|
|
|
module RemoteDaemon.Core (runInteractive, runNonInteractive) where
|
|
|
|
import qualified Annex
|
|
import Common
|
|
import Types.GitConfig
|
|
import Config.DynamicConfig
|
|
import RemoteDaemon.Common
|
|
import RemoteDaemon.Types
|
|
import RemoteDaemon.Transport
|
|
import qualified Git
|
|
import qualified Git.Types as Git
|
|
import qualified Git.CurrentRepo
|
|
import qualified Git.Construct
|
|
import Utility.SimpleProtocol
|
|
import Utility.ThreadScheduler
|
|
import Config
|
|
import Annex.Ssh
|
|
import Types.Messages
|
|
|
|
import Control.Concurrent
|
|
import Control.Concurrent.Async
|
|
import Control.Concurrent.STM
|
|
import Network.URI
|
|
import qualified Data.Map as M
|
|
|
|
runInteractive :: IO ()
|
|
runInteractive = do
|
|
(readh, writeh) <- dupIoHandles
|
|
ichan <- newTChanIO :: IO (TChan Consumed)
|
|
ochan <- newTChanIO :: IO (TChan Emitted)
|
|
|
|
let reader = forever $ do
|
|
l <- hGetLine readh
|
|
case parseMessage l of
|
|
Nothing -> error $ "protocol error: " ++ l
|
|
Just cmd -> atomically $ writeTChan ichan cmd
|
|
let writer = forever $ do
|
|
msg <- atomically $ readTChan ochan
|
|
hPutStrLn writeh $ unwords $ formatMessage msg
|
|
hFlush writeh
|
|
let controller = runController ichan ochan
|
|
|
|
-- If any thread fails, the rest will be killed.
|
|
void $ tryIO $ reader
|
|
`concurrently` writer
|
|
`concurrently` controller
|
|
|
|
runNonInteractive :: IO ()
|
|
runNonInteractive = do
|
|
ichan <- newTChanIO :: IO (TChan Consumed)
|
|
ochan <- newTChanIO :: IO (TChan Emitted)
|
|
|
|
let reader = forever $ do
|
|
threadDelaySeconds (Seconds (60*60))
|
|
atomically $ writeTChan ichan RELOAD
|
|
let writer = forever $
|
|
void $ atomically $ readTChan ochan
|
|
let controller = runController ichan ochan
|
|
|
|
void $ tryIO $ reader
|
|
`concurrently` writer
|
|
`concurrently` controller
|
|
|
|
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)
|
|
|
|
-- Runs the transports, dispatching messages to them, and handling
|
|
-- the main control messages.
|
|
runController :: TChan Consumed -> TChan Emitted -> IO ()
|
|
runController ichan ochan = do
|
|
h <- genTransportHandle
|
|
m <- genRemoteMap h ochan
|
|
starttransports m
|
|
serverchans <- mapM (startserver h) remoteServers
|
|
go h False m serverchans
|
|
where
|
|
go h paused m serverchans = do
|
|
cmd <- atomically $ readTChan ichan
|
|
broadcast cmd serverchans
|
|
case cmd of
|
|
RELOAD -> do
|
|
h' <- updateTransportHandle h
|
|
m' <- genRemoteMap h' ochan
|
|
let common = M.intersection m m'
|
|
let new = M.difference m' m
|
|
let old = M.difference m m'
|
|
broadcast STOP (mchans old)
|
|
unless paused $
|
|
starttransports new
|
|
go h' paused (M.union common new) serverchans
|
|
LOSTNET -> do
|
|
-- force close all cached ssh connections
|
|
-- (done here so that if there are multiple
|
|
-- ssh remotes, it's only done once)
|
|
liftAnnex h forceSshCleanup
|
|
broadcast LOSTNET transportchans
|
|
go h True m serverchans
|
|
PAUSE -> do
|
|
broadcast STOP transportchans
|
|
go h True m serverchans
|
|
RESUME -> do
|
|
when paused $
|
|
starttransports m
|
|
go h False m serverchans
|
|
STOP -> exitSuccess
|
|
-- All remaining messages are sent to
|
|
-- all Transports.
|
|
msg -> do
|
|
unless paused $
|
|
broadcast msg transportchans
|
|
go h paused m serverchans
|
|
where
|
|
transportchans = mchans m
|
|
mchans = map snd . M.elems
|
|
|
|
startserver h server = do
|
|
c <- newTChanIO
|
|
void $ async $ server c h
|
|
return c
|
|
|
|
starttransports m = forM_ (M.elems m) starttransports'
|
|
starttransports' (transport, c) = do
|
|
-- drain any old control messages from the channel
|
|
-- to avoid confusing the transport with them
|
|
atomically $ drain c
|
|
void $ async transport
|
|
|
|
drain c = maybe noop (const $ drain c) =<< tryReadTChan c
|
|
|
|
broadcast msg cs = atomically $ forM_ cs $ \c -> writeTChan c msg
|
|
|
|
-- Generates a map with a transport for each supported remote in the git repo,
|
|
-- except those that have annex.sync = false
|
|
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
|
|
genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan = do
|
|
rs <- Git.Construct.fromRemotes g
|
|
M.fromList . catMaybes <$> mapM gen rs
|
|
where
|
|
gen r = do
|
|
gc <- atomically $ extractRemoteGitConfig g (Git.repoDescribe r)
|
|
case Git.location r of
|
|
Git.Url u -> case M.lookup (uriScheme u) remoteTransports of
|
|
Just transport -> ifM (getDynamicConfig (remoteAnnexSync gc))
|
|
( do
|
|
ichan <- newTChanIO :: IO (TChan Consumed)
|
|
return $ Just
|
|
( r
|
|
, (transport (RemoteRepo r gc) (RemoteURI u) h ichan ochan, ichan)
|
|
)
|
|
, return Nothing
|
|
)
|
|
Nothing -> return Nothing
|
|
_ -> return Nothing
|
|
|
|
genTransportHandle :: IO TransportHandle
|
|
genTransportHandle = do
|
|
annexstate <- newMVar =<< Annex.new =<< Git.CurrentRepo.get
|
|
g <- Annex.repo <$> readMVar annexstate
|
|
let h = TransportHandle (LocalRepo g) annexstate
|
|
liftAnnex h $ Annex.setOutput QuietOutput
|
|
return h
|
|
|
|
updateTransportHandle :: TransportHandle -> IO TransportHandle
|
|
updateTransportHandle h@(TransportHandle _g annexstate) = do
|
|
g' <- liftAnnex h $ do
|
|
reloadConfig
|
|
Annex.fromRepo id
|
|
return (TransportHandle (LocalRepo g') annexstate)
|