From ccc454a791c303f04c2e38b6afdc03b02049d6c7 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 5 Mar 2025 13:46:06 -0400 Subject: [PATCH] computation progress display --- Command/AddComputed.hs | 1 + Command/Recompute.hs | 1 + Remote/Compute.hs | 85 +++++++++++++++++++++++++++++++----------- TODO-compute | 2 - 4 files changed, 66 insertions(+), 23 deletions(-) diff --git a/Command/AddComputed.hs b/Command/AddComputed.hs index 226f2c0c08..f05f3bdfcd 100644 --- a/Command/AddComputed.hs +++ b/Command/AddComputed.hs @@ -101,6 +101,7 @@ perform o r = do Remote.Compute.runComputeProgram program state (Remote.Compute.ImmutableState False) (getInputContent fast) + Nothing (addComputed "adding" True r (reproducible o) chooseBackend Just fast) next $ return True diff --git a/Command/Recompute.hs b/Command/Recompute.hs index 2eda098867..8233bc87e7 100644 --- a/Command/Recompute.hs +++ b/Command/Recompute.hs @@ -129,6 +129,7 @@ perform o r file origkey origstate = do Remote.Compute.runComputeProgram program origstate (Remote.Compute.ImmutableState False) (getinputcontent program) + Nothing (go program reproducibleconfig) next $ return True where diff --git a/Remote/Compute.hs b/Remote/Compute.hs index 8c06dd9061..8b64dee56e 100644 --- a/Remote/Compute.hs +++ b/Remote/Compute.hs @@ -44,6 +44,7 @@ import qualified Annex.Transfer import Logs.MetaData import Logs.EquivilantKeys import Logs.Location +import Messages.Progress import Utility.Metered import Utility.TimeStamp import Utility.Env @@ -59,6 +60,8 @@ import qualified Utility.SimpleProtocol as Proto import Network.HTTP.Types.URI import Data.Time.Clock import Text.Read +import Control.Concurrent.STM +import Control.Concurrent.Async import qualified Data.Map as M import qualified Data.Set as S import qualified Data.ByteString as B @@ -209,8 +212,10 @@ newtype PercentFloat = PercentFloat Float deriving (Show, Eq) instance Proto.Serializable PercentFloat where - serialize (PercentFloat p) = show p - deserialize s = PercentFloat <$> readMaybe s + serialize (PercentFloat p) = show p ++ "%" + deserialize s = do + s' <- reverse <$> stripPrefix "%" (reverse s) + PercentFloat <$> readMaybe s' data ComputeState = ComputeState { computeParams :: [String] @@ -374,9 +379,11 @@ runComputeProgram -> (OsPath -> Annex (Key, Maybe (Either Git.Sha OsPath))) -- ^ get input file's content, or Nothing the input file's -- content is not available + -> Maybe (Key, MeterUpdate) + -- ^ update meter for this key -> (ComputeState -> OsPath -> NominalDiffTime -> Annex v) -> Annex v -runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent cont = +runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent meterkey cont = withOtherTmp $ \othertmpdir -> withTmpDirIn othertmpdir (literalOsPath "compute") go where @@ -391,10 +398,10 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) } showOutput starttime <- liftIO currentMonotonicTimestamp - state' <- bracket + state' <- withmeterfile $ \meterfile -> bracket (liftIO $ createProcess pr) (liftIO . cleanupProcess) - (getinput state tmpdir subdir) + (getinput tmpdir subdir state meterfile) endtime <- liftIO currentMonotonicTimestamp cont state' subdir (calcduration starttime endtime) @@ -408,13 +415,13 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) , return tmpdir ) - getinput state' tmpdir subdir p = + getinput tmpdir subdir state' meterfile p = liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case Just l - | null l -> getinput state' tmpdir subdir p + | null l -> getinput tmpdir subdir state' meterfile p | otherwise -> do - state'' <- parseoutput p tmpdir subdir state' l - getinput state'' tmpdir subdir p + state'' <- parseoutput p tmpdir subdir state' meterfile l + getinput tmpdir subdir state'' meterfile p Nothing -> do liftIO $ hClose (stdoutHandle p) liftIO $ hClose (stdinHandle p) @@ -422,7 +429,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) giveup $ program ++ " exited unsuccessfully" return state' - parseoutput p tmpdir subdir state' l = case Proto.parseMessage l of + parseoutput p tmpdir subdir state' meterfile l = case Proto.parseMessage l of Just (ProcessInput f) -> do let f' = toOsPath f let knowninput = M.member f' (computeInputs state') @@ -453,7 +460,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) Just (ProcessOutput f) -> do let f' = toOsPath f checksafefile tmpdir subdir f' "output" - let knownoutput = M.member f' (computeOutputs state') + knownoutput <- case M.lookup f' (computeOutputs state') of + Nothing -> return False + Just mk -> do + when (mk == fmap fst meterkey) $ + meterfile (subdir f') + return True checkimmutable knownoutput "outputting" f' $ return $ if immutablestate then state' @@ -463,12 +475,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) (computeOutputs state') } Just (ProcessProgress percent) -> do - -- XXX + liftIO $ updatepercent percent return state' Just ProcessReproducible -> return $ state' { computeReproducible = True } Nothing -> giveup $ - program ++ " output included an unparseable line: \"" ++ l ++ "\"" + program ++ " output an unparseable line: \"" ++ l ++ "\"" checksafefile tmpdir subdir f fileaction = do let err problem = giveup $ @@ -497,26 +509,57 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) liftIO . F.writeFile f =<< catObject gitsha return f + withmeterfile a = case meterkey of + Nothing -> a (const noop) + Just (_, progress) -> do + filev <- liftIO newEmptyTMVarIO + endv <- liftIO $ newEmptyTMVarIO + let meterfile = void . liftIO + . atomically . tryPutTMVar filev + let endmeterfile = atomically $ putTMVar endv () + tid <- liftIO $ async $ do + v <- liftIO $ atomically $ + (Right <$> takeTMVar filev) + `orElse` + (Left <$> takeTMVar endv) + case v of + Right f -> watchFileSize f progress $ \_ -> + void $ liftIO $ atomically $ + takeTMVar endv + Left () -> return () + a meterfile + `finally` liftIO (endmeterfile >> wait tid) + + updatepercent (PercentFloat percent) = case meterkey of + Nothing -> noop + Just (k, progress) -> case fromKey keySize k of + Nothing -> noop + Just sz -> + progress $ BytesProcessed $ floor $ + fromIntegral sz * percent / 100 + computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a computationBehaviorChangeError (ComputeProgram program) requestdesc p = giveup $ program ++ " is not behaving the same way it used to, now " ++ requestdesc ++ ": " ++ fromOsPath p computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification -computeKey rs (ComputeProgram program) k _af dest p vc = +computeKey rs (ComputeProgram program) k _af dest meterupdate vc = getComputeState rs k >>= \case - Just state -> + Just state -> case computeskey state of - Just keyfile -> runComputeProgram - (ComputeProgram program) - state - (ImmutableState True) - (getinputcontent state) - (postcompute keyfile) + Just keyfile -> go state keyfile Nothing -> missingstate Nothing -> missingstate where missingstate = giveup "Missing compute state" + go state keyfile = metered (Just meterupdate) k Nothing $ \_ p -> + runComputeProgram (ComputeProgram program) state + (ImmutableState True) + (getinputcontent state) + (Just (k, p)) + (postcompute keyfile) + getinputcontent state f = case M.lookup f (computeInputs state) of Just inputkey -> case keyGitSha inputkey of diff --git a/TODO-compute b/TODO-compute index 09cc853898..5b212695c6 100644 --- a/TODO-compute +++ b/TODO-compute @@ -11,8 +11,6 @@ * allow git-annex enableremote with program= explicitly specified, without checking annex.security.allowed-compute-programs -* need progress bars for computations and implement PROGRESS message - * addcomputed should honor annex.addunlocked. * Perhaps recompute should write a new version of a file as an unlocked