stub syncer thread and commit channel

This commit is contained in:
Joey Hess 2012-06-22 13:39:44 -04:00
parent 3ee44cf8fe
commit 28e28bc043
8 changed files with 125 additions and 30 deletions

View file

@ -22,9 +22,11 @@
- Thread 5: committer
- Waits for changes to occur, and runs the git queue to update its
- index, then commits.
- Thread 6: status logger
- Thread 6: syncer
- Waits for commits to be made, and syncs the git repo to remotes.
- Thread 7: status logger
- Wakes up periodically and records the daemon's status to disk.
- Thread 7: sanity checker
- Thread 8: sanity checker
- Wakes up periodically (rarely) and does sanity checks.
-
- ThreadState: (MVar)
@ -47,8 +49,10 @@ import Common.Annex
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.Commits
import Assistant.Watcher
import Assistant.Committer
import Assistant.Syncer
import Assistant.SanityChecker
import qualified Utility.Daemon
import Utility.LogFile
@ -70,12 +74,9 @@ startDaemon assistant foreground
dstatus <- startDaemonStatus
liftIO $ a $ do
changechan <- newChangeChan
-- The commit thread is started early,
-- so that the user can immediately
-- begin adding files and having them
-- committed, even while the startup scan
-- is taking place.
_ <- forkIO $ commitThread st changechan
commitchan <- newCommitChan
_ <- forkIO $ commitThread st changechan commitchan
_ <- forkIO $ syncThread st commitchan
_ <- forkIO $ daemonStatusThread st dstatus
_ <- forkIO $ sanityCheckerThread st dstatus changechan
-- Does not return.

View file

@ -8,14 +8,14 @@ module Assistant.Changes where
import Common.Annex
import qualified Annex.Queue
import Types.KeySource
import Utility.TSet
import Control.Concurrent.STM
import Data.Time.Clock
data ChangeType = AddChange | LinkChange | RmChange | RmDirChange
deriving (Show, Eq)
type ChangeChan = TChan Change
type ChangeChan = TSet Change
data Change
= Change
@ -29,11 +29,8 @@ data Change
}
deriving (Show)
runChangeChan :: STM a -> IO a
runChangeChan = atomically
newChangeChan :: IO ChangeChan
newChangeChan = atomically newTChan
newChangeChan = newTSet
{- Handlers call this when they made a change that needs to get committed. -}
madeChange :: FilePath -> ChangeType -> Annex (Maybe Change)
@ -65,17 +62,13 @@ finishedChange c = c
{- Gets all unhandled changes.
- Blocks until at least one change is made. -}
getChanges :: ChangeChan -> IO [Change]
getChanges chan = runChangeChan $ do
c <- readTChan chan
go [c]
where
go l = do
v <- tryReadTChan chan
case v of
Nothing -> return l
Just c -> go (c:l)
getChanges = getTSet
{- Puts unhandled changes back into the channel.
- Note: Original order is not preserved. -}
refillChanges :: ChangeChan -> [Change] -> IO ()
refillChanges chan cs = runChangeChan $ mapM_ (writeTChan chan) cs
refillChanges = putTSet
{- Records a change in the channel. -}
recordChange :: ChangeChan -> Change -> IO ()
recordChange = putTSet1

32
Assistant/Commits.hs Normal file
View file

@ -0,0 +1,32 @@
{- git-annex assistant commit tracking
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-}
module Assistant.Commits where
import Utility.TSet
import Data.Time.Clock
type CommitChan = TSet Commit
data Commit = Commit UTCTime
deriving (Show)
newCommitChan :: IO CommitChan
newCommitChan = newTSet
{- Gets all unhandled commits.
- Blocks until at least one commit is made. -}
getCommits :: CommitChan -> IO [Commit]
getCommits = getTSet
{- Puts unhandled commits back into the channel.
- Note: Original order is not preserved. -}
refillCommits :: CommitChan -> [Commit] -> IO ()
refillCommits = putTSet
{- Records a commit in the channel. -}
recordCommit :: CommitChan -> Commit -> IO ()
recordCommit = putTSet1

