add a push retry thread

This commit is contained in:
Joey Hess 2012-06-25 16:38:12 -04:00
parent 0b146f9ecc
commit 5cfe91f06d
5 changed files with 82 additions and 36 deletions

View file

@ -25,14 +25,17 @@
- Thread 6: pusher - Thread 6: pusher
- Waits for commits to be made, and pushes updated branches to remotes, - Waits for commits to be made, and pushes updated branches to remotes,
- in parallel. (Forks a process for each git push.) - in parallel. (Forks a process for each git push.)
- Thread 7: merger - Thread 7: push retryer
- Runs every 30 minutes when there are failed pushes, and retries
- them.
- Thread 8: merger
- Waits for pushes to be received from remotes, and merges the - Waits for pushes to be received from remotes, and merges the
- updated branches into the current branch. This uses inotify - updated branches into the current branch. This uses inotify
- on .git/refs/heads, so there are additional inotify threads - on .git/refs/heads, so there are additional inotify threads
- associated with it, too. - associated with it, too.
- Thread 8: status logger - Thread 9: status logger
- Wakes up periodically and records the daemon's status to disk. - Wakes up periodically and records the daemon's status to disk.
- Thread 9: sanity checker - Thread 10: sanity checker
- Wakes up periodically (rarely) and does sanity checks. - Wakes up periodically (rarely) and does sanity checks.
- -
- ThreadState: (MVar) - ThreadState: (MVar)
@ -59,6 +62,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.Changes import Assistant.Changes
import Assistant.Commits import Assistant.Commits
import Assistant.Pushes
import Assistant.Threads.Watcher import Assistant.Threads.Watcher
import Assistant.Threads.Committer import Assistant.Threads.Committer
import Assistant.Threads.Pusher import Assistant.Threads.Pusher
@ -85,8 +89,10 @@ startDaemon assistant foreground
liftIO $ a $ do liftIO $ a $ do
changechan <- newChangeChan changechan <- newChangeChan
commitchan <- newCommitChan commitchan <- newCommitChan
pushchan <- newFailedPushChan
_ <- forkIO $ commitThread st changechan commitchan _ <- forkIO $ commitThread st changechan commitchan
_ <- forkIO $ pushThread st commitchan _ <- forkIO $ pushThread st commitchan pushchan
_ <- forkIO $ pushRetryThread st pushchan
_ <- forkIO $ mergeThread st _ <- forkIO $ mergeThread st
_ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan _ <- forkIO $ sanityCheckerThread st dstatus changechan

36
Assistant/Pushes.hs Normal file
View file

