remotedaemon: Fix problem that could prevent ssh connections being made after two LOSTNET messages were received in a row
Perhaps due to two different network interfaces being brought down. Since there is no reliable way to drain a Chan, I switched to STM TChan.
This commit is contained in:
parent
ec90116851
commit
1ce8367417
6 changed files with 44 additions and 19 deletions
|
@ -20,24 +20,25 @@ import Utility.SimpleProtocol
|
||||||
import Config
|
import Config
|
||||||
import Annex.Ssh
|
import Annex.Ssh
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import Control.Concurrent.Async
|
||||||
|
import Control.Concurrent.STM
|
||||||
import Network.URI
|
import Network.URI
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
|
||||||
runForeground :: IO ()
|
runForeground :: IO ()
|
||||||
runForeground = do
|
runForeground = do
|
||||||
(readh, writeh) <- ioHandles
|
(readh, writeh) <- ioHandles
|
||||||
ichan <- newChan :: IO (Chan Consumed)
|
ichan <- newTChanIO :: IO (TChan Consumed)
|
||||||
ochan <- newChan :: IO (Chan Emitted)
|
ochan <- newTChanIO :: IO (TChan Emitted)
|
||||||
|
|
||||||
let reader = forever $ do
|
let reader = forever $ do
|
||||||
l <- hGetLine readh
|
l <- hGetLine readh
|
||||||
case parseMessage l of
|
case parseMessage l of
|
||||||
Nothing -> error $ "protocol error: " ++ l
|
Nothing -> error $ "protocol error: " ++ l
|
||||||
Just cmd -> writeChan ichan cmd
|
Just cmd -> atomically $ writeTChan ichan cmd
|
||||||
let writer = forever $ do
|
let writer = forever $ do
|
||||||
msg <- readChan ochan
|
msg <- atomically $ readTChan ochan
|
||||||
hPutStrLn writeh $ unwords $ formatMessage msg
|
hPutStrLn writeh $ unwords $ formatMessage msg
|
||||||
hFlush writeh
|
hFlush writeh
|
||||||
let controller = runController ichan ochan
|
let controller = runController ichan ochan
|
||||||
|
@ -46,11 +47,11 @@ runForeground = do
|
||||||
void $ tryIO $
|
void $ tryIO $
|
||||||
reader `concurrently` writer `concurrently` controller
|
reader `concurrently` writer `concurrently` controller
|
||||||
|
|
||||||
type RemoteMap = M.Map Git.Repo (IO (), Chan Consumed)
|
type RemoteMap = M.Map Git.Repo (IO (), TChan Consumed)
|
||||||
|
|
||||||
-- Runs the transports, dispatching messages to them, and handling
|
-- Runs the transports, dispatching messages to them, and handling
|
||||||
-- the main control messages.
|
-- the main control messages.
|
||||||
runController :: Chan Consumed -> Chan Emitted -> IO ()
|
runController :: TChan Consumed -> TChan Emitted -> IO ()
|
||||||
runController ichan ochan = do
|
runController ichan ochan = do
|
||||||
h <- genTransportHandle
|
h <- genTransportHandle
|
||||||
m <- genRemoteMap h ochan
|
m <- genRemoteMap h ochan
|
||||||
|
@ -58,7 +59,7 @@ runController ichan ochan = do
|
||||||
go h False m
|
go h False m
|
||||||
where
|
where
|
||||||
go h paused m = do
|
go h paused m = do
|
||||||
cmd <- readChan ichan
|
cmd <- atomically $ readTChan ichan
|
||||||
case cmd of
|
case cmd of
|
||||||
RELOAD -> do
|
RELOAD -> do
|
||||||
h' <- updateTransportHandle h
|
h' <- updateTransportHandle h
|
||||||
|
@ -88,22 +89,28 @@ runController ichan ochan = do
|
||||||
-- All remaining messages are sent to
|
-- All remaining messages are sent to
|
||||||
-- all Transports.
|
-- all Transports.
|
||||||
msg -> do
|
msg -> do
|
||||||
unless paused $
|
unless paused $ atomically $
|
||||||
forM_ chans (`writeChan` msg)
|
forM_ chans (`writeTChan` msg)
|
||||||
go h paused m
|
go h paused m
|
||||||
where
|
where
|
||||||
chans = map snd (M.elems m)
|
chans = map snd (M.elems m)
|
||||||
|
|
||||||
startrunning m = forM_ (M.elems m) startrunning'
|
startrunning m = forM_ (M.elems m) startrunning'
|
||||||
startrunning' (transport, _) = void $ async transport
|
startrunning' (transport, c) = do
|
||||||
|
-- drain any old control messages from the channel
|
||||||
|
-- to avoid confusing the transport with them
|
||||||
|
atomically $ drain c
|
||||||
|
void $ async transport
|
||||||
|
|
||||||
broadcast msg m = forM_ (M.elems m) send
|
drain c = maybe noop (const $ drain c) =<< tryReadTChan c
|
||||||
|
|
||||||
|
broadcast msg m = atomically $ forM_ (M.elems m) send
|
||||||
where
|
where
|
||||||
send (_, c) = writeChan c msg
|
send (_, c) = writeTChan c msg
|
||||||
|
|
||||||
-- Generates a map with a transport for each supported remote in the git repo,
|
-- Generates a map with a transport for each supported remote in the git repo,
|
||||||
-- except those that have annex.sync = false
|
-- except those that have annex.sync = false
|
||||||
genRemoteMap :: TransportHandle -> Chan Emitted -> IO RemoteMap
|
genRemoteMap :: TransportHandle -> TChan Emitted -> IO RemoteMap
|
||||||
genRemoteMap h@(TransportHandle g _) ochan =
|
genRemoteMap h@(TransportHandle g _) ochan =
|
||||||
M.fromList . catMaybes <$> mapM gen (Git.remotes g)
|
M.fromList . catMaybes <$> mapM gen (Git.remotes g)
|
||||||
where
|
where
|
||||||
|
@ -111,7 +118,7 @@ genRemoteMap h@(TransportHandle g _) ochan =
|
||||||
Git.Url u -> case M.lookup (uriScheme u) remoteTransports of
|
Git.Url u -> case M.lookup (uriScheme u) remoteTransports of
|
||||||
Just transport
|
Just transport
|
||||||
| remoteAnnexSync (extractRemoteGitConfig r (Git.repoDescribe r)) -> do
|
| remoteAnnexSync (extractRemoteGitConfig r (Git.repoDescribe r)) -> do
|
||||||
ichan <- newChan :: IO (Chan Consumed)
|
ichan <- newTChanIO :: IO (TChan Consumed)
|
||||||
return $ Just
|
return $ Just
|
||||||
( r
|
( r
|
||||||
, (transport r (RemoteURI u) h ichan ochan, ichan)
|
, (transport r (RemoteURI u) h ichan ochan, ichan)
|
||||||
|
|
|
@ -18,7 +18,7 @@ import qualified Git
|
||||||
import Git.Command
|
import Git.Command
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
|
|
||||||
import Control.Concurrent.Chan
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
|
|
||||||
transport :: Transport
|
transport :: Transport
|
||||||
|
@ -58,7 +58,7 @@ transport' r url transporthandle ichan ochan = do
|
||||||
|
|
||||||
return $ either (either id id) id status
|
return $ either (either id id) id status
|
||||||
|
|
||||||
send msg = writeChan ochan msg
|
send msg = atomically $ writeTChan ochan msg
|
||||||
|
|
||||||
fetch = do
|
fetch = do
|
||||||
send (SYNCING url)
|
send (SYNCING url)
|
||||||
|
@ -80,7 +80,7 @@ transport' r url transporthandle ichan ochan = do
|
||||||
Nothing -> return Stopping
|
Nothing -> return Stopping
|
||||||
|
|
||||||
handlecontrol = do
|
handlecontrol = do
|
||||||
msg <- readChan ichan
|
msg <- atomically $ readTChan ichan
|
||||||
case msg of
|
case msg of
|
||||||
STOP -> return Stopping
|
STOP -> return Stopping
|
||||||
LOSTNET -> return Stopping
|
LOSTNET -> return Stopping
|
||||||
|
|
|
@ -17,6 +17,7 @@ import qualified Utility.SimpleProtocol as Proto
|
||||||
|
|
||||||
import Network.URI
|
import Network.URI
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
|
||||||
-- The URI of a remote is used to uniquely identify it (names change..)
|
-- The URI of a remote is used to uniquely identify it (names change..)
|
||||||
newtype RemoteURI = RemoteURI URI
|
newtype RemoteURI = RemoteURI URI
|
||||||
|
@ -24,7 +25,7 @@ newtype RemoteURI = RemoteURI URI
|
||||||
|
|
||||||
-- A Transport for a particular git remote consumes some messages
|
-- A Transport for a particular git remote consumes some messages
|
||||||
-- from a Chan, and emits others to another Chan.
|
-- from a Chan, and emits others to another Chan.
|
||||||
type Transport = RemoteRepo -> RemoteURI -> TransportHandle -> Chan Consumed -> Chan Emitted -> IO ()
|
type Transport = RemoteRepo -> RemoteURI -> TransportHandle -> TChan Consumed -> TChan Emitted -> IO ()
|
||||||
|
|
||||||
type RemoteRepo = Git.Repo
|
type RemoteRepo = Git.Repo
|
||||||
type LocalRepo = Git.Repo
|
type LocalRepo = Git.Repo
|
||||||
|
|
3
debian/changelog
vendored
3
debian/changelog
vendored
|
@ -6,6 +6,9 @@ git-annex (5.20150114) UNRELEASED; urgency=medium
|
||||||
so comes last and --fast will disable it.
|
so comes last and --fast will disable it.
|
||||||
* Git remote info now includes the date of the last sync with the remote.
|
* Git remote info now includes the date of the last sync with the remote.
|
||||||
* sync: Added --message/-m option like git commit.
|
* sync: Added --message/-m option like git commit.
|
||||||
|
* remotedaemon: Fix problem that could prevent ssh connections being
|
||||||
|
made after two LOSTNET messages were received in a row (perhaps due to
|
||||||
|
two different network interfaces being brought down).
|
||||||
|
|
||||||
-- Joey Hess <id@joeyh.name> Tue, 13 Jan 2015 17:03:39 -0400
|
-- Joey Hess <id@joeyh.name> Tue, 13 Jan 2015 17:03:39 -0400
|
||||||
|
|
||||||
|
|
|
@ -73,3 +73,5 @@ Everything up-to-date
|
||||||
"""]]
|
"""]]
|
||||||
|
|
||||||
[[!tag confirmed]]
|
[[!tag confirmed]]
|
||||||
|
|
||||||
|
> [[fixed|done]] --[[Joey]]
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
[[!comment format=mdwn
|
||||||
|
username="joey"
|
||||||
|
subject="""comment 2"""
|
||||||
|
date="2015-01-15T19:17:20Z"
|
||||||
|
content="""
|
||||||
|
Also, you were spot on about the cause being LOSTNET messages getting
|
||||||
|
queued up. Clearing that queue when restarting the transport
|
||||||
|
will fix this problem.
|
||||||
|
|
||||||
|
Please bring your non-haskell code analysis skills to bear on git-annex
|
||||||
|
anytim! :)
|
||||||
|
"""]]
|
Loading…
Reference in a new issue