diff --git a/Annex/Init.hs b/Annex/Init.hs index fc6d2a7f04..614c307a0b 100644 --- a/Annex/Init.hs +++ b/Annex/Init.hs @@ -252,7 +252,8 @@ probeLockSupport = return True probeLockSupport = withEventuallyCleanedOtherTmp $ \tmp -> do let f = tmp P. "lockprobe" mode <- annexFileMode - liftIO $ withAsync warnstall (const (go f mode)) + annexrunner <- Annex.makeRunner + liftIO $ withAsync (warnstall annexrunner) (const (go f mode)) where go f mode = do removeWhenExistsWith R.removeLink f @@ -264,10 +265,11 @@ probeLockSupport = withEventuallyCleanedOtherTmp $ \tmp -> do removeWhenExistsWith R.removeLink f return ok - warnstall = do + warnstall annexrunner = do threadDelaySeconds (Seconds 10) - warningIO "Probing the filesystem for POSIX fcntl lock support is taking a long time." - warningIO "(Setting annex.pidlock will avoid this probe.)" + annexrunner $ do + warning "Probing the filesystem for POSIX fcntl lock support is taking a long time." + warning "(Setting annex.pidlock will avoid this probe.)" #endif probeFifoSupport :: Annex Bool diff --git a/Remote/External.hs b/Remote/External.hs index 729e166893..2ece936b8f 100644 --- a/Remote/External.hs +++ b/Remote/External.hs @@ -619,7 +619,8 @@ startExternal external = `onException` store UncheckedExternalAsync if asyncExtensionEnabled extensions then do - relay <- liftIO $ runRelayToExternalAsync external st + annexrunner <- Annex.makeRunner + relay <- liftIO $ runRelayToExternalAsync external st annexrunner st' <- liftIO $ asyncRelayExternalState relay store (ExternalAsync relay) return st' diff --git a/Remote/External/AsyncExtension.hs b/Remote/External/AsyncExtension.hs index ee60bc8e2f..fd88349a45 100644 --- a/Remote/External/AsyncExtension.hs +++ b/Remote/External/AsyncExtension.hs @@ -11,9 +11,10 @@ module Remote.External.AsyncExtension (runRelayToExternalAsync) where import Common +import Annex import Messages import Remote.External.Types -import Utility.SimpleProtocol as Proto +import qualified Utility.SimpleProtocol as Proto import Control.Concurrent.Async import Control.Concurrent.STM @@ -23,13 +24,13 @@ import qualified Data.Map.Strict as M -- | Starts a thread that will handle all communication with the external -- process. The input ExternalState communicates directly with the external -- process. -runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay -runRelayToExternalAsync external st = do +runRelayToExternalAsync :: External -> ExternalState -> (Annex () -> IO ()) -> IO ExternalAsyncRelay +runRelayToExternalAsync external st annexrunner = do jidmap <- newTVarIO M.empty sendq <- newSendQueue nextjid <- newTVarIO (JobId 1) sender <- async $ sendloop st sendq - receiver <- async $ receiveloop external st jidmap sendq sender + receiver <- async $ receiveloop external st jidmap sendq sender annexrunner return $ ExternalAsyncRelay $ do receiveq <- newReceiveQueue jid <- atomically $ do @@ -65,14 +66,14 @@ newReceiveQueue = newTBMChanIO 10 newSendQueue :: IO SendQueue newSendQueue = newTBMChanIO 10 -receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> IO () -receiveloop external st jidmap sendq sendthread = externalReceive st >>= \case +receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> (Annex () -> IO ()) -> IO () +receiveloop external st jidmap sendq sendthread annexrunner = externalReceive st >>= \case Just l -> case parseMessage l :: Maybe AsyncMessage of Just (AsyncMessage jid msg) -> M.lookup jid <$> readTVarIO jidmap >>= \case Just c -> do atomically $ writeTBMChan c msg - receiveloop external st jidmap sendq sendthread + receiveloop external st jidmap sendq sendthread annexrunner Nothing -> protoerr "unknown job number" Nothing -> case parseMessage l :: Maybe ExceptionalMessage of Just _ -> do @@ -80,12 +81,12 @@ receiveloop external st jidmap sendq sendthread = externalReceive st >>= \case m <- readTVarIO jidmap forM_ (M.elems m) $ \c -> atomically $ writeTBMChan c l - receiveloop external st jidmap sendq sendthread + receiveloop external st jidmap sendq sendthread annexrunner Nothing -> protoerr "unexpected non-async message" Nothing -> closeandshutdown where protoerr s = do - warningIO $ "async external special remote protocol error: " ++ s + annexrunner $ warning $ "async external special remote protocol error: " ++ s closeandshutdown closeandshutdown = do diff --git a/Remote/Helper/Chunked/Legacy.hs b/Remote/Helper/Chunked/Legacy.hs index 9f0678e7bd..b236b8cb18 100644 --- a/Remote/Helper/Chunked/Legacy.hs +++ b/Remote/Helper/Chunked/Legacy.hs @@ -85,8 +85,8 @@ storeChunks key tmp dest storer recorder finalizer = do - But this is the best that can be done with the storer interface that - writes a whole L.ByteString at a time. -} -storeChunked :: ChunkSize -> [FilePath] -> (FilePath -> L.ByteString -> IO ()) -> L.ByteString -> IO [FilePath] -storeChunked chunksize dests storer content = +storeChunked :: (Annex () -> IO ()) -> ChunkSize -> [FilePath] -> (FilePath -> L.ByteString -> IO ()) -> L.ByteString -> IO [FilePath] +storeChunked annexrunner chunksize dests storer content = either onerr return =<< tryNonAsync (go (Just chunksize) dests) where go _ [] = return [] -- no dests!? @@ -99,7 +99,7 @@ storeChunked chunksize dests storer content = | otherwise = storechunks sz [] dests content onerr e = do - warningIO (show e) + annexrunner $ warning (show e) return [] storechunks _ _ [] _ = return [] -- ran out of dests diff --git a/Remote/WebDAV.hs b/Remote/WebDAV.hs index 018987d401..b649ae0444 100644 --- a/Remote/WebDAV.hs +++ b/Remote/WebDAV.hs @@ -28,6 +28,7 @@ import Annex.Common import Types.Remote import Types.Export import qualified Git +import qualified Annex import Config import Config.Cost import Annex.SpecialRemote.Config @@ -139,8 +140,9 @@ webdavSetup _ mu mcreds c gc = do store :: DavHandleVar -> ChunkConfig -> Storer store hv (LegacyChunks chunksize) = fileStorer $ \k f p -> - withDavHandle hv $ \dav -> liftIO $ - withMeteredFile f p $ storeLegacyChunked chunksize k dav + withDavHandle hv $ \dav -> do + annexrunner <- Annex.makeRunner + liftIO $ withMeteredFile f p $ storeLegacyChunked annexrunner chunksize k dav store hv _ = httpStorer $ \k reqbody -> withDavHandle hv $ \dav -> liftIO $ goDAV dav $ do let tmp = keyTmpLocation k @@ -448,15 +450,15 @@ prepDAV user pass = do -- Legacy chunking code, to be removed eventually. -- -storeLegacyChunked :: ChunkSize -> Key -> DavHandle -> L.ByteString -> IO () -storeLegacyChunked chunksize k dav b = +storeLegacyChunked :: (Annex () -> IO ()) -> ChunkSize -> Key -> DavHandle -> L.ByteString -> IO () +storeLegacyChunked annexrunner chunksize k dav b = Legacy.storeChunks k tmp dest storer recorder finalizer where storehttp l b' = void $ goDAV dav $ do maybe noop (void . mkColRecursive) (locationParent l) debugDav $ "putContent " ++ l inLocation l $ putContentM (contentType, b') - storer locs = Legacy.storeChunked chunksize locs storehttp b + storer locs = Legacy.storeChunked annexrunner chunksize locs storehttp b recorder l s = storehttp l (L8.fromString s) finalizer tmp' dest' = goDAV dav $ finalizeStore dav tmp' (fromJust $ locationParent dest')