From e699ce18417729abbb9606f6a011628ad6616a64 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 22 Jun 2012 17:01:08 -0400 Subject: [PATCH] added a merger thread Wow! I can create a file in repo a, and it instantly* shows up in repo b! * under 1 second anyway --- Assistant.hs | 23 +++++++--- Assistant/Merger.hs | 72 ++++++++++++++++++++++++++++++ Assistant/{Syncer.hs => Pusher.hs} | 40 ++++++++--------- Utility/Types/DirWatcher.hs | 3 ++ 4 files changed, 112 insertions(+), 26 deletions(-) create mode 100644 Assistant/Merger.hs rename Assistant/{Syncer.hs => Pusher.hs} (58%) diff --git a/Assistant.hs b/Assistant.hs index 5a3fa5a9d4..ce230533cf 100644 --- a/Assistant.hs +++ b/Assistant.hs @@ -22,11 +22,17 @@ - Thread 5: committer - Waits for changes to occur, and runs the git queue to update its - index, then commits. - - Thread 6: syncer - - Waits for commits to be made, and syncs the git repo to remotes. - - Thread 7: status logger + - Thread 6: pusher + - Waits for commits to be made, and pushes updated branches to remotes, + - in parallel. (Forks a process for each git push.) + - Thread 7: merger + - Waits for pushes to be received from remotes, and merges the + - updated branches into the current branch. This uses inotify + - on .git/refs/heads, so there are additional inotify threads + - associated with it, too. + - Thread 8: status logger - Wakes up periodically and records the daemon's status to disk. - - Thread 8: sanity checker + - Thread 9: sanity checker - Wakes up periodically (rarely) and does sanity checks. - - ThreadState: (MVar) @@ -41,6 +47,9 @@ - ChangeChan: (STM TChan) - Changes are indicated by writing to this channel. The committer - reads from it. + - CommitChan: (STM TChan) + - Commits are indicated by writing to this channel. The pusher reads + - from it. -} module Assistant where @@ -52,7 +61,8 @@ import Assistant.Changes import Assistant.Commits import Assistant.Watcher import Assistant.Committer -import Assistant.Syncer +import Assistant.Pusher +import Assistant.Merger import Assistant.SanityChecker import qualified Utility.Daemon import Utility.LogFile @@ -76,7 +86,8 @@ startDaemon assistant foreground changechan <- newChangeChan commitchan <- newCommitChan _ <- forkIO $ commitThread st changechan commitchan - _ <- forkIO $ syncThread st commitchan + _ <- forkIO $ pushThread st commitchan + _ <- forkIO $ mergeThread st _ <- forkIO $ daemonStatusThread st dstatus _ <- forkIO $ sanityCheckerThread st dstatus changechan -- Does not return. diff --git a/Assistant/Merger.hs b/Assistant/Merger.hs new file mode 100644 index 0000000000..660636842b --- /dev/null +++ b/Assistant/Merger.hs @@ -0,0 +1,72 @@ +{- git-annex assistant git merge thread + - + - Copyright 2012 Joey Hess + -} + +module Assistant.Merger where + +import Common.Annex +import Assistant.ThreadedMonad +import Utility.DirWatcher +import Utility.Types.DirWatcher +import qualified Git +import qualified Git.Command +import qualified Git.Branch +import qualified Command.Sync + +{- This thread watches for changes to .git/refs/heads/synced/*, + - which indicate incoming pushes. It merges those pushes into the + - currently checked out branch. -} +mergeThread :: ThreadState -> IO () +mergeThread st = do + g <- runThreadState st $ fromRepo id + let dir = Git.localGitDir g "refs" "heads" "synced" + createDirectoryIfMissing True dir + let hook a = Just $ runHandler g a + let hooks = mkWatchHooks + { addHook = hook onAdd + , errHook = hook onErr + } + watchDir dir (const False) hooks id + where + +type Handler = Git.Repo -> FilePath -> Maybe FileStatus -> IO () + +{- Runs an action handler. + - + - Exceptions are ignored, otherwise a whole thread could be crashed. + -} +runHandler :: Git.Repo -> Handler -> FilePath -> Maybe FileStatus -> IO () +runHandler g handler file filestatus = void $ do + either print (const noop) =<< tryIO go + where + go = handler g file filestatus + +{- Called when there's an error with inotify. -} +onErr :: Handler +onErr _ msg _ = error msg + +{- Called when a new branch ref is written. + - + - This relies on git's atomic method of updating branch ref files, + - which is to first write the new file to .lock, and then rename it + - over the old file. So, ignore .lock files, and the rename ensures + - the watcher sees a new file being added on each update. + - + - At startup, synthetic add events fire, causing this to run, but that's + - ok; it ensures that any changes pushed since the last time the assistant + - ran are merged in. + -} +onAdd :: Handler +onAdd g file _ + | ".lock" `isSuffixOf` file = noop + | otherwise = do + let branch = Git.Ref $ "refs" "heads" takeFileName file + current <- Git.Branch.current g + print (branch, current) + when (Just branch == current) $ + void $ mergeBranch branch g + +mergeBranch :: Git.Ref -> Git.Repo -> IO Bool +mergeBranch branch = Git.Command.runBool "merge" + [Param $ show $ Command.Sync.syncBranch branch] diff --git a/Assistant/Syncer.hs b/Assistant/Pusher.hs similarity index 58% rename from Assistant/Syncer.hs rename to Assistant/Pusher.hs index c579c1c280..119575b92a 100644 --- a/Assistant/Syncer.hs +++ b/Assistant/Pusher.hs @@ -1,9 +1,9 @@ -{- git-annex assistant git syncing thread +{- git-annex assistant git pushing thread - - Copyright 2012 Joey Hess -} -module Assistant.Syncer where +module Assistant.Pusher where import Common.Annex import Assistant.Commits @@ -14,39 +14,39 @@ import Utility.Parallel import Data.Time.Clock -data FailedSync = FailedSync +data FailedPush = FailedPush { failedRemote :: Remote , failedTimeStamp :: UTCTime } -{- This thread syncs git commits out to remotes. -} -syncThread :: ThreadState -> CommitChan -> IO () -syncThread st commitchan = do +{- This thread pushes git commits out to remotes. -} +pushThread :: ThreadState -> CommitChan -> IO () +pushThread st commitchan = do remotes <- runThreadState st $ Command.Sync.syncRemotes [] - runEveryWith (Seconds 2) [] $ \failedsyncs -> do + runEveryWith (Seconds 2) [] $ \failedpushes -> do -- We already waited two seconds as a simple rate limiter. -- Next, wait until at least one commit has been made commits <- getCommits commitchan - -- Now see if now's a good time to sync. + -- Now see if now's a good time to push. time <- getCurrentTime - if shouldSync time commits failedsyncs - then syncToRemotes time st remotes + if shouldPush time commits failedpushes + then pushToRemotes time st remotes else do refillCommits commitchan commits - return failedsyncs + return failedpushes -{- Decide if now is a good time to sync to remotes. +{- Decide if now is a good time to push to remotes. - - - Current strategy: Immediately sync all commits. The commit machinery + - Current strategy: Immediately push all commits. The commit machinery - already determines batches of changes, so we can't easily determine - batches better. - - - TODO: FailedSyncs are only retried the next time there's a commit. + - TODO: FailedPushs are only retried the next time there's a commit. - Should retry them periodically, or when a remote that was not available - becomes available. -} -shouldSync :: UTCTime -> [Commit] -> [FailedSync] -> Bool -shouldSync _now commits _failedremotes +shouldPush :: UTCTime -> [Commit] -> [FailedPush] -> Bool +shouldPush _now commits _failedremotes | not (null commits) = True | otherwise = False @@ -55,13 +55,13 @@ shouldSync _now commits _failedremotes - - Avoids running possibly long-duration commands in the Annex monad, so - as not to block other threads. -} -syncToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedSync] -syncToRemotes now st remotes = do +pushToRemotes :: UTCTime -> ThreadState -> [Remote] -> IO [FailedPush] +pushToRemotes now st remotes = do (g, branch) <- runThreadState st $ (,) <$> fromRepo id <*> Command.Sync.currentBranch Command.Sync.updateBranch (Command.Sync.syncBranch branch) g - map (`FailedSync` now) <$> inParallel (go g branch) remotes + map (`FailedPush` now) <$> inParallel (push g branch) remotes where - go g branch remote = + push g branch remote = ifM (Command.Sync.pushBranch remote branch g) ( exitSuccess, exitFailure) diff --git a/Utility/Types/DirWatcher.hs b/Utility/Types/DirWatcher.hs index c828a05938..ba7eae6a16 100644 --- a/Utility/Types/DirWatcher.hs +++ b/Utility/Types/DirWatcher.hs @@ -20,3 +20,6 @@ data WatchHooks = WatchHooks , delDirHook :: Hook FilePath , errHook :: Hook String -- error message } + +mkWatchHooks :: WatchHooks +mkWatchHooks = WatchHooks Nothing Nothing Nothing Nothing Nothing