diff --git a/Remote/Compute.hs b/Remote/Compute.hs index 18ebe950f7..3012337f3d 100644 --- a/Remote/Compute.hs +++ b/Remote/Compute.hs @@ -5,7 +5,19 @@ - Licensed under the GNU AGPL version 3 or higher. -} -module Remote.Compute (remote) where +{-# LANGUAGE OverloadedStrings #-} + +module Remote.Compute ( + remote, + Interface, + ComputeState(..), + setComputeState, + getComputeStates, + InterfaceEnv, + interfaceEnv, + getComputeProgram, + runComputeProgram, +) where import Annex.Common import Types.Remote @@ -18,21 +30,23 @@ import Remote.Helper.Special import Remote.Helper.ExportImport import Annex.SpecialRemote.Config import Annex.UUID +import Annex.Content +import Annex.Tmp import Logs.MetaData import Utility.Metered -import Utility.Hash import Utility.TimeStamp -import Git.FilePath +import Utility.Env import qualified Git import qualified Utility.SimpleProtocol as Proto +import Network.HTTP.Types.URI import Control.Concurrent.STM import Data.Time.Clock import Data.Either -import Data.Char -import Data.Ord +import Text.Read import qualified Data.Map as M import qualified Data.Set as S +import qualified Data.ByteString as B import qualified Data.Text as T import qualified Data.Text.Encoding as T @@ -106,19 +120,20 @@ computeConfigParser :: RemoteConfig -> Annex RemoteConfigParser computeConfigParser rc = do Interface interface <- case getComputeProgram rc of Left _ -> pure $ Interface [] - Right program -> liftIO (getInterfaceUncached program) >>= return . \case + Right program -> liftIO (getInterface program) >>= return . \case Left _ -> Interface [] Right interface -> interface let m = M.fromList $ mapMaybe collectfields interface - let ininterface f = case toField (fromProposedAccepted f) of - Just f' -> M.member f' m - Nothing -> False + let ininterface f = M.member (Field (fromProposedAccepted f)) m return $ RemoteConfigParser { remoteConfigFieldParsers = [ optionalStringParser programField (FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")") ] - , remoteConfigRestPassthrough = Just (ininterface, M.toList $ M.mapKeys fromField m) + , remoteConfigRestPassthrough = Just + ( ininterface + , M.toList $ M.mapKeys fromField m + ) } where collectfields (InterfaceInput f d) = Just (f, FieldDesc d) @@ -150,7 +165,7 @@ programField = Accepted "program" type Description = String -newtype Field = Field MetaField +newtype Field = Field { fromField :: String } deriving (Show, Eq, Ord) data InterfaceItem @@ -175,23 +190,33 @@ instance Proto.Receivable InterfaceItem where parseCommand "VALUE?" = Proto.parse2 InterfaceOptionalValue parseCommand "OUTPUT" = Proto.parse2 InterfaceOutput parseCommand "REPRODUCIBLE" = Proto.parse0 InterfaceReproducible + parseCommand _ = Proto.parseFail + +data ProcessOutput + = Computing Field FilePath + | Progress PercentFloat + deriving (Show, Eq) + +instance Proto.Receivable ProcessOutput where + parseCommand "COMPUTING" = Proto.parse2 Computing + parseCommand "PROGRESS" = Proto.parse1 Progress + parseCommand _ = Proto.parseFail instance Proto.Serializable Field where serialize = fromField - deserialize = toField + deserialize = Just . Field --- While MetaField is case insensitive, environment variable names are not, --- so make Field always lower cased. -toField :: String -> Maybe Field -toField f = Field <$> toMetaField (T.pack (map toLower f)) +newtype PercentFloat = PercentFloat Float + deriving (Show, Eq) -fromField :: Field -> String -fromField (Field f) = T.unpack (fromMetaField f) +instance Proto.Serializable PercentFloat where + serialize (PercentFloat p) = show p + deserialize s = PercentFloat <$> readMaybe s -getInterface :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface) -getInterface program iv = +getInterfaceCached :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface) +getInterfaceCached program iv = atomically (takeTMVar iv) >>= \case - Nothing -> getInterfaceUncached program >>= \case + Nothing -> getInterface program >>= \case Left err -> do atomically $ putTMVar iv Nothing return (Left err) @@ -202,8 +227,8 @@ getInterface program iv = atomically $ putTMVar iv (Just interface) return (Right interface) -getInterfaceUncached :: ComputeProgram -> IO (Either String Interface) -getInterfaceUncached (ComputeProgram program) = +getInterface :: ComputeProgram -> IO (Either String Interface) +getInterface (ComputeProgram program) = catchMaybeIO (readProcess program ["interface"]) >>= \case Nothing -> return $ Left $ "Failed to run " ++ program Just output -> return $ case parseInterface output of @@ -235,148 +260,294 @@ data ComputeState = ComputeState { computeInputs :: M.Map Field ComputeInput , computeValues :: M.Map Field ComputeValue , computeOutputs :: M.Map Field ComputeOutput - , computeTimeEstimate :: NominalDiffTime } deriving (Show, Eq) --- Generates a hash of a ComputeState. --- --- This is used as a short unique identifier in the metadata fields, --- since more than one ComputeState may be stored in the compute remote's --- metadata for a given Key. --- --- A md5 is fine for this. It does not need to protect against intentional --- collisions. And 2^64 is a sufficiently small chance of accidental --- collision. -hashComputeState :: ComputeState -> String -hashComputeState state = show $ md5s $ - mconcat (map (go goi) (M.toAscList (computeInputs state))) - <> - mconcat (map (go gov) (M.toAscList (computeValues state))) - <> - mconcat (map (go goo) (M.toAscList (computeOutputs state))) - <> - encodeBS (show (computeTimeEstimate state)) +{- Formats a ComputeState as an URL query string. + - + - Prefixes fields with "k" and "f" for computeInputs, with + - "v" for computeValues and "o" for computeOutputs. + - + - When the passed Key is an output, rather than duplicate it + - in the query string, that output has no value. + - + - Fields in the query string are sorted. This is in order to ensure + - that the same ComputeState is always formatted the same way. + - + - Example: "ffoo=somefile&kfoo=WORM--foo&oresult&vbar=11" + -} +formatComputeState :: Key -> ComputeState -> B.ByteString +formatComputeState k st = renderQuery False $ sortOn fst $ concat + [ concatMap formatinput $ M.toList (computeInputs st) + , map formatvalue $ M.toList (computeValues st) + , map formatoutput $ M.toList (computeOutputs st) + ] where - go c (Field f, v) = T.encodeUtf8 (fromMetaField f) <> c v - goi (ComputeInput k f) = serializeKey' k <> encodeBS f - gov (ComputeValue s) = encodeBS s - goo (ComputeOutput k) = serializeKey' k + formatinput (f, ComputeInput key file) = + [ ("k" <> fb, Just (serializeKey' key)) + , ("f" <> fb, Just (toRawFilePath file)) + ] + where + fb = encodeBS (fromField f) + formatvalue (f, ComputeValue v) = + ("v" <> encodeBS (fromField f), Just (encodeBS v)) + formatoutput (f, ComputeOutput key) = + ("o" <> encodeBS (fromField f), + if key == k + then Nothing + else Just (serializeKey' key) + ) -computeStateMetaData :: ComputeState -> MetaData -computeStateMetaData = undefined - --- FIXME: Need to unswizzle the mixed up metadata based on hash prefixes. -metaDataComputeStates :: MetaData -> [ComputeState] -metaDataComputeStates (MetaData m) = - go (ComputeState mempty mempty mempty 0) (M.toList m) +parseComputeState :: Key -> B.ByteString -> Maybe ComputeState +parseComputeState k b = + let q = parseQuery b + st = go emptycomputestate (M.fromList q) q + in if st == emptycomputestate then Nothing else Just st where - go c ((f,v):rest) = - let c' = case T.unpack (fromMetaField f) of - ('i':'n':'p':'u':'t':'-':f') -> case M.lookup m =<< toMetaField (T.pack ("key-" ++ f')) of - Nothing -> c - Just kv -> case deserializeKey' (fromMetaValue kv) of - Just k -> c - { computeInputs = - M.insert (toField f) - (ComputeInput k (decodeBS (fromMetaValue v))) - (computeOutputs c) - } - Nothing -> c - ('v':'a':'l':'u':'e':'-':f') -> c - { computeValues = - M.insert (toField f) - (ComputeValue (decodeBS (fromMetaValue v))) - (computeValues c) - } - ('o':'u':'t':'p':'u':'t':'-':f') -> - case deserializeKey' (fromMetaValue v) of - Just k -> c + emptycomputestate = ComputeState mempty mempty mempty + go c _ [] = c + go c m ((f, v):rest) = + let c' = fromMaybe c $ case decodeBS f of + ('f':f') -> do + file <- fromRawFilePath <$> v + kv <- M.lookup (encodeBS ('k':f')) m + key <- deserializeKey' =<< kv + Just $ c + { computeInputs = + M.insert (Field f') + (ComputeInput key file) + (computeInputs c) + } + ('v':f') -> do + val <- decodeBS <$> v + Just $ c + { computeValues = + M.insert (Field f') + (ComputeValue val) + (computeValues c) + } + ('o':f') -> case v of + Just kv -> do + key <- deserializeKey' kv + Just $ c { computeOutputs = - M.insert (toField f) - (ComputeOutput k) + M.insert (Field f') + (ComputeOutput key) (computeOutputs c) } - Nothing -> c - ('t':'i':'m':'e':'-':f') -> - case parsePOSIXTime (fromMetaValue v) of - Just t -> c { computeTimeEstimate = t } - Nothing -> c - _ -> c - in go c' rest + Nothing -> Just $ c + { computeOutputs = + M.insert (Field f') + (ComputeOutput k) + (computeOutputs c) + } + _ -> Nothing + in go c' m rest -getComputeStates :: RemoteStateHandle -> Key -> Annex [ComputeState] +{- The per remote metadata is used to store ComputeState. This allows + - recording multiple ComputeStates that generate the same key. + - + - The metadata fields are numbers (prefixed with "t" to make them legal + - field names), which are estimates of how long it might take to run + - the computation (in seconds). + -} +setComputeState :: RemoteStateHandle -> Key -> NominalDiffTime -> ComputeState -> Annex () +setComputeState rs k ts st = addRemoteMetaData k rs $ MetaData $ M.singleton + (mkMetaFieldUnchecked $ T.pack ('t':show (truncateResolution 1 ts))) + (S.singleton (MetaValue (CurrentlySet True) (formatComputeState k st))) + +getComputeStates :: RemoteStateHandle -> Key -> Annex [(NominalDiffTime, ComputeState)] getComputeStates rs k = do - RemoteMetaData _ m <- getCurrentRemoteMetaData rs k - return (metaDataComputeStates m) + RemoteMetaData _ (MetaData m) <- getCurrentRemoteMetaData rs k + return $ go [] (M.toList m) + where + go c [] = concat c + go c ((f, s) : rest) = + let sts = mapMaybe (parseComputeState k . fromMetaValue) + (S.toList s) + in case parsePOSIXTime (T.encodeUtf8 (T.drop 1 (fromMetaField f))) of + Just ts -> go (zip (repeat ts) sts : c) rest + Nothing -> go c rest -setComputeState :: RemoteStateHandle -> Key -> ComputeState -> Annex () -setComputeState rs k st = addRemoteMetaData k rs (computeStateMetaData st) +data InterfaceEnv = InterfaceEnv [(String, Either Key String)] + +data InterfaceOutputs = InterfaceOutputs (M.Map Field Key) {- Finds the first compute state that provides everything required by the - interface, and returns a list of what should be provided to the program - - in its environment. + - in its environment, and what outputs the program is expected to make. -} -interfaceEnv :: [ComputeState] -> Interface -> Either String [(String, Either Key String)] +interfaceEnv :: [ComputeState] -> Interface -> Either String (InterfaceEnv, InterfaceOutputs) interfaceEnv states interface = go Nothing states where go (Just firsterr) [] = Left firsterr - go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty 0) interface + go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty) interface go firsterr (state:rest) = case interfaceEnv' state interface of Right v -> Right v Left e | null rest -> Left (fromMaybe e firsterr) | otherwise -> go (firsterr <|> Just e) rest -interfaceEnv' :: ComputeState -> Interface -> Either String [(String, Either Key String)] -interfaceEnv' state (Interface interface) = - case partitionEithers (mapMaybe go interface) of - ([], env) -> Right $ - map (\(f, v) -> (fromField f, v)) env +interfaceEnv' :: ComputeState -> Interface -> Either String (InterfaceEnv, InterfaceOutputs) +interfaceEnv' state interface@(Interface i) = + case partitionEithers (mapMaybe go i) of + ([], r) -> Right + ( InterfaceEnv (map (\(f, v) -> (fromField f, v)) r) + , interfaceOutputs state interface + ) (problems, _) -> Left $ unlines problems where - go (InterfaceInput name desc) = - case M.lookup name (computeInputs state) of + go (InterfaceInput field desc) = + case M.lookup field (computeInputs state) of Just (ComputeInput key _file) -> Just $ - Right (name, Left key) + Right (field, Left key) Nothing -> Just $ - Left $ "Missing required input \"" ++ fromField name ++ "\" -- " ++ desc - go (InterfaceOptionalInput name desc) = - case M.lookup name (computeInputs state) of + Left $ "Missing required input \"" ++ fromField field ++ "\" -- " ++ desc + go (InterfaceOptionalInput field _desc) = + case M.lookup field (computeInputs state) of Just (ComputeInput key _file) -> Just $ - Right (name, Left key) + Right (field, Left key) Nothing -> Nothing - go (InterfaceValue name desc) = - case M.lookup name (computeValues state) of + go (InterfaceValue field desc) = + case M.lookup field (computeValues state) of Just (ComputeValue v) -> Just $ - Right (name, Right v) - nothing -> Just $ - Left $ "Missing required value \"" ++ fromField name ++ "\" -- " ++ desc - go (InterfaceOptionalValue name desc) = - case M.lookup name (computeValues state) of + Right (field, Right v) + Nothing -> Just $ + Left $ "Missing required value \"" ++ fromField field ++ "\" -- " ++ desc + go (InterfaceOptionalValue field _desc) = + case M.lookup field (computeValues state) of Just (ComputeValue v) -> Just $ - Right (name, Right v) + Right (field, Right v) Nothing -> Nothing go (InterfaceOutput _ _) = Nothing go InterfaceReproducible = Nothing +interfaceOutputs :: ComputeState -> Interface -> InterfaceOutputs +interfaceOutputs state (Interface interface) = + InterfaceOutputs $ M.fromList $ mapMaybe go interface + where + go (InterfaceOutput field _) = do + ComputeOutput key <- M.lookup field (computeOutputs state) + Just (field, key) + go _ = Nothing + +computeProgramEnvironment :: InterfaceEnv -> Annex [(String, String)] +computeProgramEnvironment (InterfaceEnv ienv) = do + environ <- filter (caninherit . fst) <$> liftIO getEnvironment + interfaceenv <- mapM go ienv + return $ environ ++ interfaceenv + where + envprefix = "ANNEX_COMPUTE_" + caninherit v = not (envprefix `isPrefixOf` v) + go (f, Right v) = return (envprefix ++ f, v) + go (f, Left k) = + ifM (inAnnex k) + ( do + objloc <- calcRepo (gitAnnexLocation k) + return (envprefix ++ f, fromOsPath objloc) + , giveup "missing an input to the computation" + ) + +runComputeProgram + :: ComputeProgram + -> Key + -> AssociatedFile + -> OsPath + -> MeterUpdate + -> VerifyConfig + -> (InterfaceEnv, InterfaceOutputs) + -> Annex Verification +runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutputs iout) = do + environ <- computeProgramEnvironment ienv + withOtherTmp $ \tmpdir -> + go environ tmpdir + `finally` liftIO (removeDirectoryRecursive tmpdir) + where + go environ tmpdir = do + let pr = (proc program []) + { cwd = Just $ fromOsPath tmpdir + , std_out = CreatePipe + , env = Just environ + } + computing <- liftIO $ withCreateProcess pr $ + processoutput mempty tmpdir + finish computing tmpdir + + processoutput computing tmpdir _ (Just h) _ pid = + hGetLineUntilExitOrEOF pid h >>= \case + Just l + | null l -> processoutput computing tmpdir Nothing (Just h) Nothing pid + | otherwise -> parseoutput computing l >>= \case + Just computing' -> + processoutput computing' tmpdir Nothing (Just h) Nothing pid + Nothing -> do + hClose h + ifM (checkSuccessProcess pid) + ( giveup $ program ++ " output included an unparseable line: \"" ++ l ++ "\"" + , giveup $ program ++ " exited unsuccessfully" + ) + Nothing -> do + hClose h + unlessM (checkSuccessProcess pid) $ + giveup $ program ++ " exited unsuccessfully" + return computing + processoutput _ _ _ _ _ _ = error "internal" + + parseoutput computing l = case Proto.parseMessage l of + Just (Computing field file) -> + case M.lookup field iout of + Just key -> do + when (key == k) $ + -- XXX can start watching the file and updating progess now + return () + return $ Just $ + M.insert key (toRawFilePath file) computing + Nothing -> return (Just computing) + Just (Progress percent) -> do + -- XXX + return Nothing + Nothing -> return Nothing + + finish computing tmpdir = do + case M.lookup k computing of + Nothing -> giveup $ program ++ " exited successfully, but failed to output a filename" + Just file -> do + let file' = tmpdir file + unlessM (liftIO $ doesFileExist file') $ + giveup $ program ++ " exited sucessfully, but failed to write the computed file" + catchNonAsync (liftIO $ moveFile file' dest) + (\err -> giveup $ "failed to move the computed file: " ++ show err) + + -- Try to move any other computed object files into the annex. + forM_ (M.toList computing) $ \(key, file) -> + when (k /= key) $ do + let file' = tmpdir file + whenM (liftIO $ doesFileExist file') $ + whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification k file') $ + void $ tryNonAsync $ moveAnnex k file' + + return verification + + -- The program might not be reproducible, so require strong + -- verification. + verification = MustVerify + computeKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification -computeKey rs program iv k _af dest p vc = - liftIO (getInterface program iv) >>= \case +computeKey rs program iv k af dest p vc = + liftIO (getInterfaceCached program iv) >>= \case Left err -> giveup err Right interface -> do - states <- sortBy (comparing computeTimeEstimate) + states <- map snd . sortOn fst <$> getComputeStates rs k - case interfaceEnv states interface of - Left err -> giveup err - Right ienv -> undefined -- TODO + either giveup (runComputeProgram program k af dest p vc) + (interfaceEnv states interface) -- Make sure that the compute state has everything needed by -- the program's current interface. checkKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> Annex Bool checkKey rs program iv k = do - states <- getComputeStates rs k - liftIO (getInterface program iv) >>= \case + states <- map snd <$> getComputeStates rs k + liftIO (getInterfaceCached program iv) >>= \case Left err -> giveup err Right interface -> case interfaceEnv states interface of