optimise read and write for Keys database (untested)

Writes are optimised by queueing up multiple writes when possible.
The queue is flushed after the Annex monad action finishes. That makes it
happen on program termination, and also whenever a nested Annex monad action
finishes.
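
The queue-then-flush pattern described above can be sketched in isolation.
This is a minimal illustration using only base; the names WriteQueue,
queueWrite, flushQueue and runWithFlush are hypothetical and are not
git-annex's API. It assumes writes are plain labels and that a caller
supplies the commit action.

```haskell
import Control.Monad (unless)
import Data.IORef

-- Hypothetical stand-in for a database write; here it is just a label.
type Write = String

-- Pending writes, newest first.
newtype WriteQueue = WriteQueue (IORef [Write])

newWriteQueue :: IO WriteQueue
newWriteQueue = WriteQueue <$> newIORef []

-- Queueing is cheap: nothing is committed yet.
queueWrite :: WriteQueue -> Write -> IO ()
queueWrite (WriteQueue ref) w = modifyIORef' ref (w :)

-- Hand all pending writes, oldest first, to the commit action as one
-- batch, leaving the queue empty. Does nothing if the queue is empty.
flushQueue :: WriteQueue -> ([Write] -> IO ()) -> IO ()
flushQueue (WriteQueue ref) commit = do
    ws <- atomicModifyIORef' ref (\q -> ([], reverse q))
    unless (null ws) (commit ws)

-- Run an action that may queue writes, then flush once it has finished.
-- The queue is passed in, so nested runs can share it.
runWithFlush :: WriteQueue -> ([Write] -> IO ()) -> IO a -> IO a
runWithFlush q commit action = do
    r <- action
    flushQueue q commit
    return r
```

In this sketch a nested runWithFlush on the same queue would flush when the
inner action finishes, and the outer flush would then find the queue empty.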

Reads are optimised by checking once (per AnnexState) if the database
exists. If the database doesn't exist yet, all reads return mempty.
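
As a rough sketch of the read-side optimisation: the existence check is
done once and cached, and a read against a missing database returns the
Monoid's empty value. ExistsCache, dbExists and readOrEmpty are hypothetical
names, and the existence probe is passed in as an IO Bool so the example
needs only base.

```haskell
import Data.IORef

-- Cached answer to "does the database exist?".
-- Nothing means the check has not been made yet.
newtype ExistsCache = ExistsCache (IORef (Maybe Bool))

newExistsCache :: IO ExistsCache
newExistsCache = ExistsCache <$> newIORef Nothing

-- Run the probe on first use only; later calls reuse the cached answer.
dbExists :: ExistsCache -> IO Bool -> IO Bool
dbExists (ExistsCache ref) probe = do
    cached <- readIORef ref
    case cached of
        Just b -> return b
        Nothing -> do
            b <- probe
            writeIORef ref (Just b)
            return b

-- Run the query only if the database exists; otherwise return mempty.
readOrEmpty :: Monoid v => ExistsCache -> IO Bool -> IO v -> IO v
readOrEmpty cache probe query = do
    ok <- dbExists cache probe
    if ok then query else return mempty
```

This sketch never invalidates the cached answer. A real implementation
would also have to update it when a write creates the database.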

Reads also cause queued writes to be flushed, so reads will always be
consistent with writes (as long as they're made inside the same Annex monad).
A future optimisation path would be to determine when that's not necessary,
which is probably most of the time, and avoid flushing unnecessarily.
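
To illustrate why the flush before a read matters, here is a toy store
with a committed set and a pending queue. Store, queueRow, flush,
readCommitted and readConsistent are hypothetical names for this sketch
only; the real code goes through Database.Queue.

```haskell
import Data.IORef

-- Toy store: committed rows plus a queue of rows not yet committed.
data Store = Store
    { committed :: IORef [String]
    , pending :: IORef [String]   -- newest first
    }

newStore :: IO Store
newStore = Store <$> newIORef [] <*> newIORef []

queueRow :: Store -> String -> IO ()
queueRow s r = modifyIORef' (pending s) (r :)

-- Move every pending row, oldest first, into the committed set.
flush :: Store -> IO ()
flush s = do
    rs <- atomicModifyIORef' (pending s) (\q -> ([], reverse q))
    modifyIORef' (committed s) (++ rs)

-- Without a flush, a read sees only rows that were already committed.
readCommitted :: Store -> IO [String]
readCommitted = readIORef . committed

-- Flushing first lets the read see writes that were queued earlier.
readConsistent :: Store -> IO [String]
readConsistent s = flush s >> readCommitted s
```

After queueRow s "a", readCommitted still returns an empty list, while
readConsistent returns the queued row. Skipping the flush is only safe when
nothing relevant to the read is pending.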

Design notes for this commit:

- separate reads from writes
- reuse a handle, which is left open until program exit or until the
  MVar goes out of scope (at which point it is closed automatically)
- writes are queued
  - queue is flushed periodically
  - immediate queue flush before any read
  - auto-flush queue when database handle is garbage collected
  - flush queue on exit from Annex monad
    (Note that this may happen repeatedly for a single database connection;
    or a connection may be reused for multiple Annex monad actions,
    possibly even concurrent ones.)
- if database does not exist (or is empty) the handle
  is not opened by reads; reads instead return empty results
- writes open the handle if it was not open previously
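
The handle states and the periodic flush rule in the notes above can be
sketched as pure functions. This is a simplification under stated
assumptions: the constructors Closed, Open and Empty stand in for the
commit's DbClosed, DbOpen and DbEmpty (without the queue payload), and
times are plain seconds rather than UTCTime. afterRead, afterWrite and
shouldCommit are hypothetical names.

```haskell
-- Simplified handle states (no queue attached to Open in this sketch).
data DbState = Closed | Open | Empty
    deriving (Eq, Show)

-- State after a read, given whether the database file exists.
afterRead :: Bool -> DbState -> DbState
afterRead _ Open = Open
afterRead _ Empty = Empty   -- already checked once; do not probe again
afterRead exists Closed = if exists then Open else Empty

-- State after a write: the database is created if needed, so the
-- handle always ends up open.
afterWrite :: DbState -> DbState
afterWrite _ = Open

-- Commit the queue after more than 1000 queued changes, or when more
-- than 300 seconds have passed since the last commit.
shouldCommit :: Int -> Integer -> Integer -> Bool
shouldCommit queued lastCommit now =
    queued > 1000 || now - lastCommit > 300
```

Note the order of the subtraction in shouldCommit: the elapsed time is the
current time minus the last commit time, so it grows as time passes.
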
Joey Hess 2015-12-23 18:34:51 -04:00
parent 959b060e26
commit 4224fae71f
10 changed files with 213 additions and 91 deletions


@ -60,6 +60,7 @@ import Types.NumCopies
import Types.LockCache import Types.LockCache
import Types.DesktopNotify import Types.DesktopNotify
import Types.CleanupActions import Types.CleanupActions
import qualified Database.Keys.Handle as Keys
#ifdef WITH_QUVI #ifdef WITH_QUVI
import Utility.Quvi (QuviVersion) import Utility.Quvi (QuviVersion)
#endif #endif
@ -134,6 +135,7 @@ data AnnexState = AnnexState
, desktopnotify :: DesktopNotify , desktopnotify :: DesktopNotify
, workers :: [Either AnnexState (Async AnnexState)] , workers :: [Either AnnexState (Async AnnexState)]
, concurrentjobs :: Maybe Int , concurrentjobs :: Maybe Int
, keysdbhandle :: Maybe Keys.DbHandle
} }
newState :: GitConfig -> Git.Repo -> AnnexState newState :: GitConfig -> Git.Repo -> AnnexState
@ -179,6 +181,7 @@ newState c r = AnnexState
, desktopnotify = mempty , desktopnotify = mempty
, workers = [] , workers = []
, concurrentjobs = Nothing , concurrentjobs = Nothing
, keysdbhandle = Nothing
} }
{- Makes an Annex state object for the specified git repo. {- Makes an Annex state object for the specified git repo.
@ -193,25 +196,26 @@ new r = do
{- Performs an action in the Annex monad from a starting state, {- Performs an action in the Annex monad from a starting state,
- returning a new state. -} - returning a new state. -}
run :: AnnexState -> Annex a -> IO (a, AnnexState) run :: AnnexState -> Annex a -> IO (a, AnnexState)
run s a = do run s a = flip run' a =<< newMVar s
mvar <- newMVar s
run' :: MVar AnnexState -> Annex a -> IO (a, AnnexState)
run' mvar a = do
r <- runReaderT (runAnnex a) mvar r <- runReaderT (runAnnex a) mvar
s' <- takeMVar mvar s' <- takeMVar mvar
maybe noop Keys.flushDbQueue (keysdbhandle s')
return (r, s') return (r, s')
{- Performs an action in the Annex monad from a starting state, {- Performs an action in the Annex monad from a starting state,
- and throws away the new state. -} - and throws away the new state. -}
eval :: AnnexState -> Annex a -> IO a eval :: AnnexState -> Annex a -> IO a
eval s a = do eval s a = fst <$> run s a
mvar <- newMVar s
runReaderT (runAnnex a) mvar
{- Makes a runner action, that allows diving into IO and from inside {- Makes a runner action, that allows diving into IO and from inside
- the IO action, running an Annex action. -} - the IO action, running an Annex action. -}
makeRunner :: Annex (Annex a -> IO a) makeRunner :: Annex (Annex a -> IO a)
makeRunner = do makeRunner = do
mvar <- ask mvar <- ask
return $ \a -> runReaderT (runAnnex a) mvar return $ \a -> fst <$> run' mvar a
getState :: (AnnexState -> v) -> Annex v getState :: (AnnexState -> v) -> Annex v
getState selector = do getState selector = do


@ -78,10 +78,6 @@ openDb u = do
rename tmpdbdir dbdir rename tmpdbdir dbdir
lockFileCached =<< fromRepo (gitAnnexFsckDbLock u) lockFileCached =<< fromRepo (gitAnnexFsckDbLock u)
h <- liftIO $ H.openDbQueue db "fscked" h <- liftIO $ H.openDbQueue db "fscked"
-- work around https://github.com/yesodweb/persistent/issues/474
liftIO setConsoleEncoding
return $ FsckHandle h u return $ FsckHandle h u
closeDb :: FsckHandle -> Annex () closeDb :: FsckHandle -> Annex ()


@ -19,6 +19,7 @@ module Database.Handle (
) where ) where
import Utility.Exception import Utility.Exception
import Utility.FileSystemEncoding
import Database.Persist.Sqlite import Database.Persist.Sqlite
import qualified Database.Sqlite as Sqlite import qualified Database.Sqlite as Sqlite
@ -66,6 +67,10 @@ openDb :: FilePath -> TableName -> IO DbHandle
openDb db tablename = do openDb db tablename = do
jobs <- newEmptyMVar jobs <- newEmptyMVar
worker <- async (workerThread (T.pack db) tablename jobs) worker <- async (workerThread (T.pack db) tablename jobs)
-- work around https://github.com/yesodweb/persistent/issues/474
liftIO setConsoleEncoding
return $ DbHandle worker jobs return $ DbHandle worker jobs
{- This is optional; when the DbHandle gets garbage collected it will {- This is optional; when the DbHandle gets garbage collected it will


@ -12,8 +12,6 @@
module Database.Keys ( module Database.Keys (
DbHandle, DbHandle,
openDb,
closeDb,
addAssociatedFile, addAssociatedFile,
getAssociatedFiles, getAssociatedFiles,
getAssociatedKey, getAssociatedKey,
@ -27,7 +25,7 @@ module Database.Keys (
) where ) where
import Database.Types import Database.Types
import Database.Keys.Types import Database.Keys.Handle
import qualified Database.Queue as H import qualified Database.Queue as H
import Locations import Locations
import Common hiding (delete) import Common hiding (delete)
@ -35,12 +33,12 @@ import Annex
import Types.Key import Types.Key
import Annex.Perms import Annex.Perms
import Annex.LockFile import Annex.LockFile
import Messages
import Utility.InodeCache import Utility.InodeCache
import Annex.InodeSentinal import Annex.InodeSentinal
import Database.Persist.TH import Database.Persist.TH
import Database.Esqueleto hiding (Key) import Database.Esqueleto hiding (Key)
import Data.Time.Clock
share [mkPersist sqlSettings, mkMigrate "migrateKeysDb"] [persistLowerCase| share [mkPersist sqlSettings, mkMigrate "migrateKeysDb"] [persistLowerCase|
Associated Associated
@ -53,7 +51,86 @@ Content
KeyCacheIndex key cache KeyCacheIndex key cache
|] |]
{- Opens the database, creating it if it doesn't exist yet. newtype ReadHandle = ReadHandle H.DbQueue
type Reader v = ReadHandle -> Annex v
{- Runs an action that reads from the database.
-
- If the database doesn't already exist, it's not created; mempty is
- returned instead. This way, when the keys database is not in use,
- there's minimal overhead in checking it.
-
- If the database is already open, any queued writes are flushed to it
- before the read, to ensure consistency.
-}
runReader :: Monoid v => Reader v -> Annex v
runReader a = do
h <- getDbHandle
withDbState h go
where
go DbEmpty = return (mempty, DbEmpty)
go st@(DbOpen qh) = do
liftIO $ H.flushDbQueue qh
v <- a (ReadHandle qh)
return (v, st)
go DbClosed = do
st' <- openDb False DbClosed
v <- case st' of
(DbOpen qh) -> a (ReadHandle qh)
_ -> return mempty
return (v, st')
readDb :: SqlPersistM a -> ReadHandle -> Annex a
readDb a (ReadHandle h) = liftIO $ H.queryDbQueue h a
newtype WriteHandle = WriteHandle H.DbQueue
type Writer = WriteHandle -> Annex ()
{- Runs an action that writes to the database. Typically this is used to
- queue changes, which will be flushed at a later point.
-
- The database is created if it doesn't exist yet. -}
runWriter :: Writer -> Annex ()
runWriter a = do
h <- getDbHandle
withDbState h go
where
go st@(DbOpen qh) = do
v <- a (WriteHandle qh)
return (v, st)
go st = do
st' <- openDb True st
v <- case st' of
DbOpen qh -> a (WriteHandle qh)
_ -> error "internal"
return (v, st')
queueDb :: SqlPersistM () -> WriteHandle -> Annex ()
queueDb a (WriteHandle h) = liftIO $ H.queueDb h checkcommit a
where
-- commit queue after 1000 changes or 5 minutes, whichever comes first
checkcommit sz lastcommittime
| sz > 1000 = return True
| otherwise = do
now <- getCurrentTime
return $ diffUTCTime now lastcommittime > 300
{- Gets the handle cached in Annex state; creates a new one if it's not yet
- available, but doesn't open the database. -}
getDbHandle :: Annex DbHandle
getDbHandle = go =<< getState keysdbhandle
where
go (Just h) = pure h
go Nothing = do
h <- liftIO newDbHandle
changeState $ \s -> s { keysdbhandle = Just h }
return h
{- Opens the database, perhaps creating it if it doesn't exist yet.
- -
- Multiple readers and writers can have the database open at the same - Multiple readers and writers can have the database open at the same
- time. Database.Handle deals with the concurrency issues. - time. Database.Handle deals with the concurrency issues.
@ -61,32 +138,32 @@ Content
- the database doesn't exist yet, one caller wins the lock and - the database doesn't exist yet, one caller wins the lock and
- can create it undisturbed. - can create it undisturbed.
-} -}
openDb :: Annex DbHandle openDb :: Bool -> DbState -> Annex DbState
openDb = withExclusiveLock gitAnnexKeysDbLock $ do openDb _ st@(DbOpen _) = return st
openDb False DbEmpty = return DbEmpty
openDb createdb _ = withExclusiveLock gitAnnexKeysDbLock $ do
dbdir <- fromRepo gitAnnexKeysDb dbdir <- fromRepo gitAnnexKeysDb
let db = dbdir </> "db" let db = dbdir </> "db"
unlessM (liftIO $ doesFileExist db) $ do dbexists <- liftIO $ doesFileExist db
liftIO $ do case (dbexists, createdb) of
createDirectoryIfMissing True dbdir (True, _) -> open db
H.initDb db $ void $ (False, True) -> do
runMigrationSilent migrateKeysDb liftIO $ do
setAnnexDirPerm dbdir createDirectoryIfMissing True dbdir
setAnnexFilePerm db H.initDb db $ void $
h <- liftIO $ H.openDbQueue db "content" runMigrationSilent migrateKeysDb
setAnnexDirPerm dbdir
-- work around https://github.com/yesodweb/persistent/issues/474 setAnnexFilePerm db
liftIO setConsoleEncoding open db
(False, False) -> return DbEmpty
return $ DbHandle h where
open db = liftIO $ DbOpen <$> H.openDbQueue db "content"
closeDb :: DbHandle -> IO ()
closeDb (DbHandle h) = H.closeDbQueue h
withDbHandle :: (H.DbQueue -> IO a) -> Annex a
withDbHandle a = bracket openDb (liftIO . closeDb) (\(DbHandle h) -> liftIO (a h))
addAssociatedFile :: Key -> FilePath -> Annex () addAssociatedFile :: Key -> FilePath -> Annex ()
addAssociatedFile k f = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ do addAssociatedFile k f = runWriter $ addAssociatedFile' k f
addAssociatedFile' :: Key -> FilePath -> Writer
addAssociatedFile' k f = queueDb $ do
-- If the same file was associated with a different key before, -- If the same file was associated with a different key before,
-- remove that. -- remove that.
delete $ from $ \r -> do delete $ from $ \r -> do
@ -98,11 +175,10 @@ addAssociatedFile k f = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ d
{- Note that the files returned were once associated with the key, but {- Note that the files returned were once associated with the key, but
- some of them may not be any longer. -} - some of them may not be any longer. -}
getAssociatedFiles :: Key -> Annex [FilePath] getAssociatedFiles :: Key -> Annex [FilePath]
getAssociatedFiles k = withDbHandle $ \h -> H.queryDbQueue h $ getAssociatedFiles = runReader . getAssociatedFiles' . toSKey
getAssociatedFiles' $ toSKey k
getAssociatedFiles' :: SKey -> SqlPersistM [FilePath] getAssociatedFiles' :: SKey -> Reader [FilePath]
getAssociatedFiles' sk = do getAssociatedFiles' sk = readDb $ do
l <- select $ from $ \r -> do l <- select $ from $ \r -> do
where_ (r ^. AssociatedKey ==. val sk) where_ (r ^. AssociatedKey ==. val sk)
return (r ^. AssociatedFile) return (r ^. AssociatedFile)
@ -111,22 +187,22 @@ getAssociatedFiles' sk = do
{- Gets any keys that are on record as having a particular associated file. {- Gets any keys that are on record as having a particular associated file.
- (Should be one or none but the database doesn't enforce that.) -} - (Should be one or none but the database doesn't enforce that.) -}
getAssociatedKey :: FilePath -> Annex [Key] getAssociatedKey :: FilePath -> Annex [Key]
getAssociatedKey f = withDbHandle $ \h -> H.queryDbQueue h $ getAssociatedKey = runReader . getAssociatedKey'
getAssociatedKey' f
getAssociatedKey' :: FilePath -> SqlPersistM [Key] getAssociatedKey' :: FilePath -> Reader [Key]
getAssociatedKey' f = do getAssociatedKey' f = readDb $ do
l <- select $ from $ \r -> do l <- select $ from $ \r -> do
where_ (r ^. AssociatedFile ==. val f) where_ (r ^. AssociatedFile ==. val f)
return (r ^. AssociatedKey) return (r ^. AssociatedKey)
return $ map (fromSKey . unValue) l return $ map (fromSKey . unValue) l
removeAssociatedFile :: Key -> FilePath -> Annex () removeAssociatedFile :: Key -> FilePath -> Annex ()
removeAssociatedFile k f = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ removeAssociatedFile k = runWriter . removeAssociatedFile' (toSKey k)
removeAssociatedFile' :: SKey -> FilePath -> Writer
removeAssociatedFile' sk f = queueDb $
delete $ from $ \r -> do delete $ from $ \r -> do
where_ (r ^. AssociatedKey ==. val sk &&. r ^. AssociatedFile ==. val f) where_ (r ^. AssociatedKey ==. val sk &&. r ^. AssociatedFile ==. val f)
where
sk = toSKey k
{- Stats the files, and stores their InodeCaches. -} {- Stats the files, and stores their InodeCaches. -}
storeInodeCaches :: Key -> [FilePath] -> Annex () storeInodeCaches :: Key -> [FilePath] -> Annex ()
@ -134,23 +210,28 @@ storeInodeCaches k fs = withTSDelta $ \d ->
addInodeCaches k . catMaybes =<< liftIO (mapM (`genInodeCache` d) fs) addInodeCaches k . catMaybes =<< liftIO (mapM (`genInodeCache` d) fs)
addInodeCaches :: Key -> [InodeCache] -> Annex () addInodeCaches :: Key -> [InodeCache] -> Annex ()
addInodeCaches k is = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ addInodeCaches k is = runWriter $ addInodeCaches' (toSKey k) is
forM_ is $ \i -> insertUnique $ Content (toSKey k) (toSInodeCache i)
addInodeCaches' :: SKey -> [InodeCache] -> Writer
addInodeCaches' sk is = queueDb $
forM_ is $ \i -> insertUnique $ Content sk (toSInodeCache i)
{- A key may have multiple InodeCaches; one for the annex object, and one {- A key may have multiple InodeCaches; one for the annex object, and one
- for each pointer file that is a copy of it. -} - for each pointer file that is a copy of it. -}
getInodeCaches :: Key -> Annex [InodeCache] getInodeCaches :: Key -> Annex [InodeCache]
getInodeCaches k = withDbHandle $ \h -> H.queryDbQueue h $ do getInodeCaches = runReader . getInodeCaches' . toSKey
getInodeCaches' :: SKey -> Reader [InodeCache]
getInodeCaches' sk = readDb $ do
l <- select $ from $ \r -> do l <- select $ from $ \r -> do
where_ (r ^. ContentKey ==. val sk) where_ (r ^. ContentKey ==. val sk)
return (r ^. ContentCache) return (r ^. ContentCache)
return $ map (fromSInodeCache. unValue) l return $ map (fromSInodeCache. unValue) l
where
sk = toSKey k
removeInodeCaches :: Key -> Annex () removeInodeCaches :: Key -> Annex ()
removeInodeCaches k = withDbHandle $ \h -> H.queueDb h (\_ _ -> pure True) $ removeInodeCaches = runWriter . removeInodeCaches' . toSKey
removeInodeCaches' :: SKey -> Writer
removeInodeCaches' sk = queueDb $
delete $ from $ \r -> do delete $ from $ \r -> do
where_ (r ^. ContentKey ==. val sk) where_ (r ^. ContentKey ==. val sk)
where
sk = toSKey k

Database/Keys/Handle.hs Normal file

@ -0,0 +1,55 @@
{- Handle for the Keys database.
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-:
- Licensed under the GNU GPL version 3 or higher.
-}
module Database.Keys.Handle (
DbHandle,
newDbHandle,
DbState(..),
withDbState,
flushDbQueue,
) where
import qualified Database.Queue as H
import Utility.Exception
import Control.Concurrent
import Control.Monad.IO.Class (liftIO, MonadIO)
-- The MVar is always left full except when actions are run
-- that access the database.
newtype DbHandle = DbHandle (MVar DbState)
-- The database can be closed or open; alternatively, an attempt to open
-- it for reading may have found that it does not exist yet (DbEmpty).
data DbState = DbClosed | DbOpen H.DbQueue | DbEmpty
newDbHandle :: IO DbHandle
newDbHandle = DbHandle <$> newMVar DbClosed
-- Runs an action on the state of the handle, which can change its state.
-- The MVar is empty while the action runs, which blocks other users
-- of the handle from running.
withDbState
:: (MonadIO m, MonadCatch m)
=> DbHandle
-> (DbState
-> m (v, DbState))
-> m v
withDbState (DbHandle mvar) a = do
st <- liftIO $ takeMVar mvar
go st `onException` (liftIO $ putMVar mvar st)
where
go st = do
(v, st') <- a st
liftIO $ putMVar mvar st'
return v
flushDbQueue :: DbHandle -> IO ()
flushDbQueue (DbHandle mvar) = go =<< readMVar mvar
where
go (DbOpen qh) = H.flushDbQueue qh
go _ = return ()


@ -1,14 +0,0 @@
{- Sqlite database of information about Keys, data types.
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-:
- Licensed under the GNU GPL version 3 or higher.
-}
module Database.Keys.Types (
DbHandle(..)
) where
import qualified Database.Queue as H
newtype DbHandle = DbHandle H.DbQueue


@ -22,7 +22,6 @@ import Utility.Monad
import Database.Handle import Database.Handle
import Database.Persist.Sqlite import Database.Persist.Sqlite
import Control.Monad
import Control.Concurrent import Control.Concurrent
import Data.Time.Clock import Data.Time.Clock


@ -31,7 +31,6 @@ module Messages (
showHeader, showHeader,
showRaw, showRaw,
setupConsole, setupConsole,
setConsoleEncoding,
enableDebugOutput, enableDebugOutput,
disableDebugOutput, disableDebugOutput,
debugEnabled, debugEnabled,
@ -183,13 +182,6 @@ setupConsole = do
updateGlobalLogger rootLoggerName (setLevel NOTICE . setHandlers [s]) updateGlobalLogger rootLoggerName (setLevel NOTICE . setHandlers [s])
setConsoleEncoding setConsoleEncoding
{- This avoids ghc's output layer crashing on invalid encoded characters in
- filenames when printing them out. -}
setConsoleEncoding :: IO ()
setConsoleEncoding = do
fileEncoding stdout
fileEncoding stderr
{- Log formatter with precision into fractions of a second. -} {- Log formatter with precision into fractions of a second. -}
preciseLogFormatter :: LogFormatter a preciseLogFormatter :: LogFormatter a
preciseLogFormatter = tfLogFormatter "%F %X%Q" "[$time] $msg" preciseLogFormatter = tfLogFormatter "%F %X%Q" "[$time] $msg"


@ -19,6 +19,7 @@ module Utility.FileSystemEncoding (
encodeW8NUL, encodeW8NUL,
decodeW8NUL, decodeW8NUL,
truncateFilePath, truncateFilePath,
setConsoleEncoding,
) where ) where
import qualified GHC.Foreign as GHC import qualified GHC.Foreign as GHC
@ -164,3 +165,10 @@ truncateFilePath n = reverse . go [] n . L8.fromString
else go (c:coll) (cnt - x') (L8.drop 1 bs) else go (c:coll) (cnt - x') (L8.drop 1 bs)
_ -> coll _ -> coll
#endif #endif
{- This avoids ghc's output layer crashing on invalid encoded characters in
- filenames when printing them out. -}
setConsoleEncoding :: IO ()
setConsoleEncoding = do
fileEncoding stdout
fileEncoding stderr


@ -331,18 +331,6 @@ files to be unlocked, while the indirect upgrades don't touch the files.
# fails to drop content from associated file othername, # fails to drop content from associated file othername,
# because it doesn't know it has that name # because it doesn't know it has that name
# git commit clears up this mess # git commit clears up this mess
* A new connection to the Keys database is opened each time.
It would be more efficient to reuse a connection.
However, that needs a way to close the connection, which was a problem.
See 38a23928e9d45b56d6836a4eac703862d63cf93c for details.
* See if the cases where the Keys database is not used can be
optimised. Eg, if the Keys database doesn't exist at all,
we know smudge/clean are not used, so queries don't
need to open the database or do reconciliation, but can simply return none.
Also, no need for Backend.lookupFile to catKeyFile in this case
(when not in direct mode).
However, beware over-optimisation breaking the assistant or perhaps other
long-lived processes.
* Interaction with shared clones. Should avoid hard linking from/to a * Interaction with shared clones. Should avoid hard linking from/to a
object in a shared clone if either repository has the object unlocked. object in a shared clone if either repository has the object unlocked.
(And should avoid unlocking an object if it's hard linked to a shared clone, (And should avoid unlocking an object if it's hard linked to a shared clone,
@ -368,6 +356,14 @@ files to be unlocked, while the indirect upgrades don't touch the files.
smudged files.) smudged files.)
* Audit code for all uses of isDirect. These places almost always need * Audit code for all uses of isDirect. These places almost always need
adjusting to support v6, if they haven't already. adjusting to support v6, if they haven't already.
* Optimisation: See if the database schema can be improved to speed things
up. Are there enough indexes? getAssociatedKey in particular does a
reverse lookup and might benefit from an index.
* Optimisation: Reads from the Keys database avoid doing anything if the
database doesn't exist. This makes v5 repos, or v6 with all locked files
faster. However, if a v6 repo unlocks and then re-locks a file, its
database will exist, and so this optimisation will no longer apply.
Could try to detect when the database is empty, and remove it or avoid reads.
* Eventually (but not yet), make v6 the default for new repositories. * Eventually (but not yet), make v6 the default for new repositories.
Note that the assistant forces repos into direct mode; that will need to Note that the assistant forces repos into direct mode; that will need to