git-annex/Annex/Queue.hs
Joey Hess 759a87ad70
fix git command queue to be concurrency safe
Probably not noticed until now because the queue is large enough that two
threads each filling theirs at the same time and flushing is unlikely to
happen.

Also made explicit that each worker thread gets its own queue.
I think that was the case before, but if something was put in the queue
before worker threads were forked off, they could have each inherited the
same queue.

Could have gone with a single shared queue, but per-worker queues is more
efficient, because a worker can add lots of stuff to its own queue without
any locking.

This commit was sponsored by Ole-Morten Duesund on Patreon.
2018-08-28 13:16:33 -04:00

101 lines
2.4 KiB
Haskell

{- git-annex command queue
-
- Copyright 2011, 2012 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU GPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
module Annex.Queue (
addCommand,
addInternalAction,
addUpdateIndex,
flush,
flushWhenFull,
size,
get,
mergeFrom,
) where
import Annex.Common
import Annex hiding (new)
import qualified Git.Queue
import qualified Git.UpdateIndex
import qualified Control.Concurrent.SSem as SSem
{- Adds a git command to the queue. -}
addCommand :: String -> [CommandParam] -> [FilePath] -> Annex ()
addCommand command params files = do
q <- get
store <=< flushWhenFull <=< inRepo $
Git.Queue.addCommand command params files q
addInternalAction :: Git.Queue.InternalActionRunner -> [(FilePath, IO Bool)] -> Annex ()
addInternalAction runner files = do
q <- get
store <=< flushWhenFull <=< inRepo $
Git.Queue.addInternalAction runner files q
{- Adds an update-index stream to the queue. -}
addUpdateIndex :: Git.UpdateIndex.Streamer -> Annex ()
addUpdateIndex streamer = do
q <- get
store <=< flushWhenFull <=< inRepo $
Git.Queue.addUpdateIndex streamer q
{- Runs the queue if it is full. -}
flushWhenFull :: Git.Queue.Queue -> Annex Git.Queue.Queue
flushWhenFull q
| Git.Queue.full q = flush' q
| otherwise = return q
{- Runs (and empties) the queue. -}
flush :: Annex ()
flush = do
q <- get
unless (0 == Git.Queue.size q) $ do
store =<< flush' q
{- When there are multiple worker threads, each has its own queue.
-
- But, flushing two queues at the same time could lead to failures due to
- git locking files. So, only one queue is allowed to flush at a time.
- The repoqueuesem is shared between threads.
-}
flush' :: Git.Queue.Queue -> Annex Git.Queue.Queue
flush' q = bracket lock unlock go
where
lock = do
s <- getState repoqueuesem
liftIO $ SSem.wait s
return s
unlock = liftIO . SSem.signal
go _ = do
showStoringStateAction
inRepo $ Git.Queue.flush q
{- Gets the size of the queue. -}
size :: Annex Int
size = Git.Queue.size <$> get
get :: Annex Git.Queue.Queue
get = maybe new return =<< getState repoqueue
new :: Annex Git.Queue.Queue
new = do
q <- Git.Queue.new . annexQueueSize <$> getGitConfig
store q
return q
store :: Git.Queue.Queue -> Annex ()
store q = changeState $ \s -> s { repoqueue = Just q }
mergeFrom :: AnnexState -> Annex ()
mergeFrom st = case repoqueue st of
Nothing -> noop
Just newq -> do
q <- get
let !q' = Git.Queue.merge q newq
store =<< flushWhenFull q'