diff --git a/Assistant.hs b/Assistant.hs index 58561371d4..5b8b236e69 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -199,12 +199,12 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do , assist $ pushThread , assist $ pushRetryThread , assist $ mergeThread - , assist $ transferWatcherThread st dstatus transferqueue + , assist $ transferWatcherThread , assist $ transferPollerThread , assist $ transfererThread st dstatus transferqueue transferslots commitchan , assist $ daemonStatusThread , assist $ sanityCheckerThread - , assist $ mountWatcherThread st dstatus scanremotes pushnotifier + , assist $ mountWatcherThread , assist $ netWatcherThread , assist $ netWatcherFallbackThread , assist $ transferScannerThread diff --git a/Assistant/Threads/MountWatcher.hs b/Assistant/Threads/MountWatcher.hs index f36bb88742..cb08071f57 100644 --- a/Assistant/Threads/MountWatcher.hs +++ b/Assistant/Threads/MountWatcher.hs @@ -11,11 +11,8 @@ module Assistant.Threads.MountWatcher where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus -import Assistant.ScanRemotes import Assistant.Sync -import Assistant.Pushes import qualified Annex import qualified Git import Utility.ThreadScheduler @@ -39,70 +36,70 @@ import qualified Control.Exception as E thisThread :: ThreadName thisThread = "MountWatcher" -mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread -mountWatcherThread st handle scanremotes pushnotifier = thread $ liftIO $ +mountWatcherThread :: NamedThread +mountWatcherThread = NamedThread "MountWatcher" $ #if WITH_DBUS - dbusThread st handle scanremotes pushnotifier + dbusThread #else - pollingThread st handle scanremotes pushnotifier + pollingThread #endif - where - thread = NamedThread thisThread #if WITH_DBUS -dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -dbusThread st dstatus scanremotes pushnotifier = - E.catch (runClient getSessionAddress go) onerr - where - go client = ifM (checkMountMonitor client) - ( do - {- Store the current mount points in an mvar, - - to be compared later. We could in theory - - work out the mount point from the dbus - - message, but this is easier. -} - mvar <- newMVar =<< currentMountPoints - forM_ mountChanged $ \matcher -> - listen client matcher $ \_event -> do - nowmounted <- currentMountPoints - wasmounted <- swapMVar mvar nowmounted - handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted - , do - runThreadState st $ - warning "No known volume monitor available through dbus; falling back to mtab polling" - pollinstead - ) - onerr :: E.SomeException -> IO () - onerr e = do - {- If the session dbus fails, the user probably - - logged out of their desktop. Even if they log - - back in, we won't have access to the dbus - - session key, so polling is the best that can be - - done in this situation. -} - runThreadState st $ - warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" - pollinstead - pollinstead = pollingThread st dstatus scanremotes pushnotifier +dbusThread :: Assistant () +dbusThread = do + runclient <- asIO go + r <- liftIO $ E.try $ runClient getSessionAddress runclient + either onerr (const noop) r + where + go client = ifM (checkMountMonitor client) + ( do + {- Store the current mount points in an MVar, to be + - compared later. We could in theory work out the + - mount point from the dbus message, but this is + - easier. -} + mvar <- liftIO $ newMVar =<< currentMountPoints + handleevent <- asIO $ \_event -> do + nowmounted <- liftIO $ currentMountPoints + wasmounted <- liftIO $ swapMVar mvar nowmounted + handleMounts wasmounted nowmounted + liftIO $ forM_ mountChanged $ \matcher -> + listen client matcher handleevent + , do + liftAnnex $ + warning "No known volume monitor available through dbus; falling back to mtab polling" + pollingThread + ) + onerr :: E.SomeException -> Assistant () + onerr e = do + {- If the session dbus fails, the user probably + - logged out of their desktop. Even if they log + - back in, we won't have access to the dbus + - session key, so polling is the best that can be + - done in this situation. -} + liftAnnex $ + warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" + pollingThread {- Examine the list of services connected to dbus, to see if there - are any we can use to monitor mounts. If not, will attempt to start one. -} -checkMountMonitor :: Client -> IO Bool +checkMountMonitor :: Client -> Assistant Bool checkMountMonitor client = do running <- filter (`elem` usableservices) - <$> listServiceNames client + <$> liftIO (listServiceNames client) case running of - [] -> startOneService client startableservices + [] -> liftIO $ startOneService client startableservices (service:_) -> do - brokendebug thisThread [ "Using running DBUS service" + debug [ "Using running DBUS service" , service , "to monitor mount events." ] return True - where - startableservices = [gvfs] - usableservices = startableservices ++ [kde] - gvfs = "org.gtk.Private.GduVolumeMonitor" - kde = "org.kde.DeviceNotifications" + where + startableservices = [gvfs] + usableservices = startableservices ++ [kde] + gvfs = "org.gtk.Private.GduVolumeMonitor" + kde = "org.kde.DeviceNotifications" startOneService :: Client -> [ServiceName] -> IO Bool startOneService _ [] = return False @@ -144,26 +141,29 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback] #endif -pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () -pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints +pollingThread :: Assistant () +pollingThread = go =<< liftIO currentMountPoints where go wasmounted = do - threadDelaySeconds (Seconds 10) - nowmounted <- currentMountPoints - handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted + liftIO $ threadDelaySeconds (Seconds 10) + nowmounted <- liftIO currentMountPoints + handleMounts wasmounted nowmounted go nowmounted -handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () -handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = - mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ +handleMounts :: MountPoints -> MountPoints -> Assistant () +handleMounts wasmounted nowmounted = + mapM_ (handleMount . mnt_dir) $ S.toList $ newMountPoints wasmounted nowmounted -handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () -handleMount st dstatus scanremotes pushnotifier dir = do - brokendebug thisThread ["detected mount of", dir] - reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) - =<< filter (Git.repoIsLocal . Remote.repo) - <$> remotesUnder st dstatus dir +handleMount :: FilePath -> Assistant () +handleMount dir = do + debug ["detected mount of", dir] + rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir + d <- getAssistant id + liftIO $ + reconnectRemotes (threadName d) (threadState d) + (daemonStatusHandle d) (scanRemoteMap d) + (Just $ pushNotifier d) rs {- Finds remotes located underneath the mount point. - @@ -173,15 +173,15 @@ handleMount st dstatus scanremotes pushnotifier dir = do - at startup time, or may have changed (it could even be a different - repository at the same remote location..) -} -remotesUnder :: ThreadState -> DaemonStatusHandle -> FilePath -> IO [Remote] -remotesUnder st dstatus dir = runThreadState st $ do - repotop <- fromRepo Git.repoPath - rs <- remoteList - pairs <- mapM (checkremote repotop) rs +remotesUnder :: FilePath -> Assistant [Remote] +remotesUnder dir = do + repotop <- liftAnnex $ fromRepo Git.repoPath + rs <- liftAnnex remoteList + pairs <- liftAnnex $ mapM (checkremote repotop) rs let (waschanged, rs') = unzip pairs when (any id waschanged) $ do - Annex.changeState $ \s -> s { Annex.remotes = rs' } - updateSyncRemotes dstatus + liftAnnex $ Annex.changeState $ \s -> s { Annex.remotes = rs' } + liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle return $ map snd $ filter fst pairs where checkremote repotop r = case Remote.localpath r of diff --git a/Assistant/Threads/TransferWatcher.hs b/Assistant/Threads/TransferWatcher.hs index bb195e519f..ad341b00a5 100644 --- a/Assistant/Threads/TransferWatcher.hs +++ b/Assistant/Threads/TransferWatcher.hs @@ -8,7 +8,6 @@ module Assistant.Threads.TransferWatcher where import Assistant.Common -import Assistant.ThreadedMonad import Assistant.DaemonStatus import Assistant.TransferQueue import Assistant.Drop @@ -20,76 +19,69 @@ import qualified Remote import Control.Concurrent -thisThread :: ThreadName -thisThread = "TransferWatcher" - {- This thread watches for changes to the gitAnnexTransferDir, - and updates the DaemonStatus's map of ongoing transfers. -} -transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread -transferWatcherThread st dstatus transferqueue = thread $ liftIO $ do - g <- runThreadState st gitRepo - let dir = gitAnnexTransferDir g - createDirectoryIfMissing True dir - let hook a = Just $ runHandler st dstatus transferqueue a +transferWatcherThread :: NamedThread +transferWatcherThread = NamedThread "TransferWatcher" $ do + dir <- liftAnnex $ gitAnnexTransferDir <$> gitRepo + liftIO $ createDirectoryIfMissing True dir + let hook a = Just <$> asIO2 (runHandler a) + addhook <- hook onAdd + delhook <- hook onDel + modifyhook <- hook onModify + errhook <- hook onErr let hooks = mkWatchHooks - { addHook = hook onAdd - , delHook = hook onDel - , modifyHook = hook onModify - , errHook = hook onErr + { addHook = addhook + , delHook = delhook + , modifyHook = modifyhook + , errHook = errhook } - void $ watchDir dir (const False) hooks id - brokendebug thisThread ["watching for transfers"] - where - thread = NamedThread thisThread + void $ liftIO $ watchDir dir (const False) hooks id + debug ["watching for transfers"] -type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> FilePath -> Maybe FileStatus -> IO () +type Handler = FilePath -> Assistant () {- Runs an action handler. - - Exceptions are ignored, otherwise a whole thread could be crashed. -} -runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Handler -> FilePath -> Maybe FileStatus -> IO () -runHandler st dstatus transferqueue handler file filestatus = void $ - either print (const noop) =<< tryIO go - where - go = handler st dstatus transferqueue file filestatus +runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant () +runHandler handler file _filestatus = + either (liftIO . print) (const noop) =<< tryIO <~> handler file {- Called when there's an error with inotify. -} onErr :: Handler -onErr _ _ _ msg _ = error msg +onErr msg = error msg {- Called when a new transfer information file is written. -} onAdd :: Handler -onAdd st dstatus _ file _ = case parseTransferFile file of +onAdd file = case parseTransferFile file of Nothing -> noop - Just t -> go t =<< runThreadState st (checkTransfer t) - where - go _ Nothing = noop -- transfer already finished - go t (Just info) = do - brokendebug thisThread - [ "transfer starting:" - , show t - ] - r <- headMaybe . filter (sameuuid t) - <$> runThreadState st Remote.remoteList - updateTransferInfo dstatus t info - { transferRemote = r } - sameuuid t r = Remote.uuid r == transferUUID t + Just t -> go t =<< liftAnnex (checkTransfer t) + where + go _ Nothing = noop -- transfer already finished + go t (Just info) = do + debug [ "transfer starting:", show t] + r <- headMaybe . filter (sameuuid t) + <$> liftAnnex Remote.remoteList + dstatus <- getAssistant daemonStatusHandle + liftIO $ updateTransferInfo dstatus t info { transferRemote = r } + sameuuid t r = Remote.uuid r == transferUUID t {- Called when a transfer information file is updated. - - The only thing that should change in the transfer info is the - bytesComplete, so that's the only thing updated in the DaemonStatus. -} onModify :: Handler -onModify _ dstatus _ file _ = do +onModify file = do case parseTransferFile file of Nothing -> noop - Just t -> go t =<< readTransferInfoFile Nothing file + Just t -> go t =<< liftIO (readTransferInfoFile Nothing file) where go _ Nothing = noop go t (Just newinfo) = alterTransferInfo t (\i -> i { bytesComplete = bytesComplete newinfo }) - dstatus + <<~ daemonStatusHandle {- This thread can only watch transfer sizes when the DirWatcher supports - tracking modificatons to files. -} @@ -98,21 +90,19 @@ watchesTransferSize = modifyTracked {- Called when a transfer information file is removed. -} onDel :: Handler -onDel st dstatus transferqueue file _ = case parseTransferFile file of +onDel file = case parseTransferFile file of Nothing -> noop Just t -> do - brokendebug thisThread - [ "transfer finishing:" - , show t - ] - minfo <- removeTransfer dstatus t + debug [ "transfer finishing:", show t] + minfo <- flip removeTransfer t <<~ daemonStatusHandle - void $ forkIO $ do + finished <- asIO2 finishedTransfer + void $ liftIO $ forkIO $ do {- XXX race workaround delay. The location - log needs to be updated before finishedTransfer - runs. -} threadDelay 10000000 -- 10 seconds - finishedTransfer st dstatus transferqueue t minfo + finished t minfo {- Queue uploads of files we successfully downloaded, spreading them - out to other reachable remotes. @@ -123,15 +113,19 @@ onDel st dstatus transferqueue file _ = case parseTransferFile file of - Uploading a file may cause the local repo, or some other remote to not - want it; handle that too. -} -finishedTransfer :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transfer -> Maybe TransferInfo -> IO () -finishedTransfer st dstatus transferqueue t (Just info) - | transferDirection t == Download = runThreadState st $ - whenM (inAnnex $ transferKey t) $ do - handleDrops dstatus False +finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant () +finishedTransfer t (Just info) + | transferDirection t == Download = + whenM (liftAnnex $ inAnnex $ transferKey t) $ do + dstatus <- getAssistant daemonStatusHandle + transferqueue <- getAssistant transferQueue + liftAnnex $ handleDrops dstatus False (transferKey t) (associatedFile info) - queueTransfersMatching (/= transferUUID t) + liftAnnex $ queueTransfersMatching (/= transferUUID t) Later transferqueue dstatus (transferKey t) (associatedFile info) Upload - | otherwise = runThreadState st $ - handleDrops dstatus True (transferKey t) (associatedFile info) -finishedTransfer _ _ _ _ _ = noop + | otherwise = do + dstatus <- getAssistant daemonStatusHandle + liftAnnex $ handleDrops dstatus True (transferKey t) (associatedFile info) +finishedTransfer _ _ = noop +