The database queue was left empty, which caused subsequent calls to flushDbQueue to deadlock. Sponsored-by: Dartmouth College's Datalad project
		
			
				
	
	
		
			113 lines
		
	
	
	
		
			3.2 KiB
			
		
	
	
	
		
			Haskell
		
	
	
	
	
	
			
		
		
	
	
			113 lines
		
	
	
	
		
			3.2 KiB
			
		
	
	
	
		
			Haskell
		
	
	
	
	
	
{- Persistent sqlite database queues
 | 
						|
 -
 | 
						|
 - Copyright 2015-2022 Joey Hess <id@joeyh.name>
 | 
						|
 -
 | 
						|
 - Licensed under the GNU AGPL version 3 or higher.
 | 
						|
 -}
 | 
						|
 | 
						|
{-# LANGUAGE BangPatterns #-}
 | 
						|
 | 
						|
module Database.Queue (
 | 
						|
	DbQueue,
 | 
						|
	openDbQueue,
 | 
						|
	queryDbQueue,
 | 
						|
	closeDbQueue,
 | 
						|
	flushDbQueue,
 | 
						|
	QueueSize,
 | 
						|
	queueDb,
 | 
						|
) where
 | 
						|
 | 
						|
import Utility.Monad
 | 
						|
import Utility.RawFilePath
 | 
						|
import Utility.DebugLocks
 | 
						|
import Utility.Exception
 | 
						|
import Database.Handle
 | 
						|
 | 
						|
import Database.Persist.Sqlite
 | 
						|
import Control.Concurrent
 | 
						|
import Data.Time.Clock
 | 
						|
import Control.Applicative
 | 
						|
import Prelude
 | 
						|
 | 
						|
{- A DbQueue wraps a DbHandle, adding a queue of writes to perform.
 | 
						|
 -
 | 
						|
 - This is efficient when there are frequent writes, but
 | 
						|
 - reads will not immediately have access to queued writes. -}
 | 
						|
data DbQueue = DQ DbHandle (MVar Queue)
 | 
						|
 | 
						|
{- Opens the database queue, but does not perform any migrations. Only use
 | 
						|
 - if the database is known to exist and have the right tables; ie after
 | 
						|
 - running initDb. -}
 | 
						|
openDbQueue :: RawFilePath -> TableName -> IO DbQueue
 | 
						|
openDbQueue db tablename = DQ
 | 
						|
	<$> openDb db tablename
 | 
						|
	<*> (newMVar =<< emptyQueue)
 | 
						|
 | 
						|
{- This or flushDbQueue must be called, eg at program exit to ensure
 | 
						|
 - queued changes get written to the database. -}
 | 
						|
closeDbQueue :: DbQueue -> IO ()
 | 
						|
closeDbQueue h@(DQ hdl _) = do
 | 
						|
	flushDbQueue h
 | 
						|
	closeDb hdl
 | 
						|
 | 
						|
{- Blocks until all queued changes have been written to the database. -}
 | 
						|
flushDbQueue :: DbQueue -> IO ()
 | 
						|
flushDbQueue (DQ hdl qvar) = do
 | 
						|
	q@(Queue sz _ qa) <- debugLocks $ takeMVar qvar
 | 
						|
	if sz > 0
 | 
						|
		then tryNonAsync (commitDb hdl qa) >>= \case
 | 
						|
			Right () -> debugLocks $ putMVar qvar =<< emptyQueue
 | 
						|
			Left e -> do
 | 
						|
				debugLocks $ putMVar qvar q
 | 
						|
				throwM e
 | 
						|
		else debugLocks $ putMVar qvar q
 | 
						|
 | 
						|
{- Makes a query using the DbQueue's database connection.
 | 
						|
 - This should not be used to make changes to the database!
 | 
						|
 -
 | 
						|
 - Queries will not see changes that have been recently queued,
 | 
						|
 - so use with care.
 | 
						|
 -}
 | 
						|
queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
 | 
						|
queryDbQueue (DQ hdl _) = queryDb hdl
 | 
						|
 | 
						|
{- A queue of actions to perform, with a count of the number of actions
 | 
						|
 - queued, and a last commit time. -}
 | 
						|
data Queue = Queue QueueSize LastCommitTime (SqlPersistM ())
 | 
						|
 | 
						|
type QueueSize = Int
 | 
						|
 | 
						|
type LastCommitTime = UTCTime
 | 
						|
 | 
						|
emptyQueue :: IO Queue
 | 
						|
emptyQueue = do
 | 
						|
	now <- getCurrentTime
 | 
						|
	return $ Queue 0 now (return ())
 | 
						|
 | 
						|
{- Queues a change to be made to the database. It will be queued
 | 
						|
 - to be committed later, unless the commitchecker action returns true,
 | 
						|
 - in which case any previously queued changes are also committed.
 | 
						|
 -
 | 
						|
 - Transactions built up by queueDb are sent to sqlite all at once.
 | 
						|
 - If sqlite fails due to another change being made concurrently by another
 | 
						|
 - process, the transaction is put back in the queue. This avoids
 | 
						|
 - the sqlite multiple writer problem.
 | 
						|
 -}
 | 
						|
queueDb 
 | 
						|
	:: DbQueue
 | 
						|
	-> (QueueSize -> LastCommitTime -> IO Bool) 
 | 
						|
	-> SqlPersistM ()
 | 
						|
	-> IO ()
 | 
						|
queueDb (DQ hdl qvar) commitchecker a = do
 | 
						|
	Queue sz lastcommittime qa <- debugLocks $ takeMVar qvar
 | 
						|
	let !sz' = sz + 1
 | 
						|
	let qa' = qa >> a
 | 
						|
	let enqueue = debugLocks . putMVar qvar
 | 
						|
	ifM (commitchecker sz' lastcommittime)
 | 
						|
		( do
 | 
						|
			r <- commitDb' hdl qa'
 | 
						|
			case r of
 | 
						|
				Left _ -> enqueue $ Queue sz' lastcommittime qa'
 | 
						|
				Right _ -> enqueue =<< emptyQueue
 | 
						|
		, enqueue $ Queue sz' lastcommittime qa'
 | 
						|
		)
 |