{- git-annex assistant scheduled jobs runner
 -
 - Copyright 2013 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}
|
|
|
|
{-# LANGUAGE DeriveDataTypeable #-}
|
|
|
|
module Assistant.Threads.Cronner (
|
|
cronnerThread
|
|
) where
|
|
|
|
import Assistant.Common
|
|
import Assistant.DaemonStatus
|
|
import Utility.NotificationBroadcaster
|
|
import Annex.UUID
|
|
import Annex.Path
|
|
import Logs.Schedule
|
|
import Utility.Scheduled
|
|
import Types.ScheduledActivity
|
|
import Utility.ThreadScheduler
|
|
import Utility.HumanTime
|
|
import Utility.Batch
|
|
import Assistant.TransferQueue
|
|
import Annex.Content
|
|
import Types.Transfer
|
|
import Assistant.Types.UrlRenderer
|
|
import Assistant.Alert
|
|
import Remote
|
|
import qualified Types.Remote as Remote
|
|
import qualified Git
|
|
import qualified Git.Fsck
|
|
import Assistant.Fsck
|
|
import Assistant.Repair
|
|
|
|
import Control.Concurrent.Async
|
|
import Control.Concurrent.MVar
|
|
import Data.Time.LocalTime
|
|
import Data.Time.Clock
|
|
import qualified Data.Map as M
|
|
import qualified Data.Set as S
|
|
|
|
{- Main loop of the scheduled jobs runner.
 -
 - Reads the activities scheduled for this repository's UUID and compares
 - them with the activities that already have a running thread:
 -
 - * An activity that is no longer scheduled has its thread cancelled.
 - * A newly scheduled activity gets a new async thread. When
 -   connectActivityUUID yields no UUID, the thread runs
 -   sleepingActivityThread. Otherwise it runs remoteActivityThread with
 -   a fresh MVar to wait on.
 -
 - The MVars of all remote activity threads are stored, grouped by UUID,
 - in the DaemonStatus's connectRemoteNotifiers.
 -
 - The loop then blocks until the schedule log notifier fires, and starts
 - over with the updated maps. -}
cronnerThread :: UrlRenderer -> NamedThread
cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do
    fsckNudge urlrenderer Nothing
    notifier <- scheduleLogNotifier <$> getDaemonStatus
    nh <- liftIO $ newNotificationHandle False notifier
    reload nh M.empty M.empty
  where
    -- running: activity -> its async thread
    -- notifiers: activity -> (remote uuid, MVars to signal on connect)
    reload nh running notifiers = do
        wanted <- liftAnnex $ getUUID >>= scheduleGet
        let known = M.keysSet running
            added = wanted `S.difference` known
            removed = known `S.difference` wanted

        forM_ (S.toList removed) $ \activity ->
            maybe noop (stopjob activity) (M.lookup activity running)

        lastruntimes <- liftAnnex getLastRunTimes
        started <- launch lastruntimes (S.toList added)
        let newjobs = M.fromList [ job | (job, _) <- started ]
            newnotifiers = M.fromList [ n | (_, Just n) <- started ]
            -- Added and removed activities are disjoint, so filtering
            -- the merged maps only drops entries of cancelled jobs.
            kept k = k `S.notMember` removed
            running' = M.filterWithKey (\k _ -> kept k) (M.union newjobs running)
            notifiers' = M.filterWithKey (\k _ -> kept k) (M.union newnotifiers notifiers)
        modifyDaemonStatus_ $ \s -> s
            { connectRemoteNotifiers = M.fromListWith (++) (M.elems notifiers') }

        liftIO $ waitNotification nh
        debug ["reloading changed activities"]
        reload nh running' notifiers'

    -- Cancel the thread of an activity that was removed from the schedule.
    stopjob activity a = do
        debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)]
        liftIO $ cancel a

    -- Start a thread for each activity. The result pairs the activity
    -- with its thread, plus a notifier entry for remote activities.
    launch lastruntimes = mapM $ \activity -> do
        let spawn thread = do
                runner <- asIO2 thread
                liftIO $ async $ runner activity (M.lookup activity lastruntimes)
        case connectActivityUUID activity of
            Nothing -> do
                a <- spawn (sleepingActivityThread urlrenderer)
                return ((activity, a), Nothing)
            Just u -> do
                mvar <- liftIO newEmptyMVar
                a <- spawn (remoteActivityThread urlrenderer mvar)
                return ((activity, a), Just (activity, (u, [mvar])))
|
|
|
|
{- Runs a scheduled activity over and over.
 -
 - Each iteration asks nextTime when the activity should run next, given
 - the time it last ran. If there is no next time, the thread ends.
 - Otherwise it sleeps until that time (or the start of the time window),
 - runs the activity, which records the last run time, and loops.
 -
 - An activity that would start after the end of its window, or more than
 - 10 minutes after an exact time, is skipped and rescheduled. -}
sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
sleepingActivityThread urlrenderer activity = loop
  where
    desc = fromScheduledActivity activity

    -- How late, in seconds, a job with an exact start time may start.
    graceperiod = 600

    loop lastrun = do
        next <- liftIO $ nextTime (getSchedule activity) lastrun
        case next of
            Nothing -> debug ["no scheduled events left for", desc]
            Just (NextTimeExactly t) -> waitrun lastrun t Nothing
            Just (NextTimeWindow windowstart windowend) ->
                waitrun lastrun windowstart (Just windowend)

    -- Sleep until t, then run the activity unless it is too late.
    -- mdeadline is the end of the time window, when there is one.
    waitrun lastrun t mdeadline = do
        delay <- liftIO $ secondsUntilLocalTime t
        when (delay > Seconds 0) $ do
            debug ["waiting", show delay, "for next scheduled", desc]
            liftIO $ threadDelaySeconds delay
        now <- liftIO getCurrentTime
        tz <- liftIO $ getTimeZone now
        let nowt = utcToLocalTime tz now
            toolate = case mdeadline of
                Just deadline -> nowt > deadline
                Nothing ->
                    localTimeToUTC tz nowt `diffUTCTime` localTimeToUTC tz t > graceperiod
        if not toolate
            then do
                runActivity urlrenderer activity nowt
                loop (Just nowt)
            else do
                debug ["too late to run scheduled", desc]
                loop lastrun
|
|
|
|
{- Runs an activity that needs its remote to be connected.
 -
 - Blocks on the MVar, which is how the thread learns that the remote has
 - become available. Once it is filled, the activity is run if the
 - current time lies inside the next scheduled time window. Either way
 - the thread goes back to waiting on the MVar.
 -
 - Schedules that do not yield a time window (an exact time, or nothing
 - left to run) are not handled here, and end the thread. -}
remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
remoteActivityThread urlrenderer mvar activity lasttime = do
    liftIO $ takeMVar mvar
    next <- liftIO $ nextTime (getSchedule activity) lasttime
    case next of
        Just (NextTimeWindow windowstart windowend) -> do
            now <- liftIO getCurrentTime
            tz <- liftIO $ getTimeZone now
            let opens = localTimeToUTC tz windowstart
                closes = localTimeToUTC tz windowend
            if opens <= now && now <= closes
                then do
                    let nowt = utcToLocalTime tz now
                    runActivity urlrenderer activity nowt
                    again (Just nowt)
                else again lasttime
        -- running at exact time not handled here
        _ -> noop
  where
    again = remoteActivityThread urlrenderer mvar activity
|
|
|
|
{- Number of whole seconds from now until the given local time, which is
 - interpreted in the time zone in effect now. Fractions of a second are
 - truncated, and a time that is not in the future yields Seconds 0. -}
secondsUntilLocalTime :: LocalTime -> IO Seconds
secondsUntilLocalTime t = do
    now <- getCurrentTime
    tz <- getTimeZone now
    let remaining = truncate (localTimeToUTC tz t `diffUTCTime` now)
    return $ Seconds (max 0 remaining)
|
|
|
|
{- Runs an activity, logging a debug message before and after it, and
 - then records the given time as the activity's last run time. -}
runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant ()
runActivity urlrenderer activity nowt = do
    announce "starting"
    runActivity' urlrenderer activity
    announce "finished"
    liftAnnex (setLastRunTime activity nowt)
  where
    announce stage = debug [stage, fromScheduledActivity activity]
|
|
|
|
{- Does the actual work of a scheduled activity.
 -
 - ScheduledSelfFsck: runs "fsck" as a batch command of this program,
 - then looks for broken git objects in the repository and repairs it
 - when necessary. Finally, a download is queued for every key found in
 - the bad directory.
 -
 - ScheduledRemoteFsck: fscks the remote with the given UUID, and is
 - skipped with a debug message when no such remote is configured. A
 - remote that provides its own remoteFsck has that used; any other
 - remote is checked with "fsck --fast --from" as a batch command. Only
 - a repository that is local, and not unknown, is then checked for
 - broken git objects and repaired when necessary. -}
runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant ()
runActivity' urlrenderer (ScheduledSelfFsck _ d) = do
    program <- liftIO programPath
    g <- liftAnnex gitRepo
    let fsckparams = Param "fsck" : annexFsckParams d
    fsckresults <- showFscking urlrenderer Nothing $ tryNonAsync $
        batchCommand program fsckparams >> Git.Fsck.findBroken True False g
    u <- liftAnnex getUUID
    void $ repairWhenNecessary urlrenderer u Nothing fsckresults
    badkeys <- liftAnnex (dirKeys gitAnnexBadDir)
    forM_ badkeys $ \k ->
        queueTransfers "fsck found bad file; redownloading" Next k (AssociatedFile Nothing) Download
runActivity' urlrenderer (ScheduledRemoteFsck u s d) = do
    mrmt <- liftAnnex (remoteFromUUID u)
    case mrmt of
        Nothing -> debug ["skipping remote fsck of uuid without a configured remote", fromUUID u, fromSchedule s]
        Just rmt -> void (fsckvia rmt)
  where
    params = annexFsckParams d

    -- Pick the fsck action for the remote and hand it to fsckwith.
    fsckvia rmt = case Remote.remoteFsck rmt of
        {- Note that having mkfscker return an IO action
         - avoids running a long duration fsck in the
         - Annex monad. -}
        Just mkfscker -> liftAnnex (mkfscker params) >>= fsckwith rmt
        Nothing -> fsckwith rmt $ do
            program <- programPath
            void $ batchCommand program $
                [ Param "fsck"
                -- avoid downloading files
                , Param "--fast"
                , Param "--from"
                , Param (Remote.name rmt)
                ] ++ params

    -- Run the fsck action, then look for broken git objects when the
    -- remote's repository is local, and repair it when necessary.
    fsckwith rmt annexfscker = do
        repo <- liftAnnex $ Remote.getRepo rmt
        let checkable = Git.repoIsLocal repo && not (Git.repoIsLocalUnknown repo)
        fsckresults <- showFscking urlrenderer (Just rmt) $ tryNonAsync $ do
            void annexfscker
            if checkable
                then Just <$> Git.Fsck.findBroken True True repo
                else pure Nothing
        case fsckresults of
            Nothing -> noop
            Just broken -> void $ repairWhenNecessary urlrenderer u (Just rmt) broken
|
|
|
|
{- Parameters passed to every scheduled fsck: an incremental schedule of
 - 1d, and a time limit taken from the activity's duration. -}
annexFsckParams :: Duration -> [CommandParam]
annexFsckParams d = map Param
    [ "--incremental-schedule=1d"
    , "--time-limit=" ++ fromDuration d
    ]
|