 40ecf58d4b
			
		
	
	
	
	
	40ecf58d4bThis does not change the overall license of the git-annex program, which was already AGPL due to a number of sources files being AGPL already. Legally speaking, I'm adding a new license under which these files are now available; I already released their current contents under the GPL license. Now they're dual licensed GPL and AGPL. However, I intend for all my future changes to these files to only be released under the AGPL license, and I won't be tracking the dual licensing status, so I'm simply changing the license statement to say it's AGPL. (In some cases, others wrote parts of the code of a file and released it under the GPL; but in all cases I have contributed a significant portion of the code in each file and it's that code that is getting the AGPL license; the GPL license of other contributors allows combining with AGPL code.)
		
			
				
	
	
		
			112 lines
		
	
	
	
		
			3.1 KiB
			
		
	
	
	
		
			Haskell
		
	
	
	
	
	
			
		
		
	
	
			112 lines
		
	
	
	
		
			3.1 KiB
			
		
	
	
	
		
			Haskell
		
	
	
	
	
	
| {- Persistent sqlite database queues
 | |
|  -
 | |
|  - Copyright 2015 Joey Hess <id@joeyh.name>
 | |
|  -
 | |
|  - Licensed under the GNU AGPL version 3 or higher.
 | |
|  -}
 | |
| 
 | |
| {-# LANGUAGE BangPatterns #-}
 | |
| 
 | |
| module Database.Queue (
 | |
| 	DbQueue,
 | |
| 	DbConcurrency(..),
 | |
| 	openDbQueue,
 | |
| 	queryDbQueue,
 | |
| 	closeDbQueue,
 | |
| 	flushDbQueue,
 | |
| 	QueueSize,
 | |
| 	queueDb,
 | |
| ) where
 | |
| 
 | |
| import Utility.Monad
 | |
| import Database.Handle
 | |
| 
 | |
| import Database.Persist.Sqlite
 | |
| import Control.Concurrent
 | |
| import Data.Time.Clock
 | |
| import Control.Applicative
 | |
| import Prelude
 | |
| 
 | |
| {- A DbQueue wraps a DbHandle, adding a queue of writes to perform.
 | |
|  -
 | |
|  - This is efficient when there are frequent writes, but
 | |
|  - reads will not immediately have access to queued writes. -}
 | |
| data DbQueue = DQ DbHandle (MVar Queue)
 | |
| 
 | |
| {- Opens the database queue, but does not perform any migrations. Only use
 | |
|  - if the database is known to exist and have the right tables; ie after
 | |
|  - running initDb. -}
 | |
| openDbQueue :: DbConcurrency -> FilePath -> TableName -> IO DbQueue
 | |
| openDbQueue dbconcurrency db tablename = DQ
 | |
| 	<$> openDb dbconcurrency db tablename
 | |
| 	<*> (newMVar =<< emptyQueue)
 | |
| 
 | |
| {- This or flushDbQueue must be called, eg at program exit to ensure
 | |
|  - queued changes get written to the database. -}
 | |
| closeDbQueue :: DbQueue -> IO ()
 | |
| closeDbQueue h@(DQ hdl _) = do
 | |
| 	flushDbQueue h
 | |
| 	closeDb hdl
 | |
| 
 | |
| {- Blocks until all queued changes have been written to the database. -}
 | |
| flushDbQueue :: DbQueue -> IO ()
 | |
| flushDbQueue (DQ hdl qvar) = do
 | |
| 	q@(Queue sz _ qa) <- takeMVar qvar
 | |
| 	if sz > 0
 | |
| 		then do
 | |
| 			commitDb hdl qa
 | |
| 			putMVar qvar =<< emptyQueue
 | |
| 		else putMVar qvar q
 | |
| 
 | |
| {- Makes a query using the DbQueue's database connection.
 | |
|  - This should not be used to make changes to the database!
 | |
|  -
 | |
|  - Queries will not see changes that have been recently queued,
 | |
|  - so use with care.
 | |
|  -
 | |
|  - Also, when the database was opened in MultiWriter mode,
 | |
|  - queries may not see changes even after flushDbQueue.
 | |
|  -}
 | |
| queryDbQueue :: DbQueue -> SqlPersistM a -> IO a
 | |
| queryDbQueue (DQ hdl _) = queryDb hdl
 | |
| 
 | |
| {- A queue of actions to perform, with a count of the number of actions
 | |
|  - queued, and a last commit time. -}
 | |
| data Queue = Queue QueueSize LastCommitTime (SqlPersistM ())
 | |
| 
 | |
| type QueueSize = Int
 | |
| 
 | |
| type LastCommitTime = UTCTime
 | |
| 
 | |
| emptyQueue :: IO Queue
 | |
| emptyQueue = do
 | |
| 	now <- getCurrentTime
 | |
| 	return $ Queue 0 now (return ())
 | |
| 
 | |
| {- Queues a change to be made to the database. It will be queued
 | |
|  - to be committed later, unless the commitchecker action returns true,
 | |
|  - in which case any previously queued changes are also committed.
 | |
|  -
 | |
|  - Transactions built up by queueDb are sent to sqlite all at once.
 | |
|  - If sqlite fails due to another change being made concurrently by another
 | |
|  - process, the transaction is put back in the queue. This avoids
 | |
|  - the sqlite multiple writer problem.
 | |
|  -}
 | |
| queueDb 
 | |
| 	:: DbQueue
 | |
| 	-> (QueueSize -> LastCommitTime -> IO Bool) 
 | |
| 	-> SqlPersistM ()
 | |
| 	-> IO ()
 | |
| queueDb (DQ hdl qvar) commitchecker a = do
 | |
| 	Queue sz lastcommittime qa <- takeMVar qvar
 | |
| 	let !sz' = sz + 1
 | |
| 	let qa' = qa >> a
 | |
| 	let enqueue = putMVar qvar
 | |
| 	ifM (commitchecker sz' lastcommittime)
 | |
| 		( do
 | |
| 			r <- commitDb' hdl qa'
 | |
| 			case r of
 | |
| 				Left _ -> enqueue $ Queue sz' lastcommittime qa'
 | |
| 				Right _ -> enqueue =<< emptyQueue
 | |
| 		, enqueue $ Queue sz' lastcommittime qa'
 | |
| 		)
 |