reimplement using new compute program interface

This commit is contained in:
Joey Hess 2025-02-24 15:48:42 -04:00
parent 921850d05c
commit 40be51c98a
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38

View file

@ -9,12 +9,9 @@
module Remote.Compute (
remote,
Interface,
ComputeState(..),
setComputeState,
getComputeStates,
InterfaceEnv,
interfaceEnv,
getComputeProgram,
runComputeProgram,
) where
@ -40,9 +37,7 @@ import qualified Git
import qualified Utility.SimpleProtocol as Proto
import Network.HTTP.Types.URI
import Control.Concurrent.STM
import Data.Time.Clock
import Data.Either
import Text.Read
import System.IO (hFlush)
import qualified Data.Map as M
import qualified Data.Set as S
@ -66,23 +61,22 @@ gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> RemoteStateHandle
gen r u rc gc rs = case getComputeProgram rc of
Left _err -> return Nothing
Right program -> do
interface <- liftIO $ newTMVarIO Nothing
c <- parsedRemoteConfig remote rc
cst <- remoteCost gc c veryExpensiveRemoteCost
return $ Just $ mk program interface c cst
return $ Just $ mk program c cst
where
mk program interface c cst = Remote
mk program c cst = Remote
{ uuid = u
, cost = cst
, name = Git.repoDescribe r
, storeKey = storeKeyUnsupported
, retrieveKeyFile = computeKey rs program interface
, retrieveKeyFile = computeKey rs program
, retrieveKeyFileInOrder = pure True
, retrieveKeyFileCheap = Nothing
, retrievalSecurityPolicy = RetrievalAllKeysSecure
, removeKey = dropKey rs
, lockContent = Nothing
, checkPresent = checkKey rs program interface
, checkPresent = checkKey rs
, checkPresentCheap = False
, exportActions = exportUnsupported
, importActions = importUnsupported
@ -114,33 +108,19 @@ setupInstance _ mu _ c _ = do
gitConfigSpecialRemote u c [("compute", "true")]
return (c, u)
-- The RemoteConfig is allowed to contain fields from the program's
-- interface. That provides defaults for git-annex addcomputed.
computeConfigParser :: RemoteConfig -> Annex RemoteConfigParser
computeConfigParser rc = do
    -- When there is no usable program, or its interface cannot be
    -- queried, fall back to an empty interface.
    Interface items <- either (const (pure noInterface)) queryProgram
        (getComputeProgram rc)
    let descs = M.fromList (mapMaybe describe items)
        isInterfaceField f = M.member (Field (fromProposedAccepted f)) descs
    return $ RemoteConfigParser
        { remoteConfigFieldParsers =
            [ optionalStringParser programField
                (FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")")
            ]
        , remoteConfigRestPassthrough = Just
            ( isInterfaceField
            , M.toList (M.mapKeys fromField descs)
            )
        }
  where
    noInterface = Interface []

    queryProgram program =
        either (const noInterface) id <$> liftIO (getInterface program)

    -- Only inputs and values can be given defaults in the config.
    describe = \case
        InterfaceInput f d -> Just (f, FieldDesc d)
        InterfaceOptionalInput f d -> Just (f, FieldDesc d)
        InterfaceValue f d -> Just (f, FieldDesc d)
        InterfaceOptionalValue f d -> Just (f, FieldDesc d)
        _ -> Nothing
computeConfigParser _ = pure RemoteConfigParser
    { remoteConfigFieldParsers = [programparser]
    -- Pass through all other params, which git-annex addcomputed adds
    -- to the input params.
    , remoteConfigRestPassthrough = Just (const True, [])
    }
  where
    -- The only field that is parsed is the name of the compute program.
    programparser = optionalStringParser programField $ FieldDesc $
        "compute program (must start with \"" ++ safetyPrefix ++ "\")"
-- The name of a compute program. This is the command that gets run
-- as a process.
newtype ComputeProgram = ComputeProgram String
deriving (Show)
@ -163,49 +143,20 @@ safetyPrefix = "git-annex-compute-"
-- The remote config field that holds the name of the compute program.
programField :: RemoteConfigField
programField = Accepted "program"
-- Descriptive text that accompanies an item of a program's interface.
type Description = String
-- The name of a field in a compute program's interface.
-- It is used as a Map key, so needs the Ord instance.
newtype Field = Field { fromField :: String }
deriving (Show, Eq, Ord)
-- One item of the interface that a compute program reports.
-- Inputs and values come in required and optional variants; interfaceEnv'
-- only reports a problem when a required one is missing.
data InterfaceItem
= InterfaceInput Field Description
| InterfaceOptionalInput Field Description
| InterfaceValue Field Description
| InterfaceOptionalValue Field Description
| InterfaceOutput Field Description
| InterfaceReproducible
-- A message that a running compute program sends on its stdout.
--
-- ProcessInput: the program asks for an input file. runComputeProgram
-- replies with a line on the program's stdin, which is the path to the
-- content, or empty when there is none.
-- ProcessOutput: the program names a file that it outputs.
-- ProcessReproducible: sets computeReproducible in the ComputeState.
-- ProcessProgress: a progress report (handling is still marked XXX).
data ProcessCommand
= ProcessInput FilePath
| ProcessOutput FilePath
| ProcessReproducible
| ProcessProgress PercentFloat
deriving (Show, Eq)
-- The full interface of a compute program, one item per line that the
-- program output.
--
-- List order matters, because when displaying the interface to the
-- user, need to display it in the same order as the program
-- does.
data Interface = Interface [InterfaceItem]
deriving (Show, Eq)
-- Parses a line of a program's interface output. The first word selects
-- the constructor; a trailing '?' selects the optional variant.
instance Proto.Receivable InterfaceItem where
parseCommand "INPUT" = Proto.parse2 InterfaceInput
parseCommand "INPUT?" = Proto.parse2 InterfaceOptionalInput
parseCommand "VALUE" = Proto.parse2 InterfaceValue
parseCommand "VALUE?" = Proto.parse2 InterfaceOptionalValue
parseCommand "OUTPUT" = Proto.parse2 InterfaceOutput
parseCommand "REPRODUCIBLE" = Proto.parse0 InterfaceReproducible
-- Parses a line output by a running compute program. INPUT, OUTPUT and
-- PROGRESS each take one parameter, REPRODUCIBLE takes none, and any
-- other command fails to parse.
instance Proto.Receivable ProcessCommand where
parseCommand "INPUT" = Proto.parse1 ProcessInput
parseCommand "OUTPUT" = Proto.parse1 ProcessOutput
parseCommand "REPRODUCIBLE" = Proto.parse0 ProcessReproducible
parseCommand "PROGRESS" = Proto.parse1 ProcessProgress
parseCommand _ = Proto.parseFail
-- A message read from a running compute program.
-- Computing gives the file that is being written for an output field.
-- Progress is a progress report.
data ProcessOutput
= Computing Field FilePath
| Progress PercentFloat
deriving (Show, Eq)
-- COMPUTING takes a field and a file, PROGRESS takes a single value.
-- Any other command fails to parse.
instance Proto.Receivable ProcessOutput where
parseCommand "COMPUTING" = Proto.parse2 Computing
parseCommand "PROGRESS" = Proto.parse1 Progress
parseCommand _ = Proto.parseFail
-- A Field is serialized as its bare name. Deserializing never fails;
-- any string is accepted as a field name.
instance Proto.Serializable Field where
serialize = fromField
deserialize = Just . Field
-- A progress value reported by a compute program, wrapping a Float.
newtype PercentFloat = PercentFloat Float
deriving (Show, Eq)
@ -213,136 +164,80 @@ instance Proto.Serializable PercentFloat where
serialize (PercentFloat p) = show p
deserialize s = PercentFloat <$> readMaybe s
-- Gets the program's interface, running the program only when the TMVar
-- does not already hold a cached interface. A successful result is left
-- in the TMVar; after a failure the TMVar holds Nothing again, so the
-- next call will retry.
getInterfaceCached :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface)
getInterfaceCached program iv = do
    -- Taking the value makes other callers wait until it is put back.
    cached <- atomically (takeTMVar iv)
    result <- maybe (getInterface program) (return . Right) cached
    atomically $ putTMVar iv (either (const Nothing) Just result)
    return result
-- Runs the program with the "interface" parameter and parses what it
-- outputs. Left is returned, with a message naming the program, when
-- it cannot be run or when its output does not parse.
getInterface :: ComputeProgram -> IO (Either String Interface)
getInterface (ComputeProgram program) =
    maybe (Left failedrun) parseoutput
        <$> catchMaybeIO (readProcess program ["interface"])
  where
    failedrun = "Failed to run " ++ program
    parseoutput = either (Left . outputproblem) Right . parseInterface
    outputproblem err = program ++ " interface output problem: " ++ err
-- Parses the output of a program's interface command, one item per line.
-- Empty lines are skipped. Fails on the first line that cannot be
-- parsed, and also when there are no items at all.
parseInterface :: String -> Either String Interface
parseInterface output =
    case mapM parseline (filter (not . null) (lines output)) of
        Left err -> Left err
        Right [] -> Left "empty interface output"
        Right items -> Right (Interface items)
  where
    parseline l = maybe (Left (unparseable l)) Right (Proto.parseMessage l)
    unparseable l = "Unable to parse line: \"" ++ l ++ "\""
-- An input to a computation: the key of the content, and a file name.
data ComputeInput = ComputeInput Key FilePath
deriving (Show, Eq)
-- A plain string value provided for a field of a computation.
data ComputeValue = ComputeValue String
deriving (Show, Eq)
-- An output of a computation, identified by its key.
data ComputeOutput = ComputeOutput Key
deriving (Show, Eq)
data ComputeState = ComputeState
{ computeInputs :: M.Map Field ComputeInput
, computeValues :: M.Map Field ComputeValue
, computeOutputs :: M.Map Field ComputeOutput
{ computeParams :: [String]
, computeInputs :: M.Map FilePath Key
, computeOutputs :: M.Map FilePath (Maybe Key)
, computeReproducible :: Bool
}
deriving (Show, Eq)
{- Formats a ComputeState as an URL query string.
-
- Prefixes fields with "k" and "f" for computeInputs, with
- "v" for computeValues and "o" for computeOutputs.
- Prefixes computeParams with 'p', computeInputs with 'i',
- and computeOutputs with 'o'.
-
- When the passed Key is an output, rather than duplicate it
- in the query string, that output has no value.
-
- Fields in the query string are sorted. This is in order to ensure
- that the same ComputeState is always formatted the same way.
- Example: "psomefile&pdestfile&pbaz&isomefile=WORM--foo&odestfile="
-
- Example: "ffoo=somefile&kfoo=WORM--foo&oresult&vbar=11"
- The computeParams are in the order they were given. The computeInputs
- and computeOutputs are sorted in ascending order for stability.
-}
formatComputeState :: Key -> ComputeState -> B.ByteString
formatComputeState k st = renderQuery False $ sortOn fst $ concat
[ concatMap formatinput $ M.toList (computeInputs st)
, map formatvalue $ M.toList (computeValues st)
, map formatoutput $ M.toList (computeOutputs st)
formatComputeState k st = renderQuery False $ concat
[ map formatparam (computeParams st)
, map formatinput (M.toAscList (computeInputs st))
, mapMaybe formatoutput (M.toAscList (computeOutputs st))
]
where
formatinput (f, ComputeInput key file) =
[ ("k" <> fb, Just (serializeKey' key))
, ("f" <> fb, Just (toRawFilePath file))
]
where
fb = encodeBS (fromField f)
formatvalue (f, ComputeValue v) =
("v" <> encodeBS (fromField f), Just (encodeBS v))
formatoutput (f, ComputeOutput key) =
("o" <> encodeBS (fromField f),
formatparam p = ("p" <> encodeBS p, Nothing)
formatinput (file, key) =
("i" <> toRawFilePath file, Just (serializeKey' key))
formatoutput (file, (Just key)) = Just $
("o" <> toRawFilePath file,
if key == k
then Nothing
else Just (serializeKey' key)
)
formatoutput (_, Nothing) = Nothing
parseComputeState :: Key -> B.ByteString -> Maybe ComputeState
parseComputeState k b =
let q = parseQuery b
st = go emptycomputestate (M.fromList q) q
let st = go emptycomputestate (parseQuery b)
in if st == emptycomputestate then Nothing else Just st
where
emptycomputestate = ComputeState mempty mempty mempty
go c _ [] = c
go c m ((f, v):rest) =
emptycomputestate = ComputeState mempty mempty mempty False
go :: ComputeState -> [QueryItem] -> ComputeState
go c [] = c { computeParams = reverse (computeParams c) }
go c ((f, v):rest) =
let c' = fromMaybe c $ case decodeBS f of
('f':f') -> do
file <- fromRawFilePath <$> v
kv <- M.lookup (encodeBS ('k':f')) m
key <- deserializeKey' =<< kv
('p':p) -> Just $ c
{ computeParams = p : computeParams c
}
('i':i) -> do
key <- deserializeKey' =<< v
Just $ c
{ computeInputs =
M.insert (Field f')
(ComputeInput key file)
{ computeInputs =
M.insert i key
(computeInputs c)
}
('v':f') -> do
val <- decodeBS <$> v
Just $ c
{ computeValues =
M.insert (Field f')
(ComputeValue val)
(computeValues c)
}
('o':f') -> case v of
('o':o) -> case v of
Just kv -> do
key <- deserializeKey' kv
Just $ c
{ computeOutputs =
M.insert (Field f')
(ComputeOutput key)
{ computeOutputs =
M.insert o (Just key)
(computeOutputs c)
}
Nothing -> Just $ c
{ computeOutputs =
M.insert (Field f')
(ComputeOutput k)
{ computeOutputs =
M.insert o (Just k)
(computeOutputs c)
}
_ -> Nothing
in go c' m rest
in go c' rest
{- The per remote metadata is used to store ComputeState. This allows
- recording multiple ComputeStates that generate the same key.
@ -369,162 +264,142 @@ getComputeStates rs k = do
Just ts -> go (zip (repeat ts) sts : c) rest
Nothing -> go c rest
-- What to provide to a compute program: named entries that are either
-- the Key of an input (Left) or a literal value (Right).
data InterfaceEnv = InterfaceEnv [(String, Either Key String)]
-- The key recorded for each output field of the program's interface.
data InterfaceOutputs = InterfaceOutputs (M.Map Field Key)
{- Finds the first compute state that provides everything required by the
- interface, and returns a list of what should be provided to the program
- in its environment, and what outputs the program is expected to make.
-}
interfaceEnv :: [ComputeState] -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
interfaceEnv states interface = case map attempt states of
    -- With no recorded states, try an empty one, which works when the
    -- interface does not require anything.
    [] -> attempt (ComputeState mempty mempty mempty)
    -- The first state that works wins. When none do, the error from
    -- the first state is the one that is reported.
    results@(firstresult:_) ->
        foldr (\r fallback -> either (const fallback) Right r) firstresult results
  where
    attempt state = interfaceEnv' state interface
-- Checks a single compute state against the interface. Every problem
-- found is collected, one per line, rather than stopping at the first.
interfaceEnv' :: ComputeState -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
interfaceEnv' state interface@(Interface items)
    | null problems = Right
        ( InterfaceEnv [ (fromField f, v) | (f, v) <- provided ]
        , interfaceOutputs state interface
        )
    | otherwise = Left (unlines problems)
  where
    (problems, provided) = partitionEithers (mapMaybe resolve items)

    -- Nothing means the item contributes nothing to the environment.
    resolve = \case
        InterfaceInput field desc -> Just $ maybe
            (Left $ "Missing required input \"" ++ fromField field ++ "\" -- " ++ desc)
            Right
            (inputkey field)
        InterfaceOptionalInput field _ -> Right <$> inputkey field
        InterfaceValue field desc -> Just $ maybe
            (Left $ "Missing required value \"" ++ fromField field ++ "\" -- " ++ desc)
            Right
            (value field)
        InterfaceOptionalValue field _ -> Right <$> value field
        InterfaceOutput _ _ -> Nothing
        InterfaceReproducible -> Nothing

    inputkey field = do
        ComputeInput key _file <- M.lookup field (computeInputs state)
        Just (field, Left key)

    value field = do
        ComputeValue v <- M.lookup field (computeValues state)
        Just (field, Right v)
-- Collects the keys that the compute state records for the output
-- fields of the interface. Outputs that the state has no key for are
-- left out.
interfaceOutputs :: ComputeState -> Interface -> InterfaceOutputs
interfaceOutputs state (Interface items) = InterfaceOutputs $ M.fromList
    [ (field, key)
    | InterfaceOutput field _ <- items
    , Just (ComputeOutput key) <- [M.lookup field (computeOutputs state)]
    ]
computeProgramEnvironment :: InterfaceEnv -> Annex [(String, String)]
computeProgramEnvironment (InterfaceEnv ienv) = do
computeProgramEnvironment :: ComputeState -> Annex [(String, String)]
computeProgramEnvironment st = do
environ <- filter (caninherit . fst) <$> liftIO getEnvironment
interfaceenv <- mapM go ienv
return $ environ ++ interfaceenv
let addenv = mapMaybe go (computeParams st)
return $ environ ++ addenv
where
envprefix = "ANNEX_COMPUTE_"
caninherit v = not (envprefix `isPrefixOf` v)
go (f, Right v) = return (envprefix ++ f, v)
go (f, Left k) =
ifM (inAnnex k)
( do
objloc <- calcRepo (gitAnnexLocation k)
return (envprefix ++ f, fromOsPath objloc)
, giveup "missing an input to the computation"
)
go p
| '=' `elem` p =
let (f, v) = separate (== '=') p
in Just (envprefix ++ f, v)
| otherwise = Nothing
-- When True, runComputeProgram gives up if the program asks for an input
-- or names an output that is not already in the ComputeState.
newtype ImmutableState = ImmutableState Bool
runComputeProgram
:: ComputeProgram
-> Key
-> AssociatedFile
-> OsPath
-> MeterUpdate
-> VerifyConfig
-> (InterfaceEnv, InterfaceOutputs)
-> Annex Verification
runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutputs iout) = do
environ <- computeProgramEnvironment ienv
-> ComputeState
-> ImmutableState
-> (OsPath -> Annex (Key, Maybe OsPath))
-> (ComputeState -> OsPath -> Annex v)
-> Annex v
runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent cont =
withOtherTmp $ \tmpdir ->
go environ tmpdir
go tmpdir
`finally` liftIO (removeDirectoryRecursive tmpdir)
where
go environ tmpdir = do
let pr = (proc program [])
{ cwd = Just $ fromOsPath tmpdir
go tmpdir = do
environ <- computeProgramEnvironment state
let pr = (proc program (computeParams state))
{ cwd = Just (fromOsPath tmpdir)
, std_in = CreatePipe
, std_out = CreatePipe
, env = Just environ
}
computing <- liftIO $ withCreateProcess pr $
processoutput mempty tmpdir
finish computing tmpdir
state' <- bracket
(liftIO $ createProcess pr)
(liftIO . cleanupProcess)
(getinput state tmpdir)
cont state' tmpdir
processoutput computing tmpdir _ (Just h) _ pid =
hGetLineUntilExitOrEOF pid h >>= \case
getinput state' tmpdir p =
liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
Just l
| null l -> processoutput computing tmpdir Nothing (Just h) Nothing pid
| otherwise -> parseoutput computing l >>= \case
Just computing' ->
processoutput computing' tmpdir Nothing (Just h) Nothing pid
Nothing -> do
hClose h
ifM (checkSuccessProcess pid)
( giveup $ program ++ " output included an unparseable line: \"" ++ l ++ "\""
, giveup $ program ++ " exited unsuccessfully"
)
| null l -> getinput state' tmpdir p
| otherwise -> do
state'' <- parseoutput p state' l
getinput state'' tmpdir p
Nothing -> do
hClose h
unlessM (checkSuccessProcess pid) $
liftIO $ hClose (stdoutHandle p)
liftIO $ hClose (stdinHandle p)
unlessM (liftIO $ checkSuccessProcess (processHandle p)) $
giveup $ program ++ " exited unsuccessfully"
return computing
processoutput _ _ _ _ _ _ = error "internal"
return state'
parseoutput computing l = case Proto.parseMessage l of
Just (Computing field file) ->
case M.lookup field iout of
Just key -> do
when (key == k) $
-- XXX can start watching the file and updating progress now
return ()
return $ Just $
M.insert key (toRawFilePath file) computing
Nothing -> return (Just computing)
Just (Progress percent) -> do
parseoutput p state' l = case Proto.parseMessage l of
Just (ProcessInput f) ->
    -- The program has asked for an input file. An input that is not
    -- already in the state is only allowed when the state is mutable.
    let knowninput = M.member f (computeInputs state')
    in checkimmutable knowninput l $ do
        (k, mp) <- getinputcontent (toOsPath f)
        -- Reply with the path to the content, or an empty line when
        -- there is none. The program waits for this reply before it
        -- outputs anything more, and the handle may be buffered, so
        -- it has to be flushed to avoid both sides waiting forever.
        liftIO $ do
            hPutStrLn (stdinHandle p) $
                maybe "" fromOsPath mp
            hFlush (stdinHandle p)
        return $ if knowninput
            then state'
            else state'
                { computeInputs =
                    M.insert f k
                        (computeInputs state')
                }
Just (ProcessOutput f) ->
let knownoutput = M.member f (computeOutputs state')
in checkimmutable knownoutput l $
return $ if knownoutput
then state'
else state'
{ computeOutputs =
M.insert f Nothing
(computeOutputs state')
}
Just (ProcessProgress percent) -> do
-- XXX
return Nothing
Nothing -> return Nothing
finish computing tmpdir = do
case M.lookup k computing of
Nothing -> giveup $ program ++ " exited successfully, but failed to output a filename"
Just file -> do
let file' = tmpdir </> file
unlessM (liftIO $ doesFileExist file') $
giveup $ program ++ " exited sucessfully, but failed to write the computed file"
catchNonAsync (liftIO $ moveFile file' dest)
(\err -> giveup $ "failed to move the computed file: " ++ show err)
return state'
Just ProcessReproducible ->
return $ state' { computeReproducible = True }
Nothing -> giveup $
program ++ " output included an unparseable line: \"" ++ l ++ "\""
checkimmutable True _ a = a
checkimmutable False l a
| not immutablestate = a
| otherwise = giveup $
program ++ " is not behaving the same way it used to, now outputting: \"" ++ l ++ "\""
computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
computeKey rs (ComputeProgram program) k af dest p vc = do
states <- map snd . sortOn fst -- least expensive probably
<$> getComputeStates rs k
case mapMaybe computeskey states of
((keyfile, state):_) -> runComputeProgram
(ComputeProgram program)
state
(ImmutableState True)
(getinputcontent state)
(go keyfile)
[] -> giveup "Missing compute state"
where
getinputcontent state f =
case M.lookup (fromOsPath f) (computeInputs state) of
Just inputkey -> do
obj <- calcRepo (gitAnnexLocation inputkey)
-- XXX get input object when not present
return (inputkey, Just obj)
Nothing -> error "internal"
computeskey state =
case M.keys $ M.filter (== Just k) (computeOutputs state) of
(keyfile : _) -> Just (keyfile, state)
[] -> Nothing
go keyfile state tmpdir = do
let keyfile' = tmpdir </> toOsPath keyfile
unlessM (liftIO $ doesFileExist keyfile') $
giveup $ program ++ " exited sucessfully, but failed to write the computed file"
catchNonAsync (liftIO $ moveFile keyfile' dest)
(\err -> giveup $ "failed to move the computed file: " ++ show err)
-- Try to move any other computed object files into the annex.
forM_ (M.toList computing) $ \(key, file) ->
when (k /= key) $ do
let file' = tmpdir </> file
whenM (liftIO $ doesFileExist file') $
whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification k file') $
void $ tryNonAsync $ moveAnnex k file'
-- Each other output is verified against its own key, and stored in the
-- annex under that key, not under the key k that was requested.
-- Failures are ignored, since these outputs are only a bonus.
forM_ (M.toList $ computeOutputs state) $ \case
    (file, (Just key)) ->
        when (k /= key) $ do
            let file' = tmpdir </> toOsPath file
            whenM (liftIO $ doesFileExist file') $
                whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification key file') $
                    void $ tryNonAsync $ moveAnnex key file'
    _ -> noop
return verification
@ -532,27 +407,13 @@ runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutpu
-- verification.
verification = MustVerify
-- Computes the key by running the program, using the first recorded
-- compute state that satisfies the program's interface. Gives up when
-- the interface cannot be determined, or when no state satisfies it.
computeKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
computeKey rs program iv k af dest p vc = do
    interface <- either giveup return
        =<< liftIO (getInterfaceCached program iv)
    states <- map snd . sortOn fst
        <$> getComputeStates rs k
    either giveup (runComputeProgram program k af dest p vc)
        (interfaceEnv states interface)
-- Make sure that the compute state has everything needed by
-- the program's current interface.
checkKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> Annex Bool
checkKey rs program iv k = do
    states <- map snd <$> getComputeStates rs k
    -- Not being able to get the interface is an error, rather than
    -- meaning that the key is not present.
    interface <- either giveup return
        =<< liftIO (getInterfaceCached program iv)
    return $ either (const False) (const True)
        (interfaceEnv states interface)
-- Make sure that the compute state exists.
checkKey :: RemoteStateHandle -> Key -> Annex Bool
checkKey rs k = do
    states <- getComputeStates rs k
    -- Without any compute state, this gives up rather than returning
    -- False.
    when (null states) $
        giveup "Missing compute state"
    return True
-- Unsetting the compute state will prevent computing the key.
dropKey :: RemoteStateHandle -> Maybe SafeDropProof -> Key -> Annex ()