half way complete cronner thread to run scheduled activities
This commit is contained in:
parent
36ddd000ea
commit
af5e1d0494
9 changed files with 129 additions and 17 deletions
|
@ -22,6 +22,7 @@ import Assistant.Threads.Merger
|
||||||
import Assistant.Threads.TransferWatcher
|
import Assistant.Threads.TransferWatcher
|
||||||
import Assistant.Threads.Transferrer
|
import Assistant.Threads.Transferrer
|
||||||
import Assistant.Threads.SanityChecker
|
import Assistant.Threads.SanityChecker
|
||||||
|
import Assistant.Threads.Cronner
|
||||||
#ifdef WITH_CLIBS
|
#ifdef WITH_CLIBS
|
||||||
import Assistant.Threads.MountWatcher
|
import Assistant.Threads.MountWatcher
|
||||||
#endif
|
#endif
|
||||||
|
@ -133,6 +134,7 @@ startDaemon assistant foreground listenhost startbrowser = do
|
||||||
, assist $ netWatcherThread
|
, assist $ netWatcherThread
|
||||||
, assist $ netWatcherFallbackThread
|
, assist $ netWatcherFallbackThread
|
||||||
, assist $ transferScannerThread urlrenderer
|
, assist $ transferScannerThread urlrenderer
|
||||||
|
, assist $ cronnerThread
|
||||||
, assist $ configMonitorThread
|
, assist $ configMonitorThread
|
||||||
, assist $ glacierThread
|
, assist $ glacierThread
|
||||||
, watch $ watchThread
|
, watch $ watchThread
|
||||||
|
|
|
@ -76,6 +76,10 @@ updateSyncRemotes = do
|
||||||
M.filter $ \alert ->
|
M.filter $ \alert ->
|
||||||
alertName alert /= Just CloudRepoNeededAlert
|
alertName alert /= Just CloudRepoNeededAlert
|
||||||
|
|
||||||
|
updateScheduleLog :: Assistant ()
|
||||||
|
updateScheduleLog =
|
||||||
|
liftIO . sendNotification =<< scheduleLogNotifier <$> getDaemonStatus
|
||||||
|
|
||||||
{- Load any previous daemon status file, and store it in a MVar for this
|
{- Load any previous daemon status file, and store it in a MVar for this
|
||||||
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
- process to use as its DaemonStatus. Also gets current transfer status. -}
|
||||||
startDaemonStatus :: Annex DaemonStatusHandle
|
startDaemonStatus :: Annex DaemonStatusHandle
|
||||||
|
|
|
@ -12,9 +12,9 @@ import Assistant.BranchChange
|
||||||
import Assistant.DaemonStatus
|
import Assistant.DaemonStatus
|
||||||
import Assistant.Commits
|
import Assistant.Commits
|
||||||
import Utility.ThreadScheduler
|
import Utility.ThreadScheduler
|
||||||
|
import Logs
|
||||||
import Logs.UUID
|
import Logs.UUID
|
||||||
import Logs.Trust
|
import Logs.Trust
|
||||||
import Logs.Remote
|
|
||||||
import Logs.PreferredContent
|
import Logs.PreferredContent
|
||||||
import Logs.Group
|
import Logs.Group
|
||||||
import Remote.List (remoteListRefresh)
|
import Remote.List (remoteListRefresh)
|
||||||
|
@ -52,12 +52,13 @@ configMonitorThread = namedThread "ConfigMonitor" $ loop =<< getConfigs
|
||||||
type Configs = S.Set (FilePath, String)
|
type Configs = S.Set (FilePath, String)
|
||||||
|
|
||||||
{- All git-annex's config files, and actions to run when they change. -}
|
{- All git-annex's config files, and actions to run when they change. -}
|
||||||
configFilesActions :: [(FilePath, Annex ())]
|
configFilesActions :: [(FilePath, Assistant ())]
|
||||||
configFilesActions =
|
configFilesActions =
|
||||||
[ (uuidLog, void uuidMapLoad)
|
[ (uuidLog, void $ liftAnnex uuidMapLoad)
|
||||||
, (remoteLog, void remoteListRefresh)
|
, (remoteLog, void $ liftAnnex remoteListRefresh)
|
||||||
, (trustLog, void trustMapLoad)
|
, (trustLog, void $ liftAnnex trustMapLoad)
|
||||||
, (groupLog, void groupMapLoad)
|
, (groupLog, void $ liftAnnex groupMapLoad)
|
||||||
|
, (scheduleLog, void updateScheduleLog)
|
||||||
-- Preferred content settings depend on most of the other configs,
|
-- Preferred content settings depend on most of the other configs,
|
||||||
-- so will be reloaded whenever any configs change.
|
-- so will be reloaded whenever any configs change.
|
||||||
, (preferredContentLog, noop)
|
, (preferredContentLog, noop)
|
||||||
|
@ -65,9 +66,8 @@ configFilesActions =
|
||||||
|
|
||||||
reloadConfigs :: Configs -> Assistant ()
|
reloadConfigs :: Configs -> Assistant ()
|
||||||
reloadConfigs changedconfigs = do
|
reloadConfigs changedconfigs = do
|
||||||
liftAnnex $ do
|
sequence_ as
|
||||||
sequence_ as
|
void $ liftAnnex preferredContentMapLoad
|
||||||
void preferredContentMapLoad
|
|
||||||
{- Changes to the remote log, or the trust log, can affect the
|
{- Changes to the remote log, or the trust log, can affect the
|
||||||
- syncRemotes list. Changes to the uuid log may affect its
|
- syncRemotes list. Changes to the uuid log may affect its
|
||||||
- display so are also included. -}
|
- display so are also included. -}
|
||||||
|
|
80
Assistant/Threads/Cronner.hs
Normal file
80
Assistant/Threads/Cronner.hs
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
{- git-annex assistant sceduled jobs runner
|
||||||
|
-
|
||||||
|
- Copyright 2013 Joey Hess <joey@kitenet.net>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE DeriveDataTypeable #-}
|
||||||
|
|
||||||
|
module Assistant.Threads.Cronner (
|
||||||
|
cronnerThread
|
||||||
|
) where
|
||||||
|
|
||||||
|
import Assistant.Common
|
||||||
|
import Assistant.DaemonStatus
|
||||||
|
import Utility.NotificationBroadcaster
|
||||||
|
import Logs.Schedule
|
||||||
|
import Annex.UUID
|
||||||
|
import Types.ScheduledActivity
|
||||||
|
|
||||||
|
import Control.Concurrent.Async
|
||||||
|
import Data.Time.LocalTime
|
||||||
|
import qualified Data.Map as M
|
||||||
|
import qualified Data.Set as S
|
||||||
|
import qualified Control.Exception as E
|
||||||
|
import Data.Typeable
|
||||||
|
|
||||||
|
data ActivityException = PleaseTerminate
|
||||||
|
deriving (Typeable, Show)
|
||||||
|
|
||||||
|
instance E.Exception ActivityException
|
||||||
|
|
||||||
|
{- Loads schedules for this repository, and fires off one thread for each
|
||||||
|
- scheduled event. These threads sleep until the next time the event
|
||||||
|
- should run.
|
||||||
|
-
|
||||||
|
- In the meantime the main thread waits for any changes to the
|
||||||
|
- schedules. When there's a change, compare the old and new list of
|
||||||
|
- schedules to find deleted and added ones. Start new threads for added
|
||||||
|
- ones, and send the threads a PleaseTerminate exception for the deleted
|
||||||
|
- ones. -}
|
||||||
|
cronnerThread :: NamedThread
|
||||||
|
cronnerThread = namedThreadUnchecked "Cronner" $ do
|
||||||
|
dstatus <- getDaemonStatus
|
||||||
|
h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus)
|
||||||
|
go h M.empty
|
||||||
|
where
|
||||||
|
go h m = do
|
||||||
|
activities <- liftAnnex $ scheduleGet =<< getUUID
|
||||||
|
|
||||||
|
let addedactivities = activities `S.difference` M.keysSet m
|
||||||
|
let removedactivities = M.keysSet m `S.difference` activities
|
||||||
|
|
||||||
|
liftIO $ forM_ (mapMaybe (`M.lookup` m) $ S.toList removedactivities) $
|
||||||
|
flip cancelWith PleaseTerminate
|
||||||
|
|
||||||
|
lastruntimes <- liftAnnex getLastRunTimes
|
||||||
|
addedm <- M.fromList <$> startactivities (S.toList addedactivities) lastruntimes
|
||||||
|
|
||||||
|
liftIO $ waitNotification h
|
||||||
|
|
||||||
|
let m' = M.difference (M.union addedm m)
|
||||||
|
(M.filterWithKey (\k _ -> S.member k removedactivities) m)
|
||||||
|
go h m'
|
||||||
|
startactivities as lastruntimes = forM as $ \activity -> do
|
||||||
|
runner <- asIO2 activityThread
|
||||||
|
a <- liftIO $ async $
|
||||||
|
runner activity (M.lookup activity lastruntimes)
|
||||||
|
return (activity, a)
|
||||||
|
|
||||||
|
{- Calculate the next time the activity is scheduled to run, then
|
||||||
|
- sleep until that time, and run it. Then call setLastRunTime, and
|
||||||
|
- loop.
|
||||||
|
-
|
||||||
|
- At any point, a PleaseTerminate could be received. This should result in
|
||||||
|
- the thread and any processes it has run shutting down.
|
||||||
|
-}
|
||||||
|
activityThread :: ScheduledActivity -> Maybe LocalTime -> Assistant ()
|
||||||
|
activityThread activity lastrun = do
|
||||||
|
noop
|
|
@ -62,6 +62,9 @@ data DaemonStatus = DaemonStatus
|
||||||
, alertNotifier :: NotificationBroadcaster
|
, alertNotifier :: NotificationBroadcaster
|
||||||
-- Broadcasts notifications when the syncRemotes change
|
-- Broadcasts notifications when the syncRemotes change
|
||||||
, syncRemotesNotifier :: NotificationBroadcaster
|
, syncRemotesNotifier :: NotificationBroadcaster
|
||||||
|
-- Broadcasts notifications when the scheduleLog changes
|
||||||
|
, scheduleLogNotifier :: NotificationBroadcaster
|
||||||
|
-- Broadcasts a notification once the startup sanity check has run.
|
||||||
, startupSanityCheckNotifier :: NotificationBroadcaster
|
, startupSanityCheckNotifier :: NotificationBroadcaster
|
||||||
-- When the XMPP client is connected, this will contain the XMPP
|
-- When the XMPP client is connected, this will contain the XMPP
|
||||||
-- address.
|
-- address.
|
||||||
|
@ -95,4 +98,5 @@ newDaemonStatus = DaemonStatus
|
||||||
<*> newNotificationBroadcaster
|
<*> newNotificationBroadcaster
|
||||||
<*> newNotificationBroadcaster
|
<*> newNotificationBroadcaster
|
||||||
<*> newNotificationBroadcaster
|
<*> newNotificationBroadcaster
|
||||||
|
<*> newNotificationBroadcaster
|
||||||
<*> pure Nothing
|
<*> pure Nothing
|
||||||
|
|
|
@ -28,6 +28,7 @@ module Locations (
|
||||||
gitAnnexBadLocation,
|
gitAnnexBadLocation,
|
||||||
gitAnnexUnusedLog,
|
gitAnnexUnusedLog,
|
||||||
gitAnnexFsckState,
|
gitAnnexFsckState,
|
||||||
|
gitAnnexScheduleState,
|
||||||
gitAnnexTransferDir,
|
gitAnnexTransferDir,
|
||||||
gitAnnexCredsDir,
|
gitAnnexCredsDir,
|
||||||
gitAnnexFeedStateDir,
|
gitAnnexFeedStateDir,
|
||||||
|
@ -192,6 +193,11 @@ gitAnnexUnusedLog prefix r = gitAnnexDir r </> (prefix ++ "unused")
|
||||||
gitAnnexFsckState :: Git.Repo -> FilePath
|
gitAnnexFsckState :: Git.Repo -> FilePath
|
||||||
gitAnnexFsckState r = gitAnnexDir r </> "fsckstate"
|
gitAnnexFsckState r = gitAnnexDir r </> "fsckstate"
|
||||||
|
|
||||||
|
{- .git/annex/schedulestate is used to store information about when
|
||||||
|
- scheduled jobs were last run. -}
|
||||||
|
gitAnnexScheduleState :: Git.Repo -> FilePath
|
||||||
|
gitAnnexScheduleState r = gitAnnexDir r </> "schedulestate"
|
||||||
|
|
||||||
{- .git/annex/creds/ is used to store credentials to access some special
|
{- .git/annex/creds/ is used to store credentials to access some special
|
||||||
- remotes. -}
|
- remotes. -}
|
||||||
gitAnnexCredsDir :: Git.Repo -> FilePath
|
gitAnnexCredsDir :: Git.Repo -> FilePath
|
||||||
|
|
|
@ -10,10 +10,14 @@ module Logs.Schedule (
|
||||||
scheduleSet,
|
scheduleSet,
|
||||||
scheduleGet,
|
scheduleGet,
|
||||||
scheduleMap,
|
scheduleMap,
|
||||||
|
getLastRunTimes,
|
||||||
|
setLastRunTime,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
import qualified Data.Set as S
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
|
import Data.Time.LocalTime
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
import Types.ScheduledActivity
|
import Types.ScheduledActivity
|
||||||
|
@ -37,7 +41,19 @@ scheduleMap = simpleMap
|
||||||
where
|
where
|
||||||
parser _uuid = Just . mapMaybe toScheduledActivity . split "; "
|
parser _uuid = Just . mapMaybe toScheduledActivity . split "; "
|
||||||
|
|
||||||
scheduleGet :: UUID -> Annex [ScheduledActivity]
|
scheduleGet :: UUID -> Annex (S.Set ScheduledActivity)
|
||||||
scheduleGet u = do
|
scheduleGet u = do
|
||||||
m <- scheduleMap
|
m <- scheduleMap
|
||||||
return $ fromMaybe [] $ M.lookup u m
|
return $ maybe S.empty S.fromList (M.lookup u m)
|
||||||
|
|
||||||
|
getLastRunTimes :: Annex (M.Map ScheduledActivity LocalTime)
|
||||||
|
getLastRunTimes = do
|
||||||
|
f <- fromRepo gitAnnexScheduleState
|
||||||
|
liftIO $ fromMaybe M.empty
|
||||||
|
<$> catchDefaultIO Nothing (readish <$> readFile f)
|
||||||
|
|
||||||
|
setLastRunTime :: ScheduledActivity -> LocalTime -> Annex ()
|
||||||
|
setLastRunTime activity lastrun = do
|
||||||
|
f <- fromRepo gitAnnexScheduleState
|
||||||
|
liftIO . writeFile f . show . M.insert activity lastrun
|
||||||
|
=<< getLastRunTimes
|
||||||
|
|
|
@ -14,7 +14,7 @@ import Types.UUID
|
||||||
data ScheduledActivity
|
data ScheduledActivity
|
||||||
= ScheduledSelfFsck Schedule
|
= ScheduledSelfFsck Schedule
|
||||||
| ScheduledRemoteFsck UUID Schedule
|
| ScheduledRemoteFsck UUID Schedule
|
||||||
deriving (Eq, Show, Ord)
|
deriving (Eq, Read, Show, Ord)
|
||||||
|
|
||||||
fromScheduledActivity :: ScheduledActivity -> String
|
fromScheduledActivity :: ScheduledActivity -> String
|
||||||
fromScheduledActivity (ScheduledSelfFsck s) =
|
fromScheduledActivity (ScheduledSelfFsck s) =
|
||||||
|
|
|
@ -29,7 +29,7 @@ import Data.Tuple.Utils
|
||||||
|
|
||||||
{- Some sort of scheduled event. -}
|
{- Some sort of scheduled event. -}
|
||||||
data Schedule = Schedule Recurrance ScheduledTime Duration
|
data Schedule = Schedule Recurrance ScheduledTime Duration
|
||||||
deriving (Eq, Show, Ord)
|
deriving (Eq, Read, Show, Ord)
|
||||||
|
|
||||||
data Recurrance
|
data Recurrance
|
||||||
= Daily
|
= Daily
|
||||||
|
@ -39,7 +39,7 @@ data Recurrance
|
||||||
-- Days, Weeks, or Months of the year evenly divisible by a number.
|
-- Days, Weeks, or Months of the year evenly divisible by a number.
|
||||||
-- (Divisible Year is years evenly divisible by a number.)
|
-- (Divisible Year is years evenly divisible by a number.)
|
||||||
| Divisible Int Recurrance
|
| Divisible Int Recurrance
|
||||||
deriving (Eq, Show, Ord)
|
deriving (Eq, Read, Show, Ord)
|
||||||
|
|
||||||
type WeekDay = Int
|
type WeekDay = Int
|
||||||
type MonthDay = Int
|
type MonthDay = Int
|
||||||
|
@ -48,20 +48,20 @@ type YearDay = Int
|
||||||
data ScheduledTime
|
data ScheduledTime
|
||||||
= AnyTime
|
= AnyTime
|
||||||
| SpecificTime Hour Minute
|
| SpecificTime Hour Minute
|
||||||
deriving (Eq, Show, Ord)
|
deriving (Eq, Read, Show, Ord)
|
||||||
|
|
||||||
type Hour = Int
|
type Hour = Int
|
||||||
type Minute = Int
|
type Minute = Int
|
||||||
|
|
||||||
data Duration = MinutesDuration Int
|
data Duration = MinutesDuration Int
|
||||||
deriving (Eq, Show, Ord)
|
deriving (Eq, Read, Show, Ord)
|
||||||
|
|
||||||
{- Next time a Schedule should take effect. The NextTimeWindow is used
|
{- Next time a Schedule should take effect. The NextTimeWindow is used
|
||||||
- when a Schedule is allowed to start at some point within the window. -}
|
- when a Schedule is allowed to start at some point within the window. -}
|
||||||
data NextTime
|
data NextTime
|
||||||
= NextTimeExactly LocalTime
|
= NextTimeExactly LocalTime
|
||||||
| NextTimeWindow LocalTime LocalTime
|
| NextTimeWindow LocalTime LocalTime
|
||||||
deriving (Eq, Show)
|
deriving (Eq, Read, Show)
|
||||||
|
|
||||||
nextTime :: Schedule -> Maybe LocalTime -> IO (Maybe NextTime)
|
nextTime :: Schedule -> Maybe LocalTime -> IO (Maybe NextTime)
|
||||||
nextTime schedule lasttime = do
|
nextTime schedule lasttime = do
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue