diff --git a/Database/Fsck.hs b/Database/Fsck.hs index 6274034481..582fdb3d63 100644 --- a/Database/Fsck.hs +++ b/Database/Fsck.hs @@ -72,16 +72,15 @@ openDb u = do let tmpdb = tmpdbdir "db" liftIO $ do createDirectoryIfMissing True tmpdbdir - h <- H.initDb tmpdb $ void $ + H.initDb tmpdb $ void $ runMigrationSilent migrateFsck - H.closeDb h setAnnexDirPerm tmpdbdir setAnnexFilePerm tmpdb liftIO $ do void $ tryIO $ removeDirectoryRecursive dbdir rename tmpdbdir dbdir lockFileShared =<< fromRepo (gitAnnexFsckDbLock u) - h <- liftIO $ H.openDb db + h <- liftIO $ H.openDb db "fscked" return $ FsckHandle h u closeDb :: FsckHandle -> Annex () @@ -90,14 +89,13 @@ closeDb (FsckHandle h u) = do unlockFile =<< fromRepo (gitAnnexFsckDbLock u) addDb :: FsckHandle -> Key -> IO () -addDb (FsckHandle h _) k = H.queueDb h 1000 $ - unlessM (inDb' sk) $ - insert_ $ Fscked sk +addDb (FsckHandle h _) k = H.queueDb h 1000 $ + void $ insertUnique $ Fscked sk where sk = toSKey k inDb :: FsckHandle -> Key -> IO Bool -inDb (FsckHandle h _) = H.queryDb h False . inDb' . toSKey +inDb (FsckHandle h _) = H.queryDb h . inDb' . toSKey inDb' :: SKey -> SqlPersistM Bool inDb' sk = do diff --git a/Database/Handle.hs b/Database/Handle.hs index cb398ddc21..049007bc43 100644 --- a/Database/Handle.hs +++ b/Database/Handle.hs @@ -30,6 +30,9 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Exception (throwIO) import qualified Data.Text as T +import Control.Monad.Trans.Resource (runResourceT) +import Control.Monad.Logger (runNoLoggingT) +import Data.List {- A DbHandle is a reference to a worker thread that communicates with - the database. It has a MVar which Jobs are submitted to. -} @@ -41,16 +44,15 @@ data DbHandle = DbHandle (Async ()) (MVar Job) (MVar DbQueue) - The database is put into WAL mode, to prevent readers from blocking - writers, and prevent a writer from blocking readers. -} -initDb :: FilePath -> SqlPersistM () -> IO DbHandle -initDb db migration = do +initDb :: FilePath -> SqlPersistM () -> IO () +initDb f migration = do + let db = T.pack f enableWAL db - h <- openDb db - either throwIO (const $ return ()) =<< commitDb h migration - return h - -enableWAL :: FilePath -> IO () + runSqlite db migration + +enableWAL :: T.Text -> IO () enableWAL db = do - conn <- Sqlite.open (T.pack db) + conn <- Sqlite.open db stmt <- Sqlite.prepare conn (T.pack "PRAGMA journal_mode=WAL;") void $ Sqlite.step stmt void $ Sqlite.finalize stmt @@ -58,10 +60,10 @@ enableWAL db = do {- Opens the database, but does not perform any migrations. Only use - if the database is known to exist and have the right tables. -} -openDb :: FilePath -> IO DbHandle -openDb db = do +openDb :: FilePath -> TableName -> IO DbHandle +openDb db tablename = do jobs <- newEmptyMVar - worker <- async (workerThread (T.pack db) jobs) + worker <- async (workerThread (T.pack db) tablename jobs) q <- newMVar emptyDbQueue return $ DbHandle worker jobs q @@ -70,12 +72,14 @@ data Job | ChangeJob ((SqlPersistM () -> IO ()) -> IO ()) | CloseJob -workerThread :: T.Text -> MVar Job -> IO () -workerThread db jobs = catchNonAsync loop showerr +type TableName = String + +workerThread :: T.Text -> TableName -> MVar Job -> IO () +workerThread db tablename jobs = catchNonAsync loop showerr where showerr e = liftIO $ warningIO $ "sqlite worker thread crashed: " ++ show e - run = runSqlite db + loop = do r <- run queryloop case r of @@ -85,11 +89,42 @@ workerThread db jobs = catchNonAsync loop showerr -- time, and it may crash the database connection ChangeJob a -> a run >> loop CloseJob -> return () + queryloop = do job <- liftIO $ takeMVar jobs case job of QueryJob a -> a >> queryloop _ -> return job + + -- like runSqlite, but calls settle on the raw sql Connection. + run a = do + conn <- Sqlite.open db + settle conn + runResourceT $ runNoLoggingT $ + withSqlConn (wrapConnection conn) $ + runSqlConn a + + -- Work around a bug in sqlite: New database connections can + -- sometimes take a while to become usable; select statements will + -- fail with ErrorBusy for some time. So, loop until a select + -- succeeds; once one succeeds the connection will stay usable. + -- + settle conn = do + r <- tryNonAsync $ do + stmt <- Sqlite.prepare conn nullselect + void $ Sqlite.step stmt + void $ Sqlite.finalize stmt + case r of + Right _ -> return () + Left e -> do + if "ErrorBusy" `isInfixOf` show e + then do + threadDelay 1000 -- 1/1000th second + settle conn + else throwIO e + + -- This should succeed for any table. + nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1" {- Makes a query using the DbHandle. This should not be used to make - changes to the database! @@ -100,20 +135,13 @@ workerThread db jobs = catchNonAsync loop showerr - Only one action can be run at a time against a given DbHandle. - If called concurrently in the same process, this will block until - it is able to run. - - - - Warning: Under heavy traffic, this can fail with an exception - - that contains "ErrorBusy". WAL mode does not entirely prevent this. - - The fallback value is returned in this case. -} -queryDb :: DbHandle -> a -> SqlPersistM a -> IO a -queryDb (DbHandle _ jobs _) fallback a = - catchNonAsync go (\e -> print e >> return fallback ) - where - go = do - res <- newEmptyMVar - putMVar jobs $ QueryJob $ - liftIO . putMVar res =<< tryNonAsync a - either throwIO return =<< takeMVar res +queryDb :: DbHandle -> SqlPersistM a -> IO a +queryDb (DbHandle _ jobs _) a = do + res <- newEmptyMVar + putMVar jobs $ QueryJob $ + liftIO . putMVar res =<< tryNonAsync a + either throwIO return =<< takeMVar res closeDb :: DbHandle -> IO () closeDb h@(DbHandle worker jobs _) = do @@ -152,7 +180,7 @@ queueDb h@(DbHandle _ _ qvar) maxsz a = do then do r <- commitDb h qa' case r of - Left e -> enqueue 0 + Left _ -> enqueue 0 Right _ -> putMVar qvar emptyDbQueue else enqueue sz' diff --git a/git-annex.cabal b/git-annex.cabal index 676cae4dda..75e483b84b 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -111,7 +111,8 @@ Executable git-annex SafeSemaphore, uuid, random, dlist, unix-compat, async, stm (>= 2.3), data-default, case-insensitive, http-conduit, http-types, cryptohash (>= 0.10.0), - esqueleto, persistent-sqlite (>= 2.1.0.1), persistent, persistent-template + esqueleto, persistent-sqlite, persistent, persistent-template, + monad-logger CC-Options: -Wall GHC-Options: -Wall Extensions: PackageImports