2013-10-08 15:48:28 +00:00
|
|
|
{- git-annex assistant scheduled jobs runner
|
|
|
|
-
|
2015-01-21 16:50:09 +00:00
|
|
|
- Copyright 2013 Joey Hess <id@joeyh.name>
|
2013-10-08 15:48:28 +00:00
|
|
|
-
|
|
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
|
|
-}
|
|
|
|
|
2013-10-27 20:42:13 +00:00
|
|
|
{-# LANGUAGE DeriveDataTypeable #-}
|
2013-10-08 15:48:28 +00:00
|
|
|
|
|
|
|
module Assistant.Threads.Cronner (
|
|
|
|
cronnerThread
|
|
|
|
) where
|
|
|
|
|
|
|
|
import Assistant.Common
|
|
|
|
import Assistant.DaemonStatus
|
|
|
|
import Utility.NotificationBroadcaster
|
|
|
|
import Annex.UUID
|
2015-02-28 21:23:13 +00:00
|
|
|
import Annex.Path
|
2013-10-08 22:01:03 +00:00
|
|
|
import Logs.Schedule
|
|
|
|
import Utility.Scheduled
|
2013-10-08 15:48:28 +00:00
|
|
|
import Types.ScheduledActivity
|
2013-10-08 22:01:03 +00:00
|
|
|
import Utility.ThreadScheduler
|
|
|
|
import Utility.HumanTime
|
2013-10-11 20:03:18 +00:00
|
|
|
import Utility.Batch
|
2013-10-10 21:27:00 +00:00
|
|
|
import Assistant.TransferQueue
|
|
|
|
import Annex.Content
|
2016-08-03 16:37:12 +00:00
|
|
|
import Types.Transfer
|
2013-10-10 22:02:33 +00:00
|
|
|
import Assistant.Types.UrlRenderer
|
|
|
|
import Assistant.Alert
|
2013-10-11 20:03:18 +00:00
|
|
|
import Remote
|
2013-10-27 19:38:59 +00:00
|
|
|
import qualified Types.Remote as Remote
|
2013-10-29 20:48:06 +00:00
|
|
|
import qualified Git
|
2013-10-22 20:02:52 +00:00
|
|
|
import qualified Git.Fsck
|
2013-10-29 20:48:06 +00:00
|
|
|
import Assistant.Fsck
|
2013-10-26 21:16:29 +00:00
|
|
|
import Assistant.Repair
|
2013-10-08 15:48:28 +00:00
|
|
|
|
|
|
|
import Control.Concurrent.Async
|
2013-10-13 21:14:56 +00:00
|
|
|
import Control.Concurrent.MVar
|
2013-10-08 15:48:28 +00:00
|
|
|
import Data.Time.LocalTime
|
2013-10-08 22:01:03 +00:00
|
|
|
import Data.Time.Clock
|
2013-10-08 15:48:28 +00:00
|
|
|
import qualified Data.Map as M
|
|
|
|
import qualified Data.Set as S
|
|
|
|
|
|
|
|
{- Runs the scheduled activities configured for this repository.
 -
 - One thread is started per scheduled activity. An activity that has no
 - connectActivityUUID gets a thread that sleeps until its next scheduled
 - time. An activity that does have one gets a thread that blocks on a
 - fresh MVar; that MVar is published, keyed by the UUID, in the
 - DaemonStatus's connectRemoteNotifiers.
 -
 - After starting the threads, the main loop blocks until the schedule log
 - notifier fires. It then reloads the schedule, cancels the threads of
 - activities that are no longer scheduled, and starts threads for newly
 - added ones.
 -}
cronnerThread :: UrlRenderer -> NamedThread
cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do
    fsckNudge urlrenderer Nothing
    dstatus <- getDaemonStatus
    notifyhandle <- liftIO $
        newNotificationHandle False (scheduleLogNotifier dstatus)
    go notifyhandle M.empty M.empty
  where
    {- running maps each started activity to its Async;
     - notifiers maps each remote-bound activity to (uuid, [mvar]). -}
    go notifyhandle running notifiers = do
        wanted <- liftAnnex $ getUUID >>= scheduleGet

        let current = M.keysSet running
        let added = wanted `S.difference` current
        let removed = current `S.difference` wanted

        forM_ (S.toList removed) $ \activity ->
            maybe noop (stopjob activity) (M.lookup activity running)

        lastruntimes <- liftAnnex getLastRunTimes
        started <- startactivities (S.toList added) lastruntimes

        let running' = prune removed
                (M.fromList (map fst started))
                running
        let notifiers' = prune removed
                (M.fromList [ n | (_, Just n) <- started ])
                notifiers
        modifyDaemonStatus_ $ \s -> s
            { connectRemoteNotifiers =
                M.fromListWith (++) (M.elems notifiers')
            }

        liftIO $ waitNotification notifyhandle
        debug ["reloading changed activities"]
        go notifyhandle running' notifiers'

    {- Cancels the thread of an activity that was removed from the
     - schedule. -}
    stopjob activity a = do
        debug
            [ "stopping removed job for"
            , fromScheduledActivity activity
            , show (asyncThreadId a)
            ]
        liftIO $ cancel a

    {- Merges newly added entries into an old map (new entries win on a
     - key clash), then drops every old key that is in the removed set. -}
    prune removed fresh old = M.difference
        (M.union fresh old)
        (M.filterWithKey (\k _ -> S.member k removed) old)

    {- Starts one thread per activity. Each result pairs the activity's
     - Async with, for remote-bound activities, its notifier entry. -}
    startactivities as lastruntimes = forM as $ \activity -> do
        let launch thread = do
                runner <- asIO2 thread
                liftIO $ async $
                    runner activity (M.lookup activity lastruntimes)
        case connectActivityUUID activity of
            Nothing -> do
                a <- launch (sleepingActivityThread urlrenderer)
                return ((activity, a), Nothing)
            Just u -> do
                mvar <- liftIO newEmptyMVar
                a <- launch (remoteActivityThread urlrenderer mvar)
                return ((activity, a), Just (activity, (u, [mvar])))
|
2013-10-08 15:48:28 +00:00
|
|
|
|
|
|
|
{- Works out when the activity is next scheduled to run, sleeps until
 - then, and runs it. The time the run started is then used as the last
 - run time when working out the following run, and so on.
 -
 - The thread finishes once the schedule yields no further run time.
 -}
sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
sleepingActivityThread urlrenderer activity lasttime = schedulenext lasttime
  where
    desc = fromScheduledActivity activity

    {- Finds the next run time following the given last run, and waits
     - for it. -}
    schedulenext lastrun = do
        next <- liftIO $ nextTime (getSchedule activity) lastrun
        case next of
            Nothing -> debug ["no scheduled events left for", desc]
            Just (NextTimeExactly t) ->
                waitrun lastrun t Nothing
            Just (NextTimeWindow windowstart windowend) ->
                waitrun lastrun windowstart (Just windowend)

    {- Sleeps until t, then runs the activity unless it is too late to
     - do so. mmaxt is the end of the time window, when there is one. -}
    waitrun lastrun t mmaxt = do
        seconds <- liftIO $ secondsUntilLocalTime t
        when (seconds > Seconds 0) $ do
            debug ["waiting", show seconds, "for next scheduled", desc]
            liftIO $ threadDelaySeconds seconds
        now <- liftIO getCurrentTime
        tz <- liftIO $ getTimeZone now
        let nowt = utcToLocalTime tz now
        if toolate tz nowt
            then do
                debug ["too late to run scheduled", desc]
                schedulenext lastrun
            else do
                runActivity urlrenderer activity nowt
                schedulenext (Just nowt)
      where
        {- With a window, it is too late once the window has ended. -}
        toolate tz nowt = maybe (pastgrace tz nowt) (nowt >) mmaxt
        {- allow the job to start 10 minutes late -}
        pastgrace tz nowt =
            diffUTCTime (localTimeToUTC tz nowt) (localTimeToUTC tz t) > 600
|
|
|
|
|
2013-10-13 21:14:56 +00:00
|
|
|
{- Blocks on the MVar until it is filled (the caller publishes it so that
 - this happens when the remote becomes available). Then, if the current
 - time lies inside the activity's next scheduled time window, the
 - activity is run. Either way, the thread goes back to waiting on the
 - MVar afterwards.
 -
 - Schedules that yield an exact time, or no time at all, end the thread.
 -}
remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
remoteActivityThread urlrenderer mvar activity lasttime = do
    liftIO $ takeMVar mvar
    next <- liftIO $ nextTime (getSchedule activity) lasttime
    case next of
        Just (NextTimeWindow windowstart windowend) -> do
            now <- liftIO getCurrentTime
            tz <- liftIO $ getTimeZone now
            let afterstart = now >= localTimeToUTC tz windowstart
            let beforeend = now <= localTimeToUTC tz windowend
            if afterstart && beforeend
                then do
                    let nowt = utcToLocalTime tz now
                    runActivity urlrenderer activity nowt
                    waitagain (Just nowt)
                else waitagain lasttime
        {- running at exact time not handled here -}
        _ -> noop
  where
    waitagain = remoteActivityThread urlrenderer mvar activity
|
|
|
|
|
2013-10-08 22:01:03 +00:00
|
|
|
{- Number of whole seconds from now until the given local time,
 - interpreted in the time zone currently in effect. Times that are not
 - in the future yield Seconds 0. -}
secondsUntilLocalTime :: LocalTime -> IO Seconds
secondsUntilLocalTime t = do
    now <- getCurrentTime
    tz <- getTimeZone now
    let remaining = truncate (localTimeToUTC tz t `diffUTCTime` now)
    return $ Seconds (max 0 remaining)
|
|
|
|
|
2013-10-13 21:14:56 +00:00
|
|
|
{- Runs a scheduled activity, with debug messages before and after, and
 - then records the given time as the activity's last run time. -}
runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant ()
runActivity urlrenderer activity nowt = do
    announce "starting"
    runActivity' urlrenderer activity
    announce "finished"
    liftAnnex (setLastRunTime activity nowt)
  where
    announce stage = debug [stage, fromScheduledActivity activity]
|
|
|
|
|
|
|
|
{- Performs a scheduled activity.
 -
 - A self fsck runs the program's own fsck command in batch mode, then
 - looks for broken git objects in this repository, hands the results to
 - repairWhenNecessary, and finally queues downloads for every key found
 - in the bad directory.
 -
 - A remote fsck is skipped when no remote is configured for the UUID.
 - Otherwise it uses the remote's own fsck action when it provides one,
 - and falls back to a fast fsck --from the remote when it does not.
 -}
runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant ()
runActivity' urlrenderer (ScheduledSelfFsck _ d) = do
    program <- liftIO programPath
    gitrepo <- liftAnnex gitRepo
    let fsckcommand = Param "fsck" : annexFsckParams d
    fsckresults <- showFscking urlrenderer Nothing $ tryNonAsync $ do
        void $ batchCommand program fsckcommand
        Git.Fsck.findBroken True gitrepo
    myuuid <- liftAnnex getUUID
    void $ repairWhenNecessary urlrenderer myuuid Nothing fsckresults
    badkeys <- liftAnnex (dirKeys gitAnnexBadDir)
    forM_ badkeys $ \k ->
        queueTransfers "fsck found bad file; redownloading" Next k (AssociatedFile Nothing) Download
runActivity' urlrenderer (ScheduledRemoteFsck u s d) =
    liftAnnex (remoteFromUUID u) >>= maybe skip fsckremote
  where
    skip = debug ["skipping remote fsck of uuid without a configured remote", fromUUID u, fromSchedule s]

    fsckremote rmt = void $ case Remote.remoteFsck rmt of
        Just mkfscker ->
            {- Note that having mkfscker return an IO action
             - avoids running a long duration fsck in the
             - Annex monad. -}
            liftAnnex (mkfscker (annexFsckParams d)) >>= runfsck rmt
        Nothing -> runfsck rmt $ do
            program <- programPath
            void $ batchCommand program $
                [ Param "fsck"
                -- avoid downloading files
                , Param "--fast"
                , Param "--from"
                , Param $ Remote.name rmt
                ] ++ annexFsckParams d

    {- Runs the fsck action. Broken git objects are then looked for only
     - when the remote's repository is local and not of unknown
     - location; in that case the results are passed on for repair. -}
    runfsck rmt annexfscker = do
        fsckresults <- showFscking urlrenderer (Just rmt) $ tryNonAsync $ do
            void annexfscker
            if knownlocal
                then Just <$> Git.Fsck.findBroken True r
                else pure Nothing
        case fsckresults of
            Nothing -> noop
            Just results ->
                void $ repairWhenNecessary urlrenderer u (Just rmt) results
      where
        r = Remote.repo rmt
        knownlocal = Git.repoIsLocal r && not (Git.repoIsLocalUnknown r)
|
2013-10-08 22:01:03 +00:00
|
|
|
|
2013-10-22 20:02:52 +00:00
|
|
|
{- Parameters passed to every scheduled fsck: a one day incremental
 - schedule, and a time limit set from the given duration. -}
annexFsckParams :: Duration -> [CommandParam]
annexFsckParams d = map Param
    [ "--incremental-schedule=1d"
    , "--time-limit=" ++ fromDuration d
    ]
|