only start ref change watcher thread once per P2P connection
This is more efficient. Note that the peer will get CHANGED messages for all refs changed since the connection opened, even if those changes happened before it sent NOTIFYCHANGE.
This commit is contained in:
parent
e152c322f8
commit
f7687e0876
2 changed files with 29 additions and 23 deletions
21
P2P/Annex.hs
21
P2P/Annex.hs
|
@ -25,10 +25,8 @@ import Utility.Metered
|
||||||
|
|
||||||
import Control.Monad.Free
|
import Control.Monad.Free
|
||||||
|
|
||||||
-- When we're serving a peer, we know their uuid, and can use it to update
|
|
||||||
-- transfer logs.
|
|
||||||
data RunMode
|
data RunMode
|
||||||
= Serving UUID
|
= Serving UUID ChangedRefsHandle
|
||||||
| Client
|
| Client
|
||||||
|
|
||||||
-- Full interpreter for Proto, that can receive and send objects.
|
-- Full interpreter for Proto, that can receive and send objects.
|
||||||
|
@ -115,18 +113,17 @@ runLocal runmode runner a = case a of
|
||||||
protoaction False
|
protoaction False
|
||||||
next
|
next
|
||||||
Right _ -> runner next
|
Right _ -> runner next
|
||||||
WaitRefChange next -> do
|
WaitRefChange next -> case runmode of
|
||||||
v <- tryNonAsync $ bracket
|
Serving _ h -> do
|
||||||
watchChangedRefs
|
v <- tryNonAsync $ liftIO $ waitChangedRefs h
|
||||||
(liftIO . stopWatchingChangedRefs)
|
case v of
|
||||||
(liftIO . waitChangedRefs)
|
Left e -> return (Left (show e))
|
||||||
case v of
|
Right changedrefs -> runner (next changedrefs)
|
||||||
Left e -> return (Left (show e))
|
_ -> return $ Left "change notification not implemented for client"
|
||||||
Right changedrefs -> runner (next changedrefs)
|
|
||||||
where
|
where
|
||||||
transfer mk k af ta = case runmode of
|
transfer mk k af ta = case runmode of
|
||||||
-- Update transfer logs when serving.
|
-- Update transfer logs when serving.
|
||||||
Serving theiruuid ->
|
Serving theiruuid _ ->
|
||||||
mk theiruuid k af noRetry ta noNotification
|
mk theiruuid k af noRetry ta noNotification
|
||||||
-- Transfer logs are updated higher in the stack when
|
-- Transfer logs are updated higher in the stack when
|
||||||
-- a client.
|
-- a client.
|
||||||
|
|
|
@ -10,6 +10,7 @@ module RemoteDaemon.Transport.Tor (server) where
|
||||||
import Common
|
import Common
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import Annex.Concurrent
|
import Annex.Concurrent
|
||||||
|
import Annex.ChangedRefs
|
||||||
import RemoteDaemon.Types
|
import RemoteDaemon.Types
|
||||||
import RemoteDaemon.Common
|
import RemoteDaemon.Common
|
||||||
import Utility.Tor
|
import Utility.Tor
|
||||||
|
@ -71,12 +72,18 @@ maxConnections :: Int
|
||||||
maxConnections = 10
|
maxConnections = 10
|
||||||
|
|
||||||
serveClient :: TransportHandle -> UUID -> Repo -> TBQueue Handle -> IO ()
|
serveClient :: TransportHandle -> UUID -> Repo -> TBQueue Handle -> IO ()
|
||||||
serveClient th u r q = bracket setup cleanup go
|
serveClient th u r q = bracket setup cleanup start
|
||||||
where
|
where
|
||||||
setup = atomically $ readTBQueue q
|
setup = do
|
||||||
cleanup = hClose
|
h <- atomically $ readTBQueue q
|
||||||
go h = do
|
|
||||||
debugM "remotedaemon" "serving a Tor connection"
|
debugM "remotedaemon" "serving a Tor connection"
|
||||||
|
return h
|
||||||
|
|
||||||
|
cleanup h = do
|
||||||
|
debugM "remotedaemon" "done with Tor connection"
|
||||||
|
hClose h
|
||||||
|
|
||||||
|
start h = do
|
||||||
-- Avoid doing any work in the liftAnnex, since only one
|
-- Avoid doing any work in the liftAnnex, since only one
|
||||||
-- can run at a time.
|
-- can run at a time.
|
||||||
st <- liftAnnex th dupState
|
st <- liftAnnex th dupState
|
||||||
|
@ -92,16 +99,18 @@ serveClient th u r q = bracket setup cleanup go
|
||||||
}
|
}
|
||||||
v <- liftIO $ runNetProto conn $ serveAuth u
|
v <- liftIO $ runNetProto conn $ serveAuth u
|
||||||
case v of
|
case v of
|
||||||
Right (Just theiruuid) -> void $ do
|
Right (Just theiruuid) -> authed conn theiruuid
|
||||||
v' <- runFullProto (Serving theiruuid) conn $
|
|
||||||
serveAuthed u
|
|
||||||
case v' of
|
|
||||||
Right () -> return ()
|
|
||||||
Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e)
|
|
||||||
Right Nothing -> liftIO $
|
Right Nothing -> liftIO $
|
||||||
debugM "remotedaemon" "Tor connection failed to authenticate"
|
debugM "remotedaemon" "Tor connection failed to authenticate"
|
||||||
Left e -> liftIO $
|
Left e -> liftIO $
|
||||||
debugM "remotedaemon" ("Tor connection error before authentication: " ++ e)
|
debugM "remotedaemon" ("Tor connection error before authentication: " ++ e)
|
||||||
-- Merge the duplicated state back in.
|
-- Merge the duplicated state back in.
|
||||||
liftAnnex th $ mergeState st'
|
liftAnnex th $ mergeState st'
|
||||||
debugM "remotedaemon" "done with Tor connection"
|
|
||||||
|
authed conn theiruuid =
|
||||||
|
bracket watchChangedRefs (liftIO . stopWatchingChangedRefs) $ \crh -> do
|
||||||
|
v' <- runFullProto (Serving theiruuid crh) conn $
|
||||||
|
serveAuthed u
|
||||||
|
case v' of
|
||||||
|
Right () -> return ()
|
||||||
|
Left e -> liftIO $ debugM "remotedaemon" ("Tor connection error: " ++ e)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue