cronner: run jobs triggered by remotes becoming connected (untested)

This commit is contained in:
Joey Hess 2013-10-13 17:14:56 -04:00
parent 57d369c5a8
commit 25462f125d
4 changed files with 94 additions and 31 deletions

View file

@ -44,13 +44,17 @@ import Control.Concurrent
- they push to us. Since XMPP pushes run ansynchronously, any scan of the
- XMPP remotes has to be deferred until they're done pushing to us, so
- all XMPP remotes are marked as possibly desynced.
-
- Also handles signaling any connectRemoteNotifiers, after the syncing is
- done.
-}
reconnectRemotes :: Bool -> [Remote] -> Assistant ()
reconnectRemotes _ [] = noop
reconnectRemotes notifypushes rs = void $ do
modifyDaemonStatus_ $ \s -> s
{ desynced = S.union (S.fromList $ map Remote.uuid xmppremotes) (desynced s) }
syncAction rs (const go)
void $ syncAction rs (const go)
mapM_ signal rs
where
gitremotes = filter (notspecialremote . Remote.repo) rs
(xmppremotes, nonxmppremotes) = partition isXMPPRemote rs
@ -73,6 +77,9 @@ reconnectRemotes notifypushes rs = void $ do
filter (not . remoteAnnexIgnore . Remote.gitconfig)
nonxmppremotes
return failed
signal r = liftIO . mapM_ (flip tryPutMVar ())
=<< fromMaybe [] . M.lookup (Remote.uuid r) . connectRemoteNotifiers
<$> getDaemonStatus
{- Updates the local sync branch, then pushes it to all remotes, in
- parallel, along with the git-annex branch. This is the same

View file

@ -34,6 +34,7 @@ import Assistant.WebApp.Types
import Git.Remote (RemoteName)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Data.Time.LocalTime
import Data.Time.Clock
import qualified Data.Map as M
@ -42,8 +43,13 @@ import qualified Control.Exception as E
import qualified Data.Text as T
{- Loads schedules for this repository, and fires off one thread for each
- scheduled event. These threads sleep until the next time the event
- should run.
- scheduled event that runs on this repository. Each thread sleeps until
- its event is scheduled to run.
-
- To handle events that run on remotes, which need to only run when
- their remote gets connected, threads are also started, and are passed
- a MVar to wait on, which is stored in the DaemonStatus's
- connectRemoteNotifiers.
-
- In the meantime the main thread waits for any changes to the
- schedules. When there's a change, compare the old and new list of
@ -53,51 +59,63 @@ cronnerThread :: UrlRenderer -> NamedThread
cronnerThread urlrenderer = namedThreadUnchecked "Cronner" $ do
dstatus <- getDaemonStatus
h <- liftIO $ newNotificationHandle False (scheduleLogNotifier dstatus)
go h M.empty
go h M.empty M.empty
where
go h m = do
go h amap nmap = do
activities <- liftAnnex $ scheduleGet =<< getUUID
let addedactivities = activities `S.difference` M.keysSet m
let removedactivities = M.keysSet m `S.difference` activities
let addedactivities = activities `S.difference` M.keysSet amap
let removedactivities = M.keysSet amap `S.difference` activities
forM_ (S.toList removedactivities) $ \activity ->
case M.lookup activity m of
case M.lookup activity amap of
Just a -> do
debug ["stopping removed job for", fromScheduledActivity activity, show (asyncThreadId a)]
liftIO $ cancel a
Nothing -> noop
lastruntimes <- liftAnnex getLastRunTimes
addedm <- M.fromList <$> startactivities (S.toList addedactivities) lastruntimes
started <- startactivities (S.toList addedactivities) lastruntimes
let addedamap = M.fromList $ map fst started
let addednmap = M.fromList $ catMaybes $ map snd started
let removefiltered = M.filterWithKey (\k _ -> S.member k removedactivities)
let amap' = M.difference (M.union addedamap amap) (removefiltered amap)
let nmap' = M.difference (M.union addednmap nmap) (removefiltered nmap)
modifyDaemonStatus_ $ \s -> s { connectRemoteNotifiers = M.fromListWith (++) (M.elems nmap') }
liftIO $ waitNotification h
debug ["reloading changed activities"]
let m' = M.difference (M.union addedm m)
(M.filterWithKey (\k _ -> S.member k removedactivities) m)
go h m'
startactivities as lastruntimes = forM as $ \activity -> do
runner <- asIO2 (activityThread urlrenderer)
a <- liftIO $ async $
runner activity (M.lookup activity lastruntimes)
return (activity, a)
go h amap' nmap'
startactivities as lastruntimes = forM as $ \activity ->
case connectActivityUUID activity of
Nothing -> do
runner <- asIO2 (sleepingActivityThread urlrenderer)
a <- liftIO $ async $
runner activity (M.lookup activity lastruntimes)
return ((activity, a), Nothing)
Just u -> do
mvar <- liftIO newEmptyMVar
runner <- asIO2 (remoteActivityThread urlrenderer mvar)
a <- liftIO $ async $
runner activity (M.lookup activity lastruntimes)
return ((activity, a), Just (activity, (u, [mvar])))
{- Calculate the next time the activity is scheduled to run, then
- sleep until that time, and run it. Then call setLastRunTime, and
- loop.
-}
activityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime
sleepingActivityThread :: UrlRenderer -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
sleepingActivityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lasttime
where
getnexttime = liftIO . nextTime schedule
go _ Nothing = debug ["no scheduled events left for", desc]
go l (Just (NextTimeExactly t)) = runafter l t Nothing run
go l (Just (NextTimeExactly t)) = waitrun l t Nothing
go l (Just (NextTimeWindow windowstart windowend)) =
runafter l windowstart (Just windowend) run
waitrun l windowstart (Just windowend)
desc = fromScheduledActivity activity
schedule = getSchedule activity
runafter l t mmaxt a = do
waitrun l t mmaxt = do
seconds <- liftIO $ secondsUntilLocalTime t
when (seconds > Seconds 0) $ do
debug ["waiting", show seconds, "for next scheduled", desc]
@ -109,7 +127,7 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt
then do
debug ["too late to run scheduled", desc]
go l =<< getnexttime l
else a nowt
else run nowt
where
tolate nowt tz = case mmaxt of
Just maxt -> nowt > maxt
@ -118,12 +136,31 @@ activityThread urlrenderer activity lasttime = go lasttime =<< getnexttime lastt
(localTimeToUTC tz nowt)
(localTimeToUTC tz t) > 600
run nowt = do
debug ["starting", desc]
runActivity urlrenderer activity
debug ["finished", desc]
liftAnnex $ setLastRunTime activity nowt
runActivity urlrenderer activity nowt
go (Just nowt) =<< getnexttime (Just nowt)
{- Wait for the remote to become available by waiting on the MVar.
- Then check if the time is within a time window when activity
- is scheduled to run, and if so run it.
- Otherwise, just wait again on the MVar.
-}
remoteActivityThread :: UrlRenderer -> MVar () -> ScheduledActivity -> Maybe LocalTime -> Assistant ()
remoteActivityThread urlrenderer mvar activity lasttime = do
liftIO $ takeMVar mvar
go =<< liftIO (nextTime (getSchedule activity) lasttime)
where
go (Just (NextTimeWindow windowstart windowend)) = do
now <- liftIO getCurrentTime
tz <- liftIO $ getTimeZone now
if now >= localTimeToUTC tz windowstart && now <= localTimeToUTC tz windowend
then do
let nowt = utcToLocalTime tz now
runActivity urlrenderer activity nowt
loop (Just nowt)
else loop lasttime
go _ = noop -- running at exact time not handled here
loop = remoteActivityThread urlrenderer mvar activity
secondsUntilLocalTime :: LocalTime -> IO Seconds
secondsUntilLocalTime t = do
now <- getCurrentTime
@ -133,15 +170,24 @@ secondsUntilLocalTime t = do
then Seconds secs
else Seconds 0
runActivity :: UrlRenderer -> ScheduledActivity -> Assistant ()
runActivity urlrenderer (ScheduledSelfFsck _ d) = do
runActivity :: UrlRenderer -> ScheduledActivity -> LocalTime -> Assistant ()
runActivity urlrenderer activity nowt = do
debug ["starting", desc]
runActivity' urlrenderer activity
debug ["finished", desc]
liftAnnex $ setLastRunTime activity nowt
where
desc = fromScheduledActivity activity
runActivity' :: UrlRenderer -> ScheduledActivity -> Assistant ()
runActivity' urlrenderer (ScheduledSelfFsck _ d) = do
program <- liftIO $ readProgramFile
void $ runFsck urlrenderer Nothing $
batchCommand program (Param "fsck" : fsckParams d)
mapM_ reget =<< liftAnnex (dirKeys gitAnnexBadDir)
where
reget k = queueTransfers "fsck found bad file; redownloading" Next k Nothing Download
runActivity urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u)
runActivity' urlrenderer (ScheduledRemoteFsck u s d) = go =<< liftAnnex (remoteFromUUID u)
where
go (Just r) = void $ case Remote.remoteFsck r of
Nothing -> void $ runFsck urlrenderer (Just $ Remote.name r) $ do

View file

@ -18,6 +18,7 @@ import Assistant.Types.NetMessager
import Assistant.Types.Alert
import Control.Concurrent.STM
import Control.Concurrent.MVar
import Control.Concurrent.Async
import Data.Time.Clock.POSIX
import qualified Data.Map as M
@ -69,6 +70,8 @@ data DaemonStatus = DaemonStatus
-- When the XMPP client is connected, this will contain the XMPP
-- address.
, xmppClientID :: Maybe ClientID
-- MVars to signal when a remote gets connected.
, connectRemoteNotifiers :: M.Map UUID [MVar ()]
}
type TransferMap = M.Map Transfer TransferInfo
@ -100,3 +103,4 @@ newDaemonStatus = DaemonStatus
<*> newNotificationBroadcaster
<*> newNotificationBroadcaster
<*> pure Nothing
<*> pure M.empty

View file

@ -19,6 +19,12 @@ data ScheduledActivity
| ScheduledRemoteFsck UUID Schedule Duration
deriving (Eq, Read, Show, Ord)
{- Activities that run on a remote, within a time window, so
- should be run when the remote gets connected. -}
connectActivityUUID :: ScheduledActivity -> Maybe UUID
connectActivityUUID (ScheduledRemoteFsck u (Schedule _ AnyTime) _) = Just u
connectActivityUUID _ = Nothing
getSchedule :: ScheduledActivity -> Schedule
getSchedule (ScheduledSelfFsck s _) = s
getSchedule (ScheduledRemoteFsck _ s _) = s