converted 2 more threads.. only 2 more to go

This commit is contained in:
Joey Hess 2012-10-29 13:09:58 -04:00
parent 0ba4df3c1a
commit 3eecb5b7bb
3 changed files with 126 additions and 132 deletions

View file

@ -199,12 +199,12 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
, assist $ pushThread , assist $ pushThread
, assist $ pushRetryThread , assist $ pushRetryThread
, assist $ mergeThread , assist $ mergeThread
, assist $ transferWatcherThread st dstatus transferqueue , assist $ transferWatcherThread
, assist $ transferPollerThread , assist $ transferPollerThread
, assist $ transfererThread st dstatus transferqueue transferslots commitchan , assist $ transfererThread st dstatus transferqueue transferslots commitchan
, assist $ daemonStatusThread , assist $ daemonStatusThread
, assist $ sanityCheckerThread , assist $ sanityCheckerThread
, assist $ mountWatcherThread st dstatus scanremotes pushnotifier , assist $ mountWatcherThread
, assist $ netWatcherThread , assist $ netWatcherThread
, assist $ netWatcherFallbackThread , assist $ netWatcherFallbackThread
, assist $ transferScannerThread , assist $ transferScannerThread

View file

@ -11,11 +11,8 @@
module Assistant.Threads.MountWatcher where module Assistant.Threads.MountWatcher where
import Assistant.Common import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Sync import Assistant.Sync
import Assistant.Pushes
import qualified Annex import qualified Annex
import qualified Git import qualified Git
import Utility.ThreadScheduler import Utility.ThreadScheduler
@ -39,70 +36,70 @@ import qualified Control.Exception as E
thisThread :: ThreadName thisThread :: ThreadName
thisThread = "MountWatcher" thisThread = "MountWatcher"
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread mountWatcherThread :: NamedThread
mountWatcherThread st handle scanremotes pushnotifier = thread $ liftIO $ mountWatcherThread = NamedThread "MountWatcher" $
#if WITH_DBUS #if WITH_DBUS
dbusThread st handle scanremotes pushnotifier dbusThread
#else #else
pollingThread st handle scanremotes pushnotifier pollingThread
#endif #endif
where
thread = NamedThread thisThread
#if WITH_DBUS #if WITH_DBUS
dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () dbusThread :: Assistant ()
dbusThread st dstatus scanremotes pushnotifier = dbusThread = do
E.catch (runClient getSessionAddress go) onerr runclient <- asIO go
where r <- liftIO $ E.try $ runClient getSessionAddress runclient
go client = ifM (checkMountMonitor client) either onerr (const noop) r
( do where
{- Store the current mount points in an mvar, go client = ifM (checkMountMonitor client)
- to be compared later. We could in theory ( do
- work out the mount point from the dbus {- Store the current mount points in an MVar, to be
- message, but this is easier. -} - compared later. We could in theory work out the
mvar <- newMVar =<< currentMountPoints - mount point from the dbus message, but this is
forM_ mountChanged $ \matcher -> - easier. -}
listen client matcher $ \_event -> do mvar <- liftIO $ newMVar =<< currentMountPoints
nowmounted <- currentMountPoints handleevent <- asIO $ \_event -> do
wasmounted <- swapMVar mvar nowmounted nowmounted <- liftIO $ currentMountPoints
handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted wasmounted <- liftIO $ swapMVar mvar nowmounted
, do handleMounts wasmounted nowmounted
runThreadState st $ liftIO $ forM_ mountChanged $ \matcher ->
warning "No known volume monitor available through dbus; falling back to mtab polling" listen client matcher handleevent
pollinstead , do
) liftAnnex $
onerr :: E.SomeException -> IO () warning "No known volume monitor available through dbus; falling back to mtab polling"
onerr e = do pollingThread
{- If the session dbus fails, the user probably )
- logged out of their desktop. Even if they log onerr :: E.SomeException -> Assistant ()
- back in, we won't have access to the dbus onerr e = do
- session key, so polling is the best that can be {- If the session dbus fails, the user probably
- done in this situation. -} - logged out of their desktop. Even if they log
runThreadState st $ - back in, we won't have access to the dbus
warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")" - session key, so polling is the best that can be
pollinstead - done in this situation. -}
pollinstead = pollingThread st dstatus scanremotes pushnotifier liftAnnex $
warning $ "dbus failed; falling back to mtab polling (" ++ show e ++ ")"
pollingThread
{- Examine the list of services connected to dbus, to see if there {- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor mounts. If not, will attempt to start one. -} - are any we can use to monitor mounts. If not, will attempt to start one. -}
checkMountMonitor :: Client -> IO Bool checkMountMonitor :: Client -> Assistant Bool
checkMountMonitor client = do checkMountMonitor client = do
running <- filter (`elem` usableservices) running <- filter (`elem` usableservices)
<$> listServiceNames client <$> liftIO (listServiceNames client)
case running of case running of
[] -> startOneService client startableservices [] -> liftIO $ startOneService client startableservices
(service:_) -> do (service:_) -> do
brokendebug thisThread [ "Using running DBUS service" debug [ "Using running DBUS service"
, service , service
, "to monitor mount events." , "to monitor mount events."
] ]
return True return True
where where
startableservices = [gvfs] startableservices = [gvfs]
usableservices = startableservices ++ [kde] usableservices = startableservices ++ [kde]
gvfs = "org.gtk.Private.GduVolumeMonitor" gvfs = "org.gtk.Private.GduVolumeMonitor"
kde = "org.kde.DeviceNotifications" kde = "org.kde.DeviceNotifications"
startOneService :: Client -> [ServiceName] -> IO Bool startOneService :: Client -> [ServiceName] -> IO Bool
startOneService _ [] = return False startOneService _ [] = return False
@ -144,26 +141,29 @@ mountChanged = [gvfs True, gvfs False, kde, kdefallback]
#endif #endif
pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO () pollingThread :: Assistant ()
pollingThread st dstatus scanremotes pushnotifier = go =<< currentMountPoints pollingThread = go =<< liftIO currentMountPoints
where where
go wasmounted = do go wasmounted = do
threadDelaySeconds (Seconds 10) liftIO $ threadDelaySeconds (Seconds 10)
nowmounted <- currentMountPoints nowmounted <- liftIO currentMountPoints
handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted handleMounts wasmounted nowmounted
go nowmounted go nowmounted
handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> MountPoints -> MountPoints -> IO () handleMounts :: MountPoints -> MountPoints -> Assistant ()
handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted = handleMounts wasmounted nowmounted =
mapM_ (handleMount st dstatus scanremotes pushnotifier . mnt_dir) $ mapM_ (handleMount . mnt_dir) $
S.toList $ newMountPoints wasmounted nowmounted S.toList $ newMountPoints wasmounted nowmounted
handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO () handleMount :: FilePath -> Assistant ()
handleMount st dstatus scanremotes pushnotifier dir = do handleMount dir = do
brokendebug thisThread ["detected mount of", dir] debug ["detected mount of", dir]
reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier) rs <- filter (Git.repoIsLocal . Remote.repo) <$> remotesUnder dir
=<< filter (Git.repoIsLocal . Remote.repo) d <- getAssistant id
<$> remotesUnder st dstatus dir liftIO $
reconnectRemotes (threadName d) (threadState d)
(daemonStatusHandle d) (scanRemoteMap d)
(Just $ pushNotifier d) rs
{- Finds remotes located underneath the mount point. {- Finds remotes located underneath the mount point.
- -
@ -173,15 +173,15 @@ handleMount st dstatus scanremotes pushnotifier dir = do
- at startup time, or may have changed (it could even be a different - at startup time, or may have changed (it could even be a different
- repository at the same remote location..) - repository at the same remote location..)
-} -}
remotesUnder :: ThreadState -> DaemonStatusHandle -> FilePath -> IO [Remote] remotesUnder :: FilePath -> Assistant [Remote]
remotesUnder st dstatus dir = runThreadState st $ do remotesUnder dir = do
repotop <- fromRepo Git.repoPath repotop <- liftAnnex $ fromRepo Git.repoPath
rs <- remoteList rs <- liftAnnex remoteList
pairs <- mapM (checkremote repotop) rs pairs <- liftAnnex $ mapM (checkremote repotop) rs
let (waschanged, rs') = unzip pairs let (waschanged, rs') = unzip pairs
when (any id waschanged) $ do when (any id waschanged) $ do
Annex.changeState $ \s -> s { Annex.remotes = rs' } liftAnnex $ Annex.changeState $ \s -> s { Annex.remotes = rs' }
updateSyncRemotes dstatus liftAnnex . updateSyncRemotes =<< getAssistant daemonStatusHandle
return $ map snd $ filter fst pairs return $ map snd $ filter fst pairs
where where
checkremote repotop r = case Remote.localpath r of checkremote repotop r = case Remote.localpath r of

