diff --git a/Assistant/Threads/RemoteControl.hs b/Assistant/Threads/RemoteControl.hs index b67b0e07f1..d33a4858d8 100644 --- a/Assistant/Threads/RemoteControl.hs +++ b/Assistant/Threads/RemoteControl.hs @@ -15,11 +15,16 @@ import Utility.SimpleProtocol import Assistant.Alert import Assistant.Alert.Utility import Assistant.DaemonStatus +import qualified Git +import qualified Git.Types as Git +import qualified Remote +import qualified Types.Remote as Remote import Control.Concurrent import Control.Concurrent.Async import System.Process (std_in, std_out) import qualified Data.Map as M +import Network.URI remoteControlThread :: NamedThread remoteControlThread = namedThread "RemoteControl" $ do @@ -32,8 +37,10 @@ remoteControlThread = namedThread "RemoteControl" $ do , std_out = CreatePipe } + urimap <- liftIO . newMVar =<< liftAnnex getURIMap + controller <- asIO $ remoteControllerThread toh - responder <- asIO $ remoteResponderThread fromh + responder <- asIO $ remoteResponderThread fromh urimap -- run controller and responder until the remotedaemon dies liftIO $ do @@ -50,31 +57,60 @@ remoteControllerThread toh = do hFlush toh -- read status messages emitted by the remotedaemon and handle them -remoteResponderThread :: Handle -> Assistant () -remoteResponderThread fromh = go M.empty +remoteResponderThread :: Handle -> MVar (M.Map URI Remote) -> Assistant () +remoteResponderThread fromh urimap = go M.empty where go syncalerts = do + let cont = go syncalerts + let withr uri = withRemote uri urimap cont l <- liftIO $ hGetLine fromh case parseMessage l of - Just (CONNECTED _rn) -> do - go syncalerts - Just (DISCONNECTED _rn) -> do - go syncalerts - Just (SYNCING rn) - | M.member rn syncalerts -> go syncalerts - | otherwise -> do - i <- addAlert $ syncAlert' [rn] - go (M.insert rn i syncalerts) - Just (DONESYNCING status rn) -> - case M.lookup rn syncalerts of - Nothing -> go syncalerts + Just (CONNECTED _uri) -> do + cont + Just (DISCONNECTED _uri) -> do + cont + Just (SYNCING uri) -> withr uri $ \r -> + if M.member (Remote.uuid r) syncalerts + then go syncalerts + else do + i <- addAlert $ syncAlert [r] + go (M.insert (Remote.uuid r) i syncalerts) + Just (DONESYNCING uri status) -> withr uri $ \r -> + case M.lookup (Remote.uuid r) syncalerts of + Nothing -> cont Just i -> do let (succeeded, failed) = if status - then ([rn], []) - else ([], [rn]) + then ([r], []) + else ([], [r]) updateAlertMap $ mergeAlert i $ - syncResultAlert' succeeded failed - go (M.delete rn syncalerts) + syncResultAlert succeeded failed + go (M.delete (Remote.uuid r) syncalerts) + Just (WARNING (RemoteURI uri) msg) -> do + void $ addAlert $ + warningAlert ("RemoteControl "++ show uri) msg + cont Nothing -> do debug ["protocol error from remotedaemon: ", l] - go syncalerts + cont + +getURIMap :: Annex (M.Map URI Remote) +getURIMap = Remote.remoteMap' id (mkk . Git.location . Remote.repo) + where + mkk (Git.Url u) = Just u + mkk _ = Nothing + +withRemote + :: RemoteURI + -> MVar (M.Map URI Remote) + -> Assistant a + -> (Remote -> Assistant a) + -> Assistant a +withRemote (RemoteURI uri) remotemap noremote a = do + m <- liftIO $ readMVar remotemap + case M.lookup uri m of + Just r -> a r + Nothing -> do + {- Reload map, in case a new remote has been added. -} + m' <- liftAnnex getURIMap + void $ liftIO $ swapMVar remotemap $ m' + maybe noremote a (M.lookup uri m') diff --git a/Remote.hs b/Remote.hs index 0f31b99b29..da33e195e5 100644 --- a/Remote.hs +++ b/Remote.hs @@ -22,6 +22,7 @@ module Remote ( remoteList, gitSyncableRemote, remoteMap, + remoteMap', uuidDescriptions, byName, byNameOnly, @@ -64,9 +65,19 @@ import Git.Types (RemoteName) import qualified Git {- Map from UUIDs of Remotes to a calculated value. -} -remoteMap :: (Remote -> a) -> Annex (M.Map UUID a) -remoteMap c = M.fromList . map (\r -> (uuid r, c r)) . - filter (\r -> uuid r /= NoUUID) <$> remoteList +remoteMap :: (Remote -> v) -> Annex (M.Map UUID v) +remoteMap mkv = remoteMap' mkv mkk + where + mkk r = case uuid r of + NoUUID -> Nothing + u -> Just u + +remoteMap' :: Ord k => (Remote -> v) -> (Remote -> Maybe k) -> Annex (M.Map k v) +remoteMap' mkv mkk = M.fromList . mapMaybe mk <$> remoteList + where + mk r = case mkk r of + Nothing -> Nothing + Just k -> Just (k, mkv r) {- Map of UUIDs of remotes and their descriptions. - The names of Remotes are added to suppliment any description that has diff --git a/RemoteDaemon/Core.hs b/RemoteDaemon/Core.hs index 7d07c35b10..0c29371033 100644 --- a/RemoteDaemon/Core.hs +++ b/RemoteDaemon/Core.hs @@ -106,7 +106,7 @@ genRemoteMap h@(TransportHandle g _) ochan = ichan <- newChan :: IO (Chan Consumed) return $ Just ( r - , (transport r (Git.repoDescribe r) h ichan ochan, ichan) + , (transport r (RemoteURI u) h ichan ochan, ichan) ) _ -> return Nothing _ -> return Nothing diff --git a/RemoteDaemon/Transport/Ssh.hs b/RemoteDaemon/Transport/Ssh.hs index 557a3dce90..87fcf6f8c0 100644 --- a/RemoteDaemon/Transport/Ssh.hs +++ b/RemoteDaemon/Transport/Ssh.hs @@ -13,60 +13,103 @@ import RemoteDaemon.Common import Remote.Helper.Ssh import qualified RemoteDaemon.Transport.Ssh.Types as SshRemote import Utility.SimpleProtocol +import qualified Git import Git.Command +import Utility.ThreadScheduler import Control.Concurrent.Chan import Control.Concurrent.Async -import System.Process (std_in, std_out) +import System.Process (std_in, std_out, std_err) transport :: Transport -transport r remotename transporthandle ichan ochan = do +transport r url transporthandle ichan ochan = do v <- liftAnnex transporthandle $ git_annex_shell r "notifychanges" [] [] case v of Nothing -> noop - Just (cmd, params) -> go cmd (toCommand params) + Just (cmd, params) -> robustly 1 $ + connect cmd (toCommand params) where - go cmd params = do - (Just toh, Just fromh, _, pid) <- createProcess (proc cmd params) + connect cmd params = do + (Just toh, Just fromh, Just errh, pid) <- + createProcess (proc cmd params) { std_in = CreatePipe , std_out = CreatePipe + , std_err = CreatePipe } - let shutdown = do - hClose toh - hClose fromh - void $ waitForProcess pid - send DISCONNECTED + -- Run all threads until one finishes and get the status + -- of the first to finish. Cancel the rest. + status <- catchDefaultIO (Right ConnectionClosed) $ + handlestderr errh + `race` handlestdout fromh + `race` handlecontrol - let fromshell = forever $ do - l <- hGetLine fromh - case parseMessage l of - Just SshRemote.READY -> send CONNECTED - Just (SshRemote.CHANGED shas) -> - whenM (checkNewShas transporthandle shas) $ - fetch - Nothing -> shutdown + send (DISCONNECTED url) + hClose toh + hClose fromh + void $ waitForProcess pid - -- The only control message that matters is STOP. - -- - -- Note that a CHANGED control message is not handled; - -- we don't push to the ssh remote. The assistant - -- and git-annex sync both handle pushes, so there's no - -- need to do it here. - let handlecontrol = forever $ do - msg <- readChan ichan - case msg of - STOP -> ioError (userError "done") - _ -> noop + return $ either (either id id) id status - -- Run both threads until one finishes. - void $ tryIO $ concurrently fromshell handlecontrol - shutdown - - send msg = writeChan ochan (msg remotename) + send msg = writeChan ochan msg fetch = do - send SYNCING + send (SYNCING url) ok <- inLocalRepo transporthandle $ - runBool [Param "fetch", Param remotename] - send (DONESYNCING ok) + runBool [Param "fetch", Param $ Git.repoDescribe r] + send (DONESYNCING url ok) + + handlestdout fromh = do + l <- hGetLine fromh + case parseMessage l of + Just SshRemote.READY -> do + send (CONNECTED url) + handlestdout fromh + Just (SshRemote.CHANGED shas) -> do + whenM (checkNewShas transporthandle shas) $ + fetch + handlestdout fromh + -- avoid reconnect on protocol error + Nothing -> return Stopping + + handlecontrol = do + msg <- readChan ichan + case msg of + STOP -> return Stopping + _ -> handlecontrol + + -- Old versions of git-annex-shell that do not support + -- the notifychanges command will exit with a not very useful + -- error message. Detect that error, and avoid reconnecting. + -- Propigate all stderr. + handlestderr errh = do + s <- hGetSomeString errh 1024 + hPutStr stderr s + hFlush stderr + if "git-annex-shell: git-shell failed" `isInfixOf` s + then do + send $ WARNING url $ unwords + [ "Remote", Git.repoDescribe r + , "needs its git-annex upgraded" + , "to 5.20140405 or newer" + ] + return Stopping + else handlestderr errh + +data Status = Stopping | ConnectionClosed + +{- Make connection robustly, with exponentioal backoff on failure. -} +robustly :: Int -> IO Status -> IO () +robustly backoff a = handle =<< catchDefaultIO ConnectionClosed a + where + handle Stopping = return () + handle ConnectionClosed = do + threadDelaySeconds (Seconds backoff) + robustly increasedbackoff a + + increasedbackoff + | b2 > maxbackoff = maxbackoff + | otherwise = b2 + where + b2 = backoff * 2 + maxbackoff = 3600 -- one hour diff --git a/RemoteDaemon/Types.hs b/RemoteDaemon/Types.hs index 025c602df0..eef7389cc6 100644 --- a/RemoteDaemon/Types.hs +++ b/RemoteDaemon/Types.hs @@ -10,15 +10,20 @@ module RemoteDaemon.Types where +import Common import qualified Annex import qualified Git.Types as Git import qualified Utility.SimpleProtocol as Proto +import Network.URI import Control.Concurrent +-- The URI of a remote is used to uniquely identify it (names change..) +newtype RemoteURI = RemoteURI URI + -- A Transport for a particular git remote consumes some messages -- from a Chan, and emits others to another Chan. -type Transport = RemoteRepo -> RemoteName -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO () +type Transport = RemoteRepo -> RemoteURI -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO () type RemoteRepo = Git.Repo type LocalRepo = Git.Repo @@ -28,10 +33,11 @@ data TransportHandle = TransportHandle LocalRepo (MVar Annex.AnnexState) -- Messages that the daemon emits. data Emitted - = CONNECTED RemoteName - | DISCONNECTED RemoteName - | SYNCING RemoteName - | DONESYNCING Bool RemoteName + = CONNECTED RemoteURI + | DISCONNECTED RemoteURI + | SYNCING RemoteURI + | DONESYNCING RemoteURI Bool + | WARNING RemoteURI String -- Messages that the deamon consumes. data Consumed @@ -41,7 +47,6 @@ data Consumed | RELOAD | STOP -type RemoteName = String type RefList = [Git.Ref] instance Proto.Sendable Emitted where @@ -51,8 +56,10 @@ instance Proto.Sendable Emitted where ["DISCONNECTED", Proto.serialize remote] formatMessage (SYNCING remote) = ["SYNCING", Proto.serialize remote] - formatMessage (DONESYNCING status remote) = - ["DONESYNCING", Proto.serialize status, Proto.serialize remote] + formatMessage (DONESYNCING remote status) = + ["DONESYNCING", Proto.serialize remote, Proto.serialize status] + formatMessage (WARNING remote message) = + ["WARNING", Proto.serialize remote, Proto.serialize message] instance Proto.Sendable Consumed where formatMessage PAUSE = ["PAUSE"] @@ -66,6 +73,7 @@ instance Proto.Receivable Emitted where parseCommand "DISCONNECTED" = Proto.parse1 DISCONNECTED parseCommand "SYNCING" = Proto.parse1 SYNCING parseCommand "DONESYNCING" = Proto.parse2 DONESYNCING + parseCommand "WARNING" = Proto.parse2 WARNING parseCommand _ = Proto.parseFail instance Proto.Receivable Consumed where @@ -76,6 +84,10 @@ instance Proto.Receivable Consumed where parseCommand "STOP" = Proto.parse0 STOP parseCommand _ = Proto.parseFail +instance Proto.Serializable RemoteURI where + serialize (RemoteURI u) = show u + deserialize = RemoteURI <$$> parseURI + instance Proto.Serializable [Char] where serialize = id deserialize = Just diff --git a/doc/design/git-remote-daemon.mdwn b/doc/design/git-remote-daemon.mdwn index f7de3a280b..8c74433198 100644 --- a/doc/design/git-remote-daemon.mdwn +++ b/doc/design/git-remote-daemon.mdwn @@ -69,24 +69,28 @@ the webapp. ## emitted messages -* `CONNECTED $remote` +* `CONNECTED uri` Sent when a connection has been made with a remote. -* `DISCONNECTED $remote` +* `DISCONNECTED uri` Sent when connection with a remote has been lost. -* `SYNCING $remote` +* `SYNCING uri` Indicates that a pull or a push with a remote is in progress. Always followed by DONESYNCING. -* `DONESYNCING 1|0 $remote` +* `DONESYNCING uri 1|0` Indicates that syncing with a remote is done, and either succeeded (1) or failed (0). +* `WARNING`uri string` + + A message to display to the user about a remote. + ## consumed messages * `PAUSE` @@ -166,11 +170,6 @@ TODO: * Remote system might not be available. Find a smart way to detect it, ideally w/o generating network traffic. One way might be to check if the ssh connection caching control socket exists, for example. -* Remote system might be available, and connection get lost. Should - reconnect, but needs to avoid bad behavior (ie, constant reconnect - attempts.) -* Detect if old system had a too old git-annex-shell and avoid bad - behavior. * CONNECTED and DISCONNECTED are not wired into any webapp UI; could be used to show an icon when a ssh remote is available