add a NotificationBroadcaster to DaemonStatus
First use of it is to make the status checkpointer thread block until there is really a change to the status.
This commit is contained in:
parent
ca478b7bcb
commit
a17fde22fa
2 changed files with 50 additions and 30 deletions
|
@ -11,6 +11,7 @@ import Common.Annex
|
|||
import Assistant.ThreadedMonad
|
||||
import Utility.ThreadScheduler
|
||||
import Utility.TempFile
|
||||
import Utility.NotificationBroadcaster
|
||||
import Logs.Transfer
|
||||
import qualified Command.Sync
|
||||
|
||||
|
@ -34,31 +35,43 @@ data DaemonStatus = DaemonStatus
|
|||
, currentTransfers :: TransferMap
|
||||
-- Ordered list of remotes to talk to.
|
||||
, knownRemotes :: [Remote]
|
||||
-- Clients can use this to wait on changes to the DaemonStatus
|
||||
, notificationBroadcaster :: NotificationBroadcaster
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
type TransferMap = M.Map Transfer TransferInfo
|
||||
|
||||
type DaemonStatusHandle = MVar DaemonStatus
|
||||
|
||||
newDaemonStatus :: DaemonStatus
|
||||
newDaemonStatus = DaemonStatus
|
||||
newDaemonStatus :: IO DaemonStatus
|
||||
newDaemonStatus = do
|
||||
nb <- newNotificationBroadcaster
|
||||
return $ DaemonStatus
|
||||
{ scanComplete = False
|
||||
, lastRunning = Nothing
|
||||
, sanityCheckRunning = False
|
||||
, lastSanityCheck = Nothing
|
||||
, currentTransfers = M.empty
|
||||
, knownRemotes = []
|
||||
, notificationBroadcaster = nb
|
||||
}
|
||||
|
||||
getDaemonStatus :: DaemonStatusHandle -> Annex DaemonStatus
|
||||
getDaemonStatus = liftIO . readMVar
|
||||
|
||||
modifyDaemonStatus_ :: DaemonStatusHandle -> (DaemonStatus -> DaemonStatus) -> Annex ()
|
||||
modifyDaemonStatus_ handle a = liftIO $ modifyMVar_ handle (return . a)
|
||||
modifyDaemonStatus_ handle a = do
|
||||
nb <- liftIO $ modifyMVar handle $ \s -> return
|
||||
(a s, notificationBroadcaster s)
|
||||
liftIO $ sendNotification nb
|
||||
|
||||
modifyDaemonStatus :: DaemonStatusHandle -> (DaemonStatus -> (DaemonStatus, b)) -> Annex b
|
||||
modifyDaemonStatus handle a = liftIO $ modifyMVar handle (return . a)
|
||||
modifyDaemonStatus handle a = do
|
||||
(b, nb) <- liftIO $ modifyMVar handle $ \s -> do
|
||||
let (s', b) = a s
|
||||
return $ (s', (b, notificationBroadcaster s))
|
||||
liftIO $ sendNotification nb
|
||||
return b
|
||||
|
||||
{- Updates the cached ordered list of remotes from the list in Annex
|
||||
- state. -}
|
||||
|
@ -74,7 +87,7 @@ startDaemonStatus :: Annex DaemonStatusHandle
|
|||
startDaemonStatus = do
|
||||
file <- fromRepo gitAnnexDaemonStatusFile
|
||||
status <- liftIO $
|
||||
catchDefaultIO (readDaemonStatusFile file) newDaemonStatus
|
||||
catchDefaultIO (readDaemonStatusFile file) =<< newDaemonStatus
|
||||
transfers <- M.fromList <$> getTransfers
|
||||
remotes <- Command.Sync.syncRemotes []
|
||||
liftIO $ newMVar status
|
||||
|
@ -84,11 +97,18 @@ startDaemonStatus = do
|
|||
, knownRemotes = remotes
|
||||
}
|
||||
|
||||
{- This thread wakes up periodically and writes the daemon status to disk. -}
|
||||
{- This writes the daemon status to disk, when it changes, but no more
|
||||
- frequently than once every ten minutes.
|
||||
-}
|
||||
daemonStatusThread :: ThreadState -> DaemonStatusHandle -> IO ()
|
||||
daemonStatusThread st handle = do
|
||||
bhandle <- runThreadState st $
|
||||
liftIO . newNotificationHandle
|
||||
=<< notificationBroadcaster <$> getDaemonStatus handle
|
||||
checkpoint
|
||||
runEvery (Seconds tenMinutes) $ do
|
||||
liftIO $ waitNotification bhandle
|
||||
checkpoint
|
||||
runEvery (Seconds tenMinutes) checkpoint
|
||||
where
|
||||
checkpoint = runThreadState st $ do
|
||||
file <- fromRepo gitAnnexDaemonStatusFile
|
||||
|
@ -109,9 +129,9 @@ writeDaemonStatusFile file status =
|
|||
]
|
||||
|
||||
readDaemonStatusFile :: FilePath -> IO DaemonStatus
|
||||
readDaemonStatusFile file = parse <$> readFile file
|
||||
readDaemonStatusFile file = parse <$> newDaemonStatus <*> readFile file
|
||||
where
|
||||
parse = foldr parseline newDaemonStatus . lines
|
||||
parse status = foldr parseline status . lines
|
||||
parseline line status
|
||||
| key == "lastRunning" = parseval readtime $ \v ->
|
||||
status { lastRunning = Just v }
|
||||
|
|
|
@ -11,10 +11,10 @@
|
|||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
module Assistant.NotificationBroadCaster (
|
||||
NotificationBroadCaster,
|
||||
module Utility.NotificationBroadcaster (
|
||||
NotificationBroadcaster,
|
||||
NotificationHandle,
|
||||
newNotificationBroadCaster,
|
||||
newNotificationBroadcaster,
|
||||
newNotificationHandle,
|
||||
notificationHandleToId,
|
||||
notificationHandleFromId,
|
||||
|
@ -28,21 +28,21 @@ import Control.Concurrent.STM
|
|||
import Control.Concurrent.SampleVar
|
||||
|
||||
{- One SampleVar per client. The TMVar is never empty, so never blocks. -}
|
||||
type NotificationBroadCaster = TMVar [SampleVar ()]
|
||||
type NotificationBroadcaster = TMVar [SampleVar ()]
|
||||
|
||||
{- Handle given out to an individual client. -}
|
||||
data NotificationHandle = NotificationHandle NotificationBroadCaster Int
|
||||
data NotificationHandle = NotificationHandle NotificationBroadcaster Int
|
||||
|
||||
newNotificationBroadCaster :: IO NotificationBroadCaster
|
||||
newNotificationBroadCaster = atomically (newTMVar [])
|
||||
newNotificationBroadcaster :: IO NotificationBroadcaster
|
||||
newNotificationBroadcaster = atomically (newTMVar [])
|
||||
|
||||
{- Allocates a notification handle for a client to use. -}
|
||||
newNotificationHandle :: NotificationBroadCaster -> IO NotificationHandle
|
||||
newNotificationHandle :: NotificationBroadcaster -> IO NotificationHandle
|
||||
newNotificationHandle b = NotificationHandle
|
||||
<$> pure b
|
||||
<*> addclient b
|
||||
<*> addclient
|
||||
where
|
||||
addclient b = do
|
||||
addclient = do
|
||||
s <- newEmptySampleVar
|
||||
atomically $ do
|
||||
l <- readTMVar b
|
||||
|
@ -54,13 +54,13 @@ newNotificationHandle b = NotificationHandle
|
|||
notificationHandleToId :: NotificationHandle -> Int
|
||||
notificationHandleToId (NotificationHandle _ i) = i
|
||||
|
||||
{- Given a NotificationBroadCaster, and an Int identifier, recreates the
|
||||
{- Given a NotificationBroadcaster, and an Int identifier, recreates the
|
||||
- NotificationHandle. -}
|
||||
notificationHandleFromId :: NotificationBroadCaster -> Int -> NotificationHandle
|
||||
notificationHandleFromId :: NotificationBroadcaster -> Int -> NotificationHandle
|
||||
notificationHandleFromId = NotificationHandle
|
||||
|
||||
{- Sends a notification to all clients. -}
|
||||
sendNotification :: NotificationBroadCaster -> IO ()
|
||||
sendNotification :: NotificationBroadcaster -> IO ()
|
||||
sendNotification b = do
|
||||
l <- atomically $ readTMVar b
|
||||
mapM_ notify l
|
||||
|
|
Loading…
Reference in a new issue