remove uses of warningIO
It's not concurrent-output safe, and doesn't support --json-error-messages. Using Annex.makeRunner is a bit scary, because what if it's run in a different thread from an active annex action? Normally the same Annex state is not used concurrently in several threads, and it's not designed to be fully concurrency safe. (Annex.Concurrent exists to deal with that.) I think it will be ok in these simple cases though. Eg, when buffering a warning message to json, Annex.changeState is used, and it modifies the MVar in a concurrency safe way. The only warningIO remaining is not a problem.
This commit is contained in:
parent
1858b65d88
commit
63839532c9
5 changed files with 28 additions and 22 deletions
|
@ -252,7 +252,8 @@ probeLockSupport = return True
|
||||||
probeLockSupport = withEventuallyCleanedOtherTmp $ \tmp -> do
|
probeLockSupport = withEventuallyCleanedOtherTmp $ \tmp -> do
|
||||||
let f = tmp P.</> "lockprobe"
|
let f = tmp P.</> "lockprobe"
|
||||||
mode <- annexFileMode
|
mode <- annexFileMode
|
||||||
liftIO $ withAsync warnstall (const (go f mode))
|
annexrunner <- Annex.makeRunner
|
||||||
|
liftIO $ withAsync (warnstall annexrunner) (const (go f mode))
|
||||||
where
|
where
|
||||||
go f mode = do
|
go f mode = do
|
||||||
removeWhenExistsWith R.removeLink f
|
removeWhenExistsWith R.removeLink f
|
||||||
|
@ -264,10 +265,11 @@ probeLockSupport = withEventuallyCleanedOtherTmp $ \tmp -> do
|
||||||
removeWhenExistsWith R.removeLink f
|
removeWhenExistsWith R.removeLink f
|
||||||
return ok
|
return ok
|
||||||
|
|
||||||
warnstall = do
|
warnstall annexrunner = do
|
||||||
threadDelaySeconds (Seconds 10)
|
threadDelaySeconds (Seconds 10)
|
||||||
warningIO "Probing the filesystem for POSIX fcntl lock support is taking a long time."
|
annexrunner $ do
|
||||||
warningIO "(Setting annex.pidlock will avoid this probe.)"
|
warning "Probing the filesystem for POSIX fcntl lock support is taking a long time."
|
||||||
|
warning "(Setting annex.pidlock will avoid this probe.)"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
probeFifoSupport :: Annex Bool
|
probeFifoSupport :: Annex Bool
|
||||||
|
|
|
@ -619,7 +619,8 @@ startExternal external =
|
||||||
`onException` store UncheckedExternalAsync
|
`onException` store UncheckedExternalAsync
|
||||||
if asyncExtensionEnabled extensions
|
if asyncExtensionEnabled extensions
|
||||||
then do
|
then do
|
||||||
relay <- liftIO $ runRelayToExternalAsync external st
|
annexrunner <- Annex.makeRunner
|
||||||
|
relay <- liftIO $ runRelayToExternalAsync external st annexrunner
|
||||||
st' <- liftIO $ asyncRelayExternalState relay
|
st' <- liftIO $ asyncRelayExternalState relay
|
||||||
store (ExternalAsync relay)
|
store (ExternalAsync relay)
|
||||||
return st'
|
return st'
|
||||||
|
|
19
Remote/External/AsyncExtension.hs
vendored
19
Remote/External/AsyncExtension.hs
vendored
|
@ -11,9 +11,10 @@
|
||||||
module Remote.External.AsyncExtension (runRelayToExternalAsync) where
|
module Remote.External.AsyncExtension (runRelayToExternalAsync) where
|
||||||
|
|
||||||
import Common
|
import Common
|
||||||
|
import Annex
|
||||||
import Messages
|
import Messages
|
||||||
import Remote.External.Types
|
import Remote.External.Types
|
||||||
import Utility.SimpleProtocol as Proto
|
import qualified Utility.SimpleProtocol as Proto
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
|
@ -23,13 +24,13 @@ import qualified Data.Map.Strict as M
|
||||||
-- | Starts a thread that will handle all communication with the external
|
-- | Starts a thread that will handle all communication with the external
|
||||||
-- process. The input ExternalState communicates directly with the external
|
-- process. The input ExternalState communicates directly with the external
|
||||||
-- process.
|
-- process.
|
||||||
runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay
|
runRelayToExternalAsync :: External -> ExternalState -> (Annex () -> IO ()) -> IO ExternalAsyncRelay
|
||||||
runRelayToExternalAsync external st = do
|
runRelayToExternalAsync external st annexrunner = do
|
||||||
jidmap <- newTVarIO M.empty
|
jidmap <- newTVarIO M.empty
|
||||||
sendq <- newSendQueue
|
sendq <- newSendQueue
|
||||||
nextjid <- newTVarIO (JobId 1)
|
nextjid <- newTVarIO (JobId 1)
|
||||||
sender <- async $ sendloop st sendq
|
sender <- async $ sendloop st sendq
|
||||||
receiver <- async $ receiveloop external st jidmap sendq sender
|
receiver <- async $ receiveloop external st jidmap sendq sender annexrunner
|
||||||
return $ ExternalAsyncRelay $ do
|
return $ ExternalAsyncRelay $ do
|
||||||
receiveq <- newReceiveQueue
|
receiveq <- newReceiveQueue
|
||||||
jid <- atomically $ do
|
jid <- atomically $ do
|
||||||
|
@ -65,14 +66,14 @@ newReceiveQueue = newTBMChanIO 10
|
||||||
newSendQueue :: IO SendQueue
|
newSendQueue :: IO SendQueue
|
||||||
newSendQueue = newTBMChanIO 10
|
newSendQueue = newTBMChanIO 10
|
||||||
|
|
||||||
receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> IO ()
|
receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> Async () -> (Annex () -> IO ()) -> IO ()
|
||||||
receiveloop external st jidmap sendq sendthread = externalReceive st >>= \case
|
receiveloop external st jidmap sendq sendthread annexrunner = externalReceive st >>= \case
|
||||||
Just l -> case parseMessage l :: Maybe AsyncMessage of
|
Just l -> case parseMessage l :: Maybe AsyncMessage of
|
||||||
Just (AsyncMessage jid msg) ->
|
Just (AsyncMessage jid msg) ->
|
||||||
M.lookup jid <$> readTVarIO jidmap >>= \case
|
M.lookup jid <$> readTVarIO jidmap >>= \case
|
||||||
Just c -> do
|
Just c -> do
|
||||||
atomically $ writeTBMChan c msg
|
atomically $ writeTBMChan c msg
|
||||||
receiveloop external st jidmap sendq sendthread
|
receiveloop external st jidmap sendq sendthread annexrunner
|
||||||
Nothing -> protoerr "unknown job number"
|
Nothing -> protoerr "unknown job number"
|
||||||
Nothing -> case parseMessage l :: Maybe ExceptionalMessage of
|
Nothing -> case parseMessage l :: Maybe ExceptionalMessage of
|
||||||
Just _ -> do
|
Just _ -> do
|
||||||
|
@ -80,12 +81,12 @@ receiveloop external st jidmap sendq sendthread = externalReceive st >>= \case
|
||||||
m <- readTVarIO jidmap
|
m <- readTVarIO jidmap
|
||||||
forM_ (M.elems m) $ \c ->
|
forM_ (M.elems m) $ \c ->
|
||||||
atomically $ writeTBMChan c l
|
atomically $ writeTBMChan c l
|
||||||
receiveloop external st jidmap sendq sendthread
|
receiveloop external st jidmap sendq sendthread annexrunner
|
||||||
Nothing -> protoerr "unexpected non-async message"
|
Nothing -> protoerr "unexpected non-async message"
|
||||||
Nothing -> closeandshutdown
|
Nothing -> closeandshutdown
|
||||||
where
|
where
|
||||||
protoerr s = do
|
protoerr s = do
|
||||||
warningIO $ "async external special remote protocol error: " ++ s
|
annexrunner $ warning $ "async external special remote protocol error: " ++ s
|
||||||
closeandshutdown
|
closeandshutdown
|
||||||
|
|
||||||
closeandshutdown = do
|
closeandshutdown = do
|
||||||
|
|
|
@ -85,8 +85,8 @@ storeChunks key tmp dest storer recorder finalizer = do
|
||||||
- But this is the best that can be done with the storer interface that
|
- But this is the best that can be done with the storer interface that
|
||||||
- writes a whole L.ByteString at a time.
|
- writes a whole L.ByteString at a time.
|
||||||
-}
|
-}
|
||||||
storeChunked :: ChunkSize -> [FilePath] -> (FilePath -> L.ByteString -> IO ()) -> L.ByteString -> IO [FilePath]
|
storeChunked :: (Annex () -> IO ()) -> ChunkSize -> [FilePath] -> (FilePath -> L.ByteString -> IO ()) -> L.ByteString -> IO [FilePath]
|
||||||
storeChunked chunksize dests storer content =
|
storeChunked annexrunner chunksize dests storer content =
|
||||||
either onerr return =<< tryNonAsync (go (Just chunksize) dests)
|
either onerr return =<< tryNonAsync (go (Just chunksize) dests)
|
||||||
where
|
where
|
||||||
go _ [] = return [] -- no dests!?
|
go _ [] = return [] -- no dests!?
|
||||||
|
@ -99,7 +99,7 @@ storeChunked chunksize dests storer content =
|
||||||
| otherwise = storechunks sz [] dests content
|
| otherwise = storechunks sz [] dests content
|
||||||
|
|
||||||
onerr e = do
|
onerr e = do
|
||||||
warningIO (show e)
|
annexrunner $ warning (show e)
|
||||||
return []
|
return []
|
||||||
|
|
||||||
storechunks _ _ [] _ = return [] -- ran out of dests
|
storechunks _ _ [] _ = return [] -- ran out of dests
|
||||||
|
|
|
@ -28,6 +28,7 @@ import Annex.Common
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Types.Export
|
import Types.Export
|
||||||
import qualified Git
|
import qualified Git
|
||||||
|
import qualified Annex
|
||||||
import Config
|
import Config
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
import Annex.SpecialRemote.Config
|
import Annex.SpecialRemote.Config
|
||||||
|
@ -139,8 +140,9 @@ webdavSetup _ mu mcreds c gc = do
|
||||||
|
|
||||||
store :: DavHandleVar -> ChunkConfig -> Storer
|
store :: DavHandleVar -> ChunkConfig -> Storer
|
||||||
store hv (LegacyChunks chunksize) = fileStorer $ \k f p ->
|
store hv (LegacyChunks chunksize) = fileStorer $ \k f p ->
|
||||||
withDavHandle hv $ \dav -> liftIO $
|
withDavHandle hv $ \dav -> do
|
||||||
withMeteredFile f p $ storeLegacyChunked chunksize k dav
|
annexrunner <- Annex.makeRunner
|
||||||
|
liftIO $ withMeteredFile f p $ storeLegacyChunked annexrunner chunksize k dav
|
||||||
store hv _ = httpStorer $ \k reqbody ->
|
store hv _ = httpStorer $ \k reqbody ->
|
||||||
withDavHandle hv $ \dav -> liftIO $ goDAV dav $ do
|
withDavHandle hv $ \dav -> liftIO $ goDAV dav $ do
|
||||||
let tmp = keyTmpLocation k
|
let tmp = keyTmpLocation k
|
||||||
|
@ -448,15 +450,15 @@ prepDAV user pass = do
|
||||||
-- Legacy chunking code, to be removed eventually.
|
-- Legacy chunking code, to be removed eventually.
|
||||||
--
|
--
|
||||||
|
|
||||||
storeLegacyChunked :: ChunkSize -> Key -> DavHandle -> L.ByteString -> IO ()
|
storeLegacyChunked :: (Annex () -> IO ()) -> ChunkSize -> Key -> DavHandle -> L.ByteString -> IO ()
|
||||||
storeLegacyChunked chunksize k dav b =
|
storeLegacyChunked annexrunner chunksize k dav b =
|
||||||
Legacy.storeChunks k tmp dest storer recorder finalizer
|
Legacy.storeChunks k tmp dest storer recorder finalizer
|
||||||
where
|
where
|
||||||
storehttp l b' = void $ goDAV dav $ do
|
storehttp l b' = void $ goDAV dav $ do
|
||||||
maybe noop (void . mkColRecursive) (locationParent l)
|
maybe noop (void . mkColRecursive) (locationParent l)
|
||||||
debugDav $ "putContent " ++ l
|
debugDav $ "putContent " ++ l
|
||||||
inLocation l $ putContentM (contentType, b')
|
inLocation l $ putContentM (contentType, b')
|
||||||
storer locs = Legacy.storeChunked chunksize locs storehttp b
|
storer locs = Legacy.storeChunked annexrunner chunksize locs storehttp b
|
||||||
recorder l s = storehttp l (L8.fromString s)
|
recorder l s = storehttp l (L8.fromString s)
|
||||||
finalizer tmp' dest' = goDAV dav $
|
finalizer tmp' dest' = goDAV dav $
|
||||||
finalizeStore dav tmp' (fromJust $ locationParent dest')
|
finalizeStore dav tmp' (fromJust $ locationParent dest')
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue