make watch use the queue

May not work. Certianly needs to flush the queue from time to time
when only symlink changes are being made.
This commit is contained in:
Joey Hess 2012-06-07 15:40:44 -04:00
parent 0a11b35d89
commit 20f425be19
5 changed files with 35 additions and 24 deletions

View file

@ -1,12 +1,13 @@
{- git-annex command queue {- git-annex command queue
- -
- Copyright 2011 Joey Hess <joey@kitenet.net> - Copyright 2011, 2012 Joey Hess <joey@kitenet.net>
- -
- Licensed under the GNU GPL version 3 or higher. - Licensed under the GNU GPL version 3 or higher.
-} -}
module Annex.Queue ( module Annex.Queue (
addCommand, addCommand,
addUpdateIndex,
flush, flush,
flushWhenFull flushWhenFull
) where ) where
@ -14,6 +15,7 @@ module Annex.Queue (
import Common.Annex import Common.Annex
import Annex hiding (new) import Annex hiding (new)
import qualified Git.Queue import qualified Git.Queue
import qualified Git.UpdateIndex
import Config import Config
{- Adds a git command to the queue. -} {- Adds a git command to the queue. -}
@ -22,6 +24,12 @@ addCommand command params files = do
q <- get q <- get
store =<< inRepo (Git.Queue.addCommand command params files q) store =<< inRepo (Git.Queue.addCommand command params files q)
{- Adds an update-index stream to the queue. -}
addUpdateIndex :: Git.UpdateIndex.Streamer -> Annex ()
addUpdateIndex streamer = do
q <- get
store =<< inRepo (Git.Queue.addUpdateIndex streamer q)
{- Runs the queue if it is full. Should be called periodically. -} {- Runs the queue if it is full. Should be called periodically. -}
flushWhenFull :: Annex () flushWhenFull :: Annex ()
flushWhenFull = do flushWhenFull = do

View file

@ -15,8 +15,8 @@ import Command
import Utility.Inotify import Utility.Inotify
import Utility.ThreadLock import Utility.ThreadLock
import qualified Annex import qualified Annex
import qualified Annex.Queue
import qualified Command.Add import qualified Command.Add
import qualified Git
import qualified Git.Command import qualified Git.Command
import qualified Git.UpdateIndex import qualified Git.UpdateIndex
import Git.HashObject import Git.HashObject
@ -99,7 +99,7 @@ onAdd file = do
go Nothing = showEndFail go Nothing = showEndFail
go (Just key) = do go (Just key) = do
link <- Command.Add.link file key True link <- Command.Add.link file key True
inRepo $ stageSymlink file link stageSymlink file link
showEndOk showEndOk
{- A symlink might be an arbitrary symlink, which is just added. {- A symlink might be an arbitrary symlink, which is just added.
@ -119,7 +119,7 @@ onAddSymlink file = go =<< Backend.lookupFile file
liftIO $ createSymbolicLink link file liftIO $ createSymbolicLink link file
addlink link addlink link
) )
addlink link = inRepo $ stageSymlink file link addlink link = stageSymlink file link
{- The file could reappear at any time, so --cached is used, to only delete {- The file could reappear at any time, so --cached is used, to only delete
- it from the index. -} - it from the index. -}
@ -139,12 +139,10 @@ onErr = warning
{- Adds a symlink to the index, without ever accessing the actual symlink {- Adds a symlink to the index, without ever accessing the actual symlink
- on disk. -} - on disk. -}
stageSymlink :: FilePath -> String -> Git.Repo -> IO () stageSymlink :: FilePath -> String -> Annex ()
stageSymlink file linktext repo = Git.UpdateIndex.stream_update_index repo [stage] stageSymlink file linktext = do
where line <- Git.UpdateIndex.update_index_line
stage streamer = do <$> inRepo (hashObject BlobObject linktext)
line <- Git.UpdateIndex.update_index_line <*> pure SymlinkBlob
<$> (hashObject repo BlobObject linktext) <*> inRepo (toTopFilePath file)
<*> pure SymlinkBlob Annex.Queue.addUpdateIndex $ \streamer -> streamer line
<*> toTopFilePath file repo
streamer line

View file

@ -36,8 +36,8 @@ hashFile h file = CoProcess.query h send receive
receive from = getSha "hash-object" $ hGetLine from receive from = getSha "hash-object" $ hGetLine from
{- Injects some content into git, returning its Sha. -} {- Injects some content into git, returning its Sha. -}
hashObject :: Repo -> ObjectType -> String -> IO Sha hashObject :: ObjectType -> String -> Repo -> IO Sha
hashObject repo objtype content = getSha subcmd $ do hashObject objtype content repo = getSha subcmd $ do
(h, s) <- pipeWriteRead (map Param params) content repo (h, s) <- pipeWriteRead (map Param params) content repo
length s `seq` do length s `seq` do
forceSuccess h forceSuccess h

View file

@ -77,13 +77,11 @@ defaultLimit = 10240
new :: Maybe Int -> Queue new :: Maybe Int -> Queue
new lim = Queue 0 (fromMaybe defaultLimit lim) M.empty new lim = Queue 0 (fromMaybe defaultLimit lim) M.empty
{- Adds a command to a queue. If the queue already contains a different {- Adds an git command to the queue.
- action, it will be flushed; this is to ensure that conflicting actions,
- like add and rm, are run in the right order.
- -
- Actions with the same subcommand but different parameters are - Git commands with the same subcommand but different parameters are
- roughly equivilant; assumed equivilant enough to perform in any order - assumed to be equivilant enough to perform in any order with the same
- with the same result. - result.
-} -}
addCommand :: String -> [CommandParam] -> [FilePath] -> Queue -> Repo -> IO Queue addCommand :: String -> [CommandParam] -> [FilePath] -> Queue -> Repo -> IO Queue
addCommand subcommand params files q repo = addCommand subcommand params files q repo =
@ -100,6 +98,11 @@ addCommand subcommand params files q repo =
different (CommandAction { getSubcommand = s }) = s /= subcommand different (CommandAction { getSubcommand = s }) = s /= subcommand
different _ = True different _ = True
{- Adds an update-index streamer to the queue.
-
- Note that this does not increase the queue size, because data is
- streamed into update-index, so command-line length limits are not
- involved. -}
addUpdateIndex :: Git.UpdateIndex.Streamer -> Queue -> Repo -> IO Queue addUpdateIndex :: Git.UpdateIndex.Streamer -> Queue -> Repo -> IO Queue
addUpdateIndex streamer q repo = addUpdateIndex streamer q repo =
updateQueue action different 0 q repo updateQueue action different 0 q repo
@ -147,7 +150,8 @@ flush (Queue _ lim m) repo = do
- Intentionally runs the command even if the list of files is empty; - Intentionally runs the command even if the list of files is empty;
- this allows queueing commands that do not need a list of files. -} - this allows queueing commands that do not need a list of files. -}
runAction :: Repo -> Action -> IO () runAction :: Repo -> Action -> IO ()
runAction _repo _action@(UpdateIndexAction {}) = error "TODO" runAction repo (UpdateIndexAction streamers) =
Git.UpdateIndex.stream_update_index repo streamers
runAction repo action@(CommandAction {}) = runAction repo action@(CommandAction {}) =
pOpen WriteToPipe "xargs" ("-0":"git":params) feedxargs pOpen WriteToPipe "xargs" ("-0":"git":params) feedxargs
where where

View file

@ -74,8 +74,9 @@ mergeFile :: String -> FilePath -> CatFileHandle -> Repo -> IO (Maybe String)
mergeFile info file h repo = case filter (/= nullSha) [Ref asha, Ref bsha] of mergeFile info file h repo = case filter (/= nullSha) [Ref asha, Ref bsha] of
[] -> return Nothing [] -> return Nothing
(sha:[]) -> use sha (sha:[]) -> use sha
shas -> use =<< either return (hashObject repo BlobObject . unlines) =<< shas -> use
calcMerge . zip shas <$> mapM getcontents shas =<< either return (\s -> hashObject BlobObject (unlines s) repo)
=<< calcMerge . zip shas <$> mapM getcontents shas
where where
[_colonmode, _bmode, asha, bsha, _status] = words info [_colonmode, _bmode, asha, bsha, _status] = words info
getcontents s = map L.unpack . L.lines . getcontents s = map L.unpack . L.lines .