convert to withCreateProcess for async exception safety
This handles all sites where checkSuccessProcess/ignoreFailureProcess is used, except for one: Git.Command.pipeReadLazy That one will be significantly more work to convert to bracketing. (Also skipped Command.Assistant.autoStart, but it does not need to shut down the processes it started on exception because they are git-annex assistant daemons..) forceSuccessProcess is done, except for createProcessSuccess. All call sites of createProcessSuccess will need to be converted to bracketing. (process pools still todo also)
This commit is contained in:
parent
2dc7b5186a
commit
438dbe3b66
13 changed files with 125 additions and 87 deletions
|
@ -205,12 +205,14 @@ youtubeDlFileNameHtmlOnly' url uo
|
||||||
, Param "--no-warnings"
|
, Param "--no-warnings"
|
||||||
, Param "--no-playlist"
|
, Param "--no-playlist"
|
||||||
]
|
]
|
||||||
(Nothing, Just o, Just e, pid) <- liftIO $ createProcess
|
let p = (proc "youtube-dl" (toCommand opts))
|
||||||
(proc "youtube-dl" (toCommand opts))
|
{ std_out = CreatePipe
|
||||||
{ std_out = CreatePipe
|
, std_err = CreatePipe
|
||||||
, std_err = CreatePipe
|
}
|
||||||
}
|
liftIO $ withCreateProcess p waitproc
|
||||||
output <- liftIO $ fmap fst $
|
|
||||||
|
waitproc Nothing (Just o) (Just e) pid = do
|
||||||
|
output <- fmap fst $
|
||||||
hGetContentsStrict o
|
hGetContentsStrict o
|
||||||
`concurrently`
|
`concurrently`
|
||||||
hGetContentsStrict e
|
hGetContentsStrict e
|
||||||
|
@ -218,6 +220,8 @@ youtubeDlFileNameHtmlOnly' url uo
|
||||||
return $ case (ok, lines output) of
|
return $ case (ok, lines output) of
|
||||||
(True, (f:_)) | not (null f) -> Right f
|
(True, (f:_)) | not (null f) -> Right f
|
||||||
_ -> nomedia
|
_ -> nomedia
|
||||||
|
waitproc _ _ _ _ = error "internal"
|
||||||
|
|
||||||
nomedia = Left "no media in url"
|
nomedia = Left "no media in url"
|
||||||
|
|
||||||
youtubeDlOpts :: [CommandParam] -> Annex [CommandParam]
|
youtubeDlOpts :: [CommandParam] -> Annex [CommandParam]
|
||||||
|
|
|
@ -113,7 +113,5 @@ assistantListening url = catchBoolIO $ do
|
||||||
startAssistant :: FilePath -> IO ()
|
startAssistant :: FilePath -> IO ()
|
||||||
startAssistant repo = void $ forkIO $ do
|
startAssistant repo = void $ forkIO $ do
|
||||||
program <- programPath
|
program <- programPath
|
||||||
(_, _, _, pid) <-
|
let p = (proc program ["assistant"]) { cwd = Just repo }
|
||||||
createProcess $
|
withCreateProcess p $ \_ _ _ pid -> void $ checkSuccessProcess pid
|
||||||
(proc program ["assistant"]) { cwd = Just repo }
|
|
||||||
void $ checkSuccessProcess pid
|
|
||||||
|
|
|
@ -32,20 +32,27 @@ remoteControlThread = namedThread "RemoteControl" $ do
|
||||||
(cmd, params) <- liftIO $ toBatchCommand
|
(cmd, params) <- liftIO $ toBatchCommand
|
||||||
(program, [Param "remotedaemon", Param "--foreground"])
|
(program, [Param "remotedaemon", Param "--foreground"])
|
||||||
let p = proc cmd (toCommand params)
|
let p = proc cmd (toCommand params)
|
||||||
(Just toh, Just fromh, _, pid) <- liftIO $ createProcess p
|
bracket (setup p) cleanup (go p)
|
||||||
|
where
|
||||||
|
setup p = liftIO $ createProcess $ p
|
||||||
{ std_in = CreatePipe
|
{ std_in = CreatePipe
|
||||||
, std_out = CreatePipe
|
, std_out = CreatePipe
|
||||||
}
|
}
|
||||||
|
cleanup = liftIO . cleanupProcess
|
||||||
urimap <- liftIO . newMVar =<< liftAnnex getURIMap
|
|
||||||
|
|
||||||
controller <- asIO $ remoteControllerThread toh
|
go p (Just toh, Just fromh, _, pid) = do
|
||||||
responder <- asIO $ remoteResponderThread fromh urimap
|
urimap <- liftIO . newMVar =<< liftAnnex getURIMap
|
||||||
|
|
||||||
-- run controller and responder until the remotedaemon dies
|
controller <- asIO $ remoteControllerThread toh
|
||||||
liftIO $ void $ tryNonAsync $ controller `concurrently` responder
|
responder <- asIO $ remoteResponderThread fromh urimap
|
||||||
debug ["remotedaemon exited"]
|
|
||||||
liftIO $ forceSuccessProcess p pid
|
-- run controller and responder until the remotedaemon dies
|
||||||
|
liftIO $ void $ tryNonAsync $
|
||||||
|
controller `concurrently` responder
|
||||||
|
debug ["remotedaemon exited"]
|
||||||
|
liftIO $ forceSuccessProcess p pid
|
||||||
|
go _ _ = error "internal"
|
||||||
|
|
||||||
|
|
||||||
-- feed from the remoteControl channel into the remotedaemon
|
-- feed from the remoteControl channel into the remotedaemon
|
||||||
remoteControllerThread :: Handle -> Assistant ()
|
remoteControllerThread :: Handle -> Assistant ()
|
||||||
|
|
|
@ -180,14 +180,16 @@ querySingle o r repo reader = assertLocal repo $
|
||||||
, std_in = Inherit
|
, std_in = Inherit
|
||||||
, std_out = CreatePipe
|
, std_out = CreatePipe
|
||||||
}
|
}
|
||||||
pid <- createProcess p'
|
withCreateProcess p' go
|
||||||
let h = stdoutHandle pid
|
where
|
||||||
output <- reader h
|
go _ (Just outh) _ pid = do
|
||||||
hClose h
|
output <- reader outh
|
||||||
ifM (checkSuccessProcess (processHandle pid))
|
hClose outh
|
||||||
|
ifM (checkSuccessProcess pid)
|
||||||
( return (Just output)
|
( return (Just output)
|
||||||
, return Nothing
|
, return Nothing
|
||||||
)
|
)
|
||||||
|
go _ _ _ _ = error "internal"
|
||||||
|
|
||||||
querySize :: Ref -> Repo -> IO (Maybe FileSize)
|
querySize :: Ref -> Repo -> IO (Maybe FileSize)
|
||||||
querySize r repo = maybe Nothing (readMaybe . takeWhile (/= '\n'))
|
querySize r repo = maybe Nothing (readMaybe . takeWhile (/= '\n'))
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{- running git commands
|
{- running git commands
|
||||||
-
|
-
|
||||||
- Copyright 2010-2013 Joey Hess <id@joeyh.name>
|
- Copyright 2010-2020 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -70,13 +70,17 @@ pipeReadStrict = pipeReadStrict' S.hGetContents
|
||||||
|
|
||||||
{- The reader action must be strict. -}
|
{- The reader action must be strict. -}
|
||||||
pipeReadStrict' :: (Handle -> IO a) -> [CommandParam] -> Repo -> IO a
|
pipeReadStrict' :: (Handle -> IO a) -> [CommandParam] -> Repo -> IO a
|
||||||
pipeReadStrict' reader params repo = assertLocal repo $
|
pipeReadStrict' reader params repo = assertLocal repo $ withCreateProcess p go
|
||||||
withHandle StdoutHandle (createProcessChecked ignoreFailureProcess) p $ \h -> do
|
|
||||||
output <- reader h
|
|
||||||
hClose h
|
|
||||||
return output
|
|
||||||
where
|
where
|
||||||
p = gitCreateProcess params repo
|
p = (gitCreateProcess params repo)
|
||||||
|
{ std_out = CreatePipe }
|
||||||
|
|
||||||
|
go _ (Just outh) _ pid = do
|
||||||
|
output <- reader outh
|
||||||
|
hClose outh
|
||||||
|
void $ waitForProcess pid
|
||||||
|
return output
|
||||||
|
go _ _ _ _ = error "internal"
|
||||||
|
|
||||||
{- Runs a git command, feeding it an input, and returning its output,
|
{- Runs a git command, feeding it an input, and returning its output,
|
||||||
- which is expected to be fairly small, since it's all read into memory
|
- which is expected to be fairly small, since it's all read into memory
|
||||||
|
|
44
Git/Fsck.hs
44
Git/Fsck.hs
|
@ -77,27 +77,31 @@ findBroken batchmode r = do
|
||||||
then toBatchCommand (command, params)
|
then toBatchCommand (command, params)
|
||||||
else return (command, params)
|
else return (command, params)
|
||||||
|
|
||||||
p@(_, _, _, pid) <- createProcess $
|
let p = (proc command' (toCommand params'))
|
||||||
(proc command' (toCommand params'))
|
{ std_out = CreatePipe
|
||||||
{ std_out = CreatePipe
|
, std_err = CreatePipe
|
||||||
, std_err = CreatePipe
|
}
|
||||||
}
|
withCreateProcess p go
|
||||||
(o1, o2) <- concurrently
|
|
||||||
(parseFsckOutput maxobjs r (stdoutHandle p))
|
|
||||||
(parseFsckOutput maxobjs r (stderrHandle p))
|
|
||||||
fsckok <- checkSuccessProcess pid
|
|
||||||
case mappend o1 o2 of
|
|
||||||
FsckOutput badobjs truncated
|
|
||||||
| S.null badobjs && not fsckok -> return FsckFailed
|
|
||||||
| otherwise -> return $ FsckFoundMissing badobjs truncated
|
|
||||||
NoFsckOutput
|
|
||||||
| not fsckok -> return FsckFailed
|
|
||||||
| otherwise -> return noproblem
|
|
||||||
-- If all fsck output was duplicateEntries warnings,
|
|
||||||
-- the repository is not broken, it just has some unusual
|
|
||||||
-- tree objects in it. So ignore nonzero exit status.
|
|
||||||
AllDuplicateEntriesWarning -> return noproblem
|
|
||||||
where
|
where
|
||||||
|
go _ (Just outh) (Just errh) pid = do
|
||||||
|
(o1, o2) <- concurrently
|
||||||
|
(parseFsckOutput maxobjs r outh)
|
||||||
|
(parseFsckOutput maxobjs r errh)
|
||||||
|
fsckok <- checkSuccessProcess pid
|
||||||
|
case mappend o1 o2 of
|
||||||
|
FsckOutput badobjs truncated
|
||||||
|
| S.null badobjs && not fsckok -> return FsckFailed
|
||||||
|
| otherwise -> return $ FsckFoundMissing badobjs truncated
|
||||||
|
NoFsckOutput
|
||||||
|
| not fsckok -> return FsckFailed
|
||||||
|
| otherwise -> return noproblem
|
||||||
|
-- If all fsck output was duplicateEntries warnings,
|
||||||
|
-- the repository is not broken, it just has some
|
||||||
|
-- unusual tree objects in it. So ignore nonzero
|
||||||
|
-- exit status.
|
||||||
|
AllDuplicateEntriesWarning -> return noproblem
|
||||||
|
go _ _ _ _ = error "internal"
|
||||||
|
|
||||||
maxobjs = 10000
|
maxobjs = 10000
|
||||||
noproblem = FsckFoundMissing S.empty False
|
noproblem = FsckFoundMissing S.empty False
|
||||||
|
|
||||||
|
|
|
@ -136,15 +136,7 @@ indexPath = toInternalGitPath . getTopFilePath
|
||||||
|
|
||||||
{- Refreshes the index, by checking file stat information. -}
|
{- Refreshes the index, by checking file stat information. -}
|
||||||
refreshIndex :: Repo -> ((FilePath -> IO ()) -> IO ()) -> IO Bool
|
refreshIndex :: Repo -> ((FilePath -> IO ()) -> IO ()) -> IO Bool
|
||||||
refreshIndex repo feeder = do
|
refreshIndex repo feeder = withCreateProcess p go
|
||||||
(Just h, _, _, p) <- createProcess (gitCreateProcess params repo)
|
|
||||||
{ std_in = CreatePipe }
|
|
||||||
feeder $ \f -> do
|
|
||||||
hPutStr h f
|
|
||||||
hPutStr h "\0"
|
|
||||||
hFlush h
|
|
||||||
hClose h
|
|
||||||
checkSuccessProcess p
|
|
||||||
where
|
where
|
||||||
params =
|
params =
|
||||||
[ Param "update-index"
|
[ Param "update-index"
|
||||||
|
@ -153,3 +145,15 @@ refreshIndex repo feeder = do
|
||||||
, Param "-z"
|
, Param "-z"
|
||||||
, Param "--stdin"
|
, Param "--stdin"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
p = (gitCreateProcess params repo)
|
||||||
|
{ std_in = CreatePipe }
|
||||||
|
|
||||||
|
go (Just h) _ _ pid = do
|
||||||
|
feeder $ \f -> do
|
||||||
|
hPutStr h f
|
||||||
|
hPutStr h "\0"
|
||||||
|
hFlush h
|
||||||
|
hClose h
|
||||||
|
checkSuccessProcess pid
|
||||||
|
go _ _ _ _ = error "internal"
|
||||||
|
|
|
@ -164,10 +164,16 @@ store r buprepo = byteStorer $ \k b p -> do
|
||||||
retrieve :: BupRepo -> Retriever
|
retrieve :: BupRepo -> Retriever
|
||||||
retrieve buprepo = byteRetriever $ \k sink -> do
|
retrieve buprepo = byteRetriever $ \k sink -> do
|
||||||
let params = bupParams "join" buprepo [Param $ bupRef k]
|
let params = bupParams "join" buprepo [Param $ bupRef k]
|
||||||
let p = proc "bup" (toCommand params)
|
let p = (proc "bup" (toCommand params))
|
||||||
(_, Just h, _, pid) <- liftIO $ createProcess $ p { std_out = CreatePipe }
|
{ std_out = CreatePipe }
|
||||||
liftIO (hClose h >> forceSuccessProcess p pid)
|
bracketIO (createProcess p) cleanupProcess (go sink p)
|
||||||
`after` (sink =<< liftIO (L.hGetContents h))
|
where
|
||||||
|
go sink p (_, Just h, _, pid) = do
|
||||||
|
() <- sink =<< liftIO (L.hGetContents h)
|
||||||
|
liftIO $ do
|
||||||
|
hClose h
|
||||||
|
forceSuccessProcess p pid
|
||||||
|
go _ _ _ = error "internal"
|
||||||
|
|
||||||
{- Cannot revert having stored a key in bup, but at least the data for the
|
{- Cannot revert having stored a key in bup, but at least the data for the
|
||||||
- key will be used for deltaing data of other keys stored later.
|
- key will be used for deltaing data of other keys stored later.
|
||||||
|
|
|
@ -157,10 +157,16 @@ ddarExtractRemoteCall cs ddarrepo k =
|
||||||
retrieve :: DdarRepo -> Retriever
|
retrieve :: DdarRepo -> Retriever
|
||||||
retrieve ddarrepo = byteRetriever $ \k sink -> do
|
retrieve ddarrepo = byteRetriever $ \k sink -> do
|
||||||
(cmd, params) <- ddarExtractRemoteCall NoConsumeStdin ddarrepo k
|
(cmd, params) <- ddarExtractRemoteCall NoConsumeStdin ddarrepo k
|
||||||
let p = (proc cmd $ toCommand params) { std_out = CreatePipe }
|
let p = (proc cmd $ toCommand params)
|
||||||
(_, Just h, _, pid) <- liftIO $ createProcess p
|
{ std_out = CreatePipe }
|
||||||
liftIO (hClose h >> forceSuccessProcess p pid)
|
bracketIO (createProcess p) cleanupProcess (go sink p)
|
||||||
`after` (sink =<< liftIO (L.hGetContents h))
|
where
|
||||||
|
go sink p (_, Just h, _, pid) = do
|
||||||
|
() <- sink =<< liftIO (L.hGetContents h)
|
||||||
|
liftIO $ do
|
||||||
|
hClose h
|
||||||
|
forceSuccessProcess p pid
|
||||||
|
go _ _ _ = error "internal"
|
||||||
|
|
||||||
remove :: DdarRepo -> Remover
|
remove :: DdarRepo -> Remover
|
||||||
remove ddarrepo key = do
|
remove ddarrepo key = do
|
||||||
|
|
|
@ -185,14 +185,15 @@ retrieve' r k sink = go =<< glacierEnv c gc u
|
||||||
]
|
]
|
||||||
go Nothing = giveup "cannot retrieve from glacier"
|
go Nothing = giveup "cannot retrieve from glacier"
|
||||||
go (Just environ) = do
|
go (Just environ) = do
|
||||||
let cmd = (proc "glacier" (toCommand params))
|
let p = (proc "glacier" (toCommand params))
|
||||||
{ env = Just environ
|
{ env = Just environ
|
||||||
, std_out = CreatePipe
|
, std_out = CreatePipe
|
||||||
}
|
}
|
||||||
(_, Just h, _, pid) <- liftIO $ createProcess cmd
|
bracketIO (createProcess p) cleanupProcess (go' p)
|
||||||
|
go' p (_, Just h, _, pid) = do
|
||||||
let cleanup = liftIO $ do
|
let cleanup = liftIO $ do
|
||||||
hClose h
|
hClose h
|
||||||
forceSuccessProcess cmd pid
|
forceSuccessProcess p pid
|
||||||
flip finally cleanup $ do
|
flip finally cleanup $ do
|
||||||
-- Glacier cannot store empty files, so if
|
-- Glacier cannot store empty files, so if
|
||||||
-- the output is empty, the content is not
|
-- the output is empty, the content is not
|
||||||
|
@ -200,6 +201,7 @@ retrieve' r k sink = go =<< glacierEnv c gc u
|
||||||
whenM (liftIO $ hIsEOF h) $
|
whenM (liftIO $ hIsEOF h) $
|
||||||
giveup "Content is not available from glacier yet. Recommend you wait up to 4 hours, and then run this command again."
|
giveup "Content is not available from glacier yet. Recommend you wait up to 4 hours, and then run this command again."
|
||||||
sink =<< liftIO (L.hGetContents h)
|
sink =<< liftIO (L.hGetContents h)
|
||||||
|
go' _ _ = error "internal"
|
||||||
|
|
||||||
remove :: Remote -> Remover
|
remove :: Remote -> Remover
|
||||||
remove r k = unlessM go $
|
remove r k = unlessM go $
|
||||||
|
|
|
@ -171,16 +171,19 @@ pipeLazy (GpgCmd cmd) params feeder reader = do
|
||||||
, std_out = CreatePipe
|
, std_out = CreatePipe
|
||||||
, std_err = Inherit
|
, std_err = Inherit
|
||||||
}
|
}
|
||||||
bracket (setup p) (cleanup p) go
|
bracket (setup p) cleanup (go p)
|
||||||
where
|
where
|
||||||
setup = liftIO . createProcess
|
setup = liftIO . createProcess
|
||||||
cleanup p (_, _, _, pid) = liftIO $ forceSuccessProcess p pid
|
cleanup = liftIO . cleanupProcess
|
||||||
go p = do
|
|
||||||
let (to, from) = ioHandles p
|
go p (Just to, Just from, _, pid) = do
|
||||||
liftIO $ void $ forkIO $ do
|
liftIO $ void $ forkIO $ do
|
||||||
feeder to
|
feeder to
|
||||||
hClose to
|
hClose to
|
||||||
reader from
|
r <- reader from
|
||||||
|
liftIO $ forceSuccessProcess p pid
|
||||||
|
return r
|
||||||
|
go _ _ = error "internal"
|
||||||
|
|
||||||
{- Finds gpg public keys matching some string. (Could be an email address,
|
{- Finds gpg public keys matching some string. (Could be an email address,
|
||||||
- a key id, or a name; See the section 'HOW TO SPECIFY A USER ID' of
|
- a key id, or a name; See the section 'HOW TO SPECIFY A USER ID' of
|
||||||
|
|
|
@ -49,11 +49,16 @@ queryDir path = query ["+d", path]
|
||||||
- Note: If lsof is not available, this always returns [] !
|
- Note: If lsof is not available, this always returns [] !
|
||||||
-}
|
-}
|
||||||
query :: [String] -> IO [(FilePath, LsofOpenMode, ProcessInfo)]
|
query :: [String] -> IO [(FilePath, LsofOpenMode, ProcessInfo)]
|
||||||
query opts =
|
query opts = withCreateProcess p go
|
||||||
withHandle StdoutHandle (createProcessChecked checkSuccessProcess) p $
|
|
||||||
parse <$$> hGetContentsStrict
|
|
||||||
where
|
where
|
||||||
p = proc "lsof" ("-F0can" : opts)
|
p = (proc "lsof" ("-F0can" : opts))
|
||||||
|
{ std_out = CreatePipe }
|
||||||
|
|
||||||
|
go _ (Just outh) _ pid = do
|
||||||
|
r <- parse <$> hGetContentsStrict outh
|
||||||
|
void $ waitForProcess pid
|
||||||
|
return r
|
||||||
|
go _ _ _ _ = error "internal"
|
||||||
|
|
||||||
type LsofParser = String -> [(FilePath, LsofOpenMode, ProcessInfo)]
|
type LsofParser = String -> [(FilePath, LsofOpenMode, ProcessInfo)]
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ module Utility.Process (
|
||||||
forceSuccessProcess,
|
forceSuccessProcess,
|
||||||
forceSuccessProcess',
|
forceSuccessProcess',
|
||||||
checkSuccessProcess,
|
checkSuccessProcess,
|
||||||
ignoreFailureProcess,
|
|
||||||
createProcessSuccess,
|
createProcessSuccess,
|
||||||
createProcessChecked,
|
createProcessChecked,
|
||||||
createBackgroundProcess,
|
createBackgroundProcess,
|
||||||
|
@ -53,7 +52,6 @@ import System.Log.Logger
|
||||||
import Control.Monad.IO.Class
|
import Control.Monad.IO.Class
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import qualified Control.Exception as E
|
import qualified Control.Exception as E
|
||||||
import Control.Monad
|
|
||||||
import qualified Data.ByteString as S
|
import qualified Data.ByteString as S
|
||||||
|
|
||||||
type CreateProcessRunner = forall a. CreateProcess -> ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO a) -> IO a
|
type CreateProcessRunner = forall a. CreateProcess -> ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO a) -> IO a
|
||||||
|
@ -135,11 +133,6 @@ checkSuccessProcess pid = do
|
||||||
code <- waitForProcess pid
|
code <- waitForProcess pid
|
||||||
return $ code == ExitSuccess
|
return $ code == ExitSuccess
|
||||||
|
|
||||||
ignoreFailureProcess :: ProcessHandle -> IO Bool
|
|
||||||
ignoreFailureProcess pid = do
|
|
||||||
void $ waitForProcess pid
|
|
||||||
return True
|
|
||||||
|
|
||||||
-- | Runs createProcess, then an action on its handles, and then
|
-- | Runs createProcess, then an action on its handles, and then
|
||||||
-- forceSuccessProcess.
|
-- forceSuccessProcess.
|
||||||
createProcessSuccess :: CreateProcessRunner
|
createProcessSuccess :: CreateProcessRunner
|
||||||
|
|
Loading…
Add table
Reference in a new issue