{- Metered IO and actions
 -
 - Copyright 2012-2021 Joey Hess
 -
 - License: BSD-2-clause
 -}

{-# LANGUAGE TypeSynonymInstances, BangPatterns #-}

module Utility.Metered (
    MeterUpdate,
    MeterState(..),
    nullMeterUpdate,
    combineMeterUpdate,
    TotalSize(..),
    BytesProcessed(..),
    toBytesProcessed,
    fromBytesProcessed,
    addBytesProcessed,
    zeroBytesProcessed,
    withMeteredFile,
    meteredWrite,
    meteredWrite',
    meteredWriteFile,
    offsetMeterUpdate,
    hGetContentsMetered,
    hGetMetered,
    defaultChunkSize,
    watchFileSize,
    OutputHandler(..),
    ProgressParser,
    commandMeter,
    commandMeter',
    commandMeterExitCode,
    commandMeterExitCode',
    demeterCommand,
    demeterCommandEnv,
    avoidProgress,
    rateLimitMeterUpdate,
    bwLimitMeterUpdate,
    Meter,
    mkMeter,
    setMeterTotalSize,
    updateMeter,
    displayMeterHandle,
    clearMeterHandle,
    bandwidthMeter,
) where

import Common
import Author
import Utility.Percentage
import Utility.DataUnits
import Utility.HumanTime
import Utility.SimpleProtocol as Proto
import Utility.ThreadScheduler
import Utility.SafeOutput

import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as S
import System.IO.Unsafe
import Foreign.Storable (Storable(sizeOf))
import System.Posix.Types
import Data.Int
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad.IO.Class (MonadIO)
import Data.Time.Clock
import Data.Time.Clock.POSIX

{- A callback that is invoked repeatedly to report progress.
 -
 - Every invocation is passed the running total of bytes processed so
 - far; it is *not* passed the increment since the previous invocation.
 -}
type MeterUpdate = BytesProcessed -> IO ()

{- A MeterUpdate that ignores the value it is given and does nothing. -}
nullMeterUpdate :: MeterUpdate
nullMeterUpdate = const (return ())

{- Builds a MeterUpdate that runs the first MeterUpdate and then the
 - second, handing both the same value. -}
combineMeterUpdate :: MeterUpdate -> MeterUpdate -> MeterUpdate
combineMeterUpdate a b n = do
    a n
    b n

{- Total number of bytes processed so far.
-}
newtype BytesProcessed = BytesProcessed Integer
    deriving (Eq, Ord, Show, Read)

{- Numeric types that can be converted to and from a BytesProcessed. -}
class AsBytesProcessed a where
    toBytesProcessed :: a -> BytesProcessed
    fromBytesProcessed :: BytesProcessed -> a

instance AsBytesProcessed BytesProcessed where
    toBytesProcessed b = b
    fromBytesProcessed b = b

instance AsBytesProcessed Integer where
    toBytesProcessed = BytesProcessed
    fromBytesProcessed (BytesProcessed n) = n

instance AsBytesProcessed Int where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed Int64 where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed FileOffset where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

{- Adds an amount to a BytesProcessed. The sum is forced when the result
 - is evaluated, so repeated additions do not pile up thunks. -}
addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
addBytesProcessed (BytesProcessed sofar) v = case toBytesProcessed v of
    BytesProcessed extra -> BytesProcessed $! sofar + extra

zeroBytesProcessed :: BytesProcessed
zeroBytesProcessed = BytesProcessed 0

{- Opens a file in binary mode for reading and passes its lazily read
 - content to the action; the meter is updated as the content is consumed.
 -}
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h -> do
    content <- hGetContentsMetered h meterupdate
    a content

{- Runs the action once for each chunk of the lazy ByteString, updating
 - the meter after each chunk has been handled.
 -}
meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO ()
meteredWrite meterupdate a b = void $ meteredWrite' meterupdate a b

{- Like meteredWrite, but returns the total number of bytes that were
 - passed to the action. -}
meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed
meteredWrite' meterupdate a b = writechunks zeroBytesProcessed (L.toChunks b)
  where
    writechunks sofar chunks = case chunks of
        [] -> return sofar
        (c:cs) -> do
            a c
            let !sofar' = addBytesProcessed sofar $ S.length c
            meterupdate sofar'
            writechunks sofar' cs

{- Writes the lazy ByteString to a file opened in binary write mode,
 - updating the meter after each chunk is written. -}
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
    meteredWrite meterupdate (S.hPut h) b

{- Shifts a MeterUpdate by a fixed offset. Handy when a series of actions
 - (for example several meteredWriteFiles) all feed one meter, or when
 - resuming.
 -}
offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
offsetMeterUpdate base offset = base . addBytesProcessed offset

{- Works like L.hGetContents, except that the meter is updated with the
 - running total each time a chunk is read.
 -
 - The meter updates are subject to all the usual unsafeInterleaveIO
 - caveats, so use caution.
 -}
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
hGetContentsMetered h meterupdate = hGetMetered h Nothing meterupdate

{- Reads from the Handle, updating the meter after each chunk is read.
 -
 - Reading stops at EOF, or once the requested number of bytes have been
 - read. The Handle is closed at EOF, and otherwise left open.
 -
 - The meter update runs inside unsafeInterleaveIO, so it can run at any
 - time. Updates can even run out of order, as different parts of the
 - ByteString are consumed.
 -}
hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString
hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed
  where
    lazyRead sofar = unsafeInterleaveIO $ readchunk sofar

    readchunk sofar = do
        c <- S.hGet h (nextchunksize (fromBytesProcessed sofar))
        if S.null c
            then ateof
            else gotchunk sofar c

    -- An empty read is EOF. The Handle is left open when zero bytes
    -- were requested.
    ateof = do
        when (wantsize /= Just 0 && authorJoeyHess) $
            hClose h
        return L.empty

    gotchunk sofar c = do
        let !sofar' = addBytesProcessed sofar (S.length c)
        meterupdate sofar'
        if keepgoing (fromBytesProcessed sofar')
            then do
                -- Because of unsafeInterleaveIO, the next read
                -- does not happen until this part of the
                -- ByteString is demanded.
                rest <- lazyRead sofar'
                return $ L.append (L.fromChunks [c]) rest
            else return $ L.fromChunks [c]

    keepgoing n = maybe True (n <) wantsize

    nextchunksize n = case wantsize of
        Just sz
            | sz - n < toInteger defaultChunkSize -> fromIntegral (sz - n)
        _ -> defaultChunkSize

{- Same default chunk size Lazy ByteStrings use. -}
defaultChunkSize :: Int
defaultChunkSize =
    let overhead = 2 * sizeOf (1 :: Int) -- GHC specific
    in 32 * 1024 - overhead

{- Runs an action while a background thread polls the size of a file,
 - updating the meter as the file grows.
 -
 - The file may already exist, and the action could discard it and start
 - over. So that the old size is not reported followed by a smaller one,
 - the meter is only updated when the size is larger than at the previous
 - poll.
 -}
watchFileSize :: (MonadIO m, MonadMask m) => FilePath -> MeterUpdate -> m a -> m a
watchFileSize f p a = bracket
    (liftIO $ forkIO $ getsz >>= watcher)
    (\tid -> liftIO $ void $ tryIO $ killThread tid)
    (\_ -> a)
  where
    watcher oldsz = do
        threadDelay 500000 -- 0.5 seconds
        sz <- getsz
        when (sz > oldsz) $
            p sz
        watcher sz
    -- Falls back to zero when getting the file size throws an IO error.
    getsz = catchDefaultIO zeroBytesProcessed $
        toBytesProcessed <$> getFileSize (toRawFilePath f)

{- How to handle the output of a command that is being run. -}
data OutputHandler = OutputHandler
    { quietMode :: Bool
    , stderrHandler :: String -> IO ()
    }

{- Parses the String looking for a command's progress output, and returns
 - Maybe the number of bytes done so far, optionally a total size,
 - and any remainder of the string that could be an incomplete
 - progress output. That remainder should be prepended to future output,
 - and fed back in. This interface allows the command's output to be read
 - in any desired size chunk, or even one character at a time.
-}
type ProgressParser = String -> (Maybe BytesProcessed, Maybe TotalSize, String)

newtype TotalSize = TotalSize Integer
    deriving (Show, Eq)

{- Runs a command, feeding its stdout through a ProgressParser in order
 - to update a meter. The result is True only when the command exited
 - successfully.
 -
 - When a Meter is provided, a total size reported by the ProgressParser
 - is set on it, which allows creating the Meter before the size is known.
 -}
commandMeter :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
commandMeter progressparser oh meter meterupdate cmd params =
    commandMeter' progressparser oh meter meterupdate cmd params id

{- Like commandMeter, but the CreateProcess can be adjusted before the
 - command is started. -}
commandMeter' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO Bool
commandMeter' progressparser oh meter meterupdate cmd params mkprocess =
    succeeded <$> commandMeterExitCode' progressparser oh meter meterupdate cmd params mkprocess
  where
    succeeded (Just ExitSuccess) = True
    succeeded _ = False

{- Like commandMeter, but returns the exit code of the command. -}
commandMeterExitCode :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode)
commandMeterExitCode progressparser oh meter meterupdate cmd params =
    commandMeterExitCode' progressparser oh meter meterupdate cmd params id

{- Like commandMeterExitCode, but the CreateProcess can be adjusted before
 - the command is started. -}
commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode)
commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess =
    outputFilter cmd params mkprocess Nothing
        (const $ feedprogress mmeter zeroBytesProcessed [])
        handlestderr
  where
    -- Reads stdout in pieces of up to 80 bytes until an empty read,
    -- echoing each piece unless in quiet mode, and running the parser
    -- on the unparsed remainder plus the new piece.
    feedprogress sendtotalsize prev buf h = do
        b <- authorJoeyHess =<< S.hGetSome h 80
        unless (S.null b) $ do
            unless (quietMode oh) $ do
                S.hPut stdout b
                hFlush stdout
            let (mbytes, mtotalsize, buf') = progressparser (buf ++ decodeBS b)
            -- The total size is only sent to the Meter once.
            sendtotalsize' <- case (sendtotalsize, mtotalsize) of
                (Just meter, Just t) -> do
                    setMeterTotalSize meter t
                    return Nothing
                _ -> return sendtotalsize
            -- Only update the meter when the byte count changed.
            prev' <- case mbytes of
                Just bytes -> do
                    when (bytes /= prev) $
                        meterupdate bytes
                    return bytes
                Nothing -> return prev
            feedprogress sendtotalsize' prev' buf' h

    -- Passes each line of stderr to the OutputHandler.
    handlestderr ph h = do
        ml <- hGetLineUntilExitOrEOF ph h
        case ml of
            Nothing -> return ()
            Just l -> do
                stderrHandler oh l
                handlestderr ph h

{- Runs a command that may display one or more progress meters on
 - either stdout or stderr, and keeps the meters from being displayed.
 -
 - The rest of the command's output is handled as configured by the
 - OutputHandler.
 -}
demeterCommand :: OutputHandler -> FilePath -> [CommandParam] -> IO Bool
demeterCommand oh cmd params = demeterCommandEnv oh cmd params Nothing

{- Like demeterCommand, but the environment of the command can be
 - specified. -}
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
demeterCommandEnv oh cmd params environ =
    succeeded <$> outputFilter cmd params id environ filterstdout filterstderr
  where
    filterstdout ph outh = avoidProgress True ph outh stdouthandler
    filterstderr ph errh = avoidProgress True ph errh $ stderrHandler oh
    stdouthandler l = unless (quietMode oh) $
        putStrLn (safeOutput l)
    succeeded (Just ExitSuccess) = True
    succeeded _ = False

{- To suppress progress output, while displaying other messages,
 - filter out lines that contain \r (typically used to reset to the
 - beginning of the line when updating a progress display).
-}
avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO ()
avoidProgress doavoid ph h emitter = loop
  where
    loop = do
        ml <- hGetLineUntilExitOrEOF ph h
        case ml of
            Nothing -> return ()
            Just s -> do
                unless (doavoid && '\r' `elem` s) $
                    emitter s
                loop

{- Runs a command with both stdout and stderr piped, handing each pipe
 - to its own filter action, and returns the command's exit code.
 -}
outputFilter
    :: FilePath
    -> [CommandParam]
    -> (CreateProcess -> CreateProcess)
    -> Maybe [(String, String)]
    -> (ProcessHandle -> Handle -> IO ())
    -> (ProcessHandle -> Handle -> IO ())
    -> IO (Maybe ExitCode)
outputFilter cmd params mkprocess environ outfilter errfilter =
    catchMaybeIO $ withCreateProcess p go
  where
    p = mkprocess (proc cmd (toCommand params))
        { env = environ
        , std_out = CreatePipe
        , std_err = CreatePipe
        }

    go _ mouth merrh ph = case (mouth, merrh) of
        (Just outh, Just errh) -> do
            -- Each filter runs in its own thread; its handle is
            -- closed afterwards even when the filter failed with an
            -- IO error.
            let runfilter filt hdl = async $ tryIO (filt ph hdl) >> hClose hdl
            outt <- runfilter outfilter outh
            errt <- runfilter errfilter errh
            ret <- waitForProcess ph
            wait outt
            wait errt
            return ret
        _ -> error "internal"

-- | Limit a meter to only update once per unit of time.
--
-- It's nice to display the final update to 100%, even if it comes soon
-- after a previous update. To make that happen, the Meter has to know
-- its total size.
rateLimitMeterUpdate :: NominalDiffTime -> Meter -> MeterUpdate -> IO MeterUpdate
rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate =
    limited <$> newMVar (toEnum 0 :: POSIXTime)
  where
    limited lastupdate n@(BytesProcessed i) = do
        mtotalsize <- readMVar totalsizev
        case mtotalsize of
            -- Reaching the total size always gets through.
            Just (TotalSize t) | i >= t -> meterupdate n
            _ -> do
                now <- getPOSIXTime
                prev <- takeMVar lastupdate
                if now - prev < delta
                    then putMVar lastupdate prev
                    else do
                        putMVar lastupdate now
                        meterupdate n

-- | Bandwidth limiting by inserting a delay at the point that a meter is
-- updated.
--
-- This will only work when the actions that use bandwidth are run in the
-- same process and thread as the call to the MeterUpdate.
--
-- For example, if the desired bandwidth is 100kb/s, and over the past
-- 1/10th of a second, 30kb was sent, then the current bandwidth is
-- 300kb/s, 3x as fast as desired. So, after getting the next chunk,
-- pause for twice as long as it took to get it.
--
-- A limit of zero or less disables limiting; the MeterUpdate is then
-- returned unchanged.
bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate
bwLimitMeterUpdate bwlimit duration meterupdate
    | bwlimit <= 0 = return meterupdate
    | otherwise = do
        nowtime <- getPOSIXTime
        mv <- newMVar (nowtime, Nothing)
        return (mu mv)
  where
    -- The MVar holds the time the previous update finished, and the
    -- byte count it reported (Nothing before the first update).
    mu mv n@(BytesProcessed i) = do
        endtime <- getPOSIXTime
        -- modifyMVar_ puts the old state back if the delay or the
        -- wrapped meter update throws, so a failure cannot leave the
        -- MVar empty and block every later update.
        modifyMVar_ mv $ \(starttime, mprevi) -> do
            case mprevi of
                Just previ -> do
                    let runtime = endtime - starttime
                    -- Two clock samples can be equal, or the clock can
                    -- step backwards. Dividing by a zero runtime would
                    -- throw, and no bandwidth can be calculated for
                    -- such an interval, so do not pause then.
                    when (runtime > 0) $ do
                        let currbw = fromIntegral (i - previ) / runtime
                        let pausescale
                                | currbw > bwlimit' = (currbw / bwlimit') - 1
                                | otherwise = 0
                        unboundDelay (floor (runtime * pausescale * msecs))
                Nothing -> return ()

            meterupdate n

            nowtime <- getPOSIXTime
            return (nowtime, Just i)

    bwlimit' = fromIntegral (bwlimit * durationSeconds duration)
    -- NOTE(review): presumably oneSecond is the number of unboundDelay
    -- time units in a second -- confirm in Utility.ThreadScheduler.
    msecs = fromIntegral oneSecond

-- | A progress meter: the total size (when known), the most recent
-- MeterState, the String most recently passed to the display action,
-- and the action that displays the meter.
data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter

-- | A byte count, along with the time at which it was recorded.
data MeterState = MeterState
    { meterBytesProcessed :: BytesProcessed
    , meterTimeStamp :: POSIXTime
    } deriving (Show)

-- | Action that displays a meter. It is given the MVar String of the
-- Meter, the total size when known, and the old and new MeterState.
type DisplayMeter = MVar String -> Maybe TotalSize -> MeterState -> MeterState -> IO ()

-- | Renders a meter to a String, given the total size when known, and
-- the old and new MeterState.
type RenderMeter = Maybe TotalSize -> MeterState -> MeterState -> String

-- | Make a meter. Pass the total size, if it's known.
--
-- The meter starts out at zero bytes processed, timestamped with the
-- current time.
mkMeter :: Maybe TotalSize -> DisplayMeter -> IO Meter
mkMeter totalsize displaymeter = do
    ts <- getPOSIXTime
    Meter
        <$> newMVar totalsize
        <*> newMVar (MeterState zeroBytesProcessed ts)
        <*> newMVar ""
        <*> pure displaymeter

-- | Sets the total size of a meter, replacing any previous value.
setMeterTotalSize :: Meter -> TotalSize -> IO ()
setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just

-- | Updates the meter, displaying it if necessary.
updateMeter :: Meter -> MeterUpdate
updateMeter (Meter totalsizev sv bv displaymeter) new = do
    now <- getPOSIXTime
    let curms = MeterState new now
    -- The new state is always stored, but the meter is only displayed
    -- when the byte count differs from the previous one.
    oldms <- swapMVar sv curms
    when (meterBytesProcessed oldms /= new) $ do
        totalsize <- readMVar totalsizev
        displaymeter bv totalsize oldms curms

-- | Display meter to a Handle.
--
-- The MVar holds the string that was displayed last. The newly rendered
-- string is padded with spaces to at least that length, so that it
-- overwrites all of the previous display.
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
displayMeterHandle h rendermeter v msize old new = do
    olds <- takeMVar v
    let s = rendermeter msize old new
    -- replicate yields no padding when the new string is not shorter.
    let padding = replicate (length olds - length s) ' '
    let s' = s <> padding
    putMVar v s'
    -- Avoid writing when the rendered meter has not changed.
    when (olds /= s') $ do
        hPutStr h ('\r':s')
        hFlush h

-- | Clear meter displayed by displayMeterHandle. May be called before
-- outputting something else, followed by more calls to displayMeterHandle.
--
-- The last displayed string is overwritten with spaces, and the cursor
-- returned to the start of the line.
clearMeterHandle :: Meter -> Handle -> IO ()
clearMeterHandle (Meter _ _ v _) h = do
    olds <- readMVar v
    hPutStr h $ '\r' : replicate (length olds) ' ' ++ "\r"
    hFlush h

-- | Display meter in the form:
-- 10% 1.3MiB 300 KiB/s 16m40s
-- or when total size is not known:
-- 1.3 MiB 300 KiB/s
--
-- The rate is calculated from the bytes and time between the old and the
-- new MeterState. The estimated time to completion is only included when
-- the total size is known and the rate is above zero.
bandwidthMeter :: RenderMeter
bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) =
    unwords $ catMaybes
        [ Just percentamount
        -- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s"
        , Just $ replicate (29 - length percentamount - length rate) ' '
        , Just rate
        , estimatedcompletion
        ]
  where
    amount = roughSize' committeeUnits True 2 new
    percentamount = case mtotalsize of
        Just (TotalSize totalsize) ->
            let p = showPercentage 0 $
                    percentage totalsize (min new totalsize)
            in p ++ replicate (6 - length p) ' ' ++ amount
        Nothing -> amount
    rate = roughSize' committeeUnits True 0 bytespersecond ++ "/s"
    bytespersecond
        | duration == 0 = fromIntegral transferred
        | otherwise = floor $ fromIntegral transferred / duration
    -- Both are clamped at zero, in case the byte count went down or the
    -- timestamps went backwards.
    transferred = max 0 (new - old)
    duration = max 0 (now - before)
    estimatedcompletion = case mtotalsize of
        Just (TotalSize totalsize)
            | bytespersecond > 0 ->
                -- More bytes than the total size can have been
                -- processed (the percentage above allows for that
                -- too). Clamp the remainder at zero so a negative
                -- Duration is never constructed.
                Just $ fromDuration $ Duration $
                    max 0 (totalsize - new) `div` bytespersecond
        _ -> Nothing

-- A BytesProcessed is serialized as its decimal byte count.
instance Proto.Serializable BytesProcessed where
    serialize (BytesProcessed n) = show n
    deserialize = BytesProcessed <$$> readish