added a merger thread

Wow! I can create a file in repo a, and it instantly* shows up in repo b!

* under 1 second anyway
This commit is contained in:
Joey Hess 2012-06-22 17:01:08 -04:00
parent e9630e90de
commit e699ce1841
4 changed files with 112 additions and 26 deletions

View file

@ -22,11 +22,17 @@
- Thread 5: committer - Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its - Waits for changes to occur, and runs the git queue to update its
- index, then commits. - index, then commits.
- Thread 6: syncer - Thread 6: pusher
- Waits for commits to be made, and syncs the git repo to remotes. - Waits for commits to be made, and pushes updated branches to remotes,
- Thread 7: status logger - in parallel. (Forks a process for each git push.)
- Thread 7: merger
- Waits for pushes to be received from remotes, and merges the
- updated branches into the current branch. This uses inotify
- on .git/refs/heads, so there are additional inotify threads
- associated with it, too.
- Thread 8: status logger
- Wakes up periodically and records the daemon's status to disk. - Wakes up periodically and records the daemon's status to disk.
- Thread 8: sanity checker - Thread 9: sanity checker
- Wakes up periodically (rarely) and does sanity checks. - Wakes up periodically (rarely) and does sanity checks.
- -
- ThreadState: (MVar) - ThreadState: (MVar)
@ -41,6 +47,9 @@
- ChangeChan: (STM TChan) - ChangeChan: (STM TChan)
- Changes are indicated by writing to this channel. The committer - Changes are indicated by writing to this channel. The committer
- reads from it. - reads from it.
- CommitChan: (STM TChan)
- Commits are indicated by writing to this channel. The pusher reads
- from it.
-} -}
module Assistant where module Assistant where
@ -52,7 +61,8 @@ import Assistant.Changes
import Assistant.Commits import Assistant.Commits
import Assistant.Watcher import Assistant.Watcher
import Assistant.Committer import Assistant.Committer
import Assistant.Syncer import Assistant.Pusher
import Assistant.Merger
import Assistant.SanityChecker import Assistant.SanityChecker
import qualified Utility.Daemon import qualified Utility.Daemon
import Utility.LogFile import Utility.LogFile
@ -76,7 +86,8 @@ startDaemon assistant foreground
changechan <- newChangeChan changechan <- newChangeChan
commitchan <- newCommitChan commitchan <- newCommitChan
_ <- forkIO $ commitThread st changechan commitchan _ <- forkIO $ commitThread st changechan commitchan
_ <- forkIO $ syncThread st commitchan _ <- forkIO $ pushThread st commitchan
_ <- forkIO $ mergeThread st
_ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan _ <- forkIO $ sanityCheckerThread st dstatus changechan
-- Does not return. -- Does not return.

72
Assistant/Merger.hs Normal file
View file

