the syncer now pushes out changes to remotes, in parallel

Note that, since this always pushes branch synced/master to the remote, it
assumes that master has already gotten all the commits that are on the
remote merged in. Otherwise, fast-forward prevention may prevent the push.

That's probably ok, because the next stage is to automatically detect
incoming pushes and merge.
This commit is contained in:
Joey Hess 2012-06-22 15:46:21 -04:00
parent 28e28bc043
commit e9630e90de
4 changed files with 105 additions and 28 deletions

View file

@ -5,25 +5,63 @@
module Assistant.Syncer where
import Common.Annex
import Assistant.Commits
import Assistant.ThreadedMonad
import qualified Command.Sync
import Utility.ThreadScheduler
import Utility.Parallel
import Data.Time.Clock
data FailedSync = FailedSync
{ failedRemote :: Remote
, failedTimeStamp :: UTCTime
}
{- This thread syncs git commits out to remotes. -}
syncThread :: ThreadState -> CommitChan -> IO ()
syncThread st commitchan = runEvery (Seconds 2) $ do
syncThread st commitchan = do
remotes <- runThreadState st $ Command.Sync.syncRemotes []
runEveryWith (Seconds 2) [] $ \failedsyncs -> do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
-- Now see if now's a good time to sync.
if shouldSync commits
then syncToRemotes
else refillCommits commitchan commits
time <- getCurrentTime
if shouldSync time commits failedsyncs
then syncToRemotes time st remotes
else do
refillCommits commitchan commits
return failedsyncs
{- Decide if now is a good time to sync commits to remotes. -}
shouldSync :: [Commit] -> Bool
shouldSync commits = not (null commits)
{- Decide if now is a good time to sync to remotes.
-
- Current strategy: Immediately sync all commits. The commit machinery
- already determines batches of changes, so we can't easily determine
- batches better.
-
- TODO: FailedSyncs are only retried the next time there's a commit.
- Should retry them periodically, or when a remote that was not available
- becomes available.
-}
shouldSync :: UTCTime -> [Commit] -> [FailedSync] -> Bool
shouldSync _now commits _failedremotes
| not (null commits) = True
| otherwise = False
syncToRemotes :: IO ()
syncToRemotes = return () -- TOOD
{- Updates the local sync branch, then pushes it to all remotes, in
- parallel.
-
- Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -}
syncToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedSync]
syncToRemotes now st remotes = do
(g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
map (`FailedSync` now) <$> inParallel (go g branch) remotes
where
go g branch remote =
ifM (Command.Sync.pushBranch remote branch g)
( exitSuccess, exitFailure)

View file

@ -31,7 +31,7 @@ def = [command "sync" (paramOptional (paramRepeating paramRemote))
-- syncing involves several operations, any of which can independently fail
seek :: CommandSeek
seek rs = do
!branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
branch <- currentBranch
remotes <- syncRemotes rs
return $ concat
[ [ commit ]
@ -41,6 +41,11 @@ seek rs = do
, [ pushLocal branch ]
, [ pushRemote remote branch | remote <- remotes ]
]
currentBranch :: Annex Git.Ref
currentBranch = do
!branch <- fromMaybe nobranch <$> inRepo Git.Branch.current
return branch
where
nobranch = error "no branch is checked out"
@ -90,7 +95,7 @@ mergeLocal branch = go =<< needmerge
syncbranch = syncBranch branch
needmerge = do
unlessM (inRepo $ Git.Ref.exists syncbranch) $
updateBranch syncbranch
inRepo $ updateBranch syncbranch
inRepo $ Git.Branch.changed branch syncbranch
go False = stop
go True = do
@ -99,17 +104,17 @@ mergeLocal branch = go =<< needmerge
pushLocal :: Git.Ref -> CommandStart
pushLocal branch = do
updateBranch $ syncBranch branch
inRepo $ updateBranch $ syncBranch branch
stop
updateBranch :: Git.Ref -> Annex ()
updateBranch syncbranch =
updateBranch :: Git.Ref -> Git.Repo -> IO ()
updateBranch syncbranch g =
unlessM go $ error $ "failed to update " ++ show syncbranch
where
go = inRepo $ Git.Command.runBool "branch"
go = Git.Command.runBool "branch"
[ Param "-f"
, Param $ show $ Git.Ref.base syncbranch
]
] g
pullRemote :: Remote -> Git.Ref -> CommandStart
pullRemote remote branch = do
@ -135,19 +140,27 @@ mergeRemote remote branch = all id <$> (mapM merge =<< tomerge)
pushRemote :: Remote -> Git.Ref -> CommandStart
pushRemote remote branch = go =<< needpush
where
needpush = anyM (newer remote) [syncbranch, Annex.Branch.name]
needpush = anyM (newer remote) [syncBranch branch, Annex.Branch.name]
go False = stop
go True = do
showStart "push" (Remote.name remote)
next $ next $ do
showOutput
inRepo $ Git.Command.runBool "push"
inRepo $ pushBranch remote branch
pushBranch :: Remote -> Git.Ref -> Git.Repo -> IO Bool
pushBranch remote branch g =
Git.Command.runBool "push"
[ Param (Remote.name remote)
, Param (show Annex.Branch.name)
, Param refspec
] g
where
refspec = concat
[ show $ Git.Ref.base branch
, ":"
, show $ Git.Ref.base $ syncBranch branch
]
refspec = show (Git.Ref.base branch) ++ ":" ++ show (Git.Ref.base syncbranch)
syncbranch = syncBranch branch
mergeAnnex :: CommandStart
mergeAnnex = do

20
Utility/Parallel.hs Normal file
View file

@ -0,0 +1,20 @@
{- parallel processes
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-
- Licensed under the GNU GPL version 3 or higher.
-}
module Utility.Parallel where
import Common
{- Runs an action in parallel with a set of values.
- Returns values that caused the action to fail. -}
inParallel :: (v -> IO ()) -> [v] -> IO [v]
inParallel a v = do
pids <- mapM (forkProcess . a) v
statuses <- mapM (getProcessStatus True False) pids
return $ map fst $ filter failed $ zip v statuses
where
failed (_, status) = status /= Just (Exited ExitSuccess)

View file

@ -24,6 +24,12 @@ runEvery n a = forever $ do
threadDelaySeconds n
a
runEveryWith :: Seconds -> a -> (a -> IO a) -> IO ()
runEveryWith n val a = do
threadDelaySeconds n
val' <- a val
runEveryWith n val' a
threadDelaySeconds :: Seconds -> IO ()
threadDelaySeconds (Seconds n) = unboundDelay (fromIntegral n * oneSecond)
where