cleaned up AnnexState handling in transports
This commit is contained in:
parent
b3b07ab330
commit
fa0cf81b26
6 changed files with 95 additions and 42 deletions
|
@ -32,7 +32,10 @@ getConfigMaybe (ConfigKey key) = fromRepo $ Git.Config.getMaybe key
|
||||||
setConfig :: ConfigKey -> String -> Annex ()
|
setConfig :: ConfigKey -> String -> Annex ()
|
||||||
setConfig (ConfigKey key) value = do
|
setConfig (ConfigKey key) value = do
|
||||||
inRepo $ Git.Command.run [Param "config", Param key, Param value]
|
inRepo $ Git.Command.run [Param "config", Param key, Param value]
|
||||||
Annex.changeGitRepo =<< inRepo Git.Config.reRead
|
reloadConfig
|
||||||
|
|
||||||
|
reloadConfig :: Annex ()
|
||||||
|
reloadConfig = Annex.changeGitRepo =<< inRepo Git.Config.reRead
|
||||||
|
|
||||||
{- Unsets a git config setting. (Leaves it in state currently.) -}
|
{- Unsets a git config setting. (Leaves it in state currently.) -}
|
||||||
unsetConfig :: ConfigKey -> Annex ()
|
unsetConfig :: ConfigKey -> Annex ()
|
||||||
|
|
42
RemoteDaemon/Common.hs
Normal file
42
RemoteDaemon/Common.hs
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
{- git-remote-daemon utilities
|
||||||
|
-
|
||||||
|
- Copyright 2014 Joey Hess <joey@kitenet.net>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
module RemoteDaemon.Common
|
||||||
|
( liftAnnex
|
||||||
|
, inLocalRepo
|
||||||
|
, checkNewShas
|
||||||
|
) where
|
||||||
|
|
||||||
|
import qualified Annex
|
||||||
|
import Common.Annex
|
||||||
|
import RemoteDaemon.Types
|
||||||
|
import qualified Git
|
||||||
|
import Annex.CatFile
|
||||||
|
|
||||||
|
import Control.Concurrent
|
||||||
|
|
||||||
|
-- Runs an Annex action. Long-running actions should be avoided,
|
||||||
|
-- since only one liftAnnex can be running at a time, amoung all
|
||||||
|
-- transports.
|
||||||
|
liftAnnex :: TransportHandle -> Annex a -> IO a
|
||||||
|
liftAnnex (TransportHandle _ annexstate) a = do
|
||||||
|
st <- takeMVar annexstate
|
||||||
|
(r, st') <- Annex.run st a
|
||||||
|
putMVar annexstate st'
|
||||||
|
return r
|
||||||
|
|
||||||
|
inLocalRepo :: TransportHandle -> (Git.Repo -> IO a) -> IO a
|
||||||
|
inLocalRepo (TransportHandle g _) a = a g
|
||||||
|
|
||||||
|
-- Check if any of the shas are actally new in the local git repo,
|
||||||
|
-- to avoid unnecessary fetching.
|
||||||
|
checkNewShas :: TransportHandle -> [Git.Sha] -> IO Bool
|
||||||
|
checkNewShas transporthandle = check
|
||||||
|
where
|
||||||
|
check [] = return True
|
||||||
|
check (r:rs) = maybe (check rs) (const $ return False)
|
||||||
|
=<< liftAnnex transporthandle (catObjectDetails r)
|
|
@ -10,15 +10,17 @@ module RemoteDaemon.Core (runForeground) where
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import Common
|
import Common
|
||||||
import Types.GitConfig
|
import Types.GitConfig
|
||||||
|
import RemoteDaemon.Common
|
||||||
import RemoteDaemon.Types
|
import RemoteDaemon.Types
|
||||||
import RemoteDaemon.Transport
|
import RemoteDaemon.Transport
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import qualified Git.Types as Git
|
import qualified Git.Types as Git
|
||||||
import qualified Git.CurrentRepo
|
import qualified Git.CurrentRepo
|
||||||
import Utility.SimpleProtocol
|
import Utility.SimpleProtocol
|
||||||
|
import Config
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.Chan
|
import Control.Concurrent
|
||||||
import Network.URI
|
import Network.URI
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
|
||||||
|
@ -50,36 +52,38 @@ type RemoteMap = M.Map Git.Repo (IO (), Chan Consumed)
|
||||||
-- the main control messages.
|
-- the main control messages.
|
||||||
controller :: Chan Consumed -> Chan Emitted -> IO ()
|
controller :: Chan Consumed -> Chan Emitted -> IO ()
|
||||||
controller ichan ochan = do
|
controller ichan ochan = do
|
||||||
m <- getRemoteMap ochan
|
h <- genTransportHandle
|
||||||
|
m <- genRemoteMap h ochan
|
||||||
startrunning m
|
startrunning m
|
||||||
go False m
|
go h False m
|
||||||
where
|
where
|
||||||
go paused m = do
|
go h paused m = do
|
||||||
cmd <- readChan ichan
|
cmd <- readChan ichan
|
||||||
case cmd of
|
case cmd of
|
||||||
RELOAD -> do
|
RELOAD -> do
|
||||||
m' <- getRemoteMap ochan
|
liftAnnex h reloadConfig
|
||||||
|
m' <- genRemoteMap h ochan
|
||||||
let common = M.intersection m m'
|
let common = M.intersection m m'
|
||||||
let new = M.difference m' m
|
let new = M.difference m' m
|
||||||
let old = M.difference m m'
|
let old = M.difference m m'
|
||||||
stoprunning old
|
stoprunning old
|
||||||
unless paused $
|
unless paused $
|
||||||
startrunning new
|
startrunning new
|
||||||
go paused (M.union common new)
|
go h paused (M.union common new)
|
||||||
PAUSE -> do
|
PAUSE -> do
|
||||||
stoprunning m
|
stoprunning m
|
||||||
go True m
|
go h True m
|
||||||
RESUME -> do
|
RESUME -> do
|
||||||
when paused $
|
when paused $
|
||||||
startrunning m
|
startrunning m
|
||||||
go False m
|
go h False m
|
||||||
STOP -> exitSuccess
|
STOP -> exitSuccess
|
||||||
-- All remaining messages are sent to
|
-- All remaining messages are sent to
|
||||||
-- all Transports.
|
-- all Transports.
|
||||||
msg -> do
|
msg -> do
|
||||||
unless paused $
|
unless paused $
|
||||||
forM_ chans (`writeChan` msg)
|
forM_ chans (`writeChan` msg)
|
||||||
go paused m
|
go h paused m
|
||||||
where
|
where
|
||||||
chans = map snd (M.elems m)
|
chans = map snd (M.elems m)
|
||||||
|
|
||||||
|
@ -90,17 +94,12 @@ controller ichan ochan = do
|
||||||
stoprunning m = forM_ (M.elems m) stoprunning'
|
stoprunning m = forM_ (M.elems m) stoprunning'
|
||||||
stoprunning' (_, c) = writeChan c STOP
|
stoprunning' (_, c) = writeChan c STOP
|
||||||
|
|
||||||
getRemoteMap :: Chan Emitted -> IO RemoteMap
|
|
||||||
getRemoteMap ochan = do
|
|
||||||
annexstate <- Annex.new =<< Git.CurrentRepo.get
|
|
||||||
genRemoteMap annexstate ochan
|
|
||||||
|
|
||||||
-- Generates a map with a transport for each supported remote in the git repo,
|
-- Generates a map with a transport for each supported remote in the git repo,
|
||||||
-- except those that have annex.sync = false
|
-- except those that have annex.sync = false
|
||||||
genRemoteMap :: Annex.AnnexState -> Chan Emitted -> IO RemoteMap
|
genRemoteMap :: TransportHandle -> Chan Emitted -> IO RemoteMap
|
||||||
genRemoteMap annexstate ochan = M.fromList . catMaybes <$> mapM gen rs
|
genRemoteMap h@(TransportHandle g _) ochan =
|
||||||
|
M.fromList . catMaybes <$> mapM gen (Git.remotes g)
|
||||||
where
|
where
|
||||||
rs = Git.remotes (Annex.repo annexstate)
|
|
||||||
gen r = case Git.location r of
|
gen r = case Git.location r of
|
||||||
Git.Url u -> case M.lookup (uriScheme u) remoteTransports of
|
Git.Url u -> case M.lookup (uriScheme u) remoteTransports of
|
||||||
Just transport
|
Just transport
|
||||||
|
@ -108,7 +107,13 @@ genRemoteMap annexstate ochan = M.fromList . catMaybes <$> mapM gen rs
|
||||||
ichan <- newChan :: IO (Chan Consumed)
|
ichan <- newChan :: IO (Chan Consumed)
|
||||||
return $ Just
|
return $ Just
|
||||||
( r
|
( r
|
||||||
, (transport r (Git.repoDescribe r) annexstate ichan ochan, ichan)
|
, (transport r (Git.repoDescribe r) h ichan ochan, ichan)
|
||||||
)
|
)
|
||||||
_ -> return Nothing
|
_ -> return Nothing
|
||||||
_ -> return Nothing
|
_ -> return Nothing
|
||||||
|
|
||||||
|
genTransportHandle :: IO TransportHandle
|
||||||
|
genTransportHandle = do
|
||||||
|
annexstate <- newMVar =<< Annex.new =<< Git.CurrentRepo.get
|
||||||
|
g <- Annex.repo <$> readMVar annexstate
|
||||||
|
return $ TransportHandle g annexstate
|
||||||
|
|
|
@ -8,13 +8,11 @@
|
||||||
module RemoteDaemon.Transport.Ssh (transport) where
|
module RemoteDaemon.Transport.Ssh (transport) where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
import qualified Annex
|
|
||||||
import RemoteDaemon.Types
|
import RemoteDaemon.Types
|
||||||
import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
|
import RemoteDaemon.Common
|
||||||
import Remote.Helper.Ssh
|
import Remote.Helper.Ssh
|
||||||
|
import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
|
||||||
import Utility.SimpleProtocol
|
import Utility.SimpleProtocol
|
||||||
import qualified Git
|
|
||||||
import Annex.CatFile
|
|
||||||
import Git.Command
|
import Git.Command
|
||||||
|
|
||||||
import Control.Concurrent.Chan
|
import Control.Concurrent.Chan
|
||||||
|
@ -22,13 +20,12 @@ import Control.Concurrent.Async
|
||||||
import System.Process (std_in, std_out)
|
import System.Process (std_in, std_out)
|
||||||
|
|
||||||
transport :: Transport
|
transport :: Transport
|
||||||
transport r remotename annexstate ichan ochan = Annex.eval annexstate $ do
|
transport r remotename transporthandle ichan ochan = do
|
||||||
v <- git_annex_shell r "notifychanges" [] []
|
v <- liftAnnex transporthandle $ git_annex_shell r "notifychanges" [] []
|
||||||
case v of
|
case v of
|
||||||
Nothing -> noop
|
Nothing -> noop
|
||||||
Just (cmd, params) -> liftIO $ go cmd (toCommand params)
|
Just (cmd, params) -> go cmd (toCommand params)
|
||||||
where
|
where
|
||||||
send msg = writeChan ochan (msg remotename)
|
|
||||||
go cmd params = do
|
go cmd params = do
|
||||||
(Just toh, Just fromh, _, pid) <- createProcess (proc cmd params)
|
(Just toh, Just fromh, _, pid) <- createProcess (proc cmd params)
|
||||||
{ std_in = CreatePipe
|
{ std_in = CreatePipe
|
||||||
|
@ -45,9 +42,9 @@ transport r remotename annexstate ichan ochan = Annex.eval annexstate $ do
|
||||||
l <- hGetLine fromh
|
l <- hGetLine fromh
|
||||||
case parseMessage l of
|
case parseMessage l of
|
||||||
Just SshRemote.READY -> send CONNECTED
|
Just SshRemote.READY -> send CONNECTED
|
||||||
Just (SshRemote.CHANGED refs) ->
|
Just (SshRemote.CHANGED shas) ->
|
||||||
Annex.eval annexstate $
|
whenM (checkNewShas transporthandle shas) $
|
||||||
fetchNew remotename refs
|
fetch
|
||||||
Nothing -> shutdown
|
Nothing -> shutdown
|
||||||
|
|
||||||
-- The only control message that matters is STOP.
|
-- The only control message that matters is STOP.
|
||||||
|
@ -66,10 +63,10 @@ transport r remotename annexstate ichan ochan = Annex.eval annexstate $ do
|
||||||
void $ tryIO $ concurrently fromshell handlecontrol
|
void $ tryIO $ concurrently fromshell handlecontrol
|
||||||
shutdown
|
shutdown
|
||||||
|
|
||||||
-- Check if any of the shas are actally new, to avoid unnecessary fetching.
|
send msg = writeChan ochan (msg remotename)
|
||||||
fetchNew :: RemoteName -> [Git.Sha] -> Annex ()
|
|
||||||
fetchNew remotename = check
|
fetch = do
|
||||||
where
|
send SYNCING
|
||||||
check [] = void $ inRepo $ runBool [Param "fetch", Param remotename]
|
ok <- inLocalRepo transporthandle $
|
||||||
check (r:rs) = maybe (check rs) (const noop)
|
runBool [Param "fetch", Param remotename]
|
||||||
=<< catObjectDetails r
|
send (DONESYNCING ok)
|
||||||
|
|
|
@ -18,14 +18,20 @@ import Control.Concurrent
|
||||||
|
|
||||||
-- A Transport for a particular git remote consumes some messages
|
-- A Transport for a particular git remote consumes some messages
|
||||||
-- from a Chan, and emits others to another Chan.
|
-- from a Chan, and emits others to another Chan.
|
||||||
type Transport = Git.Repo -> RemoteName -> Annex.AnnexState -> Chan Consumed -> Chan Emitted -> IO ()
|
type Transport = RemoteRepo -> RemoteName -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO ()
|
||||||
|
|
||||||
|
type RemoteRepo = Git.Repo
|
||||||
|
type LocalRepo = Git.Repo
|
||||||
|
|
||||||
|
-- All Transports share a single AnnexState MVar
|
||||||
|
data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState)
|
||||||
|
|
||||||
-- Messages that the daemon emits.
|
-- Messages that the daemon emits.
|
||||||
data Emitted
|
data Emitted
|
||||||
= CONNECTED RemoteName
|
= CONNECTED RemoteName
|
||||||
| DISCONNECTED RemoteName
|
| DISCONNECTED RemoteName
|
||||||
| SYNCING RemoteName
|
| SYNCING RemoteName
|
||||||
| DONESYNCING RemoteName Bool
|
| DONESYNCING Bool RemoteName
|
||||||
|
|
||||||
-- Messages that the deamon consumes.
|
-- Messages that the deamon consumes.
|
||||||
data Consumed
|
data Consumed
|
||||||
|
@ -45,8 +51,8 @@ instance Proto.Sendable Emitted where
|
||||||
["DISCONNECTED", Proto.serialize remote]
|
["DISCONNECTED", Proto.serialize remote]
|
||||||
formatMessage (SYNCING remote) =
|
formatMessage (SYNCING remote) =
|
||||||
["SYNCING", Proto.serialize remote]
|
["SYNCING", Proto.serialize remote]
|
||||||
formatMessage (DONESYNCING remote status) =
|
formatMessage (DONESYNCING status remote) =
|
||||||
["DONESYNCING", Proto.serialize remote, Proto.serialize status]
|
["DONESYNCING", Proto.serialize status, Proto.serialize remote]
|
||||||
|
|
||||||
instance Proto.Sendable Consumed where
|
instance Proto.Sendable Consumed where
|
||||||
formatMessage PAUSE = ["PAUSE"]
|
formatMessage PAUSE = ["PAUSE"]
|
||||||
|
|
|
@ -82,7 +82,7 @@ the webapp.
|
||||||
Indicates that a pull or a push with a remote is in progress.
|
Indicates that a pull or a push with a remote is in progress.
|
||||||
Always followed by DONESYNCING.
|
Always followed by DONESYNCING.
|
||||||
|
|
||||||
* `DONESYNCING $remote 1|0`
|
* `DONESYNCING 1|0 $remote`
|
||||||
|
|
||||||
Indicates that syncing with a remote is done, and either succeeded
|
Indicates that syncing with a remote is done, and either succeeded
|
||||||
(1) or failed (0).
|
(1) or failed (0).
|
||||||
|
|
Loading…
Reference in a new issue