merge git command queue when joining with concurrent thread

This commit is contained in:
Joey Hess 2015-11-05 18:21:48 -04:00
parent ab6b1edfee
commit 31472161e4
Failed to extract signature
3 changed files with 37 additions and 13 deletions

View file

@ -12,6 +12,7 @@ import Annex
import Annex.CatFile
import Annex.CheckAttr
import Annex.CheckIgnore
import qualified Annex.Queue
import qualified Data.Map as M
@ -57,6 +58,7 @@ mergeState st = do
st' <- liftIO $ snd <$> run st closehandles
forM_ (M.toList $ Annex.cleanup st') $
uncurry addCleanup
Annex.Queue.mergeFrom st'
changeState $ \s -> s { errcounter = errcounter s + errcounter st' }
where
closehandles = do

View file

@ -5,12 +5,15 @@
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
module Annex.Queue (
addCommand,
addUpdateIndex,
flush,
flushWhenFull,
size
size,
mergeFrom,
) where
import Common.Annex
@ -60,3 +63,12 @@ new = do
store :: Git.Queue.Queue -> Annex ()
store q = changeState $ \s -> s { repoqueue = Just q }
mergeFrom :: AnnexState -> Annex ()
mergeFrom st = case repoqueue st of
Nothing -> noop
Just newq -> do
q <- get
let !q' = Git.Queue.merge q newq
store q'
flushWhenFull

View file

@ -15,6 +15,7 @@ module Git.Queue (
size,
full,
flush,
merge,
) where
import Utility.SafeCommand
@ -25,14 +26,11 @@ import qualified Git.UpdateIndex
import qualified Data.Map as M
{- Queable actions that can be performed in a git repository.
-}
{- Queable actions that can be performed in a git repository. -}
data Action
{- Updating the index file, using a list of streamers that can
- be added to as the queue grows. -}
= UpdateIndexAction
{ getStreamers :: [Git.UpdateIndex.Streamer] -- in reverse order
}
= UpdateIndexAction [Git.UpdateIndex.Streamer] -- in reverse order
{- A git command to run, on a list of files that can be added to
- as the queue grows. -}
| CommandAction
@ -84,13 +82,11 @@ addCommand :: String -> [CommandParam] -> [FilePath] -> Queue -> Repo -> IO Queu
addCommand subcommand params files q repo =
updateQueue action different (length files) q repo
where
key = actionKey action
action = CommandAction
{ getSubcommand = subcommand
, getParams = params
, getFiles = allfiles
, getFiles = map File files
}
allfiles = map File files ++ maybe [] getFiles (M.lookup key $ items q)
different (CommandAction { getSubcommand = s }) = s /= subcommand
different _ = True
@ -100,10 +96,8 @@ addUpdateIndex :: Git.UpdateIndex.Streamer -> Queue -> Repo -> IO Queue
addUpdateIndex streamer q repo =
updateQueue action different 1 q repo
where
key = actionKey action
-- the list is built in reverse order
action = UpdateIndexAction $ streamer : streamers
streamers = maybe [] getStreamers $ M.lookup key $ items q
action = UpdateIndexAction [streamer]
different (UpdateIndexAction _) = False
different _ = True
@ -123,7 +117,23 @@ updateQueue !action different sizeincrease q repo
, items = newitems
}
!newsize = size q' + sizeincrease
!newitems = M.insertWith' const (actionKey action) action (items q')
!newitems = M.insertWith' combineNewOld (actionKey action) action (items q')
combineNewOld :: Action -> Action -> Action
combineNewOld (CommandAction _sc1 _ps1 fs1) (CommandAction sc2 ps2 fs2) =
CommandAction sc2 ps2 (fs1++fs2)
combineNewOld (UpdateIndexAction s1) (UpdateIndexAction s2) =
UpdateIndexAction (s1++s2)
combineNewOld anew _aold = anew
{- Merges the contents of the second queue into the first.
- This should only be used when the two queues are known to contain
- non-conflicting actions. -}
merge :: Queue -> Queue -> Queue
merge origq newq = origq
{ size = size origq + size newq
, items = M.unionWith combineNewOld (items newq) (items origq)
}
{- Is a queue large enough that it should be flushed? -}
full :: Queue -> Bool