Assistant monad, stage 2.5

Converted several threads to run in the monad.

Added a lot of useful combinators for working with the monad.

Now the monad includes the name of the thread.

Some debugging messages are disabled pending converting other threads.
This commit is contained in:
Joey Hess 2012-10-29 02:21:04 -04:00
parent 4e765327ca
commit 4dbdc2b666
29 changed files with 299 additions and 280 deletions

View file

@ -179,7 +179,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
go = do
d <- getAssistant id
st <- getAssistant threadState
dstatus <- getAssistant daemonStatus
dstatus <- getAssistant daemonStatusHandle
changechan <- getAssistant changeChan
commitchan <- getAssistant commitChan
pushmap <- getAssistant failedPushMap
@ -189,7 +189,7 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
branchhandle <- getAssistant branchChangeHandle
pushnotifier <- getAssistant pushNotifier
#ifdef WITH_WEBAPP
urlrenderer <- liftIO $ newUrlRenderer
urlrenderer <- liftIO newUrlRenderer
#endif
mapM_ (startthread d)
[ watch $ commitThread st changechan commitchan transferqueue dstatus
@ -203,13 +203,13 @@ startAssistant assistant daemonize webappwaiter = withThreadState $ \st -> do
, assist $ pushRetryThread st dstatus pushmap pushnotifier
, assist $ mergeThread st dstatus transferqueue branchhandle
, assist $ transferWatcherThread st dstatus transferqueue
, assist $ transferPollerThread st dstatus
, assist $ transferPollerThread
, assist $ transfererThread st dstatus transferqueue transferslots commitchan
, assist $ daemonStatusThread st dstatus
, assist $ sanityCheckerThread st dstatus transferqueue changechan
, assist $ daemonStatusThread
, assist $ sanityCheckerThread
, assist $ mountWatcherThread st dstatus scanremotes pushnotifier
, assist $ netWatcherThread st dstatus scanremotes pushnotifier
, assist $ netWatcherFallbackThread st dstatus scanremotes pushnotifier
, assist $ netWatcherThread
, assist $ netWatcherFallbackThread
, assist $ transferScannerThread st dstatus scanremotes transferqueue
, assist $ configMonitorThread st dstatus branchhandle commitchan
#ifdef WITH_XMPP

View file

@ -10,7 +10,8 @@ module Assistant.Common (
ThreadName,
NamedThread(..),
runNamedThread,
debug
debug,
brokendebug
) where
import Common.Annex as X
@ -22,25 +23,28 @@ import System.Log.Logger
import qualified Control.Exception as E
type ThreadName = String
data NamedThread = NamedThread ThreadName (IO ())
data NamedThread = NamedThread ThreadName (Assistant ())
debug :: ThreadName -> [String] -> IO ()
debug threadname ws = debugM threadname $ unwords $ (threadname ++ ":") : ws
brokendebug :: ThreadName -> [String] -> IO ()
brokendebug _ _ = noop -- TODO remove this
debug :: [String] -> Assistant ()
debug ws = do
name <- getAssistant threadName
liftIO $ debugM name $ unwords $ (name ++ ":") : ws
runNamedThread :: NamedThread -> Assistant ()
runNamedThread (NamedThread name a) = liftIO . go =<< getAssistant daemonStatus
where
go dstatus = do
r <- E.try a :: IO (Either E.SomeException ())
case r of
Right _ -> noop
Left e -> do
let msg = unwords
[ name
, "crashed:"
, show e
]
hPutStrLn stderr msg
-- TODO click to restart
void $ addAlert dstatus $
warningAlert name msg
runNamedThread (NamedThread name a) = do
d <- getAssistant id
liftIO . go $ d { threadName = name }
where
go d = do
r <- E.try (runAssistant a d) :: IO (Either E.SomeException ())
case r of
Right _ -> noop
Left e -> do
let msg = unwords [name, "crashed:", show e]
hPutStrLn stderr msg
-- TODO click to restart
void $ addAlert (daemonStatusHandle d) $
warningAlert name msg

View file

@ -181,8 +181,8 @@ adjustTransfersSTM dstatus a = do
putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
{- Alters a transfer's info, if the transfer is in the map. -}
alterTransferInfo :: DaemonStatusHandle -> Transfer -> (TransferInfo -> TransferInfo) -> IO ()
alterTransferInfo dstatus t a = updateTransferInfo' dstatus $ M.adjust a t
alterTransferInfo :: Transfer -> (TransferInfo -> TransferInfo) -> DaemonStatusHandle -> IO ()
alterTransferInfo t a dstatus = updateTransferInfo' dstatus $ M.adjust a t
{- Updates a transfer's info. Adds the transfer to the map if necessary,
- or if already present, updates it while preserving the old transferTid,

View file

@ -13,7 +13,12 @@ module Assistant.Monad (
newAssistantData,
runAssistant,
getAssistant,
liftAnnex
liftAnnex,
(<~>),
(<<~),
daemonStatus,
asIO,
asIO2,
) where
import "mtl" Control.Monad.Reader
@ -43,8 +48,9 @@ instance MonadBase IO Assistant where
liftBase = Assistant . liftBase
data AssistantData = AssistantData
{ threadState :: ThreadState
, daemonStatus :: DaemonStatusHandle
{ threadName :: String
, threadState :: ThreadState
, daemonStatusHandle :: DaemonStatusHandle
, scanRemoteMap :: ScanRemoteMap
, transferQueue :: TransferQueue
, transferSlots :: TransferSlots
@ -57,7 +63,8 @@ data AssistantData = AssistantData
newAssistantData :: ThreadState -> DaemonStatusHandle -> IO AssistantData
newAssistantData st dstatus = AssistantData
<$> pure st
<$> pure "main"
<*> pure st
<*> pure dstatus
<*> newScanRemoteMap
<*> newTransferQueue
@ -81,3 +88,28 @@ liftAnnex :: Annex a -> Assistant a
liftAnnex a = do
st <- reader threadState
liftIO $ runThreadState st a
{- Runs an IO action, passing it an IO action that runs an Assistant action. -}
(<~>) :: (IO a -> IO b) -> Assistant a -> Assistant b
io <~> a = do
d <- reader id
liftIO $ io $ runAssistant a d
{- Creates an IO action that will run an Assistant action when run. -}
asIO :: (a -> Assistant b) -> Assistant (a -> IO b)
asIO a = do
d <- reader id
return $ \v -> runAssistant (a v) d
{- Creates an IO action that will run an Assistant action when run. -}
asIO2 :: (a -> b -> Assistant c) -> Assistant (a -> b -> IO c)
asIO2 a = do
d <- reader id
return $ \v1 v2 -> runAssistant (a v1 v2) d
{- Runs an IO action on a selected field of the AssistantData. -}
(<<~) :: (a -> IO b) -> (AssistantData -> a) -> Assistant b
io <<~ v = reader v >>= liftIO . io
daemonStatus :: Assistant DaemonStatus
daemonStatus = getDaemonStatus <<~ daemonStatusHandle

View file

@ -93,7 +93,7 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
where
go _ Nothing _ _ _ = return True -- no branch, so nothing to do
go shouldretry (Just branch) g u rs = do
debug threadname
brokendebug threadname
[ "pushing to"
, show rs
]
@ -117,12 +117,12 @@ pushToRemotes threadname now st mpushnotifier mpushmap remotes = do
makemap l = M.fromList $ zip l (repeat now)
retry branch g u rs = do
debug threadname [ "trying manual pull to resolve failed pushes" ]
brokendebug threadname [ "trying manual pull to resolve failed pushes" ]
void $ manualPull st (Just branch) rs
go False (Just branch) g u rs
fallback branch g u rs = do
debug threadname
brokendebug threadname
[ "fallback pushing to"
, show rs
]

View file

@ -42,7 +42,7 @@ thisThread = "Committer"
{- This thread makes git commits at appropriate times. -}
commitThread :: ThreadState -> ChangeChan -> CommitChan -> TransferQueue -> DaemonStatusHandle -> NamedThread
commitThread st changechan commitchan transferqueue dstatus = thread $ do
commitThread st changechan commitchan transferqueue dstatus = thread $ liftIO $ do
delayadd <- runThreadState st $
maybe delayaddDefault (Just . Seconds) . readish
<$> getConfig (annexConfig "delayadd") ""
@ -58,7 +58,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do
readychanges <- handleAdds delayadd st changechan transferqueue dstatus changes
if shouldCommit time readychanges
then do
debug thisThread
brokendebug thisThread
[ "committing"
, show (length readychanges)
, "changes"
@ -72,7 +72,7 @@ commitThread st changechan commitchan transferqueue dstatus = thread $ do
thread = NamedThread thisThread
refill [] = noop
refill cs = do
debug thisThread
brokendebug thisThread
[ "delaying commit of"
, show (length cs)
, "changes"

View file

@ -38,7 +38,7 @@ thisThread = "ConfigMonitor"
- be detected immediately.
-}
configMonitorThread :: ThreadState -> DaemonStatusHandle -> BranchChangeHandle -> CommitChan -> NamedThread
configMonitorThread st dstatus branchhandle commitchan = thread $ do
configMonitorThread st dstatus branchhandle commitchan = thread $ liftIO $ do
r <- runThreadState st Annex.gitRepo
go r =<< getConfigs r
where
@ -50,7 +50,7 @@ configMonitorThread st dstatus branchhandle commitchan = thread $ do
new <- getConfigs r
when (old /= new) $ do
let changedconfigs = new `S.difference` old
debug thisThread $ "reloading config" :
brokendebug thisThread $ "reloading config" :
map fst (S.toList changedconfigs)
reloadConfigs st dstatus changedconfigs
{- Record a commit to get this config

View file

@ -9,28 +9,21 @@ module Assistant.Threads.DaemonStatus where
import Assistant.Common
import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Utility.ThreadScheduler
import Utility.NotificationBroadcaster
thisThread :: ThreadName
thisThread = "DaemonStatus"
{- This writes the daemon status to disk, when it changes, but no more
- frequently than once every ten minutes.
-}
daemonStatusThread :: ThreadState -> DaemonStatusHandle -> NamedThread
daemonStatusThread st dstatus = thread $ do
notifier <- newNotificationHandle
=<< changeNotifier <$> getDaemonStatus dstatus
daemonStatusThread :: NamedThread
daemonStatusThread = NamedThread "DaemonStatus" $ do
notifier <- liftIO . newNotificationHandle
=<< changeNotifier <$> daemonStatus
checkpoint
runEvery (Seconds tenMinutes) $ do
waitNotification notifier
runEvery (Seconds tenMinutes) <~> do
liftIO $ waitNotification notifier
checkpoint
where
thread = NamedThread thisThread
checkpoint = do
status <- getDaemonStatus dstatus
file <- runThreadState st $ fromRepo gitAnnexDaemonStatusFile
writeDaemonStatusFile file status
where
checkpoint = do
file <- liftAnnex $ fromRepo gitAnnexDaemonStatusFile
liftIO . writeDaemonStatusFile file =<< daemonStatus

View file

@ -25,7 +25,7 @@ thisThread = "Merger"
{- This thread watches for changes to .git/refs/, and handles incoming
- pushes. -}
mergeThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> BranchChangeHandle -> NamedThread
mergeThread st dstatus transferqueue branchchange = thread $ do
mergeThread st dstatus transferqueue branchchange = thread $ liftIO $ do
g <- runThreadState st gitRepo
let dir = Git.localGitDir g </> "refs"
createDirectoryIfMissing True dir
@ -35,7 +35,7 @@ mergeThread st dstatus transferqueue branchchange = thread $ do
, errHook = hook onErr
}
void $ watchDir dir (const False) hooks id
debug thisThread ["watching", dir]
brokendebug thisThread ["watching", dir]
where
thread = NamedThread thisThread
@ -81,7 +81,7 @@ onAdd st dstatus transferqueue branchchange file _
changedbranch = fileToBranch file
mergecurrent (Just current)
| equivBranches changedbranch current = do
liftIO $ debug thisThread
liftIO $ brokendebug thisThread
[ "merging"
, show changedbranch
, "into"

View file

@ -40,7 +40,7 @@ thisThread :: ThreadName
thisThread = "MountWatcher"
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
mountWatcherThread st handle scanremotes pushnotifier = thread $
mountWatcherThread st handle scanremotes pushnotifier = thread $ liftIO $
#if WITH_DBUS
dbusThread st handle scanremotes pushnotifier
#else
@ -93,7 +93,7 @@ checkMountMonitor client = do
case running of
[] -> startOneService client startableservices
(service:_) -> do
debug thisThread [ "Using running DBUS service"
brokendebug thisThread [ "Using running DBUS service"
, service
, "to monitor mount events."
]
@ -111,7 +111,7 @@ startOneService client (x:xs) = do
[toVariant x, toVariant (0 :: Word32)]
ifM (elem x <$> listServiceNames client)
( do
debug thisThread [ "Started DBUS service"
brokendebug thisThread [ "Started DBUS service"
, x
, "to monitor mount events."
]
@ -160,7 +160,7 @@ handleMounts st dstatus scanremotes pushnotifier wasmounted nowmounted =
handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> FilePath -> IO ()
handleMount st dstatus scanremotes pushnotifier dir = do
debug thisThread ["detected mount of", dir]
brokendebug thisThread ["detected mount of", dir]
reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< filter (Git.repoIsLocal . Remote.repo)
<$> remotesUnder st dstatus dir

View file

@ -11,9 +11,6 @@
module Assistant.Threads.NetWatcher where
import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Sync
import Assistant.Pushes
import Utility.ThreadScheduler
@ -29,72 +26,67 @@ import Data.Word (Word32)
#warning Building without dbus support; will poll for network connection changes
#endif
thisThread :: ThreadName
thisThread = "NetWatcher"
netWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
netWatcherThread :: NamedThread
#if WITH_DBUS
netWatcherThread st dstatus scanremotes pushnotifier = thread $
dbusThread st dstatus scanremotes pushnotifier
netWatcherThread = thread dbusThread
#else
netWatcherThread _ _ _ _ = thread noop
netWatcherThread = thread noop
#endif
where
thread = NamedThread thisThread
where
thread = NamedThread "NetWatcher"
{- This is a fallback for when dbus cannot be used to detect
- network connection changes, but it also ensures that
- any networked remotes that may have not been routable for a
- while (despite the local network staying up), are synced with
- periodically. -}
netWatcherFallbackThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> NamedThread
netWatcherFallbackThread st dstatus scanremotes pushnotifier = thread $
runEvery (Seconds 3600) $
handleConnection st dstatus scanremotes pushnotifier
where
thread = NamedThread thisThread
netWatcherFallbackThread :: NamedThread
netWatcherFallbackThread = NamedThread "NetWatcherFallback" $
runEvery (Seconds 3600) <~> handleConnection
#if WITH_DBUS
dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
dbusThread st dstatus scanremotes pushnotifier =
persistentClient getSystemAddress () onerr go
where
go client = ifM (checkNetMonitor client)
( do
listenNMConnections client handleconn
listenWicdConnections client handleconn
, do
runThreadState st $
warning "No known network monitor available through dbus; falling back to polling"
)
handleconn = do
debug thisThread ["detected network connection"]
notifyRestart pushnotifier
handleConnection st dstatus scanremotes pushnotifier
onerr e _ = do
runThreadState st $
warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")"
{- Wait, in hope that dbus will come back -}
threadDelaySeconds (Seconds 60)
dbusThread :: Assistant ()
dbusThread = do
handleerr <- asIO2 onerr
runclient <- asIO go
liftIO $ persistentClient getSystemAddress () handleerr runclient
where
go client = ifM (checkNetMonitor client)
( do
listenNMConnections client <~> handleconn
listenWicdConnections client <~> handleconn
, do
liftAnnex $
warning "No known network monitor available through dbus; falling back to polling"
)
handleconn = do
debug ["detected network connection"]
notifyRestart <<~ pushNotifier
handleConnection
onerr e _ = do
liftAnnex $
warning $ "lost dbus connection; falling back to polling (" ++ show e ++ ")"
{- Wait, in hope that dbus will come back -}
liftIO $ threadDelaySeconds (Seconds 60)
{- Examine the list of services connected to dbus, to see if there
- are any we can use to monitor network connections. -}
checkNetMonitor :: Client -> IO Bool
checkNetMonitor :: Client -> Assistant Bool
checkNetMonitor client = do
running <- filter (`elem` [networkmanager, wicd])
running <- liftIO $ filter (`elem` [networkmanager, wicd])
<$> listServiceNames client
case running of
[] -> return False
(service:_) -> do
debug thisThread [ "Using running DBUS service"
debug [ "Using running DBUS service"
, service
, "to monitor network connection events."
]
return True
where
networkmanager = "org.freedesktop.NetworkManager"
wicd = "org.wicd.daemon"
where
networkmanager = "org.freedesktop.NetworkManager"
wicd = "org.wicd.daemon"
{- Listens for new NetworkManager connections. -}
listenNMConnections :: Client -> IO () -> IO ()
@ -102,18 +94,18 @@ listenNMConnections client callback =
listen client matcher $ \event ->
when (Just True == anyM activeconnection (signalBody event)) $
callback
where
matcher = matchAny
{ matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active"
, matchMember = Just "PropertiesChanged"
}
nm_connection_activated = toVariant (2 :: Word32)
nm_state_key = toVariant ("State" :: String)
activeconnection v = do
m <- fromVariant v
vstate <- lookup nm_state_key $ dictionaryItems m
state <- fromVariant vstate
return $ state == nm_connection_activated
where
matcher = matchAny
{ matchInterface = Just "org.freedesktop.NetworkManager.Connection.Active"
, matchMember = Just "PropertiesChanged"
}
nm_connection_activated = toVariant (2 :: Word32)
nm_state_key = toVariant ("State" :: String)
activeconnection v = do
m <- fromVariant v
vstate <- lookup nm_state_key $ dictionaryItems m
state <- fromVariant vstate
return $ state == nm_connection_activated
{- Listens for new Wicd connections. -}
listenWicdConnections :: Client -> IO () -> IO ()
@ -121,21 +113,23 @@ listenWicdConnections client callback =
listen client matcher $ \event ->
when (any (== wicd_success) (signalBody event)) $
callback
where
matcher = matchAny
{ matchInterface = Just "org.wicd.daemon"
, matchMember = Just "ConnectResultsSent"
}
wicd_success = toVariant ("success" :: String)
where
matcher = matchAny
{ matchInterface = Just "org.wicd.daemon"
, matchMember = Just "ConnectResultsSent"
}
wicd_success = toVariant ("success" :: String)
#endif
handleConnection :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> PushNotifier -> IO ()
handleConnection st dstatus scanremotes pushnotifier =
reconnectRemotes thisThread st dstatus scanremotes (Just pushnotifier)
=<< networkRemotes st
handleConnection :: Assistant ()
handleConnection = do
d <- getAssistant id
liftIO . reconnectRemotes (threadName d) (threadState d)
(daemonStatusHandle d) (scanRemoteMap d) (Just $ pushNotifier d)
=<< networkRemotes
{- Finds network remotes. -}
networkRemotes :: ThreadState -> IO [Remote]
networkRemotes st = runThreadState st $
networkRemotes :: Assistant [Remote]
networkRemotes = liftAnnex $
filter (isNothing . Remote.localpath) <$> remoteList

View file

@ -28,7 +28,7 @@ thisThread :: ThreadName
thisThread = "PairListener"
pairListenerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> UrlRenderer -> NamedThread
pairListenerThread st dstatus scanremotes urlrenderer = thread $ withSocketsDo $
pairListenerThread st dstatus scanremotes urlrenderer = thread $ liftIO $ withSocketsDo $
runEvery (Seconds 1) $ void $ tryIO $ do
sock <- getsock
go sock [] []

View file

@ -35,7 +35,7 @@ controllerThread pushnotifier a = forever $ do
killThread tid
pushNotifierThread :: ThreadState -> DaemonStatusHandle -> PushNotifier -> NamedThread
pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $ liftIO $
controllerThread pushnotifier $ do
v <- runThreadState st $ getXMPPCreds
case v of
@ -45,7 +45,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
loop c starttime = do
void $ connectXMPP c $ \jid -> do
fulljid <- bindJID jid
liftIO $ debug thisThread ["XMPP connected", show fulljid]
liftIO $ brokendebug thisThread ["XMPP connected", show fulljid]
putStanza $ gitAnnexPresence gitAnnexSignature
s <- getSession
_ <- liftIO $ forkIO $ void $ runXMPP s $
@ -54,10 +54,10 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
now <- getCurrentTime
if diffUTCTime now starttime > 300
then do
debug thisThread ["XMPP connection lost; reconnecting"]
brokendebug thisThread ["XMPP connection lost; reconnecting"]
loop c now
else do
debug thisThread ["XMPP connection failed; will retry"]
brokendebug thisThread ["XMPP connection failed; will retry"]
threadDelaySeconds (Seconds 300)
loop c =<< getCurrentTime
@ -67,7 +67,7 @@ pushNotifierThread st dstatus pushnotifier = NamedThread thisThread $
receivenotifications = forever $ do
s <- getStanza
liftIO $ debug thisThread ["received XMPP:", show s]
liftIO $ brokendebug thisThread ["received XMPP:", show s]
case s of
ReceivedPresence p@(Presence { presenceType = PresenceAvailable }) ->
liftIO $ pull st dstatus $
@ -93,7 +93,7 @@ pull :: ThreadState -> DaemonStatusHandle -> [UUID] -> IO ()
pull _ _ [] = noop
pull st dstatus us = do
rs <- filter matching . syncRemotes <$> getDaemonStatus dstatus
debug thisThread $ "push notification for" :
brokendebug thisThread $ "push notification for" :
map (fromUUID . Remote.uuid ) rs
pullone rs =<< runThreadState st (inRepo Git.Branch.current)
where

View file

@ -25,12 +25,12 @@ thisThread = "Pusher"
{- This thread retries pushes that failed before. -}
pushRetryThread :: ThreadState -> DaemonStatusHandle -> FailedPushMap -> PushNotifier -> NamedThread
pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds halfhour) $ do
pushRetryThread st dstatus pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds halfhour) $ do
-- We already waited half an hour, now wait until there are failed
-- pushes to retry.
topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
unless (null topush) $ do
debug thisThread
brokendebug thisThread
[ "retrying"
, show (length topush)
, "failed pushes"
@ -44,7 +44,7 @@ pushRetryThread st dstatus pushmap pushnotifier = thread $ runEvery (Seconds hal
{- This thread pushes git commits out to remotes soon after they are made. -}
pushThread :: ThreadState -> DaemonStatusHandle -> CommitChan -> FailedPushMap -> PushNotifier -> NamedThread
pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Seconds 2) $ do
pushThread st dstatus commitchan pushmap pushnotifier = thread $ liftIO $ runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
@ -58,7 +58,7 @@ pushThread st dstatus commitchan pushmap pushnotifier = thread $ runEvery (Secon
void $ alertWhile dstatus (pushAlert remotes) $
pushToRemotes thisThread now st (Just pushnotifier) (Just pushmap) remotes
else do
debug thisThread
brokendebug thisThread
[ "delaying push of"
, show (length commits)
, "commits"

View file

@ -11,60 +11,56 @@ module Assistant.Threads.SanityChecker (
import Assistant.Common
import Assistant.DaemonStatus
import Assistant.ThreadedMonad
import Assistant.Changes
import Assistant.Alert
import Assistant.TransferQueue
import qualified Git.LsFiles
import Utility.ThreadScheduler
import qualified Assistant.Threads.Watcher as Watcher
import Data.Time.Clock.POSIX
thisThread :: ThreadName
thisThread = "SanityChecker"
{- This thread wakes up occasionally to make sure the tree is in good shape. -}
sanityCheckerThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
sanityCheckerThread st dstatus transferqueue changechan = thread $ forever $ do
waitForNextCheck dstatus
sanityCheckerThread :: NamedThread
sanityCheckerThread = NamedThread "SanityChecker" $ forever $ do
waitForNextCheck
debug thisThread ["starting sanity check"]
debug ["starting sanity check"]
void $ alertWhile dstatus sanityCheckAlert go
dstatus <- getAssistant daemonStatusHandle
void $ alertWhile dstatus sanityCheckAlert <~> go
debug thisThread ["sanity check complete"]
where
thread = NamedThread thisThread
go = do
modifyDaemonStatus_ dstatus $ \s -> s
{ sanityCheckRunning = True }
debug ["sanity check complete"]
where
go = do
dstatus <- getAssistant daemonStatusHandle
liftIO $ modifyDaemonStatus_ dstatus $ \s -> s
{ sanityCheckRunning = True }
now <- getPOSIXTime -- before check started
r <- catchIO (check st dstatus transferqueue changechan)
$ \e -> do
runThreadState st $ warning $ show e
return False
now <- liftIO $ getPOSIXTime -- before check started
r <- either showerr return =<< tryIO <~> check
modifyDaemonStatus_ dstatus $ \s -> s
{ sanityCheckRunning = False
, lastSanityCheck = Just now
}
liftIO $ modifyDaemonStatus_ dstatus $ \s -> s
{ sanityCheckRunning = False
, lastSanityCheck = Just now
}
return r
return r
showerr e = do
liftAnnex $ warning $ show e
return False
{- Only run one check per day, from the time of the last check. -}
waitForNextCheck :: DaemonStatusHandle -> IO ()
waitForNextCheck dstatus = do
v <- lastSanityCheck <$> getDaemonStatus dstatus
now <- getPOSIXTime
threadDelaySeconds $ Seconds $ calcdelay now v
where
calcdelay _ Nothing = oneDay
calcdelay now (Just lastcheck)
| lastcheck < now = max oneDay $
oneDay - truncate (now - lastcheck)
| otherwise = oneDay
waitForNextCheck :: Assistant ()
waitForNextCheck = do
v <- lastSanityCheck <$> daemonStatus
now <- liftIO getPOSIXTime
liftIO $ threadDelaySeconds $ Seconds $ calcdelay now v
where
calcdelay _ Nothing = oneDay
calcdelay now (Just lastcheck)
| lastcheck < now = max oneDay $
oneDay - truncate (now - lastcheck)
| otherwise = oneDay
oneDay :: Int
oneDay = 24 * 60 * 60
@ -72,29 +68,31 @@ oneDay = 24 * 60 * 60
{- It's important to stay out of the Annex monad as much as possible while
- running potentially expensive parts of this check, since remaining in it
- will block the watcher. -}
check :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> IO Bool
check st dstatus transferqueue changechan = do
g <- runThreadState st gitRepo
check :: Assistant Bool
check = do
g <- liftAnnex gitRepo
-- Find old unstaged symlinks, and add them to git.
(unstaged, cleanup) <- Git.LsFiles.notInRepo False ["."] g
now <- getPOSIXTime
(unstaged, cleanup) <- liftIO $ Git.LsFiles.notInRepo False ["."] g
now <- liftIO $ getPOSIXTime
forM_ unstaged $ \file -> do
ms <- catchMaybeIO $ getSymbolicLinkStatus file
ms <- liftIO $ catchMaybeIO $ getSymbolicLinkStatus file
case ms of
Just s | toonew (statusChangeTime s) now -> noop
| isSymbolicLink s ->
addsymlink file ms
| isSymbolicLink s -> addsymlink file ms
_ -> noop
void cleanup
liftIO $ void cleanup
return True
where
toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
slop = fromIntegral tenMinutes
insanity msg = do
runThreadState st $ warning msg
void $ addAlert dstatus $ sanityCheckFixAlert msg
addsymlink file s = do
Watcher.runHandler thisThread st dstatus
transferqueue changechan
Watcher.onAddSymlink file s
insanity $ "found unstaged symlink: " ++ file
where
toonew timestamp now = now < (realToFrac (timestamp + slop) :: POSIXTime)
slop = fromIntegral tenMinutes
insanity msg = do
liftAnnex $ warning msg
dstatus <- getAssistant daemonStatusHandle
liftIO $ void $ addAlert dstatus $ sanityCheckFixAlert msg
addsymlink file s = do
d <- getAssistant id
liftIO $ Watcher.runHandler (threadName d)
(threadState d) (daemonStatusHandle d)
(transferQueue d) (changeChan d)
Watcher.onAddSymlink file s
insanity $ "found unstaged symlink: " ++ file

View file

@ -8,7 +8,6 @@
module Assistant.Threads.TransferPoller where
import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Logs.Transfer
import Utility.NotificationBroadcaster
@ -17,46 +16,42 @@ import qualified Assistant.Threads.TransferWatcher as TransferWatcher
import Control.Concurrent
import qualified Data.Map as M
thisThread :: ThreadName
thisThread = "TransferPoller"
{- This thread polls the status of ongoing transfers, determining how much
- of each transfer is complete. -}
transferPollerThread :: ThreadState -> DaemonStatusHandle -> NamedThread
transferPollerThread st dstatus = thread $ do
g <- runThreadState st gitRepo
tn <- newNotificationHandle =<<
transferNotifier <$> getDaemonStatus dstatus
transferPollerThread :: NamedThread
transferPollerThread = NamedThread "TransferPoller" $ do
g <- liftAnnex gitRepo
tn <- liftIO . newNotificationHandle =<<
transferNotifier <$> daemonStatus
forever $ do
threadDelay 500000 -- 0.5 seconds
ts <- currentTransfers <$> getDaemonStatus dstatus
liftIO $ threadDelay 500000 -- 0.5 seconds
ts <- currentTransfers <$> daemonStatus
if M.null ts
then waitNotification tn -- block until transfers running
-- block until transfers running
then liftIO $ waitNotification tn
else mapM_ (poll g) $ M.toList ts
where
thread = NamedThread thisThread
poll g (t, info)
{- Downloads are polled by checking the size of the
- temp file being used for the transfer. -}
| transferDirection t == Download = do
let f = gitAnnexTmpLocation (transferKey t) g
sz <- catchMaybeIO $
fromIntegral . fileSize
<$> getFileStatus f
newsize t info sz
{- Uploads don't need to be polled for when the
- TransferWatcher thread can track file
- modifications. -}
| TransferWatcher.watchesTransferSize = noop
{- Otherwise, this code polls the upload progress
- by reading the transfer info file. -}
| otherwise = do
let f = transferFile t g
mi <- catchDefaultIO Nothing $
readTransferInfoFile Nothing f
maybe noop (newsize t info . bytesComplete) mi
newsize t info sz
| bytesComplete info /= sz && isJust sz =
alterTransferInfo dstatus t $
\i -> i { bytesComplete = sz }
| otherwise = noop
where
poll g (t, info)
{- Downloads are polled by checking the size of the
- temp file being used for the transfer. -}
| transferDirection t == Download = do
let f = gitAnnexTmpLocation (transferKey t) g
sz <- liftIO $ catchMaybeIO $
fromIntegral . fileSize <$> getFileStatus f
newsize t info sz
{- Uploads don't need to be polled for when the TransferWatcher
- thread can track file modifications. -}
| TransferWatcher.watchesTransferSize = noop
{- Otherwise, this code polls the upload progress
- by reading the transfer info file. -}
| otherwise = do
let f = transferFile t g
mi <- liftIO $ catchDefaultIO Nothing $
readTransferInfoFile Nothing f
maybe noop (newsize t info . bytesComplete) mi
newsize t info sz
| bytesComplete info /= sz && isJust sz =
alterTransferInfo t (\i -> i { bytesComplete = sz })
<<~ daemonStatusHandle
| otherwise = noop

View file

@ -34,7 +34,7 @@ thisThread = "TransferScanner"
- that need to be made, to keep data in sync.
-}
transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> NamedThread
transferScannerThread st dstatus scanremotes transferqueue = thread $ do
transferScannerThread st dstatus scanremotes transferqueue = thread $ liftIO $ do
startupScan
go S.empty
where
@ -100,7 +100,7 @@ failedTransferScan st dstatus transferqueue r = do
-}
expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> [Remote] -> IO ()
expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
liftIO $ debug thisThread ["starting scan of", show visiblers]
brokendebug thisThread ["starting scan of", show visiblers]
void $ alertWhile dstatus (scanAlert visiblers) $ do
g <- runThreadState st gitRepo
(files, cleanup) <- LsFiles.inRepo [] g
@ -110,13 +110,13 @@ expensiveScan st dstatus transferqueue rs = unless onlyweb $ do
mapM_ (enqueue f) ts
void cleanup
return True
liftIO $ debug thisThread ["finished scan of", show visiblers]
brokendebug thisThread ["finished scan of", show visiblers]
where
onlyweb = all (== webUUID) $ map Remote.uuid rs
visiblers = let rs' = filter (not . Remote.readonly) rs
in if null rs' then rs else rs'
enqueue f (r, t) = do
debug thisThread ["queuing", show t]
brokendebug thisThread ["queuing", show t]
queueTransferWhenSmall transferqueue dstatus (Just f) t r
findtransfers f (key, _) = do
locs <- loggedLocations key

View file

@ -26,7 +26,7 @@ thisThread = "TransferWatcher"
{- This thread watches for changes to the gitAnnexTransferDir,
- and updates the DaemonStatus's map of ongoing transfers. -}
transferWatcherThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> NamedThread
transferWatcherThread st dstatus transferqueue = thread $ do
transferWatcherThread st dstatus transferqueue = thread $ liftIO $ do
g <- runThreadState st gitRepo
let dir = gitAnnexTransferDir g
createDirectoryIfMissing True dir
@ -38,7 +38,7 @@ transferWatcherThread st dstatus transferqueue = thread $ do
, errHook = hook onErr
}
void $ watchDir dir (const False) hooks id
debug thisThread ["watching for transfers"]
brokendebug thisThread ["watching for transfers"]
where
thread = NamedThread thisThread
@ -66,7 +66,7 @@ onAdd st dstatus _ file _ = case parseTransferFile file of
where
go _ Nothing = noop -- transfer already finished
go t (Just info) = do
debug thisThread
brokendebug thisThread
[ "transfer starting:"
, show t
]
@ -87,8 +87,9 @@ onModify _ dstatus _ file _ = do
Just t -> go t =<< readTransferInfoFile Nothing file
where
go _ Nothing = noop
go t (Just newinfo) = alterTransferInfo dstatus t $ \info ->
info { bytesComplete = bytesComplete newinfo }
go t (Just newinfo) = alterTransferInfo t
(\i -> i { bytesComplete = bytesComplete newinfo })
dstatus
{- This thread can only watch transfer sizes when the DirWatcher supports
- tracking modificatons to files. -}
@ -100,7 +101,7 @@ onDel :: Handler
onDel st dstatus transferqueue file _ = case parseTransferFile file of
Nothing -> noop
Just t -> do
debug thisThread
brokendebug thisThread
[ "transfer finishing:"
, show t
]

View file

@ -32,7 +32,7 @@ maxTransfers = 1
{- Dispatches transfers from the queue. -}
transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> CommitChan -> NamedThread
transfererThread st dstatus transferqueue slots commitchan = thread $ go =<< readProgramFile
transfererThread st dstatus transferqueue slots commitchan = thread $ liftIO $ go =<< readProgramFile
where
thread = NamedThread thisThread
go program = forever $ inTransferSlot dstatus slots $
@ -47,11 +47,11 @@ startTransfer :: ThreadState -> DaemonStatusHandle -> CommitChan -> FilePath ->
startTransfer st dstatus commitchan program t info = case (transferRemote info, associatedFile info) of
(Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info)
( do
debug thisThread [ "Transferring:" , show t ]
brokendebug thisThread [ "Transferring:" , show t ]
notifyTransfer dstatus
return $ Just (t, info, transferprocess remote file)
, do
debug thisThread [ "Skipping unnecessary transfer:" , show t ]
brokendebug thisThread [ "Skipping unnecessary transfer:" , show t ]
void $ removeTransfer dstatus t
return Nothing
)

View file

@ -56,9 +56,9 @@ needLsof = error $ unlines
]
watchThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> ChangeChan -> NamedThread
watchThread st dstatus transferqueue changechan = NamedThread thisThread $ do
watchThread st dstatus transferqueue changechan = NamedThread thisThread $ liftIO $ do
void $ watchDir "." ignored hooks startup
debug thisThread [ "watching", "."]
brokendebug thisThread [ "watching", "."]
where
startup = startupScan st dstatus
hook a = Just $ runHandler thisThread st dstatus transferqueue changechan a
@ -132,7 +132,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
checkcontent key s
ensurestaged link s
, do
liftIO $ debug threadname ["fix symlink", file]
liftIO $ brokendebug threadname ["fix symlink", file]
liftIO $ removeFile file
liftIO $ createSymbolicLink link file
checkcontent key =<< liftIO (getDaemonStatus dstatus)
@ -162,7 +162,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
{- For speed, tries to reuse the existing blob for symlink target. -}
addlink link = do
liftIO $ debug threadname ["add symlink", file]
liftIO $ brokendebug threadname ["add symlink", file]
v <- catObjectDetails $ Ref $ ':':file
case v of
Just (currlink, sha)
@ -187,7 +187,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
onDel :: Handler
onDel threadname file _ _dstatus _ = do
liftIO $ debug threadname ["file deleted", file]
liftIO $ brokendebug threadname ["file deleted", file]
Annex.Queue.addUpdateIndex =<<
inRepo (Git.UpdateIndex.unstageFile file)
madeChange file RmChange
@ -201,7 +201,7 @@ onDel threadname file _ _dstatus _ = do
- just as good. -}
onDelDir :: Handler
onDelDir threadname dir _ _dstatus _ = do
liftIO $ debug threadname ["directory deleted", dir]
liftIO $ brokendebug threadname ["directory deleted", dir]
Annex.Queue.addCommand "rm"
[Params "--quiet -r --cached --ignore-unmatch --"] [dir]
madeChange dir RmDirChange

View file

@ -52,7 +52,7 @@ webAppThread
-> Maybe (IO String)
-> Maybe (Url -> FilePath -> IO ())
-> NamedThread
webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ do
webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $ liftIO $ do
webapp <- WebApp
<$> pure assistantdata
<*> (pack <$> genRandomToken)
@ -83,7 +83,7 @@ webAppThread assistantdata urlrenderer noannex postfirstrun onstartup = thread $
(relHome =<< absPath
=<< runThreadState (threadState assistantdata) (fromRepo repoPath))
go port webapp htmlshim urlfile = do
debug thisThread ["running on port", show port]
brokendebug thisThread ["running on port", show port]
let url = myUrl webapp port
maybe noop (`writeFile` url) urlfile
writeHtmlShim url htmlshim

View file

@ -76,7 +76,7 @@ getAssistantY :: forall sub a. (AssistantData -> a) -> GHandler sub WebApp a
getAssistantY f = f <$> (assistantData <$> getYesod)
getDaemonStatusY :: forall sub. GHandler sub WebApp DaemonStatus
getDaemonStatusY = liftIO . getDaemonStatus =<< getAssistantY daemonStatus
getDaemonStatusY = liftIO . getDaemonStatus =<< getAssistantY daemonStatusHandle
getWebAppState :: forall sub. GHandler sub WebApp WebAppState
getWebAppState = liftIO . atomically . readTMVar =<< webAppState <$> getYesod

View file

@ -69,7 +69,7 @@ setRepoConfig uuid mremote oldc newc = do
when (repoSyncable oldc /= repoSyncable newc) $
changeSyncable mremote (repoSyncable newc)
when (isJust mremote && repoName oldc /= repoName newc) $ do
dstatus <- getAssistantY daemonStatus
dstatus <- getAssistantY daemonStatusHandle
runAnnex undefined $ do
name <- fromRepo $ uniqueRemoteName (T.unpack $ repoName newc) 0
inRepo $ Git.Command.run "remote"

View file

@ -87,7 +87,7 @@ getInprogressPairR _ = noPairing
-}
startPairing :: PairStage -> IO () -> (AlertButton -> Alert) -> Maybe UUID -> Text -> Secret -> Widget
startPairing stage oncancel alert muuid displaysecret secret = do
dstatus <- lift $ getAssistantY daemonStatus
dstatus <- lift $ getAssistantY daemonStatusHandle
urlrender <- lift getUrlRender
reldir <- fromJust . relDir <$> lift getYesod

View file

@ -124,5 +124,5 @@ makeS3Remote (S3Creds ak sk) name setup config = do
makeSpecialRemote name S3.remote config
return remotename
setup r
liftIO $ syncNewRemote st (daemonStatus d) (scanRemoteMap d) r
liftIO $ syncNewRemote st (daemonStatusHandle d) (scanRemoteMap d) r
redirect $ EditNewCloudRepositoryR $ Remote.uuid r

View file

@ -286,7 +286,7 @@ makeSshRepo forcersync setup sshdata = do
d <- getAssistantY id
r <- liftIO $ makeSshRemote
(threadState d)
(daemonStatus d)
(daemonStatusHandle d)
(scanRemoteMap d)
forcersync sshdata
setup r

View file

@ -34,7 +34,7 @@ import qualified Data.Text as T
{- Displays an alert suggesting to configure XMPP, with a button. -}
xmppNeeded :: Handler ()
xmppNeeded = whenM (isNothing <$> runAnnex Nothing getXMPPCreds) $ do
dstatus <- getAssistantY daemonStatus
dstatus <- getAssistantY daemonStatusHandle
urlrender <- getUrlRender
void $ liftIO $ addAlert dstatus $ xmppNeededAlert $ AlertButton
{ buttonLabel = "Configure a Jabber account"

View file

@ -73,7 +73,7 @@ getSideBarR nid = do
{- Called by the client to close an alert. -}
getCloseAlert :: AlertId -> Handler ()
getCloseAlert i = do
dstatus <- getAssistantY daemonStatus
dstatus <- getAssistantY daemonStatusHandle
liftIO $ removeAlert dstatus i
{- When an alert with a button is clicked on, the button takes us here. -}

View file

@ -37,7 +37,7 @@ changeSyncable (Just r) True = do
changeSyncable (Just r) False = do
changeSyncFlag r False
d <- getAssistantY id
let dstatus = daemonStatus d
let dstatus = daemonStatusHandle d
let st = threadState d
liftIO $ runThreadState st $ updateSyncRemotes dstatus
{- Stop all transfers to or from this remote.
@ -65,7 +65,7 @@ syncRemote remote = do
d <- getAssistantY id
liftIO $ syncNewRemote
(threadState d)
(daemonStatus d)
(daemonStatusHandle d)
(scanRemoteMap d)
remote
@ -74,7 +74,7 @@ pauseTransfer = cancelTransfer True
cancelTransfer :: Bool -> Transfer -> Handler ()
cancelTransfer pause t = do
dstatus <- getAssistantY daemonStatus
dstatus <- getAssistantY daemonStatusHandle
tq <- getAssistantY transferQueue
m <- getCurrentTransfers
liftIO $ do
@ -94,8 +94,9 @@ cancelTransfer pause t = do
maybe noop killproc $ transferPid info
if pause
then void $
alterTransferInfo dstatus t $ \i -> i
{ transferPaused = True }
alterTransferInfo t
(\i -> i { transferPaused = True })
dstatus
else void $
removeTransfer dstatus t
signalthread tid
@ -117,19 +118,20 @@ startTransfer t = do
where
go info = maybe (start info) resume $ transferTid info
startqueued = do
dstatus <- getAssistantY daemonStatus
dstatus <- getAssistantY daemonStatusHandle
q <- getAssistantY transferQueue
is <- liftIO $ map snd <$> getMatchingTransfers q dstatus (== t)
maybe noop start $ headMaybe is
resume tid = do
dstatus <- getAssistantY daemonStatus
dstatus <- getAssistantY daemonStatusHandle
liftIO $ do
alterTransferInfo dstatus t $ \i -> i
{ transferPaused = False }
alterTransferInfo t
(\i -> i { transferPaused = False })
dstatus
throwTo tid ResumeTransfer
start info = do
st <- getAssistantY threadState
dstatus <- getAssistantY daemonStatus
dstatus <- getAssistantY daemonStatusHandle
slots <- getAssistantY transferSlots
commitchan <- getAssistantY commitChan
liftIO $ inImmediateTransferSlot dstatus slots $ do