use a TMVar

SampleMVar won't work; between getting the current value and changing
it, another thread could made a change, which would get lost.

TMVar works well; this update situation is handled by atomic transactions.
This commit is contained in:
Joey Hess 2012-06-26 17:33:34 -04:00
parent e0a65247ae
commit 67c8ef7de2
4 changed files with 61 additions and 47 deletions

View file

@ -53,6 +53,9 @@
- CommitChan: (STM TChan) - CommitChan: (STM TChan)
- Commits are indicated by writing to this channel. The pusher reads - Commits are indicated by writing to this channel. The pusher reads
- from it. - from it.
- FailedPushMap (STM TMVar)
- Failed pushes are indicated by writing to this TMVar. The push
- retrier blocks until they're available.
-} -}
module Assistant where module Assistant where
@ -89,10 +92,10 @@ startDaemon assistant foreground
liftIO $ a $ do liftIO $ a $ do
changechan <- newChangeChan changechan <- newChangeChan
commitchan <- newCommitChan commitchan <- newCommitChan
pushchan <- newFailedPushChan pushmap <- newFailedPushMap
_ <- forkIO $ commitThread st changechan commitchan _ <- forkIO $ commitThread st changechan commitchan
_ <- forkIO $ pushThread st commitchan pushchan _ <- forkIO $ pushThread st commitchan pushmap
_ <- forkIO $ pushRetryThread st pushchan _ <- forkIO $ pushRetryThread st pushmap
_ <- forkIO $ mergeThread st _ <- forkIO $ mergeThread st
_ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan _ <- forkIO $ sanityCheckerThread st dstatus changechan

View file

@ -8,30 +8,39 @@
module Assistant.Pushes where module Assistant.Pushes where
import Common.Annex import Common.Annex
import Control.Concurrent.SampleVar
import Control.Concurrent.STM
import Data.Time.Clock import Data.Time.Clock
import qualified Data.Map as M import qualified Data.Map as M
{- Track the most recent push failure for each remote. -} {- Track the most recent push failure for each remote. -}
type PushMap = M.Map Remote UTCTime type PushMap = M.Map Remote UTCTime
type FailedPushes = SampleVar PushMap type FailedPushMap = TMVar PushMap
newFailedPushChan :: IO FailedPushChan {- The TMVar starts empty, and is left empty when there are no
newFailedPushChan = newEmptySampleVar - failed pushes. This way we can block until there are some failed pushes.
-}
newFailedPushMap :: IO FailedPushMap
newFailedPushMap = atomically newEmptyTMVar
{- Gets all failed pushes. Blocks until set. -} {- Blocks until there are failed pushes.
getFailedPushes :: FailedPushChan -> IO PushMap - Returns Remotes whose pushes failed a given time duration or more ago.
getFailedPushes = readSampleVar - (This may be an empty list.) -}
getFailedPushesBefore :: FailedPushMap -> NominalDiffTime -> IO [Remote]
getFailedPushesBefore v duration = do
m <- atomically $ readTMVar v
now <- getCurrentTime
return $ M.keys $ M.filter (not . toorecent now) m
where
toorecent now time = now `diffUTCTime` time < duration
{- Sets all failed pushes to passed PushMap -} {- Modifies the map. -}
setFailedPushes :: FailedPushChan -> PushMap -> IO () changeFailedPushMap :: FailedPushMap -> (PushMap -> PushMap) -> IO ()
setFailedPushes = writeSampleVar changeFailedPushMap v a = atomically $
store . a . fromMaybe M.empty =<< tryTakeTMVar v
{- Indicates a failure to push to a single remote. -} where
failedPush :: FailedPushChan -> Remote -> IO () {- tryTakeTMVar empties the TMVar; refill it only if
failedPush c r = - the modified map is not itself empty -}
store m
{- Indicates that a remote was pushed to successfully. -} | m == M.empty = noop
successfulPush :: FailedPushChan -> Remote -> IO () | otherwise = putTMVar v $! m
successfulPush c r =

View file

@ -17,27 +17,23 @@ import Utility.ThreadScheduler
import Utility.Parallel import Utility.Parallel
import Data.Time.Clock import Data.Time.Clock
import qualified Data.Map as M
{- This thread retries pushes that failed before. -} {- This thread retries pushes that failed before. -}
pushRetryThread :: ThreadState -> FailedPushChan -> IO () pushRetryThread :: ThreadState -> FailedPushMap -> IO ()
pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
-- We already waited half an hour, now wait until there are failed -- We already waited half an hour, now wait until there are failed
-- pushes to retry. -- pushes to retry.
pushes <- getFailedPushes pushchan topush <- getFailedPushesBefore pushmap (fromIntegral halfhour)
-- Check times, to avoid repushing a push that's too new. unless (null topush) $ do
now <- getCurrentTime now <- getCurrentTime
let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes pushToRemotes now st pushmap topush
unless (null newpushes) $
refillFailedPushes pushchan newpushes
unless (null oldpushes) $
pushToRemotes now st pushchan $ map failedRemote oldpushes
where where
halfhour = 1800 halfhour = 1800
toorecent now time = now `diffUTCTime` time < fromIntegral halfhour
{- This thread pushes git commits out to remotes soon after they are made. -} {- This thread pushes git commits out to remotes soon after they are made. -}
pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO () pushThread :: ThreadState -> CommitChan -> FailedPushMap -> IO ()
pushThread st commitchan pushchan = do pushThread st commitchan pushmap = do
remotes <- runThreadState st $ Command.Sync.syncRemotes [] remotes <- runThreadState st $ Command.Sync.syncRemotes []
runEvery (Seconds 2) $ do runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter. -- We already waited two seconds as a simple rate limiter.
@ -46,7 +42,7 @@ pushThread st commitchan pushchan = do
-- Now see if now's a good time to push. -- Now see if now's a good time to push.
now <- getCurrentTime now <- getCurrentTime
if shouldPush now commits if shouldPush now commits
then pushToRemotes now st pushchan remotes then pushToRemotes now st pushmap remotes
else refillCommits commitchan commits else refillCommits commitchan commits
{- Decide if now is a good time to push to remotes. {- Decide if now is a good time to push to remotes.
@ -65,23 +61,27 @@ shouldPush _now commits
- -
- Avoids running possibly long-duration commands in the Annex monad, so - Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -} - as not to block other threads. -}
pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO () pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
pushToRemotes now st pushchan remotes = do pushToRemotes now st pushmap remotes = do
(g, branch) <- runThreadState st $ (g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch (,) <$> fromRepo id <*> Command.Sync.currentBranch
go True branch g remotes go True branch g remotes
where where
go shouldretry branch g rs = do go shouldretry branch g rs = do
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
failed <- inParallel (push g branch) rs (succeeded, failed) <- inParallel (push g branch) rs
unless (null failed) $ changeFailedPushMap pushmap $ \m ->
if shouldretry M.union (makemap failed) $
then retry branch g rs M.difference m (makemap succeeded)
else refillFailedPushes pushchan $ unless (null failed || not shouldretry) $
map (`FailedPush` now) failed retry branch g failed
makemap l = M.fromList $ zip l (repeat now)
push g branch remote = push g branch remote =
ifM (Command.Sync.pushBranch remote branch g) ifM (Command.Sync.pushBranch remote branch g)
( exitSuccess, exitFailure) ( exitSuccess, exitFailure)
retry branch g rs = do retry branch g rs = do
runThreadState st $ manualPull branch rs runThreadState st $ manualPull branch rs
go False branch g rs go False branch g rs

View file

@ -10,11 +10,13 @@ module Utility.Parallel where
import Common import Common
{- Runs an action in parallel with a set of values. {- Runs an action in parallel with a set of values.
- Returns values that caused the action to fail. -} - Returns the values partitioned into ones with which the action succeeded,
inParallel :: (v -> IO ()) -> [v] -> IO [v] - and ones with which it failed. -}
inParallel :: (v -> IO ()) -> [v] -> IO ([v], [v])
inParallel a l = do inParallel a l = do
pids <- mapM (forkProcess . a) l pids <- mapM (forkProcess . a) l
statuses <- mapM (getProcessStatus True False) pids statuses <- mapM (getProcessStatus True False) pids
return $ map fst $ filter (failed . snd) $ zip l statuses return $ reduce $ partition (succeeded . snd) $ zip l statuses
where where
failed v = v /= Just (Exited ExitSuccess) succeeded v = v == Just (Exited ExitSuccess)
reduce (x,y) = (map fst x, map fst y)