2013-03-28 21:03:04 +00:00
|
|
|
{- Metered IO
|
|
|
|
-
|
2015-02-10 16:34:34 +00:00
|
|
|
- Copyright 2012-2105 Joey Hess <id@joeyh.name>
|
2013-03-28 21:03:04 +00:00
|
|
|
-
|
2014-05-10 14:01:27 +00:00
|
|
|
- License: BSD-2-clause
|
2013-03-28 21:03:04 +00:00
|
|
|
-}
|
|
|
|
|
|
|
|
{-# LANGUAGE TypeSynonymInstances #-}
|
|
|
|
|
|
|
|
module Utility.Metered where
|
|
|
|
|
|
|
|
import Common
|
|
|
|
|
|
|
|
import qualified Data.ByteString.Lazy as L
|
|
|
|
import qualified Data.ByteString as S
|
|
|
|
import System.IO.Unsafe
|
|
|
|
import Foreign.Storable (Storable(sizeOf))
|
|
|
|
import System.Posix.Types
|
2014-07-25 20:20:32 +00:00
|
|
|
import Data.Int
|
2015-02-10 16:34:34 +00:00
|
|
|
import Data.Bits.Utils
|
2013-03-28 21:03:04 +00:00
|
|
|
|
|
|
|
{- An action that can be run repeatedly, updating it on the bytes processed.
|
|
|
|
-
|
|
|
|
- Note that each call receives the total number of bytes processed, so
|
|
|
|
- far, *not* an incremental amount since the last call. -}
|
|
|
|
type MeterUpdate = (BytesProcessed -> IO ())
|
|
|
|
|
2014-08-01 19:09:49 +00:00
|
|
|
nullMeterUpdate :: MeterUpdate
|
|
|
|
nullMeterUpdate _ = return ()
|
|
|
|
|
2013-03-28 21:03:04 +00:00
|
|
|
{- Total number of bytes processed so far. -}
|
|
|
|
newtype BytesProcessed = BytesProcessed Integer
|
external special remotes mostly implemented (untested)
This has not been tested at all. It compiles!
The only known missing things are support for encryption, and for get/set
of special remote configuration, and of key state. (The latter needs
separate work to add a new per-key log file to store that state.)
Only thing I don't much like is that initremote needs to be passed both
type=external and externaltype=foo. It would be better to have just
type=foo
Most of this is quite straightforward code, that largely wrote itself given
the types. The only tricky parts were:
* Need to lock the remote when using it to eg make a request, because
in theory git-annex could have multiple threads that each try to use
a remote at the same time. I don't think that git-annex ever does
that currently, but better safe than sorry.
* Rather than starting up every external special remote program when
git-annex starts, they are started only on demand, when first used.
This will avoid slowdown, especially when running fast git-annex query
commands. Once started, they keep running until git-annex stops, currently,
which may not be ideal, but it's hard to know a better time to stop them.
* Bit of a chicken and egg problem with caching the cost of the remote,
because setting annex-cost in the git config needs the remote to already
be set up. Managed to finesse that.
This commit was sponsored by Lukas Anzinger.
2013-12-26 22:23:13 +00:00
|
|
|
deriving (Eq, Ord, Show)
|
2013-03-28 21:03:04 +00:00
|
|
|
|
|
|
|
class AsBytesProcessed a where
|
|
|
|
toBytesProcessed :: a -> BytesProcessed
|
|
|
|
fromBytesProcessed :: BytesProcessed -> a
|
|
|
|
|
2014-07-25 20:20:32 +00:00
|
|
|
instance AsBytesProcessed BytesProcessed where
|
|
|
|
toBytesProcessed = id
|
|
|
|
fromBytesProcessed = id
|
|
|
|
|
2013-03-28 21:03:04 +00:00
|
|
|
instance AsBytesProcessed Integer where
|
|
|
|
toBytesProcessed i = BytesProcessed i
|
|
|
|
fromBytesProcessed (BytesProcessed i) = i
|
|
|
|
|
|
|
|
instance AsBytesProcessed Int where
|
|
|
|
toBytesProcessed i = BytesProcessed $ toInteger i
|
|
|
|
fromBytesProcessed (BytesProcessed i) = fromInteger i
|
|
|
|
|
2014-07-25 20:20:32 +00:00
|
|
|
instance AsBytesProcessed Int64 where
|
|
|
|
toBytesProcessed i = BytesProcessed $ toInteger i
|
|
|
|
fromBytesProcessed (BytesProcessed i) = fromInteger i
|
|
|
|
|
2013-03-28 21:03:04 +00:00
|
|
|
instance AsBytesProcessed FileOffset where
|
|
|
|
toBytesProcessed sz = BytesProcessed $ toInteger sz
|
|
|
|
fromBytesProcessed (BytesProcessed sz) = fromInteger sz
|
|
|
|
|
|
|
|
addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
|
|
|
|
addBytesProcessed (BytesProcessed i) v =
|
|
|
|
let (BytesProcessed n) = toBytesProcessed v
|
|
|
|
in BytesProcessed $! i + n
|
|
|
|
|
|
|
|
zeroBytesProcessed :: BytesProcessed
|
|
|
|
zeroBytesProcessed = BytesProcessed 0
|
|
|
|
|
|
|
|
{- Sends the content of a file to an action, updating the meter as it's
|
|
|
|
- consumed. -}
|
|
|
|
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
|
|
|
|
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h ->
|
|
|
|
hGetContentsMetered h meterupdate >>= a
|
|
|
|
|
|
|
|
{- Sends the content of a file to a Handle, updating the meter as it's
|
|
|
|
- written. -}
|
|
|
|
streamMeteredFile :: FilePath -> MeterUpdate -> Handle -> IO ()
|
|
|
|
streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h
|
|
|
|
|
|
|
|
{- Writes a ByteString to a Handle, updating a meter as it's written. -}
|
|
|
|
meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
|
|
|
|
meteredWrite meterupdate h = go zeroBytesProcessed . L.toChunks
|
|
|
|
where
|
|
|
|
go _ [] = return ()
|
|
|
|
go sofar (c:cs) = do
|
|
|
|
S.hPut h c
|
|
|
|
let sofar' = addBytesProcessed sofar $ S.length c
|
|
|
|
meterupdate sofar'
|
|
|
|
go sofar' cs
|
|
|
|
|
|
|
|
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
|
|
|
|
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
|
|
|
|
meteredWrite meterupdate h b
|
|
|
|
|
2014-07-25 20:20:32 +00:00
|
|
|
{- Applies an offset to a MeterUpdate. This can be useful when
|
|
|
|
- performing a sequence of actions, such as multiple meteredWriteFiles,
|
resume interrupted chunked downloads
Leverage the new chunked remotes to automatically resume downloads.
Sort of like rsync, although of course not as efficient since this
needs to start at a chunk boundry.
But, unlike rsync, this method will work for S3, WebDAV, external
special remotes, etc, etc. Only directory special remotes so far,
but many more soon!
This implementation will also properly handle starting a download
from one remote, interrupting, and resuming from another one, and so on.
(Resuming interrupted chunked uploads is similarly doable, although
slightly more expensive.)
This commit was sponsored by Thomas Djärv.
2014-07-27 22:52:42 +00:00
|
|
|
- that all update a common meter progressively. Or when resuming.
|
2014-07-25 20:20:32 +00:00
|
|
|
-}
|
|
|
|
offsetMeterUpdate :: MeterUpdate -> BytesProcessed -> MeterUpdate
|
|
|
|
offsetMeterUpdate base offset = \n -> base (offset `addBytesProcessed` n)
|
|
|
|
|
2013-03-28 21:03:04 +00:00
|
|
|
{- This is like L.hGetContents, but after each chunk is read, a meter
|
|
|
|
- is updated based on the size of the chunk.
|
2014-11-03 22:37:05 +00:00
|
|
|
-
|
|
|
|
- All the usual caveats about using unsafeInterleaveIO apply to the
|
|
|
|
- meter updates, so use caution.
|
|
|
|
-}
|
|
|
|
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
|
|
|
|
hGetContentsMetered h = hGetUntilMetered h (const True)
|
|
|
|
|
|
|
|
{- Reads from the Handle, updating the meter after each chunk.
|
2013-03-28 21:03:04 +00:00
|
|
|
-
|
|
|
|
- Note that the meter update is run in unsafeInterleaveIO, which means that
|
|
|
|
- it can be run at any time. It's even possible for updates to run out
|
|
|
|
- of order, as different parts of the ByteString are consumed.
|
|
|
|
-
|
2014-11-03 22:37:05 +00:00
|
|
|
- Stops at EOF, or when keepgoing evaluates to False.
|
|
|
|
- Closes the Handle at EOF, but otherwise leaves it open.
|
2013-03-28 21:03:04 +00:00
|
|
|
-}
|
2014-11-03 22:37:05 +00:00
|
|
|
hGetUntilMetered :: Handle -> (Integer -> Bool) -> MeterUpdate -> IO L.ByteString
|
|
|
|
hGetUntilMetered h keepgoing meterupdate = lazyRead zeroBytesProcessed
|
2013-03-28 21:03:04 +00:00
|
|
|
where
|
|
|
|
lazyRead sofar = unsafeInterleaveIO $ loop sofar
|
|
|
|
|
|
|
|
loop sofar = do
|
2014-11-04 00:36:11 +00:00
|
|
|
c <- S.hGet h defaultChunkSize
|
2013-03-28 21:03:04 +00:00
|
|
|
if S.null c
|
|
|
|
then do
|
|
|
|
hClose h
|
|
|
|
return $ L.empty
|
|
|
|
else do
|
2014-11-03 22:37:05 +00:00
|
|
|
let sofar' = addBytesProcessed sofar (S.length c)
|
2013-03-28 21:03:04 +00:00
|
|
|
meterupdate sofar'
|
2014-11-03 22:37:05 +00:00
|
|
|
if keepgoing (fromBytesProcessed sofar')
|
|
|
|
then do
|
|
|
|
{- unsafeInterleaveIO causes this to be
|
|
|
|
- deferred until the data is read from the
|
|
|
|
- ByteString. -}
|
|
|
|
cs <- lazyRead sofar'
|
|
|
|
return $ L.append (L.fromChunks [c]) cs
|
|
|
|
else return $ L.fromChunks [c]
|
2013-03-28 21:03:04 +00:00
|
|
|
|
|
|
|
{- Same default chunk size Lazy ByteStrings use. -}
|
|
|
|
defaultChunkSize :: Int
|
|
|
|
defaultChunkSize = 32 * k - chunkOverhead
|
|
|
|
where
|
|
|
|
k = 1024
|
|
|
|
chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific
|
2014-12-17 17:21:55 +00:00
|
|
|
|
|
|
|
{- Parses the String looking for a command's progress output, and returns
|
|
|
|
- Maybe the number of bytes rsynced so far, and any any remainder of the
|
|
|
|
- string that could be an incomplete progress output. That remainder
|
|
|
|
- should be prepended to future output, and fed back in. This interface
|
|
|
|
- allows the command's output to be read in any desired size chunk, or
|
|
|
|
- even one character at a time.
|
|
|
|
-}
|
|
|
|
type ProgressParser = String -> (Maybe BytesProcessed, String)
|
|
|
|
|
|
|
|
{- Runs a command and runs a ProgressParser on its output, in order
|
|
|
|
- to update the meter. The command's output is also sent to stdout. -}
|
|
|
|
commandMeter :: ProgressParser -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
|
|
|
|
commandMeter progressparser meterupdate cmd params = liftIO $ catchBoolIO $
|
|
|
|
withHandle StdoutHandle createProcessSuccess p $
|
|
|
|
feedprogress zeroBytesProcessed []
|
|
|
|
where
|
|
|
|
p = proc cmd (toCommand params)
|
|
|
|
|
|
|
|
feedprogress prev buf h = do
|
2015-02-10 16:34:34 +00:00
|
|
|
b <- S.hGetSome h 80
|
|
|
|
if S.null b
|
2014-12-17 17:21:55 +00:00
|
|
|
then return True
|
|
|
|
else do
|
2015-02-10 16:34:34 +00:00
|
|
|
S.hPut stdout b
|
2014-12-17 17:21:55 +00:00
|
|
|
hFlush stdout
|
2015-02-10 16:34:34 +00:00
|
|
|
let s = w82s (S.unpack b)
|
2014-12-17 17:21:55 +00:00
|
|
|
let (mbytes, buf') = progressparser (buf++s)
|
|
|
|
case mbytes of
|
|
|
|
Nothing -> feedprogress prev buf' h
|
|
|
|
(Just bytes) -> do
|
|
|
|
when (bytes /= prev) $
|
|
|
|
meterupdate bytes
|
|
|
|
feedprogress bytes buf' h
|