compute special remote mostly implemented

Except for some of the hard parts: progress displays, incremental
verification, and getting inputs before running a computation.

Untested! In order to test this, git-annex addcomputed needs to be
implemented.
This commit is contained in:
Joey Hess 2025-02-21 15:02:53 -04:00
parent 4f1eea9061
commit e0b46ef7ad
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38

View file

@ -5,7 +5,19 @@
- Licensed under the GNU AGPL version 3 or higher.
-}
module Remote.Compute (remote) where
{-# LANGUAGE OverloadedStrings #-}
module Remote.Compute (
remote,
Interface,
ComputeState(..),
setComputeState,
getComputeStates,
InterfaceEnv,
interfaceEnv,
getComputeProgram,
runComputeProgram,
) where
import Annex.Common
import Types.Remote
@ -18,21 +30,23 @@ import Remote.Helper.Special
import Remote.Helper.ExportImport
import Annex.SpecialRemote.Config
import Annex.UUID
import Annex.Content
import Annex.Tmp
import Logs.MetaData
import Utility.Metered
import Utility.Hash
import Utility.TimeStamp
import Git.FilePath
import Utility.Env
import qualified Git
import qualified Utility.SimpleProtocol as Proto
import Network.HTTP.Types.URI
import Control.Concurrent.STM
import Data.Time.Clock
import Data.Either
import Data.Char
import Data.Ord
import Text.Read
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.ByteString as B
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
@ -106,19 +120,20 @@ computeConfigParser :: RemoteConfig -> Annex RemoteConfigParser
computeConfigParser rc = do
Interface interface <- case getComputeProgram rc of
Left _ -> pure $ Interface []
Right program -> liftIO (getInterfaceUncached program) >>= return . \case
Right program -> liftIO (getInterface program) >>= return . \case
Left _ -> Interface []
Right interface -> interface
let m = M.fromList $ mapMaybe collectfields interface
let ininterface f = case toField (fromProposedAccepted f) of
Just f' -> M.member f' m
Nothing -> False
let ininterface f = M.member (Field (fromProposedAccepted f)) m
return $ RemoteConfigParser
{ remoteConfigFieldParsers =
[ optionalStringParser programField
(FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")")
]
, remoteConfigRestPassthrough = Just (ininterface, M.toList $ M.mapKeys fromField m)
, remoteConfigRestPassthrough = Just
( ininterface
, M.toList $ M.mapKeys fromField m
)
}
where
collectfields (InterfaceInput f d) = Just (f, FieldDesc d)
@ -150,7 +165,7 @@ programField = Accepted "program"
type Description = String
newtype Field = Field MetaField
newtype Field = Field { fromField :: String }
deriving (Show, Eq, Ord)
data InterfaceItem
@ -175,23 +190,33 @@ instance Proto.Receivable InterfaceItem where
parseCommand "VALUE?" = Proto.parse2 InterfaceOptionalValue
parseCommand "OUTPUT" = Proto.parse2 InterfaceOutput
parseCommand "REPRODUCIBLE" = Proto.parse0 InterfaceReproducible
parseCommand _ = Proto.parseFail
-- | A message that can be received from a running compute program.
--
-- "COMPUTING" carries a field and a file path; "PROGRESS" carries a
-- percentage. Any other command fails to parse.
data ProcessOutput
	= Computing Field FilePath
	| Progress PercentFloat
	deriving (Show, Eq)

instance Proto.Receivable ProcessOutput where
	parseCommand command = case command of
		"COMPUTING" -> Proto.parse2 Computing
		"PROGRESS" -> Proto.parse1 Progress
		_ -> Proto.parseFail
instance Proto.Serializable Field where
serialize = fromField
deserialize = toField
deserialize = Just . Field
-- While MetaField is case insensitive, environment variable names are not,
-- so make Field always lower cased.
toField :: String -> Maybe Field
toField f = Field <$> toMetaField (T.pack (map toLower f))
-- | The Float payload of a "PROGRESS" message from a compute program.
-- NOTE(review): presumably ranges over 0-100; nothing visible here
-- enforces a range -- confirm against the protocol documentation.
newtype PercentFloat = PercentFloat Float
	deriving (Show, Eq)
fromField :: Field -> String
fromField (Field f) = T.unpack (fromMetaField f)
-- | A PercentFloat is sent the way 'show' formats the wrapped Float.
-- Deserializing a string that does not read as a Float yields Nothing.
instance Proto.Serializable PercentFloat where
	serialize (PercentFloat percent) = show percent
	deserialize = fmap PercentFloat . readMaybe
getInterface :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface)
getInterface program iv =
getInterfaceCached :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface)
getInterfaceCached program iv =
atomically (takeTMVar iv) >>= \case
Nothing -> getInterfaceUncached program >>= \case
Nothing -> getInterface program >>= \case
Left err -> do
atomically $ putTMVar iv Nothing
return (Left err)
@ -202,8 +227,8 @@ getInterface program iv =
atomically $ putTMVar iv (Just interface)
return (Right interface)
getInterfaceUncached :: ComputeProgram -> IO (Either String Interface)
getInterfaceUncached (ComputeProgram program) =
getInterface :: ComputeProgram -> IO (Either String Interface)
getInterface (ComputeProgram program) =
catchMaybeIO (readProcess program ["interface"]) >>= \case
Nothing -> return $ Left $ "Failed to run " ++ program
Just output -> return $ case parseInterface output of
@ -235,148 +260,294 @@ data ComputeState = ComputeState
{ computeInputs :: M.Map Field ComputeInput
, computeValues :: M.Map Field ComputeValue
, computeOutputs :: M.Map Field ComputeOutput
, computeTimeEstimate :: NominalDiffTime
}
deriving (Show, Eq)
-- Generates a hash of a ComputeState.
--
-- This is used as a short unique identifier in the metadata fields,
-- since more than one ComputeState may be stored in the compute remote's
-- metadata for a given Key.
--
-- A md5 is fine for this. It does not need to protect against intentional
-- collisions. And 2^64 is a sufficiently small chance of accidental
-- collision.
hashComputeState :: ComputeState -> String
hashComputeState state = show $ md5s $
mconcat (map (go goi) (M.toAscList (computeInputs state)))
<>
mconcat (map (go gov) (M.toAscList (computeValues state)))
<>
mconcat (map (go goo) (M.toAscList (computeOutputs state)))
<>
encodeBS (show (computeTimeEstimate state))
{- Formats a ComputeState as an URL query string.
-
- Prefixes fields with "k" and "f" for computeInputs, with
- "v" for computeValues and "o" for computeOutputs.
-
- When the passed Key is an output, rather than duplicate it
- in the query string, that output has no value.
-
- Fields in the query string are sorted. This is in order to ensure
- that the same ComputeState is always formatted the same way.
-
- Example: "ffoo=somefile&kfoo=WORM--foo&oresult&vbar=11"
-}
formatComputeState :: Key -> ComputeState -> B.ByteString
formatComputeState k st = renderQuery False $ sortOn fst $ concat
[ concatMap formatinput $ M.toList (computeInputs st)
, map formatvalue $ M.toList (computeValues st)
, map formatoutput $ M.toList (computeOutputs st)
]
where
go c (Field f, v) = T.encodeUtf8 (fromMetaField f) <> c v
goi (ComputeInput k f) = serializeKey' k <> encodeBS f
gov (ComputeValue s) = encodeBS s
goo (ComputeOutput k) = serializeKey' k
formatinput (f, ComputeInput key file) =
[ ("k" <> fb, Just (serializeKey' key))
, ("f" <> fb, Just (toRawFilePath file))
]
where
fb = encodeBS (fromField f)
formatvalue (f, ComputeValue v) =
("v" <> encodeBS (fromField f), Just (encodeBS v))
formatoutput (f, ComputeOutput key) =
("o" <> encodeBS (fromField f),
if key == k
then Nothing
else Just (serializeKey' key)
)
computeStateMetaData :: ComputeState -> MetaData
computeStateMetaData = undefined
-- FIXME: Need to unswizzle the mixed up metadata based on hash prefixes.
metaDataComputeStates :: MetaData -> [ComputeState]
metaDataComputeStates (MetaData m) =
go (ComputeState mempty mempty mempty 0) (M.toList m)
parseComputeState :: Key -> B.ByteString -> Maybe ComputeState
parseComputeState k b =
let q = parseQuery b
st = go emptycomputestate (M.fromList q) q
in if st == emptycomputestate then Nothing else Just st
where
go c ((f,v):rest) =
let c' = case T.unpack (fromMetaField f) of
('i':'n':'p':'u':'t':'-':f') -> case M.lookup m =<< toMetaField (T.pack ("key-" ++ f')) of
Nothing -> c
Just kv -> case deserializeKey' (fromMetaValue kv) of
Just k -> c
{ computeInputs =
M.insert (toField f)
(ComputeInput k (decodeBS (fromMetaValue v)))
(computeOutputs c)
}
Nothing -> c
('v':'a':'l':'u':'e':'-':f') -> c
{ computeValues =
M.insert (toField f)
(ComputeValue (decodeBS (fromMetaValue v)))
(computeValues c)
}
('o':'u':'t':'p':'u':'t':'-':f') ->
case deserializeKey' (fromMetaValue v) of
Just k -> c
emptycomputestate = ComputeState mempty mempty mempty
go c _ [] = c
go c m ((f, v):rest) =
let c' = fromMaybe c $ case decodeBS f of
('f':f') -> do
file <- fromRawFilePath <$> v
kv <- M.lookup (encodeBS ('k':f')) m
key <- deserializeKey' =<< kv
Just $ c
{ computeInputs =
M.insert (Field f')
(ComputeInput key file)
(computeInputs c)
}
('v':f') -> do
val <- decodeBS <$> v
Just $ c
{ computeValues =
M.insert (Field f')
(ComputeValue val)
(computeValues c)
}
('o':f') -> case v of
Just kv -> do
key <- deserializeKey' kv
Just $ c
{ computeOutputs =
M.insert (toField f)
(ComputeOutput k)
M.insert (Field f')
(ComputeOutput key)
(computeOutputs c)
}
Nothing -> c
('t':'i':'m':'e':'-':f') ->
case parsePOSIXTime (fromMetaValue v) of
Just t -> c { computeTimeEstimate = t }
Nothing -> c
_ -> c
in go c' rest
Nothing -> Just $ c
{ computeOutputs =
M.insert (Field f')
(ComputeOutput k)
(computeOutputs c)
}
_ -> Nothing
in go c' m rest
getComputeStates :: RemoteStateHandle -> Key -> Annex [ComputeState]
{- Records a ComputeState for a Key in the per remote metadata.
 -
 - Storing the state there allows more than one ComputeState that
 - generates the same key to be recorded.
 -
 - The metadata field is the estimate of how long running the
 - computation might take (in seconds), prefixed with "t" so that the
 - number is a legal field name. The metadata value is the ComputeState,
 - formatted by formatComputeState.
 -}
setComputeState :: RemoteStateHandle -> Key -> NominalDiffTime -> ComputeState -> Annex ()
setComputeState rs k ts st =
	addRemoteMetaData k rs (MetaData (M.singleton field values))
  where
	field = mkMetaFieldUnchecked (T.pack ('t' : estimate))
	estimate = show (truncateResolution 1 ts)
	values = S.singleton $
		MetaValue (CurrentlySet True) (formatComputeState k st)
getComputeStates :: RemoteStateHandle -> Key -> Annex [(NominalDiffTime, ComputeState)]
getComputeStates rs k = do
RemoteMetaData _ m <- getCurrentRemoteMetaData rs k
return (metaDataComputeStates m)
RemoteMetaData _ (MetaData m) <- getCurrentRemoteMetaData rs k
return $ go [] (M.toList m)
where
go c [] = concat c
go c ((f, s) : rest) =
let sts = mapMaybe (parseComputeState k . fromMetaValue)
(S.toList s)
in case parsePOSIXTime (T.encodeUtf8 (T.drop 1 (fromMetaField f))) of
Just ts -> go (zip (repeat ts) sts : c) rest
Nothing -> go c rest
setComputeState :: RemoteStateHandle -> Key -> ComputeState -> Annex ()
setComputeState rs k st = addRemoteMetaData k rs (computeStateMetaData st)
-- | What should be provided to the compute program in its environment:
-- each name is paired with either the Key of an input (Left) or a
-- literal value (Right).
data InterfaceEnv = InterfaceEnv [(String, Either Key String)]

-- | The outputs the compute program is expected to make: a map from each
-- output field to the Key that output should have.
data InterfaceOutputs = InterfaceOutputs (M.Map Field Key)
{- Finds the first compute state that provides everything required by the
- interface, and returns a list of what should be provided to the program
- in its environment.
- in its environment, and what outputs the program is expected to make.
-}
interfaceEnv :: [ComputeState] -> Interface -> Either String [(String, Either Key String)]
interfaceEnv :: [ComputeState] -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
interfaceEnv states interface = go Nothing states
where
go (Just firsterr) [] = Left firsterr
go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty 0) interface
go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty) interface
go firsterr (state:rest) = case interfaceEnv' state interface of
Right v -> Right v
Left e
| null rest -> Left (fromMaybe e firsterr)
| otherwise -> go (firsterr <|> Just e) rest
interfaceEnv' :: ComputeState -> Interface -> Either String [(String, Either Key String)]
interfaceEnv' state (Interface interface) =
case partitionEithers (mapMaybe go interface) of
([], env) -> Right $
map (\(f, v) -> (fromField f, v)) env
interfaceEnv' :: ComputeState -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
interfaceEnv' state interface@(Interface i) =
case partitionEithers (mapMaybe go i) of
([], r) -> Right
( InterfaceEnv (map (\(f, v) -> (fromField f, v)) r)
, interfaceOutputs state interface
)
(problems, _) -> Left $ unlines problems
where
go (InterfaceInput name desc) =
case M.lookup name (computeInputs state) of
go (InterfaceInput field desc) =
case M.lookup field (computeInputs state) of
Just (ComputeInput key _file) -> Just $
Right (name, Left key)
Right (field, Left key)
Nothing -> Just $
Left $ "Missing required input \"" ++ fromField name ++ "\" -- " ++ desc
go (InterfaceOptionalInput name desc) =
case M.lookup name (computeInputs state) of
Left $ "Missing required input \"" ++ fromField field ++ "\" -- " ++ desc
go (InterfaceOptionalInput field _desc) =
case M.lookup field (computeInputs state) of
Just (ComputeInput key _file) -> Just $
Right (name, Left key)
Right (field, Left key)
Nothing -> Nothing
go (InterfaceValue name desc) =
case M.lookup name (computeValues state) of
go (InterfaceValue field desc) =
case M.lookup field (computeValues state) of
Just (ComputeValue v) -> Just $
Right (name, Right v)
nothing -> Just $
Left $ "Missing required value \"" ++ fromField name ++ "\" -- " ++ desc
go (InterfaceOptionalValue name desc) =
case M.lookup name (computeValues state) of
Right (field, Right v)
Nothing -> Just $
Left $ "Missing required value \"" ++ fromField field ++ "\" -- " ++ desc
go (InterfaceOptionalValue field _desc) =
case M.lookup field (computeValues state) of
Just (ComputeValue v) -> Just $
Right (name, Right v)
Right (field, Right v)
Nothing -> Nothing
go (InterfaceOutput _ _) = Nothing
go InterfaceReproducible = Nothing
-- | Collects the outputs declared by the interface for which the
-- ComputeState records a key. Declared outputs that the state has no
-- key for are left out, as are all non-output interface items.
interfaceOutputs :: ComputeState -> Interface -> InterfaceOutputs
interfaceOutputs state (Interface items) = InterfaceOutputs $ M.fromList
	[ (field, key)
	| InterfaceOutput field _ <- items
	, Just (ComputeOutput key) <- [M.lookup field (computeOutputs state)]
	]
{- Builds the environment that the compute program is run with.
 -
 - The current environment is inherited, except for any variables whose
 - names start with "ANNEX_COMPUTE_". Each item of the InterfaceEnv is
 - then added under that prefix: a value is passed through as-is, and an
 - input key is replaced with the location of its annex object.
 -
 - Gives up when the content of an input key is not present in the annex.
 -}
computeProgramEnvironment :: InterfaceEnv -> Annex [(String, String)]
computeProgramEnvironment (InterfaceEnv ienv) = do
	inherited <- liftIO $ filter (not . reserved . fst) <$> getEnvironment
	provided <- mapM provide ienv
	return (inherited ++ provided)
  where
	prefix = "ANNEX_COMPUTE_"
	reserved name = prefix `isPrefixOf` name

	provide (name, val) =
		(,) (prefix ++ name) <$> either keylocation return val

	keylocation key = ifM (inAnnex key)
		( fromOsPath <$> calcRepo (gitAnnexLocation key)
		, giveup "missing an input to the computation"
		)
{- Runs the compute program to produce the content of the Key, and moves
 - the computed file to dest.
 -
 - The program is run with no parameters, with its working directory set
 - to a directory provided by withOtherTmp (which is removed afterwards),
 - and with the environment built by computeProgramEnvironment. Its
 - stdout is read line by line. Empty lines are skipped; every other line
 - has to parse as a ProcessOutput message.
 -
 - Gives up when the program exits unsuccessfully, when it outputs a line
 - that cannot be parsed, when it never reports a file for the Key, or
 - when the reported file does not exist or cannot be moved to dest.
 -
 - Files that the program reported for other expected outputs are
 - verified and moved into the annex under their own keys. That is best
 - effort: a file that is missing or fails verification is skipped, and a
 - failure to move it is ignored.
 -
 - Always returns MustVerify. The AssociatedFile and the MeterUpdate are
 - not used yet.
 -}
runComputeProgram
	:: ComputeProgram
	-> Key
	-> AssociatedFile
	-> OsPath
	-> MeterUpdate
	-> VerifyConfig
	-> (InterfaceEnv, InterfaceOutputs)
	-> Annex Verification
runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutputs iout) = do
	environ <- computeProgramEnvironment ienv
	withOtherTmp $ \tmpdir ->
		go environ tmpdir
			`finally` liftIO (removeDirectoryRecursive tmpdir)
  where
	go environ tmpdir = do
		let pr = (proc program [])
			 { cwd = Just $ fromOsPath tmpdir
			 , std_out = CreatePipe
			 , env = Just environ
			 }
		computing <- liftIO $ withCreateProcess pr $
			processoutput mempty tmpdir
		finish computing tmpdir

	-- Reads the program's stdout until it exits or closes the handle,
	-- accumulating a map from each expected output key to the file the
	-- program said it is writing.
	processoutput computing tmpdir _ (Just h) _ pid =
		hGetLineUntilExitOrEOF pid h >>= \case
			Just l
				| null l -> processoutput computing tmpdir Nothing (Just h) Nothing pid
				| otherwise -> parseoutput computing l >>= \case
					Just computing' ->
						processoutput computing' tmpdir Nothing (Just h) Nothing pid
					Nothing -> do
						hClose h
						ifM (checkSuccessProcess pid)
							( giveup $ program ++ " output included an unparseable line: \"" ++ l ++ "\""
							, giveup $ program ++ " exited unsuccessfully"
							)
			Nothing -> do
				hClose h
				unlessM (checkSuccessProcess pid) $
					giveup $ program ++ " exited unsuccessfully"
				return computing
	-- std_out is CreatePipe, so a stdout handle is always provided.
	processoutput _ _ _ _ _ _ = error "internal"

	-- Returns the updated map, or Nothing when the line is not a valid
	-- ProcessOutput message.
	parseoutput computing l = case Proto.parseMessage l of
		Just (Computing field file) ->
			case M.lookup field iout of
				Just key -> do
					when (key == k) $
						-- XXX can start watching the file and updating progress now
						return ()
					return $ Just $
						M.insert key (toRawFilePath file) computing
				-- Not an output the ComputeState expects; ignore it.
				Nothing -> return (Just computing)
		Just (Progress _percent) ->
			-- XXX progress is not displayed yet. The line is still
			-- a valid message, so it must not be treated as
			-- unparseable, which would abort the computation.
			return (Just computing)
		Nothing -> return Nothing

	finish computing tmpdir = do
		case M.lookup k computing of
			Nothing -> giveup $ program ++ " exited successfully, but failed to output a filename"
			Just file -> do
				let file' = tmpdir </> file
				unlessM (liftIO $ doesFileExist file') $
					giveup $ program ++ " exited successfully, but failed to write the computed file"
				catchNonAsync (liftIO $ moveFile file' dest)
					(\err -> giveup $ "failed to move the computed file: " ++ show err)

		-- Try to move any other computed object files into the annex.
		-- Each file is verified against, and stored under, its own
		-- key, not the key that was requested.
		forM_ (M.toList computing) $ \(key, file) ->
			when (k /= key) $ do
				let file' = tmpdir </> file
				whenM (liftIO $ doesFileExist file') $
					whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification key file') $
						void $ tryNonAsync $ moveAnnex key file'

		return verification

	-- The program might not be reproducible, so require strong
	-- verification.
	verification = MustVerify
computeKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
computeKey rs program iv k _af dest p vc =
liftIO (getInterface program iv) >>= \case
computeKey rs program iv k af dest p vc =
liftIO (getInterfaceCached program iv) >>= \case
Left err -> giveup err
Right interface -> do
states <- sortBy (comparing computeTimeEstimate)
states <- map snd . sortOn fst
<$> getComputeStates rs k
case interfaceEnv states interface of
Left err -> giveup err
Right ienv -> undefined -- TODO
either giveup (runComputeProgram program k af dest p vc)
(interfaceEnv states interface)
-- Make sure that the compute state has everything needed by
-- the program's current interface.
checkKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> Annex Bool
checkKey rs program iv k = do
states <- getComputeStates rs k
liftIO (getInterface program iv) >>= \case
states <- map snd <$> getComputeStates rs k
liftIO (getInterfaceCached program iv) >>= \case
Left err -> giveup err
Right interface ->
case interfaceEnv states interface of