better locking for json with -J

Avoid threads emitting json at the same time and scrambling, which was
still possible even with the buffering, just less likely.

Converted json IO actions to JSONChunk data too.
This commit is contained in:
Joey Hess 2016-09-09 15:49:44 -04:00
parent 05d4438383
commit a108235565
No known key found for this signature in database
GPG key ID: C910D9222512E3C7
6 changed files with 63 additions and 42 deletions

View file

@ -105,7 +105,6 @@ showRemoteUrls :: M.Map UUID Remote -> (UUID, [URLString]) -> Annex ()
showRemoteUrls remotemap (uu, us) showRemoteUrls remotemap (uu, us)
| null us = noop | null us = noop
| otherwise = case M.lookup uu remotemap of | otherwise = case M.lookup uu remotemap of
Just r -> do Just r -> showLongNote $
let ls = unlines $ map (\u -> name r ++ ": " ++ u) us unlines $ map (\u -> name r ++ ": " ++ u) us
outputMessage noop ('\n' : indent ls ++ "\n")
Nothing -> noop Nothing -> noop

View file

@ -85,7 +85,7 @@ showSideAction m = Annex.getState Annex.output >>= go
Annex.changeState $ \s -> s { Annex.output = st' } Annex.changeState $ \s -> s { Annex.output = st' }
| sideActionBlock st == InBlock = return () | sideActionBlock st == InBlock = return ()
| otherwise = p | otherwise = p
p = outputMessage q $ "(" ++ m ++ "...)\n" p = outputMessage JSON.none $ "(" ++ m ++ "...)\n"
showStoringStateAction :: Annex () showStoringStateAction :: Annex ()
showStoringStateAction = showSideAction "recording state in git" showStoringStateAction = showSideAction "recording state in git"
@ -110,7 +110,7 @@ doSideAction' b a = do
{- Make way for subsequent output of a command. -} {- Make way for subsequent output of a command. -}
showOutput :: Annex () showOutput :: Annex ()
showOutput = unlessM commandProgressDisabled $ showOutput = unlessM commandProgressDisabled $
outputMessage q "\n" outputMessage JSON.none "\n"
showLongNote :: String -> Annex () showLongNote :: String -> Annex ()
showLongNote s = outputMessage (JSON.note s) ('\n' : indent s ++ "\n") showLongNote s = outputMessage (JSON.note s) ('\n' : indent s ++ "\n")
@ -140,7 +140,7 @@ earlyWarning = warning' False
warning' :: Bool -> String -> Annex () warning' :: Bool -> String -> Annex ()
warning' makeway w = do warning' makeway w = do
when makeway $ when makeway $
outputMessage q "\n" outputMessage JSON.none "\n"
outputError (w ++ "\n") outputError (w ++ "\n")
{- Not concurrent output safe. -} {- Not concurrent output safe. -}
@ -173,10 +173,10 @@ showCustom command a = do
outputMessage (JSON.end r) "" outputMessage (JSON.end r) ""
showHeader :: String -> Annex () showHeader :: String -> Annex ()
showHeader h = outputMessage q $ (h ++ ": ") showHeader h = outputMessage JSON.none $ (h ++ ": ")
showRaw :: String -> Annex () showRaw :: String -> Annex ()
showRaw s = outputMessage q (s ++ "\n") showRaw s = outputMessage JSON.none (s ++ "\n")
setupConsole :: IO () setupConsole :: IO ()
setupConsole = do setupConsole = do

View file

