diff --git a/Command/AddComputed.hs b/Command/AddComputed.hs index f05f3bdfcd..4774caae9b 100644 --- a/Command/AddComputed.hs +++ b/Command/AddComputed.hs @@ -20,6 +20,7 @@ import Backend import Annex.CatFile import Annex.Content.Presence import Annex.Ingest +import Annex.UUID import Annex.GitShaKey import Types.KeySource import Types.Key @@ -94,35 +95,35 @@ perform o r = do , Remote.Compute.computeInputs = mempty , Remote.Compute.computeOutputs = mempty , Remote.Compute.computeSubdir = subdir - , Remote.Compute.computeReproducible = False - , Remote.Compute.computeInputsUnavailable = False } fast <- Annex.getRead Annex.fast Remote.Compute.runComputeProgram program state (Remote.Compute.ImmutableState False) (getInputContent fast) Nothing - (addComputed "adding" True r (reproducible o) chooseBackend Just fast) + (addComputed (Just "adding") True r (reproducible o) chooseBackend Just fast) next $ return True addComputed - :: StringContainingQuotedPath + :: Maybe StringContainingQuotedPath -> Bool -> Remote -> Maybe Reproducible -> (OsPath -> Annex Backend) -> (OsPath -> Maybe OsPath) -> Bool - -> Remote.Compute.ComputeState + -> Remote.Compute.ComputeProgramResult -> OsPath -> NominalDiffTime -> Annex () -addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fast state tmpdir ts = do - let outputs = Remote.Compute.computeOutputs state +addComputed maddaction stagefiles r reproducibleconfig choosebackend destfile fast result tmpdir ts = do when (M.null outputs) $ giveup "The computation succeeded, but it did not generate any files." oks <- forM (M.keys outputs) $ \outputfile -> do - showAction $ addaction <> " " <> QuotedPath outputfile + case maddaction of + Just addaction -> showAction $ + addaction <> " " <> QuotedPath outputfile + Nothing -> noop k <- catchNonAsync (addfile outputfile) (\err -> giveup $ "Failed to ingest output file " ++ fromOsPath outputfile ++ ": " ++ show err) return (outputfile, Just k) @@ -133,8 +134,15 @@ addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fas Remote.Compute.setComputeState (Remote.remoteStateHandle r) k ts state' - logChange NoLiveUpdate k (Remote.uuid r) InfoPresent + + let u = Remote.uuid r + unlessM (elem u <$> loggedLocations k) $ + logChange NoLiveUpdate k u InfoPresent where + state = Remote.Compute.computeState result + + outputs = Remote.Compute.computeOutputs state + addfile outputfile | fast = do case destfile outputfile of @@ -179,7 +187,9 @@ addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fas ingestwith a = a >>= \case Nothing -> giveup "ingestion failed" Just k -> do - logStatus NoLiveUpdate k InfoPresent + u <- getUUID + unlessM (elem u <$> loggedLocations k) $ + logStatus NoLiveUpdate k InfoPresent when (fromKey keyVariety k == VURLKey) $ do hb <- hashBackend void $ addEquivilantKey hb k @@ -194,7 +204,7 @@ addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fas isreproducible = case reproducibleconfig of Just v -> isReproducible v - Nothing -> Remote.Compute.computeReproducible state + Nothing -> Remote.Compute.computeReproducible result getInputContent :: Bool -> OsPath -> Annex (Key, Maybe (Either Git.Sha OsPath)) getInputContent fast p = catKeyFile p >>= \case diff --git a/Command/Recompute.hs b/Command/Recompute.hs index 8233bc87e7..6b21ce8ee7 100644 --- a/Command/Recompute.hs +++ b/Command/Recompute.hs @@ -15,6 +15,7 @@ import qualified Remote import qualified Types.Remote as Remote import qualified Git.Ref as Git import Annex.Content +import Annex.UUID import Annex.CatFile import Annex.GitShaKey import Git.FilePath @@ -131,12 +132,13 @@ perform o r file origkey origstate = do (getinputcontent program) Nothing (go program reproducibleconfig) - next $ return True + next cleanup where - go program reproducibleconfig state tmpdir ts = do - checkbehaviorchange program state - addComputed "processing" False r reproducibleconfig - choosebackend destfile False state tmpdir ts + go program reproducibleconfig result tmpdir ts = do + checkbehaviorchange program + (Remote.Compute.computeState result) + addComputed Nothing False r reproducibleconfig + choosebackend destfile False result tmpdir ts checkbehaviorchange program state = do let check s w a b = forM_ (M.keys (w a)) $ \f -> @@ -168,6 +170,10 @@ perform o r file origkey origstate = do origbackendvariety = fromKey keyVariety origkey + recomputingvurl = case origbackendvariety of + VURLKey -> True + _ -> False + getreproducibleconfig = case reproducible o of Just (Reproducible True) -> return (Just (Reproducible True)) -- A VURL key is used when the computation was @@ -177,13 +183,22 @@ perform o r file origkey origstate = do -- delete the annex object first, so that if recomputing -- generates a new version of the file, it replaces -- the old version. - v -> case origbackendvariety of - VURLKey -> do + v -> if recomputingvurl + then do lockContentForRemoval origkey noop removeAnnex - -- in case computation fails or is interupted - logStatus NoLiveUpdate origkey InfoMissing return (Just (Reproducible False)) - _ -> return v + else return v + + cleanup = do + case reproducible o of + Just (Reproducible True) -> noop + -- in case computation failed, update + -- location log for removal done earlier + _ -> when recomputingvurl $ do + u <- getUUID + unlessM (elem u <$> loggedLocations origkey) $ + logStatus NoLiveUpdate origkey InfoMissing + return True choosebackend _outputfile -- Use the same backend as was used to compute it before, diff --git a/Remote/Compute.hs b/Remote/Compute.hs index 8b64dee56e..b6ec907bda 100644 --- a/Remote/Compute.hs +++ b/Remote/Compute.hs @@ -18,6 +18,7 @@ module Remote.Compute ( getComputeProgram, runComputeProgram, ImmutableState(..), + ComputeProgramResult(..), computationBehaviorChangeError, defaultComputeParams, ) where @@ -222,8 +223,6 @@ data ComputeState = ComputeState , computeInputs :: M.Map OsPath Key , computeOutputs :: M.Map OsPath (Maybe Key) , computeSubdir :: OsPath - , computeReproducible :: Bool - , computeInputsUnavailable :: Bool } deriving (Show, Eq) @@ -272,8 +271,6 @@ parseComputeState k b = , computeInputs = mempty , computeOutputs = mempty , computeSubdir = literalOsPath "." - , computeReproducible = False - , computeInputsUnavailable = False } go :: ComputeState -> [QueryItem] -> ComputeState @@ -330,16 +327,23 @@ computeStateUrl r st p = - The metadata fields are numbers (prefixed with "t" to make them legal - field names), which are estimates of how long it might take to run - the computation (in seconds). + - + - Avoids redundantly recording a ComputeState when the per remote metadata + - already contains it. -} setComputeState :: RemoteStateHandle -> Key -> NominalDiffTime -> ComputeState -> Annex () -setComputeState rs k ts st = addRemoteMetaData k rs $ MetaData $ M.singleton - (mkMetaFieldUnchecked $ T.pack ('t':show (truncateResolution 1 ts))) - (S.singleton (MetaValue (CurrentlySet True) (formatComputeState k st))) +setComputeState rs k ts st = do + l <- map snd <$> getComputeStatesUnsorted rs k + unless (st `elem` l) go + where + go = addRemoteMetaData k rs $ MetaData $ M.singleton + (mkMetaFieldUnchecked $ T.pack ('t':show (truncateResolution 1 ts))) + (S.singleton (MetaValue (CurrentlySet True) (formatComputeState k st))) {- When multiple ComputeStates have been recorded for the same key, - this returns one that is probably less expensive to compute, - based on the original time it took to compute it. -} -getComputeState:: RemoteStateHandle -> Key -> Annex (Maybe ComputeState) +getComputeState :: RemoteStateHandle -> Key -> Annex (Maybe ComputeState) getComputeState rs k = headMaybe . map snd . sortOn fst <$> getComputeStatesUnsorted rs k @@ -372,6 +376,12 @@ computeProgramEnvironment st = do newtype ImmutableState = ImmutableState Bool +data ComputeProgramResult = ComputeProgramResult + { computeState :: ComputeState + , computeInputsUnavailable :: Bool + , computeReproducible :: Bool + } + runComputeProgram :: ComputeProgram -> ComputeState @@ -381,7 +391,7 @@ runComputeProgram -- content is not available -> Maybe (Key, MeterUpdate) -- ^ update meter for this key - -> (ComputeState -> OsPath -> NominalDiffTime -> Annex v) + -> (ComputeProgramResult -> OsPath -> NominalDiffTime -> Annex v) -> Annex v runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent meterkey cont = withOtherTmp $ \othertmpdir -> @@ -398,12 +408,13 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) } showOutput starttime <- liftIO currentMonotonicTimestamp - state' <- withmeterfile $ \meterfile -> bracket + let startresult = ComputeProgramResult state False False + result <- withmeterfile $ \meterfile -> bracket (liftIO $ createProcess pr) (liftIO . cleanupProcess) - (getinput tmpdir subdir state meterfile) + (getinput tmpdir subdir startresult meterfile) endtime <- liftIO currentMonotonicTimestamp - cont state' subdir (calcduration starttime endtime) + cont result subdir (calcduration starttime endtime) getsubdir tmpdir = do let subdir = tmpdir computeSubdir state @@ -415,24 +426,25 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) , return tmpdir ) - getinput tmpdir subdir state' meterfile p = + getinput tmpdir subdir result meterfile p = liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case Just l - | null l -> getinput tmpdir subdir state' meterfile p + | null l -> getinput tmpdir subdir result meterfile p | otherwise -> do - state'' <- parseoutput p tmpdir subdir state' meterfile l - getinput tmpdir subdir state'' meterfile p + result' <- parseoutput p tmpdir subdir result meterfile l + getinput tmpdir subdir result' meterfile p Nothing -> do liftIO $ hClose (stdoutHandle p) liftIO $ hClose (stdinHandle p) unlessM (liftIO $ checkSuccessProcess (processHandle p)) $ giveup $ program ++ " exited unsuccessfully" - return state' + return result - parseoutput p tmpdir subdir state' meterfile l = case Proto.parseMessage l of + parseoutput p tmpdir subdir result meterfile l = case Proto.parseMessage l of Just (ProcessInput f) -> do let f' = toOsPath f - let knowninput = M.member f' (computeInputs state') + let knowninput = M.member f' + (computeInputs (computeState result)) checksafefile tmpdir subdir f' "input" checkimmutable knowninput "inputting" f' $ do (k, inputcontent) <- getinputcontent f' @@ -446,21 +458,21 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) liftIO $ hPutStrLn (stdinHandle p) $ maybe "" fromOsPath mp liftIO $ hFlush (stdinHandle p) - let state'' = state' + let result' = result { computeInputsUnavailable = - isNothing mp || computeInputsUnavailable state' + isNothing mp || computeInputsUnavailable result } return $ if immutablestate - then state'' - else state'' + then result' + else modresultstate result' $ \s -> s { computeInputs = M.insert f' k - (computeInputs state') + (computeInputs s) } Just (ProcessOutput f) -> do let f' = toOsPath f checksafefile tmpdir subdir f' "output" - knownoutput <- case M.lookup f' (computeOutputs state') of + knownoutput <- case M.lookup f' (computeOutputs $ computeState result) of Nothing -> return False Just mk -> do when (mk == fmap fst meterkey) $ @@ -468,20 +480,23 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) return True checkimmutable knownoutput "outputting" f' $ return $ if immutablestate - then state' - else state' + then result + else modresultstate result $ \s -> s { computeOutputs = M.insert f' Nothing - (computeOutputs state') - } + (computeOutputs s) + } Just (ProcessProgress percent) -> do liftIO $ updatepercent percent - return state' + return result Just ProcessReproducible -> - return $ state' { computeReproducible = True } + return $ result { computeReproducible = True } Nothing -> giveup $ program ++ " output an unparseable line: \"" ++ l ++ "\"" + modresultstate result f = + result { computeState = f (computeState result) } + checksafefile tmpdir subdir f fileaction = do let err problem = giveup $ program ++ " tried to " ++ fileaction ++ " a file that is " ++ problem ++ ": " ++ fromOsPath f @@ -596,10 +611,11 @@ computeKey rs (ComputeProgram program) k _af dest meterupdate vc = (keyfile : _) -> Just keyfile [] -> Nothing - postcompute keyfile state tmpdir _ts - | computeInputsUnavailable state = + postcompute keyfile result tmpdir _ts + | computeInputsUnavailable result = giveup "Input file(s) unavailable." - | otherwise = postcompute' keyfile state tmpdir + | otherwise = + postcompute' keyfile (computeState result) tmpdir postcompute' keyfile state tmpdir = do hb <- hashBackend