@ -0,0 +1,72 @@
{- git-annex assistant git merge thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-}
module Assistant.Merger where
import Common.Annex
import Assistant.ThreadedMonad
import Utility.DirWatcher
import Utility.Types.DirWatcher
import qualified Git
import qualified Git.Command
import qualified Git.Branch
import qualified Command.Sync
{- This thread watches for changes to .git/refs/heads/synced/*,
- which indicate incoming pushes. It merges those pushes into the
- currently checked out branch. -}
mergeThread :: ThreadState -> IO ()
mergeThread st = do
g <- runThreadState st $ fromRepo id
let dir = Git.localGitDir g </> "refs" </> "heads" </> "synced"
createDirectoryIfMissing True dir
let hook a = Just $ runHandler g a
let hooks = mkWatchHooks
{ addHook = hook onAdd
, errHook = hook onErr
}
watchDir dir (const False) hooks id
where
type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO ()
{- Runs an action handler.
-
- Exceptions are ignored, otherwise a whole thread could be crashed.
-}
runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO ()
runHandler g handler file filestatus = void $ do
either print (const noop) =<< tryIO go
where
go = handler g file filestatus
{- Called when there's an error with inotify. -}
onErr :: Handler
onErr _ msg _ = error msg
{- Called when a new branch ref is written.
-
- This relies on git's atomic method of updating branch ref files,
- which is to first write the new file to .lock, and then rename it
- over the old file. So, ignore .lock files, and the rename ensures
- the watcher sees a new file being added on each update.
-
- At startup, synthetic add events fire, causing this to run, but that's
- ok; it ensures that any changes pushed since the last time the assistant
- ran are merged in.
-}
onAdd :: Handler
onAdd g file _
| ".lock" `isSuffixOf` file = noop
| otherwise = do
let branch = Git.Ref $ "refs" </> "heads" </> takeFileName file
current <- Git.Branch.current g
print (branch, current)
when (Just branch == current) $
void $ mergeBranch branch g
mergeBranch :: Git.Ref -> Git.Repo -> IO Bool
mergeBranch branch = Git.Command.runBool "merge"
[Param $ show $ Command.Sync.syncBranch branch]

View file

@ -1,9 +1,9 @@
{- git-annex assistant git syncing thread {- git-annex assistant git pushing thread
- -
- Copyright 2012 Joey Hess <joey@kitenet.net> - Copyright 2012 Joey Hess <joey@kitenet.net>
-} -}
module Assistant.Syncer where module Assistant.Pusher where
import Common.Annex import Common.Annex
import Assistant.Commits import Assistant.Commits
@ -14,39 +14,39 @@ import Utility.Parallel
import Data.Time.Clock import Data.Time.Clock
data FailedSync = FailedSync data FailedPush = FailedPush
{ failedRemote :: Remote { failedRemote :: Remote
, failedTimeStamp :: UTCTime , failedTimeStamp :: UTCTime
} }
{- This thread syncs git commits out to remotes. -} {- This thread pushes git commits out to remotes. -}
syncThread :: ThreadState -> CommitChan -> IO () pushThread :: ThreadState -> CommitChan -> IO ()
syncThread st commitchan = do pushThread st commitchan = do
remotes <- runThreadState st $ Command.Sync.syncRemotes [] remotes <- runThreadState st $ Command.Sync.syncRemotes []
runEveryWith (Seconds 2) [] $ \failedsyncs -> do runEveryWith (Seconds 2) [] $ \failedpushes -> do
-- We already waited two seconds as a simple rate limiter. -- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made -- Next, wait until at least one commit has been made
commits <- getCommits commitchan commits <- getCommits commitchan
-- Now see if now's a good time to sync. -- Now see if now's a good time to push.
time <- getCurrentTime time <- getCurrentTime
if shouldSync time commits failedsyncs if shouldPush time commits failedpushes
then syncToRemotes time st remotes then pushToRemotes time st remotes
else do else do
refillCommits commitchan commits refillCommits commitchan commits
return failedsyncs return failedpushes
{- Decide if now is a good time to sync to remotes. {- Decide if now is a good time to push to remotes.
- -
- Current strategy: Immediately sync all commits. The commit machinery - Current strategy: Immediately push all commits. The commit machinery
- already determines batches of changes, so we can't easily determine - already determines batches of changes, so we can't easily determine
- batches better. - batches better.
- -
- TODO: FailedSyncs are only retried the next time there's a commit. - TODO: FailedPushs are only retried the next time there's a commit.
- Should retry them periodically, or when a remote that was not available - Should retry them periodically, or when a remote that was not available
- becomes available. - becomes available.
-} -}
shouldSync :: UTCTime -> [Commit] -> [FailedSync] -> Bool shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool
shouldSync _now commits _failedremotes shouldPush _now commits _failedremotes
| not (null commits) = True | not (null commits) = True
| otherwise = False | otherwise = False
@ -55,13 +55,13 @@ shouldSync _now commits _failedremotes
- -
- Avoids running possibly long-duration commands in the Annex monad, so - Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -} - as not to block other threads. -}
syncToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedSync] pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush]
syncToRemotes now st remotes = do pushToRemotes now st remotes = do
(g, branch) <- runThreadState st $ (g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch (,) <$> fromRepo id <*> Command.Sync.currentBranch
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
map (`FailedSync` now) <$> inParallel (go g branch) remotes map (`FailedPush` now) <$> inParallel (push g branch) remotes
where where
go g branch remote = push g branch remote =
ifM (Command.Sync.pushBranch remote branch g) ifM (Command.Sync.pushBranch remote branch g)
( exitSuccess, exitFailure) ( exitSuccess, exitFailure)

View file

@ -20,3 +20,6 @@ data WatchHooks = WatchHooks
, delDirHook :: Hook FilePath , delDirHook :: Hook FilePath
, errHook :: Hook String -- error message , errHook :: Hook String -- error message
} }
mkWatchHooks :: WatchHooks
mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing