git-annex/Database/Queue.hs
Joey Hess f5b642318d
eliminate single/multi writer distinction
After commit f4bdecc4ec, there is no
longer any distinction between SingleWriter and MultiWriter's handling
of read after write.

Databases that were SingleWriter still have lock files that are used to
prevent multiple writers.

This does make writing to such databases a bit more expensive,
because the MultiWriter code path that is now used opens a second db
connection in order to write to them.
2021-10-20 12:26:30 -04:00

109 lines
3 KiB
Haskell

{- Persistent sqlite database queues
-
- Copyright 2015 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
module Database.Queue (
DbQueue,
openDbQueue,
queryDbQueue,
closeDbQueue,
flushDbQueue,
QueueSize,
queueDb,
) where
import Utility.Monad
import Utility.RawFilePath
import Database.Handle
import Database.Persist.Sqlite
import Control.Concurrent
import Data.Time.Clock
import Control.Applicative
import Prelude
{- A DbQueue wraps a DbHandle, adding a queue of writes to perform.
 -
 - The pending writes live in an MVar, which is taken while the queue
 - is being added to or flushed.
 -
 - This is efficient when there are frequent writes, but
 - reads will not immediately have access to queued writes. -}
data DbQueue = DQ DbHandle (MVar Queue)
{- Opens the database and pairs its handle with a new, empty queue of
 - writes.
 -
 - No migrations are performed, so only use this when the database is
 - known to exist and to have the right tables; ie after running initDb. -}
openDbQueue :: RawFilePath -> TableName -> IO DbQueue
openDbQueue db tablename = do
    dbh <- openDb db tablename
    pending <- emptyQueue >>= newMVar
    return (DQ dbh pending)
{- Flushes any changes still waiting in the queue, then closes the
 - underlying database handle.
 -
 - Either this or flushDbQueue must be called, eg at program exit, to
 - ensure queued changes get written to the database. -}
closeDbQueue :: DbQueue -> IO ()
closeDbQueue dbq@(DQ dbh _) = flushDbQueue dbq >> closeDb dbh
{- Blocks until all queued changes have been written to the database.
 -
 - The queue is held for the duration of the commit. When there is
 - nothing queued, the queue is put back untouched and no commit is made.
 -
 - If the commit throws an exception, the queued changes are put back
 - unchanged and the exception propagates to the caller. (Using
 - modifyMVar_ guarantees the MVar is refilled; otherwise a failed commit
 - would leave it empty and every later user of the queue would block
 - forever.) -}
flushDbQueue :: DbQueue -> IO ()
flushDbQueue (DQ hdl qvar) = modifyMVar_ qvar flush
  where
    flush q@(Queue sz _ qa)
        | sz > 0 = do
            commitDb hdl qa
            -- Committed; start over with an empty queue.
            emptyQueue
        | otherwise = return q
{- Runs a query over the database connection belonging to the DbQueue.
 - Never use this to make changes to the database!
 -
 - Take care: changes that have been recently queued are not visible
 - to the query.
 -}
queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
queryDbQueue (DQ dbh _) query = queryDb dbh query
{- A queue of actions to perform, with a count of the number of actions
 - queued, and a last commit time.
 -
 - The queued actions are sequenced together into a single SqlPersistM
 - action, so they can be committed all at once. -}
data Queue = Queue QueueSize LastCommitTime (SqlPersistM ())
{- Number of actions currently held in a Queue. -}
type QueueSize = Int
{- Time at which a Queue was last reset by emptyQueue, which happens when
 - the queue is opened and after its contents have been committed. -}
type LastCommitTime = UTCTime
{- Makes a queue holding no actions, with its last commit time set to
 - the current time. -}
emptyQueue :: IO Queue
emptyQueue = fresh <$> getCurrentTime
  where
    fresh now = Queue 0 now (return ())
{- Queues a change to be made to the database. It will be queued
 - to be committed later, unless the commitchecker action returns true,
 - in which case any previously queued changes are also committed.
 -
 - The commitchecker is passed the size the queue will have once this
 - change is included, and the queue's last commit time.
 -
 - Transactions built up by queueDb are sent to sqlite all at once.
 - If sqlite fails due to another change being made concurrently by another
 - process, the transaction is put back in the queue. This avoids
 - the sqlite multiple writer problem.
 -
 - If the commitchecker (or the commit itself) throws an exception, the
 - queue is restored to the state it had before this call and the
 - exception propagates; the change passed in is not queued in that case.
 - (Using modifyMVar_ guarantees the MVar is refilled, so later users of
 - the queue do not block forever.)
 -}
queueDb
    :: DbQueue
    -> (QueueSize -> LastCommitTime -> IO Bool)
    -> SqlPersistM ()
    -> IO ()
queueDb (DQ hdl qvar) commitchecker a =
    modifyMVar_ qvar $ \(Queue sz lastcommittime qa) -> do
        -- Force the new size, so a chain of (+1) thunks does not build
        -- up across many queued changes.
        let !sz' = sz + 1
        let qa' = qa >> a
        let queued = Queue sz' lastcommittime qa'
        ifM (commitchecker sz' lastcommittime)
            ( do
                r <- commitDb' hdl qa'
                case r of
                    -- Commit failed; keep everything queued for a
                    -- later attempt.
                    Left _ -> return queued
                    Right _ -> emptyQueue
            , return queued
            )