From 5f269513af543e9e6f65824aaf22d54d72005c10 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 11 Mar 2025 12:40:21 -0400 Subject: [PATCH] buffer responses to compute programs in a TQueue This avoids a potential problem where the program sends several INPUT before reading responses, so flushing the response to the pipe could block. It's unlikely, but seemed worth making sure it can't happen. --- Remote/Compute.hs | 59 ++++++++++++------- ...ompute_special_remote_remaining_todos.mdwn | 5 -- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/Remote/Compute.hs b/Remote/Compute.hs index 0b27d135ba..2ef7844808 100644 --- a/Remote/Compute.hs +++ b/Remote/Compute.hs @@ -435,10 +435,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) showOutput starttime <- liftIO currentMonotonicTimestamp let startresult = ComputeProgramResult state False False False - result <- withmeterfile $ \meterfile -> bracket - (liftIO $ createProcess pr) - (liftIO . cleanupProcess) - (getinput tmpdir subdir startresult meterfile) + result <- withmeterfile $ \meterfile -> + bracket + (liftIO $ createProcess pr) + (liftIO . 
cleanupProcess) $ \p -> + withoutputv p $ + getinput tmpdir subdir startresult meterfile p endtime <- liftIO currentMonotonicTimestamp liftIO $ checkoutputs result subdir cont result subdir (calcduration starttime endtime) @@ -453,14 +455,14 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) , return tmpdir ) - getinput tmpdir subdir result meterfile p = + getinput tmpdir subdir result meterfile p outputv = liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case Just l - | null l -> getinput tmpdir subdir result meterfile p + | null l -> getinput tmpdir subdir result meterfile p outputv | otherwise -> do fastDebug "Compute" ("< " ++ l) - result' <- parseoutput p tmpdir subdir result meterfile l - getinput tmpdir subdir result' meterfile p + result' <- parseoutput outputv tmpdir subdir result meterfile l + getinput tmpdir subdir result' meterfile p outputv Nothing -> do liftIO $ hClose (stdoutHandle p) liftIO $ hClose (stdinHandle p) @@ -468,19 +470,14 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) giveup $ program ++ " exited unsuccessfully" return result - sendresponse p s = do - fastDebug "Compute" ("> " ++ s) - liftIO $ hPutStrLn (stdinHandle p) s - liftIO $ hFlush (stdinHandle p) - - parseoutput p tmpdir subdir result meterfile l = case Proto.parseMessage l of - Just (ProcessInput f) -> handleinput f False p tmpdir subdir result - Just (ProcessInputRequired f) -> handleinput f True p tmpdir subdir result + parseoutput outputv tmpdir subdir result meterfile l = case Proto.parseMessage l of + Just (ProcessInput f) -> handleinput f False outputv tmpdir subdir result + Just (ProcessInputRequired f) -> handleinput f True outputv tmpdir subdir result Just (ProcessOutput f) -> do let f' = toOsPath f checksafefile tmpdir subdir f' "output" -- Modify filename so eg "-foo" becomes "./-foo" - sendresponse p $ toCommand' (File f) + sendresponse outputv $ toCommand' (File f) -- If the 
output file is in a subdirectory, make -- the directories so the compute program doesn't -- need to. @@ -508,7 +505,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) Just ProcessSandbox -> do sandboxpath <- liftIO $ fromOsPath <$> relPathDirToFile subdir tmpdir - sendresponse p $ + sendresponse outputv $ if null sandboxpath then "." else sandboxpath @@ -516,7 +513,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) Nothing -> giveup $ program ++ " output an unparseable line: \"" ++ l ++ "\"" - handleinput f required p tmpdir subdir result = do + handleinput f required outputv tmpdir subdir result = do let f' = toOsPath f let knowninput = M.member f' (computeInputs (computeState result)) @@ -534,7 +531,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) mkrel $ pure obj Just (Left gitsha) -> mkrel $ populategitsha gitsha tmpdir - sendresponse p $ + sendresponse outputv $ maybe "" fromOsPath mp let result' = result { computeInputsUnavailable = @@ -630,6 +627,28 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) Just sz -> progress $ BytesProcessed $ floor $ fromIntegral sz * percent / 100 + + withoutputv p a = do + outputv <- liftIO $ atomically newTQueue + let cleanup pid = do + atomically $ writeTQueue outputv Nothing + wait pid + bracket + (liftIO $ async $ sendoutput' p outputv) + (liftIO . 
cleanup) + (const $ a outputv) + + sendoutput' p outputv = + atomically (readTQueue outputv) >>= \case + Nothing -> return () + Just s -> do + liftIO $ hPutStrLn (stdinHandle p) s + liftIO $ hFlush (stdinHandle p) + sendoutput' p outputv + + sendresponse outputv s = do + fastDebug "Compute" ("> " ++ s) + liftIO $ atomically $ writeTQueue outputv (Just s) computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a computationBehaviorChangeError (ComputeProgram program) requestdesc p = diff --git a/doc/todo/compute_special_remote_remaining_todos.mdwn b/doc/todo/compute_special_remote_remaining_todos.mdwn index bba17b2300..f478c5d966 100644 --- a/doc/todo/compute_special_remote_remaining_todos.mdwn +++ b/doc/todo/compute_special_remote_remaining_todos.mdwn @@ -1,11 +1,6 @@ This is the remainder of my todo list while I was building the compute special remote. --[[Joey]] -* git-annex responds to each INPUT immediately, and flushes stdout. - This could cause problems if the program is sending several INPUT - first, before reading responses, as is documented it should do to allow - for parallel get of the input files. - * write a tip showing how to use this * Support parallel get of input files. The design allows for this,