git-annex/Annex/Queue.hs
Joey Hess 6a3bd283b8
add restage log
When pointer files need to be restaged, they're first written to the
log, and then when the restage operation runs, it reads the log. This
way, if the git-annex process is interrupted before it can do the
restaging, a later git-annex process can do it.
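
The log-then-process idea described above can be sketched roughly as follows. This is a minimal illustration, not git-annex's restage log code: `logPending`, `processPending`, and the one-path-per-line log format are hypothetical names and assumptions made for this sketch, and it ignores the locking a real multi-process implementation would need.

```haskell
import Control.Exception (evaluate)
import System.Directory (doesFileExist, removeFile)

-- Hypothetical: record a file that still needs restaging, before any
-- restaging is attempted. One path per line (assumed format).
logPending :: FilePath -> FilePath -> IO ()
logPending logf f = appendFile logf (f ++ "\n")

-- Hypothetical: run the action on every logged file, then remove the log.
-- If the process dies before removeFile, the log survives and a later run
-- processes the same entries again, so the action should be idempotent.
-- Returns the number of entries processed.
processPending :: FilePath -> (FilePath -> IO ()) -> IO Int
processPending logf act = do
    exists <- doesFileExist logf
    if not exists
        then return 0
        else do
            contents <- readFile logf
            -- Force the lazy read so the handle is closed before removal.
            _ <- evaluate (length contents)
            let fs = lines contents
            mapM_ act fs
            removeFile logf
            return (length fs)
```

Because entries are written before the work is done, an interrupted run leaves behind exactly the information a later run needs in order to finish the job.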

Currently, this lets a git-annex get/drop command be interrupted and
then re-run, and as long as it gets/drops additional files, it will
clean up after the interrupted command. But more changes are
needed to make it easier to restage after an interrupted process.

Kept using the git queue to run the restage action, even though the
list of files that it builds up for that action is not actually used by
the action. This could perhaps be simplified to make restaging a cleanup
action that gets registered, rather than using the git queue for it. But
I wasn't sure if that would cause visible behavior changes. For example,
when dropping a large number of files, the git queue currently flushes
periodically, and so it restages incrementally, rather than all at the
end.

In restagePointerFiles, it reads the restage log twice, once to get
the number of files and size, and a second time to process it.
This seemed better than reading the whole file into memory, since
potentially a huge number of files could be in there. Probably the OS
will cache the file in memory and there will not be much performance
impact. It might be better to keep running tallies in another file
though. But updating that atomically with the log seems hard.

Also note that it's possible for calcRestageLog to see a different file
than streamRestageLog does. More files may be added to the log in
between. That is ok, it will only cause the filterprocessfaster heuristic to
operate with slightly out of date information, so it may make the wrong
choice for the files that got added and be a little slower than ideal.
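
The two-pass approach described above (one pass to tally, one pass to process) can be sketched like this. It is only an illustration under stated assumptions: `foldLog`, `tallyLog`, `streamLog`, and the `<size> <path>` line format are invented for this sketch and are not the actual calcRestageLog/streamRestageLog code or the real log format.

```haskell
import System.IO (IOMode(ReadMode), hGetLine, hIsEOF, withFile)

-- Assumed line format for this sketch: "<size in bytes> <path>".
parseEntry :: String -> Maybe (Integer, FilePath)
parseEntry l = case reads l of
    [(sz, ' ':path)] -> Just (sz, path)
    _ -> Nothing

-- Fold over the log one line at a time, so memory use does not grow
-- with the number of entries.
foldLog :: FilePath -> (a -> String -> IO a) -> a -> IO a
foldLog logf step z = withFile logf ReadMode (go z)
  where
    go acc h = acc `seq` do
        eof <- hIsEOF h
        if eof
            then return acc
            else do
                l <- hGetLine h
                acc' <- step acc l
                go acc' h

-- First pass: number of entries and their total size.
tallyLog :: FilePath -> IO (Int, Integer)
tallyLog logf = foldLog logf step (0, 0)
  where
    step (n, total) l = case parseEntry l of
        Just (sz, _) ->
            let n' = n + 1
                total' = total + sz
            in n' `seq` total' `seq` return (n', total')
        Nothing -> return (n, total)

-- Second pass: run an action on each logged path. Lines appended after
-- the first pass are still processed here; only the tally is stale.
streamLog :: FilePath -> (FilePath -> IO ()) -> IO ()
streamLog logf act = foldLog logf step ()
  where
    step () l = maybe (return ()) (act . snd) (parseEntry l)
```

A caller would run `tallyLog` first, use the result to pick a strategy, and then call `streamLog`. Neither pass holds the whole log in memory, at the cost of reading the file twice.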

Sponsored-by: Dartmouth College's DANDI project
2022-09-23 15:47:24 -04:00

97 lines
2.5 KiB
Haskell

{- git-annex command queue
 -
 - Copyright 2011-2021 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

{-# LANGUAGE BangPatterns #-}

module Annex.Queue (
	addCommand,
	addFlushAction,
	addUpdateIndex,
	flush,
	flushWhenFull,
	size,
	get,
	mergeFrom,
) where

import Annex.Common
import Annex hiding (new)
import Annex.LockFile
import qualified Git.Queue
import qualified Git.UpdateIndex

{- Adds a git command to the queue. -}
addCommand :: [CommandParam] -> String -> [CommandParam] -> [FilePath] -> Annex ()
addCommand commonparams command params files = do
	q <- get
	store =<< flushWhenFull =<<
		(Git.Queue.addCommand commonparams command params files q =<< gitRepo)

addFlushAction :: Git.Queue.FlushActionRunner Annex -> [RawFilePath] -> Annex ()
addFlushAction runner files = do
	q <- get
	store =<< flushWhenFull =<<
		(Git.Queue.addFlushAction runner files q =<< gitRepo)

{- Adds an update-index stream to the queue. -}
addUpdateIndex :: Git.UpdateIndex.Streamer -> Annex ()
addUpdateIndex streamer = do
	q <- get
	store =<< flushWhenFull =<<
		(Git.Queue.addUpdateIndex streamer q =<< gitRepo)

{- Runs the queue if it is full. -}
flushWhenFull :: Git.Queue.Queue Annex -> Annex (Git.Queue.Queue Annex)
flushWhenFull q
	| Git.Queue.full q = flush' q
	| otherwise = return q

{- Runs (and empties) the queue. -}
flush :: Annex ()
flush = do
	q <- get
	unless (0 == Git.Queue.size q) $ do
		store =<< flush' q

{- When there are multiple worker threads, each has its own queue.
 - And of course multiple git-annex processes may be running each with its
 - own queue.
 -
 - But, flushing two queues at the same time could lead to failures due to
 - git locking files. So, only one queue is allowed to flush at a time.
 -}
flush' :: Git.Queue.Queue Annex -> Annex (Git.Queue.Queue Annex)
flush' q = do
	lck <- fromRepo gitAnnexGitQueueLock
	withExclusiveLock lck $ do
		showStoringStateAction
		Git.Queue.flush q =<< gitRepo

{- Gets the size of the queue. -}
size :: Annex Int
size = Git.Queue.size <$> get

get :: Annex (Git.Queue.Queue Annex)
get = maybe new return =<< getState repoqueue

new :: Annex (Git.Queue.Queue Annex)
new = do
	sz <- annexQueueSize <$> getGitConfig
	q <- liftIO $ Git.Queue.new sz Nothing
	store q
	return q

store :: Git.Queue.Queue Annex -> Annex ()
store q = changeState $ \s -> s { repoqueue = Just q }

mergeFrom :: AnnexState -> Annex ()
mergeFrom st = case repoqueue st of
	Nothing -> noop
	Just newq -> do
		q <- get
		let !q' = Git.Queue.merge q newq
		store =<< flushWhenFull q'