buffer responses to compute programs in a TQueue

This avoids a potential problem where the program sends several INPUT
before reading responses, so flushing the respose to the pipe could
block. It's unlikely, but seemed worth making sure it can't happen.
This commit is contained in:
Joey Hess 2025-03-11 12:40:21 -04:00
parent 0ee644b417
commit 5f269513af
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
2 changed files with 39 additions and 25 deletions

View file

@ -435,10 +435,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
showOutput showOutput
starttime <- liftIO currentMonotonicTimestamp starttime <- liftIO currentMonotonicTimestamp
let startresult = ComputeProgramResult state False False False let startresult = ComputeProgramResult state False False False
result <- withmeterfile $ \meterfile -> bracket result <- withmeterfile $ \meterfile ->
bracket
(liftIO $ createProcess pr) (liftIO $ createProcess pr)
(liftIO . cleanupProcess) (liftIO . cleanupProcess) $ \p ->
(getinput tmpdir subdir startresult meterfile) withoutputv p $
getinput tmpdir subdir startresult meterfile p
endtime <- liftIO currentMonotonicTimestamp endtime <- liftIO currentMonotonicTimestamp
liftIO $ checkoutputs result subdir liftIO $ checkoutputs result subdir
cont result subdir (calcduration starttime endtime) cont result subdir (calcduration starttime endtime)
@ -453,14 +455,14 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
, return tmpdir , return tmpdir
) )
getinput tmpdir subdir result meterfile p = getinput tmpdir subdir result meterfile p outputv =
liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
Just l Just l
| null l -> getinput tmpdir subdir result meterfile p | null l -> getinput tmpdir subdir result meterfile p outputv
| otherwise -> do | otherwise -> do
fastDebug "Compute" ("< " ++ l) fastDebug "Compute" ("< " ++ l)
result' <- parseoutput p tmpdir subdir result meterfile l result' <- parseoutput outputv tmpdir subdir result meterfile l
getinput tmpdir subdir result' meterfile p getinput tmpdir subdir result' meterfile p outputv
Nothing -> do Nothing -> do
liftIO $ hClose (stdoutHandle p) liftIO $ hClose (stdoutHandle p)
liftIO $ hClose (stdinHandle p) liftIO $ hClose (stdinHandle p)
@ -468,19 +470,14 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
giveup $ program ++ " exited unsuccessfully" giveup $ program ++ " exited unsuccessfully"
return result return result
sendresponse p s = do parseoutput outputv tmpdir subdir result meterfile l = case Proto.parseMessage l of
fastDebug "Compute" ("> " ++ s) Just (ProcessInput f) -> handleinput f False outputv tmpdir subdir result
liftIO $ hPutStrLn (stdinHandle p) s Just (ProcessInputRequired f) -> handleinput f True outputv tmpdir subdir result
liftIO $ hFlush (stdinHandle p)
parseoutput p tmpdir subdir result meterfile l = case Proto.parseMessage l of
Just (ProcessInput f) -> handleinput f False p tmpdir subdir result
Just (ProcessInputRequired f) -> handleinput f True p tmpdir subdir result
Just (ProcessOutput f) -> do Just (ProcessOutput f) -> do
let f' = toOsPath f let f' = toOsPath f
checksafefile tmpdir subdir f' "output" checksafefile tmpdir subdir f' "output"
-- Modify filename so eg "-foo" becomes "./-foo" -- Modify filename so eg "-foo" becomes "./-foo"
sendresponse p $ toCommand' (File f) sendresponse outputv $ toCommand' (File f)
-- If the output file is in a subdirectory, make -- If the output file is in a subdirectory, make
-- the directories so the compute program doesn't -- the directories so the compute program doesn't
-- need to. -- need to.
@ -508,7 +505,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
Just ProcessSandbox -> do Just ProcessSandbox -> do
sandboxpath <- liftIO $ fromOsPath <$> sandboxpath <- liftIO $ fromOsPath <$>
relPathDirToFile subdir tmpdir relPathDirToFile subdir tmpdir
sendresponse p $ sendresponse outputv $
if null sandboxpath if null sandboxpath
then "." then "."
else sandboxpath else sandboxpath
@ -516,7 +513,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
Nothing -> giveup $ Nothing -> giveup $
program ++ " output an unparseable line: \"" ++ l ++ "\"" program ++ " output an unparseable line: \"" ++ l ++ "\""
handleinput f required p tmpdir subdir result = do handleinput f required outputv tmpdir subdir result = do
let f' = toOsPath f let f' = toOsPath f
let knowninput = M.member f' let knowninput = M.member f'
(computeInputs (computeState result)) (computeInputs (computeState result))
@ -534,7 +531,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
mkrel $ pure obj mkrel $ pure obj
Just (Left gitsha) -> Just (Left gitsha) ->
mkrel $ populategitsha gitsha tmpdir mkrel $ populategitsha gitsha tmpdir
sendresponse p $ sendresponse outputv $
maybe "" fromOsPath mp maybe "" fromOsPath mp
let result' = result let result' = result
{ computeInputsUnavailable = { computeInputsUnavailable =
@ -631,6 +628,28 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
progress $ BytesProcessed $ floor $ progress $ BytesProcessed $ floor $
fromIntegral sz * percent / 100 fromIntegral sz * percent / 100
withoutputv p a = do
outputv <- liftIO $ atomically newTQueue
let cleanup pid = do
atomically $ writeTQueue outputv Nothing
wait pid
bracket
(liftIO $ async $ sendoutput' p outputv)
(liftIO . cleanup)
(const $ a outputv)
sendoutput' p outputv =
atomically (readTQueue outputv) >>= \case
Nothing -> return ()
Just s -> do
liftIO $ hPutStrLn (stdinHandle p) s
liftIO $ hFlush (stdinHandle p)
sendoutput' p outputv
sendresponse outputv s = do
fastDebug "Compute" ("> " ++ s)
liftIO $ atomically $ writeTQueue outputv (Just s)
computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a
computationBehaviorChangeError (ComputeProgram program) requestdesc p = computationBehaviorChangeError (ComputeProgram program) requestdesc p =
giveup $ program ++ " is not behaving the same way it used to, now " ++ requestdesc ++ ": " ++ fromOsPath p giveup $ program ++ " is not behaving the same way it used to, now " ++ requestdesc ++ ": " ++ fromOsPath p

View file

@ -1,11 +1,6 @@
This is the remainder of my todo list while I was building the This is the remainder of my todo list while I was building the
compute special remote. --[[Joey]] compute special remote. --[[Joey]]
* git-annex responds to each INPUT immediately, and flushes stdout.
This could cause problems if the program is sending several INPUT
first, before reading responses, as is documented it should do to allow
for parallel get of the input files.
* write a tip showing how to use this * write a tip showing how to use this
* Support parallel get of input files. The design allows for this, * Support parallel get of input files. The design allows for this,