Merge branch 'bwlimit'

This commit is contained in:
Joey Hess 2021-09-22 15:27:28 -04:00
commit fc9abca231
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
16 changed files with 149 additions and 47 deletions

View file

@ -461,8 +461,9 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
, providedMimeEncoding = Nothing
, providedLinkType = Nothing
}
let bwlimit = remoteAnnexBwLimit (Remote.gitconfig remote)
islargefile <- checkMatcher' matcher mi mempty
metered Nothing sz $ const $ if islargefile
metered Nothing sz bwlimit $ const $ if islargefile
then doimportlarge importkey cidmap db loc cid sz f
else doimportsmall cidmap db loc cid sz
@ -557,11 +558,12 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
Left e -> do
warning (show e)
return Nothing
let bwlimit = remoteAnnexBwLimit (Remote.gitconfig remote)
checkDiskSpaceToGet tmpkey Nothing $
notifyTransfer Download af $
download' (Remote.uuid remote) tmpkey af Nothing stdRetry $ \p ->
withTmp tmpkey $ \tmpfile ->
metered (Just p) tmpkey $
metered (Just p) tmpkey bwlimit $
const (rundownload tmpfile)
where
tmpkey = importKey cid sz

View file

@ -22,7 +22,7 @@ import Control.Monad.IO.Class (MonadIO)
detectStalls :: (Monad m, MonadIO m) => Maybe StallDetection -> TVar (Maybe BytesProcessed) -> m () -> m ()
detectStalls Nothing _ _ = noop
detectStalls (Just StallDetectionDisabled) _ _ = noop
detectStalls (Just (StallDetection minsz duration)) metervar onstall =
detectStalls (Just (StallDetection (BwRate minsz duration))) metervar onstall =
detectStalls' minsz duration metervar onstall Nothing
detectStalls (Just ProbeStallDetection) metervar onstall = do
-- Only do stall detection once the progress is confirmed to be

View file

@ -96,7 +96,7 @@ youtubeDl' url workdir p uo
-- with the size, which is why it's important the
-- meter is passed into commandMeter'
let unknownsize = Nothing :: Maybe FileSize
ok <- metered (Just p) unknownsize $ \meter meterupdate ->
ok <- metered (Just p) unknownsize Nothing $ \meter meterupdate ->
liftIO $ commandMeter'
parseYoutubeDlProgress oh (Just meter) meterupdate cmd opts
(\pr -> pr { cwd = Just workdir })

View file

@ -1,5 +1,8 @@
git-annex (8.20210904) UNRELEASED; urgency=medium
* Added annex.bwlimit and remote.name.annex-bwlimit config to limit
the bandwidth of transfers. It works for git remotes and many
but not all special remotes.
* Bug fix: Git configs such as annex.verify were incorrectly overriding
per-remote git configs such as remote.name.annex-verify.
(Reversion in version 4.20130323)

View file

@ -192,7 +192,7 @@ perform o file addunlockedmatcher = withOtherTmp $ \tmpdir -> do
}
ld <- lockDown cfg (fromRawFilePath file)
let sizer = keySource <$> ld
v <- metered Nothing sizer $ \_meter meterupdate ->
v <- metered Nothing sizer Nothing $ \_meter meterupdate ->
ingestAdd (checkGitIgnoreOption o) meterupdate ld
finish v
where

View file

@ -1,6 +1,6 @@
{- git-annex progress output
-
- Copyright 2010-2020 Joey Hess <id@joeyh.name>
- Copyright 2010-2021 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
@ -18,6 +18,7 @@ import Types
import Types.Messages
import Types.Key
import Types.KeySource
import Types.StallDetection (BwRate(..))
import Utility.InodeCache
import qualified Messages.JSON as JSON
import Messages.Concurrent
@ -72,11 +73,12 @@ metered
:: MeterSize sizer
=> Maybe MeterUpdate
-> sizer
-> Maybe BwRate
-> (Meter -> MeterUpdate -> Annex a)
-> Annex a
metered othermeter sizer a = withMessageState $ \st -> do
metered othermeterupdate sizer bwlimit a = withMessageState $ \st -> do
sz <- getMeterSize sizer
metered' st setclear othermeter sz showOutput a
metered' st setclear othermeterupdate sz bwlimit showOutput a
where
setclear c = Annex.changeState $ \st -> st
{ Annex.output = (Annex.output st) { clearProgressMeter = c } }
@ -90,11 +92,12 @@ metered'
-- NormalOutput.
-> Maybe MeterUpdate
-> Maybe TotalSize
-> Maybe BwRate
-> m ()
-- ^ this should run showOutput
-> (Meter -> MeterUpdate -> m a)
-> m a
metered' st setclear othermeter msize showoutput a = go st
metered' st setclear othermeterupdate msize bwlimit showoutput a = go st
where
go (MessageState { outputType = QuietOutput }) = nometer
go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
@ -105,7 +108,7 @@ metered' st setclear othermeter msize showoutput a = go st
setclear clear
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
updateMeter meter
r <- a meter (combinemeter m)
r <- a meter =<< mkmeterupdate m
setclear noop
liftIO clear
return r
@ -116,7 +119,7 @@ metered' st setclear othermeter msize showoutput a = go st
in Regions.setConsoleRegion r ('\n' : s)
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
updateMeter meter
a meter (combinemeter m)
a meter =<< mkmeterupdate m
go (MessageState { outputType = JSONOutput jsonoptions })
| jsonProgress jsonoptions = do
let buf = jsonBuffer st
@ -124,7 +127,7 @@ metered' st setclear othermeter msize showoutput a = go st
JSON.progress buf msize' (meterBytesProcessed new)
m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $
updateMeter meter
a meter (combinemeter m)
a meter =<< mkmeterupdate m
| otherwise = nometer
go (MessageState { outputType = SerializedOutput h _ }) = do
liftIO $ outputSerialized h BeginProgressMeter
@ -144,16 +147,21 @@ metered' st setclear othermeter msize showoutput a = go st
meterBytesProcessed new
m <- liftIO $ rateLimitMeterUpdate minratelimit meter $
updateMeter meter
a meter (combinemeter m)
(a meter =<< mkmeterupdate m)
`finally` (liftIO $ outputSerialized h EndProgressMeter)
nometer = do
dummymeter <- liftIO $ mkMeter Nothing $
\_ _ _ _ -> return ()
a dummymeter (combinemeter (const noop))
a dummymeter =<< mkmeterupdate (const noop)
combinemeter m = case othermeter of
Nothing -> m
Just om -> combineMeterUpdate m om
mkmeterupdate m =
let mu = case othermeterupdate of
Nothing -> m
Just om -> combineMeterUpdate m om
in case bwlimit of
Nothing -> return mu
Just (BwRate sz duration) -> liftIO $
bwLimitMeterUpdate sz duration mu
consoleratelimit = 0.2
@ -164,7 +172,7 @@ metered' st setclear othermeter msize showoutput a = go st
{- Poll file size to display meter. -}
meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a
meteredFile file combinemeterupdate key a =
metered combinemeterupdate key $ \_ p ->
metered combinemeterupdate key Nothing $ \_ p ->
watchFileSize file p a
{- Progress dots. -}

View file

@ -68,7 +68,7 @@ relaySerializedOutput getso sendsor meterreport runannex = go Nothing
let setclear = const noop
-- Display a progress meter while running, until
-- the meter ends or a final value is returned.
metered' ost setclear Nothing Nothing (runannex showOutput)
metered' ost setclear Nothing Nothing Nothing (runannex showOutput)
(\meter meterupdate -> loop (Just (meter, meterupdate)))
>>= \case
Right r -> return (Right r)

View file

@ -550,6 +550,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
| not $ Git.repoIsUrl repo = guardUsable repo (giveup "cannot access remote") $ do
u <- getUUID
hardlink <- wantHardLink
let bwlimit = remoteAnnexBwLimit (gitconfig r)
-- run copy from perspective of remote
onLocalFast st $ Annex.Content.prepSendAnnex' key >>= \case
Just (object, check) -> do
@ -559,7 +560,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
copier <- mkFileCopier hardlink st
(ok, v) <- runTransfer (Transfer Download u (fromKey id key))
file Nothing stdRetry $ \p ->
metered (Just (combineMeterUpdate p meterupdate)) key $ \_ p' ->
metered (Just (combineMeterUpdate p meterupdate)) key bwlimit $ \_ p' ->
copier object dest key p' checksuccess vc
if ok
then return v
@ -572,6 +573,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
then return v
else giveup "failed to retrieve content from remote"
else P2PHelper.retrieve
(gitconfig r)
(\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p))
key file dest meterupdate vc
| otherwise = giveup "copying from non-ssh, non-http remote not supported"
@ -680,7 +682,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
, giveup "remote does not have expected annex.uuid value"
)
| Git.repoIsSsh repo = commitOnCleanup repo r st $
P2PHelper.store
P2PHelper.store (gitconfig r)
(Ssh.runProto r connpool (return False) . copyremotefallback)
key file meterupdate
@ -694,6 +696,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
checkio <- Annex.withCurrentState check
u <- getUUID
hardlink <- wantHardLink
let bwlimit = remoteAnnexBwLimit (gitconfig r)
-- run copy from perspective of remote
res <- onLocalFast st $ ifM (Annex.Content.inAnnex key)
( return True
@ -705,7 +708,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
Just err -> giveup err
Nothing -> return True
res <- logStatusAfter key $ Annex.Content.getViaTmp rsp verify key file $ \dest ->
metered (Just (combineMeterUpdate meterupdate p)) key $ \_ p' ->
metered (Just (combineMeterUpdate meterupdate p)) key bwlimit $ \_ p' ->
copier object (fromRawFilePath dest) key p' checksuccess verify
Annex.Content.saveState True
return res

View file

@ -31,19 +31,21 @@ type ProtoConnRunner c = forall a. P2P.Proto a -> ClosableConnection c -> Annex
-- the pool when done.
type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) -> Annex a
store :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex ()
store runner k af p = do
store :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex ()
store gc runner k af p = do
let sizer = KeySizer k (fmap (toRawFilePath . fst) <$> prepSendAnnex k)
metered (Just p) sizer $ \_ p' ->
let bwlimit = remoteAnnexBwLimit gc
metered (Just p) sizer bwlimit $ \_ p' ->
runner p' (P2P.put k af p') >>= \case
Just True -> return ()
Just False -> giveup "Transfer failed"
Nothing -> remoteUnavail
retrieve :: (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
retrieve runner k af dest p verifyconfig = do
retrieve :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
retrieve gc runner k af dest p verifyconfig = do
iv <- startVerifyKeyContentIncrementally verifyconfig k
metered (Just p) k $ \m p' ->
let bwlimit = remoteAnnexBwLimit gc
metered (Just p) k bwlimit $ \m p' ->
runner p' (P2P.get dest k iv af m p') >>= \case
Just (True, v) -> return v
Just (False, _) -> giveup "Transfer failed"

