avoid flushing keys db queue after each Annex action

The flush was only done Annex.run' to make sure that the queue was flushed
before git-annex exits. But, doing it there means that as soon as one
change gets queued, it gets flushed soon after, which contributes to
excessive writes to the database, slowing git-annex down.
(This does not yet speed git-annex up, but it is a stepping stone to
doing so.)

Database queues do not autoflush when garbage collected, so have to
be flushed explicitly. I don't think it's possible to make them
autoflush (except perhaps if git-annex sqitched to using ResourceT..).
The comment in Database.Keys.closeDb used to be accurate, since the
automatic flushing did mean that all writes reached the database even
when closeDb was not called. But now, closeDb or flushDb needs to be
called before stopping using an Annex state. So, removed that comment.

In Remote.Git, change to using quiesce everywhere that it used to use
stopCoProcesses. This means that uses on onLocal in there are just as
slow as before. I considered only calling closeDb on the local git remotes
when git-annex exits. But, the reason that Remote.Git calls stopCoProcesses
in each onLocal is so as not to leave git processes running that have files
open on the remote repo, when it's on removable media. So, it seemed to make
sense to also closeDb after each one, since sqlite may also keep files
open. Although that has not seemed to cause problems with removable
media so far. It was also just easier to quiesce in each onLocal than
once at the end. This does likely leave performance on the floor, so
could be revisited.

In Annex.Content.saveState, there was no reason to close the db,
flushing it is enough.

The rest of the changes are from auditing for Annex.new, and making
sure that quiesce is called, after any action that might possibly need
it.

After that audit, I'm pretty sure that the change to Annex.run' is
safe. The only concern might be that this does let more changes get
queued for write to the db, and if git-annex is interrupted, those will be
lost. But interrupting git-annex can obviously already prevent it from
writing the most recent change to the db, so it must recover from such
lost data... right?

Sponsored-by: Dartmouth College's Datalad project
This commit is contained in:
Joey Hess 2022-10-12 13:50:46 -04:00
parent b312b2a30b
commit ba7ecbc6a9
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
11 changed files with 49 additions and 32 deletions

View file

@ -287,12 +287,8 @@ run (st, rd) a = do
run' :: MVar AnnexState -> AnnexRead -> Annex a -> IO (a, (AnnexState, AnnexRead)) run' :: MVar AnnexState -> AnnexRead -> Annex a -> IO (a, (AnnexState, AnnexRead))
run' mvar rd a = do run' mvar rd a = do
r <- runReaderT (runAnnex a) (mvar, rd) r <- runReaderT (runAnnex a) (mvar, rd)
`onException` (flush rd)
flush rd
st <- takeMVar mvar st <- takeMVar mvar
return (r, (st, rd)) return (r, (st, rd))
where
flush = Keys.flushDbQueue . keysdbhandle
{- Performs an action in the Annex monad from a starting state, {- Performs an action in the Annex monad from a starting state,
- and throws away the changed state. -} - and throws away the changed state. -}

View file

@ -1,6 +1,6 @@
{- git-annex actions {- git-annex actions
- -
- Copyright 2010-2020 Joey Hess <id@joeyh.name> - Copyright 2010-2022 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
@ -11,7 +11,7 @@ module Annex.Action (
action, action,
verifiedAction, verifiedAction,
startup, startup,
shutdown, quiesce,
stopCoProcesses, stopCoProcesses,
) where ) where
@ -25,6 +25,7 @@ import Annex.CheckAttr
import Annex.HashObject import Annex.HashObject
import Annex.CheckIgnore import Annex.CheckIgnore
import Annex.TransferrerPool import Annex.TransferrerPool
import qualified Database.Keys
import Control.Concurrent.STM import Control.Concurrent.STM
#ifndef mingw32_HOST_OS #ifndef mingw32_HOST_OS
@ -74,12 +75,25 @@ startup = do
return () return ()
#endif #endif
{- Cleanup actions. -} {- Rn all cleanup actions, save all state, stop all long-running child
shutdown :: Bool -> Annex () - processes.
shutdown nocommit = do -
- This can be run repeatedly with other Annex actions run in between,
- but usually it is run only once at the end.
-
- When passed True, avoids making any commits to the git-annex branch,
- leaving changes in the journal for later commit.
-}
quiesce :: Bool -> Annex ()
quiesce nocommit = do
cas <- Annex.withState $ \st -> return
( st { Annex.cleanupactions = mempty }
, Annex.cleanupactions st
)
sequence_ (M.elems cas)
saveState nocommit saveState nocommit
sequence_ =<< M.elems <$> Annex.getState Annex.cleanupactions
stopCoProcesses stopCoProcesses
Database.Keys.closeDb
{- Stops all long-running child processes, including git query processes. -} {- Stops all long-running child processes, including git query processes. -}
stopCoProcesses :: Annex () stopCoProcesses :: Annex ()

View file

@ -718,7 +718,7 @@ listKeys' keyloc want = do
saveState :: Bool -> Annex () saveState :: Bool -> Annex ()
saveState nocommit = doSideAction $ do saveState nocommit = doSideAction $ do
Annex.Queue.flush Annex.Queue.flush
Database.Keys.closeDb Database.Keys.flushDb
unless nocommit $ unless nocommit $
whenM (annexAlwaysCommit <$> Annex.getGitConfig) $ whenM (annexAlwaysCommit <$> Annex.getGitConfig) $
Annex.Branch.commit =<< Annex.Branch.commitMessage Annex.Branch.commit =<< Annex.Branch.commitMessage

View file

@ -49,7 +49,7 @@ inDir dir a = do
state <- Annex.new state <- Annex.new
=<< Git.Config.read =<< Git.Config.read
=<< Git.Construct.fromPath (toRawFilePath dir) =<< Git.Construct.fromPath (toRawFilePath dir)
Annex.eval state $ a `finally` stopCoProcesses Annex.eval state $ a `finally` quiesce True
{- Creates a new repository, and returns its UUID. -} {- Creates a new repository, and returns its UUID. -}
initRepo :: Bool -> Bool -> FilePath -> Maybe String -> Maybe StandardGroup -> IO UUID initRepo :: Bool -> Bool -> FilePath -> Maybe String -> Maybe StandardGroup -> IO UUID

View file

@ -24,6 +24,7 @@ import Annex.Content
import Annex.WorkTree import Annex.WorkTree
import Git.Command import Git.Command
import qualified Utility.RawFilePath as R import qualified Utility.RawFilePath as R
import Annex.Actions
import Data.Time.Clock import Data.Time.Clock
import Data.Char import Data.Char
@ -70,6 +71,7 @@ main = do
ood <- Annex.eval state $ do ood <- Annex.eval state $ do
buildrpms topdir updated buildrpms topdir updated
makeinfos updated version makeinfos updated version
quiesce False
syncToArchiveOrg syncToArchiveOrg
unless (null ood) $ unless (null ood) $
error $ "Some info files are out of date: " ++ show (map fst ood) error $ "Some info files are out of date: " ++ show (map fst ood)

View file

@ -63,7 +63,7 @@ dispatch' subcommandname args fuzzy cmds allargs allcmds fields getgitrepo progn
prepRunCommand cmd annexsetter prepRunCommand cmd annexsetter
startup startup
performCommandAction True cmd seek $ performCommandAction True cmd seek $
shutdown $ cmdnocommit cmd quiesce $ cmdnocommit cmd
go (Left norepo) = do go (Left norepo) = do
let ingitrepo = \a -> a =<< Git.Config.global let ingitrepo = \a -> a =<< Git.Config.global
-- Parse command line with full cmdparser first, -- Parse command line with full cmdparser first,

View file

@ -17,6 +17,7 @@ import Utility.AuthToken
import Annex.UUID import Annex.UUID
import P2P.Address import P2P.Address
import P2P.Auth import P2P.Auth
import Annex.Action
run :: [String] -> IO () run :: [String] -> IO ()
run (_remotename:address:[]) = forever $ run (_remotename:address:[]) = forever $
@ -59,6 +60,8 @@ connectService address port service = do
g <- Annex.gitRepo g <- Annex.gitRepo
conn <- liftIO $ connectPeer g (TorAnnex address port) conn <- liftIO $ connectPeer g (TorAnnex address port)
runst <- liftIO $ mkRunState Client runst <- liftIO $ mkRunState Client
liftIO $ runNetProto runst conn $ auth myuuid authtoken noop >>= \case r <- liftIO $ runNetProto runst conn $ auth myuuid authtoken noop >>= \case
Just _theiruuid -> connect service stdin stdout Just _theiruuid -> connect service stdin stdout
Nothing -> giveup $ "authentication failed, perhaps you need to set " ++ p2pAuthTokenEnv Nothing -> giveup $ "authentication failed, perhaps you need to set " ++ p2pAuthTokenEnv
quiesce False
return r

View file

@ -31,9 +31,7 @@ start (_, key) = fieldTransfer Download key $ \_p -> do
ifM (getViaTmp rsp DefaultVerify key (AssociatedFile Nothing) go) ifM (getViaTmp rsp DefaultVerify key (AssociatedFile Nothing) go)
( do ( do
logStatus key InfoPresent logStatus key InfoPresent
-- forcibly quit after receiving one key, _ <- quiesce True
-- and shutdown cleanly
_ <- shutdown True
return True return True
, return False , return False
) )

View file

@ -30,6 +30,7 @@ import qualified Annex
import Config.Files.AutoStart import Config.Files.AutoStart
import Upgrade import Upgrade
import Annex.Version import Annex.Version
import Annex.Action
import Utility.Android import Utility.Android
import Control.Concurrent import Control.Concurrent
@ -126,8 +127,10 @@ startNoRepo o = go =<< liftIO (filterM doesDirectoryExist =<< readAutoStartFile)
Right state -> void $ Annex.eval state $ do Right state -> void $ Annex.eval state $ do
whenM (fromRepo Git.repoIsLocalBare) $ whenM (fromRepo Git.repoIsLocalBare) $
giveup $ d ++ " is a bare git repository, cannot run the webapp in it" giveup $ d ++ " is a bare git repository, cannot run the webapp in it"
callCommandAction $ r <- callCommandAction $
start' False o start' False o
quiesce False
return r
cannotStartIn :: FilePath -> String -> IO () cannotStartIn :: FilePath -> String -> IO ()
cannotStartIn d reason = warningIO $ "unable to start webapp in repository " ++ d ++ ": " ++ reason cannotStartIn d reason = warningIO $ "unable to start webapp in repository " ++ d ++ ": " ++ reason

View file

@ -1,6 +1,6 @@
{- Sqlite database of information about Keys {- Sqlite database of information about Keys
- -
- Copyright 2015-2021 Joey Hess <id@joeyh.name> - Copyright 2015-2022 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
@ -12,6 +12,7 @@
module Database.Keys ( module Database.Keys (
DbHandle, DbHandle,
closeDb, closeDb,
flushDb,
addAssociatedFile, addAssociatedFile,
getAssociatedFiles, getAssociatedFiles,
getAssociatedFilesIncluding, getAssociatedFilesIncluding,
@ -143,14 +144,16 @@ openDb forwrite _ = do
{- Closes the database if it was open. Any writes will be flushed to it. {- Closes the database if it was open. Any writes will be flushed to it.
- -
- This does not normally need to be called; the database will auto-close - This does not prevent further use of the database; it will be re-opened
- when the handle is garbage collected. However, this can be used to - as necessary.
- force a re-read of the database, in case another process has written
- data to it.
-} -}
closeDb :: Annex () closeDb :: Annex ()
closeDb = liftIO . closeDbHandle =<< Annex.getRead Annex.keysdbhandle closeDb = liftIO . closeDbHandle =<< Annex.getRead Annex.keysdbhandle
{- Flushes any queued writes to the database. -}
flushDb :: Annex ()
flushDb = liftIO . flushDbQueue =<< Annex.getRead Annex.keysdbhandle
addAssociatedFile :: Key -> TopFilePath -> Annex () addAssociatedFile :: Key -> TopFilePath -> Annex ()
addAssociatedFile k f = runWriterIO $ SQL.addAssociatedFile k f addAssociatedFile k f = runWriterIO $ SQL.addAssociatedFile k f

View file

@ -355,7 +355,8 @@ tryGitConfigRead autoinit r hasuuid
":" ++ show e ":" ++ show e
Annex.getState Annex.repo Annex.getState Annex.repo
s <- newLocal r s <- newLocal r
liftIO $ Annex.eval s $ check `finally` stopCoProcesses liftIO $ Annex.eval s $ check
`finally` quiesce True
failedreadlocalconfig = do failedreadlocalconfig = do
unless hasuuid $ case Git.remoteName r of unless hasuuid $ case Git.remoteName r of
@ -449,7 +450,6 @@ dropKey' repo r st@(State connpool duc _ _ _) key
Annex.Content.lockContentForRemoval key cleanup $ \lock -> do Annex.Content.lockContentForRemoval key cleanup $ \lock -> do
Annex.Content.removeAnnex lock Annex.Content.removeAnnex lock
cleanup cleanup
Annex.Content.saveState True
, giveup "remote does not have expected annex.uuid value" , giveup "remote does not have expected annex.uuid value"
) )
| Git.repoIsHttp repo = giveup "dropping from http remote not supported" | Git.repoIsHttp repo = giveup "dropping from http remote not supported"
@ -577,11 +577,9 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
let checksuccess = liftIO checkio >>= \case let checksuccess = liftIO checkio >>= \case
Just err -> giveup err Just err -> giveup err
Nothing -> return True Nothing -> return True
res <- logStatusAfter key $ Annex.Content.getViaTmp rsp verify key file $ \dest -> logStatusAfter key $ Annex.Content.getViaTmp rsp verify key file $ \dest ->
metered (Just (combineMeterUpdate meterupdate p)) key bwlimit $ \_ p' -> metered (Just (combineMeterUpdate meterupdate p)) key bwlimit $ \_ p' ->
copier object (fromRawFilePath dest) key p' checksuccess verify copier object (fromRawFilePath dest) key p' checksuccess verify
Annex.Content.saveState True
return res
) )
unless res $ unless res $
giveup "failed to send content to remote" giveup "failed to send content to remote"
@ -606,7 +604,7 @@ repairRemote r a = return $ do
Annex.eval s $ do Annex.eval s $ do
Annex.BranchState.disableUpdate Annex.BranchState.disableUpdate
ensureInitialized (pure []) ensureInitialized (pure [])
a `finally` stopCoProcesses a `finally` quiesce True
data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar [(Annex.AnnexState, Annex.AnnexRead)]) data LocalRemoteAnnex = LocalRemoteAnnex Git.Repo (MVar [(Annex.AnnexState, Annex.AnnexRead)])
@ -618,8 +616,8 @@ mkLocalRemoteAnnex repo = LocalRemoteAnnex repo <$> liftIO (newMVar [])
{- Runs an action from the perspective of a local remote. {- Runs an action from the perspective of a local remote.
- -
- The AnnexState is cached for speed and to avoid resource leaks. - The AnnexState is cached for speed and to avoid resource leaks.
- However, coprocesses are stopped after each call to avoid git - However, it is quiesced after each call to avoid git processes
- processes hanging around on removable media. - hanging around on removable media.
- -
- The remote will be automatically initialized/upgraded first, - The remote will be automatically initialized/upgraded first,
- when possible. - when possible.
@ -655,7 +653,7 @@ onLocal' (LocalRemoteAnnex repo mv) a = liftIO (takeMVar mv) >>= \case
go ((st, rd), a') = do go ((st, rd), a') = do
curro <- Annex.getState Annex.output curro <- Annex.getState Annex.output
let act = Annex.run (st { Annex.output = curro }, rd) $ let act = Annex.run (st { Annex.output = curro }, rd) $
a' `finally` stopCoProcesses a' `finally` quiesce True
(ret, (st', _rd)) <- liftIO $ act `onException` cache (st, rd) (ret, (st', _rd)) <- liftIO $ act `onException` cache (st, rd)
liftIO $ cache (st', rd) liftIO $ cache (st', rd)
return ret return ret