webapp: Progess bar fixes for many types of special remotes.
There was confusion in different parts of the progress bar code about whether an update contained the total number of bytes transferred, or the number of bytes transferred since the last update. One way this bug showed up was progress bars that seemed to stick at zero for a long time. In order to fix it comprehensively, I add a new BytesProcessed data type, that is explicitly a total quantity of bytes, not a delta. Note that this doesn't necessarily fix every problem with progress bars. Particularly, buffering can now cause progress bars to seem to run ahead of transfers, reaching 100% when data is still being uploaded.
This commit is contained in:
parent
577128e9b8
commit
cf07a2c412
24 changed files with 172 additions and 129 deletions
|
@ -13,6 +13,7 @@ import Annex.Content
|
||||||
import Utility.Rsync
|
import Utility.Rsync
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import qualified Fields
|
import qualified Fields
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
def :: [Command]
|
def :: [Command]
|
||||||
def = [noCommit $ command "sendkey" paramKey seek
|
def = [noCommit $ command "sendkey" paramKey seek
|
||||||
|
|
|
@ -13,6 +13,7 @@ import Annex.Content
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import qualified Fields
|
import qualified Fields
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
def :: [Command]
|
def :: [Command]
|
||||||
def = [noCommit $ command "transferinfo" paramKey seek SectionPlumbing
|
def = [noCommit $ command "transferinfo" paramKey seek SectionPlumbing
|
||||||
|
@ -50,10 +51,14 @@ start (k:[]) = do
|
||||||
(update, tfile, _) <- mkProgressUpdater t info
|
(update, tfile, _) <- mkProgressUpdater t info
|
||||||
liftIO $ mapM_ void
|
liftIO $ mapM_ void
|
||||||
[ tryIO $ forever $ do
|
[ tryIO $ forever $ do
|
||||||
bytes <- readish <$> getLine
|
bytes <- readUpdate
|
||||||
maybe (error "transferinfo protocol error") update bytes
|
maybe (error "transferinfo protocol error")
|
||||||
|
(update . toBytesProcessed) bytes
|
||||||
, tryIO $ removeFile tfile
|
, tryIO $ removeFile tfile
|
||||||
, exitSuccess
|
, exitSuccess
|
||||||
]
|
]
|
||||||
stop
|
stop
|
||||||
start _ = error "wrong number of parameters"
|
start _ = error "wrong number of parameters"
|
||||||
|
|
||||||
|
readUpdate :: IO (Maybe Integer)
|
||||||
|
readUpdate = readish <$> getLine
|
||||||
|
|
|
@ -13,6 +13,7 @@ import Annex.Exception
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Types.Key
|
import Types.Key
|
||||||
|
import Utility.Metered
|
||||||
import Utility.Percentage
|
import Utility.Percentage
|
||||||
import Utility.QuickCheck
|
import Utility.QuickCheck
|
||||||
|
|
||||||
|
@ -165,12 +166,13 @@ mkProgressUpdater t info = do
|
||||||
mvar <- liftIO $ newMVar 0
|
mvar <- liftIO $ newMVar 0
|
||||||
return (liftIO . updater tfile mvar, tfile, mvar)
|
return (liftIO . updater tfile mvar, tfile, mvar)
|
||||||
where
|
where
|
||||||
updater tfile mvar bytes = modifyMVar_ mvar $ \oldbytes -> do
|
updater tfile mvar b = modifyMVar_ mvar $ \oldbytes -> do
|
||||||
if (bytes - oldbytes >= mindelta)
|
let newbytes = fromBytesProcessed b
|
||||||
|
if (newbytes - oldbytes >= mindelta)
|
||||||
then do
|
then do
|
||||||
let info' = info { bytesComplete = Just bytes }
|
let info' = info { bytesComplete = Just newbytes }
|
||||||
_ <- tryIO $ writeTransferInfoFile info' tfile
|
_ <- tryIO $ writeTransferInfoFile info' tfile
|
||||||
return bytes
|
return newbytes
|
||||||
else return oldbytes
|
else return oldbytes
|
||||||
{- The minimum change in bytesComplete that is worth
|
{- The minimum change in bytesComplete that is worth
|
||||||
- updating a transfer info file for is 1% of the total
|
- updating a transfer info file for is 1% of the total
|
||||||
|
|
|
@ -43,14 +43,15 @@ import System.Log.Logger
|
||||||
import System.Log.Formatter
|
import System.Log.Formatter
|
||||||
import System.Log.Handler (setFormatter, LogHandler)
|
import System.Log.Handler (setFormatter, LogHandler)
|
||||||
import System.Log.Handler.Simple
|
import System.Log.Handler.Simple
|
||||||
|
import qualified Data.Set as S
|
||||||
|
|
||||||
import Common
|
import Common
|
||||||
import Types
|
import Types
|
||||||
import Types.Messages
|
import Types.Messages
|
||||||
|
import qualified Messages.JSON as JSON
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import qualified Messages.JSON as JSON
|
import Utility.Metered
|
||||||
import qualified Data.Set as S
|
|
||||||
|
|
||||||
showStart :: String -> String -> Annex ()
|
showStart :: String -> String -> Annex ()
|
||||||
showStart command file = handle (JSON.start command $ Just file) $
|
showStart command file = handle (JSON.start command $ Just file) $
|
||||||
|
@ -86,7 +87,7 @@ meteredBytes combinemeterupdate size a = withOutputType go
|
||||||
meter <- liftIO $ newMeter progress "B" 25 (renderNums binaryOpts 1)
|
meter <- liftIO $ newMeter progress "B" 25 (renderNums binaryOpts 1)
|
||||||
showOutput
|
showOutput
|
||||||
r <- a $ \n -> liftIO $ do
|
r <- a $ \n -> liftIO $ do
|
||||||
incrP progress n
|
setP progress $ fromBytesProcessed n
|
||||||
displayMeter stdout meter
|
displayMeter stdout meter
|
||||||
maybe noop (\m -> m n) combinemeterupdate
|
maybe noop (\m -> m n) combinemeterupdate
|
||||||
liftIO $ clearMeter stdout meter
|
liftIO $ clearMeter stdout meter
|
||||||
|
|
40
Meters.hs
40
Meters.hs
|
@ -1,40 +0,0 @@
|
||||||
{- git-annex meters
|
|
||||||
-
|
|
||||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
|
||||||
-
|
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
|
||||||
-}
|
|
||||||
|
|
||||||
module Meters where
|
|
||||||
|
|
||||||
import Common
|
|
||||||
import Types.Meters
|
|
||||||
import Utility.Observed
|
|
||||||
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
|
||||||
import qualified Data.ByteString as S
|
|
||||||
|
|
||||||
{- Sends the content of a file to an action, updating the meter as it's
|
|
||||||
- consumed. -}
|
|
||||||
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
|
|
||||||
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h ->
|
|
||||||
hGetContentsObserved h (meterupdate . toInteger) >>= a
|
|
||||||
|
|
||||||
{- Sends the content of a file to a Handle, updating the meter as it's
|
|
||||||
- written. -}
|
|
||||||
streamMeteredFile :: FilePath -> MeterUpdate -> Handle -> IO ()
|
|
||||||
streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h
|
|
||||||
|
|
||||||
{- Writes a ByteString to a Handle, updating a meter as it's written. -}
|
|
||||||
meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
|
|
||||||
meteredWrite meterupdate h = go . L.toChunks
|
|
||||||
where
|
|
||||||
go [] = return ()
|
|
||||||
go (c:cs) = do
|
|
||||||
S.hPut h c
|
|
||||||
meterupdate $ toInteger $ S.length c
|
|
||||||
go cs
|
|
||||||
|
|
||||||
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
|
|
||||||
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
|
|
||||||
meteredWrite meterupdate h b
|
|
|
@ -29,6 +29,7 @@ import Data.ByteString.Lazy.UTF8 (fromString)
|
||||||
import Data.Digest.Pure.SHA
|
import Data.Digest.Pure.SHA
|
||||||
import Utility.UserInfo
|
import Utility.UserInfo
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
type BupRepo = String
|
type BupRepo = String
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ import Remote.Helper.Encryptable
|
||||||
import Remote.Helper.Chunked
|
import Remote.Helper.Chunked
|
||||||
import Crypto
|
import Crypto
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
import Meters
|
import Utility.Metered
|
||||||
|
|
||||||
remote :: RemoteType
|
remote :: RemoteType
|
||||||
remote = RemoteType {
|
remote = RemoteType {
|
||||||
|
@ -154,17 +154,20 @@ storeSplit' :: MeterUpdate -> Int64 -> [FilePath] -> [S.ByteString] -> [FilePath
|
||||||
storeSplit' _ _ [] _ _ = error "ran out of dests"
|
storeSplit' _ _ [] _ _ = error "ran out of dests"
|
||||||
storeSplit' _ _ _ [] c = return $ reverse c
|
storeSplit' _ _ _ [] c = return $ reverse c
|
||||||
storeSplit' meterupdate chunksize (d:dests) bs c = do
|
storeSplit' meterupdate chunksize (d:dests) bs c = do
|
||||||
bs' <- E.bracket (openFile d WriteMode) hClose (feed chunksize bs)
|
bs' <- E.bracket (openFile d WriteMode) hClose $
|
||||||
|
feed zeroBytesProcessed chunksize bs
|
||||||
storeSplit' meterupdate chunksize dests bs' (d:c)
|
storeSplit' meterupdate chunksize dests bs' (d:c)
|
||||||
where
|
where
|
||||||
feed _ [] _ = return []
|
feed _ _ [] _ = return []
|
||||||
feed sz (l:ls) h = do
|
feed bytes sz (l:ls) h = do
|
||||||
let s = fromIntegral $ S.length l
|
let len = S.length l
|
||||||
|
let s = fromIntegral len
|
||||||
if s <= sz || sz == chunksize
|
if s <= sz || sz == chunksize
|
||||||
then do
|
then do
|
||||||
S.hPut h l
|
S.hPut h l
|
||||||
meterupdate $ toInteger s
|
let bytes' = addBytesProcessed bytes len
|
||||||
feed (sz - s) ls h
|
meterupdate bytes'
|
||||||
|
feed bytes' (sz - s) ls h
|
||||||
else return (l:ls)
|
else return (l:ls)
|
||||||
|
|
||||||
storeHelper :: FilePath -> ChunkSize -> Key -> ([FilePath] -> IO [FilePath]) -> Annex Bool
|
storeHelper :: FilePath -> ChunkSize -> Key -> ([FilePath] -> IO [FilePath]) -> Annex Bool
|
||||||
|
|
|
@ -40,6 +40,7 @@ import Init
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import qualified Fields
|
import qualified Fields
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.MSampleVar
|
import Control.Concurrent.MSampleVar
|
||||||
|
@ -309,7 +310,7 @@ copyFromRemote r key file dest
|
||||||
: maybe [] (\f -> [(Fields.associatedFile, f)]) file
|
: maybe [] (\f -> [(Fields.associatedFile, f)]) file
|
||||||
Just (cmd, params) <- git_annex_shell (repo r) "transferinfo"
|
Just (cmd, params) <- git_annex_shell (repo r) "transferinfo"
|
||||||
[Param $ key2file key] fields
|
[Param $ key2file key] fields
|
||||||
v <- liftIO $ newEmptySV
|
v <- liftIO $ (newEmptySV :: IO (MSampleVar Integer))
|
||||||
tid <- liftIO $ forkIO $ void $ tryIO $ do
|
tid <- liftIO $ forkIO $ void $ tryIO $ do
|
||||||
bytes <- readSV v
|
bytes <- readSV v
|
||||||
p <- createProcess $
|
p <- createProcess $
|
||||||
|
@ -325,7 +326,7 @@ copyFromRemote r key file dest
|
||||||
send bytes
|
send bytes
|
||||||
forever $
|
forever $
|
||||||
send =<< readSV v
|
send =<< readSV v
|
||||||
let feeder = writeSV v
|
let feeder = writeSV v . fromBytesProcessed
|
||||||
bracketIO noop (const $ tryIO $ killThread tid) (a feeder)
|
bracketIO noop (const $ tryIO $ killThread tid) (a feeder)
|
||||||
|
|
||||||
copyFromRemoteCheap :: Remote -> Key -> FilePath -> Annex Bool
|
copyFromRemoteCheap :: Remote -> Key -> FilePath -> Annex Bool
|
||||||
|
@ -391,13 +392,13 @@ rsyncOrCopyFile rsyncparams src dest p =
|
||||||
dorsync = rsyncHelper (Just p) $
|
dorsync = rsyncHelper (Just p) $
|
||||||
rsyncparams ++ [Param src, Param dest]
|
rsyncparams ++ [Param src, Param dest]
|
||||||
docopy = liftIO $ bracket
|
docopy = liftIO $ bracket
|
||||||
(forkIO $ watchfilesize 0)
|
(forkIO $ watchfilesize zeroBytesProcessed)
|
||||||
(void . tryIO . killThread)
|
(void . tryIO . killThread)
|
||||||
(const $ copyFileExternal src dest)
|
(const $ copyFileExternal src dest)
|
||||||
watchfilesize oldsz = do
|
watchfilesize oldsz = do
|
||||||
threadDelay 500000 -- 0.5 seconds
|
threadDelay 500000 -- 0.5 seconds
|
||||||
v <- catchMaybeIO $
|
v <- catchMaybeIO $
|
||||||
fromIntegral . fileSize
|
toBytesProcessed . fileSize
|
||||||
<$> getFileStatus dest
|
<$> getFileStatus dest
|
||||||
case v of
|
case v of
|
||||||
Just sz
|
Just sz
|
||||||
|
|
|
@ -22,7 +22,7 @@ import Remote.Helper.Encryptable
|
||||||
import qualified Remote.Helper.AWS as AWS
|
import qualified Remote.Helper.AWS as AWS
|
||||||
import Crypto
|
import Crypto
|
||||||
import Creds
|
import Creds
|
||||||
import Meters
|
import Utility.Metered
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ module Remote.Helper.Chunked where
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
import Utility.DataUnits
|
import Utility.DataUnits
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Meters
|
import Utility.Metered
|
||||||
|
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
|
|
@ -15,6 +15,7 @@ import Crypto
|
||||||
import qualified Annex
|
import qualified Annex
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
import Utility.Base64
|
import Utility.Base64
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
{- Encryption setup for a remote. The user must specify whether to use
|
{- Encryption setup for a remote. The user must specify whether to use
|
||||||
- an encryption key, or not encrypt. An encrypted cipher is created, or is
|
- an encryption key, or not encrypt. An encrypted cipher is created, or is
|
||||||
|
|
|
@ -21,6 +21,7 @@ import Annex.Content
|
||||||
import Remote.Helper.Special
|
import Remote.Helper.Special
|
||||||
import Remote.Helper.Encryptable
|
import Remote.Helper.Encryptable
|
||||||
import Crypto
|
import Crypto
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
remote :: RemoteType
|
remote :: RemoteType
|
||||||
remote = RemoteType {
|
remote = RemoteType {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import Remote.Helper.Encryptable
|
||||||
import Crypto
|
import Crypto
|
||||||
import Utility.Rsync
|
import Utility.Rsync
|
||||||
import Utility.CopyFile
|
import Utility.CopyFile
|
||||||
|
import Utility.Metered
|
||||||
import Annex.Perms
|
import Annex.Perms
|
||||||
|
|
||||||
type RsyncUrl = String
|
type RsyncUrl = String
|
||||||
|
|
|
@ -27,7 +27,7 @@ import Remote.Helper.Encryptable
|
||||||
import qualified Remote.Helper.AWS as AWS
|
import qualified Remote.Helper.AWS as AWS
|
||||||
import Crypto
|
import Crypto
|
||||||
import Creds
|
import Creds
|
||||||
import Meters
|
import Utility.Metered
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
|
|
||||||
remote :: RemoteType
|
remote :: RemoteType
|
||||||
|
|
|
@ -17,6 +17,7 @@ import Config.Cost
|
||||||
import Logs.Web
|
import Logs.Web
|
||||||
import qualified Utility.Url as Url
|
import qualified Utility.Url as Url
|
||||||
import Types.Key
|
import Types.Key
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ import Remote.Helper.Encryptable
|
||||||
import Remote.Helper.Chunked
|
import Remote.Helper.Chunked
|
||||||
import Crypto
|
import Crypto
|
||||||
import Creds
|
import Creds
|
||||||
import Meters
|
import Utility.Metered
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
|
|
||||||
type DavUrl = String
|
type DavUrl = String
|
||||||
|
|
4
Types.hs
4
Types.hs
|
@ -14,8 +14,7 @@ module Types (
|
||||||
RemoteGitConfig(..),
|
RemoteGitConfig(..),
|
||||||
Remote,
|
Remote,
|
||||||
RemoteType,
|
RemoteType,
|
||||||
Option,
|
Option
|
||||||
MeterUpdate
|
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex
|
import Annex
|
||||||
|
@ -25,7 +24,6 @@ import Types.Key
|
||||||
import Types.UUID
|
import Types.UUID
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Types.Option
|
import Types.Option
|
||||||
import Types.Meters
|
|
||||||
|
|
||||||
type Backend = BackendA Annex
|
type Backend = BackendA Annex
|
||||||
type Remote = RemoteA Annex
|
type Remote = RemoteA Annex
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
{- git-annex meter types
|
|
||||||
-
|
|
||||||
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
|
||||||
-
|
|
||||||
- Licensed under the GNU GPL version 3 or higher.
|
|
||||||
-}
|
|
||||||
|
|
||||||
module Types.Meters where
|
|
||||||
|
|
||||||
{- An action that can be run repeatedly, feeding it the number of
|
|
||||||
- bytes sent or retrieved so far. -}
|
|
||||||
type MeterUpdate = (Integer -> IO ())
|
|
|
@ -15,9 +15,9 @@ import Data.Ord
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import Types.UUID
|
import Types.UUID
|
||||||
import Types.Meters
|
|
||||||
import Types.GitConfig
|
import Types.GitConfig
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
type RemoteConfigKey = String
|
type RemoteConfigKey = String
|
||||||
type RemoteConfig = M.Map RemoteConfigKey String
|
type RemoteConfig = M.Map RemoteConfigKey String
|
||||||
|
|
116
Utility/Metered.hs
Normal file
116
Utility/Metered.hs
Normal file
|
@ -0,0 +1,116 @@
|
||||||
|
{- Metered IO
|
||||||
|
-
|
||||||
|
- Copyright 2012, 2013 Joey Hess <joey@kitenet.net>
|
||||||
|
-
|
||||||
|
- Licensed under the GNU GPL version 3 or higher.
|
||||||
|
-}
|
||||||
|
|
||||||
|
{-# LANGUAGE TypeSynonymInstances #-}
|
||||||
|
|
||||||
|
module Utility.Metered where
|
||||||
|
|
||||||
|
import Common
|
||||||
|
|
||||||
|
import qualified Data.ByteString.Lazy as L
|
||||||
|
import qualified Data.ByteString as S
|
||||||
|
import System.IO.Unsafe
|
||||||
|
import Foreign.Storable (Storable(sizeOf))
|
||||||
|
import System.Posix.Types
|
||||||
|
|
||||||
|
{- An action that can be run repeatedly, updating it on the bytes processed.
|
||||||
|
-
|
||||||
|
- Note that each call receives the total number of bytes processed, so
|
||||||
|
- far, *not* an incremental amount since the last call. -}
|
||||||
|
type MeterUpdate = (BytesProcessed -> IO ())
|
||||||
|
|
||||||
|
{- Total number of bytes processed so far. -}
|
||||||
|
newtype BytesProcessed = BytesProcessed Integer
|
||||||
|
deriving (Eq, Ord)
|
||||||
|
|
||||||
|
class AsBytesProcessed a where
|
||||||
|
toBytesProcessed :: a -> BytesProcessed
|
||||||
|
fromBytesProcessed :: BytesProcessed -> a
|
||||||
|
|
||||||
|
instance AsBytesProcessed Integer where
|
||||||
|
toBytesProcessed i = BytesProcessed i
|
||||||
|
fromBytesProcessed (BytesProcessed i) = i
|
||||||
|
|
||||||
|
instance AsBytesProcessed Int where
|
||||||
|
toBytesProcessed i = BytesProcessed $ toInteger i
|
||||||
|
fromBytesProcessed (BytesProcessed i) = fromInteger i
|
||||||
|
|
||||||
|
instance AsBytesProcessed FileOffset where
|
||||||
|
toBytesProcessed sz = BytesProcessed $ toInteger sz
|
||||||
|
fromBytesProcessed (BytesProcessed sz) = fromInteger sz
|
||||||
|
|
||||||
|
addBytesProcessed :: AsBytesProcessed v => BytesProcessed -> v -> BytesProcessed
|
||||||
|
addBytesProcessed (BytesProcessed i) v =
|
||||||
|
let (BytesProcessed n) = toBytesProcessed v
|
||||||
|
in BytesProcessed $! i + n
|
||||||
|
|
||||||
|
zeroBytesProcessed :: BytesProcessed
|
||||||
|
zeroBytesProcessed = BytesProcessed 0
|
||||||
|
|
||||||
|
{- Sends the content of a file to an action, updating the meter as it's
|
||||||
|
- consumed. -}
|
||||||
|
withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
|
||||||
|
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h ->
|
||||||
|
hGetContentsMetered h meterupdate >>= a
|
||||||
|
|
||||||
|
{- Sends the content of a file to a Handle, updating the meter as it's
|
||||||
|
- written. -}
|
||||||
|
streamMeteredFile :: FilePath -> MeterUpdate -> Handle -> IO ()
|
||||||
|
streamMeteredFile f meterupdate h = withMeteredFile f meterupdate $ L.hPut h
|
||||||
|
|
||||||
|
{- Writes a ByteString to a Handle, updating a meter as it's written. -}
|
||||||
|
meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
|
||||||
|
meteredWrite meterupdate h = go zeroBytesProcessed . L.toChunks
|
||||||
|
where
|
||||||
|
go _ [] = return ()
|
||||||
|
go sofar (c:cs) = do
|
||||||
|
S.hPut h c
|
||||||
|
let sofar' = addBytesProcessed sofar $ S.length c
|
||||||
|
meterupdate sofar'
|
||||||
|
go sofar' cs
|
||||||
|
|
||||||
|
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
|
||||||
|
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
|
||||||
|
meteredWrite meterupdate h b
|
||||||
|
|
||||||
|
{- This is like L.hGetContents, but after each chunk is read, a meter
|
||||||
|
- is updated based on the size of the chunk.
|
||||||
|
-
|
||||||
|
- Note that the meter update is run in unsafeInterleaveIO, which means that
|
||||||
|
- it can be run at any time. It's even possible for updates to run out
|
||||||
|
- of order, as different parts of the ByteString are consumed.
|
||||||
|
-
|
||||||
|
- All the usual caveats about using unsafeInterleaveIO apply to the
|
||||||
|
- meter updates, so use caution.
|
||||||
|
-}
|
||||||
|
hGetContentsMetered :: Handle -> MeterUpdate -> IO L.ByteString
|
||||||
|
hGetContentsMetered h meterupdate = lazyRead zeroBytesProcessed
|
||||||
|
where
|
||||||
|
lazyRead sofar = unsafeInterleaveIO $ loop sofar
|
||||||
|
|
||||||
|
loop sofar = do
|
||||||
|
c <- S.hGetSome h defaultChunkSize
|
||||||
|
if S.null c
|
||||||
|
then do
|
||||||
|
hClose h
|
||||||
|
return $ L.empty
|
||||||
|
else do
|
||||||
|
let sofar' = addBytesProcessed sofar $
|
||||||
|
S.length c
|
||||||
|
meterupdate sofar'
|
||||||
|
{- unsafeInterleaveIO causes this to be
|
||||||
|
- deferred until the data is read from the
|
||||||
|
- ByteString. -}
|
||||||
|
cs <- lazyRead sofar'
|
||||||
|
return $ L.append (L.fromChunks [c]) cs
|
||||||
|
|
||||||
|
{- Same default chunk size Lazy ByteStrings use. -}
|
||||||
|
defaultChunkSize :: Int
|
||||||
|
defaultChunkSize = 32 * k - chunkOverhead
|
||||||
|
where
|
||||||
|
k = 1024
|
||||||
|
chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific
|
|
@ -1,43 +0,0 @@
|
||||||
module Utility.Observed where
|
|
||||||
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
|
||||||
import qualified Data.ByteString as S
|
|
||||||
import System.IO
|
|
||||||
import System.IO.Unsafe
|
|
||||||
import Foreign.Storable (Storable(sizeOf))
|
|
||||||
|
|
||||||
{- This is like L.hGetContents, but after each chunk is read, an action
|
|
||||||
- is run to observe the size of the chunk.
|
|
||||||
-
|
|
||||||
- Note that the observer is run in unsafeInterleaveIO, which means that
|
|
||||||
- it can be run at any time. It's even possible for observers to run out
|
|
||||||
- of order, as different parts of the ByteString are consumed.
|
|
||||||
-
|
|
||||||
- All the usual caveats about using unsafeInterleaveIO apply to the observers,
|
|
||||||
- so use caution.
|
|
||||||
-}
|
|
||||||
hGetContentsObserved :: Handle -> (Int -> IO ()) -> IO L.ByteString
|
|
||||||
hGetContentsObserved h observe = lazyRead
|
|
||||||
where
|
|
||||||
lazyRead = unsafeInterleaveIO loop
|
|
||||||
|
|
||||||
loop = do
|
|
||||||
c <- S.hGetSome h defaultChunkSize
|
|
||||||
if S.null c
|
|
||||||
then do
|
|
||||||
hClose h
|
|
||||||
return $ L.empty
|
|
||||||
else do
|
|
||||||
observe $ S.length c
|
|
||||||
{- unsafeInterleaveIO causes this to be
|
|
||||||
- deferred until the data is read from the
|
|
||||||
- ByteString. -}
|
|
||||||
cs <- lazyRead
|
|
||||||
return $ L.append (L.fromChunks [c]) cs
|
|
||||||
|
|
||||||
{- Same default chunk size Lazy ByteStrings use. -}
|
|
||||||
defaultChunkSize :: Int
|
|
||||||
defaultChunkSize = 32 * k - chunkOverhead
|
|
||||||
where
|
|
||||||
k = 1024
|
|
||||||
chunkOverhead = 2 * sizeOf (undefined :: Int) -- GHC specific
|
|
|
@ -8,6 +8,7 @@
|
||||||
module Utility.Rsync where
|
module Utility.Rsync where
|
||||||
|
|
||||||
import Common
|
import Common
|
||||||
|
import Utility.Metered
|
||||||
|
|
||||||
import Data.Char
|
import Data.Char
|
||||||
|
|
||||||
|
@ -44,14 +45,13 @@ rsyncServerParams =
|
||||||
rsync :: [CommandParam] -> IO Bool
|
rsync :: [CommandParam] -> IO Bool
|
||||||
rsync = boolSystem "rsync"
|
rsync = boolSystem "rsync"
|
||||||
|
|
||||||
{- Runs rsync, but intercepts its progress output and feeds bytes
|
{- Runs rsync, but intercepts its progress output and updates a meter.
|
||||||
- complete values into the callback. The progress output is also output
|
- The progress output is also output to stdout.
|
||||||
- to stdout.
|
|
||||||
-
|
-
|
||||||
- The params must enable rsync's --progress mode for this to work.
|
- The params must enable rsync's --progress mode for this to work.
|
||||||
-}
|
-}
|
||||||
rsyncProgress :: (Integer -> IO ()) -> [CommandParam] -> IO Bool
|
rsyncProgress :: MeterUpdate -> [CommandParam] -> IO Bool
|
||||||
rsyncProgress callback params = do
|
rsyncProgress meterupdate params = do
|
||||||
r <- withHandle StdoutHandle createProcessSuccess p (feedprogress 0 [])
|
r <- withHandle StdoutHandle createProcessSuccess p (feedprogress 0 [])
|
||||||
{- For an unknown reason, piping rsync's output like this does
|
{- For an unknown reason, piping rsync's output like this does
|
||||||
- causes it to run a second ssh process, which it neglects to wait
|
- causes it to run a second ssh process, which it neglects to wait
|
||||||
|
@ -72,7 +72,7 @@ rsyncProgress callback params = do
|
||||||
Nothing -> feedprogress prev buf' h
|
Nothing -> feedprogress prev buf' h
|
||||||
(Just bytes) -> do
|
(Just bytes) -> do
|
||||||
when (bytes /= prev) $
|
when (bytes /= prev) $
|
||||||
callback bytes
|
meterupdate $ toBytesProcessed bytes
|
||||||
feedprogress bytes buf' h
|
feedprogress bytes buf' h
|
||||||
|
|
||||||
{- Checks if an rsync url involves the remote shell (ssh or rsh).
|
{- Checks if an rsync url involves the remote shell (ssh or rsh).
|
||||||
|
|
1
debian/changelog
vendored
1
debian/changelog
vendored
|
@ -4,6 +4,7 @@ git-annex (4.20130324) UNRELEASED; urgency=low
|
||||||
* Per-command usage messages.
|
* Per-command usage messages.
|
||||||
* webapp: Fix a race that sometimes caused alerts or other notifications
|
* webapp: Fix a race that sometimes caused alerts or other notifications
|
||||||
to be missed if they occurred while a page was loading.
|
to be missed if they occurred while a page was loading.
|
||||||
|
* webapp: Progess bar fixes for many types of special remotes.
|
||||||
|
|
||||||
-- Joey Hess <joeyh@debian.org> Mon, 25 Mar 2013 10:21:46 -0400
|
-- Joey Hess <joeyh@debian.org> Mon, 25 Mar 2013 10:21:46 -0400
|
||||||
|
|
||||||
|
|
|
@ -19,3 +19,8 @@ I expect a changing status bar and percentage. Instead I see no changes when an
|
||||||
When uploading local data to an S3 remote, I see no progress bars. The progress bar area on active uploads stays the same grey as the bar on queued uploads. The status does not change from "0% of...". The uploads are completing, but this makes it very difficult to judge their activity.
|
When uploading local data to an S3 remote, I see no progress bars. The progress bar area on active uploads stays the same grey as the bar on queued uploads. The status does not change from "0% of...". The uploads are completing, but this makes it very difficult to judge their activity.
|
||||||
|
|
||||||
The only remotes I currently have setup are S3 special remotes, so I cannot say whether progress bars are working for uploads to other remote types.
|
The only remotes I currently have setup are S3 special remotes, so I cannot say whether progress bars are working for uploads to other remote types.
|
||||||
|
|
||||||
|
> [[done]], this turned out to be a confusion in the progress code;
|
||||||
|
> parts were expecting a full number of bytes since the start, while
|
||||||
|
> other parts were sending the number of bytes in a chunk. Result was
|
||||||
|
> progress bars stuck at 0% often. --[[Joey]]
|
||||||
|
|
Loading…
Reference in a new issue