From 40be51c98a3035f5c2af4c95c4ccf0e85010c558 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 24 Feb 2025 15:48:42 -0400 Subject: [PATCH] reimplement using new compute program interface --- Remote/Compute.hs | 509 +++++++++++++++++----------------------------- 1 file changed, 185 insertions(+), 324 deletions(-) diff --git a/Remote/Compute.hs b/Remote/Compute.hs index 3012337f3d..2142c96e48 100644 --- a/Remote/Compute.hs +++ b/Remote/Compute.hs @@ -9,12 +9,9 @@ module Remote.Compute ( remote, - Interface, ComputeState(..), setComputeState, getComputeStates, - InterfaceEnv, - interfaceEnv, getComputeProgram, runComputeProgram, ) where @@ -40,9 +37,7 @@ import qualified Git import qualified Utility.SimpleProtocol as Proto import Network.HTTP.Types.URI -import Control.Concurrent.STM import Data.Time.Clock -import Data.Either import Text.Read import qualified Data.Map as M import qualified Data.Set as S @@ -66,23 +61,22 @@ gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> RemoteStateHandle gen r u rc gc rs = case getComputeProgram rc of Left _err -> return Nothing Right program -> do - interface <- liftIO $ newTMVarIO Nothing c <- parsedRemoteConfig remote rc cst <- remoteCost gc c veryExpensiveRemoteCost - return $ Just $ mk program interface c cst + return $ Just $ mk program c cst where - mk program interface c cst = Remote + mk program c cst = Remote { uuid = u , cost = cst , name = Git.repoDescribe r , storeKey = storeKeyUnsupported - , retrieveKeyFile = computeKey rs program interface + , retrieveKeyFile = computeKey rs program , retrieveKeyFileInOrder = pure True , retrieveKeyFileCheap = Nothing , retrievalSecurityPolicy = RetrievalAllKeysSecure , removeKey = dropKey rs , lockContent = Nothing - , checkPresent = checkKey rs program interface + , checkPresent = checkKey rs , checkPresentCheap = False , exportActions = exportUnsupported , importActions = importUnsupported @@ -114,33 +108,19 @@ setupInstance _ mu _ c _ = do gitConfigSpecialRemote u c [("compute", "true")] return (c, u) --- The RemoteConfig is allowed to contain fields from the program's --- interface. That provides defaults for git-annex addcomputed. computeConfigParser :: RemoteConfig -> Annex RemoteConfigParser -computeConfigParser rc = do - Interface interface <- case getComputeProgram rc of - Left _ -> pure $ Interface [] - Right program -> liftIO (getInterface program) >>= return . \case - Left _ -> Interface [] - Right interface -> interface - let m = M.fromList $ mapMaybe collectfields interface - let ininterface f = M.member (Field (fromProposedAccepted f)) m - return $ RemoteConfigParser - { remoteConfigFieldParsers = - [ optionalStringParser programField - (FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")") - ] - , remoteConfigRestPassthrough = Just - ( ininterface - , M.toList $ M.mapKeys fromField m - ) - } - where - collectfields (InterfaceInput f d) = Just (f, FieldDesc d) - collectfields (InterfaceOptionalInput f d) = Just (f, FieldDesc d) - collectfields (InterfaceValue f d) = Just (f, FieldDesc d) - collectfields (InterfaceOptionalValue f d) = Just (f, FieldDesc d) - collectfields _ = Nothing +computeConfigParser _ = return $ RemoteConfigParser + { remoteConfigFieldParsers = + [ optionalStringParser programField + (FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")") + ] + -- Pass through all other params, which git-annex addcomputed adds + -- to the input params. + , remoteConfigRestPassthrough = Just + ( const True + , [] + ) + } newtype ComputeProgram = ComputeProgram String deriving (Show) @@ -163,49 +143,20 @@ safetyPrefix = "git-annex-compute-" programField :: RemoteConfigField programField = Accepted "program" -type Description = String - -newtype Field = Field { fromField :: String } - deriving (Show, Eq, Ord) - -data InterfaceItem - = InterfaceInput Field Description - | InterfaceOptionalInput Field Description - | InterfaceValue Field Description - | InterfaceOptionalValue Field Description - | InterfaceOutput Field Description - | InterfaceReproducible +data ProcessCommand + = ProcessInput FilePath + | ProcessOutput FilePath + | ProcessReproducible + | ProcessProgress PercentFloat deriving (Show, Eq) --- List order matters, because when displaying the interface to the --- user, need to display it in the same order as the program --- does. -data Interface = Interface [InterfaceItem] - deriving (Show, Eq) - -instance Proto.Receivable InterfaceItem where - parseCommand "INPUT" = Proto.parse2 InterfaceInput - parseCommand "INPUT?" = Proto.parse2 InterfaceOptionalInput - parseCommand "VALUE" = Proto.parse2 InterfaceValue - parseCommand "VALUE?" = Proto.parse2 InterfaceOptionalValue - parseCommand "OUTPUT" = Proto.parse2 InterfaceOutput - parseCommand "REPRODUCIBLE" = Proto.parse0 InterfaceReproducible +instance Proto.Receivable ProcessCommand where + parseCommand "INPUT" = Proto.parse1 ProcessInput + parseCommand "OUTPUT" = Proto.parse1 ProcessOutput + parseCommand "REPRODUCIBLE" = Proto.parse0 ProcessReproducible + parseCommand "PROGRESS" = Proto.parse1 ProcessProgress parseCommand _ = Proto.parseFail -data ProcessOutput - = Computing Field FilePath - | Progress PercentFloat - deriving (Show, Eq) - -instance Proto.Receivable ProcessOutput where - parseCommand "COMPUTING" = Proto.parse2 Computing - parseCommand "PROGRESS" = Proto.parse1 Progress - parseCommand _ = Proto.parseFail - -instance Proto.Serializable Field where - serialize = fromField - deserialize = Just . Field - newtype PercentFloat = PercentFloat Float deriving (Show, Eq) @@ -213,136 +164,80 @@ instance Proto.Serializable PercentFloat where serialize (PercentFloat p) = show p deserialize s = PercentFloat <$> readMaybe s -getInterfaceCached :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface) -getInterfaceCached program iv = - atomically (takeTMVar iv) >>= \case - Nothing -> getInterface program >>= \case - Left err -> do - atomically $ putTMVar iv Nothing - return (Left err) - Right interface -> ret interface - Just interface -> ret interface - where - ret interface = do - atomically $ putTMVar iv (Just interface) - return (Right interface) - -getInterface :: ComputeProgram -> IO (Either String Interface) -getInterface (ComputeProgram program) = - catchMaybeIO (readProcess program ["interface"]) >>= \case - Nothing -> return $ Left $ "Failed to run " ++ program - Just output -> return $ case parseInterface output of - Right i -> Right i - Left err -> Left $ program ++ " interface output problem: " ++ err - -parseInterface :: String -> Either String Interface -parseInterface = go [] . lines - where - go is [] - | null is = Left "empty interface output" - | otherwise = Right (Interface (reverse is)) - go is (l:ls) - | null l = go is ls - | otherwise = case Proto.parseMessage l of - Just i -> go (i:is) ls - Nothing -> Left $ "Unable to parse line: \"" ++ l ++ "\"" - -data ComputeInput = ComputeInput Key FilePath - deriving (Show, Eq) - -data ComputeValue = ComputeValue String - deriving (Show, Eq) - -data ComputeOutput = ComputeOutput Key - deriving (Show, Eq) - data ComputeState = ComputeState - { computeInputs :: M.Map Field ComputeInput - , computeValues :: M.Map Field ComputeValue - , computeOutputs :: M.Map Field ComputeOutput + { computeParams :: [String] + , computeInputs :: M.Map FilePath Key + , computeOutputs :: M.Map FilePath (Maybe Key) + , computeReproducible :: Bool } deriving (Show, Eq) {- Formats a ComputeState as an URL query string. - - - Prefixes fields with "k" and "f" for computeInputs, with - - "v" for computeValues and "o" for computeOutputs. + - Prefixes computeParams with 'p', computeInputs with 'i', + - and computeOutput with 'o'. - - When the passed Key is an output, rather than duplicate it - in the query string, that output has no value. - - - Fields in the query string are sorted. This is in order to ensure - - that the same ComputeState is always formatted the same way. + - Example: "psomefile&pdestfile&pbaz&isomefile=WORM--foo&odestfile=" - - - Example: "ffoo=somefile&kfoo=WORM--foo&oresult&vbar=11" + - The computeParams are in the order they were given. The computeInputs + - and computeOutputs are sorted in ascending order for stability. -} formatComputeState :: Key -> ComputeState -> B.ByteString -formatComputeState k st = renderQuery False $ sortOn fst $ concat - [ concatMap formatinput $ M.toList (computeInputs st) - , map formatvalue $ M.toList (computeValues st) - , map formatoutput $ M.toList (computeOutputs st) +formatComputeState k st = renderQuery False $ concat + [ map formatparam (computeParams st) + , map formatinput (M.toAscList (computeInputs st)) + , mapMaybe formatoutput (M.toAscList (computeOutputs st)) ] where - formatinput (f, ComputeInput key file) = - [ ("k" <> fb, Just (serializeKey' key)) - , ("f" <> fb, Just (toRawFilePath file)) - ] - where - fb = encodeBS (fromField f) - formatvalue (f, ComputeValue v) = - ("v" <> encodeBS (fromField f), Just (encodeBS v)) - formatoutput (f, ComputeOutput key) = - ("o" <> encodeBS (fromField f), + formatparam p = ("p" <> encodeBS p, Nothing) + formatinput (file, key) = + ("i" <> toRawFilePath file, Just (serializeKey' key)) + formatoutput (file, (Just key)) = Just $ + ("o" <> toRawFilePath file, if key == k then Nothing else Just (serializeKey' key) ) + formatoutput (_, Nothing) = Nothing parseComputeState :: Key -> B.ByteString -> Maybe ComputeState parseComputeState k b = - let q = parseQuery b - st = go emptycomputestate (M.fromList q) q + let st = go emptycomputestate (parseQuery b) in if st == emptycomputestate then Nothing else Just st where - emptycomputestate = ComputeState mempty mempty mempty - go c _ [] = c - go c m ((f, v):rest) = + emptycomputestate = ComputeState mempty mempty mempty False + go :: ComputeState -> [QueryItem] -> ComputeState + go c [] = c { computeParams = reverse (computeParams c) } + go c ((f, v):rest) = let c' = fromMaybe c $ case decodeBS f of - ('f':f') -> do - file <- fromRawFilePath <$> v - kv <- M.lookup (encodeBS ('k':f')) m - key <- deserializeKey' =<< kv + ('p':p) -> Just $ c + { computeParams = p : computeParams c + } + ('i':i) -> do + key <- deserializeKey' =<< v Just $ c - { computeInputs = - M.insert (Field f') - (ComputeInput key file) + { computeInputs = + M.insert i key (computeInputs c) } - ('v':f') -> do - val <- decodeBS <$> v - Just $ c - { computeValues = - M.insert (Field f') - (ComputeValue val) - (computeValues c) - } - ('o':f') -> case v of + ('o':o) -> case v of Just kv -> do key <- deserializeKey' kv Just $ c - { computeOutputs = - M.insert (Field f') - (ComputeOutput key) + { computeOutputs = + M.insert o (Just key) (computeOutputs c) } Nothing -> Just $ c - { computeOutputs = - M.insert (Field f') - (ComputeOutput k) + { computeOutputs = + M.insert o (Just k) (computeOutputs c) } _ -> Nothing - in go c' m rest + in go c' rest {- The per remote metadata is used to store ComputeState. This allows - recording multiple ComputeStates that generate the same key. @@ -369,162 +264,142 @@ getComputeStates rs k = do Just ts -> go (zip (repeat ts) sts : c) rest Nothing -> go c rest -data InterfaceEnv = InterfaceEnv [(String, Either Key String)] - -data InterfaceOutputs = InterfaceOutputs (M.Map Field Key) - -{- Finds the first compute state that provides everything required by the - - interface, and returns a list of what should be provided to the program - - in its environment, and what outputs the program is expected to make. - -} -interfaceEnv :: [ComputeState] -> Interface -> Either String (InterfaceEnv, InterfaceOutputs) -interfaceEnv states interface = go Nothing states - where - go (Just firsterr) [] = Left firsterr - go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty) interface - go firsterr (state:rest) = case interfaceEnv' state interface of - Right v -> Right v - Left e - | null rest -> Left (fromMaybe e firsterr) - | otherwise -> go (firsterr <|> Just e) rest - -interfaceEnv' :: ComputeState -> Interface -> Either String (InterfaceEnv, InterfaceOutputs) -interfaceEnv' state interface@(Interface i) = - case partitionEithers (mapMaybe go i) of - ([], r) -> Right - ( InterfaceEnv (map (\(f, v) -> (fromField f, v)) r) - , interfaceOutputs state interface - ) - (problems, _) -> Left $ unlines problems - where - go (InterfaceInput field desc) = - case M.lookup field (computeInputs state) of - Just (ComputeInput key _file) -> Just $ - Right (field, Left key) - Nothing -> Just $ - Left $ "Missing required input \"" ++ fromField field ++ "\" -- " ++ desc - go (InterfaceOptionalInput field _desc) = - case M.lookup field (computeInputs state) of - Just (ComputeInput key _file) -> Just $ - Right (field, Left key) - Nothing -> Nothing - go (InterfaceValue field desc) = - case M.lookup field (computeValues state) of - Just (ComputeValue v) -> Just $ - Right (field, Right v) - Nothing -> Just $ - Left $ "Missing required value \"" ++ fromField field ++ "\" -- " ++ desc - go (InterfaceOptionalValue field _desc) = - case M.lookup field (computeValues state) of - Just (ComputeValue v) -> Just $ - Right (field, Right v) - Nothing -> Nothing - go (InterfaceOutput _ _) = Nothing - go InterfaceReproducible = Nothing - -interfaceOutputs :: ComputeState -> Interface -> InterfaceOutputs -interfaceOutputs state (Interface interface) = - InterfaceOutputs $ M.fromList $ mapMaybe go interface - where - go (InterfaceOutput field _) = do - ComputeOutput key <- M.lookup field (computeOutputs state) - Just (field, key) - go _ = Nothing - -computeProgramEnvironment :: InterfaceEnv -> Annex [(String, String)] -computeProgramEnvironment (InterfaceEnv ienv) = do +computeProgramEnvironment :: ComputeState -> Annex [(String, String)] +computeProgramEnvironment st = do environ <- filter (caninherit . fst) <$> liftIO getEnvironment - interfaceenv <- mapM go ienv - return $ environ ++ interfaceenv + let addenv = mapMaybe go (computeParams st) + return $ environ ++ addenv where envprefix = "ANNEX_COMPUTE_" caninherit v = not (envprefix `isPrefixOf` v) - go (f, Right v) = return (envprefix ++ f, v) - go (f, Left k) = - ifM (inAnnex k) - ( do - objloc <- calcRepo (gitAnnexLocation k) - return (envprefix ++ f, fromOsPath objloc) - , giveup "missing an input to the computation" - ) + go p + | '=' `elem` p = + let (f, v) = separate (== '=') p + in Just (envprefix ++ f, v) + | otherwise = Nothing + +newtype ImmutableState = ImmutableState Bool runComputeProgram :: ComputeProgram - -> Key - -> AssociatedFile - -> OsPath - -> MeterUpdate - -> VerifyConfig - -> (InterfaceEnv, InterfaceOutputs) - -> Annex Verification -runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutputs iout) = do - environ <- computeProgramEnvironment ienv + -> ComputeState + -> ImmutableState + -> (OsPath -> Annex (Key, Maybe OsPath)) + -> (ComputeState -> OsPath -> Annex v) + -> Annex v +runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent cont = withOtherTmp $ \tmpdir -> - go environ tmpdir + go tmpdir `finally` liftIO (removeDirectoryRecursive tmpdir) where - go environ tmpdir = do - let pr = (proc program []) - { cwd = Just $ fromOsPath tmpdir + go tmpdir = do + environ <- computeProgramEnvironment state + let pr = (proc program (computeParams state)) + { cwd = Just (fromOsPath tmpdir) + , std_in = CreatePipe , std_out = CreatePipe , env = Just environ } - computing <- liftIO $ withCreateProcess pr $ - processoutput mempty tmpdir - finish computing tmpdir + state' <- bracket + (liftIO $ createProcess pr) + (liftIO . cleanupProcess) + (getinput state tmpdir) + cont state' tmpdir - processoutput computing tmpdir _ (Just h) _ pid = - hGetLineUntilExitOrEOF pid h >>= \case + getinput state' tmpdir p = + liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case Just l - | null l -> processoutput computing tmpdir Nothing (Just h) Nothing pid - | otherwise -> parseoutput computing l >>= \case - Just computing' -> - processoutput computing' tmpdir Nothing (Just h) Nothing pid - Nothing -> do - hClose h - ifM (checkSuccessProcess pid) - ( giveup $ program ++ " output included an unparseable line: \"" ++ l ++ "\"" - , giveup $ program ++ " exited unsuccessfully" - ) + | null l -> getinput state' tmpdir p + | otherwise -> do + state'' <- parseoutput p state' l + getinput state'' tmpdir p Nothing -> do - hClose h - unlessM (checkSuccessProcess pid) $ + liftIO $ hClose (stdoutHandle p) + liftIO $ hClose (stdinHandle p) + unlessM (liftIO $ checkSuccessProcess (processHandle p)) $ giveup $ program ++ " exited unsuccessfully" - return computing - processoutput _ _ _ _ _ _ = error "internal" + return state' - parseoutput computing l = case Proto.parseMessage l of - Just (Computing field file) -> - case M.lookup field iout of - Just key -> do - when (key == k) $ - -- XXX can start watching the file and updating progess now - return () - return $ Just $ - M.insert key (toRawFilePath file) computing - Nothing -> return (Just computing) - Just (Progress percent) -> do + parseoutput p state' l = case Proto.parseMessage l of + Just (ProcessInput f) -> + let knowninput = M.member f (computeInputs state') + in checkimmutable knowninput l $ do + (k, mp) <- getinputcontent (toOsPath f) + liftIO $ hPutStrLn (stdinHandle p) $ + maybe "" fromOsPath mp + return $ if knowninput + then state' + else state' + { computeInputs = + M.insert f k + (computeInputs state') + } + Just (ProcessOutput f) -> + let knownoutput = M.member f (computeOutputs state') + in checkimmutable knownoutput l $ + return $ if knownoutput + then state' + else state' + { computeOutputs = + M.insert f Nothing + (computeOutputs state') + } + Just (ProcessProgress percent) -> do -- XXX - return Nothing - Nothing -> return Nothing - - finish computing tmpdir = do - case M.lookup k computing of - Nothing -> giveup $ program ++ " exited successfully, but failed to output a filename" - Just file -> do - let file' = tmpdir file - unlessM (liftIO $ doesFileExist file') $ - giveup $ program ++ " exited sucessfully, but failed to write the computed file" - catchNonAsync (liftIO $ moveFile file' dest) - (\err -> giveup $ "failed to move the computed file: " ++ show err) + return state' + Just ProcessReproducible -> + return $ state' { computeReproducible = True } + Nothing -> giveup $ + program ++ " output included an unparseable line: \"" ++ l ++ "\"" + + checkimmutable True _ a = a + checkimmutable False l a + | not immutablestate = a + | otherwise = giveup $ + program ++ " is not behaving the same way it used to, now outputting: \"" ++ l ++ "\"" + +computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification +computeKey rs (ComputeProgram program) k af dest p vc = do + states <- map snd . sortOn fst -- least expensive probably + <$> getComputeStates rs k + case mapMaybe computeskey states of + ((keyfile, state):_) -> runComputeProgram + (ComputeProgram program) + state + (ImmutableState True) + (getinputcontent state) + (go keyfile) + [] -> giveup "Missing compute state" + where + getinputcontent state f = + case M.lookup (fromOsPath f) (computeInputs state) of + Just inputkey -> do + obj <- calcRepo (gitAnnexLocation inputkey) + -- XXX get input object when not present + return (inputkey, Just obj) + Nothing -> error "internal" + + computeskey state = + case M.keys $ M.filter (== Just k) (computeOutputs state) of + (keyfile : _) -> Just (keyfile, state) + [] -> Nothing + + go keyfile state tmpdir = do + let keyfile' = tmpdir toOsPath keyfile + unlessM (liftIO $ doesFileExist keyfile') $ + giveup $ program ++ " exited sucessfully, but failed to write the computed file" + catchNonAsync (liftIO $ moveFile keyfile' dest) + (\err -> giveup $ "failed to move the computed file: " ++ show err) -- Try to move any other computed object files into the annex. - forM_ (M.toList computing) $ \(key, file) -> - when (k /= key) $ do - let file' = tmpdir file - whenM (liftIO $ doesFileExist file') $ - whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification k file') $ - void $ tryNonAsync $ moveAnnex k file' + forM_ (M.toList $ computeOutputs state) $ \case + (file, (Just key)) -> + when (k /= key) $ do + let file' = tmpdir toOsPath file + whenM (liftIO $ doesFileExist file') $ + whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification k file') $ + void $ tryNonAsync $ moveAnnex k file' + _ -> noop return verification @@ -532,27 +407,13 @@ runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutpu -- verification. verification = MustVerify -computeKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification -computeKey rs program iv k af dest p vc = - liftIO (getInterfaceCached program iv) >>= \case - Left err -> giveup err - Right interface -> do - states <- map snd . sortOn fst - <$> getComputeStates rs k - either giveup (runComputeProgram program k af dest p vc) - (interfaceEnv states interface) - --- Make sure that the compute state has everything needed by --- the program's current interface. -checkKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> Annex Bool -checkKey rs program iv k = do - states <- map snd <$> getComputeStates rs k - liftIO (getInterfaceCached program iv) >>= \case - Left err -> giveup err - Right interface -> - case interfaceEnv states interface of - Right _ -> return True - Left _ -> return False +-- Make sure that the compute state exists. +checkKey :: RemoteStateHandle -> Key -> Annex Bool +checkKey rs k = do + states <- getComputeStates rs k + if null states + then giveup "Missing compute state" + else return True -- Unsetting the compute state will prevent computing the key. dropKey :: RemoteStateHandle -> Maybe SafeDropProof -> Key -> Annex ()