@ -0,0 +1,36 @@
{- git-annex assistant push tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Assistant.Pushes where
import Common.Annex
import Utility.TSet
import Data.Time.Clock
type FailedPushChan = TSet FailedPush
data FailedPush = FailedPush
{ failedRemote :: Remote
, failedTimeStamp :: UTCTime
}
newFailedPushChan :: IO FailedPushChan
newFailedPushChan = newTSet
{- Gets all failed pushes. Blocks until there is at least one failed push. -}
getFailedPushes :: FailedPushChan -> IO [FailedPush]
getFailedPushes = getTSet
{- Puts failed pushes back into the channel.
- Note: Original order is not preserved. -}
refillFailedPushes :: FailedPushChan -> [FailedPush] -> IO ()
refillFailedPushes = putTSet
{- Records a failed push in the channel. -}
recordFailedPush :: FailedPushChan -> FailedPush -> IO ()
recordFailedPush = putTSet1

View file

@ -1,4 +1,4 @@
{- git-annex assistant git pushing thread {- git-annex assistant git pushing threads
- -
- Copyright 2012 Joey Hess <joey@kitenet.net> - Copyright 2012 Joey Hess <joey@kitenet.net>
- -
@ -9,6 +9,7 @@ module Assistant.Threads.Pusher where
import Common.Annex import Common.Annex
import Assistant.Commits import Assistant.Commits
import Assistant.Pushes
import Assistant.ThreadedMonad import Assistant.ThreadedMonad
import qualified Command.Sync import qualified Command.Sync
import Utility.ThreadScheduler import Utility.ThreadScheduler
@ -16,39 +17,45 @@ import Utility.Parallel
import Data.Time.Clock import Data.Time.Clock
data FailedPush = FailedPush {- This thread retries pushes that failed before. -}
{ failedRemote :: Remote pushRetryThread :: ThreadState -> FailedPushChan -> IO ()
, failedTimeStamp :: UTCTime pushRetryThread st pushchan = runEvery (Seconds halfhour) $ do
} -- We already waited half an hour, now wait until there are failed
-- pushes to retry.
pushes <- getFailedPushes pushchan
-- Check times, to avoid repushing a push that's too new.
now <- getCurrentTime
let (newpushes, oldpushes) = partition (toorecent now . failedTimeStamp) pushes
unless (null newpushes) $
refillFailedPushes pushchan newpushes
unless (null oldpushes) $
pushToRemotes now st pushchan $ map failedRemote oldpushes
where
halfhour = 1800
toorecent now time = now `diffUTCTime` time < fromIntegral halfhour
{- This thread pushes git commits out to remotes. -} {- This thread pushes git commits out to remotes soon after they are made. -}
pushThread :: ThreadState -> CommitChan -> IO () pushThread :: ThreadState -> CommitChan -> FailedPushChan -> IO ()
pushThread st commitchan = do pushThread st commitchan pushchan = do
remotes <- runThreadState st $ Command.Sync.syncRemotes [] remotes <- runThreadState st $ Command.Sync.syncRemotes []
runEveryWith (Seconds 2) [] $ \failedpushes -> do runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter. -- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made -- Next, wait until at least one commit has been made
commits <- getCommits commitchan commits <- getCommits commitchan
-- Now see if now's a good time to push. -- Now see if now's a good time to push.
time <- getCurrentTime now <- getCurrentTime
if shouldPush time commits failedpushes if shouldPush now commits
then pushToRemotes time st remotes then pushToRemotes now st pushchan remotes
else do else refillCommits commitchan commits
refillCommits commitchan commits
return failedpushes
{- Decide if now is a good time to push to remotes. {- Decide if now is a good time to push to remotes.
- -
- Current strategy: Immediately push all commits. The commit machinery - Current strategy: Immediately push all commits. The commit machinery
- already determines batches of changes, so we can't easily determine - already determines batches of changes, so we can't easily determine
- batches better. - batches better.
-
- TODO: FailedPushs are only retried the next time there's a commit.
- Should retry them periodically, or when a remote that was not available
- becomes available.
-} -}
shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool shouldPush :: UTCTime -> [Commit] -> Bool
shouldPush _now commits _failedremotes shouldPush _now commits
| not (null commits) = True | not (null commits) = True
| otherwise = False | otherwise = False
@ -57,12 +64,14 @@ shouldPush _now commits _failedremotes
- -
- Avoids running possibly long-duration commands in the Annex monad, so - Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -} - as not to block other threads. -}
pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush] pushToRemotes :: UTCTime -> ThreadState -> FailedPushChan -> [Remote] -> IO ()
pushToRemotes now st remotes = do pushToRemotes now st pushchan remotes = do
(g, branch) <- runThreadState st $ (g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch (,) <$> fromRepo id <*> Command.Sync.currentBranch
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
map (`FailedPush` now) <$> inParallel (push g branch) remotes failed <- map (`FailedPush` now) <$> inParallel (push g branch) remotes
unless (null failed) $
refillFailedPushes pushchan failed
where where
push g branch remote = push g branch remote =
ifM (Command.Sync.pushBranch remote branch g) ifM (Command.Sync.pushBranch remote branch g)

View file

@ -24,12 +24,6 @@ runEvery n a = forever $ do
threadDelaySeconds n threadDelaySeconds n
a a
runEveryWith :: Seconds -> a -> (a -> IO a) -> IO ()
runEveryWith n val a = do
threadDelaySeconds n
val' <- a val
runEveryWith n val' a
threadDelaySeconds :: Seconds -> IO () threadDelaySeconds :: Seconds -> IO ()
threadDelaySeconds (Seconds n) = unboundDelay (fromIntegral n * oneSecond) threadDelaySeconds (Seconds n) = unboundDelay (fromIntegral n * oneSecond)
where where

View file

@ -13,8 +13,9 @@ all the other git clones, at both the git level and the key/value level.
[The watching can be done with the existing inotify code! This avoids needing [The watching can be done with the existing inotify code! This avoids needing
any special mechanism to notify a remote that it's been synced to.] any special mechanism to notify a remote that it's been synced to.]
**done** **done**
1. Periodically retry pushes that failed. Also, detect if a push failed 1. Periodically retry pushes that failed. **done** (every half an hour)
due to not being up-to-date, pull, and repush. 1. Also, detect if a push failed due to not being up-to-date, pull,
and repush.
2. Use a git merge driver that adds both conflicting files, 2. Use a git merge driver that adds both conflicting files,
so conflicts never break a sync. so conflicts never break a sync.
3. Investigate the XMPP approach like dvcs-autosync does, or other ways of 3. Investigate the XMPP approach like dvcs-autosync does, or other ways of