View file

@ -7,6 +7,7 @@ module Assistant.Committer where
import Common.Annex
import Assistant.Changes
import Assistant.Commits
import Assistant.ThreadedMonad
import Assistant.Watcher
import qualified Annex
@ -26,8 +27,8 @@ import qualified Data.Set as S
import Data.Either
{- This thread makes git commits at appropriate times. -}
commitThread :: ThreadState -> ChangeChan -> IO ()
commitThread st changechan = runEvery (Seconds 1) $ do
commitThread :: ThreadState -> ChangeChan -> CommitChan -> IO ()
commitThread st changechan commitchan = runEvery (Seconds 1) $ do
-- We already waited one second as a simple rate limiter.
-- Next, wait until at least one change is available for
-- processing.
@ -40,6 +41,7 @@ commitThread st changechan = runEvery (Seconds 1) $ do
if shouldCommit time readychanges
then do
void $ tryIO $ runThreadState st commitStaged
recordCommit commitchan (Commit time)
else refillChanges changechan readychanges
else refillChanges changechan changes

29
Assistant/Syncer.hs Normal file
View file

@ -0,0 +1,29 @@
{- git-annex assistant git syncing thread
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-}
module Assistant.Syncer where
import Assistant.Commits
import Assistant.ThreadedMonad
import qualified Command.Sync
import Utility.ThreadScheduler
{- This thread syncs git commits out to remotes. -}
syncThread :: ThreadState -> CommitChan -> IO ()
syncThread st commitchan = runEvery (Seconds 2) $ do
-- We already waited two seconds as a simple rate limiter.
-- Next, wait until at least one commit has been made
commits <- getCommits commitchan
-- Now see if now's a good time to sync.
if shouldSync commits
then syncToRemotes
else refillCommits commitchan commits
{- Decide if now is a good time to sync commits to remotes. -}
shouldSync :: [Commit] -> Bool
shouldSync commits = not (null commits)
syncToRemotes :: IO ()
syncToRemotes = return () -- TOOD

View file

@ -32,7 +32,8 @@ withThreadState a = do
{- Runs an Annex action, using the state from the MVar.
-
- This serializes calls by threads. -}
- This serializes calls by threads; only one thread can run in Annex at a
- time. -}
runThreadState :: ThreadState -> Annex a -> IO a
runThreadState mvar a = do
startstate <- takeMVar mvar

View file

@ -27,7 +27,6 @@ import Annex.Content
import Annex.CatFile
import Git.Types
import Control.Concurrent.STM
import Data.Bits.Utils
import qualified Data.ByteString.Lazy as L
@ -96,8 +95,7 @@ runHandler st dstatus changechan handler file filestatus = void $ do
case r of
Left e -> print e
Right Nothing -> noop
Right (Just change) -> void $
runChangeChan $ writeTChan changechan change
Right (Just change) -> recordChange changechan change
where
go = runThreadState st $ handler file filestatus dstatus

39
Utility/TSet.hs Normal file
View file

@ -0,0 +1,39 @@
{- Transactional sets
-
- Copyright 2012 Joey Hess <joey@kitenet.net>
-}
module Utility.TSet where
import Common
import Control.Concurrent.STM
type TSet = TChan
runTSet :: STM a -> IO a
runTSet = atomically
newTSet :: IO (TSet a)
newTSet = atomically newTChan
{- Gets the contents of the TSet. Blocks until at least one item is
- present. -}
getTSet :: TSet a -> IO [a]
getTSet tset = runTSet $ do
c <- readTChan tset
go [c]
where
go l = do
v <- tryReadTChan tset
case v of
Nothing -> return l
Just c -> go (c:l)
{- Puts items into a TSet. -}
putTSet :: TSet a -> [a] -> IO ()
putTSet tset vs = runTSet $ mapM_ (writeTChan tset) vs
{- Put a single item into a TSet. -}
putTSet1 :: TSet a -> a -> IO ()
putTSet1 tset v = void $ runTSet $ writeTChan tset v