computation progress display
This commit is contained in:
parent
4a4a614b0d
commit
ccc454a791
4 changed files with 66 additions and 23 deletions
|
@ -101,6 +101,7 @@ perform o r = do
|
||||||
Remote.Compute.runComputeProgram program state
|
Remote.Compute.runComputeProgram program state
|
||||||
(Remote.Compute.ImmutableState False)
|
(Remote.Compute.ImmutableState False)
|
||||||
(getInputContent fast)
|
(getInputContent fast)
|
||||||
|
Nothing
|
||||||
(addComputed "adding" True r (reproducible o) chooseBackend Just fast)
|
(addComputed "adding" True r (reproducible o) chooseBackend Just fast)
|
||||||
next $ return True
|
next $ return True
|
||||||
|
|
||||||
|
|
|
@ -129,6 +129,7 @@ perform o r file origkey origstate = do
|
||||||
Remote.Compute.runComputeProgram program origstate
|
Remote.Compute.runComputeProgram program origstate
|
||||||
(Remote.Compute.ImmutableState False)
|
(Remote.Compute.ImmutableState False)
|
||||||
(getinputcontent program)
|
(getinputcontent program)
|
||||||
|
Nothing
|
||||||
(go program reproducibleconfig)
|
(go program reproducibleconfig)
|
||||||
next $ return True
|
next $ return True
|
||||||
where
|
where
|
||||||
|
|
|
@ -44,6 +44,7 @@ import qualified Annex.Transfer
|
||||||
import Logs.MetaData
|
import Logs.MetaData
|
||||||
import Logs.EquivilantKeys
|
import Logs.EquivilantKeys
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
|
import Messages.Progress
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
import Utility.TimeStamp
|
import Utility.TimeStamp
|
||||||
import Utility.Env
|
import Utility.Env
|
||||||
|
@ -59,6 +60,8 @@ import qualified Utility.SimpleProtocol as Proto
|
||||||
import Network.HTTP.Types.URI
|
import Network.HTTP.Types.URI
|
||||||
import Data.Time.Clock
|
import Data.Time.Clock
|
||||||
import Text.Read
|
import Text.Read
|
||||||
|
import Control.Concurrent.STM
|
||||||
|
import Control.Concurrent.Async
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
import qualified Data.Set as S
|
import qualified Data.Set as S
|
||||||
import qualified Data.ByteString as B
|
import qualified Data.ByteString as B
|
||||||
|
@ -209,8 +212,10 @@ newtype PercentFloat = PercentFloat Float
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
instance Proto.Serializable PercentFloat where
|
instance Proto.Serializable PercentFloat where
|
||||||
serialize (PercentFloat p) = show p
|
serialize (PercentFloat p) = show p ++ "%"
|
||||||
deserialize s = PercentFloat <$> readMaybe s
|
deserialize s = do
|
||||||
|
s' <- reverse <$> stripPrefix "%" (reverse s)
|
||||||
|
PercentFloat <$> readMaybe s'
|
||||||
|
|
||||||
data ComputeState = ComputeState
|
data ComputeState = ComputeState
|
||||||
{ computeParams :: [String]
|
{ computeParams :: [String]
|
||||||
|
@ -374,9 +379,11 @@ runComputeProgram
|
||||||
-> (OsPath -> Annex (Key, Maybe (Either Git.Sha OsPath)))
|
-> (OsPath -> Annex (Key, Maybe (Either Git.Sha OsPath)))
|
||||||
-- ^ get input file's content, or Nothing the input file's
|
-- ^ get input file's content, or Nothing the input file's
|
||||||
-- content is not available
|
-- content is not available
|
||||||
|
-> Maybe (Key, MeterUpdate)
|
||||||
|
-- ^ update meter for this key
|
||||||
-> (ComputeState -> OsPath -> NominalDiffTime -> Annex v)
|
-> (ComputeState -> OsPath -> NominalDiffTime -> Annex v)
|
||||||
-> Annex v
|
-> Annex v
|
||||||
runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent cont =
|
runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent meterkey cont =
|
||||||
withOtherTmp $ \othertmpdir ->
|
withOtherTmp $ \othertmpdir ->
|
||||||
withTmpDirIn othertmpdir (literalOsPath "compute") go
|
withTmpDirIn othertmpdir (literalOsPath "compute") go
|
||||||
where
|
where
|
||||||
|
@ -391,10 +398,10 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
||||||
}
|
}
|
||||||
showOutput
|
showOutput
|
||||||
starttime <- liftIO currentMonotonicTimestamp
|
starttime <- liftIO currentMonotonicTimestamp
|
||||||
state' <- bracket
|
state' <- withmeterfile $ \meterfile -> bracket
|
||||||
(liftIO $ createProcess pr)
|
(liftIO $ createProcess pr)
|
||||||
(liftIO . cleanupProcess)
|
(liftIO . cleanupProcess)
|
||||||
(getinput state tmpdir subdir)
|
(getinput tmpdir subdir state meterfile)
|
||||||
endtime <- liftIO currentMonotonicTimestamp
|
endtime <- liftIO currentMonotonicTimestamp
|
||||||
cont state' subdir (calcduration starttime endtime)
|
cont state' subdir (calcduration starttime endtime)
|
||||||
|
|
||||||
|
@ -408,13 +415,13 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
||||||
, return tmpdir
|
, return tmpdir
|
||||||
)
|
)
|
||||||
|
|
||||||
getinput state' tmpdir subdir p =
|
getinput tmpdir subdir state' meterfile p =
|
||||||
liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
|
liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
|
||||||
Just l
|
Just l
|
||||||
| null l -> getinput state' tmpdir subdir p
|
| null l -> getinput tmpdir subdir state' meterfile p
|
||||||
| otherwise -> do
|
| otherwise -> do
|
||||||
state'' <- parseoutput p tmpdir subdir state' l
|
state'' <- parseoutput p tmpdir subdir state' meterfile l
|
||||||
getinput state'' tmpdir subdir p
|
getinput tmpdir subdir state'' meterfile p
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
liftIO $ hClose (stdoutHandle p)
|
liftIO $ hClose (stdoutHandle p)
|
||||||
liftIO $ hClose (stdinHandle p)
|
liftIO $ hClose (stdinHandle p)
|
||||||
|
@ -422,7 +429,7 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
||||||
giveup $ program ++ " exited unsuccessfully"
|
giveup $ program ++ " exited unsuccessfully"
|
||||||
return state'
|
return state'
|
||||||
|
|
||||||
parseoutput p tmpdir subdir state' l = case Proto.parseMessage l of
|
parseoutput p tmpdir subdir state' meterfile l = case Proto.parseMessage l of
|
||||||
Just (ProcessInput f) -> do
|
Just (ProcessInput f) -> do
|
||||||
let f' = toOsPath f
|
let f' = toOsPath f
|
||||||
let knowninput = M.member f' (computeInputs state')
|
let knowninput = M.member f' (computeInputs state')
|
||||||
|
@ -453,7 +460,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
||||||
Just (ProcessOutput f) -> do
|
Just (ProcessOutput f) -> do
|
||||||
let f' = toOsPath f
|
let f' = toOsPath f
|
||||||
checksafefile tmpdir subdir f' "output"
|
checksafefile tmpdir subdir f' "output"
|
||||||
let knownoutput = M.member f' (computeOutputs state')
|
knownoutput <- case M.lookup f' (computeOutputs state') of
|
||||||
|
Nothing -> return False
|
||||||
|
Just mk -> do
|
||||||
|
when (mk == fmap fst meterkey) $
|
||||||
|
meterfile (subdir </> f')
|
||||||
|
return True
|
||||||
checkimmutable knownoutput "outputting" f' $
|
checkimmutable knownoutput "outputting" f' $
|
||||||
return $ if immutablestate
|
return $ if immutablestate
|
||||||
then state'
|
then state'
|
||||||
|
@ -463,12 +475,12 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
||||||
(computeOutputs state')
|
(computeOutputs state')
|
||||||
}
|
}
|
||||||
Just (ProcessProgress percent) -> do
|
Just (ProcessProgress percent) -> do
|
||||||
-- XXX
|
liftIO $ updatepercent percent
|
||||||
return state'
|
return state'
|
||||||
Just ProcessReproducible ->
|
Just ProcessReproducible ->
|
||||||
return $ state' { computeReproducible = True }
|
return $ state' { computeReproducible = True }
|
||||||
Nothing -> giveup $
|
Nothing -> giveup $
|
||||||
program ++ " output included an unparseable line: \"" ++ l ++ "\""
|
program ++ " output an unparseable line: \"" ++ l ++ "\""
|
||||||
|
|
||||||
checksafefile tmpdir subdir f fileaction = do
|
checksafefile tmpdir subdir f fileaction = do
|
||||||
let err problem = giveup $
|
let err problem = giveup $
|
||||||
|
@ -497,26 +509,57 @@ runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate)
|
||||||
liftIO . F.writeFile f =<< catObject gitsha
|
liftIO . F.writeFile f =<< catObject gitsha
|
||||||
return f
|
return f
|
||||||
|
|
||||||
|
withmeterfile a = case meterkey of
|
||||||
|
Nothing -> a (const noop)
|
||||||
|
Just (_, progress) -> do
|
||||||
|
filev <- liftIO newEmptyTMVarIO
|
||||||
|
endv <- liftIO $ newEmptyTMVarIO
|
||||||
|
let meterfile = void . liftIO
|
||||||
|
. atomically . tryPutTMVar filev
|
||||||
|
let endmeterfile = atomically $ putTMVar endv ()
|
||||||
|
tid <- liftIO $ async $ do
|
||||||
|
v <- liftIO $ atomically $
|
||||||
|
(Right <$> takeTMVar filev)
|
||||||
|
`orElse`
|
||||||
|
(Left <$> takeTMVar endv)
|
||||||
|
case v of
|
||||||
|
Right f -> watchFileSize f progress $ \_ ->
|
||||||
|
void $ liftIO $ atomically $
|
||||||
|
takeTMVar endv
|
||||||
|
Left () -> return ()
|
||||||
|
a meterfile
|
||||||
|
`finally` liftIO (endmeterfile >> wait tid)
|
||||||
|
|
||||||
|
updatepercent (PercentFloat percent) = case meterkey of
|
||||||
|
Nothing -> noop
|
||||||
|
Just (k, progress) -> case fromKey keySize k of
|
||||||
|
Nothing -> noop
|
||||||
|
Just sz ->
|
||||||
|
progress $ BytesProcessed $ floor $
|
||||||
|
fromIntegral sz * percent / 100
|
||||||
|
|
||||||
computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a
|
computationBehaviorChangeError :: ComputeProgram -> String -> OsPath -> Annex a
|
||||||
computationBehaviorChangeError (ComputeProgram program) requestdesc p =
|
computationBehaviorChangeError (ComputeProgram program) requestdesc p =
|
||||||
giveup $ program ++ " is not behaving the same way it used to, now " ++ requestdesc ++ ": " ++ fromOsPath p
|
giveup $ program ++ " is not behaving the same way it used to, now " ++ requestdesc ++ ": " ++ fromOsPath p
|
||||||
|
|
||||||
computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||||
computeKey rs (ComputeProgram program) k _af dest p vc =
|
computeKey rs (ComputeProgram program) k _af dest meterupdate vc =
|
||||||
getComputeState rs k >>= \case
|
getComputeState rs k >>= \case
|
||||||
Just state ->
|
Just state ->
|
||||||
case computeskey state of
|
case computeskey state of
|
||||||
Just keyfile -> runComputeProgram
|
Just keyfile -> go state keyfile
|
||||||
(ComputeProgram program)
|
|
||||||
state
|
|
||||||
(ImmutableState True)
|
|
||||||
(getinputcontent state)
|
|
||||||
(postcompute keyfile)
|
|
||||||
Nothing -> missingstate
|
Nothing -> missingstate
|
||||||
Nothing -> missingstate
|
Nothing -> missingstate
|
||||||
where
|
where
|
||||||
missingstate = giveup "Missing compute state"
|
missingstate = giveup "Missing compute state"
|
||||||
|
|
||||||
|
go state keyfile = metered (Just meterupdate) k Nothing $ \_ p ->
|
||||||
|
runComputeProgram (ComputeProgram program) state
|
||||||
|
(ImmutableState True)
|
||||||
|
(getinputcontent state)
|
||||||
|
(Just (k, p))
|
||||||
|
(postcompute keyfile)
|
||||||
|
|
||||||
getinputcontent state f =
|
getinputcontent state f =
|
||||||
case M.lookup f (computeInputs state) of
|
case M.lookup f (computeInputs state) of
|
||||||
Just inputkey -> case keyGitSha inputkey of
|
Just inputkey -> case keyGitSha inputkey of
|
||||||
|
|
|
@ -11,8 +11,6 @@
|
||||||
* allow git-annex enableremote with program= explicitly specified,
|
* allow git-annex enableremote with program= explicitly specified,
|
||||||
without checking annex.security.allowed-compute-programs
|
without checking annex.security.allowed-compute-programs
|
||||||
|
|
||||||
* need progress bars for computations and implement PROGRESS message
|
|
||||||
|
|
||||||
* addcomputed should honor annex.addunlocked.
|
* addcomputed should honor annex.addunlocked.
|
||||||
|
|
||||||
* Perhaps recompute should write a new version of a file as an unlocked
|
* Perhaps recompute should write a new version of a file as an unlocked
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue