diff --git a/Assistant.hs b/Assistant.hs index 6b25c3c6f1..1f41a9398f 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -153,7 +153,7 @@ startDaemon assistant foreground webappwaiter , daemonStatusThread st dstatus , sanityCheckerThread st dstatus transferqueue changechan , mountWatcherThread st dstatus scanremotes - , transferScannerThread st scanremotes transferqueue + , transferScannerThread st dstatus scanremotes transferqueue #ifdef WITH_WEBAPP , webAppThread st dstatus transferqueue webappwaiter #endif diff --git a/Assistant/DaemonStatus.hs b/Assistant/DaemonStatus.hs index 52165138e6..3610c2fdad 100644 --- a/Assistant/DaemonStatus.hs +++ b/Assistant/DaemonStatus.hs @@ -36,6 +36,7 @@ data DaemonStatus = DaemonStatus -- Ordered list of remotes to talk to. , knownRemotes :: [Remote] -- Clients can use this to wait on changes to the DaemonStatus + -- and other related things like the TransferQueue. , notificationBroadcaster :: NotificationBroadcaster } @@ -72,6 +73,12 @@ modifyDaemonStatus handle a = do sendNotification nb return b +{- Can be used to send a notification that the daemon status, or other + - associated thing, like the TransferQueue, has changed. -} +notifyDaemonStatusChange :: DaemonStatusHandle -> IO () +notifyDaemonStatusChange handle = sendNotification + =<< notificationBroadcaster <$> atomically (readTMVar handle) + {- Updates the cached ordered list of remotes from the list in Annex - state. -} updateKnownRemotes :: DaemonStatusHandle -> Annex () @@ -164,7 +171,16 @@ afterLastDaemonRun timestamp status = maybe False (< t) (lastRunning status) tenMinutes :: Int tenMinutes = 10 * 60 -{- Mutates the transfer map. -} +{- Mutates the transfer map. Runs in STM so that the transfer map can + - be modified in the same transaction that modifies the transfer queue. + - Note that this does not send a notification of the change; that's left + - to the caller. -} +adjustTransfersSTM :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> STM () +adjustTransfersSTM dstatus a = do + s <- takeTMVar dstatus + putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) } + +{- Variant that does send notifications. -} adjustTransfers :: DaemonStatusHandle -> (TransferMap -> TransferMap) -> IO () adjustTransfers dstatus a = modifyDaemonStatus_ dstatus $ \s -> s { currentTransfers = a (currentTransfers s) } diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index e76cbe81d3..e6a078907b 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -11,6 +11,7 @@ import Assistant.Common import Assistant.ScanRemotes import Assistant.TransferQueue import Assistant.ThreadedMonad +import Assistant.DaemonStatus import Logs.Transfer import Logs.Location import qualified Remote @@ -25,20 +26,20 @@ thisThread = "TransferScanner" {- This thread waits until a remote needs to be scanned, to find transfers - that need to be made, to keep data in sync. -} -transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO () -transferScannerThread st scanremotes transferqueue = do +transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO () +transferScannerThread st dstatus scanremotes transferqueue = do runEvery (Seconds 2) $ do r <- getScanRemote scanremotes liftIO $ debug thisThread ["starting scan of", show r] - scan st transferqueue r + scan st dstatus transferqueue r liftIO $ debug thisThread ["finished scan of", show r] where {- This is a naive scan through the git work tree. - - The scan is blocked when the transfer queue gets too large. -} -scan :: ThreadState -> TransferQueue -> Remote -> IO () -scan st transferqueue r = do +scan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO () +scan st dstatus transferqueue r = do g <- runThreadState st $ fromRepo id files <- LsFiles.inRepo [] g go files @@ -63,7 +64,7 @@ scan st transferqueue r = do | otherwise = return Nothing u = Remote.uuid r - enqueue f t = queueTransferAt smallsize Later transferqueue (Just f) t r + enqueue f t = queueTransferAt smallsize Later transferqueue dstatus (Just f) t r smallsize = 10 {- Look directly in remote for the key when it's cheap; diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index 30802f7428..f011ff0363 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -34,12 +34,18 @@ transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transf transfererThread st dstatus transferqueue slots = go where go = do - (t, info) <- getNextTransfer transferqueue + (t, info) <- getNextTransfer transferqueue dstatus ifM (runThreadState st $ shouldTransfer dstatus t info) ( do debug thisThread [ "Transferring:" , show t ] + notifyDaemonStatusChange dstatus transferThread st dstatus slots t info - , debug thisThread [ "Skipping unnecessary transfer:" , show t ] + , do + debug thisThread [ "Skipping unnecessary transfer:" , show t ] + -- getNextTransfer added t to the + -- daemonstatus's transfer map. + void $ removeTransfer dstatus t + notifyDaemonStatusChange dstatus ) go diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index 2f09813eb0..51ed5c9c78 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -23,6 +23,7 @@ import Types.Remote import qualified Remote import Control.Concurrent.STM +import qualified Data.Map as M {- The transfer queue consists of a channel listing the transfers to make; - the size of the queue is also tracked, and a list is maintained @@ -58,8 +59,8 @@ stubInfo f r = TransferInfo {- Adds transfers to queue for some of the known remotes. -} queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () -queueTransfers schedule q daemonstatus k f direction = do - rs <- knownRemotes <$> liftIO (getDaemonStatus daemonstatus) +queueTransfers schedule q dstatus k f direction = do + rs <- knownRemotes <$> liftIO (getDaemonStatus dstatus) mapM_ go =<< sufficientremotes rs where sufficientremotes rs @@ -80,37 +81,48 @@ queueTransfers schedule q daemonstatus k f direction = do , transferKey = k , transferUUID = Remote.uuid r } - go r = liftIO $ atomically $ - enqueue schedule q (gentransfer r) (stubInfo f r) + go r = liftIO $ + enqueue schedule q dstatus (gentransfer r) (stubInfo f r) -enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM () -enqueue schedule q t info +enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO () +enqueue schedule q dstatus t info | schedule == Next = go unGetTChan (new:) | otherwise = go writeTChan (\l -> l++[new]) where new = (t, info) go modqueue modlist = do - void $ modqueue (queue q) new - void $ modifyTVar' (queuesize q) succ - void $ modifyTVar' (queuelist q) modlist + atomically $ do + void $ modqueue (queue q) new + void $ modifyTVar' (queuesize q) succ + void $ modifyTVar' (queuelist q) modlist + void $ notifyDaemonStatusChange dstatus {- Adds a transfer to the queue. -} -queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransfer schedule q f t remote = atomically $ - enqueue schedule q t (stubInfo f remote) +queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () +queueTransfer schedule q dstatus f t remote = + enqueue schedule q dstatus t (stubInfo f remote) {- Blocks until the queue is no larger than a given size, and then adds a - transfer to the queue. -} -queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO () -queueTransferAt wantsz schedule q f t remote = atomically $ do - sz <- readTVar (queuesize q) - if sz <= wantsz - then enqueue schedule q t (stubInfo f remote) - else retry -- blocks until queuesize changes +queueTransferAt :: Integer -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () +queueTransferAt wantsz schedule q dstatus f t remote = do + atomically $ do + sz <- readTVar (queuesize q) + if sz <= wantsz + then return () + else retry -- blocks until queuesize changes + enqueue schedule q dstatus t (stubInfo f remote) -{- Blocks until a pending transfer is available from the queue. -} -getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo) -getNextTransfer q = atomically $ do +{- Blocks until a pending transfer is available from the queue. + - The transfer is removed from the transfer queue, and added to + - the daemon status currentTransfers map. This is done in a single STM + - transaction, so there is no window where an observer sees an + - inconsistent status. -} +getNextTransfer :: TransferQueue -> DaemonStatusHandle -> IO (Transfer, TransferInfo) +getNextTransfer q dstatus = atomically $ do void $ modifyTVar' (queuesize q) pred void $ modifyTVar' (queuelist q) (drop 1) - readTChan (queue q) + r@(t, info) <- readTChan (queue q) + adjustTransfersSTM dstatus $ + M.insertWith' const t info + return r