git-annex/Utility/Metered.hs

548 lines
18 KiB
Haskell
Raw Normal View History

{- Metered IO and actions
-
- Copyright 2012-2024 Joey Hess <id@joeyh.name>
-
- License: BSD-2-clause
-}
{-# LANGUAGE TypeSynonymInstances, BangPatterns #-}
module Utility.Metered (
MeterUpdate,
MeterState(..),
nullMeterUpdate,
combineMeterUpdate,
TotalSize(..),
BytesProcessed(..),
toBytesProcessed,
fromBytesProcessed,
addBytesProcessed,
zeroBytesProcessed,
withMeteredFile,
meteredWrite,
meteredWrite',
meteredWriteFile,
offsetMeterUpdate,
hGetContentsMetered,
hGetMetered,
defaultChunkSize,
watchFileSize,
OutputHandler(..),
ProgressParser,
commandMeter,
commandMeter',
commandMeterExitCode,
commandMeterExitCode',
demeterCommand,
demeterCommandEnv,
avoidProgress,
rateLimitMeterUpdate,
bwLimitMeterUpdate,
Meter,
mkMeter,
setMeterTotalSize,
updateMeter,
displayMeterHandle,
clearMeterHandle,
bandwidthMeter,
) where
import Common
make my authorship explicit in the code This is intended to guard against LLM code theft, which is the current bubble technology de jour. Note that authorJoeyHess' with a year older than the year I began developing git-annex will behave badly, by intention. Eg, it will spin and eventually crash. This is not the first anti-LLM protection in git-annex. For example see 9562da790fece82d6dfa756b571c67d0fdf57468. That method, while much harder for an adversary to detect and remove, also complicates code somewhat significantly, and needs extensions to be enabled. There are also probably significantly fewer ways to implement that method in Haskell. This new approach, by contrast, will be easy to add throughout the code base, with very little effort, and without complicating reading or maintaining it any more than noticing that yes, I am the author of this code. An adversary could of course remove all calls to these functions before feeding code into their LLM-based laundry facility. I think this would need to be done manually, or with the help of some fairly advanced Haskell parsing though. In some cases, authorJoeyHess needs to be removed, while in other places it needs to be replaced with a value. Also a monadic use of authorJoeyHess' may involve other added monadic machinery which would need to be eliminated to keep the code compiling. Alternatively, an adversary could replace my name with something innocuous. This would be clear intent to remove author attribution from my code, even more than running it through an LLM laundry is. If you work for a large company that is laundering my code through an LLM, please do us a favor and use your immense privilege to quit and go do something socially beneficial. I will not explain further developments of this code in such detail, and you have better things to do than playing cat and mouse with me as I explore directions such as extending this approach to the type level. Sponsored-by: k0ld on Patreon
2023-11-20 16:07:07 +00:00
import Author
import Utility.Percentage
import Utility.DataUnits
import Utility.HumanTime
import Utility.SimpleProtocol as Proto
import Utility.ThreadScheduler
import Utility.SafeOutput
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as S
import System.IO.Unsafe
import Foreign.Storable (Storable(sizeOf))
import System.Posix.Types
import Data.Int
2015-11-17 00:27:01 +00:00
import Control.Concurrent
import Control.Concurrent.Async
2015-11-17 00:27:01 +00:00
import Control.Monad.IO.Class (MonadIO)
import Data.Time.Clock
import Data.Time.Clock.POSIX
{- Authorship marker for this module, built with author from the Author
 - module.
 -
 - It is used at more than one type in this file: as a Bool condition in
 - hGetMetered, and as a monadic pass-through in commandMeterExitCode'.
 -
 - NOTE(review): (2024-12) is an arithmetic expression that evaluates to
 - 2012; presumably that is the year author expects. Confirm against the
 - Author module before changing it. -}
copyright :: Copyright
copyright = author JoeyHess (2024-12)
{- An action that can be run repeatedly, updating it on the bytes processed.
 -
 - Note that each call receives the total number of bytes processed so
 - far, *not* an incremental amount since the last call.
 -
 - For example, meteredWrite' calls it with 10 and then with 15 after
 - writing a 10 byte chunk followed by a 5 byte chunk. -}
type MeterUpdate = (BytesProcessed -> IO ())
{- A MeterUpdate that ignores the progress it is given and does nothing. -}
nullMeterUpdate :: MeterUpdate
nullMeterUpdate = const (return ())
{- Combines two MeterUpdates into one that runs the first and then the
 - second, passing both the same number of bytes processed. -}
combineMeterUpdate :: MeterUpdate -> MeterUpdate -> MeterUpdate
combineMeterUpdate a b n = do
    a n
    b n
{- Total number of bytes processed so far.
 -
 - This wraps an Integer, so the count is not limited to the range of a
 - machine word. -}
newtype BytesProcessed = BytesProcessed Integer
    deriving (Eq, Ord, Show, Read)
{- Numeric types that can be converted to and from BytesProcessed.
 -
 - Converting from BytesProcessed to a fixed-width type goes through
 - fromInteger. -}
class AsBytesProcessed a where
    toBytesProcessed :: a -> BytesProcessed
    fromBytesProcessed :: BytesProcessed -> a

instance AsBytesProcessed BytesProcessed where
    toBytesProcessed b = b
    fromBytesProcessed b = b

instance AsBytesProcessed Integer where
    toBytesProcessed = BytesProcessed
    fromBytesProcessed (BytesProcessed n) = n

instance AsBytesProcessed Int where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed Int64 where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n

instance AsBytesProcessed FileOffset where
    toBytesProcessed = BytesProcessed . toInteger
    fromBytesProcessed (BytesProcessed n) = fromInteger n
{- Adds an amount, given as any AsBytesProcessed type, to a BytesProcessed.
 - The sum is forced before it is wrapped, so repeated additions do not
 - build up a chain of thunks. -}
addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
addBytesProcessed (BytesProcessed i) v = case toBytesProcessed v of
    BytesProcessed n -> BytesProcessed $! i + n
{- No bytes processed yet; the starting point for a meter. -}
zeroBytesProcessed :: BytesProcessed
zeroBytesProcessed = toBytesProcessed (0 :: Integer)
{- Opens a file for binary reading and passes its content, as a lazy
 - ByteString, to an action. The meter is updated as the content is
 - consumed. -}
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
withMeteredFile f meterupdate a = withBinaryFile f ReadMode consume
  where
    consume h = a =<< hGetContentsMetered h meterupdate
{- Feeds the chunks of a lazy ByteString, one at a time, to an action.
 - After each chunk has been processed, the meter is updated with the
 - total number of bytes handled so far. -}
meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO ()
meteredWrite meterupdate a b = void $ meteredWrite' meterupdate a b

{- Like meteredWrite, but returns the total number of bytes that were
 - processed. -}
meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed
meteredWrite' meterupdate a = writechunks zeroBytesProcessed . L.toChunks
  where
    writechunks done chunks = case chunks of
        [] -> return done
        (c:rest) -> do
            a c
            -- Forced, so the running total is not a growing thunk.
            let !done' = done `addBytesProcessed` S.length c
            meterupdate done'
            writechunks done' rest
{- Writes a lazy ByteString to a file, which is opened in binary
 - WriteMode, updating the meter after each chunk is written. -}
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode writeto
  where
    writeto h = meteredWrite meterupdate (S.hPut h) b
{- Makes a MeterUpdate that adds a fixed offset to every value before
 - passing it on to the base MeterUpdate.
 -
 - This is useful when several actions in sequence (such as multiple
 - meteredWriteFiles) all feed one common meter, and when resuming.
 -}
offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
offsetMeterUpdate base offset = base . addBytesProcessed offset
{- Like L.hGetContents, except that a meter is updated after each chunk
 - is read, based on the size of the chunk.
 -
 - The meter updates are subject to all the usual caveats about
 - unsafeInterleaveIO, so use caution.
 -}
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
hGetContentsMetered h meterupdate = hGetMetered h Nothing meterupdate
2014-11-03 22:37:05 +00:00
{- Lazily reads from the Handle, updating the meter after each chunk is
 - read.
 -
 - Reading stops at EOF, or once the requested number of bytes (when one
 - is given) has been read. The Handle is closed at EOF, but is otherwise
 - left open.
 -
 - The meter update runs inside unsafeInterleaveIO, so it can run at any
 - time. Updates can even run out of order, as different parts of the
 - ByteString are consumed.
 -}
hGetMetered :: Handle -> Maybe Integer -> MeterUpdate -> IO L.ByteString
hGetMetered h wantsize meterupdate = deferred zeroBytesProcessed
  where
    -- Reading of each chunk is put off until the ByteString is forced
    -- that far.
    deferred sofar = unsafeInterleaveIO $ readchunk sofar

    readchunk sofar = do
        c <- S.hGet h (nextchunksize (fromBytesProcessed sofar))
        if S.null c
            then atend
            else gotchunk sofar c

    -- An empty read that was made because zero bytes were requested
    -- is not treated as EOF, so the Handle stays open in that case.
    atend = do
        when (wantsize /= Just 0 && copyright) $
            hClose h
        return L.empty

    gotchunk sofar c = do
        let !sofar' = addBytesProcessed sofar (S.length c)
        meterupdate sofar'
        if keepgoing (fromBytesProcessed sofar')
            then do
                rest <- deferred sofar'
                return $ L.append (L.fromChunks [c]) rest
            else return $ L.fromChunks [c]

    keepgoing n = maybe True (n <) wantsize

    -- Never ask for more than remains of the requested size.
    nextchunksize n = case wantsize of
        Just sz
            | sz - n < toInteger defaultChunkSize -> fromIntegral (sz - n)
        _ -> defaultChunkSize
{- The chunk size used when reading: 32 KiB less the space of two Ints.
 - This is the same default chunk size Lazy ByteStrings use. -}
defaultChunkSize :: Int
defaultChunkSize = kib 32 - overhead
  where
    kib = (* 1024)
    overhead = 2 * sizeOf (0 :: Int) -- GHC specific
2014-12-17 17:21:55 +00:00
{- Runs an action while a background thread polls the size of a file,
 - updating the meter as the file grows.
 -
 - The file may exist already, and the action might discard it and start
 - over. So that the old size is not reported, followed by a smaller one,
 - the meter is first updated only once the file has started growing.
 -
 - The action is passed a modified MeterUpdate, which it should use for
 - any updates of its own. That lets the action, eg, update the meter
 - before a write has been flushed to the file, without the meter then
 - being set back to the file's size when it has gotten ahead of it.
 -}
watchFileSize
    :: (MonadIO m, MonadMask m)
    => FilePath
    -> MeterUpdate
    -> (MeterUpdate -> m a)
    -> m a
watchFileSize f p a = do
    sizevar <- liftIO $ newMVar zeroBytesProcessed
    bracket
        (liftIO $ forkIO $ getsz >>= watcher (meterupdate sizevar True))
        (liftIO . void . tryIO . killThread)
        (\_ -> a (meterupdate sizevar False))
  where
    -- Polls the file size, reporting it only when it has grown since
    -- the previous poll.
    watcher update oldsz = do
        threadDelay 500000 -- 0.5 seconds
        sz <- getsz
        when (sz > oldsz) $
            update sz
        watcher update sz

    -- A file whose size cannot be read counts as empty.
    getsz = catchDefaultIO zeroBytesProcessed $
        toBytesProcessed <$> getFileSize f'
    f' = toRawFilePath f

    -- sizevar holds the last value that was sent to the meter. With
    -- preventbacktracking set, a value lower than that is dropped.
    meterupdate sizevar preventbacktracking n = do
        old <- takeMVar sizevar
        if preventbacktracking && old > n
            then putMVar sizevar old
            else do
                putMVar sizevar n
                p n
{- Configures what is done with the output of a command that is being
 - metered or demetered. -}
data OutputHandler = OutputHandler
    { quietMode :: Bool
    -- ^ when True, the command's stdout is not displayed
    , stderrHandler :: String -> IO ()
    -- ^ run on each line that the command writes to stderr
    }
2014-12-17 17:21:55 +00:00
{- Parses the String looking for a command's progress output, and returns
 - Maybe the number of bytes done so far, optionally a total size,
 - and any remainder of the string that could be an incomplete
 - progress output. That remainder should be prepended to future output,
 - and fed back in. This interface allows the command's output to be read
 - in any desired size chunk, or even one character at a time.
 -}
type ProgressParser = String -> (Maybe BytesProcessed, Maybe TotalSize, String)
{- The total number of bytes that are expected to be processed. -}
newtype TotalSize = TotalSize Integer
    deriving (Eq, Show)
2014-12-17 17:21:55 +00:00
{- Runs a command, feeding its output through a ProgressParser in order
 - to update a meter. The result is True only when the command exited
 - successfully.
 -
 - When a Meter is provided, the ProgressParser is able to report the
 - total size. That makes it possible to create the Meter before the size
 - is known.
 -}
commandMeter :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
commandMeter progressparser oh meter meterupdate cmd params =
    (== Just ExitSuccess)
        <$> commandMeterExitCode progressparser oh meter meterupdate cmd params
{- Like commandMeter, but takes a function that can adjust the
 - CreateProcess before the command is run. -}
commandMeter' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO Bool
commandMeter' progressparser oh meter meterupdate cmd params mkprocess =
    succeeded <$> commandMeterExitCode' progressparser oh meter meterupdate cmd params mkprocess
  where
    succeeded (Just ExitSuccess) = True
    succeeded _ = False
{- Like commandMeter, but returns the command's exit code, rather than
 - a Bool. Runs commandMeterExitCode' with the process configuration left
 - unchanged.
 -
 - NOTE(review): the result comes from outputFilter, which wraps the run
 - in catchMaybeIO; presumably Nothing means the command could not be
 - run -- confirm against catchMaybeIO. -}
commandMeterExitCode :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode)
commandMeterExitCode progressparser oh meter meterupdate cmd params =
    commandMeterExitCode' progressparser oh meter meterupdate cmd params id
{- Runs a command with its stdout fed through the ProgressParser, and
 - each line of its stderr passed to the OutputHandler's stderrHandler.
 - The CreateProcess can be adjusted by the provided function before the
 - command is run. Returns the command's exit code. -}
commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode)
commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess =
    outputFilter cmd params mkprocess Nothing
        (\_ph -> feedprogress mmeter zeroBytesProcessed [])
        handlestderr
  where
    -- Reads stdout a little at a time, until a read comes back empty.
    -- buf holds output that the parser has not consumed yet.
    feedprogress sendtotalsize prev buf h = do
        b <- S.hGetSome h 80 >>= copyright
        unless (S.null b) $ do
            unless (quietMode oh) $ do
                S.hPut stdout b
                hFlush stdout
            let (mbytes, mtotalsize, buf') =
                    progressparser (buf ++ decodeBS b)
            -- The total size is sent to the meter at most once; after
            -- that there is no meter left to send it to.
            sendtotalsize' <- case (sendtotalsize, mtotalsize) of
                (Just meter, Just t) -> do
                    setMeterTotalSize meter t
                    return Nothing
                _ -> return sendtotalsize
            case mbytes of
                Nothing -> feedprogress sendtotalsize' prev buf' h
                Just bytes -> do
                    -- Only report progress that has changed.
                    when (bytes /= prev) $
                        meterupdate bytes
                    feedprogress sendtotalsize' bytes buf' h

    handlestderr ph h = do
        ml <- hGetLineUntilExitOrEOF ph h
        case ml of
            Nothing -> return ()
            Just l -> do
                stderrHandler oh l
                handlestderr ph h
{- Runs a command, that may display one or more progress meters on
 - either stdout or stderr, and prevents the meters from being displayed.
 -
 - The other command output is handled as configured by the OutputHandler.
 -
 - Returns True only when the command exits successfully. No environment
 - is specified for the command (Nothing is passed to demeterCommandEnv).
 -}
demeterCommand :: OutputHandler -> FilePath -> [CommandParam] -> IO Bool
demeterCommand oh cmd params = demeterCommandEnv oh cmd params Nothing
{- Like demeterCommand, but the environment for the command can be
 - specified. Returns True only when the command exits successfully. -}
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
demeterCommandEnv oh cmd params environ =
    (== Just ExitSuccess) <$> outputFilter cmd params id environ
        (filterwith showstdout)
        (filterwith (stderrHandler oh))
  where
    filterwith emitter ph h = avoidProgress True ph h emitter

    -- stdout lines are displayed unless in quiet mode.
    showstdout l
        | quietMode oh = return ()
        | otherwise = putStrLn (safeOutput l)
{- Reads lines from the Handle, passing each to the emitter, until
 - hGetLineUntilExitOrEOF yields no more lines.
 -
 - When the Bool is True, progress output is suppressed while other
 - messages are still emitted. That is done by dropping lines that
 - contain \r (typically used to reset to the beginning of the line when
 - updating a progress display).
 -}
avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO ()
avoidProgress doavoid ph h emitter = loop
  where
    loop = hGetLineUntilExitOrEOF ph h >>= maybe (return ()) handleline
    handleline s = do
        when (wanted s) $
            emitter s
        loop
    wanted s = not (doavoid && '\r' `elem` s)
{- Runs a command with both its stdout and stderr piped, running one
 - filter action on each pipe in its own thread. Once the command has
 - exited and both filters have finished, returns the exit code.
 -
 - A filter that fails with an IO exception is silently stopped; its
 - Handle is closed either way. -}
outputFilter
    :: FilePath
    -> [CommandParam]
    -> (CreateProcess -> CreateProcess)
    -> Maybe [(String, String)]
    -> (ProcessHandle -> Handle -> IO ())
    -> (ProcessHandle -> Handle -> IO ())
    -> IO (Maybe ExitCode)
outputFilter cmd params mkprocess environ outfilter errfilter =
    catchMaybeIO $ withCreateProcess p go
  where
    p = mkprocess (proc cmd (toCommand params))
        { env = environ
        , std_out = CreatePipe
        , std_err = CreatePipe
        }

    go _ (Just outh) (Just errh) ph = do
        outt <- filterthread ph outfilter outh
        errt <- filterthread ph errfilter errh
        ret <- waitForProcess ph
        wait outt
        wait errt
        return ret
    -- Both pipes were requested in p, so the handles should always be
    -- present.
    go _ _ _ _ = error "internal"

    filterthread ph fltr h = async $ tryIO (fltr ph h) >> hClose h
-- | Makes a MeterUpdate that passes on at most one update per unit of
-- time.
--
-- The final update to 100% is nice to display even when it comes soon
-- after a previous update. That needs the Meter to know its total size:
-- an update that reaches the total size is always passed on.
rateLimitMeterUpdate :: NominalDiffTime -> Meter -> MeterUpdate -> IO MeterUpdate
rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
    lastupdate <- newMVar (toEnum 0 :: POSIXTime)
    return $ mu lastupdate
  where
    mu lastupdate n@(BytesProcessed i) = do
        mtotal <- readMVar totalsizev
        case mtotal of
            Just (TotalSize t) | i >= t -> meterupdate n
            _ -> ratelimited lastupdate n

    -- lastupdate holds the time of the last update that was let through.
    ratelimited lastupdate n = do
        now <- getPOSIXTime
        prev <- takeMVar lastupdate
        if now - prev < delta
            then putMVar lastupdate prev
            else do
                putMVar lastupdate now
                meterupdate n
-- | Bandwidth limiting by inserting a delay at the point that a meter is
-- updated.
--
-- This will only work when the actions that use bandwidth are run in the
-- same process and thread as the call to the MeterUpdate.
--
-- For example, if the desired bandwidth is 100kb/s, and over the past
-- 1/10th of a second, 30kb was sent, then the current bandwidth is
-- 300kb/s, 3x as fast as desired. So, after getting the next chunk,
-- pause for twice as long as it took to get it.
--
-- A limit of zero or less disables limiting, and the MeterUpdate is
-- returned unchanged. The first update never pauses, since there is no
-- earlier update to measure a rate from.
--
-- NOTE(review): the limit that rates are compared with is computed as
-- bwlimit * durationSeconds duration. If the intent is "bwlimit bytes
-- per duration", a division would be expected instead -- confirm
-- against the callers. It is left unchanged here.
bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate
bwLimitMeterUpdate bwlimit duration meterupdate
    | bwlimit <= 0 = return meterupdate
    | otherwise = do
        nowtime <- getPOSIXTime
        mv <- newMVar (nowtime, Nothing)
        return (mu mv)
  where
    -- mv holds the time the previous update finished, and the number of
    -- bytes that it reported.
    mu mv n@(BytesProcessed i) = do
        endtime <- getPOSIXTime
        (starttime, mprevi) <- takeMVar mv
        case mprevi of
            Just previ -> do
                let runtime = endtime - starttime
                -- No rate can be computed when the clock has not
                -- advanced since the previous update (or has gone
                -- backwards), or when the limit works out to zero.
                -- Dividing anyway would throw a divide by zero
                -- exception while mv is empty, which would make all
                -- later updates block. So skip the pause instead.
                when (runtime > 0 && bwlimit' > 0) $ do
                    let currbw = fromIntegral (i - previ) / runtime
                    let pausescale =
                            if currbw > bwlimit'
                                then (currbw / bwlimit') - 1
                                else 0
                    unboundDelay (floor (runtime * pausescale * msecs))
            Nothing -> return ()
        meterupdate n
        -- The time is taken again, so that the pause and the meter
        -- update are not counted against the next chunk.
        nowtime <- getPOSIXTime
        putMVar mv (nowtime, Just i)

    bwlimit' = fromIntegral (bwlimit * durationSeconds duration)
    -- Scales seconds to the units of unboundDelay.
    msecs = fromIntegral oneSecond
2020-12-11 16:03:40 +00:00
-- | A progress meter. Its fields are, in order: the total size, when
-- known (set by mkMeter and setMeterTotalSize); the MeterState recorded
-- by the most recent updateMeter; the String that is passed to the
-- DisplayMeter, in which displayMeterHandle keeps the text it last
-- rendered; and the DisplayMeter used to display the meter.
data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter
-- | A snapshot of a meter, as recorded by updateMeter.
data MeterState = MeterState
    { meterBytesProcessed :: BytesProcessed
    -- ^ the number of bytes processed as of this update
    , meterTimeStamp :: POSIXTime
    -- ^ the time at which the update was made
    } deriving (Show)
2020-12-11 16:03:40 +00:00
-- | An action that displays a meter. updateMeter calls it with the
-- Meter's MVar String, the total size when known, the previous
-- MeterState, and the new MeterState.
type DisplayMeter = MVar String -> Maybe TotalSize -> MeterState -> MeterState -> IO ()
2020-12-11 16:03:40 +00:00
-- | Renders a meter as a String, given the total size when known, the
-- previous MeterState, and the new MeterState.
type RenderMeter = Maybe TotalSize -> MeterState -> MeterState -> String
-- | Makes a new meter, which starts out with zero bytes processed as of
-- the current time, and with nothing displayed. Pass the total size, if
-- it's known.
mkMeter :: Maybe TotalSize -> DisplayMeter -> IO Meter
mkMeter totalsize displaymeter = do
    now <- getPOSIXTime
    totalsizev <- newMVar totalsize
    statev <- newMVar (MeterState zeroBytesProcessed now)
    renderedv <- newMVar ""
    return $ Meter totalsizev statev renderedv displaymeter
2020-12-11 16:03:40 +00:00
-- | Sets the total size of a meter, replacing any total size it had
-- before.
setMeterTotalSize :: Meter -> TotalSize -> IO ()
setMeterTotalSize (Meter totalsizev _ _ _) totalsize =
    void $ swapMVar totalsizev (Just totalsize)
-- | Records a new number of bytes processed in the meter, along with
-- the current time. The meter is displayed only when the number of
-- bytes has changed since the previous update.
updateMeter :: Meter -> MeterUpdate
updateMeter (Meter totalsizev sv bv displaymeter) new = do
    curms <- MeterState new <$> getPOSIXTime
    oldms <- swapMVar sv curms
    unless (meterBytesProcessed oldms == new) $ do
        totalsize <- readMVar totalsizev
        displaymeter bv totalsize oldms curms
-- | Displays a meter on a Handle, by writing a carriage return followed
-- by the rendered meter.
--
-- The MVar remembers the text that was displayed last. When the new
-- text is shorter than that, it is padded with spaces to the same
-- length.
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
displayMeterHandle h rendermeter v msize old new = do
    olds <- takeMVar v
    let rendered = rendermeter msize old new
        padded = rendered <> replicate (length olds - length rendered) ' '
    putMVar v padded
    -- Avoid writing when the rendered meter has not changed.
    unless (olds == padded) $ do
        hPutStr h ('\r' : padded)
        hFlush h
-- | Clears a meter that was displayed by displayMeterHandle, by
-- overwriting it with spaces and returning to the start of the line.
-- This may be called before outputting something else, and followed by
-- more calls to displayMeterHandle.
clearMeterHandle :: Meter -> Handle -> IO ()
clearMeterHandle (Meter _ _ v _) h = do
    width <- length <$> readMVar v
    hPutStr h ("\r" ++ replicate width ' ' ++ "\r")
    hFlush h
-- | Renders a meter in the form:
--   10%  1.3MiB  300 KiB/s 16m40s
-- or when total size is not known:
--   1.3 MiB      300 KiB/s
--
-- The rate is calculated from the bytes and time that separate the two
-- MeterStates. The estimated time to completion is only included when
-- the total size is known and the rate is above zero.
bandwidthMeter :: RenderMeter
bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) =
    unwords $ [percentamount, padding, rate] ++ eta
  where
    -- Pad enough for max width: "100%  xxxx.xx KiB  xxxx KiB/s"
    padding = replicate (29 - length percentamount - length rate) ' '

    amount = roughSize' committeeUnits True 2 new
    percentamount = case mtotalsize of
        Nothing -> amount
        Just (TotalSize totalsize) ->
            -- The percentage is capped at the total size.
            let p = showPercentage 0 $
                    percentage totalsize (min new totalsize)
            in p ++ replicate (6 - length p) ' ' ++ amount

    rate = roughSize' committeeUnits True 0 bytespersecond ++ "/s"

    -- Neither is allowed to be negative.
    transferred = max 0 (new - old)
    duration = max 0 (now - before)
    bytespersecond
        | duration == 0 = fromIntegral transferred
        | otherwise = floor $ fromIntegral transferred / duration

    eta = case mtotalsize of
        Just (TotalSize totalsize)
            | bytespersecond > 0 ->
                [ fromDuration $ Duration $
                    (totalsize - new) `div` bytespersecond
                ]
        _ -> []
{- BytesProcessed is serialized as the decimal form of its Integer. -}
instance Proto.Serializable BytesProcessed where
    serialize (BytesProcessed n) = show n
    deserialize = fmap BytesProcessed . readish