@ -11,17 +11,20 @@ import Common
import Annex import Annex
import Types.Messages import Types.Messages
import Messages.Concurrent import Messages.Concurrent
import Messages.JSON
import qualified Data.ByteString.Lazy as B
withMessageState :: (MessageState -> Annex a) -> Annex a withMessageState :: (MessageState -> Annex a) -> Annex a
withMessageState a = Annex.getState Annex.output >>= a withMessageState a = Annex.getState Annex.output >>= a
outputMessage :: IO () -> String -> Annex () outputMessage :: JSONChunk -> String -> Annex ()
outputMessage = outputMessage' False outputMessage = outputMessage' False
outputMessageFinal :: IO () -> String -> Annex () outputMessageFinal :: JSONChunk -> String -> Annex ()
outputMessageFinal = outputMessage' True outputMessageFinal = outputMessage' True
outputMessage' :: Bool -> IO () -> String -> Annex () outputMessage' :: Bool -> JSONChunk -> String -> Annex ()
outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s of outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s of
NormalOutput NormalOutput
| concurrentOutputEnabled s -> concurrentMessage s False msg q | concurrentOutputEnabled s -> concurrentMessage s False msg q
@ -29,7 +32,7 @@ outputMessage' endmessage json msg = withMessageState $ \s -> case outputType s
JSONOutput _ -> void $ outputJSON json endmessage s JSONOutput _ -> void $ outputJSON json endmessage s
QuietOutput -> q QuietOutput -> q
outputJSON :: IO () -> Bool -> MessageState -> Annex Bool outputJSON :: JSONChunk -> Bool -> MessageState -> Annex Bool
outputJSON json endmessage s = case outputType s of outputJSON json endmessage s = case outputType s of
JSONOutput withprogress JSONOutput withprogress
| withprogress || concurrentOutputEnabled s -> do | withprogress || concurrentOutputEnabled s -> do
@ -37,20 +40,17 @@ outputJSON json endmessage s = case outputType s of
if endmessage if endmessage
then do then do
Annex.changeState $ \st -> Annex.changeState $ \st ->
st { Annex.output = s { jsonBuffer = [] } } st { Annex.output = s { jsonBuffer = none } }
liftIO $ flushed $ do liftIO $ flushed $ emit b
showJSONBuffer s
json
else Annex.changeState $ \st -> else Annex.changeState $ \st ->
st { Annex.output = s { jsonBuffer = json : jsonBuffer s } } st { Annex.output = s { jsonBuffer = b } }
return True return True
| otherwise -> do | otherwise -> do
liftIO $ flushed json liftIO $ flushed $ emit json
return True return True
_ -> return False _ -> return False
where
showJSONBuffer :: MessageState -> IO () b = jsonBuffer s `B.append` json
showJSONBuffer s = sequence_ $ reverse $ jsonBuffer s
outputError :: String -> Annex () outputError :: String -> Annex ()
outputError msg = withMessageState $ \s -> outputError msg = withMessageState $ \s ->

View file

