fix consistency bug reading from export database
The export database has writes made to it and then expects to read back the same data immediately. But, the way that Database.Handle does writes, in order to support multiple writers, makes that not work, due to caching issues. This resulted in export re-uploading files it had already successfully renamed into place. Fixed by allowing databases to be opened in MultiWriter or SingleWriter mode. The export database only needs to support a single writer; it does not make sense for multiple exports to run at the same time to the same special remote. All other databases still use MultiWriter mode. And by inspection, nothing else in git-annex seems to be relying on being able to immediately query for changes that were just written to the database. This commit was supported by the NSF-funded DataLad project.
This commit is contained in:
parent
4f657ba918
commit
6ab14710fc
7 changed files with 65 additions and 23 deletions
|
@ -48,7 +48,7 @@ openDb u = do
|
||||||
unlessM (liftIO $ doesFileExist db) $ do
|
unlessM (liftIO $ doesFileExist db) $ do
|
||||||
initDb db $ void $
|
initDb db $ void $
|
||||||
runMigrationSilent migrateExport
|
runMigrationSilent migrateExport
|
||||||
h <- liftIO $ H.openDbQueue db "exported"
|
h <- liftIO $ H.openDbQueue H.SingleWriter db "exported"
|
||||||
return $ ExportHandle h
|
return $ ExportHandle h
|
||||||
|
|
||||||
closeDb :: ExportHandle -> Annex ()
|
closeDb :: ExportHandle -> Annex ()
|
||||||
|
|
|
@ -63,7 +63,7 @@ openDb u = do
|
||||||
initDb db $ void $
|
initDb db $ void $
|
||||||
runMigrationSilent migrateFsck
|
runMigrationSilent migrateFsck
|
||||||
lockFileCached =<< fromRepo (gitAnnexFsckDbLock u)
|
lockFileCached =<< fromRepo (gitAnnexFsckDbLock u)
|
||||||
h <- liftIO $ H.openDbQueue db "fscked"
|
h <- liftIO $ H.openDbQueue H.MultiWriter db "fscked"
|
||||||
return $ FsckHandle h u
|
return $ FsckHandle h u
|
||||||
|
|
||||||
closeDb :: FsckHandle -> Annex ()
|
closeDb :: FsckHandle -> Annex ()
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
module Database.Handle (
|
module Database.Handle (
|
||||||
DbHandle,
|
DbHandle,
|
||||||
|
DbConcurrency(..),
|
||||||
openDb,
|
openDb,
|
||||||
TableName,
|
TableName,
|
||||||
queryDb,
|
queryDb,
|
||||||
|
@ -35,27 +36,49 @@ import System.IO
|
||||||
|
|
||||||
{- A DbHandle is a reference to a worker thread that communicates with
|
{- A DbHandle is a reference to a worker thread that communicates with
|
||||||
- the database. It has a MVar which Jobs are submitted to. -}
|
- the database. It has a MVar which Jobs are submitted to. -}
|
||||||
data DbHandle = DbHandle (Async ()) (MVar Job)
|
data DbHandle = DbHandle DbConcurrency (Async ()) (MVar Job)
|
||||||
|
|
||||||
{- Name of a table that should exist once the database is initialized. -}
|
{- Name of a table that should exist once the database is initialized. -}
|
||||||
type TableName = String
|
type TableName = String
|
||||||
|
|
||||||
|
{- Sqlite only allows a single write to a database at a time; a concurrent
|
||||||
|
- write will crash.
|
||||||
|
-
|
||||||
|
- While a DbHandle serializes concurrent writes from
|
||||||
|
- multiple threads. But, when a database can be written to by
|
||||||
|
- multiple processes concurrently, use MultiWriter to make writes
|
||||||
|
- to the database be done robustly.
|
||||||
|
-
|
||||||
|
- The downside of using MultiWriter is that after writing a change to the
|
||||||
|
- database, the a query using the same DbHandle will not immediately see
|
||||||
|
- the change! This is because the change is actually written using a
|
||||||
|
- separate database connection, and caching can prevent seeing the change.
|
||||||
|
- Also, consider that if multiple processes are writing to a database,
|
||||||
|
- you can't rely on seeing values you've just written anyway, as another
|
||||||
|
- process may change them.
|
||||||
|
-
|
||||||
|
- When a database can only be written to by a single process, use
|
||||||
|
- SingleWriter. Changes written to the database will always be immediately
|
||||||
|
- visible then.
|
||||||
|
-}
|
||||||
|
data DbConcurrency = SingleWriter | MultiWriter
|
||||||
|
|
||||||
{- Opens the database, but does not perform any migrations. Only use
|
{- Opens the database, but does not perform any migrations. Only use
|
||||||
- if the database is known to exist and have the right tables. -}
|
- once the database is known to exist and have the right tables. -}
|
||||||
openDb :: FilePath -> TableName -> IO DbHandle
|
openDb :: DbConcurrency -> FilePath -> TableName -> IO DbHandle
|
||||||
openDb db tablename = do
|
openDb dbconcurrency db tablename = do
|
||||||
jobs <- newEmptyMVar
|
jobs <- newEmptyMVar
|
||||||
worker <- async (workerThread (T.pack db) tablename jobs)
|
worker <- async (workerThread (T.pack db) tablename jobs)
|
||||||
|
|
||||||
-- work around https://github.com/yesodweb/persistent/issues/474
|
-- work around https://github.com/yesodweb/persistent/issues/474
|
||||||
liftIO $ fileEncoding stderr
|
liftIO $ fileEncoding stderr
|
||||||
|
|
||||||
return $ DbHandle worker jobs
|
return $ DbHandle dbconcurrency worker jobs
|
||||||
|
|
||||||
{- This is optional; when the DbHandle gets garbage collected it will
|
{- This is optional; when the DbHandle gets garbage collected it will
|
||||||
- auto-close. -}
|
- auto-close. -}
|
||||||
closeDb :: DbHandle -> IO ()
|
closeDb :: DbHandle -> IO ()
|
||||||
closeDb (DbHandle worker jobs) = do
|
closeDb (DbHandle _ worker jobs) = do
|
||||||
putMVar jobs CloseJob
|
putMVar jobs CloseJob
|
||||||
wait worker
|
wait worker
|
||||||
|
|
||||||
|
@ -68,9 +91,12 @@ closeDb (DbHandle worker jobs) = do
|
||||||
- Only one action can be run at a time against a given DbHandle.
|
- Only one action can be run at a time against a given DbHandle.
|
||||||
- If called concurrently in the same process, this will block until
|
- If called concurrently in the same process, this will block until
|
||||||
- it is able to run.
|
- it is able to run.
|
||||||
|
-
|
||||||
|
- Note that when the DbHandle was opened in MultiWriter mode, recent
|
||||||
|
- writes may not be seen by queryDb.
|
||||||
-}
|
-}
|
||||||
queryDb :: DbHandle -> SqlPersistM a -> IO a
|
queryDb :: DbHandle -> SqlPersistM a -> IO a
|
||||||
queryDb (DbHandle _ jobs) a = do
|
queryDb (DbHandle _ _ jobs) a = do
|
||||||
res <- newEmptyMVar
|
res <- newEmptyMVar
|
||||||
putMVar jobs $ QueryJob $
|
putMVar jobs $ QueryJob $
|
||||||
liftIO . putMVar res =<< tryNonAsync a
|
liftIO . putMVar res =<< tryNonAsync a
|
||||||
|
@ -79,9 +105,9 @@ queryDb (DbHandle _ jobs) a = do
|
||||||
|
|
||||||
{- Writes a change to the database.
|
{- Writes a change to the database.
|
||||||
-
|
-
|
||||||
- If a database is opened multiple times and there's a concurrent writer,
|
- In MultiWriter mode, catches failure to write to the database,
|
||||||
- the write could fail. Retries repeatedly for up to 10 seconds,
|
- and retries repeatedly for up to 10 seconds, which should avoid
|
||||||
- which should avoid all but the most exceptional problems.
|
- all but the most exceptional problems.
|
||||||
-}
|
-}
|
||||||
commitDb :: DbHandle -> SqlPersistM () -> IO ()
|
commitDb :: DbHandle -> SqlPersistM () -> IO ()
|
||||||
commitDb h wa = robustly Nothing 100 (commitDb' h wa)
|
commitDb h wa = robustly Nothing 100 (commitDb' h wa)
|
||||||
|
@ -97,15 +123,22 @@ commitDb h wa = robustly Nothing 100 (commitDb' h wa)
|
||||||
robustly (Just e) (n-1) a
|
robustly (Just e) (n-1) a
|
||||||
|
|
||||||
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
|
commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
|
||||||
commitDb' (DbHandle _ jobs) a = do
|
commitDb' (DbHandle MultiWriter _ jobs) a = do
|
||||||
res <- newEmptyMVar
|
res <- newEmptyMVar
|
||||||
putMVar jobs $ ChangeJob $ \runner ->
|
putMVar jobs $ RobustChangeJob $ \runner ->
|
||||||
liftIO $ putMVar res =<< tryNonAsync (runner a)
|
liftIO $ putMVar res =<< tryNonAsync (runner a)
|
||||||
takeMVar res
|
takeMVar res
|
||||||
|
commitDb' (DbHandle SingleWriter _ jobs) a = do
|
||||||
|
res <- newEmptyMVar
|
||||||
|
putMVar jobs $ ChangeJob $
|
||||||
|
liftIO . putMVar res =<< tryNonAsync a
|
||||||
|
takeMVar res
|
||||||
|
`catchNonAsync` (const $ error "sqlite commit crashed")
|
||||||
|
|
||||||
data Job
|
data Job
|
||||||
= QueryJob (SqlPersistM ())
|
= QueryJob (SqlPersistM ())
|
||||||
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
|
| ChangeJob (SqlPersistM ())
|
||||||
|
| RobustChangeJob ((SqlPersistM () -> IO ()) -> IO ())
|
||||||
| CloseJob
|
| CloseJob
|
||||||
|
|
||||||
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
|
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
|
||||||
|
@ -127,10 +160,12 @@ workerThread db tablename jobs =
|
||||||
Left BlockedIndefinitelyOnMVar -> return ()
|
Left BlockedIndefinitelyOnMVar -> return ()
|
||||||
Right CloseJob -> return ()
|
Right CloseJob -> return ()
|
||||||
Right (QueryJob a) -> a >> loop
|
Right (QueryJob a) -> a >> loop
|
||||||
-- change is run in a separate database connection
|
Right (ChangeJob a) -> a >> loop
|
||||||
|
-- Change is run in a separate database connection
|
||||||
-- since sqlite only supports a single writer at a
|
-- since sqlite only supports a single writer at a
|
||||||
-- time, and it may crash the database connection
|
-- time, and it may crash the database connection
|
||||||
Right (ChangeJob a) -> liftIO (a (runSqliteRobustly tablename db)) >> loop
|
-- that the write is made to.
|
||||||
|
Right (RobustChangeJob a) -> liftIO (a (runSqliteRobustly tablename db)) >> loop
|
||||||
|
|
||||||
-- like runSqlite, but calls settle on the raw sql Connection.
|
-- like runSqlite, but calls settle on the raw sql Connection.
|
||||||
runSqliteRobustly :: TableName -> T.Text -> (SqlPersistM a) -> IO a
|
runSqliteRobustly :: TableName -> T.Text -> (SqlPersistM a) -> IO a
|
||||||
|
|
|
@ -124,7 +124,7 @@ openDb createdb _ = catchPermissionDenied permerr $ withExclusiveLock gitAnnexKe
|
||||||
open db
|
open db
|
||||||
(False, False) -> return DbUnavailable
|
(False, False) -> return DbUnavailable
|
||||||
where
|
where
|
||||||
open db = liftIO $ DbOpen <$> H.openDbQueue db SQL.containedTable
|
open db = liftIO $ DbOpen <$> H.openDbQueue H.MultiWriter db SQL.containedTable
|
||||||
-- If permissions don't allow opening the database, treat it as if
|
-- If permissions don't allow opening the database, treat it as if
|
||||||
-- it does not exist.
|
-- it does not exist.
|
||||||
permerr e = case createdb of
|
permerr e = case createdb of
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
module Database.Queue (
|
module Database.Queue (
|
||||||
DbQueue,
|
DbQueue,
|
||||||
|
DbConcurrency(..),
|
||||||
openDbQueue,
|
openDbQueue,
|
||||||
queryDbQueue,
|
queryDbQueue,
|
||||||
closeDbQueue,
|
closeDbQueue,
|
||||||
|
@ -35,9 +36,9 @@ data DbQueue = DQ DbHandle (MVar Queue)
|
||||||
{- Opens the database queue, but does not perform any migrations. Only use
|
{- Opens the database queue, but does not perform any migrations. Only use
|
||||||
- if the database is known to exist and have the right tables; ie after
|
- if the database is known to exist and have the right tables; ie after
|
||||||
- running initDb. -}
|
- running initDb. -}
|
||||||
openDbQueue :: FilePath -> TableName -> IO DbQueue
|
openDbQueue :: DbConcurrency -> FilePath -> TableName -> IO DbQueue
|
||||||
openDbQueue db tablename = DQ
|
openDbQueue dbconcurrency db tablename = DQ
|
||||||
<$> openDb db tablename
|
<$> openDb dbconcurrency db tablename
|
||||||
<*> (newMVar =<< emptyQueue)
|
<*> (newMVar =<< emptyQueue)
|
||||||
|
|
||||||
{- This or flushDbQueue must be called, eg at program exit to ensure
|
{- This or flushDbQueue must be called, eg at program exit to ensure
|
||||||
|
@ -60,8 +61,11 @@ flushDbQueue (DQ hdl qvar) = do
|
||||||
{- Makes a query using the DbQueue's database connection.
|
{- Makes a query using the DbQueue's database connection.
|
||||||
- This should not be used to make changes to the database!
|
- This should not be used to make changes to the database!
|
||||||
-
|
-
|
||||||
- Queries will not return changes that have been recently queued,
|
- Queries will not see changes that have been recently queued,
|
||||||
- so use with care.
|
- so use with care.
|
||||||
|
-
|
||||||
|
- Also, when the database was opened in MultiWriter mode,
|
||||||
|
- queries may not see changes even after flushDbQueue.
|
||||||
-}
|
-}
|
||||||
queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
|
queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
|
||||||
queryDbQueue (DQ hdl _) = queryDb hdl
|
queryDbQueue (DQ hdl _) = queryDb hdl
|
||||||
|
|
|
@ -273,6 +273,7 @@ of a file when an export was interrupted.)
|
||||||
|
|
||||||
So, before an export does anything, need to record the tree that's about
|
So, before an export does anything, need to record the tree that's about
|
||||||
to be exported to export.log, not as an exported tree, but as a goal.
|
to be exported to export.log, not as an exported tree, but as a goal.
|
||||||
|
Then on resume, the temp files for that can be cleaned up.
|
||||||
|
|
||||||
## renames and export conflicts
|
## renames and export conflicts
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,8 @@ Work is in progress. Todo list:
|
||||||
export from another repository also doesn't work right, because the
|
export from another repository also doesn't work right, because the
|
||||||
export database is not populated. So, seems that the export database needs
|
export database is not populated. So, seems that the export database needs
|
||||||
to get populated based on the export log in these cases.
|
to get populated based on the export log in these cases.
|
||||||
* Efficient handling of renames.
|
* Currently all modified/deleted files are renamed to temp files,
|
||||||
|
even when they won't be used. Avoid doing this unless the
|
||||||
|
temp file will be renamed to the new filename.
|
||||||
* Support export to aditional special remotes (S3 etc)
|
* Support export to aditional special remotes (S3 etc)
|
||||||
* Support export to external special remotes.
|
* Support export to external special remotes.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue