WIP
for https://git-annex.branchable.com/bugs/Buggy_external_special_remote_stalls_after_7245a9e/
This commit is contained in:
parent
3dda21d292
commit
aafae46bcb
4 changed files with 88 additions and 38 deletions
|
@ -53,7 +53,7 @@ startExternalAddonProcess basecmd pid = do
|
|||
Left _ -> runerr cmdpath
|
||||
|
||||
started cmd errrelayer pall@(Just hin, Just hout, Just herr, ph) = do
|
||||
stderrelay <- async $ errrelayer herr
|
||||
stderrelay <- async $ errrelayer ph herr
|
||||
let shutdown forcestop = do
|
||||
-- Close the process's stdin, to let it know there
|
||||
-- are no more requests, so it will exit.
|
||||
|
|
|
@ -140,11 +140,11 @@ mkOutputHandlerQuiet = OutputHandler
|
|||
<$> pure True
|
||||
<*> mkStderrEmitter
|
||||
|
||||
mkStderrRelayer :: Annex (Handle -> IO ())
|
||||
mkStderrRelayer :: Annex (ProcessHandle -> Handle -> IO ())
|
||||
mkStderrRelayer = do
|
||||
quiet <- commandProgressDisabled
|
||||
emitter <- mkStderrEmitter
|
||||
return $ \h -> avoidProgress quiet h emitter
|
||||
return $ \ph h -> avoidProgress quiet ph h emitter
|
||||
|
||||
{- Generates an IO action that can be used to emit stderr.
|
||||
-
|
||||
|
|
|
@ -49,7 +49,6 @@ import Common
|
|||
import Utility.Percentage
|
||||
import Utility.DataUnits
|
||||
import Utility.HumanTime
|
||||
import Utility.ThreadScheduler
|
||||
import Utility.SimpleProtocol as Proto
|
||||
|
||||
import qualified Data.ByteString.Lazy as L
|
||||
|
@ -266,7 +265,7 @@ commandMeterExitCode progressparser oh meter meterupdate cmd params =
|
|||
commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode)
|
||||
commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess =
|
||||
outputFilter cmd params mkprocess Nothing
|
||||
(feedprogress mmeter zeroBytesProcessed [])
|
||||
(const $ feedprogress mmeter zeroBytesProcessed [])
|
||||
handlestderr
|
||||
where
|
||||
feedprogress sendtotalsize prev buf h = do
|
||||
|
@ -291,9 +290,12 @@ commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess
|
|||
meterupdate bytes
|
||||
feedprogress sendtotalsize' bytes buf' h
|
||||
|
||||
handlestderr h = unlessM (hIsEOF h) $ do
|
||||
stderrHandler oh =<< hGetLine h
|
||||
handlestderr h
|
||||
handlestderr ph h = unlessM (hIsEOF h) $ do
|
||||
cancelOnExit ph (hGetLine h) >>= \case
|
||||
Just l -> do
|
||||
stderrHandler oh l
|
||||
handlestderr ph h
|
||||
Nothing -> return ()
|
||||
|
||||
{- Runs a command, that may display one or more progress meters on
|
||||
- either stdout or stderr, and prevents the meters from being displayed.
|
||||
|
@ -306,8 +308,8 @@ demeterCommand oh cmd params = demeterCommandEnv oh cmd params Nothing
|
|||
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
|
||||
demeterCommandEnv oh cmd params environ = do
|
||||
ret <- outputFilter cmd params id environ
|
||||
(\outh -> avoidProgress True outh stdouthandler)
|
||||
(\errh -> avoidProgress True errh $ stderrHandler oh)
|
||||
(\ph outh -> avoidProgress True ph outh stdouthandler)
|
||||
(\ph errh -> avoidProgress True ph errh $ stderrHandler oh)
|
||||
return $ case ret of
|
||||
Just ExitSuccess -> True
|
||||
_ -> False
|
||||
|
@ -320,44 +322,32 @@ demeterCommandEnv oh cmd params environ = do
|
|||
- filter out lines that contain \r (typically used to reset to the
|
||||
- beginning of the line when updating a progress display).
|
||||
-}
|
||||
avoidProgress :: Bool -> Handle -> (String -> IO ()) -> IO ()
|
||||
avoidProgress doavoid h emitter = unlessM (hIsEOF h) $ do
|
||||
s <- hGetLine h
|
||||
unless (doavoid && '\r' `elem` s) $
|
||||
emitter s
|
||||
avoidProgress doavoid h emitter
|
||||
avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO ()
|
||||
avoidProgress doavoid ph h emitter = unlessM (hIsEOF h) $
|
||||
cancelOnExit ph (hGetLine h) >>= \case
|
||||
Just s -> do
|
||||
unless (doavoid && '\r' `elem` s) $
|
||||
emitter s
|
||||
avoidProgress doavoid ph h emitter
|
||||
Nothing -> return ()
|
||||
|
||||
outputFilter
|
||||
:: FilePath
|
||||
-> [CommandParam]
|
||||
-> (CreateProcess -> CreateProcess)
|
||||
-> Maybe [(String, String)]
|
||||
-> (Handle -> IO ())
|
||||
-> (Handle -> IO ())
|
||||
-> (ProcessHandle -> Handle -> IO ())
|
||||
-> (ProcessHandle -> Handle -> IO ())
|
||||
-> IO (Maybe ExitCode)
|
||||
outputFilter cmd params mkprocess environ outfilter errfilter =
|
||||
catchMaybeIO $ withCreateProcess p go
|
||||
where
|
||||
go _ (Just outh) (Just errh) pid = do
|
||||
outt <- async $ tryIO (outfilter outh) >> hClose outh
|
||||
errt <- async $ tryIO (errfilter errh) >> hClose errh
|
||||
ret <- waitForProcess pid
|
||||
|
||||
-- Normally, now that the process has exited, the threads
|
||||
-- will finish processing its output and terminate.
|
||||
-- But, just in case the process did something evil like
|
||||
-- forking to the background while inheriting stderr,
|
||||
-- it's possible that the threads will not finish, which
|
||||
-- would result in a deadlock. So, wait a few seconds
|
||||
-- maximum for them to finish and then cancel them.
|
||||
-- (One program that has behaved this way in the past is
|
||||
-- openssh.)
|
||||
void $ tryNonAsync $ race_
|
||||
(wait outt >> wait errt)
|
||||
(threadDelaySeconds (Seconds 2))
|
||||
cancel outt
|
||||
cancel errt
|
||||
|
||||
go _ (Just outh) (Just errh) ph = do
|
||||
outt <- async $ tryIO (outfilter ph outh) >> hClose outh
|
||||
errt <- async $ tryIO (errfilter ph errh) >> hClose errh
|
||||
ret <- waitForProcess ph
|
||||
wait outt
|
||||
wait errt
|
||||
return ret
|
||||
go _ _ _ _ = error "internal"
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ module Utility.Process (
|
|||
withCreateProcess,
|
||||
waitForProcess,
|
||||
cleanupProcess,
|
||||
cancelOnExit,
|
||||
startInteractiveProcess,
|
||||
stdinHandle,
|
||||
stdoutHandle,
|
||||
|
@ -42,8 +43,10 @@ import System.Exit
|
|||
import System.IO
|
||||
import System.Log.Logger
|
||||
import Control.Monad.IO.Class
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.Async
|
||||
import qualified Data.ByteString as S
|
||||
import GHC.IO.Handle (hWaitForInput)
|
||||
|
||||
data StdHandle = StdinHandle | StdoutHandle | StderrHandle
|
||||
deriving (Eq)
|
||||
|
@ -229,3 +232,60 @@ cleanupProcess (mb_stdin, mb_stdout, mb_stderr, pid) = do
|
|||
maybe (return ()) hClose mb_stderr
|
||||
void $ waitForProcess pid
|
||||
#endif
|
||||
|
||||
{- | Like hGetLine, reads a line from the Handle. If the Handle is already
|
||||
- closed, returns Nothing. If the process exits without writing a line,
|
||||
- also returns Nothing.
|
||||
-
|
||||
- This is useful to protect against situations where the process might
|
||||
- have transferred the handle being read to another process, and so
|
||||
- the handle could remain open after the process has exited. That is a rare
|
||||
- situation, but can happen. Consider a the process that started up a
|
||||
- daemon, and the daemon inherited stderr from it, rather than the more
|
||||
- usual behavior of closing the file descriptor. Reading from stderr
|
||||
- would block past the exit of the process.
|
||||
-
|
||||
- In that situation, this will detect when the process has exited,
|
||||
- and avoid blocking forever. But will still return anything the process
|
||||
- buffered to the handle before exiting.
|
||||
-}
|
||||
hGetLineUntilExitOrEOF :: ProcessHandle -> Handle -> IO (Maybe String)
|
||||
hGetLineUntilExitOrEOF ph h = either Just id <$> (reader `race` waiter)
|
||||
where
|
||||
reader = hGetLine isEOF h >>= \case
|
||||
True -> return Nothing
|
||||
False -> Just <$> hGetLine h
|
||||
|
||||
waiter = do
|
||||
smalldelay
|
||||
_ <- waitForProcess ph
|
||||
waiter'
|
||||
-- Reached the end of the processes output.
|
||||
hClose h
|
||||
|
||||
waiter' = isanythingbuffered >>= \case
|
||||
True -> do
|
||||
-- Waiting for the reader to consume
|
||||
-- buffered output after the process has
|
||||
-- exited.
|
||||
threadDelay 10000 -- 1/100th second
|
||||
waiter'
|
||||
False -> return ()
|
||||
|
||||
isanythingbuffered =
|
||||
-- waitForInput is documented to throw an encoding
|
||||
-- error in some cases, if the Handle has buffered on it
|
||||
-- something that cannot be decoded. If it does,
|
||||
-- that does imply there's still something buffered though.
|
||||
catchNonAsync
|
||||
(const (return True))
|
||||
-- waitForInput can throw an EOF error
|
||||
(catchIOErrorType EOF
|
||||
(const (return False))
|
||||
(hWaitForInput h 0))
|
||||
|
||||
|
||||
-- A small delay avoids starting the work of waitForProcess
|
||||
-- unncessarily in the common case where hGetLine gets a buffered
|
||||
-- line immediately.
|
||||
smalldelay = threadDelay 10000 -- 1/100th second
|
||||
|
|
Loading…
Reference in a new issue