View file

@ -261,8 +261,9 @@ specialRemote' cfg c storer retriever remover checkpresent baser = encr
chunkconfig = chunkConfig cfg
displayprogress p k srcfile a
| displayProgress cfg =
metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) (const a)
| displayProgress cfg = do
let bwlimit = remoteAnnexBwLimit (gitconfig baser)
metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) bwlimit (const a)
| otherwise = a p
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a

View file

@ -55,8 +55,8 @@ chainGen addr r u rc gc rs = do
{ uuid = u
, cost = cst
, name = Git.repoDescribe r
, storeKey = store (const protorunner)
, retrieveKeyFile = retrieve (const protorunner)
, storeKey = store gc (const protorunner)
, retrieveKeyFile = retrieve gc (const protorunner)
, retrieveKeyFileCheap = Nothing
, retrievalSecurityPolicy = RetrievalAllKeysSecure
, removeKey = remove protorunner

View file

@ -339,6 +339,7 @@ data RemoteGitConfig = RemoteGitConfig
, remoteAnnexForwardRetry :: Maybe Integer
, remoteAnnexRetryDelay :: Maybe Seconds
, remoteAnnexStallDetection :: Maybe StallDetection
, remoteAnnexBwLimit :: Maybe BwRate
, remoteAnnexAllowUnverifiedDownloads :: Bool
, remoteAnnexConfigUUID :: Maybe UUID
@ -404,8 +405,11 @@ extractRemoteGitConfig r remotename = do
, remoteAnnexRetryDelay = Seconds
<$> getmayberead "retrydelay"
, remoteAnnexStallDetection =
either (const Nothing) id . parseStallDetection
either (const Nothing) Just . parseStallDetection
=<< getmaybe "stalldetection"
, remoteAnnexBwLimit = do
sz <- readSize dataUnits =<< getmaybe "bwlimit"
return (BwRate sz (Duration 1))
, remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $
getmaybe ("security-allow-unverified-downloads")
, remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid"

View file

@ -1,4 +1,4 @@
{- types for stall detection
{- types for stall detection and banwdith rates
-
- Copyright 2020-2021 Joey Hess <id@joeyh.name>
-
@ -13,7 +13,7 @@ import Utility.Misc
import Git.Config
data StallDetection
= StallDetection ByteSize Duration
= StallDetection BwRate
-- ^ Unless the given number of bytes have been sent over the given
-- amount of time, there's a stall.
| ProbeStallDetection
@ -22,21 +22,29 @@ data StallDetection
| StallDetectionDisabled
deriving (Show)
data BwRate = BwRate ByteSize Duration
deriving (Show)
-- Parse eg, "0KiB/60s"
--
-- Also, it can be set to "true" (or other git config equivilants)
-- to enable ProbeStallDetection.
-- And "false" (and other git config equivilants) explicitly
-- disable stall detection.
parseStallDetection :: String -> Either String (Maybe StallDetection)
parseStallDetection :: String -> Either String StallDetection
parseStallDetection s = case isTrueFalse s of
Nothing -> do
let (bs, ds) = separate (== '/') s
b <- maybe
(Left $ "Unable to parse stall detection amount " ++ bs)
Right
(readSize dataUnits bs)
d <- parseDuration ds
return (Just (StallDetection b d))
Just True -> Right (Just ProbeStallDetection)
Just False -> Right (Just StallDetectionDisabled)
v <- parseBwRate s
Right (StallDetection v)
Just True -> Right ProbeStallDetection
Just False -> Right StallDetectionDisabled
parseBwRate :: String -> Either String BwRate
parseBwRate s = do
let (bs, ds) = separate (== '/') s
b <- maybe
(Left $ "Unable to parse bandwidth amount " ++ bs)
Right
(readSize dataUnits bs)
d <- parseDuration ds
Right (BwRate b d)

View file

@ -37,6 +37,7 @@ module Utility.Metered (
demeterCommandEnv,
avoidProgress,
rateLimitMeterUpdate,
bwLimitMeterUpdate,
Meter,
mkMeter,
setMeterTotalSize,
@ -51,6 +52,7 @@ import Utility.Percentage
import Utility.DataUnits
import Utility.HumanTime
import Utility.SimpleProtocol as Proto
import Utility.ThreadScheduler
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as S
@ -380,6 +382,42 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
meterupdate n
else putMVar lastupdate prev
-- | Bandwidth limiting by inserting a delay at the point that a meter is
-- updated.
--
-- This will only work when the actions that use bandwidth are run in the
-- same process and thread as the call to the MeterUpdate.
--
-- For example, if the desired bandwidth is 100kb/s, and over the past
-- 1/10th of a second, 30kb was sent, then the current bandwidth is
-- 300kb/s, 3x as fast as desired. So, after getting the next chunk,
-- pause for twice as long as it took to get it.
bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate
bwLimitMeterUpdate bwlimit duration meterupdate
| bwlimit <= 0 = return meterupdate
| otherwise = do
nowtime <- getPOSIXTime
mv <- newMVar (nowtime, 0)
return (mu mv)
where
mu mv n@(BytesProcessed i) = do
endtime <- getPOSIXTime
(starttime, previ) <- takeMVar mv
let runtime = endtime - starttime
let currbw = fromIntegral (i - previ) / runtime
let pausescale = if currbw > bwlimit'
then (currbw / bwlimit') - 1
else 0
unboundDelay (floor (runtime * pausescale * msecs))
meterupdate n
nowtime <- getPOSIXTime
putMVar mv (nowtime, i)
bwlimit' = fromIntegral (bwlimit * durationSeconds duration)
msecs = fromIntegral oneSecond
data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter
data MeterState = MeterState

View file

@ -1384,6 +1384,18 @@ Remotes are configured using these settings in `.git/config`.
When making multiple retries of the same transfer, the delay
doubles after each retry. (default 1)
* `remote.<name>.annex-bwlimit`, `annex.bwlimit`
This can be used to limit how much bandwidth is used for a transfer
from or to a remote.
For example, to limit transfers to 1 mebibyte per second:
`git config annex.bwlimit "1MiB"`
This will work with many remotes, including git remotes, but not
for remotes where the transfer is run by a separate program than
git-annex.
* `remote.<name>.annex-stalldetecton`, `annex.stalldetection`
Configuring this lets stalled or too-slow transfers be detected, and

View file

@ -10,5 +10,26 @@ works, it will probably work to put the delay in there. --[[Joey]]
[[confirmed]]
> Implmentation in progress in the `bwlimit` branch. Seems to work, but see
> commit message for what still needs to be done. --[[Joey]]
> Implemented and works well.
>
> A local git remote, when resuming an interrupted
> transfer, has to hash the file (with default annex.verify settings),
> and that hashing updates the progress bar, and so the bwlimit can kick
> in and slow down that initial hashing, before any data copying begins.
> This seems perhaps ok; if you've bwlimited a local git remote,
> remote you're wanting to limit disk IO. Only reason it might not be ok
> is if the intent is to limit IO to the disk containing the remote
> but not the one containing the annex repo. (This also probably
> holds for the directory special remote.)
>
> Other remotes, including git over ssh, when resuming don't have that
> problem. Looks like chunked special remotes narrowly avoid it, just
> because their implementation choose to not do incremental verification
> when resuming. It might be worthwhile to differentiate between progress
> updates for incremental verification setup and for actual transfers, and
> only rate limit the latter, just to avoid fragility in the code.
> I have not done so yet though, and am closing this..
> --[[Joey]]
[[done]]