git-annex/Utility/Metered.hs
Joey Hess 8da85fd3a3
RawFilePath conversion
Sponsored-by: Dartmouth College's DANDI project
2024-01-19 14:26:21 -04:00

546 lines
18 KiB
Haskell

{- Metered IO and actions
-
- Copyright 2012-2024 Joey Hess <id@joeyh.name>
-
- License: BSD-2-clause
-}
{-# LANGUAGE TypeSynonymInstances, BangPatterns #-}
module Utility.Metered (
MeterUpdate,
MeterState(..),
nullMeterUpdate,
combineMeterUpdate,
TotalSize(..),
BytesProcessed(..),
toBytesProcessed,
fromBytesProcessed,
addBytesProcessed,
zeroBytesProcessed,
withMeteredFile,
meteredWrite,
meteredWrite',
meteredWriteFile,
offsetMeterUpdate,
hGetContentsMetered,
hGetMetered,
defaultChunkSize,
watchFileSize,
OutputHandler(..),
ProgressParser,
commandMeter,
commandMeter',
commandMeterExitCode,
commandMeterExitCode',
demeterCommand,
demeterCommandEnv,
avoidProgress,
rateLimitMeterUpdate,
bwLimitMeterUpdate,
Meter,
mkMeter,
setMeterTotalSize,
updateMeter,
displayMeterHandle,
clearMeterHandle,
bandwidthMeter,
) where
import Common
import Author
import Utility.Percentage
import Utility.DataUnits
import Utility.HumanTime
import Utility.SimpleProtocol as Proto
import Utility.ThreadScheduler
import Utility.SafeOutput
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as S
import System.IO.Unsafe
import Foreign.Storable (Storable(sizeOf))
import System.Posix.Types
import Data.Int
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad.IO.Class (MonadIO)
import Data.Time.Clock
import Data.Time.Clock.POSIX
-- NOTE(review): authorship marker. 'author', 'JoeyHess' and the
-- 'Copyright' type come from the Author module, which is not visible
-- here. (2024-12) evaluates to 2012, matching the first year in the
-- file's copyright header.
-- 'copyright' is used below both as a Bool (in hGetMetered, combined
-- with &&) and as the right operand of >>= (in commandMeterExitCode'),
-- so it is presumably polymorphic over those uses -- confirm against
-- Author before changing it.
copyright :: Copyright
copyright = author JoeyHess (2024-12)
{- | An action that may be called many times to report progress.
 -
 - Each call receives the running total of bytes processed so far,
 - *not* the number of bytes processed since the previous call. -}
type MeterUpdate = BytesProcessed -> IO ()

-- | A MeterUpdate that ignores every update.
nullMeterUpdate :: MeterUpdate
nullMeterUpdate = const (return ())

-- | Sends each update to the first MeterUpdate, then to the second.
combineMeterUpdate :: MeterUpdate -> MeterUpdate -> MeterUpdate
combineMeterUpdate first second n = do
	first n
	second n
-- | The total number of bytes processed so far.
newtype BytesProcessed = BytesProcessed Integer
	deriving (Show, Read, Eq, Ord)
{- | Types that can be converted to and from BytesProcessed.
 -
 - Conversions from BytesProcessed to the fixed-width types (Int, Int64,
 - FileOffset) go through fromInteger, which does not check for
 - overflow. -}
class AsBytesProcessed a where
	toBytesProcessed :: a -> BytesProcessed
	fromBytesProcessed :: BytesProcessed -> a

instance AsBytesProcessed BytesProcessed where
	toBytesProcessed b = b
	fromBytesProcessed b = b

instance AsBytesProcessed Integer where
	toBytesProcessed = BytesProcessed
	fromBytesProcessed (BytesProcessed n) = n

instance AsBytesProcessed Int where
	toBytesProcessed = BytesProcessed . toInteger
	fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed Int64 where
	toBytesProcessed = BytesProcessed . toInteger
	fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed FileOffset where
	toBytesProcessed = BytesProcessed . toInteger
	fromBytesProcessed (BytesProcessed n) = fromInteger n
-- | Adds an amount, of any AsBytesProcessed type, to a running total.
-- The sum is forced so that long chains of additions do not build up
-- thunks.
addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
addBytesProcessed (BytesProcessed total) v = case toBytesProcessed v of
	BytesProcessed delta -> BytesProcessed $! total + delta

-- | No bytes processed yet.
zeroBytesProcessed :: BytesProcessed
zeroBytesProcessed = BytesProcessed 0
{- | Opens a file for binary reading and passes its content to an action
 - as a lazy ByteString, updating the meter as the content is consumed.
 -
 - withBinaryFile closes the file when the action returns, so the action
 - should consume the content before returning. -}
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h -> do
	content <- hGetContentsMetered h meterupdate
	a content
{- | Passes each strict chunk of a lazy ByteString to an action in turn.
 - After each chunk has been handled, the meter is updated with the
 - running total. -}
meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO ()
meteredWrite meterupdate a b = void (meteredWrite' meterupdate a b)

-- | Like meteredWrite, but returns the total number of bytes handled.
meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed
meteredWrite' meterupdate a = loop zeroBytesProcessed . L.toChunks
  where
	loop total chunks = case chunks of
		[] -> return total
		(chunk:rest) -> do
			a chunk
			let !total' = addBytesProcessed total (S.length chunk)
			meterupdate total'
			loop total' rest
-- | Writes a lazy ByteString to a file in binary mode, updating the meter
-- after each chunk is written.
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode writer
  where
	writer h = meteredWrite meterupdate (S.hPut h) b
{- | Shifts every update sent to a MeterUpdate by a fixed offset.
 -
 - Useful when a sequence of actions (such as several
 - meteredWriteFiles) all feed one meter progressively, or when
 - resuming part way through.
 -}
offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
offsetMeterUpdate base offset n = base (addBytesProcessed offset n)
{- | Like L.hGetContents, but updates a meter after each chunk is read,
 - based on the size of the chunk.
 -
 - The meter updates are run inside unsafeInterleaveIO, so they happen
 - only as the returned ByteString is consumed. All the usual caveats
 - about lazy IO apply; use caution.
 -}
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
hGetContentsMetered h meterupdate = hGetMetered h Nothing meterupdate
{- Reads from the Handle, updating the meter after each chunk is read.
 -
 - Stops at EOF, or when the requested number of bytes have been read.
 - Closes the Handle at EOF, but otherwise leaves it open.
 -
 - A requested size of 0 reads nothing and leaves the Handle open.
 - A negative requested size is treated the same as 0.
 -
 - Note that the meter update is run in unsafeInterleaveIO, which means that
 - it can be run at any time. It's even possible for updates to run out
 - of order, as different parts of the ByteString are consumed.
 -}
hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString
hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed
  where
	-- A negative size would otherwise reach S.hGet as a negative
	-- chunk size, which makes S.hGet throw. Clamp it to 0 instead.
	wantsize' = max 0 <$> wantsize

	lazyRead sofar = unsafeInterleaveIO $ loop sofar

	loop sofar = do
		c <- S.hGet h (nextchunksize (fromBytesProcessed sofar))
		if S.null c
			then do
				-- An empty read for a 0-byte request is not EOF,
				-- so the Handle is left open in that case.
				when (wantsize' /= Just 0 && copyright) $
					hClose h
				return L.empty
			else do
				let !sofar' = addBytesProcessed sofar (S.length c)
				meterupdate sofar'
				if keepgoing (fromBytesProcessed sofar')
					then do
						{- unsafeInterleaveIO causes this to be
						 - deferred until the data is read from the
						 - ByteString. -}
						cs <- lazyRead sofar'
						return $ L.append (L.fromChunks [c]) cs
					else return $ L.fromChunks [c]

	keepgoing n = case wantsize' of
		Nothing -> True
		Just sz -> n < sz

	-- Never read past the requested size; otherwise read full chunks.
	nextchunksize n = case wantsize' of
		Nothing -> defaultChunkSize
		Just sz ->
			let togo = sz - n
			in if togo < toInteger defaultChunkSize
				then fromIntegral togo
				else defaultChunkSize
{- | The chunk size that lazy ByteStrings use by default: 32 KiB, less
 - GHC's per-chunk overhead of two Ints. -}
defaultChunkSize :: Int
defaultChunkSize = chunkBytes - overheadBytes
  where
	chunkBytes = 32 * 1024
	overheadBytes = 2 * sizeOf (0 :: Int) -- GHC specific
{- Runs an action, watching a file as it grows and updating the meter.
 -
 - The file may already exist, and the action could throw the original file
 - away and start over. To avoid reporting the original file size followed
 - by a smaller size in that case, wait until the file starts growing
 - before updating the meter for the first time.
 -
 - An updated version of the MeterUpdate is passed to the action, and the
 - action should use that for any updates that it makes. This allows for
 - eg, the action updating the meter before a write is flushed to the file.
 - In that situation, this avoids the meter being set back to the size of
 - the file when it's gotten ahead of that point.
 -}
watchFileSize
	:: (MonadIO m, MonadMask m)
	=> RawFilePath
	-> MeterUpdate
	-> (MeterUpdate -> m a)
	-> m a
watchFileSize f p a = do
	sizevar <- liftIO $ newMVar zeroBytesProcessed
	bracket
		(liftIO $ forkIO $ watcher (meterupdate sizevar True) =<< getsz)
		(liftIO . void . tryIO . killThread)
		(const (a (meterupdate sizevar False)))
  where
	-- Polls the file size every 0.5 seconds, only reporting growth.
	watcher p' oldsz = do
		threadDelay 500000 -- 0.5 seconds
		sz <- getsz
		when (sz > oldsz) $
			p' sz
		watcher p' sz

	getsz = catchDefaultIO zeroBytesProcessed $
		toBytesProcessed <$> getFileSize f

	-- p is called while sizevar is still held. Otherwise the watcher
	-- thread could record a size, lose the CPU, and then report it to
	-- p after the action had already reported a larger one, moving the
	-- meter backwards. modifyMVar_ also puts the old value back if p
	-- throws or the thread is killed, so sizevar is never left empty.
	meterupdate sizevar preventbacktracking n = modifyMVar_ sizevar $ \old ->
		if preventbacktracking && old > n
			then return old
			else do
				p n
				return n
-- | Controls how a command's output is handled: whether to be quiet,
-- and what to do with each line the command writes to stderr.
data OutputHandler = OutputHandler
	{ quietMode :: Bool
	, stderrHandler :: String -> IO ()
	}
{- | Parses a String looking for a command's progress output.
 -
 - Returns Maybe the number of bytes done so far, Maybe a total size,
 - and any remainder of the String that could be an incomplete progress
 - output. That remainder should be prepended to future output and fed
 - back in. This interface allows the command's output to be read in
 - chunks of any size, even one character at a time.
 -}
type ProgressParser = String -> (Maybe BytesProcessed, Maybe TotalSize, String)

-- | The total size, in bytes, of whatever is being metered.
newtype TotalSize = TotalSize Integer
	deriving (Eq, Show)
{- | Runs a command and feeds its output through a ProgressParser to
 - update a meter. Returns True only when the command exits successfully.
 -
 - If a Meter is provided, the ProgressParser can report the total size,
 - which allows the Meter to be created before the size is known.
 -}
commandMeter :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
commandMeter parser oh mmeter mu cmd ps =
	commandMeter' parser oh mmeter mu cmd ps id
-- | Like commandMeter, but the CreateProcess can be adjusted before the
-- command is started. Returns True only for an ExitSuccess exit code.
commandMeter' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO Bool
commandMeter' parser oh mmeter mu cmd ps mkprocess =
	(== Just ExitSuccess) <$>
		commandMeterExitCode' parser oh mmeter mu cmd ps mkprocess
-- | Like commandMeter, but returns the command's exit code rather than
-- a Bool. The result is Nothing when outputFilter's catchMaybeIO caught
-- an exception while running the command.
commandMeterExitCode :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode)
commandMeterExitCode parser oh mmeter mu cmd ps =
	commandMeterExitCode' parser oh mmeter mu cmd ps id
-- | The general form of commandMeter: the CreateProcess can be adjusted,
-- and the exit code is returned.
commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode)
commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess =
	outputFilter cmd params mkprocess Nothing stdoutfilter stderrfilter
  where
	stdoutfilter _ph = feedprogress mmeter zeroBytesProcessed []

	-- Reads stdout in chunks of up to 80 bytes. Each chunk is echoed
	-- unless quiet, and the progress parser is run over the text
	-- carried over so far plus the new chunk. The total size is sent
	-- to the Meter at most once; after that the Meter is dropped from
	-- the state. The meter is only updated when the byte count changes.
	feedprogress sendtotalsize prev buf h = do
		b <- S.hGetSome h 80 >>= copyright
		unless (S.null b) $ do
			unless (quietMode oh) $ do
				S.hPut stdout b
				hFlush stdout
			let (mbytes, mtotalsize, buf') = progressparser (buf ++ decodeBS b)
			sendtotalsize' <- case (sendtotalsize, mtotalsize) of
				(Just meter, Just t) -> do
					setMeterTotalSize meter t
					return Nothing
				_ -> return sendtotalsize
			prev' <- case mbytes of
				Just bytes | bytes /= prev -> do
					meterupdate bytes
					return bytes
				Just bytes -> return bytes
				Nothing -> return prev
			feedprogress sendtotalsize' prev' buf' h

	stderrfilter ph h = hGetLineUntilExitOrEOF ph h >>= \case
		Nothing -> return ()
		Just l -> do
			stderrHandler oh l
			stderrfilter ph h
{- | Runs a command that may display one or more progress meters on
 - stdout or stderr, and prevents those meters from being displayed.
 -
 - Other output from the command is handled as configured by the
 - OutputHandler. Returns True when the command exits successfully.
 -}
demeterCommand :: OutputHandler -> FilePath -> [CommandParam] -> IO Bool
demeterCommand oh cmd ps = demeterCommandEnv oh cmd ps Nothing
-- | Like demeterCommand, but also takes the environment to run the
-- command in. This is passed to the process's env field; Nothing
-- inherits the current environment.
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
demeterCommandEnv oh cmd params environ =
	(== Just ExitSuccess) <$>
		outputFilter cmd params id environ outfilter errfilter
  where
	-- Non-progress stdout lines are printed, sanitized, unless quiet.
	outfilter ph h = avoidProgress True ph h $ \l ->
		unless (quietMode oh) $
			putStrLn (safeOutput l)
	errfilter ph h = avoidProgress True ph h (stderrHandler oh)
{- | Reads lines with hGetLineUntilExitOrEOF until it returns Nothing,
 - passing each line to the emitter.
 -
 - When doavoid is set, lines containing \r are dropped. \r is
 - typically used to return to the beginning of the line when
 - redrawing a progress display, so this suppresses progress output
 - while still displaying other messages.
 -}
avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO ()
avoidProgress doavoid ph h emitter = go
  where
	go = hGetLineUntilExitOrEOF ph h >>= maybe (return ()) (\s -> emit s >> go)
	emit s
		| doavoid && '\r' `elem` s = return ()
		| otherwise = emitter s
-- | Runs a command with its stdout and stderr both piped. Each pipe is
-- handled by its filter action in its own thread. Returns the command's
-- exit code, or Nothing when catchMaybeIO caught an exception.
outputFilter
	:: FilePath
	-> [CommandParam]
	-> (CreateProcess -> CreateProcess)
	-> Maybe [(String, String)]
	-> (ProcessHandle -> Handle -> IO ())
	-> (ProcessHandle -> Handle -> IO ())
	-> IO (Maybe ExitCode)
outputFilter cmd params mkprocess environ outfilter errfilter =
	catchMaybeIO $ withCreateProcess cp run
  where
	cp = mkprocess $ (proc cmd (toCommand params))
		{ env = environ
		, std_out = CreatePipe
		, std_err = CreatePipe
		}

	run _ (Just outh) (Just errh) ph = do
		outthread <- async $ filterthenclose outfilter ph outh
		errthread <- async $ filterthenclose errfilter ph errh
		exitcode <- waitForProcess ph
		mapM_ wait [outthread, errthread]
		return exitcode
	-- Unreachable: cp requests a pipe for both stdout and stderr.
	run _ _ _ _ = error "internal"

	-- IO exceptions thrown by a filter are discarded; the pipe is
	-- closed once its filter is done.
	filterthenclose filt ph h = tryIO (filt ph h) >> hClose h
-- | Limit a meter to only update once per unit of time.
--
-- It's nice to display the final update to 100%, even if it comes soon
-- after a previous update. To make that happen, the Meter has to know
-- its total size.
rateLimitMeterUpdate :: NominalDiffTime -> Meter -> MeterUpdate -> IO MeterUpdate
rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
	lastupdate <- newMVar (toEnum 0 :: POSIXTime)
	return $ mu lastupdate
  where
	mu lastupdate n@(BytesProcessed i) = readMVar totalsizev >>= \case
		-- An update that reaches the total size always goes through.
		Just (TotalSize t) | i >= t -> meterupdate n
		_ -> do
			now <- getPOSIXTime
			-- Decide whether an update is due, and record the time
			-- if so, while holding lastupdate. modifyMVar puts the
			-- previous time back if interrupted, instead of leaving
			-- the MVar empty for every later call to block on.
			due <- modifyMVar lastupdate $ \prev ->
				return $ if now - prev >= delta
					then (now, True)
					else (prev, False)
			-- Run outside the lock, so a slow meterupdate does not
			-- hold up other callers deciding whether to update.
			when due $
				meterupdate n
-- | Bandwidth limiting by inserting a delay at the point that a meter is
-- updated.
--
-- The limit is bwlimit bytes per duration. It is converted to bytes per
-- second before being compared with the measured rate.
--
-- This will only work when the actions that use bandwidth are run in the
-- same process and thread as the call to the MeterUpdate.
--
-- For example, if the desired bandwidth is 100kb/s, and over the past
-- 1/10th of a second, 30kb was sent, then the current bandwidth is
-- 300kb/s, 3x as fast as desired. So, after getting the next chunk,
-- pause for twice as long as it took to get it.
--
-- A non-positive bwlimit, or a non-positive duration (which does not
-- describe a rate), disables limiting.
bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate
bwLimitMeterUpdate bwlimit duration meterupdate
	| bwlimit <= 0 || durationSeconds duration <= 0 = return meterupdate
	| otherwise = do
		nowtime <- getPOSIXTime
		mv <- newMVar (nowtime, Nothing)
		return (mu mv)
  where
	-- modifyMVar_ puts the previous state back if the delay or the
	-- wrapped meterupdate throws. A later call therefore cannot block
	-- forever on an empty MVar.
	mu mv n@(BytesProcessed i) = modifyMVar_ mv $ \(starttime, mprevi) -> do
		endtime <- getPOSIXTime
		case mprevi of
			Just previ -> do
				let runtime = endtime - starttime
				-- With a coarse clock, runtime can be 0. There is then
				-- no rate to measure, and dividing by it would throw,
				-- so skip the delay.
				when (runtime > 0) $ do
					let currbw = fromIntegral (i - previ) / runtime
					let pausescale
						| currbw > bwlimitpersecond = (currbw / bwlimitpersecond) - 1
						| otherwise = 0
					unboundDelay (floor (runtime * pausescale * microsecondspersecond))
			Nothing -> return ()
		meterupdate n
		nowtime <- getPOSIXTime
		return (nowtime, Just i)

	-- bwlimit is per duration, so divide (not multiply) to get bytes
	-- per second. Kept fractional so that eg 1KiB per day does not
	-- round down to 0. For a 1 second duration this equals bwlimit.
	bwlimitpersecond = fromIntegral bwlimit / fromIntegral (durationSeconds duration)

	-- unboundDelay takes microseconds; runtime is in seconds.
	microsecondspersecond = fromIntegral oneSecond
-- | A progress meter. Its fields are:
-- the total size, if known;
-- the most recent MeterState;
-- the string most recently displayed;
-- the action used to display the meter.
data Meter = Meter
	(MVar (Maybe TotalSize))
	(MVar MeterState)
	(MVar String)
	DisplayMeter

-- | A snapshot of progress: how many bytes had been processed, and when.
data MeterState = MeterState
	{ meterBytesProcessed :: BytesProcessed
	, meterTimeStamp :: POSIXTime
	} deriving (Show)

-- | Displays a meter. It is given the MVar holding the string last
-- displayed, the total size if known, and the previous and current
-- MeterStates.
type DisplayMeter = MVar String -> Maybe TotalSize -> MeterState -> MeterState -> IO ()

-- | Renders a meter to a String from the total size if known, and the
-- previous and current MeterStates.
type RenderMeter = Maybe TotalSize -> MeterState -> MeterState -> String
-- | Make a meter. Pass the total size, if it's known.
-- The meter starts at zero bytes, timestamped now, with nothing
-- displayed yet.
mkMeter :: Maybe TotalSize -> DisplayMeter -> IO Meter
mkMeter totalsize displaymeter = do
	now <- getPOSIXTime
	totalsizev <- newMVar totalsize
	statev <- newMVar (MeterState zeroBytesProcessed now)
	displayedv <- newMVar ""
	return (Meter totalsizev statev displayedv displaymeter)
-- | Sets, or replaces, the total size of a meter.
setMeterTotalSize :: Meter -> TotalSize -> IO ()
setMeterTotalSize (Meter totalsizev _ _ _) totalsize =
	void $ swapMVar totalsizev (Just totalsize)
-- | Updates the meter, displaying it if necessary.
--
-- The new state is always recorded. The meter is only redisplayed when
-- the byte count differs from the previously recorded one.
updateMeter :: Meter -> MeterUpdate
updateMeter (Meter totalsizev sv bv displaymeter) new = do
	curms <- MeterState new <$> getPOSIXTime
	oldms <- swapMVar sv curms
	unless (meterBytesProcessed oldms == new) $
		readMVar totalsizev >>= \totalsize ->
			displaymeter bv totalsize oldms curms
-- | Display meter to a Handle.
--
-- The rendered meter is padded with spaces to at least the width of the
-- previously displayed one. This overwrites leftover characters from a
-- longer earlier rendering. It is written only when it differs from what
-- was last displayed.
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
displayMeterHandle h rendermeter v msize old new = do
	-- modifyMVar restores v if an exception arrives while v is held.
	-- Otherwise clearMeterHandle and later displays could block
	-- forever on an empty MVar.
	(olds, s') <- modifyMVar v $ \olds -> do
		let s = rendermeter msize old new
		let padded = s <> replicate (length olds - length s) ' '
		return (padded, (olds, padded))
	-- Avoid writing when the rendered meter has not changed.
	when (olds /= s') $ do
		hPutStr h ('\r':s')
		hFlush h
-- | Clear meter displayed by displayMeterHandle. May be called before
-- outputting something else, followed by more calls to displayMeterHandle.
--
-- Overwrites the last displayed string with spaces, then returns to the
-- start of the line.
clearMeterHandle :: Meter -> Handle -> IO ()
clearMeterHandle (Meter _ _ v _) h = do
	width <- length <$> readMVar v
	hPutStr h ("\r" ++ replicate width ' ' ++ "\r")
	hFlush h
-- | Display meter in the form:
--   10% 1.3MiB 300 KiB/s 16m40s
-- or when total size is not known:
--   1.3 MiB 300 KiB/s
--
-- More bytes may be processed than the total size, for example when a
-- reported total size was inaccurate. In that case the percentage is
-- capped at 100%, and the estimated time remaining is computed from 0
-- remaining bytes rather than a negative count.
bandwidthMeter :: RenderMeter
bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) =
	unwords $ catMaybes
		[ Just percentamount
		-- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s"
		, Just $ replicate (29 - length percentamount - length rate) ' '
		, Just rate
		, estimatedcompletion
		]
  where
	amount = roughSize' committeeUnits True 2 new
	percentamount = case mtotalsize of
		Just (TotalSize totalsize) ->
			let p = showPercentage 0 $
				percentage totalsize (min new totalsize)
			in p ++ replicate (6 - length p) ' ' ++ amount
		Nothing -> amount
	rate = roughSize' committeeUnits True 0 bytespersecond ++ "/s"
	bytespersecond
		| duration == 0 = fromIntegral transferred
		| otherwise = floor $ fromIntegral transferred / duration
	transferred = max 0 (new - old)
	duration = max 0 (now - before)
	-- Clamped the same way as the percentage above, so overshooting
	-- the total never produces a negative Duration.
	estimatedcompletion = case mtotalsize of
		Just (TotalSize totalsize)
			| bytespersecond > 0 ->
				Just $ fromDuration $ Duration $
					max 0 (totalsize - new) `div` bytespersecond
		_ -> Nothing
-- | Serialized as the decimal byte count.
instance Proto.Serializable BytesProcessed where
	serialize b = show (fromBytesProcessed b :: Integer)
	deserialize s = BytesProcessed <$> readish s