send notifications when the TransferQueue is changed

The fun part was making it move things from TransferQueue to currentTransfers
entirely atomically. Which will avoid inconsistent display if the WebApp
renders the current status at just the wrong time. STM to the rescue!
This commit is contained in:
Joey Hess 2012-07-28 18:47:24 -04:00
parent 3cc1885793
commit e31277d38a
5 changed files with 67 additions and 32 deletions

View file

@ -153,7 +153,7 @@ startDaemon assistant foreground webappwaiter
, daemonStatusThread st dstatus , daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan , sanityCheckerThread st dstatus transferqueue changechan
, mountWatcherThread st dstatus scanremotes , mountWatcherThread st dstatus scanremotes
, transferScannerThread st scanremotes transferqueue , transferScannerThread st dstatus scanremotes transferqueue
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
, webAppThread st dstatus transferqueue webappwaiter , webAppThread st dstatus transferqueue webappwaiter
#endif #endif

View file

@ -36,6 +36,7 @@ data DaemonStatus = DaemonStatus
-- Ordered list of remotes to talk to. -- Ordered list of remotes to talk to.
, knownRemotes :: [Remote] , knownRemotes :: [Remote]
-- Clients can use this to wait on changes to the DaemonStatus -- Clients can use this to wait on changes to the DaemonStatus
-- and other related things like the TransferQueue.
, notificationBroadcaster :: NotificationBroadcaster , notificationBroadcaster :: NotificationBroadcaster
} }
@ -72,6 +73,12 @@ modifyDaemonStatus handle a = do
sendNotification nb sendNotification nb
return b return b
{- Can be used to send a notification that the daemon status, or other
- associated thing, like the TransferQueue, has changed. -}
notifyDaemonStatusChange :: DaemonStatusHandle -> IO ()
notifyDaemonStatusChange handle = sendNotification
=<< notificationBroadcaster <$> atomically (readTMVar handle)
{- Updates the cached ordered list of remotes from the list in Annex {- Updates the cached ordered list of remotes from the list in Annex
- state. -} - state. -}
updateKnownRemotes :: DaemonStatusHandle -> Annex () updateKnownRemotes :: DaemonStatusHandle -> Annex ()
@ -164,7 +171,16 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status)
tenMinutes :: Int tenMinutes :: Int
tenMinutes = 10 * 60 tenMinutes = 10 * 60
{- Mutates the transfer map. -} {- Mutates the transfer map. Runs in STM so that the transfer map can
- be modified in the same transaction that modifies the transfer queue.
- Note that this does not send a notification of the change; that's left
- to the caller. -}
adjustTransfersSTM :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> STM ()
adjustTransfersSTM dstatus a = do
s <- takeTMVar dstatus
putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
{- Variant that does send notifications. -}
adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> IO () adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> IO ()
adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $ adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $
\s -> s { currentTransfers = a (currentTransfers s) } \s -> s { currentTransfers = a (currentTransfers s) }

View file

@ -11,6 +11,7 @@ import Assistant.Common
import Assistant.ScanRemotes import Assistant.ScanRemotes
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.ThreadedMonad import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Logs.Transfer import Logs.Transfer
import Logs.Location import Logs.Location
import qualified Remote import qualified Remote
@ -25,20 +26,20 @@ thisThread = "TransferScanner"
{- This thread waits until a remote needs to be scanned, to find transfers {- This thread waits until a remote needs to be scanned, to find transfers
- that need to be made, to keep data in sync. - that need to be made, to keep data in sync.
-} -}
transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO () transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
transferScannerThread st scanremotes transferqueue = do transferScannerThread st dstatus scanremotes transferqueue = do
runEvery (Seconds 2) $ do runEvery (Seconds 2) $ do
r <- getScanRemote scanremotes r <- getScanRemote scanremotes
liftIO $ debug thisThread ["starting scan of", show r] liftIO $ debug thisThread ["starting scan of", show r]
scan st transferqueue r scan st dstatus transferqueue r
liftIO $ debug thisThread ["finished scan of", show r] liftIO $ debug thisThread ["finished scan of", show r]
where where
{- This is a naive scan through the git work tree. {- This is a naive scan through the git work tree.
- -
- The scan is blocked when the transfer queue gets too large. -} - The scan is blocked when the transfer queue gets too large. -}
scan :: ThreadState -> TransferQueue -> Remote -> IO () scan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
scan st transferqueue r = do scan st dstatus transferqueue r = do
g <- runThreadState st $ fromRepo id g <- runThreadState st $ fromRepo id
files <- LsFiles.inRepo [] g files <- LsFiles.inRepo [] g
go files go files
@ -63,7 +64,7 @@ scan st transferqueue r = do
| otherwise = return Nothing | otherwise = return Nothing
u = Remote.uuid r u = Remote.uuid r
enqueue f t = queueTransferAt smallsize Later transferqueue (Just f) t r enqueue f t = queueTransferAt smallsize Later transferqueue dstatus (Just f) t r
smallsize = 10 smallsize = 10
{- Look directly in remote for the key when it's cheap; {- Look directly in remote for the key when it's cheap;

View file

@ -34,12 +34,18 @@ transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transf
transfererThread st dstatus transferqueue slots = go transfererThread st dstatus transferqueue slots = go
where where
go = do go = do
(t, info) <- getNextTransfer transferqueue (t, info) <- getNextTransfer transferqueue dstatus
ifM (runThreadState st $ shouldTransfer dstatus t info) ifM (runThreadState st $ shouldTransfer dstatus t info)
( do ( do
debug thisThread [ "Transferring:" , show t ] debug thisThread [ "Transferring:" , show t ]
notifyDaemonStatusChange dstatus
transferThread st dstatus slots t info transferThread st dstatus slots t info
, debug thisThread [ "Skipping unnecessary transfer:" , show t ] , do
debug thisThread [ "Skipping unnecessary transfer:" , show t ]
-- getNextTransfer added t to the
-- daemonstatus's transfer map.
void $ removeTransfer dstatus t
notifyDaemonStatusChange dstatus
) )
go go

View file

@ -23,6 +23,7 @@ import Types.Remote
import qualified Remote import qualified Remote
import Control.Concurrent.STM import Control.Concurrent.STM
import qualified Data.Map as M
{- The transfer queue consists of a channel listing the transfers to make; {- The transfer queue consists of a channel listing the transfers to make;
- the size of the queue is also tracked, and a list is maintained - the size of the queue is also tracked, and a list is maintained
@ -58,8 +59,8 @@ stubInfo f r = TransferInfo
{- Adds transfers to queue for some of the known remotes. -} {- Adds transfers to queue for some of the known remotes. -}
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
queueTransfers schedule q daemonstatus k f direction = do queueTransfers schedule q dstatus k f direction = do
rs <- knownRemotes <$> liftIO (getDaemonStatus daemonstatus) rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus)
mapM_ go =<< sufficientremotes rs mapM_ go =<< sufficientremotes rs
where where
sufficientremotes rs sufficientremotes rs
@ -80,37 +81,48 @@ queueTransfers schedule q daemonstatus k f direction = do
, transferKey = k , transferKey = k
, transferUUID = Remote.uuid r , transferUUID = Remote.uuid r
} }
go r = liftIO $ atomically $ go r = liftIO $
enqueue schedule q (gentransfer r) (stubInfo f r) enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM () enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
enqueue schedule q t info enqueue schedule q dstatus t info
| schedule == Next = go unGetTChan (new:) | schedule == Next = go unGetTChan (new:)
| otherwise = go writeTChan (\l -> l++[new]) | otherwise = go writeTChan (\l -> l++[new])
where where
new = (t, info) new = (t, info)
go modqueue modlist = do go modqueue modlist = do
atomically $ do
void $ modqueue (queue q) new void $ modqueue (queue q) new
void $ modifyTVar' (queuesize q) succ void $ modifyTVar' (queuesize q) succ
void $ modifyTVar' (queuelist q) modlist void $ modifyTVar' (queuelist q) modlist
void $ notifyDaemonStatusChange dstatus
{- Adds a transfer to the queue. -} {- Adds a transfer to the queue. -}
queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO () queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
queueTransfer schedule q f t remote = atomically $ queueTransfer schedule q dstatus f t remote =
enqueue schedule q t (stubInfo f remote) enqueue schedule q dstatus t (stubInfo f remote)
{- Blocks until the queue is no larger than a given size, and then adds a {- Blocks until the queue is no larger than a given size, and then adds a
- transfer to the queue. -} - transfer to the queue. -}
queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO () queueTransferAt :: Integer -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
queueTransferAt wantsz schedule q f t remote = atomically $ do queueTransferAt wantsz schedule q dstatus f t remote = do
atomically $ do
sz <- readTVar (queuesize q) sz <- readTVar (queuesize q)
if sz <= wantsz if sz <= wantsz
then enqueue schedule q t (stubInfo f remote) then return ()
else retry -- blocks until queuesize changes else retry -- blocks until queuesize changes
enqueue schedule q dstatus t (stubInfo f remote)
{- Blocks until a pending transfer is available from the queue. -} {- Blocks until a pending transfer is available from the queue.
getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo) - The transfer is removed from the transfer queue, and added to
getNextTransfer q = atomically $ do - the daemon status currentTransfers map. This is done in a single STM
- transaction, so there is no window where an observer sees an
- inconsistent status. -}
getNextTransfer :: TransferQueue -> DaemonStatusHandle -> IO (Transfer, TransferInfo)
getNextTransfer q dstatus = atomically $ do
void $ modifyTVar' (queuesize q) pred void $ modifyTVar' (queuesize q) pred
void $ modifyTVar' (queuelist q) (drop 1) void $ modifyTVar' (queuelist q) (drop 1)
readTChan (queue q) r@(t, info) <- readTChan (queue q)
adjustTransfersSTM dstatus $
M.insertWith' const t info
return r