40ecf58d4b
This does not change the overall license of the git-annex program, which was already AGPL due to a number of source files being AGPL already. Legally speaking, I'm adding a new license under which these files are now available; I already released their current contents under the GPL license. Now they're dual licensed GPL and AGPL. However, I intend for all my future changes to these files to only be released under the AGPL license, and I won't be tracking the dual licensing status, so I'm simply changing the license statement to say it's AGPL. (In some cases, others wrote parts of the code of a file and released it under the GPL; but in all cases I have contributed a significant portion of the code in each file and it's that code that is getting the AGPL license; the GPL license of other contributors allows combining with AGPL code.)
174 lines
5 KiB
Haskell
174 lines
5 KiB
Haskell
{- git-remote-daemon core
|
|
-
|
|
- Copyright 2014-2016 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
module RemoteDaemon.Core (runInteractive, runNonInteractive) where
|
|
|
|
import qualified Annex
|
|
import Common
|
|
import Types.GitConfig
|
|
import Config.DynamicConfig
|
|
import RemoteDaemon.Common
|
|
import RemoteDaemon.Types
|
|
import RemoteDaemon.Transport
|
|
import qualified Git
|
|
import qualified Git.Types as Git
|
|
import qualified Git.CurrentRepo
|
|
import qualified Git.Construct
|
|
import Utility.SimpleProtocol
|
|
import Utility.ThreadScheduler
|
|
import Config
|
|
import Annex.Ssh
|
|
import Types.Messages
|
|
|
|
import Control.Concurrent
|
|
import Control.Concurrent.Async
|
|
import Control.Concurrent.STM
|
|
import Network.URI
|
|
import qualified Data.Map as M
|
|
|
|
-- Runs the daemon under the control of another process, using the pair
-- of handles returned by dupIoHandles (presumably duplicates of
-- stdin/stdout -- confirm against its definition).
--
-- Three loops run concurrently:
--  * one reads lines from the input handle, parses each as a Consumed
--    message and queues it for the controller; a line that does not
--    parse is a fatal "protocol error",
--  * one takes Emitted messages queued by the controller and writes
--    each as a single flushed line to the output handle,
--  * the controller itself (runController).
runInteractive :: IO ()
runInteractive = do
    (inh, outh) <- dupIoHandles
    requests <- newTChanIO :: IO (TChan Consumed)
    replies <- newTChanIO :: IO (TChan Emitted)

    let protocolError l = error $ "protocol error: " ++ l
        enqueue = atomically . writeTChan requests
        readLoop = forever $ do
            l <- hGetLine inh
            maybe (protocolError l) enqueue (parseMessage l)
        writeLoop = forever $ do
            reply <- atomically $ readTChan replies
            hPutStrLn outh $ unwords $ formatMessage reply
            hFlush outh

    -- If any thread fails, the rest will be killed.
    -- The result of tryIO is discarded, so whatever failure it captures
    -- simply ends this function.
    void $ tryIO $
        readLoop
            `concurrently` writeLoop
            `concurrently` runController requests replies
|
|
|
|
-- Runs the daemon without any controlling process.
--
-- Since nothing is feeding it commands, a RELOAD message is queued for
-- the controller after every hour-long delay, and everything the
-- controller emits is read and thrown away.
runNonInteractive :: IO ()
runNonInteractive = do
    requests <- newTChanIO :: IO (TChan Consumed)
    replies <- newTChanIO :: IO (TChan Emitted)

    let hourly = Seconds (60*60)
        reloadLoop = forever $
            threadDelaySeconds hourly
                >> atomically (writeTChan requests RELOAD)
        -- Nobody is listening, but the channel still has to be emptied.
        discardLoop = forever $ void $ atomically $ readTChan replies

    -- As in runInteractive: the three loops run concurrently and the
    -- result of tryIO is discarded.
    void $ tryIO $
        reloadLoop
            `concurrently` discardLoop
            `concurrently` runController requests replies
|
|
|
|
-- Maps the Git.Repo of each remote to a pair of: the IO action that
-- runs that remote's transport, and the channel on which control
-- messages are sent to that transport.
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)
|
|
|
|
-- Runs the transports, dispatching messages to them, and handling
-- the main control messages.
--
-- Messages are read one at a time from ichan. Each one is first relayed
-- to every server channel, and then interpreted:
--
--  RELOAD   refreshes the transport handle and regenerates the remote
--           map; transports of remotes that disappeared are sent STOP,
--           and transports of newly appeared remotes are started unless
--           the controller is paused.
--  LOSTNET  forces cleanup of cached ssh connections, relays LOSTNET to
--           all transports, and leaves the controller paused.
--  PAUSE    sends STOP to all transports and leaves the controller paused.
--  RESUME   restarts all transports if the controller was paused.
--  STOP     exits the program successfully.
--  others   are relayed to all transports unless paused.
--
-- ochan is handed to the transports (via genRemoteMap) for their output.
runController :: TChan Consumed -> TChan Emitted -> IO ()
runController ichan ochan = do
    th <- genTransportHandle
    remotes <- genRemoteMap th ochan
    startTransports remotes
    srvchans <- mapM (launchServer th) remoteServers
    loop th False remotes srvchans
  where
    -- One iteration per message. Every branch yields the state to use
    -- for the next iteration (handle, paused flag, remote map); the set
    -- of server channels never changes.
    loop th isPaused remotes srvchans = do
        cmd <- atomically $ readTChan ichan
        broadcast cmd srvchans
        (th', isPaused', remotes') <- case cmd of
            RELOAD -> do
                reloaded <- updateTransportHandle th
                fresh <- genRemoteMap reloaded ochan
                let kept = M.intersection remotes fresh
                    added = M.difference fresh remotes
                    removed = M.difference remotes fresh
                broadcast STOP (chansOf removed)
                unless isPaused $
                    startTransports added
                return (reloaded, isPaused, M.union kept added)
            LOSTNET -> do
                -- force close all cached ssh connections
                -- (done here so that if there are multiple
                -- ssh remotes, it's only done once)
                liftAnnex th forceSshCleanup
                broadcast LOSTNET (chansOf remotes)
                return (th, True, remotes)
            PAUSE -> do
                broadcast STOP (chansOf remotes)
                return (th, True, remotes)
            RESUME -> do
                when isPaused $
                    startTransports remotes
                return (th, False, remotes)
            STOP -> exitSuccess
            -- All remaining messages are sent to
            -- all Transports.
            other -> do
                unless isPaused $
                    broadcast other (chansOf remotes)
                return (th, isPaused, remotes)
        loop th' isPaused' remotes' srvchans

    -- The control channels of all transports in a remote map.
    chansOf rm = [ c | (_, c) <- M.elems rm ]

    -- Starts a server in its own thread, returning the fresh channel
    -- that it was given.
    launchServer th server = do
        c <- newTChanIO
        _ <- async (server c th)
        return c

    -- Starts, each in its own thread, every transport in a remote map.
    startTransports rm = forM_ (M.elems rm) $ \(transport, c) -> do
        -- drain any old control messages from the channel
        -- to avoid confusing the transport with them
        atomically $ drain c
        void $ async transport

    -- Reads from the channel until it is empty, discarding everything.
    drain c = do
        v <- tryReadTChan c
        case v of
            Nothing -> noop
            Just _ -> drain c

    -- Writes the message to all the channels in a single transaction.
    broadcast msg cs = atomically $ forM_ cs (`writeTChan` msg)
|
|
|
|
-- Generates a map with a transport for each supported remote in the git repo,
-- except those that have annex.sync = false
--
-- A remote is supported when its location is an url whose scheme has an
-- entry in remoteTransports. Each remote that makes it into the map gets
-- its own new control channel; all of the transports share ochan.
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
genRemoteMap h@(TransportHandle (LocalRepo g) _) ochan =
    M.fromList . catMaybes
        <$> (mapM entryFor =<< Git.Construct.fromRemotes g)
  where
    -- The map entry for a remote, if it should have one.
    -- (The remote's config is extracted first, for every remote.)
    entryFor r = do
        gc <- atomically $ extractRemoteGitConfig g (Git.repoDescribe r)
        case supportedTransport r of
            Nothing -> return Nothing
            Just (u, transport) -> ifM (getDynamicConfig (remoteAnnexSync gc))
                ( do
                    ichan <- newTChanIO :: IO (TChan Consumed)
                    let run = transport (RemoteRepo r gc) (RemoteURI u) h ichan ochan
                    return $ Just (r, (run, ichan))
                , return Nothing
                )

    -- The url of a remote, paired with the transport registered for
    -- that url's scheme.
    supportedTransport r = case Git.location r of
        Git.Url u -> (,) u <$> M.lookup (uriScheme u) remoteTransports
        _ -> Nothing
|
|
|
|
-- Builds the TransportHandle for the current git repository: a new
-- annex state is created for it and stored in a MVar, and the handle
-- combines that MVar with the repository taken from the state.
-- The annex state's output is set to QuietOutput before the handle
-- is returned.
genTransportHandle :: IO TransportHandle
genTransportHandle = do
    startrepo <- Git.CurrentRepo.get
    annexstate <- Annex.new startrepo >>= newMVar
    st <- readMVar annexstate
    let th = TransportHandle (LocalRepo (Annex.repo st)) annexstate
    liftAnnex th (Annex.setOutput QuietOutput)
    return th
|
|
|
|
-- Reloads the config inside the handle's annex state, and returns a
-- handle that has the repository as it is after the reload. The new
-- handle shares the annex state of the old one.
updateTransportHandle :: TransportHandle -> IO TransportHandle
updateTransportHandle old@(TransportHandle _ annexstate) =
    rebuild <$> liftAnnex old (reloadConfig >> Annex.fromRepo id)
  where
    rebuild g = TransportHandle (LocalRepo g) annexstate
|