eliminate single/multi writer distinction
After commit f4bdecc4ec
, there is no
longer any distinction between SingleWriter and MultiWriter's handling
of read after write.
Databases that were SingleWriter still have lock files that are used to
prevent multiple writers.
This does make writing to such databases a bit more expensive,
because the MultiWriter code path that is now used opens a second db
connection in order to write to them.
This commit is contained in:
parent
c47794991c
commit
f5b642318d
7 changed files with 21 additions and 49 deletions
|
@ -107,7 +107,7 @@ benchDb :: RawFilePath -> Integer -> Annex BenchDb
|
||||||
benchDb tmpdir num = do
|
benchDb tmpdir num = do
|
||||||
liftIO $ putStrLn $ "setting up database with " ++ show num ++ " items"
|
liftIO $ putStrLn $ "setting up database with " ++ show num ++ " items"
|
||||||
initDb db SQL.createTables
|
initDb db SQL.createTables
|
||||||
h <- liftIO $ H.openDbQueue H.MultiWriter db SQL.containedTable
|
h <- liftIO $ H.openDbQueue db SQL.containedTable
|
||||||
liftIO $ populateAssociatedFiles h num
|
liftIO $ populateAssociatedFiles h num
|
||||||
sz <- liftIO $ getFileSize db
|
sz <- liftIO $ getFileSize db
|
||||||
liftIO $ putStrLn $ "size of database on disk: " ++
|
liftIO $ putStrLn $ "size of database on disk: " ++
|
||||||
|
|
|
@ -89,7 +89,7 @@ openDb = do
|
||||||
, liftIO $ runSqlite (T.pack (fromRawFilePath db)) $ void $
|
, liftIO $ runSqlite (T.pack (fromRawFilePath db)) $ void $
|
||||||
runMigrationSilent migrateContentIdentifier
|
runMigrationSilent migrateContentIdentifier
|
||||||
)
|
)
|
||||||
h <- liftIO $ H.openDbQueue H.SingleWriter db "content_identifiers"
|
h <- liftIO $ H.openDbQueue db "content_identifiers"
|
||||||
return $ ContentIdentifierHandle h
|
return $ ContentIdentifierHandle h
|
||||||
|
|
||||||
closeDb :: ContentIdentifierHandle -> Annex ()
|
closeDb :: ContentIdentifierHandle -> Annex ()
|
||||||
|
|
|
@ -104,7 +104,7 @@ openDb u = do
|
||||||
unlessM (liftIO $ R.doesPathExist db) $ do
|
unlessM (liftIO $ R.doesPathExist db) $ do
|
||||||
initDb db $ void $
|
initDb db $ void $
|
||||||
runMigrationSilent migrateExport
|
runMigrationSilent migrateExport
|
||||||
h <- liftIO $ H.openDbQueue H.SingleWriter db "exported"
|
h <- liftIO $ H.openDbQueue db "exported"
|
||||||
return $ ExportHandle h u
|
return $ ExportHandle h u
|
||||||
|
|
||||||
closeDb :: ExportHandle -> Annex ()
|
closeDb :: ExportHandle -> Annex ()
|
||||||
|
|
|
@ -76,7 +76,7 @@ openDb u = do
|
||||||
initDb db $ void $
|
initDb db $ void $
|
||||||
runMigrationSilent migrateFsck
|
runMigrationSilent migrateFsck
|
||||||
lockFileCached =<< fromRepo (gitAnnexFsckDbLock u)
|
lockFileCached =<< fromRepo (gitAnnexFsckDbLock u)
|
||||||
h <- liftIO $ H.openDbQueue H.MultiWriter db "fscked"
|
h <- liftIO $ H.openDbQueue db "fscked"
|
||||||
return $ FsckHandle h u
|
return $ FsckHandle h u
|
||||||
|
|
||||||
closeDb :: FsckHandle -> Annex ()
|
closeDb :: FsckHandle -> Annex ()
|
||||||
|
|
|
@ -9,7 +9,6 @@
|
||||||
|
|
||||||
module Database.Handle (
|
module Database.Handle (
|
||||||
DbHandle,
|
DbHandle,
|
||||||
DbConcurrency(..),
|
|
||||||
openDb,
|
openDb,
|
||||||
TableName,
|
TableName,
|
||||||
queryDb,
|
queryDb,
|
||||||
|
@ -37,40 +36,27 @@ import System.IO
|
||||||
|
|
||||||
{- A DbHandle is a reference to a worker thread that communicates with
|
{- A DbHandle is a reference to a worker thread that communicates with
|
||||||
- the database. It has a MVar which Jobs are submitted to. -}
|
- the database. It has a MVar which Jobs are submitted to. -}
|
||||||
data DbHandle = DbHandle DbConcurrency (Async ()) (MVar Job)
|
data DbHandle = DbHandle (Async ()) (MVar Job)
|
||||||
|
|
||||||
{- Name of a table that should exist once the database is initialized. -}
|
{- Name of a table that should exist once the database is initialized. -}
|
||||||
type TableName = String
|
type TableName = String
|
||||||
|
|
||||||
{- Sqlite only allows a single write to a database at a time; a concurrent
|
|
||||||
- write will crash.
|
|
||||||
-
|
|
||||||
- MultiWrter works around this limitation. It uses additional resources
|
|
||||||
- when writing, because it needs to open the database multiple times. And
|
|
||||||
- writes to the database may block for some time, if other processes are also
|
|
||||||
- writing to it.
|
|
||||||
-
|
|
||||||
- When a database can only be written to by a single process (enforced by
|
|
||||||
- a lock file), use SingleWriter. (Multiple threads can still write.)
|
|
||||||
-}
|
|
||||||
data DbConcurrency = SingleWriter | MultiWriter
|
|
||||||
|
|
||||||
{- Opens the database, but does not perform any migrations. Only use
|
{- Opens the database, but does not perform any migrations. Only use
|
||||||
- once the database is known to exist and have the right tables. -}
|
- once the database is known to exist and have the right tables. -}
|
||||||
openDb :: DbConcurrency -> RawFilePath -> TableName -> IO DbHandle
|
openDb :: RawFilePath -> TableName -> IO DbHandle
|
||||||
openDb dbconcurrency db tablename = do
|
openDb db tablename = do
|
||||||
jobs <- newEmptyMVar
|
jobs <- newEmptyMVar
|
||||||
worker <- async (workerThread (T.pack (fromRawFilePath db)) tablename jobs)
|
worker <- async (workerThread (T.pack (fromRawFilePath db)) tablename jobs)
|
||||||
|
|
||||||
-- work around https://github.com/yesodweb/persistent/issues/474
|
-- work around https://github.com/yesodweb/persistent/issues/474
|
||||||
liftIO $ fileEncoding stderr
|
liftIO $ fileEncoding stderr
|
||||||
|
|
||||||
return $ DbHandle dbconcurrency worker jobs
|
return $ DbHandle worker jobs
|
||||||
|
|
||||||
{- This is optional; when the DbHandle gets garbage collected it will
|
{- This is optional; when the DbHandle gets garbage collected it will
|
||||||
- auto-close. -}
|
- auto-close. -}
|
||||||
closeDb :: DbHandle -> IO ()
|
closeDb :: DbHandle -> IO ()
|
||||||
closeDb (DbHandle _ worker jobs) = do
|
closeDb (DbHandle worker jobs) = do
|
||||||
putMVar jobs CloseJob
|
putMVar jobs CloseJob
|
||||||
wait worker
|
wait worker
|
||||||
|
|
||||||
|
@ -85,7 +71,7 @@ closeDb (DbHandle _ worker jobs) = do
|
||||||
- it is able to run.
|
- it is able to run.
|
||||||
-}
|
-}
|
||||||
queryDb :: DbHandle -> SqlPersistM a -> IO a
|
queryDb :: DbHandle -> SqlPersistM a -> IO a
|
||||||
queryDb (DbHandle _ _ jobs) a = do
|
queryDb (DbHandle _ jobs) a = do
|
||||||
res <- newEmptyMVar
|
res <- newEmptyMVar
|
||||||
putMVar jobs $ QueryJob $
|
putMVar jobs $ QueryJob $
|
||||||
liftIO . putMVar res =<< tryNonAsync a
|
liftIO . putMVar res =<< tryNonAsync a
|
||||||
|
@ -94,10 +80,9 @@ queryDb (DbHandle _ _ jobs) a = do
|
||||||
|
|
||||||
{- Writes a change to the database.
|
{- Writes a change to the database.
|
||||||
-
|
-
|
||||||
- In MultiWriter mode, writes can fail if another write is happening
|
- Writes can fail if another write is happening concurrently.
|
||||||
- concurrently. So write failures are caught and retried repeatedly
|
- So write failures are caught and retried repeatedly for up to 10
|
||||||
- for up to 10 seconds, which should avoid all but the most exceptional
|
- seconds, which should avoid all but the most exceptional problems.
|
||||||
- problems.
|
|
||||||
-}
|
-}
|
||||||
commitDb :: DbHandle -> SqlPersistM () -> IO ()
|
commitDb :: DbHandle -> SqlPersistM () -> IO ()
|
||||||
commitDb h wa = robustly Nothing 100 (commitDb' h wa)
|
commitDb h wa = robustly Nothing 100 (commitDb' h wa)
|
||||||
|
@ -113,22 +98,15 @@ commitDb h wa = robustly Nothing 100 (commitDb' h wa)
|
||||||
robustly (Just e) (n-1) a
|
robustly (Just e) (n-1) a
|
||||||
|
|
||||||
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
|
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
|
||||||
commitDb' (DbHandle MultiWriter _ jobs) a = do
|
commitDb' (DbHandle _ jobs) a = do
|
||||||
res <- newEmptyMVar
|
res <- newEmptyMVar
|
||||||
putMVar jobs $ RobustChangeJob $ \runner ->
|
putMVar jobs $ ChangeJob $ \runner ->
|
||||||
liftIO $ putMVar res =<< tryNonAsync (runner a)
|
liftIO $ putMVar res =<< tryNonAsync (runner a)
|
||||||
takeMVar res
|
takeMVar res
|
||||||
commitDb' (DbHandle SingleWriter _ jobs) a = do
|
|
||||||
res <- newEmptyMVar
|
|
||||||
putMVar jobs $ ChangeJob $
|
|
||||||
liftIO . putMVar res =<< tryNonAsync a
|
|
||||||
takeMVar res
|
|
||||||
`catchNonAsync` (const $ error "sqlite commit crashed")
|
|
||||||
|
|
||||||
data Job
|
data Job
|
||||||
= QueryJob (SqlPersistM ())
|
= QueryJob (SqlPersistM ())
|
||||||
| ChangeJob (SqlPersistM ())
|
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
|
||||||
| RobustChangeJob ((SqlPersistM () -> IO ()) -> IO ())
|
|
||||||
| CloseJob
|
| CloseJob
|
||||||
|
|
||||||
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
|
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
|
||||||
|
@ -150,16 +128,11 @@ workerThread db tablename jobs = newconn
|
||||||
Left BlockedIndefinitelyOnMVar -> return (return ())
|
Left BlockedIndefinitelyOnMVar -> return (return ())
|
||||||
Right CloseJob -> return (return ())
|
Right CloseJob -> return (return ())
|
||||||
Right (QueryJob a) -> a >> loop
|
Right (QueryJob a) -> a >> loop
|
||||||
Right (ChangeJob a) -> do
|
|
||||||
a
|
|
||||||
-- Exit this sqlite connection so the
|
|
||||||
-- database gets updated on disk.
|
|
||||||
return newconn
|
|
||||||
-- Change is run in a separate database connection
|
-- Change is run in a separate database connection
|
||||||
-- since sqlite only supports a single writer at a
|
-- since sqlite only supports a single writer at a
|
||||||
-- time, and it may crash the database connection
|
-- time, and it may crash the database connection
|
||||||
-- that the write is made to.
|
-- that the write is made to.
|
||||||
Right (RobustChangeJob a) -> do
|
Right (ChangeJob a) -> do
|
||||||
liftIO (a (runSqliteRobustly tablename db))
|
liftIO (a (runSqliteRobustly tablename db))
|
||||||
-- Exit this sqlite connection so the
|
-- Exit this sqlite connection so the
|
||||||
-- change that was just written, using
|
-- change that was just written, using
|
||||||
|
|
|
@ -134,7 +134,7 @@ openDb forwrite _ = catchPermissionDenied permerr $ withExclusiveLock gitAnnexKe
|
||||||
| otherwise = return DbUnavailable
|
| otherwise = return DbUnavailable
|
||||||
|
|
||||||
open db = do
|
open db = do
|
||||||
qh <- liftIO $ H.openDbQueue H.MultiWriter db SQL.containedTable
|
qh <- liftIO $ H.openDbQueue db SQL.containedTable
|
||||||
reconcileStaged qh
|
reconcileStaged qh
|
||||||
return $ DbOpen qh
|
return $ DbOpen qh
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,6 @@
|
||||||
|
|
||||||
module Database.Queue (
|
module Database.Queue (
|
||||||
DbQueue,
|
DbQueue,
|
||||||
DbConcurrency(..),
|
|
||||||
openDbQueue,
|
openDbQueue,
|
||||||
queryDbQueue,
|
queryDbQueue,
|
||||||
closeDbQueue,
|
closeDbQueue,
|
||||||
|
@ -37,9 +36,9 @@ data DbQueue = DQ DbHandle (MVar Queue)
|
||||||
{- Opens the database queue, but does not perform any migrations. Only use
|
{- Opens the database queue, but does not perform any migrations. Only use
|
||||||
- if the database is known to exist and have the right tables; ie after
|
- if the database is known to exist and have the right tables; ie after
|
||||||
- running initDb. -}
|
- running initDb. -}
|
||||||
openDbQueue :: DbConcurrency -> RawFilePath -> TableName -> IO DbQueue
|
openDbQueue :: RawFilePath -> TableName -> IO DbQueue
|
||||||
openDbQueue dbconcurrency db tablename = DQ
|
openDbQueue db tablename = DQ
|
||||||
<$> openDb dbconcurrency db tablename
|
<$> openDb db tablename
|
||||||
<*> (newMVar =<< emptyQueue)
|
<*> (newMVar =<< emptyQueue)
|
||||||
|
|
||||||
{- This or flushDbQueue must be called, eg at program exit to ensure
|
{- This or flushDbQueue must be called, eg at program exit to ensure
|
||||||
|
|
Loading…
Reference in a new issue