complete work around for sqlite SELECT ErrorBusy on new connection bug
This commit is contained in:
parent
b541a5e38b
commit
bf80a16c2e
3 changed files with 63 additions and 36 deletions
|
@ -72,16 +72,15 @@ openDb u = do
|
||||||
let tmpdb = tmpdbdir </> "db"
|
let tmpdb = tmpdbdir </> "db"
|
||||||
liftIO $ do
|
liftIO $ do
|
||||||
createDirectoryIfMissing True tmpdbdir
|
createDirectoryIfMissing True tmpdbdir
|
||||||
h <- H.initDb tmpdb $ void $
|
H.initDb tmpdb $ void $
|
||||||
runMigrationSilent migrateFsck
|
runMigrationSilent migrateFsck
|
||||||
H.closeDb h
|
|
||||||
setAnnexDirPerm tmpdbdir
|
setAnnexDirPerm tmpdbdir
|
||||||
setAnnexFilePerm tmpdb
|
setAnnexFilePerm tmpdb
|
||||||
liftIO $ do
|
liftIO $ do
|
||||||
void $ tryIO $ removeDirectoryRecursive dbdir
|
void $ tryIO $ removeDirectoryRecursive dbdir
|
||||||
rename tmpdbdir dbdir
|
rename tmpdbdir dbdir
|
||||||
lockFileShared =<< fromRepo (gitAnnexFsckDbLock u)
|
lockFileShared =<< fromRepo (gitAnnexFsckDbLock u)
|
||||||
h <- liftIO $ H.openDb db
|
h <- liftIO $ H.openDb db "fscked"
|
||||||
return $ FsckHandle h u
|
return $ FsckHandle h u
|
||||||
|
|
||||||
closeDb :: FsckHandle -> Annex ()
|
closeDb :: FsckHandle -> Annex ()
|
||||||
|
@ -90,14 +89,13 @@ closeDb (FsckHandle h u) = do
|
||||||
unlockFile =<< fromRepo (gitAnnexFsckDbLock u)
|
unlockFile =<< fromRepo (gitAnnexFsckDbLock u)
|
||||||
|
|
||||||
addDb :: FsckHandle -> Key -> IO ()
|
addDb :: FsckHandle -> Key -> IO ()
|
||||||
addDb (FsckHandle h _) k = H.queueDb h 1000 $
|
addDb (FsckHandle h _) k = H.queueDb h 1000 $
|
||||||
unlessM (inDb' sk) $
|
void $ insertUnique $ Fscked sk
|
||||||
insert_ $ Fscked sk
|
|
||||||
where
|
where
|
||||||
sk = toSKey k
|
sk = toSKey k
|
||||||
|
|
||||||
inDb :: FsckHandle -> Key -> IO Bool
|
inDb :: FsckHandle -> Key -> IO Bool
|
||||||
inDb (FsckHandle h _) = H.queryDb h False . inDb' . toSKey
|
inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey
|
||||||
|
|
||||||
inDb' :: SKey -> SqlPersistM Bool
|
inDb' :: SKey -> SqlPersistM Bool
|
||||||
inDb' sk = do
|
inDb' sk = do
|
||||||
|
|
|
@ -30,6 +30,9 @@ import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Exception (throwIO)
|
import Control.Exception (throwIO)
|
||||||
import qualified Data.Text as T
|
import qualified Data.Text as T
|
||||||
|
import Control.Monad.Trans.Resource (runResourceT)
|
||||||
|
import Control.Monad.Logger (runNoLoggingT)
|
||||||
|
import Data.List
|
||||||
|
|
||||||
{- A DbHandle is a reference to a worker thread that communicates with
|
{- A DbHandle is a reference to a worker thread that communicates with
|
||||||
- the database. It has a MVar which Jobs are submitted to. -}
|
- the database. It has a MVar which Jobs are submitted to. -}
|
||||||
|
@ -41,16 +44,15 @@ data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue)
|
||||||
- The database is put into WAL mode, to prevent readers from blocking
|
- The database is put into WAL mode, to prevent readers from blocking
|
||||||
- writers, and prevent a writer from blocking readers.
|
- writers, and prevent a writer from blocking readers.
|
||||||
-}
|
-}
|
||||||
initDb :: FilePath -> SqlPersistM () -> IO DbHandle
|
initDb :: FilePath -> SqlPersistM () -> IO ()
|
||||||
initDb db migration = do
|
initDb f migration = do
|
||||||
|
let db = T.pack f
|
||||||
enableWAL db
|
enableWAL db
|
||||||
h <- openDb db
|
runSqlite db migration
|
||||||
either throwIO (const $ return ()) =<< commitDb h migration
|
|
||||||
return h
|
enableWAL :: T.Text -> IO ()
|
||||||
|
|
||||||
enableWAL :: FilePath -> IO ()
|
|
||||||
enableWAL db = do
|
enableWAL db = do
|
||||||
conn <- Sqlite.open (T.pack db)
|
conn <- Sqlite.open db
|
||||||
stmt <- Sqlite.prepare conn (T.pack "PRAGMA journal_mode=WAL;")
|
stmt <- Sqlite.prepare conn (T.pack "PRAGMA journal_mode=WAL;")
|
||||||
void $ Sqlite.step stmt
|
void $ Sqlite.step stmt
|
||||||
void $ Sqlite.finalize stmt
|
void $ Sqlite.finalize stmt
|
||||||
|
@ -58,10 +60,10 @@ enableWAL db = do
|
||||||
|
|
||||||
{- Opens the database, but does not perform any migrations. Only use
|
{- Opens the database, but does not perform any migrations. Only use
|
||||||
- if the database is known to exist and have the right tables. -}
|
- if the database is known to exist and have the right tables. -}
|
||||||
openDb :: FilePath -> IO DbHandle
|
openDb :: FilePath -> TableName -> IO DbHandle
|
||||||
openDb db = do
|
openDb db tablename = do
|
||||||
jobs <- newEmptyMVar
|
jobs <- newEmptyMVar
|
||||||
worker <- async (workerThread (T.pack db) jobs)
|
worker <- async (workerThread (T.pack db) tablename jobs)
|
||||||
q <- newMVar emptyDbQueue
|
q <- newMVar emptyDbQueue
|
||||||
return $ DbHandle worker jobs q
|
return $ DbHandle worker jobs q
|
||||||
|
|
||||||
|
@ -70,12 +72,14 @@ data Job
|
||||||
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
|
| ChangeJob ((SqlPersistM () -> IO ()) -> IO ())
|
||||||
| CloseJob
|
| CloseJob
|
||||||
|
|
||||||
workerThread :: T.Text -> MVar Job -> IO ()
|
type TableName = String
|
||||||
workerThread db jobs = catchNonAsync loop showerr
|
|
||||||
|
workerThread :: T.Text -> TableName -> MVar Job -> IO ()
|
||||||
|
workerThread db tablename jobs = catchNonAsync loop showerr
|
||||||
where
|
where
|
||||||
showerr e = liftIO $ warningIO $
|
showerr e = liftIO $ warningIO $
|
||||||
"sqlite worker thread crashed: " ++ show e
|
"sqlite worker thread crashed: " ++ show e
|
||||||
run = runSqlite db
|
|
||||||
loop = do
|
loop = do
|
||||||
r <- run queryloop
|
r <- run queryloop
|
||||||
case r of
|
case r of
|
||||||
|
@ -85,11 +89,42 @@ workerThread db jobs = catchNonAsync loop showerr
|
||||||
-- time, and it may crash the database connection
|
-- time, and it may crash the database connection
|
||||||
ChangeJob a -> a run >> loop
|
ChangeJob a -> a run >> loop
|
||||||
CloseJob -> return ()
|
CloseJob -> return ()
|
||||||
|
|
||||||
queryloop = do
|
queryloop = do
|
||||||
job <- liftIO $ takeMVar jobs
|
job <- liftIO $ takeMVar jobs
|
||||||
case job of
|
case job of
|
||||||
QueryJob a -> a >> queryloop
|
QueryJob a -> a >> queryloop
|
||||||
_ -> return job
|
_ -> return job
|
||||||
|
|
||||||
|
-- like runSqlite, but calls settle on the raw sql Connection.
|
||||||
|
run a = do
|
||||||
|
conn <- Sqlite.open db
|
||||||
|
settle conn
|
||||||
|
runResourceT $ runNoLoggingT $
|
||||||
|
withSqlConn (wrapConnection conn) $
|
||||||
|
runSqlConn a
|
||||||
|
|
||||||
|
-- Work around a bug in sqlite: New database connections can
|
||||||
|
-- sometimes take a while to become usable; select statements will
|
||||||
|
-- fail with ErrorBusy for some time. So, loop until a select
|
||||||
|
-- succeeds; once one succeeds the connection will stay usable.
|
||||||
|
-- <http://thread.gmane.org/gmane.comp.db.sqlite.general/93116>
|
||||||
|
settle conn = do
|
||||||
|
r <- tryNonAsync $ do
|
||||||
|
stmt <- Sqlite.prepare conn nullselect
|
||||||
|
void $ Sqlite.step stmt
|
||||||
|
void $ Sqlite.finalize stmt
|
||||||
|
case r of
|
||||||
|
Right _ -> return ()
|
||||||
|
Left e -> do
|
||||||
|
if "ErrorBusy" `isInfixOf` show e
|
||||||
|
then do
|
||||||
|
threadDelay 1000 -- 1/1000th second
|
||||||
|
settle conn
|
||||||
|
else throwIO e
|
||||||
|
|
||||||
|
-- This should succeed for any table.
|
||||||
|
nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1"
|
||||||
|
|
||||||
{- Makes a query using the DbHandle. This should not be used to make
|
{- Makes a query using the DbHandle. This should not be used to make
|
||||||
- changes to the database!
|
- changes to the database!
|
||||||
|
@ -100,20 +135,13 @@ workerThread db jobs = catchNonAsync loop showerr
|
||||||
- Only one action can be run at a time against a given DbHandle.
|
- Only one action can be run at a time against a given DbHandle.
|
||||||
- If called concurrently in the same process, this will block until
|
- If called concurrently in the same process, this will block until
|
||||||
- it is able to run.
|
- it is able to run.
|
||||||
-
|
|
||||||
- Warning: Under heavy traffic, this can fail with an exception
|
|
||||||
- that contains "ErrorBusy". WAL mode does not entirely prevent this.
|
|
||||||
- The fallback value is returned in this case.
|
|
||||||
-}
|
-}
|
||||||
queryDb :: DbHandle -> a -> SqlPersistM a -> IO a
|
queryDb :: DbHandle -> SqlPersistM a -> IO a
|
||||||
queryDb (DbHandle _ jobs _) fallback a =
|
queryDb (DbHandle _ jobs _) a = do
|
||||||
catchNonAsync go (\e -> print e >> return fallback )
|
res <- newEmptyMVar
|
||||||
where
|
putMVar jobs $ QueryJob $
|
||||||
go = do
|
liftIO . putMVar res =<< tryNonAsync a
|
||||||
res <- newEmptyMVar
|
either throwIO return =<< takeMVar res
|
||||||
putMVar jobs $ QueryJob $
|
|
||||||
liftIO . putMVar res =<< tryNonAsync a
|
|
||||||
either throwIO return =<< takeMVar res
|
|
||||||
|
|
||||||
closeDb :: DbHandle -> IO ()
|
closeDb :: DbHandle -> IO ()
|
||||||
closeDb h@(DbHandle worker jobs _) = do
|
closeDb h@(DbHandle worker jobs _) = do
|
||||||
|
@ -152,7 +180,7 @@ queueDb h@(DbHandle _ _ qvar) maxsz a = do
|
||||||
then do
|
then do
|
||||||
r <- commitDb h qa'
|
r <- commitDb h qa'
|
||||||
case r of
|
case r of
|
||||||
Left e -> enqueue 0
|
Left _ -> enqueue 0
|
||||||
Right _ -> putMVar qvar emptyDbQueue
|
Right _ -> putMVar qvar emptyDbQueue
|
||||||
else enqueue sz'
|
else enqueue sz'
|
||||||
|
|
||||||
|
|
|
@ -111,7 +111,8 @@ Executable git-annex
|
||||||
SafeSemaphore, uuid, random, dlist, unix-compat, async, stm (>= 2.3),
|
SafeSemaphore, uuid, random, dlist, unix-compat, async, stm (>= 2.3),
|
||||||
data-default, case-insensitive, http-conduit, http-types,
|
data-default, case-insensitive, http-conduit, http-types,
|
||||||
cryptohash (>= 0.10.0),
|
cryptohash (>= 0.10.0),
|
||||||
esqueleto, persistent-sqlite (>= 2.1.0.1), persistent, persistent-template
|
esqueleto, persistent-sqlite, persistent, persistent-template,
|
||||||
|
monad-logger
|
||||||
CC-Options: -Wall
|
CC-Options: -Wall
|
||||||
GHC-Options: -Wall
|
GHC-Options: -Wall
|
||||||
Extensions: PackageImports
|
Extensions: PackageImports
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue