diff --git a/Assistant.hs b/Assistant.hs index c054dafd3d..4f8a868f4f 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -53,6 +53,9 @@ - CommitChan: (STM TChan) - Commits are indicated by writing to this channel. The pusher reads - from it. + - FailedPushMap (STM TMVar) + - Failed pushes are indicated by writing to this TMVar. The push + - retrier blocks until they're available. -} module Assistant where @@ -89,10 +92,10 @@ startDaemon assistant foreground liftIO $ a $ do changechan <- newChangeChan commitchan <- newCommitChan - pushchan <- newFailedPushChan + pushmap <- newFailedPushMap _ <- forkIO $ commitThread st changechan commitchan - _ <- forkIO $ pushThread st commitchan pushchan - _ <- forkIO $ pushRetryThread st pushchan + _ <- forkIO $ pushThread st commitchan pushmap + _ <- forkIO $ pushRetryThread st pushmap _ <- forkIO $ mergeThread st _ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ sanityCheckerThread st dstatus changechan diff --git a/Assistant/Pushes.hs b/Assistant/Pushes.hs index 61d2b798b3..f411dda07d 100644 --- a/Assistant/Pushes.hs +++ b/Assistant/Pushes.hs @@ -8,30 +8,39 @@ module Assistant.Pushes where import Common.Annex -import Control.Concurrent.SampleVar +import Control.Concurrent.STM import Data.Time.Clock import qualified Data.Map as M {- Track the most recent push failure for each remote. -} type PushMap = M.Map Remote UTCTime -type FailedPushes = SampleVar PushMap +type FailedPushMap = TMVar PushMap -newFailedPushChan :: IO FailedPushChan -newFailedPushChan = newEmptySampleVar +{- The TMVar starts empty, and is left empty when there are no + - failed pushes. This way we can block until there are some failed pushes. + -} +newFailedPushMap :: IO FailedPushMap +newFailedPushMap = atomically newEmptyTMVar -{- Gets all failed pushes. Blocks until set. -} -getFailedPushes :: FailedPushChan -> IO PushMap -getFailedPushes = readSampleVar +{- Blocks until there are failed pushes. + - Returns Remotes whose pushes failed a given time duration or more ago. + - (This may be an empty list.) -} +getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote] +getFailedPushesBefore v duration = do + m <- atomically $ readTMVar v + now <- getCurrentTime + return $ M.keys $ M.filter (not . toorecent now) m + where + toorecent now time = now `diffUTCTime` time < duration -{- Sets all failed pushes to passed PushMap -} -setFailedPushes :: FailedPushChan -> PushMap -> IO () -setFailedPushes = writeSampleVar - -{- Indicates a failure to push to a single remote. -} -failedPush :: FailedPushChan -> Remote -> IO () -failedPush c r = - -{- Indicates that a remote was pushed to successfully. -} -successfulPush :: FailedPushChan -> Remote -> IO () -successfulPush c r = +{- Modifies the map. -} +changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO () +changeFailedPushMap v a = atomically $ + store . a . fromMaybe M.empty =<< tryTakeTMVar v + where + {- tryTakeTMVar empties the TMVar; refill it only if + - the modified map is not itself empty -} + store m + | m == M.empty = noop + | otherwise = putTMVar v $! m diff --git a/Assistant/Threads/Pusher.hs b/Assistant/Threads/Pusher.hs index 82c37de5f8..04d3435287 100644 --- a/Assistant/Threads/Pusher.hs +++ b/Assistant/Threads/Pusher.hs @@ -17,27 +17,23 @@ import Utility.ThreadScheduler import Utility.Parallel import Data.Time.Clock +import qualified Data.Map as M {- This thread retries pushes that failed before. -} -pushRetryThread :: ThreadState -> FailedPushChan -> IO () -pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do +pushRetryThread :: ThreadState -> FailedPushMap -> IO () +pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do -- We already waited half an hour, now wait until there are failed -- pushes to retry. - pushes <- getFailedPushes pushchan - -- Check times, to avoid repushing a push that's too new. - now <- getCurrentTime - let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes - unless (null newpushes) $ - refillFailedPushes pushchan newpushes - unless (null oldpushes) $ - pushToRemotes now st pushchan $ map failedRemote oldpushes + topush <- getFailedPushesBefore pushmap (fromIntegral halfhour) + unless (null topush) $ do + now <- getCurrentTime + pushToRemotes now st pushmap topush where halfhour = 1800 - toorecent now time = now `diffUTCTime` time < fromIntegral halfhour {- This thread pushes git commits out to remotes soon after they are made. -} -pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO () -pushThread st commitchan pushchan = do +pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO () +pushThread st commitchan pushmap = do remotes <- runThreadState st $ Command.Sync.syncRemotes [] runEvery (Seconds 2) $ do -- We already waited two seconds as a simple rate limiter. @@ -46,7 +42,7 @@ pushThread st commitchan pushchan = do -- Now see if now's a good time to push. now <- getCurrentTime if shouldPush now commits - then pushToRemotes now st pushchan remotes + then pushToRemotes now st pushmap remotes else refillCommits commitchan commits {- Decide if now is a good time to push to remotes. @@ -65,23 +61,27 @@ shouldPush _now commits - - Avoids running possibly long-duration commands in the Annex monad, so - as not to block other threads. -} -pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO () -pushToRemotes now st pushchan remotes = do +pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO () +pushToRemotes now st pushmap remotes = do (g, branch) <- runThreadState st $ (,) <$> fromRepo id <*> Command.Sync.currentBranch go True branch g remotes where go shouldretry branch g rs = do Command.Sync.updateBranch (Command.Sync.syncBranch branch) g - failed <- inParallel (push g branch) rs - unless (null failed) $ - if shouldretry - then retry branch g rs - else refillFailedPushes pushchan $ - map (`FailedPush` now) failed + (succeeded, failed) <- inParallel (push g branch) rs + changeFailedPushMap pushmap $ \m -> + M.union (makemap failed) $ + M.difference m (makemap succeeded) + unless (null failed || not shouldretry) $ + retry branch g failed + + makemap l = M.fromList $ zip l (repeat now) + push g branch remote = ifM (Command.Sync.pushBranch remote branch g) ( exitSuccess, exitFailure) + retry branch g rs = do runThreadState st $ manualPull branch rs go False branch g rs diff --git a/Utility/Parallel.hs b/Utility/Parallel.hs index 6e4671c057..9df95ab2b0 100644 --- a/Utility/Parallel.hs +++ b/Utility/Parallel.hs @@ -10,11 +10,13 @@ module Utility.Parallel where import Common {- Runs an action in parallel with a set of values. - - Returns values that caused the action to fail. -} -inParallel :: (v -> IO ()) -> [v] -> IO [v] + - Returns the values partitioned into ones with which the action succeeded, + - and ones with which it failed. -} +inParallel :: (v -> IO ()) -> [v] -> IO ([v], [v]) inParallel a l = do pids <- mapM (forkProcess . a) l statuses <- mapM (getProcessStatus True False) pids - return $ map fst $ filter (failed . snd) $ zip l statuses + return $ reduce $ partition (succeeded . snd) $ zip l statuses where - failed v = v /= Just (Exited ExitSuccess) + succeeded v = v == Just (Exited ExitSuccess) + reduce (x,y) = (map fst x, map fst y)