6ab14710fc
The export database has writes made to it and then expects to read back the same data immediately. But, the way that Database.Handle does writes, in order to support multiple writers, makes that not work, due to caching issues. This resulted in export re-uploading files it had already successfully renamed into place. Fixed by allowing databases to be opened in MultiWriter or SingleWriter mode. The export database only needs to support a single writer; it does not make sense for multiple exports to run at the same time to the same special remote. All other databases still use MultiWriter mode. And by inspection, nothing else in git-annex seems to be relying on being able to immediately query for changes that were just written to the database. This commit was supported by the NSF-funded DataLad project.
112 lines
3.1 KiB
Haskell
112 lines
3.1 KiB
Haskell
{- Persistent sqlite database queues
|
|
-
|
|
- Copyright 2015 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE BangPatterns #-}
|
|
|
|
module Database.Queue (
|
|
DbQueue,
|
|
DbConcurrency(..),
|
|
openDbQueue,
|
|
queryDbQueue,
|
|
closeDbQueue,
|
|
flushDbQueue,
|
|
QueueSize,
|
|
queueDb,
|
|
) where
|
|
|
|
import Utility.Monad
|
|
import Database.Handle
|
|
|
|
import Database.Persist.Sqlite
|
|
import Control.Concurrent
|
|
import Data.Time.Clock
|
|
import Control.Applicative
|
|
import Prelude
|
|
|
|
{- A DbQueue wraps a DbHandle, adding a queue of writes to perform.
|
|
-
|
|
- This is efficient when there are frequent writes, but
|
|
- reads will not immediately have access to queued writes. -}
|
|
data DbQueue = DQ DbHandle (MVar Queue)
|
|
|
|
{- Opens the database queue, but does not perform any migrations. Only use
|
|
- if the database is known to exist and have the right tables; ie after
|
|
- running initDb. -}
|
|
openDbQueue :: DbConcurrency -> FilePath -> TableName -> IO DbQueue
|
|
openDbQueue dbconcurrency db tablename = DQ
|
|
<$> openDb dbconcurrency db tablename
|
|
<*> (newMVar =<< emptyQueue)
|
|
|
|
{- This or flushDbQueue must be called, eg at program exit to ensure
|
|
- queued changes get written to the database. -}
|
|
closeDbQueue :: DbQueue -> IO ()
|
|
closeDbQueue h@(DQ hdl _) = do
|
|
flushDbQueue h
|
|
closeDb hdl
|
|
|
|
{- Blocks until all queued changes have been written to the database. -}
|
|
flushDbQueue :: DbQueue -> IO ()
|
|
flushDbQueue (DQ hdl qvar) = do
|
|
q@(Queue sz _ qa) <- takeMVar qvar
|
|
if sz > 0
|
|
then do
|
|
commitDb hdl qa
|
|
putMVar qvar =<< emptyQueue
|
|
else putMVar qvar q
|
|
|
|
{- Makes a query using the DbQueue's database connection.
|
|
- This should not be used to make changes to the database!
|
|
-
|
|
- Queries will not see changes that have been recently queued,
|
|
- so use with care.
|
|
-
|
|
- Also, when the database was opened in MultiWriter mode,
|
|
- queries may not see changes even after flushDbQueue.
|
|
-}
|
|
queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
|
|
queryDbQueue (DQ hdl _) = queryDb hdl
|
|
|
|
{- A queue of actions to perform, with a count of the number of actions
|
|
- queued, and a last commit time. -}
|
|
data Queue = Queue QueueSize LastCommitTime (SqlPersistM ())
|
|
|
|
type QueueSize = Int
|
|
|
|
type LastCommitTime = UTCTime
|
|
|
|
emptyQueue :: IO Queue
|
|
emptyQueue = do
|
|
now <- getCurrentTime
|
|
return $ Queue 0 now (return ())
|
|
|
|
{- Queues a change to be made to the database. It will be queued
|
|
- to be committed later, unless the commitchecker action returns true,
|
|
- in which case any previously queued changes are also committed.
|
|
-
|
|
- Transactions built up by queueDb are sent to sqlite all at once.
|
|
- If sqlite fails due to another change being made concurrently by another
|
|
- process, the transaction is put back in the queue. This avoids
|
|
- the sqlite multiple writer problem.
|
|
-}
|
|
queueDb
|
|
:: DbQueue
|
|
-> (QueueSize -> LastCommitTime -> IO Bool)
|
|
-> SqlPersistM ()
|
|
-> IO ()
|
|
queueDb (DQ hdl qvar) commitchecker a = do
|
|
Queue sz lastcommittime qa <- takeMVar qvar
|
|
let !sz' = sz + 1
|
|
let qa' = qa >> a
|
|
let enqueue = putMVar qvar
|
|
ifM (commitchecker sz' lastcommittime)
|
|
( do
|
|
r <- commitDb' hdl qa'
|
|
case r of
|
|
Left _ -> enqueue $ Queue sz' lastcommittime qa'
|
|
Right _ -> enqueue =<< emptyQueue
|
|
, enqueue $ Queue sz' lastcommittime qa'
|
|
)
|