From 63839532c970b70c68d674f864249b58d129329d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 2 Dec 2020 14:57:43 -0400 Subject: [PATCH] remove uses of warningIO It's not concurrent-output safe, and doesn't support --json-error-messages. Using Annex.makeRunner is a bit scary, because what if it's run in a different thread from an active annex action? Normally the same Annex state is not used concurrently in several threads, and it's not designed to be fully concurrency safe. (Annex.Concurrent exists to deal with that.) I think it will be ok in these simple cases though. Eg, when buffering a warning message to json, Annex.changeState is used, and it modifies the MVar in a concurrency safe way. The only warningIO remaining is not a problem. --- Annex/Init.hs | 10 ++++++---- Remote/External.hs | 3 ++- Remote/External/AsyncExtension.hs | 19 ++++++++++--------- Remote/Helper/Chunked/Legacy.hs | 6 +++--- Remote/WebDAV.hs | 12 +++++++----- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/Annex/Init.hs b/Annex/Init.hs index fc6d2a7f04..614c307a0b 100644 --- a/Annex/Init.hs +++ b/Annex/Init.hs @@ -252,7 +252,8 @@ probeLockSupport = return True probeLockSupport = withEventuallyCleanedOtherTmp $ \tmp -> do let f = tmp P. "lockprobe" mode <- annexFileMode - liftIO $ withAsync warnstall (const (go f mode)) + annexrunner <- Annex.makeRunner + liftIO $ withAsync (warnstall annexrunner) (const (go f mode)) where go f mode = do removeWhenExistsWith R.removeLink f @@ -264,10 +265,11 @@ probeLockSupport = withEventuallyCleanedOtherTmp $ \tmp -> do removeWhenExistsWith R.removeLink f return ok - warnstall = do + warnstall annexrunner = do threadDelaySeconds (Seconds 10) - warningIO "Probing the filesystem for POSIX fcntl lock support is taking a long time." - warningIO "(Setting annex.pidlock will avoid this probe.)" + annexrunner $ do + warning "Probing the filesystem for POSIX fcntl lock support is taking a long time." + warning "(Setting annex.pidlock will avoid this probe.)" #endif probeFifoSupport :: Annex Bool diff --git a/Remote/External.hs b/Remote/External.hs index 729e166893..2ece936b8f 100644 --- a/Remote/External.hs +++ b/Remote/External.hs @@ -619,7 +619,8 @@ startExternal external = `onException` store UncheckedExternalAsync if asyncExtensionEnabled extensions then do - relay <- liftIO $ runRelayToExternalAsync external st + annexrunner <- Annex.makeRunner + relay <- liftIO $ runRelayToExternalAsync external st annexrunner st' <- liftIO $ asyncRelayExternalState relay store (ExternalAsync relay) return st' diff --git a/Remote/External/AsyncExtension.hs b/Remote/External/AsyncExtension.hs index ee60bc8e2f..fd88349a45 100644 --- a/Remote/External/AsyncExtension.hs +++ b/Remote/External/AsyncExtension.hs @@ -11,9 +11,10 @@ module Remote.External.AsyncExtension (runRelayToExternalAsync) where import Common +import Annex import Messages import Remote.External.Types -import Utility.SimpleProtocol as Proto +import qualified Utility.SimpleProtocol as Proto import Control.Concurrent.Async import Control.Concurrent.STM @@ -23,13 +24,13 @@ import qualified Data.Map.Strict as M -- | Starts a thread that will handle all communication with the external -- process. The input ExternalState communicates directly with the external -- process. -runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay -runRelayToExternalAsync external st = do +runRelayToExternalAsync :: External -> ExternalState -> (Annex () -> IO ()) -> IO ExternalAsyncRelay +runRelayToExternalAsync external st annexrunner = do jidmap <- newTVarIO M.empty sendq <- newSendQueue nextjid <- newTVarIO (JobId 1) sender <- async $ sendloop st sendq - receiver <- async $ receiveloop external st jidmap sendq sender + receiver <- async $ receiveloop external st jidmap sendq sender annexrunner return $ ExternalAsyncRelay $ do receiveq <- newReceiveQueue jid <- atomically $ do @@ -65,14 +66,14 @@ newReceiveQueue = newTBMChanIO 10 newSendQueue :: IO SendQueue newSendQueue = newTBMChanIO 10 -receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> IO () -receiveloop external st jidmap sendq sendthread = externalReceive st >>= \case +receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> (Annex () -> IO ()) -> IO () +receiveloop external st jidmap sendq sendthread annexrunner = externalReceive st >>= \case Just l -> case parseMessage l :: Maybe AsyncMessage of Just (AsyncMessage jid msg) -> M.lookup jid <$> readTVarIO jidmap >>= \case Just c -> do atomically $ writeTBMChan c msg - receiveloop external st jidmap sendq sendthread + receiveloop external st jidmap sendq sendthread annexrunner Nothing -> protoerr "unknown job number" Nothing -> case parseMessage l :: Maybe ExceptionalMessage of Just _ -> do @@ -80,12 +81,12 @@ receiveloop external st jidmap sendq sendthread = externalReceive st >>= \case m <- readTVarIO jidmap forM_ (M.elems m) $ \c -> atomically $ writeTBMChan c l - receiveloop external st jidmap sendq sendthread + receiveloop external st jidmap sendq sendthread annexrunner Nothing -> protoerr "unexpected non-async message" Nothing -> closeandshutdown where protoerr s = do - warningIO $ "async external special remote protocol error: " ++ s + annexrunner $ warning $ "async external special remote protocol error: " ++ s closeandshutdown closeandshutdown = do diff --git a/Remote/Helper/Chunked/Legacy.hs b/Remote/Helper/Chunked/Legacy.hs index 9f0678e7bd..b236b8cb18 100644 --- a/Remote/Helper/Chunked/Legacy.hs +++ b/Remote/Helper/Chunked/Legacy.hs @@ -85,8 +85,8 @@ storeChunks key tmp dest storer recorder finalizer = do - But this is the best that can be done with the storer interface that - writes a whole L.ByteString at a time. -} -storeChunked :: ChunkSize -> [FilePath] -> (FilePath -> L.ByteString -> IO ()) -> L.ByteString -> IO [FilePath] -storeChunked chunksize dests storer content = +storeChunked :: (Annex () -> IO ()) -> ChunkSize -> [FilePath] -> (FilePath -> L.ByteString -> IO ()) -> L.ByteString -> IO [FilePath] +storeChunked annexrunner chunksize dests storer content = either onerr return =<< tryNonAsync (go (Just chunksize) dests) where go _ [] = return [] -- no dests!? @@ -99,7 +99,7 @@ storeChunked chunksize dests storer content = | otherwise = storechunks sz [] dests content onerr e = do - warningIO (show e) + annexrunner $ warning (show e) return [] storechunks _ _ [] _ = return [] -- ran out of dests diff --git a/Remote/WebDAV.hs b/Remote/WebDAV.hs index 018987d401..b649ae0444 100644 --- a/Remote/WebDAV.hs +++ b/Remote/WebDAV.hs @@ -28,6 +28,7 @@ import Annex.Common import Types.Remote import Types.Export import qualified Git +import qualified Annex import Config import Config.Cost import Annex.SpecialRemote.Config @@ -139,8 +140,9 @@ webdavSetup _ mu mcreds c gc = do store :: DavHandleVar -> ChunkConfig -> Storer store hv (LegacyChunks chunksize) = fileStorer $ \k f p -> - withDavHandle hv $ \dav -> liftIO $ - withMeteredFile f p $ storeLegacyChunked chunksize k dav + withDavHandle hv $ \dav -> do + annexrunner <- Annex.makeRunner + liftIO $ withMeteredFile f p $ storeLegacyChunked annexrunner chunksize k dav store hv _ = httpStorer $ \k reqbody -> withDavHandle hv $ \dav -> liftIO $ goDAV dav $ do let tmp = keyTmpLocation k @@ -448,15 +450,15 @@ prepDAV user pass = do -- Legacy chunking code, to be removed eventually. -- -storeLegacyChunked :: ChunkSize -> Key -> DavHandle -> L.ByteString -> IO () -storeLegacyChunked chunksize k dav b = +storeLegacyChunked :: (Annex () -> IO ()) -> ChunkSize -> Key -> DavHandle -> L.ByteString -> IO () +storeLegacyChunked annexrunner chunksize k dav b = Legacy.storeChunks k tmp dest storer recorder finalizer where storehttp l b' = void $ goDAV dav $ do maybe noop (void . mkColRecursive) (locationParent l) debugDav $ "putContent " ++ l inLocation l $ putContentM (contentType, b') - storer locs = Legacy.storeChunked chunksize locs storehttp b + storer locs = Legacy.storeChunked annexrunner chunksize locs storehttp b recorder l s = storehttp l (L8.fromString s) finalizer tmp' dest' = goDAV dav $ finalizeStore dav tmp' (fromJust $ locationParent dest')