avoid unncessary git-annex branch changes for recompute and addcomputed
This commit is contained in:
parent
ccc454a791
commit
c6c6e2632d
3 changed files with 96 additions and 55 deletions
|
@ -20,6 +20,7 @@ import Backend
|
|||
import Annex.CatFile
|
||||
import Annex.Content.Presence
|
||||
import Annex.Ingest
|
||||
import Annex.UUID
|
||||
import Annex.GitShaKey
|
||||
import Types.KeySource
|
||||
import Types.Key
|
||||
|
@ -94,35 +95,35 @@ perform o r = do
|
|||
, Remote.Compute.computeInputs = mempty
|
||||
, Remote.Compute.computeOutputs = mempty
|
||||
, Remote.Compute.computeSubdir = subdir
|
||||
, Remote.Compute.computeReproducible = False
|
||||
, Remote.Compute.computeInputsUnavailable = False
|
||||
}
|
||||
fast <- Annex.getRead Annex.fast
|
||||
Remote.Compute.runComputeProgram program state
|
||||
(Remote.Compute.ImmutableState False)
|
||||
(getInputContent fast)
|
||||
Nothing
|
||||
(addComputed "adding" True r (reproducible o) chooseBackend Just fast)
|
||||
(addComputed (Just "adding") True r (reproducible o) chooseBackend Just fast)
|
||||
next $ return True
|
||||
|
||||
addComputed
|
||||
:: StringContainingQuotedPath
|
||||
:: Maybe StringContainingQuotedPath
|
||||
-> Bool
|
||||
-> Remote
|
||||
-> Maybe Reproducible
|
||||
-> (OsPath -> Annex Backend)
|
||||
-> (OsPath -> Maybe OsPath)
|
||||
-> Bool
|
||||
-> Remote.Compute.ComputeState
|
||||
-> Remote.Compute.ComputeProgramResult
|
||||
-> OsPath
|
||||
-> NominalDiffTime
|
||||
-> Annex ()
|
||||
addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fast state tmpdir ts = do
|
||||
let outputs = Remote.Compute.computeOutputs state
|
||||
addComputed maddaction stagefiles r reproducibleconfig choosebackend destfile fast result tmpdir ts = do
|
||||
when (M.null outputs) $
|
||||
giveup "The computation succeeded, but it did not generate any files."
|
||||
oks <- forM (M.keys outputs) $ \outputfile -> do
|
||||
showAction $ addaction <> " " <> QuotedPath outputfile
|
||||
case maddaction of
|
||||
Just addaction -> showAction $
|
||||
addaction <> " " <> QuotedPath outputfile
|
||||
Nothing -> noop
|
||||
k <- catchNonAsync (addfile outputfile)
|
||||
(\err -> giveup $ "Failed to ingest output file " ++ fromOsPath outputfile ++ ": " ++ show err)
|
||||
return (outputfile, Just k)
|
||||
|
@ -133,8 +134,15 @@ addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fas
|
|||
Remote.Compute.setComputeState
|
||||
(Remote.remoteStateHandle r)
|
||||
k ts state'
|
||||
logChange NoLiveUpdate k (Remote.uuid r) InfoPresent
|
||||
|
||||
let u = Remote.uuid r
|
||||
unlessM (elem u <$> loggedLocations k) $
|
||||
logChange NoLiveUpdate k u InfoPresent
|
||||
where
|
||||
state = Remote.Compute.computeState result
|
||||
|
||||
outputs = Remote.Compute.computeOutputs state
|
||||
|
||||
addfile outputfile
|
||||
| fast = do
|
||||
case destfile outputfile of
|
||||
|
@ -179,6 +187,8 @@ addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fas
|
|||
ingestwith a = a >>= \case
|
||||
Nothing -> giveup "ingestion failed"
|
||||
Just k -> do
|
||||
u <- getUUID
|
||||
unlessM (elem u <$> loggedLocations k) $
|
||||
logStatus NoLiveUpdate k InfoPresent
|
||||
when (fromKey keyVariety k == VURLKey) $ do
|
||||
hb <- hashBackend
|
||||
|
@ -194,7 +204,7 @@ addComputed addaction stagefiles r reproducibleconfig choosebackend destfile fas
|
|||
|
||||
isreproducible = case reproducibleconfig of
|
||||
Just v -> isReproducible v
|
||||
Nothing -> Remote.Compute.computeReproducible state
|
||||
Nothing -> Remote.Compute.computeReproducible result
|
||||
|
||||
getInputContent :: Bool -> OsPath -> Annex (Key, Maybe (Either Git.Sha OsPath))
|
||||
getInputContent fast p = catKeyFile p >>= \case
|
||||
|
|
|
@ -15,6 +15,7 @@ import qualified Remote
|
|||
import qualified Types.Remote as Remote
|
||||
import qualified Git.Ref as Git
|
||||
import Annex.Content
|
||||
import Annex.UUID
|
||||
import Annex.CatFile
|
||||
import Annex.GitShaKey
|
||||
import Git.FilePath
|
||||
|
@ -131,12 +132,13 @@ perform o r file origkey origstate = do
|
|||
(getinputcontent program)
|
||||
Nothing
|
||||
(go program reproducibleconfig)
|
||||
next $ return True
|
||||
next cleanup
|
||||
where
|
||||
go program reproducibleconfig state tmpdir ts = do
|
||||
checkbehaviorchange program state
|
||||
addComputed "processing" False r reproducibleconfig
|
||||
choosebackend destfile False state tmpdir ts
|
||||
go program reproducibleconfig result tmpdir ts = do
|
||||
checkbehaviorchange program
|
||||
(Remote.Compute.computeState result)
|
||||
addComputed Nothing False r reproducibleconfig
|
||||
choosebackend destfile False result tmpdir ts
|
||||
|
||||
checkbehaviorchange program state = do
|
||||
let check s w a b = forM_ (M.keys (w a)) $ \f ->
|
||||
|
@ -168,6 +170,10 @@ perform o r file origkey origstate = do
|
|||
|
||||
origbackendvariety = fromKey keyVariety origkey
|
||||
|
||||
recomputingvurl = case origbackendvariety of
|
||||
VURLKey -> True
|
||||
_ -> False
|
||||
|
||||
getreproducibleconfig = case reproducible o of
|
||||
Just (Reproducible True) -> return (Just (Reproducible True))
|
||||
-- A VURL key is used when the computation was
|
||||
|
@ -177,13 +183,22 @@ perform o r file origkey origstate = do
|
|||
-- delete the annex object first, so that if recomputing
|
||||
-- generates a new version of the file, it replaces
|
||||
-- the old version.
|
||||
v -> case origbackendvariety of
|
||||
VURLKey -> do
|
||||
v -> if recomputingvurl
|
||||
then do
|
||||
lockContentForRemoval origkey noop removeAnnex
|
||||
-- in case computation fails or is interupted
|
||||
logStatus NoLiveUpdate origkey InfoMissing
|
||||
return (Just (Reproducible False))
|
||||
_ -> return v
|
||||
else return v
|
||||
|
||||
cleanup = do
|
||||
case reproducible o of
|
||||
Just (Reproducible True) -> noop
|
||||
-- in case computation failed, update
|
||||
-- location log for removal done earlier
|
||||
_ -> when recomputingvurl $ do
|
||||
u <- getUUID
|
||||
unlessM (elem u <$> loggedLocations origkey) $
|
||||
logStatus NoLiveUpdate origkey InfoMissing
|
||||
return True
|
||||
|
||||
choosebackend _outputfile
|
||||
-- Use the same backend as was used to compute it before,
|
||||
|
|
|
@ -18,6 +18,7 @@ module Remote.Compute (
|
|||
getComputeProgram,
|
||||
runComputeProgram,
|
||||
ImmutableState(..),
|
||||
ComputeProgramResult(..),
|
||||
computationBehaviorChangeError,
|
||||
defaultComputeParams,
|
||||
) where
|
||||
|
@ -222,8 +223,6 @@ data ComputeState = ComputeState
|
|||
, computeInputs :: M.Map OsPath Key
|
||||
, computeOutputs :: M.Map OsPath (Maybe Key)
|
||||
, computeSubdir :: OsPath
|
||||
, computeReproducible :: Bool
|
||||
, computeInputsUnavailable :: Bool
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
|
@ -272,8 +271,6 @@ parseComputeState k b =
|
|||
, computeInputs = mempty
|
||||
, computeOutputs = mempty
|
||||
, computeSubdir = literalOsPath "."
|
||||
, computeReproducible = False
|
||||
, computeInputsUnavailable = False
|
||||
}
|
||||
|
||||
go :: ComputeState -> [QueryItem] -> ComputeState
|
||||
|
@ -330,16 +327,23 @@ computeStateUrl r st p =
|
|||
- The metadata fields are numbers (prefixed with "t" to make them legal
|
||||
- field names), which are estimates of how long it might take to run
|
||||
- the computation (in seconds).
|
||||
-
|
||||
- Avoids redundantly recording a ComputeState when the per remote metadata
|
||||
- already contains it.
|
||||
-}
|
||||
setComputeState :: RemoteStateHandle -> Key -> NominalDiffTime -> ComputeState -> Annex ()
|
||||
setComputeState rs k ts st = addRemoteMetaData k rs $ MetaData $ M.singleton
|
||||
setComputeState rs k ts st = do
|
||||
l <- map snd <$> getComputeStatesUnsorted rs k
|
||||
unless (st `elem` l) go
|
||||
where
|
||||
go = addRemoteMetaData k rs $ MetaData $ M.singleton
|
||||
(mkMetaFieldUnchecked $ T.pack ('t':show (truncateResolution 1 ts)))
|
||||
(S.singleton (MetaValue (CurrentlySet True) (formatComputeState k st)))
|
||||
|
||||
{- When multiple ComputeStates have been recorded for the same key,
|
||||
- this returns one that is probably less expensive to compute,
|
||||
- based on the original time it took to compute it. -}
|
||||
getComputeState:: RemoteStateHandle -> Key -> Annex (Maybe ComputeState)
|
||||
getComputeState :: RemoteStateHandle -> Key -> Annex (Maybe ComputeState)
|
||||
getComputeState rs k = headMaybe . map snd . sortOn fst
|
||||
<$> getComputeStatesUnsorted rs k
|
||||
|
||||
|
@ -372,6 +376,12 @@ computeProgramEnvironment st = do
|
|||
|
||||
newtype ImmutableState = ImmutableState Bool
|
||||
|
||||
data ComputeProgramResult = ComputeProgramResult
|
||||
{ computeState :: ComputeState
|
||||
, computeInputsUnavailable :: Bool
|
||||
, computeReproducible :: Bool
|
||||
}
|
||||
|
||||
runComputeProgram
|
||||
:: ComputeProgram
|
||||
-> ComputeState
|
||||
|
@ -381,7 +391,7 @@ runComputeProgram
|
|||
-- content is not available
|
||||
-> Maybe (Key, MeterUpdate)
|
||||
-- ^ update meter for this key
|
||||
-> (ComputeState -> OsPath -> NominalDiffTime -> Annex v)
|
||||
-> (ComputeProgramResult -> OsPath -> NominalDiffTime -> Annex v)
|
||||
-> Annex v
|
||||
runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent meterkey cont =
|
||||
withOtherTmp $ \othertmpdir ->
|
||||
|
@ -398,12 +408,13 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
|||
}
|
||||
showOutput
|
||||
starttime <- liftIO currentMonotonicTimestamp
|
||||
state' <- withmeterfile $ \meterfile -> bracket
|
||||
let startresult = ComputeProgramResult state False False
|
||||
result <- withmeterfile $ \meterfile -> bracket
|
||||
(liftIO $ createProcess pr)
|
||||
(liftIO . cleanupProcess)
|
||||
(getinput tmpdir subdir state meterfile)
|
||||
(getinput tmpdir subdir startresult meterfile)
|
||||
endtime <- liftIO currentMonotonicTimestamp
|
||||
cont state' subdir (calcduration starttime endtime)
|
||||
cont result subdir (calcduration starttime endtime)
|
||||
|
||||
getsubdir tmpdir = do
|
||||
let subdir = tmpdir </> computeSubdir state
|
||||
|
@ -415,24 +426,25 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
|||
, return tmpdir
|
||||
)
|
||||
|
||||
getinput tmpdir subdir state' meterfile p =
|
||||
getinput tmpdir subdir result meterfile p =
|
||||
liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
|
||||
Just l
|
||||
| null l -> getinput tmpdir subdir state' meterfile p
|
||||
| null l -> getinput tmpdir subdir result meterfile p
|
||||
| otherwise -> do
|
||||
state'' <- parseoutput p tmpdir subdir state' meterfile l
|
||||
getinput tmpdir subdir state'' meterfile p
|
||||
result' <- parseoutput p tmpdir subdir result meterfile l
|
||||
getinput tmpdir subdir result' meterfile p
|
||||
Nothing -> do
|
||||
liftIO $ hClose (stdoutHandle p)
|
||||
liftIO $ hClose (stdinHandle p)
|
||||
unlessM (liftIO $ checkSuccessProcess (processHandle p)) $
|
||||
giveup $ program ++ " exited unsuccessfully"
|
||||
return state'
|
||||
return result
|
||||
|
||||
parseoutput p tmpdir subdir state' meterfile l = case Proto.parseMessage l of
|
||||
parseoutput p tmpdir subdir result meterfile l = case Proto.parseMessage l of
|
||||
Just (ProcessInput f) -> do
|
||||
let f' = toOsPath f
|
||||
let knowninput = M.member f' (computeInputs state')
|
||||
let knowninput = M.member f'
|
||||
(computeInputs (computeState result))
|
||||
checksafefile tmpdir subdir f' "input"
|
||||
checkimmutable knowninput "inputting" f' $ do
|
||||
(k, inputcontent) <- getinputcontent f'
|
||||
|
@ -446,21 +458,21 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
|||
liftIO $ hPutStrLn (stdinHandle p) $
|
||||
maybe "" fromOsPath mp
|
||||
liftIO $ hFlush (stdinHandle p)
|
||||
let state'' = state'
|
||||
let result' = result
|
||||
{ computeInputsUnavailable =
|
||||
isNothing mp || computeInputsUnavailable state'
|
||||
isNothing mp || computeInputsUnavailable result
|
||||
}
|
||||
return $ if immutablestate
|
||||
then state''
|
||||
else state''
|
||||
then result'
|
||||
else modresultstate result' $ \s -> s
|
||||
{ computeInputs =
|
||||
M.insert f' k
|
||||
(computeInputs state')
|
||||
(computeInputs s)
|
||||
}
|
||||
Just (ProcessOutput f) -> do
|
||||
let f' = toOsPath f
|
||||
checksafefile tmpdir subdir f' "output"
|
||||
knownoutput <- case M.lookup f' (computeOutputs state') of
|
||||
knownoutput <- case M.lookup f' (computeOutputs $ computeState result) of
|
||||
Nothing -> return False
|
||||
Just mk -> do
|
||||
when (mk == fmap fst meterkey) $
|
||||
|
@ -468,20 +480,23 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
|||
return True
|
||||
checkimmutable knownoutput "outputting" f' $
|
||||
return $ if immutablestate
|
||||
then state'
|
||||
else state'
|
||||
then result
|
||||
else modresultstate result $ \s -> s
|
||||
{ computeOutputs =
|
||||
M.insert f' Nothing
|
||||
(computeOutputs state')
|
||||
(computeOutputs s)
|
||||
}
|
||||
Just (ProcessProgress percent) -> do
|
||||
liftIO $ updatepercent percent
|
||||
return state'
|
||||
return result
|
||||
Just ProcessReproducible ->
|
||||
return $ state' { computeReproducible = True }
|
||||
return $ result { computeReproducible = True }
|
||||
Nothing -> giveup $
|
||||
program ++ " output an unparseable line: \"" ++ l ++ "\""
|
||||
|
||||
modresultstate result f =
|
||||
result { computeState = f (computeState result) }
|
||||
|
||||
checksafefile tmpdir subdir f fileaction = do
|
||||
let err problem = giveup $
|
||||
program ++ " tried to " ++ fileaction ++ " a file that is " ++ problem ++ ": " ++ fromOsPath f
|
||||
|
@ -596,10 +611,11 @@ computeKey rs (ComputeProgram program) k _af dest meterupdate vc =
|
|||
(keyfile : _) -> Just keyfile
|
||||
[] -> Nothing
|
||||
|
||||
postcompute keyfile state tmpdir _ts
|
||||
| computeInputsUnavailable state =
|
||||
postcompute keyfile result tmpdir _ts
|
||||
| computeInputsUnavailable result =
|
||||
giveup "Input file(s) unavailable."
|
||||
| otherwise = postcompute' keyfile state tmpdir
|
||||
| otherwise =
|
||||
postcompute' keyfile (computeState result) tmpdir
|
||||
|
||||
postcompute' keyfile state tmpdir = do
|
||||
hb <- hashBackend
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue