2013-10-08 15:48:28 +00:00
|
|
|
{- git-annex assistant sceduled jobs runner
|
|
|
|
-
|
|
|
|
- Copyright 2013 Joey Hess <joey@kitenet.net>
|
|
|
|
-
|
|
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
|
|
-}
|
|
|
|
|
|
|
|
{-# LANGUAGE DeriveDataTypeable #-}
|
|
|
|
|
|
|
|
module Assistant.Threads.Cronner (
|
|
|
|
cronnerThread
|
|
|
|
) where
|
|
|
|
|
|
|
|
import Assistant.Common
|
|
|
|
import Assistant.DaemonStatus
|
|
|
|
import Utility.NotificationBroadcaster
|
|
|
|
import Annex.UUID
|
2013-10-08 22:01:03 +00:00
|
|
|
import Config.Files
|
|
|
|
import Logs.Schedule
|
|
|
|
import Utility.Scheduled
|
2013-10-08 15:48:28 +00:00
|
|
|
import Types.ScheduledActivity
|
2013-10-08 22:01:03 +00:00
|
|
|
import Utility.ThreadScheduler
|
|
|
|
import Utility.HumanTime
|
|
|
|
import qualified Build.SysConfig
|
2013-10-10 21:27:00 +00:00
|
|
|
import Assistant.TransferQueue
|
|
|
|
import Annex.Content
|
|
|
|
import Logs.Transfer
|
2013-10-08 15:48:28 +00:00
|
|
|
|
|
|
|
import Control.Concurrent.Async
|
|
|
|
import Data.Time.LocalTime
|
2013-10-08 22:01:03 +00:00
|
|
|
import Data.Time.Clock
|
2013-10-08 15:48:28 +00:00
|
|
|
import qualified Data.Map as M
|
|
|
|
import qualified Data.Set as S
|
|
|
|
import qualified Control.Exception as E
|
|
|
|
|
|
|
|
{- Loads schedules for this repository, and fires off one thread for each
|
|
|
|
- scheduled event. These threads sleep until the next time the event
|
|
|
|
- should run.
|
|
|
|
-
|
|
|
|
- In the meantime the main thread waits for any changes to the
|
|
|
|
- schedules. When there's a change, compare the old and new list of
|
|
|
|
- schedules to find deleted and added ones. Start new threads for added
|
2013-10-10 20:15:12 +00:00
|
|
|
- ones, and kill the threads for deleted ones. -}
|
2013-10-08 15:48:28 +00:00
|
|
|
cronnerThread :: NamedThread
|
|
|
|
cronnerThread = namedThreadUnchecked "Cronner" $ do
|
|
|
|
dstatus <- getDaemonStatus
|
|
|
|
h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus)
|
|
|
|
go h M.empty
|
|
|
|
where
|
|
|
|
go h m = do
|
|
|
|
activities <- liftAnnex $ scheduleGet =<< getUUID
|
|
|
|
|
|
|
|
let addedactivities = activities `S.difference` M.keysSet m
|
|
|
|
let removedactivities = M.keysSet m `S.difference` activities
|
|
|
|
|
2013-10-10 20:15:12 +00:00
|
|
|
forM_ (S.toList removedactivities) $ \activity ->
|
|
|
|
case M.lookup activity m of
|
|
|
|
Just a -> do
|
|
|
|
debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)]
|
|
|
|
liftIO $ cancel a
|
|
|
|
Nothing -> noop
|
2013-10-08 15:48:28 +00:00
|
|
|
|
|
|
|
lastruntimes <- liftAnnex getLastRunTimes
|
|
|
|
addedm <- M.fromList <$> startactivities (S.toList addedactivities) lastruntimes
|
|
|
|
|
|
|
|
liftIO $ waitNotification h
|
2013-10-10 20:15:12 +00:00
|
|
|
debug ["reloading changed activities"]
|
2013-10-08 15:48:28 +00:00
|
|
|
|
|
|
|
let m' = M.difference (M.union addedm m)
|
|
|
|
(M.filterWithKey (\k _ -> S.member k removedactivities) m)
|
|
|
|
go h m'
|
|
|
|
startactivities as lastruntimes = forM as $ \activity -> do
|
|
|
|
runner <- asIO2 activityThread
|
|
|
|
a <- liftIO $ async $
|
|
|
|
runner activity (M.lookup activity lastruntimes)
|
|
|
|
return (activity, a)
|
|
|
|
|
|
|
|
{- Calculate the next time the activity is scheduled to run, then
|
|
|
|
- sleep until that time, and run it. Then call setLastRunTime, and
|
|
|
|
- loop.
|
|
|
|
-}
|
|
|
|
activityThread :: ScheduledActivity -> Maybe LocalTime -> Assistant ()
|
2013-10-08 22:01:03 +00:00
|
|
|
activityThread activity lasttime = go lasttime =<< getnexttime lasttime
|
|
|
|
where
|
|
|
|
getnexttime = liftIO . nextTime schedule
|
|
|
|
go _ Nothing = debug ["no scheduled events left for", desc]
|
|
|
|
go l (Just (NextTimeExactly t)) = runafter l t Nothing run
|
|
|
|
go l (Just (NextTimeWindow windowstart windowend)) =
|
|
|
|
runafter l windowstart (Just windowend) run
|
|
|
|
desc = fromScheduledActivity activity
|
|
|
|
schedule = getSchedule activity
|
|
|
|
runafter l t mmaxt a = do
|
|
|
|
seconds <- liftIO $ secondsUntilLocalTime t
|
|
|
|
when (seconds > Seconds 0) $ do
|
|
|
|
debug ["waiting", show seconds, "for next scheduled", desc]
|
|
|
|
liftIO $ threadDelaySeconds seconds
|
|
|
|
now <- liftIO getCurrentTime
|
|
|
|
tz <- liftIO $ getTimeZone now
|
|
|
|
let nowt = utcToLocalTime tz now
|
|
|
|
if tolate nowt tz
|
|
|
|
then do
|
|
|
|
debug ["too late to run scheduled", desc]
|
|
|
|
go l =<< getnexttime l
|
|
|
|
else a nowt
|
|
|
|
where
|
|
|
|
tolate nowt tz = case mmaxt of
|
|
|
|
Just maxt -> nowt > maxt
|
|
|
|
-- allow the job to start 10 minutes late
|
|
|
|
Nothing ->diffUTCTime
|
|
|
|
(localTimeToUTC tz nowt)
|
|
|
|
(localTimeToUTC tz t) > 600
|
|
|
|
run nowt = do
|
|
|
|
debug ["starting", desc]
|
|
|
|
runActivity activity
|
|
|
|
debug ["finished", desc]
|
|
|
|
liftAnnex $ setLastRunTime activity nowt
|
|
|
|
go (Just nowt) =<< getnexttime (Just nowt)
|
|
|
|
|
|
|
|
secondsUntilLocalTime :: LocalTime -> IO Seconds
|
|
|
|
secondsUntilLocalTime t = do
|
|
|
|
now <- getCurrentTime
|
|
|
|
tz <- getTimeZone now
|
2013-10-10 16:54:59 +00:00
|
|
|
let secs = truncate $ diffUTCTime (localTimeToUTC tz t) now
|
2013-10-08 22:01:03 +00:00
|
|
|
return $ if secs > 0
|
|
|
|
then Seconds secs
|
|
|
|
else Seconds 0
|
|
|
|
|
|
|
|
runActivity :: ScheduledActivity -> Assistant ()
|
2013-10-10 21:27:00 +00:00
|
|
|
runActivity (ScheduledSelfFsck _ d) = do
|
|
|
|
program <- liftIO $ readProgramFile
|
|
|
|
void $ liftIO $ niceShell $
|
2013-10-10 16:49:54 +00:00
|
|
|
program ++ " fsck --incremental-schedule=1d --time-limit=" ++ fromDuration d
|
2013-10-10 21:27:00 +00:00
|
|
|
queueBad
|
2013-10-08 22:01:03 +00:00
|
|
|
runActivity (ScheduledRemoteFsck _ _ _) =
|
|
|
|
debug ["remote fsck not implemented yet"]
|
|
|
|
|
2013-10-10 21:27:00 +00:00
|
|
|
queueBad :: Assistant ()
|
|
|
|
queueBad = mapM_ queue =<< liftAnnex (dirKeys gitAnnexBadDir)
|
|
|
|
where
|
|
|
|
queue k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download
|
|
|
|
|
2013-10-10 20:15:12 +00:00
|
|
|
{- Runs a shell command niced, until it terminates.
|
|
|
|
-
|
|
|
|
- When an async exception is received, the command is sent a SIGTERM,
|
|
|
|
- and after it finishes shutting down the exception is re-raised. -}
|
|
|
|
niceShell :: String -> IO ExitCode
|
|
|
|
niceShell command = do
|
|
|
|
(_, _, _, pid) <- createProcess $ proc "sh"
|
|
|
|
[ "-c"
|
|
|
|
, "exec " ++ nicedcommand
|
|
|
|
]
|
|
|
|
r <- E.try (waitForProcess pid) :: IO (Either E.SomeException ExitCode)
|
|
|
|
case r of
|
|
|
|
Right exitcode -> return exitcode
|
|
|
|
Left asyncexception -> do
|
|
|
|
terminateProcess pid
|
|
|
|
void $ waitForProcess pid
|
|
|
|
E.throwIO asyncexception
|
2013-10-08 22:01:03 +00:00
|
|
|
where
|
|
|
|
nicedcommand
|
|
|
|
| Build.SysConfig.nice = "nice " ++ command
|
|
|
|
| otherwise = command
|