more robust handling of deferred commits

Still not robust enough. I have 3 fscks running concurrently, and am
seeing:

("commit deferred",user error (SQLite3 returned ErrorBusy while attempting
to perform step.))

and

git-annex: user error (SQLite3 returned ErrorBusy while attempting to perform prepare "SELECT \"fscked\".\"key\"\nFROM \"fscked\"\nWHERE \"fscked\".\"key\" = ?\n": database is locked)
This commit is contained in:
Joey Hess 2015-02-18 14:11:27 -04:00
parent 39a82a76c1
commit 17cb219231
2 changed files with 58 additions and 46 deletions

View file

@ -65,8 +65,8 @@ openDb u = do
unlessM (liftIO $ doesFileExist db) $ do
let newdb = db ++ ".new"
h <- liftIO $ H.openDb newdb
void $ liftIO $ H.runDb h $
runMigrationSilent migrateFsck
void $ liftIO $ H.commitDb h $
void $ runMigrationSilent migrateFsck
liftIO $ H.closeDb h
setAnnexFilePerm newdb
liftIO $ renameFile newdb db
@ -87,7 +87,7 @@ addDb (FsckHandle h _) k = H.queueDb h 1000 $
sk = toSKey k
inDb :: FsckHandle -> Key -> IO Bool
inDb (FsckHandle h _) = H.runDb h . inDb' . toSKey
inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey
inDb' :: SKey -> SqlPersistM Bool
inDb' sk = do

View file

@ -10,12 +10,12 @@
module Database.Handle (
DbHandle,
openDb,
runDb,
commitDb,
queryDb,
closeDb,
Size,
queueDb,
flushQueueDb,
commitDb,
) where
import Utility.Exception
@ -33,8 +33,6 @@ import qualified Data.Text as T
- the database. It has a MVar which Jobs are submitted to. -}
data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
data Job = RunJob (SqlPersistM ()) | CommitJob | CloseJob
openDb :: FilePath -> IO DbHandle
openDb db = do
jobs <- newEmptyMVar
@ -42,25 +40,34 @@ openDb db = do
q <- newMVar emptyDbQueue
return $ DbHandle worker jobs q
data Job
= QueryJob (SqlPersistM ())
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
| CloseJob
workerThread :: T.Text -> MVar Job -> IO ()
workerThread db jobs = catchNonAsync go showerr
workerThread db jobs = catchNonAsync loop showerr
where
showerr e = liftIO $ warningIO $ "sqlite worker thread crashed: " ++ show e
go = do
r <- runSqlite db transaction
showerr e = liftIO $ warningIO $
"sqlite worker thread crashed: " ++ show e
run = runSqlite db
loop = do
r <- run queryloop
case r of
QueryJob _ -> loop
-- change is run in a separate database connection
-- since sqlite only supports a single writer at a
-- time, and it may crash the database connection
ChangeJob a -> a run >> loop
CloseJob -> return ()
_ -> go
transaction = do
queryloop = do
job <- liftIO $ takeMVar jobs
case job of
RunJob a -> a >> transaction
CommitJob -> return CommitJob
CloseJob -> return CloseJob
QueryJob a -> a >> queryloop
_ -> return job
{- Runs an action using the DbHandle. The action may be a query, or it may
- make a change. Changes are bundled up in a transaction, which does not
- complete until commitDb is called.
{- Makes a query using the DbHandle. This should not be used to make
- changes to the database!
-
- Note that the action is not run by the calling thread, but by a
- worker thread. Exceptions are propigated to the calling thread.
@ -68,23 +75,14 @@ workerThread db jobs = catchNonAsync go showerr
- Only one action can be run at a time against a given DbHandle.
- If called concurrently in the same process, this will block until
- it is able to run.
-
- Note that if multiple processes are trying to change the database
- at the same time, sqlite will only let one build a transaction at a
- time.
-}
runDb :: DbHandle -> SqlPersistM a -> IO a
runDb (DbHandle _ jobs _) a = do
queryDb :: DbHandle -> SqlPersistM a -> IO a
queryDb (DbHandle _ jobs _) a = do
res <- newEmptyMVar
putMVar jobs $ RunJob $
putMVar jobs $ QueryJob $
liftIO . putMVar res =<< tryNonAsync a
either throwIO return =<< takeMVar res
{- Commits any transaction that was created by the previous calls to runDb,
- and starts a new transaction. -}
commitDb :: DbHandle -> IO ()
commitDb (DbHandle _ jobs _) = putMVar jobs CommitJob
closeDb :: DbHandle -> IO ()
closeDb h@(DbHandle worker jobs _) = do
flushQueueDb h
@ -100,11 +98,12 @@ data DbQueue = DbQueue Size (SqlPersistM ())
emptyDbQueue :: DbQueue
emptyDbQueue = DbQueue 0 (return ())
{- Queues a change to be committed to the database. It will be buffered
{- Queues a change to be made to the database. It will be buffered
- to be committed later, unless the queue gets larger than the specified
- size.
-
- (Be sure to call closeDb or flushQueue to ensure the change gets committed.)
- (Be sure to call closeDb or flushQueueDb to ensure the change
- gets committed.)
-
- Transactions built up by queueDb are sent to sqlite all at once.
- If sqlite fails due to another change being made concurrently by another
@ -116,16 +115,18 @@ queueDb h@(DbHandle _ _ qvar) maxsz a = do
DbQueue sz qa <- takeMVar qvar
let !sz' = sz + 1
let qa' = qa >> a
let enqueue = putMVar qvar (DbQueue sz' qa')
let enqueue newsz = putMVar qvar (DbQueue newsz qa')
if sz' > maxsz
then do
r <- tryNonAsync $ do
runDb h qa'
commitDb h
r <- commitDb h qa'
case r of
Left _ -> enqueue
Right _ -> putMVar qvar emptyDbQueue
else enqueue
Left e -> do
print ("commit deferred", e)
enqueue 0
Right _ -> do
print "commit made"
putMVar qvar emptyDbQueue
else enqueue sz'
{- If flushing the queue fails, this could be because there is another
- writer to the database. Retry repeatedly for up to 10 seconds. -}
@ -133,10 +134,21 @@ flushQueueDb :: DbHandle -> IO ()
flushQueueDb h@(DbHandle _ _ qvar) = do
DbQueue sz qa <- takeMVar qvar
when (sz > 0) $
robustly 100 $ runDb h qa
robustly Nothing 100 (commitDb h qa)
where
robustly :: Int -> IO () -> IO ()
robustly 0 _ = error "failed to commit changes to sqlite database"
robustly n a = catchNonAsync a $ \_ -> do
threadDelay 100000 -- 1/10th second
robustly (n-1) a
robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO ()
robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e
robustly _ n a = do
r <- a
case r of
Right _ -> return ()
Left e -> do
threadDelay 100000 -- 1/10th second
robustly (Just e) (n-1) a
commitDb :: DbHandle -> SqlPersistM () -> IO (Either SomeException ())
commitDb (DbHandle _ jobs _) a = do
res <- newEmptyMVar
putMVar jobs $ ChangeJob $ \runner ->
liftIO $ putMVar res =<< tryNonAsync (runner a)
takeMVar res