View file

@ -8,7 +8,6 @@
module Assistant.Threads.TransferWatcher where module Assistant.Threads.TransferWatcher where
import Assistant.Common import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.Drop import Assistant.Drop
@ -20,76 +19,69 @@ import qualified Remote
import Control.Concurrent import Control.Concurrent
thisThread :: ThreadName
thisThread = "TransferWatcher"
{- This thread watches for changes to the gitAnnexTransferDir, {- This thread watches for changes to the gitAnnexTransferDir,
- and updates the DaemonStatus's map of ongoing transfers. -} - and updates the DaemonStatus's map of ongoing transfers. -}
transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread transferWatcherThread :: NamedThread
transferWatcherThread st dstatus transferqueue = thread $ liftIO $ do transferWatcherThread = NamedThread "TransferWatcher" $ do
g <- runThreadState st gitRepo dir <- liftAnnex $ gitAnnexTransferDir <$> gitRepo
let dir = gitAnnexTransferDir g liftIO $ createDirectoryIfMissing True dir
createDirectoryIfMissing True dir let hook a = Just <$> asIO2 (runHandler a)
let hook a = Just $ runHandler st dstatus transferqueue a addhook <- hook onAdd
delhook <- hook onDel
modifyhook <- hook onModify
errhook <- hook onErr
let hooks = mkWatchHooks let hooks = mkWatchHooks
{ addHook = hook onAdd { addHook = addhook
, delHook = hook onDel , delHook = delhook
, modifyHook = hook onModify , modifyHook = modifyhook
, errHook = hook onErr , errHook = errhook
} }
void $ watchDir dir (const False) hooks id void $ liftIO $ watchDir dir (const False) hooks id
brokendebug thisThread ["watching for transfers"] debug ["watching for transfers"]
where
thread = NamedThread thisThread
type Handler = ThreadState -> DaemonStatusHandle -> TransferQueue -> FilePath -> Maybe FileStatus -> IO () type Handler = FilePath -> Assistant ()
{- Runs an action handler. {- Runs an action handler.
- -
- Exceptions are ignored, otherwise a whole thread could be crashed. - Exceptions are ignored, otherwise a whole thread could be crashed.
-} -}
runHandler :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Handler -> FilePath -> Maybe FileStatus -> IO () runHandler :: Handler -> FilePath -> Maybe FileStatus -> Assistant ()
runHandler st dstatus transferqueue handler file filestatus = void $ runHandler handler file _filestatus =
either print (const noop) =<< tryIO go either (liftIO . print) (const noop) =<< tryIO <~> handler file
where
go = handler st dstatus transferqueue file filestatus
{- Called when there's an error with inotify. -} {- Called when there's an error with inotify. -}
onErr :: Handler onErr :: Handler
onErr _ _ _ msg _ = error msg onErr msg = error msg
{- Called when a new transfer information file is written. -} {- Called when a new transfer information file is written. -}
onAdd :: Handler onAdd :: Handler
onAdd st dstatus _ file _ = case parseTransferFile file of onAdd file = case parseTransferFile file of
Nothing -> noop Nothing -> noop
Just t -> go t =<< runThreadState st (checkTransfer t) Just t -> go t =<< liftAnnex (checkTransfer t)
where where
go _ Nothing = noop -- transfer already finished go _ Nothing = noop -- transfer already finished
go t (Just info) = do go t (Just info) = do
brokendebug thisThread debug [ "transfer starting:", show t]
[ "transfer starting:" r <- headMaybe . filter (sameuuid t)
, show t <$> liftAnnex Remote.remoteList
] dstatus <- getAssistant daemonStatusHandle
r <- headMaybe . filter (sameuuid t) liftIO $ updateTransferInfo dstatus t info { transferRemote = r }
<$> runThreadState st Remote.remoteList sameuuid t r = Remote.uuid r == transferUUID t
updateTransferInfo dstatus t info
{ transferRemote = r }
sameuuid t r = Remote.uuid r == transferUUID t
{- Called when a transfer information file is updated. {- Called when a transfer information file is updated.
- -
- The only thing that should change in the transfer info is the - The only thing that should change in the transfer info is the
- bytesComplete, so that's the only thing updated in the DaemonStatus. -} - bytesComplete, so that's the only thing updated in the DaemonStatus. -}
onModify :: Handler onModify :: Handler
onModify _ dstatus _ file _ = do onModify file = do
case parseTransferFile file of case parseTransferFile file of
Nothing -> noop Nothing -> noop
Just t -> go t =<< readTransferInfoFile Nothing file Just t -> go t =<< liftIO (readTransferInfoFile Nothing file)
where where
go _ Nothing = noop go _ Nothing = noop
go t (Just newinfo) = alterTransferInfo t go t (Just newinfo) = alterTransferInfo t
(\i -> i { bytesComplete = bytesComplete newinfo }) (\i -> i { bytesComplete = bytesComplete newinfo })
dstatus <<~ daemonStatusHandle
{- This thread can only watch transfer sizes when the DirWatcher supports {- This thread can only watch transfer sizes when the DirWatcher supports
- tracking modificatons to files. -} - tracking modificatons to files. -}
@ -98,21 +90,19 @@ watchesTransferSize = modifyTracked
{- Called when a transfer information file is removed. -} {- Called when a transfer information file is removed. -}
onDel :: Handler onDel :: Handler
onDel st dstatus transferqueue file _ = case parseTransferFile file of onDel file = case parseTransferFile file of
Nothing -> noop Nothing -> noop
Just t -> do Just t -> do
brokendebug thisThread debug [ "transfer finishing:", show t]
[ "transfer finishing:" minfo <- flip removeTransfer t <<~ daemonStatusHandle
, show t
]
minfo <- removeTransfer dstatus t
void $ forkIO $ do finished <- asIO2 finishedTransfer
void $ liftIO $ forkIO $ do
{- XXX race workaround delay. The location {- XXX race workaround delay. The location
- log needs to be updated before finishedTransfer - log needs to be updated before finishedTransfer
- runs. -} - runs. -}
threadDelay 10000000 -- 10 seconds threadDelay 10000000 -- 10 seconds
finishedTransfer st dstatus transferqueue t minfo finished t minfo
{- Queue uploads of files we successfully downloaded, spreading them {- Queue uploads of files we successfully downloaded, spreading them
- out to other reachable remotes. - out to other reachable remotes.
@ -123,15 +113,19 @@ onDel st dstatus transferqueue file _ = case parseTransferFile file of
- Uploading a file may cause the local repo, or some other remote to not - Uploading a file may cause the local repo, or some other remote to not
- want it; handle that too. - want it; handle that too.
-} -}
finishedTransfer :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transfer -> Maybe TransferInfo -> IO () finishedTransfer :: Transfer -> Maybe TransferInfo -> Assistant ()
finishedTransfer st dstatus transferqueue t (Just info) finishedTransfer t (Just info)
| transferDirection t == Download = runThreadState st $ | transferDirection t == Download =
whenM (inAnnex $ transferKey t) $ do whenM (liftAnnex $ inAnnex $ transferKey t) $ do
handleDrops dstatus False dstatus <- getAssistant daemonStatusHandle
transferqueue <- getAssistant transferQueue
liftAnnex $ handleDrops dstatus False
(transferKey t) (associatedFile info) (transferKey t) (associatedFile info)
queueTransfersMatching (/= transferUUID t) liftAnnex $ queueTransfersMatching (/= transferUUID t)
Later transferqueue dstatus Later transferqueue dstatus
(transferKey t) (associatedFile info) Upload (transferKey t) (associatedFile info) Upload
| otherwise = runThreadState st $ | otherwise = do
handleDrops dstatus True (transferKey t) (associatedFile info) dstatus <- getAssistant daemonStatusHandle
finishedTransfer _ _ _ _ _ = noop liftAnnex $ handleDrops dstatus True (transferKey t) (associatedFile info)
finishedTransfer _ _ = noop