@ -8,6 +8,9 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
module Messages.JSON ( module Messages.JSON (
JSONChunk,
emit,
none,
start, start,
end, end,
note, note,
@ -25,6 +28,8 @@ import qualified Data.Map as M
import qualified Data.Text as T import qualified Data.Text as T
import qualified Data.ByteString.Lazy as B import qualified Data.ByteString.Lazy as B
import System.IO import System.IO
import System.IO.Unsafe (unsafePerformIO)
import Control.Concurrent
import Data.Maybe import Data.Maybe
import Data.Monoid import Data.Monoid
import Prelude import Prelude
@ -34,8 +39,24 @@ import Types.Key
import Utility.Metered import Utility.Metered
import Utility.Percentage import Utility.Percentage
start :: String -> Maybe FilePath -> Maybe Key -> IO () type JSONChunk = B.ByteString
start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o
-- A global lock to avoid concurrent threads emitting json at the same time.
{-# NOINLINE emitLock #-}
emitLock :: MVar ()
emitLock = unsafePerformIO $ newMVar ()
emit :: JSONChunk -> IO ()
emit v = do
takeMVar emitLock
B.hPut stdout v
putMVar emitLock ()
none :: JSONChunk
none = B.empty
start :: String -> Maybe FilePath -> Maybe Key -> JSONChunk
start command file key = Stream.start $ Stream.AesonObject o
where where
Object o = toJSON $ JSONActionItem Object o = toJSON $ JSONActionItem
{ itemCommand = Just command { itemCommand = Just command
@ -44,25 +65,26 @@ start command file key = B.hPut stdout $ Stream.start $ Stream.AesonObject o
, itemAdded = Nothing , itemAdded = Nothing
} }
end :: Bool -> IO () end :: Bool -> JSONChunk
end b = B.hPut stdout $ Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end end b =Stream.add (Stream.JSONChunk [("success", b)]) `B.append` Stream.end
note :: String -> IO () note :: String -> JSONChunk
note s = add (Stream.JSONChunk [("note", s)]) note s = add (Stream.JSONChunk [("note", s)])
add :: Stream.JSONChunk a -> IO () add :: Stream.JSONChunk a -> JSONChunk
add = B.hPut stdout . Stream.add add = Stream.add
complete :: Stream.JSONChunk a -> IO () complete :: Stream.JSONChunk a -> JSONChunk
complete v = B.hPut stdout $ Stream.start v `B.append` Stream.end complete v = Stream.start v `B.append` Stream.end
progress :: IO () -> Integer -> BytesProcessed -> IO () progress :: B.ByteString -> Integer -> BytesProcessed -> IO ()
progress jsonbuffer size bytesprocessed = do progress jsonbuffer size bytesprocessed = emit $ B.concat
B.hPut stdout $ Stream.start $ Stream.AesonObject o [ Stream.start $ Stream.AesonObject o
putStr ",\"action\":" , ",\"action\":"
jsonbuffer , jsonbuffer
B.hPut stdout $ Stream.end , "}"
B.hPut stdout $ Stream.end , Stream.end
]
where where
n = fromBytesProcessed bytesprocessed :: Integer n = fromBytesProcessed bytesprocessed :: Integer
Object o = object Object o = object

View file

@ -11,7 +11,6 @@ module Messages.Progress where
import Common import Common
import Messages import Messages
import Messages.Internal
import Utility.Metered import Utility.Metered
import Types import Types
import Types.Messages import Types.Messages
@ -59,7 +58,7 @@ metered othermeter key a = case keySize key of
#endif #endif
go _ (MessageState { outputType = JSONOutput False }) = nometer go _ (MessageState { outputType = JSONOutput False }) = nometer
go size (MessageState { outputType = JSONOutput True }) = do go size (MessageState { outputType = JSONOutput True }) = do
buf <- withMessageState $ return . showJSONBuffer buf <- withMessageState $ return . jsonBuffer
m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $ m <- liftIO $ rateLimitMeterUpdate 0.1 (Just size) $
JSON.progress buf size JSON.progress buf size
a (combinemeter m) a (combinemeter m)
@ -93,7 +92,7 @@ concurrentMeteredFile file combinemeterupdate key a =
{- Progress dots. -} {- Progress dots. -}
showProgressDots :: Annex () showProgressDots :: Annex ()
showProgressDots = outputMessage q "." showProgressDots = outputMessage JSON.none "."
{- Runs a command, that may output progress to either stdout or {- Runs a command, that may output progress to either stdout or
- stderr, as well as other messages. - stderr, as well as other messages.

View file

@ -10,6 +10,7 @@
module Types.Messages where module Types.Messages where
import Data.Default import Data.Default
import qualified Data.ByteString.Lazy as B
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
import System.Console.Regions (ConsoleRegion) import System.Console.Regions (ConsoleRegion)
@ -30,7 +31,7 @@ data MessageState = MessageState
, consoleRegion :: Maybe ConsoleRegion , consoleRegion :: Maybe ConsoleRegion
, consoleRegionErrFlag :: Bool , consoleRegionErrFlag :: Bool
#endif #endif
, jsonBuffer :: [IO ()] , jsonBuffer :: B.ByteString
} }
instance Default MessageState instance Default MessageState
@ -44,5 +45,5 @@ instance Default MessageState
, consoleRegion = Nothing , consoleRegion = Nothing
, consoleRegionErrFlag = False , consoleRegionErrFlag = False
#endif #endif
, jsonBuffer = [] , jsonBuffer = B.empty
} }