make git-remote-daemon ssh transport robust
* Remote system might be available, and the connection can get lost. Should reconnect, but needs to avoid bad behavior (i.e., constant reconnect attempts). Use exponential backoff. * Detect if old system had a too old git-annex-shell, and show the user a nice message in the webapp. Required parsing error messages, so perhaps this code should be removed once enough time has passed. * Switch the protocol to using remote URIs, rather than remote names. Names change. Also avoids issues with serialization of names containing whitespace. This is nearly ready for merge into master now. I'd still like to make the ssh transport smarter about reusing ssh connection caching during git pull. This commit was sponsored by Jim Paris.
This commit is contained in:
parent
f67d5abc41
commit
fb73792f72
6 changed files with 179 additions and 78 deletions
|
@ -15,11 +15,16 @@ import Utility.SimpleProtocol
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
import Assistant.Alert.Utility
|
import Assistant.Alert.Utility
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
|
import qualified Git
|
||||||
|
import qualified Git.Types as Git
|
||||||
|
import qualified Remote
|
||||||
|
import qualified Types.Remote as Remote
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import System.Process (std_in, std_out)
|
import System.Process (std_in, std_out)
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
import Network.URI
|
||||||
|
|
||||||
remoteControlThread :: NamedThread
|
remoteControlThread :: NamedThread
|
||||||
remoteControlThread = namedThread "RemoteControl" $ do
|
remoteControlThread = namedThread "RemoteControl" $ do
|
||||||
|
@ -32,8 +37,10 @@ remoteControlThread = namedThread "RemoteControl" $ do
|
||||||
, std_out = CreatePipe
|
, std_out = CreatePipe
|
||||||
}
|
}
|
||||||
|
|
||||||
|
urimap <- liftIO . newMVar =<< liftAnnex getURIMap
|
||||||
|
|
||||||
controller <- asIO $ remoteControllerThread toh
|
controller <- asIO $ remoteControllerThread toh
|
||||||
responder <- asIO $ remoteResponderThread fromh
|
responder <- asIO $ remoteResponderThread fromh urimap
|
||||||
|
|
||||||
-- run controller and responder until the remotedaemon dies
|
-- run controller and responder until the remotedaemon dies
|
||||||
liftIO $ do
|
liftIO $ do
|
||||||
|
@ -50,31 +57,60 @@ remoteControllerThread toh = do
|
||||||
hFlush toh
|
hFlush toh
|
||||||
|
|
||||||
-- read status messages emitted by the remotedaemon and handle them
|
-- read status messages emitted by the remotedaemon and handle them
|
||||||
remoteResponderThread :: Handle -> Assistant ()
|
remoteResponderThread :: Handle -> MVar (M.Map URI Remote) -> Assistant ()
|
||||||
remoteResponderThread fromh = go M.empty
|
remoteResponderThread fromh urimap = go M.empty
|
||||||
where
|
where
|
||||||
go syncalerts = do
|
go syncalerts = do
|
||||||
|
let cont = go syncalerts
|
||||||
|
let withr uri = withRemote uri urimap cont
|
||||||
l <- liftIO $ hGetLine fromh
|
l <- liftIO $ hGetLine fromh
|
||||||
case parseMessage l of
|
case parseMessage l of
|
||||||
Just (CONNECTED _rn) -> do
|
Just (CONNECTED _uri) -> do
|
||||||
go syncalerts
|
cont
|
||||||
Just (DISCONNECTED _rn) -> do
|
Just (DISCONNECTED _uri) -> do
|
||||||
go syncalerts
|
cont
|
||||||
Just (SYNCING rn)
|
Just (SYNCING uri) -> withr uri $ \r ->
|
||||||
| M.member rn syncalerts -> go syncalerts
|
if M.member (Remote.uuid r) syncalerts
|
||||||
| otherwise -> do
|
then go syncalerts
|
||||||
i <- addAlert $ syncAlert' [rn]
|
else do
|
||||||
go (M.insert rn i syncalerts)
|
i <- addAlert $ syncAlert [r]
|
||||||
Just (DONESYNCING status rn) ->
|
go (M.insert (Remote.uuid r) i syncalerts)
|
||||||
case M.lookup rn syncalerts of
|
Just (DONESYNCING uri status) -> withr uri $ \r ->
|
||||||
Nothing -> go syncalerts
|
case M.lookup (Remote.uuid r) syncalerts of
|
||||||
|
Nothing -> cont
|
||||||
Just i -> do
|
Just i -> do
|
||||||
let (succeeded, failed) = if status
|
let (succeeded, failed) = if status
|
||||||
then ([rn], [])
|
then ([r], [])
|
||||||
else ([], [rn])
|
else ([], [r])
|
||||||
updateAlertMap $ mergeAlert i $
|
updateAlertMap $ mergeAlert i $
|
||||||
syncResultAlert' succeeded failed
|
syncResultAlert succeeded failed
|
||||||
go (M.delete rn syncalerts)
|
go (M.delete (Remote.uuid r) syncalerts)
|
||||||
|
Just (WARNING (RemoteURI uri) msg) -> do
|
||||||
|
void $ addAlert $
|
||||||
|
warningAlert ("RemoteControl "++ show uri) msg
|
||||||
|
cont
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
debug ["protocol error from remotedaemon: ", l]
|
debug ["protocol error from remotedaemon: ", l]
|
||||||
go syncalerts
|
cont
|
||||||
|
|
||||||
|
getURIMap :: Annex (M.Map URI Remote)
|
||||||
|
getURIMap = Remote.remoteMap' id (mkk . Git.location . Remote.repo)
|
||||||
|
where
|
||||||
|
mkk (Git.Url u) = Just u
|
||||||
|
mkk _ = Nothing
|
||||||
|
|
||||||
|
withRemote
|
||||||
|
:: RemoteURI
|
||||||
|
-> MVar (M.Map URI Remote)
|
||||||
|
-> Assistant a
|
||||||
|
-> (Remote -> Assistant a)
|
||||||
|
-> Assistant a
|
||||||
|
withRemote (RemoteURI uri) remotemap noremote a = do
|
||||||
|
m <- liftIO $ readMVar remotemap
|
||||||
|
case M.lookup uri m of
|
||||||
|
Just r -> a r
|
||||||
|
Nothing -> do
|
||||||
|
{- Reload map, in case a new remote has been added. -}
|
||||||
|
m' <- liftAnnex getURIMap
|
||||||
|
void $ liftIO $ swapMVar remotemap $ m'
|
||||||
|
maybe noremote a (M.lookup uri m')
|
||||||
|
|
17
Remote.hs
17
Remote.hs
|
@ -22,6 +22,7 @@ module Remote (
|
||||||
remoteList,
|
remoteList,
|
||||||
gitSyncableRemote,
|
gitSyncableRemote,
|
||||||
remoteMap,
|
remoteMap,
|
||||||
|
remoteMap',
|
||||||
uuidDescriptions,
|
uuidDescriptions,
|
||||||
byName,
|
byName,
|
||||||
byNameOnly,
|
byNameOnly,
|
||||||
|
@ -64,9 +65,19 @@ import Git.Types (RemoteName)
|
||||||
import qualified Git
|
import qualified Git
|
||||||
|
|
||||||
{- Map from UUIDs of Remotes to a calculated value. -}
|
{- Map from UUIDs of Remotes to a calculated value. -}
|
||||||
remoteMap :: (Remote -> a) -> Annex (M.Map UUID a)
|
remoteMap :: (Remote -> v) -> Annex (M.Map UUID v)
|
||||||
remoteMap c = M.fromList . map (\r -> (uuid r, c r)) .
|
remoteMap mkv = remoteMap' mkv mkk
|
||||||
filter (\r -> uuid r /= NoUUID) <$> remoteList
|
where
|
||||||
|
mkk r = case uuid r of
|
||||||
|
NoUUID -> Nothing
|
||||||
|
u -> Just u
|
||||||
|
|
||||||
|
remoteMap' :: Ord k => (Remote -> v) -> (Remote -> Maybe k) -> Annex (M.Map k v)
|
||||||
|
remoteMap' mkv mkk = M.fromList . mapMaybe mk <$> remoteList
|
||||||
|
where
|
||||||
|
mk r = case mkk r of
|
||||||
|
Nothing -> Nothing
|
||||||
|
Just k -> Just (k, mkv r)
|
||||||
|
|
||||||
{- Map of UUIDs of remotes and their descriptions.
|
{- Map of UUIDs of remotes and their descriptions.
|
||||||
- The names of Remotes are added to supplement any description that has
|
- The names of Remotes are added to supplement any description that has
|
||||||
|
|
|
@ -106,7 +106,7 @@ genRemoteMap h@(TransportHandle g _) ochan =
|
||||||
ichan <- newChan :: IO (Chan Consumed)
|
ichan <- newChan :: IO (Chan Consumed)
|
||||||
return $ Just
|
return $ Just
|
||||||
( r
|
( r
|
||||||
, (transport r (Git.repoDescribe r) h ichan ochan, ichan)
|
, (transport r (RemoteURI u) h ichan ochan, ichan)
|
||||||
)
|
)
|
||||||
_ -> return Nothing
|
_ -> return Nothing
|
||||||
_ -> return Nothing
|
_ -> return Nothing
|
||||||
|
|
|
@ -13,60 +13,103 @@ import RemoteDaemon.Common
|
||||||
import Remote.Helper.Ssh
|
import Remote.Helper.Ssh
|
||||||
import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
|
import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote
|
||||||
import Utility.SimpleProtocol
|
import Utility.SimpleProtocol
|
||||||
|
import qualified Git
|
||||||
import Git.Command
|
import Git.Command
|
||||||
|
import Utility.ThreadScheduler
|
||||||
|
|
||||||
import Control.Concurrent.Chan
|
import Control.Concurrent.Chan
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import System.Process (std_in, std_out)
|
import System.Process (std_in, std_out, std_err)
|
||||||
|
|
||||||
transport :: Transport
|
transport :: Transport
|
||||||
transport r remotename transporthandle ichan ochan = do
|
transport r url transporthandle ichan ochan = do
|
||||||
v <- liftAnnex transporthandle $ git_annex_shell r "notifychanges" [] []
|
v <- liftAnnex transporthandle $ git_annex_shell r "notifychanges" [] []
|
||||||
case v of
|
case v of
|
||||||
Nothing -> noop
|
Nothing -> noop
|
||||||
Just (cmd, params) -> go cmd (toCommand params)
|
Just (cmd, params) -> robustly 1 $
|
||||||
|
connect cmd (toCommand params)
|
||||||
where
|
where
|
||||||
go cmd params = do
|
connect cmd params = do
|
||||||
(Just toh, Just fromh, _, pid) <- createProcess (proc cmd params)
|
(Just toh, Just fromh, Just errh, pid) <-
|
||||||
|
createProcess (proc cmd params)
|
||||||
{ std_in = CreatePipe
|
{ std_in = CreatePipe
|
||||||
, std_out = CreatePipe
|
, std_out = CreatePipe
|
||||||
|
, std_err = CreatePipe
|
||||||
}
|
}
|
||||||
|
|
||||||
let shutdown = do
|
-- Run all threads until one finishes and get the status
|
||||||
hClose toh
|
-- of the first to finish. Cancel the rest.
|
||||||
hClose fromh
|
status <- catchDefaultIO (Right ConnectionClosed) $
|
||||||
void $ waitForProcess pid
|
handlestderr errh
|
||||||
send DISCONNECTED
|
`race` handlestdout fromh
|
||||||
|
`race` handlecontrol
|
||||||
|
|
||||||
let fromshell = forever $ do
|
send (DISCONNECTED url)
|
||||||
l <- hGetLine fromh
|
hClose toh
|
||||||
case parseMessage l of
|
hClose fromh
|
||||||
Just SshRemote.READY -> send CONNECTED
|
void $ waitForProcess pid
|
||||||
Just (SshRemote.CHANGED shas) ->
|
|
||||||
whenM (checkNewShas transporthandle shas) $
|
|
||||||
fetch
|
|
||||||
Nothing -> shutdown
|
|
||||||
|
|
||||||
-- The only control message that matters is STOP.
|
return $ either (either id id) id status
|
||||||
--
|
|
||||||
-- Note that a CHANGED control message is not handled;
|
|
||||||
-- we don't push to the ssh remote. The assistant
|
|
||||||
-- and git-annex sync both handle pushes, so there's no
|
|
||||||
-- need to do it here.
|
|
||||||
let handlecontrol = forever $ do
|
|
||||||
msg <- readChan ichan
|
|
||||||
case msg of
|
|
||||||
STOP -> ioError (userError "done")
|
|
||||||
_ -> noop
|
|
||||||
|
|
||||||
-- Run both threads until one finishes.
|
send msg = writeChan ochan msg
|
||||||
void $ tryIO $ concurrently fromshell handlecontrol
|
|
||||||
shutdown
|
|
||||||
|
|
||||||
send msg = writeChan ochan (msg remotename)
|
|
||||||
|
|
||||||
fetch = do
|
fetch = do
|
||||||
send SYNCING
|
send (SYNCING url)
|
||||||
ok <- inLocalRepo transporthandle $
|
ok <- inLocalRepo transporthandle $
|
||||||
runBool [Param "fetch", Param remotename]
|
runBool [Param "fetch", Param $ Git.repoDescribe r]
|
||||||
send (DONESYNCING ok)
|
send (DONESYNCING url ok)
|
||||||
|
|
||||||
|
handlestdout fromh = do
|
||||||
|
l <- hGetLine fromh
|
||||||
|
case parseMessage l of
|
||||||
|
Just SshRemote.READY -> do
|
||||||
|
send (CONNECTED url)
|
||||||
|
handlestdout fromh
|
||||||
|
Just (SshRemote.CHANGED shas) -> do
|
||||||
|
whenM (checkNewShas transporthandle shas) $
|
||||||
|
fetch
|
||||||
|
handlestdout fromh
|
||||||
|
-- avoid reconnect on protocol error
|
||||||
|
Nothing -> return Stopping
|
||||||
|
|
||||||
|
handlecontrol = do
|
||||||
|
msg <- readChan ichan
|
||||||
|
case msg of
|
||||||
|
STOP -> return Stopping
|
||||||
|
_ -> handlecontrol
|
||||||
|
|
||||||
|
-- Old versions of git-annex-shell that do not support
|
||||||
|
-- the notifychanges command will exit with a not very useful
|
||||||
|
-- error message. Detect that error, and avoid reconnecting.
|
||||||
|
-- Propagate all stderr.
|
||||||
|
handlestderr errh = do
|
||||||
|
s <- hGetSomeString errh 1024
|
||||||
|
hPutStr stderr s
|
||||||
|
hFlush stderr
|
||||||
|
if "git-annex-shell: git-shell failed" `isInfixOf` s
|
||||||
|
then do
|
||||||
|
send $ WARNING url $ unwords
|
||||||
|
[ "Remote", Git.repoDescribe r
|
||||||
|
, "needs its git-annex upgraded"
|
||||||
|
, "to 5.20140405 or newer"
|
||||||
|
]
|
||||||
|
return Stopping
|
||||||
|
else handlestderr errh
|
||||||
|
|
||||||
|
data Status = Stopping | ConnectionClosed
|
||||||
|
|
||||||
|
{- Make connection robustly, with exponential backoff on failure. -}
|
||||||
|
robustly :: Int -> IO Status -> IO ()
|
||||||
|
robustly backoff a = handle =<< catchDefaultIO ConnectionClosed a
|
||||||
|
where
|
||||||
|
handle Stopping = return ()
|
||||||
|
handle ConnectionClosed = do
|
||||||
|
threadDelaySeconds (Seconds backoff)
|
||||||
|
robustly increasedbackoff a
|
||||||
|
|
||||||
|
increasedbackoff
|
||||||
|
| b2 > maxbackoff = maxbackoff
|
||||||
|
| otherwise = b2
|
||||||
|
where
|
||||||
|
b2 = backoff * 2
|
||||||
|
maxbackoff = 3600 -- one hour
|
||||||
|
|
|
@ -10,15 +10,20 @@
|
||||||
|
|
||||||
module RemoteDaemon.Types where
|
module RemoteDaemon.Types where
|
||||||
|
|
||||||
|
import Common
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import qualified Git.Types as Git
|
import qualified Git.Types as Git
|
||||||
import qualified Utility.SimpleProtocol as Proto
|
import qualified Utility.SimpleProtocol as Proto
|
||||||
|
|
||||||
|
import Network.URI
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
|
||||||
|
-- The URI of a remote is used to uniquely identify it (names change..)
|
||||||
|
newtype RemoteURI = RemoteURI URI
|
||||||
|
|
||||||
-- A Transport for a particular git remote consumes some messages
|
-- A Transport for a particular git remote consumes some messages
|
||||||
-- from a Chan, and emits others to another Chan.
|
-- from a Chan, and emits others to another Chan.
|
||||||
type Transport = RemoteRepo -> RemoteName -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO ()
|
type Transport = RemoteRepo -> RemoteURI -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO ()
|
||||||
|
|
||||||
type RemoteRepo = Git.Repo
|
type RemoteRepo = Git.Repo
|
||||||
type LocalRepo = Git.Repo
|
type LocalRepo = Git.Repo
|
||||||
|
@ -28,10 +33,11 @@ data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState)
|
||||||
|
|
||||||
-- Messages that the daemon emits.
|
-- Messages that the daemon emits.
|
||||||
data Emitted
|
data Emitted
|
||||||
= CONNECTED RemoteName
|
= CONNECTED RemoteURI
|
||||||
| DISCONNECTED RemoteName
|
| DISCONNECTED RemoteURI
|
||||||
| SYNCING RemoteName
|
| SYNCING RemoteURI
|
||||||
| DONESYNCING Bool RemoteName
|
| DONESYNCING RemoteURI Bool
|
||||||
|
| WARNING RemoteURI String
|
||||||
|
|
||||||
-- Messages that the daemon consumes.
|
-- Messages that the daemon consumes.
|
||||||
data Consumed
|
data Consumed
|
||||||
|
@ -41,7 +47,6 @@ data Consumed
|
||||||
| RELOAD
|
| RELOAD
|
||||||
| STOP
|
| STOP
|
||||||
|
|
||||||
type RemoteName = String
|
|
||||||
type RefList = [Git.Ref]
|
type RefList = [Git.Ref]
|
||||||
|
|
||||||
instance Proto.Sendable Emitted where
|
instance Proto.Sendable Emitted where
|
||||||
|
@ -51,8 +56,10 @@ instance Proto.Sendable Emitted where
|
||||||
["DISCONNECTED", Proto.serialize remote]
|
["DISCONNECTED", Proto.serialize remote]
|
||||||
formatMessage (SYNCING remote) =
|
formatMessage (SYNCING remote) =
|
||||||
["SYNCING", Proto.serialize remote]
|
["SYNCING", Proto.serialize remote]
|
||||||
formatMessage (DONESYNCING status remote) =
|
formatMessage (DONESYNCING remote status) =
|
||||||
["DONESYNCING", Proto.serialize status, Proto.serialize remote]
|
["DONESYNCING", Proto.serialize remote, Proto.serialize status]
|
||||||
|
formatMessage (WARNING remote message) =
|
||||||
|
["WARNING", Proto.serialize remote, Proto.serialize message]
|
||||||
|
|
||||||
instance Proto.Sendable Consumed where
|
instance Proto.Sendable Consumed where
|
||||||
formatMessage PAUSE = ["PAUSE"]
|
formatMessage PAUSE = ["PAUSE"]
|
||||||
|
@ -66,6 +73,7 @@ instance Proto.Receivable Emitted where
|
||||||
parseCommand "DISCONNECTED" = Proto.parse1 DISCONNECTED
|
parseCommand "DISCONNECTED" = Proto.parse1 DISCONNECTED
|
||||||
parseCommand "SYNCING" = Proto.parse1 SYNCING
|
parseCommand "SYNCING" = Proto.parse1 SYNCING
|
||||||
parseCommand "DONESYNCING" = Proto.parse2 DONESYNCING
|
parseCommand "DONESYNCING" = Proto.parse2 DONESYNCING
|
||||||
|
parseCommand "WARNING" = Proto.parse2 WARNING
|
||||||
parseCommand _ = Proto.parseFail
|
parseCommand _ = Proto.parseFail
|
||||||
|
|
||||||
instance Proto.Receivable Consumed where
|
instance Proto.Receivable Consumed where
|
||||||
|
@ -76,6 +84,10 @@ instance Proto.Receivable Consumed where
|
||||||
parseCommand "STOP" = Proto.parse0 STOP
|
parseCommand "STOP" = Proto.parse0 STOP
|
||||||
parseCommand _ = Proto.parseFail
|
parseCommand _ = Proto.parseFail
|
||||||
|
|
||||||
|
instance Proto.Serializable RemoteURI where
|
||||||
|
serialize (RemoteURI u) = show u
|
||||||
|
deserialize = RemoteURI <$$> parseURI
|
||||||
|
|
||||||
instance Proto.Serializable [Char] where
|
instance Proto.Serializable [Char] where
|
||||||
serialize = id
|
serialize = id
|
||||||
deserialize = Just
|
deserialize = Just
|
||||||
|
|
|
@ -69,24 +69,28 @@ the webapp.
|
||||||
|
|
||||||
## emitted messages
|
## emitted messages
|
||||||
|
|
||||||
* `CONNECTED $remote`
|
* `CONNECTED uri`
|
||||||
|
|
||||||
Sent when a connection has been made with a remote.
|
Sent when a connection has been made with a remote.
|
||||||
|
|
||||||
* `DISCONNECTED $remote`
|
* `DISCONNECTED uri`
|
||||||
|
|
||||||
Sent when connection with a remote has been lost.
|
Sent when connection with a remote has been lost.
|
||||||
|
|
||||||
* `SYNCING $remote`
|
* `SYNCING uri`
|
||||||
|
|
||||||
Indicates that a pull or a push with a remote is in progress.
|
Indicates that a pull or a push with a remote is in progress.
|
||||||
Always followed by DONESYNCING.
|
Always followed by DONESYNCING.
|
||||||
|
|
||||||
* `DONESYNCING 1|0 $remote`
|
* `DONESYNCING uri 1|0`
|
||||||
|
|
||||||
Indicates that syncing with a remote is done, and either succeeded
|
Indicates that syncing with a remote is done, and either succeeded
|
||||||
(1) or failed (0).
|
(1) or failed (0).
|
||||||
|
|
||||||
|
* `WARNING uri string`
|
||||||
|
|
||||||
|
A message to display to the user about a remote.
|
||||||
|
|
||||||
## consumed messages
|
## consumed messages
|
||||||
|
|
||||||
* `PAUSE`
|
* `PAUSE`
|
||||||
|
@ -166,11 +170,6 @@ TODO:
|
||||||
* Remote system might not be available. Find a smart way to detect it,
|
* Remote system might not be available. Find a smart way to detect it,
|
||||||
ideally w/o generating network traffic. One way might be to check
|
ideally w/o generating network traffic. One way might be to check
|
||||||
if the ssh connection caching control socket exists, for example.
|
if the ssh connection caching control socket exists, for example.
|
||||||
* Remote system might be available, and connection get lost. Should
|
|
||||||
reconnect, but needs to avoid bad behavior (ie, constant reconnect
|
|
||||||
attempts.)
|
|
||||||
* Detect if old system had a too old git-annex-shell and avoid bad
|
|
||||||
behavior.
|
|
||||||
* CONNECTED and DISCONNECTED are not wired into any webapp UI; could be
|
* CONNECTED and DISCONNECTED are not wired into any webapp UI; could be
|
||||||
used to show an icon when a ssh remote is available
|
used to show an icon when a ssh remote is available
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue