refactor ref change watching

Added to change notification to P2P protocol.

Switched to a TBChan so that a single long-running thread can be
started, and serve perhaps intermittent requests for change
notifications, without buffering all changes in memory.

The P2P runner currently starts up a new thread each times it waits
for a change, but that should allow later reusing a thread. Although
each connection from a peer will still need a new watcher thread to run.

The dependency on stm-chans is more or less free; some stuff in yesod
uses it, so it was already indirectly pulled in when building with the
webapp.

This commit was sponsored by Francois Marier on Patreon.
This commit is contained in:
Joey Hess 2016-12-09 14:52:38 -04:00
parent 596e1685a6
commit e152c322f8
No known key found for this signature in database
GPG key ID: C910D9222512E3C7
9 changed files with 142 additions and 53 deletions

105
Annex/ChangedRefs.hs Normal file
View file

@ -0,0 +1,105 @@
{- Waiting for changed git refs
-
- Copyright 2014-216 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Annex.ChangedRefs
( ChangedRefs(..)
, ChangedRefsHandle
, waitChangedRefs
, drainChangedRefs
, stopWatchingChangedRefs
, watchChangedRefs
) where
import Annex.Common
import Utility.DirWatcher
import Utility.DirWatcher.Types
import qualified Git
import Git.Sha
import qualified Utility.SimpleProtocol as Proto
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMChan
newtype ChangedRefs = ChangedRefs [Git.Ref]
deriving (Show)
instance Proto.Serializable ChangedRefs where
serialize (ChangedRefs l) = unwords $ map Git.fromRef l
deserialize = Just . ChangedRefs . map Git.Ref . words
data ChangedRefsHandle = ChangedRefsHandle DirWatcherHandle (TBMChan Git.Sha)
-- | Wait for one or more git refs to change.
--
-- When possible, coalesce ref writes that occur closely together
-- in time. Delay up to 0.05 seconds to get more ref writes.
waitChangedRefs :: ChangedRefsHandle -> IO ChangedRefs
waitChangedRefs (ChangedRefsHandle _ chan) = do
v <- atomically $ readTBMChan chan
case v of
Nothing -> return $ ChangedRefs []
Just r -> do
threadDelay 50000
rs <- atomically $ loop []
return $ ChangedRefs (r:rs)
where
loop rs = do
v <- tryReadTBMChan chan
case v of
Just (Just r) -> loop (r:rs)
_ -> return rs
-- | Remove any changes that might be buffered in the channel,
-- without waiting for any new changes.
drainChangedRefs :: ChangedRefsHandle -> IO ()
drainChangedRefs (ChangedRefsHandle _ chan) = atomically go
where
go = do
v <- tryReadTBMChan chan
case v of
Just (Just _) -> go
_ -> return ()
stopWatchingChangedRefs :: ChangedRefsHandle -> IO ()
stopWatchingChangedRefs h@(ChangedRefsHandle wh chan) = do
stopWatchDir wh
atomically $ closeTBMChan chan
drainChangedRefs h
watchChangedRefs :: Annex ChangedRefsHandle
watchChangedRefs = do
-- This channel is used to accumulate notifications,
-- because the DirWatcher might have multiple threads that find
-- changes at the same time. It is bounded to allow a watcher
-- to be started once and reused, without too many changes being
-- buffered in memory.
chan <- liftIO $ newTBMChanIO 100
g <- gitRepo
let refdir = Git.localGitDir g </> "refs"
liftIO $ createDirectoryIfMissing True refdir
let notifyhook = Just $ notifyHook chan
let hooks = mkWatchHooks
{ addHook = notifyhook
, modifyHook = notifyhook
}
h <- liftIO $ watchDir refdir (const False) True hooks id
return $ ChangedRefsHandle h chan
notifyHook :: TBMChan Git.Sha -> FilePath -> Maybe FileStatus -> IO ()
notifyHook chan reffile _
| ".lock" `isSuffixOf` reffile = noop
| otherwise = void $ do
sha <- catchDefaultIO Nothing $
extractSha <$> readFile reffile
-- When the channel is full, there is probably no reader
-- running, or ref changes have been occuring very fast,
-- so it's ok to not write the change to it.
maybe noop (void . atomically . tryWriteTBMChan chan) sha

View file

@ -8,6 +8,7 @@
module Command.NotifyChanges where module Command.NotifyChanges where
import Command import Command
import Annex.ChangedRefs
import Utility.DirWatcher import Utility.DirWatcher
import Utility.DirWatcher.Types import Utility.DirWatcher.Types
import qualified Git import qualified Git
@ -30,55 +31,18 @@ seek = withNothing start
start :: CommandStart start :: CommandStart
start = do start = do
-- This channel is used to accumulate notifcations, h <- watchChangedRefs
-- because the DirWatcher might have multiple threads that find
-- changes at the same time.
chan <- liftIO newTChanIO
g <- gitRepo
let refdir = Git.localGitDir g </> "refs"
liftIO $ createDirectoryIfMissing True refdir
let notifyhook = Just $ notifyHook chan
let hooks = mkWatchHooks
{ addHook = notifyhook
, modifyHook = notifyhook
}
void $ liftIO $ watchDir refdir (const False) True hooks id
let sender = do
send READY
forever $ send . CHANGED =<< drain chan
-- No messages need to be received from the caller, -- No messages need to be received from the caller,
-- but when it closes the connection, notice and terminate. -- but when it closes the connection, notice and terminate.
let receiver = forever $ void $ getProtocolLine stdin let receiver = forever $ void $ getProtocolLine stdin
let sender = forever $ send . CHANGED =<< waitChangedRefs h
liftIO $ send READY
void $ liftIO $ concurrently sender receiver void $ liftIO $ concurrently sender receiver
liftIO $ stopWatchingChangedRefs h
stop stop
notifyHook :: TChan Git.Sha -> FilePath -> Maybe FileStatus -> IO ()
notifyHook chan reffile _
| ".lock" `isSuffixOf` reffile = noop
| otherwise = void $ do
sha <- catchDefaultIO Nothing $
extractSha <$> readFile reffile
maybe noop (atomically . writeTChan chan) sha
-- When possible, coalesce ref writes that occur closely together
-- in time. Delay up to 0.05 seconds to get more ref writes.
drain :: TChan Git.Sha -> IO [Git.Sha]
drain chan = do
r <- atomically $ readTChan chan
threadDelay 50000
rs <- atomically $ drain' chan
return (r:rs)
drain' :: TChan Git.Sha -> STM [Git.Sha]
drain' chan = loop []
where
loop rs = maybe (return rs) (\r -> loop (r:rs)) =<< tryReadTChan chan
send :: Notification -> IO () send :: Notification -> IO ()
send n = do send n = do
putStrLn $ unwords $ formatMessage n putStrLn $ unwords $ formatMessage n

View file

@ -16,6 +16,7 @@ module P2P.Annex
import Annex.Common import Annex.Common
import Annex.Content import Annex.Content
import Annex.Transfer import Annex.Transfer
import Annex.ChangedRefs
import P2P.Protocol import P2P.Protocol
import P2P.IO import P2P.IO
import Logs.Location import Logs.Location
@ -114,6 +115,14 @@ runLocal runmode runner a = case a of
protoaction False protoaction False
next next
Right _ -> runner next Right _ -> runner next
WaitRefChange next -> do
v <- tryNonAsync $ bracket
watchChangedRefs
(liftIO . stopWatchingChangedRefs)
(liftIO . waitChangedRefs)
case v of
Left e -> return (Left (show e))
Right changedrefs -> runner (next changedrefs)
where where
transfer mk k af ta = case runmode of transfer mk k af ta = case runmode of
-- Update transfer logs when serving. -- Update transfer logs when serving.

View file

@ -19,6 +19,7 @@ import Utility.Applicative
import Utility.PartialPrelude import Utility.PartialPrelude
import Utility.Metered import Utility.Metered
import Git.FilePath import Git.FilePath
import Annex.ChangedRefs (ChangedRefs)
import Control.Monad import Control.Monad
import Control.Monad.Free import Control.Monad.Free
@ -50,6 +51,8 @@ data Message
| AUTH_FAILURE | AUTH_FAILURE
| CONNECT Service | CONNECT Service
| CONNECTDONE ExitCode | CONNECTDONE ExitCode
| NOTIFYCHANGE
| CHANGED ChangedRefs
| CHECKPRESENT Key | CHECKPRESENT Key
| LOCKCONTENT Key | LOCKCONTENT Key
| UNLOCKCONTENT | UNLOCKCONTENT
@ -70,6 +73,8 @@ instance Proto.Sendable Message where
formatMessage AUTH_FAILURE = ["AUTH-FAILURE"] formatMessage AUTH_FAILURE = ["AUTH-FAILURE"]
formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service] formatMessage (CONNECT service) = ["CONNECT", Proto.serialize service]
formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode] formatMessage (CONNECTDONE exitcode) = ["CONNECTDONE", Proto.serialize exitcode]
formatMessage NOTIFYCHANGE = ["NOTIFYCHANGE"]
formatMessage (CHANGED refs) = ["CHANGED", Proto.serialize refs]
formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key] formatMessage (CHECKPRESENT key) = ["CHECKPRESENT", Proto.serialize key]
formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key] formatMessage (LOCKCONTENT key) = ["LOCKCONTENT", Proto.serialize key]
formatMessage UNLOCKCONTENT = ["UNLOCKCONTENT"] formatMessage UNLOCKCONTENT = ["UNLOCKCONTENT"]
@ -89,6 +94,8 @@ instance Proto.Receivable Message where
parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE parseCommand "AUTH-FAILURE" = Proto.parse0 AUTH_FAILURE
parseCommand "CONNECT" = Proto.parse1 CONNECT parseCommand "CONNECT" = Proto.parse1 CONNECT
parseCommand "CONNECTDONE" = Proto.parse1 CONNECTDONE parseCommand "CONNECTDONE" = Proto.parse1 CONNECTDONE
parseCommand "NOTIFYCHANGE" = Proto.parse0 NOTIFYCHANGE
parseCommand "CHANGED" = Proto.parse1 CHANGED
parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT parseCommand "CHECKPRESENT" = Proto.parse1 CHECKPRESENT
parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT parseCommand "LOCKCONTENT" = Proto.parse1 LOCKCONTENT
parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT parseCommand "UNLOCKCONTENT" = Proto.parse0 UNLOCKCONTENT
@ -227,6 +234,8 @@ data LocalF c
-- from being deleted, while running the provided protocol -- from being deleted, while running the provided protocol
-- action. If unable to lock the content, runs the protocol action -- action. If unable to lock the content, runs the protocol action
-- with False. -- with False.
| WaitRefChange (ChangedRefs -> c)
-- ^ Waits for one or more git refs to change and returns them.
deriving (Functor) deriving (Functor)
type Local = Free LocalF type Local = Free LocalF
@ -379,6 +388,10 @@ serveAuthed myuuid = void $ serverLoop handler
handler (CONNECT service) = do handler (CONNECT service) = do
net $ relayService service net $ relayService service
return ServerContinue return ServerContinue
handler NOTIFYCHANGE = do
refs <- local waitRefChange
net $ sendMessage (CHANGED refs)
return ServerContinue
handler _ = return ServerUnexpected handler _ = return ServerUnexpected
sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool sendContent :: Key -> AssociatedFile -> Offset -> MeterUpdate -> Proto Bool

View file

@ -17,6 +17,7 @@ import Utility.SimpleProtocol
import qualified Git import qualified Git
import Git.Command import Git.Command
import Utility.ThreadScheduler import Utility.ThreadScheduler
import Annex.ChangedRefs
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.Async import Control.Concurrent.Async
@ -73,7 +74,7 @@ transportUsingCmd' cmd params (RemoteRepo r _) url transporthandle ichan ochan =
Just SshRemote.READY -> do Just SshRemote.READY -> do
send (CONNECTED url) send (CONNECTED url)
handlestdout fromh handlestdout fromh
Just (SshRemote.CHANGED shas) -> do Just (SshRemote.CHANGED (ChangedRefs shas)) -> do
whenM (checkNewShas transporthandle shas) $ whenM (checkNewShas transporthandle shas) $
fetch fetch
handlestdout fromh handlestdout fromh

View file

@ -16,11 +16,11 @@ module RemoteDaemon.Transport.Ssh.Types (
) where ) where
import qualified Utility.SimpleProtocol as Proto import qualified Utility.SimpleProtocol as Proto
import RemoteDaemon.Types (RefList) import Annex.ChangedRefs (ChangedRefs)
data Notification data Notification
= READY = READY
| CHANGED RefList | CHANGED ChangedRefs
instance Proto.Sendable Notification where instance Proto.Sendable Notification where
formatMessage READY = ["READY"] formatMessage READY = ["READY"]

View file

@ -5,7 +5,6 @@
- Licensed under the GNU GPL version 3 or higher. - Licensed under the GNU GPL version 3 or higher.
-} -}
{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-} {-# OPTIONS_GHC -fno-warn-orphans #-}
module RemoteDaemon.Types where module RemoteDaemon.Types where
@ -15,6 +14,7 @@ import qualified Annex
import qualified Git.Types as Git import qualified Git.Types as Git
import qualified Utility.SimpleProtocol as Proto import qualified Utility.SimpleProtocol as Proto
import Types.GitConfig import Types.GitConfig
import Annex.ChangedRefs (ChangedRefs)
import Network.URI import Network.URI
import Control.Concurrent import Control.Concurrent
@ -52,13 +52,11 @@ data Consumed
= PAUSE = PAUSE
| LOSTNET | LOSTNET
| RESUME | RESUME
| CHANGED RefList | CHANGED ChangedRefs
| RELOAD | RELOAD
| STOP | STOP
deriving (Show) deriving (Show)
type RefList = [Git.Ref]
instance Proto.Sendable Emitted where instance Proto.Sendable Emitted where
formatMessage (CONNECTED remote) = formatMessage (CONNECTED remote) =
["CONNECTED", Proto.serialize remote] ["CONNECTED", Proto.serialize remote]
@ -100,10 +98,6 @@ instance Proto.Serializable RemoteURI where
serialize (RemoteURI u) = show u serialize (RemoteURI u) = show u
deserialize = RemoteURI <$$> parseURI deserialize = RemoteURI <$$> parseURI
instance Proto.Serializable RefList where
serialize = unwords . map Git.fromRef
deserialize = Just . map Git.Ref . words
instance Proto.Serializable Bool where instance Proto.Serializable Bool where
serialize False = "0" serialize False = "0"
serialize True = "1" serialize True = "1"

1
debian/control vendored
View file

@ -50,6 +50,7 @@ Build-Depends:
libghc-esqueleto-dev, libghc-esqueleto-dev,
libghc-securemem-dev, libghc-securemem-dev,
libghc-byteable-dev, libghc-byteable-dev,
libghc-stm-chans-dev,
libghc-dns-dev, libghc-dns-dev,
libghc-case-insensitive-dev, libghc-case-insensitive-dev,
libghc-http-types-dev, libghc-http-types-dev,

View file

@ -371,6 +371,7 @@ Executable git-annex
regex-tdfa, regex-tdfa,
socks, socks,
byteable, byteable,
stm-chans,
securemem securemem
CC-Options: -Wall CC-Options: -Wall
GHC-Options: -Wall -fno-warn-tabs GHC-Options: -Wall -fno-warn-tabs
@ -513,6 +514,7 @@ Executable git-annex
Annex.Branch.Transitions Annex.Branch.Transitions
Annex.BranchState Annex.BranchState
Annex.CatFile Annex.CatFile
Annex.ChangedRefs
Annex.CheckAttr Annex.CheckAttr
Annex.CheckIgnore Annex.CheckIgnore
Annex.Common Annex.Common