diff --git a/Annex/Import.hs b/Annex/Import.hs index 453965ba79..2d15c11b99 100644 --- a/Annex/Import.hs +++ b/Annex/Import.hs @@ -461,8 +461,9 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec , providedMimeEncoding = Nothing , providedLinkType = Nothing } + let bwlimit = remoteAnnexBwLimit (Remote.gitconfig remote) islargefile <- checkMatcher' matcher mi mempty - metered Nothing sz $ const $ if islargefile + metered Nothing sz bwlimit $ const $ if islargefile then doimportlarge importkey cidmap db loc cid sz f else doimportsmall cidmap db loc cid sz @@ -557,11 +558,12 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec Left e -> do warning (show e) return Nothing + let bwlimit = remoteAnnexBwLimit (Remote.gitconfig remote) checkDiskSpaceToGet tmpkey Nothing $ notifyTransfer Download af $ download' (Remote.uuid remote) tmpkey af Nothing stdRetry $ \p -> withTmp tmpkey $ \tmpfile -> - metered (Just p) tmpkey $ + metered (Just p) tmpkey bwlimit $ const (rundownload tmpfile) where tmpkey = importKey cid sz diff --git a/Annex/StallDetection.hs b/Annex/StallDetection.hs index 02540a4732..9c095b4c86 100644 --- a/Annex/StallDetection.hs +++ b/Annex/StallDetection.hs @@ -22,7 +22,7 @@ import Control.Monad.IO.Class (MonadIO) detectStalls :: (Monad m, MonadIO m) => Maybe StallDetection -> TVar (Maybe BytesProcessed) -> m () -> m () detectStalls Nothing _ _ = noop detectStalls (Just StallDetectionDisabled) _ _ = noop -detectStalls (Just (StallDetection minsz duration)) metervar onstall = +detectStalls (Just (StallDetection (BwRate minsz duration))) metervar onstall = detectStalls' minsz duration metervar onstall Nothing detectStalls (Just ProbeStallDetection) metervar onstall = do -- Only do stall detection once the progress is confirmed to be diff --git a/Annex/YoutubeDl.hs b/Annex/YoutubeDl.hs index 5cbc9e7f3b..52219827ae 100644 --- a/Annex/YoutubeDl.hs +++ b/Annex/YoutubeDl.hs @@ -96,7 +96,7 @@ youtubeDl' url workdir p uo -- with the size, which is why it's important the -- meter is passed into commandMeter' let unknownsize = Nothing :: Maybe FileSize - ok <- metered (Just p) unknownsize $ \meter meterupdate -> + ok <- metered (Just p) unknownsize Nothing $ \meter meterupdate -> liftIO $ commandMeter' parseYoutubeDlProgress oh (Just meter) meterupdate cmd opts (\pr -> pr { cwd = Just workdir }) diff --git a/CHANGELOG b/CHANGELOG index 9589632065..7a1d4066e5 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,8 @@ git-annex (8.20210904) UNRELEASED; urgency=medium + * Added annex.bwlimit and remote.name.annex-bwlimit config to limit + the bandwidth of transfers. It works for git remotes and many + but not all special remotes. * Bug fix: Git configs such as annex.verify were incorrectly overriding per-remote git configs such as remote.name.annex-verify. (Reversion in version 4.20130323) diff --git a/Command/Add.hs b/Command/Add.hs index 0fe4351ab0..4da6f7354f 100644 --- a/Command/Add.hs +++ b/Command/Add.hs @@ -192,7 +192,7 @@ perform o file addunlockedmatcher = withOtherTmp $ \tmpdir -> do } ld <- lockDown cfg (fromRawFilePath file) let sizer = keySource <$> ld - v <- metered Nothing sizer $ \_meter meterupdate -> + v <- metered Nothing sizer Nothing $ \_meter meterupdate -> ingestAdd (checkGitIgnoreOption o) meterupdate ld finish v where diff --git a/Messages/Progress.hs b/Messages/Progress.hs index 397aebbb37..832dd9e0f3 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -1,6 +1,6 @@ {- git-annex progress output - - - Copyright 2010-2020 Joey Hess + - Copyright 2010-2021 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -18,6 +18,7 @@ import Types import Types.Messages import Types.Key import Types.KeySource +import Types.StallDetection (BwRate(..)) import Utility.InodeCache import qualified Messages.JSON as JSON import Messages.Concurrent @@ -72,11 +73,12 @@ metered :: MeterSize sizer => Maybe MeterUpdate -> sizer + -> Maybe BwRate -> (Meter -> MeterUpdate -> Annex a) -> Annex a -metered othermeter sizer a = withMessageState $ \st -> do +metered othermeterupdate sizer bwlimit a = withMessageState $ \st -> do sz <- getMeterSize sizer - metered' st setclear othermeter sz showOutput a + metered' st setclear othermeterupdate sz bwlimit showOutput a where setclear c = Annex.changeState $ \st -> st { Annex.output = (Annex.output st) { clearProgressMeter = c } } @@ -90,11 +92,12 @@ metered' -- NormalOutput. -> Maybe MeterUpdate -> Maybe TotalSize + -> Maybe BwRate -> m () -- ^ this should run showOutput -> (Meter -> MeterUpdate -> m a) -> m a -metered' st setclear othermeter msize showoutput a = go st +metered' st setclear othermeterupdate msize bwlimit showoutput a = go st where go (MessageState { outputType = QuietOutput }) = nometer go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do @@ -105,7 +108,7 @@ metered' st setclear othermeter msize showoutput a = go st setclear clear m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ updateMeter meter - r <- a meter (combinemeter m) + r <- a meter =<< mkmeterupdate m setclear noop liftIO clear return r @@ -116,7 +119,7 @@ metered' st setclear othermeter msize showoutput a = go st in Regions.setConsoleRegion r ('\n' : s) m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ updateMeter meter - a meter (combinemeter m) + a meter =<< mkmeterupdate m go (MessageState { outputType = JSONOutput jsonoptions }) | jsonProgress jsonoptions = do let buf = jsonBuffer st @@ -124,7 +127,7 @@ metered' st setclear othermeter msize showoutput a = go st JSON.progress buf msize' (meterBytesProcessed new) m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $ updateMeter meter - a meter (combinemeter m) + a meter =<< mkmeterupdate m | otherwise = nometer go (MessageState { outputType = SerializedOutput h _ }) = do liftIO $ outputSerialized h BeginProgressMeter @@ -144,16 +147,21 @@ metered' st setclear othermeter msize showoutput a = go st meterBytesProcessed new m <- liftIO $ rateLimitMeterUpdate minratelimit meter $ updateMeter meter - a meter (combinemeter m) + (a meter =<< mkmeterupdate m) `finally` (liftIO $ outputSerialized h EndProgressMeter) nometer = do dummymeter <- liftIO $ mkMeter Nothing $ \_ _ _ _ -> return () - a dummymeter (combinemeter (const noop)) + a dummymeter =<< mkmeterupdate (const noop) - combinemeter m = case othermeter of - Nothing -> m - Just om -> combineMeterUpdate m om + mkmeterupdate m = + let mu = case othermeterupdate of + Nothing -> m + Just om -> combineMeterUpdate m om + in case bwlimit of + Nothing -> return mu + Just (BwRate sz duration) -> liftIO $ + bwLimitMeterUpdate sz duration mu consoleratelimit = 0.2 @@ -164,7 +172,7 @@ metered' st setclear othermeter msize showoutput a = go st {- Poll file size to display meter. -} meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a meteredFile file combinemeterupdate key a = - metered combinemeterupdate key $ \_ p -> + metered combinemeterupdate key Nothing $ \_ p -> watchFileSize file p a {- Progress dots. -} diff --git a/Messages/Serialized.hs b/Messages/Serialized.hs index 0f5faddc9f..3f20d1a27d 100644 --- a/Messages/Serialized.hs +++ b/Messages/Serialized.hs @@ -68,7 +68,7 @@ relaySerializedOutput getso sendsor meterreport runannex = go Nothing let setclear = const noop -- Display a progress meter while running, until -- the meter ends or a final value is returned. - metered' ost setclear Nothing Nothing (runannex showOutput) + metered' ost setclear Nothing Nothing Nothing (runannex showOutput) (\meter meterupdate -> loop (Just (meter, meterupdate))) >>= \case Right r -> return (Right r) diff --git a/Remote/Git.hs b/Remote/Git.hs index 81a6bc5fdf..7f615d114e 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -550,6 +550,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met | not $ Git.repoIsUrl repo = guardUsable repo (giveup "cannot access remote") $ do u <- getUUID hardlink <- wantHardLink + let bwlimit = remoteAnnexBwLimit (gitconfig r) -- run copy from perspective of remote onLocalFast st $ Annex.Content.prepSendAnnex' key >>= \case Just (object, check) -> do @@ -559,7 +560,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met copier <- mkFileCopier hardlink st (ok, v) <- runTransfer (Transfer Download u (fromKey id key)) file Nothing stdRetry $ \p -> - metered (Just (combineMeterUpdate p meterupdate)) key $ \_ p' -> + metered (Just (combineMeterUpdate p meterupdate)) key bwlimit $ \_ p' -> copier object dest key p' checksuccess vc if ok then return v @@ -572,6 +573,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met then return v else giveup "failed to retrieve content from remote" else P2PHelper.retrieve + (gitconfig r) (\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p)) key file dest meterupdate vc | otherwise = giveup "copying from non-ssh, non-http remote not supported" @@ -680,7 +682,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate , giveup "remote does not have expected annex.uuid value" ) | Git.repoIsSsh repo = commitOnCleanup repo r st $ - P2PHelper.store + P2PHelper.store (gitconfig r) (Ssh.runProto r connpool (return False) . copyremotefallback) key file meterupdate @@ -694,6 +696,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate checkio <- Annex.withCurrentState check u <- getUUID hardlink <- wantHardLink + let bwlimit = remoteAnnexBwLimit (gitconfig r) -- run copy from perspective of remote res <- onLocalFast st $ ifM (Annex.Content.inAnnex key) ( return True @@ -705,7 +708,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate Just err -> giveup err Nothing -> return True res <- logStatusAfter key $ Annex.Content.getViaTmp rsp verify key file $ \dest -> - metered (Just (combineMeterUpdate meterupdate p)) key $ \_ p' -> + metered (Just (combineMeterUpdate meterupdate p)) key bwlimit $ \_ p' -> copier object (fromRawFilePath dest) key p' checksuccess verify Annex.Content.saveState True return res diff --git a/Remote/Helper/P2P.hs b/Remote/Helper/P2P.hs index 647bc6b016..2dd641a67e 100644 --- a/Remote/Helper/P2P.hs +++ b/Remote/Helper/P2P.hs @@ -31,19 +31,21 @@ type ProtoConnRunner c = forall a. P2P.Proto a -> ClosableConnection c -> Annex -- the pool when done. type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) -> Annex a -store :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex () -store runner k af p = do +store :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex () +store gc runner k af p = do let sizer = KeySizer k (fmap (toRawFilePath . fst) <$> prepSendAnnex k) - metered (Just p) sizer $ \_ p' -> + let bwlimit = remoteAnnexBwLimit gc + metered (Just p) sizer bwlimit $ \_ p' -> runner p' (P2P.put k af p') >>= \case Just True -> return () Just False -> giveup "Transfer failed" Nothing -> remoteUnavail -retrieve :: (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification -retrieve runner k af dest p verifyconfig = do +retrieve :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification +retrieve gc runner k af dest p verifyconfig = do iv <- startVerifyKeyContentIncrementally verifyconfig k - metered (Just p) k $ \m p' -> + let bwlimit = remoteAnnexBwLimit gc + metered (Just p) k bwlimit $ \m p' -> runner p' (P2P.get dest k iv af m p') >>= \case Just (True, v) -> return v Just (False, _) -> giveup "Transfer failed" diff --git a/Remote/Helper/Special.hs b/Remote/Helper/Special.hs index 86ba114d8c..8f595865db 100644 --- a/Remote/Helper/Special.hs +++ b/Remote/Helper/Special.hs @@ -261,8 +261,9 @@ specialRemote' cfg c storer retriever remover checkpresent baser = encr chunkconfig = chunkConfig cfg displayprogress p k srcfile a - | displayProgress cfg = - metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) (const a) + | displayProgress cfg = do + let bwlimit = remoteAnnexBwLimit (gitconfig baser) + metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) bwlimit (const a) | otherwise = a p withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a diff --git a/Remote/P2P.hs b/Remote/P2P.hs index 21cf5b42e1..7f3817c1bf 100644 --- a/Remote/P2P.hs +++ b/Remote/P2P.hs @@ -55,8 +55,8 @@ chainGen addr r u rc gc rs = do { uuid = u , cost = cst , name = Git.repoDescribe r - , storeKey = store (const protorunner) - , retrieveKeyFile = retrieve (const protorunner) + , storeKey = store gc (const protorunner) + , retrieveKeyFile = retrieve gc (const protorunner) , retrieveKeyFileCheap = Nothing , retrievalSecurityPolicy = RetrievalAllKeysSecure , removeKey = remove protorunner diff --git a/Types/GitConfig.hs b/Types/GitConfig.hs index 5c94743e89..46362de897 100644 --- a/Types/GitConfig.hs +++ b/Types/GitConfig.hs @@ -339,6 +339,7 @@ data RemoteGitConfig = RemoteGitConfig , remoteAnnexForwardRetry :: Maybe Integer , remoteAnnexRetryDelay :: Maybe Seconds , remoteAnnexStallDetection :: Maybe StallDetection + , remoteAnnexBwLimit :: Maybe BwRate , remoteAnnexAllowUnverifiedDownloads :: Bool , remoteAnnexConfigUUID :: Maybe UUID @@ -404,8 +405,11 @@ extractRemoteGitConfig r remotename = do , remoteAnnexRetryDelay = Seconds <$> getmayberead "retrydelay" , remoteAnnexStallDetection = - either (const Nothing) id . parseStallDetection + either (const Nothing) Just . parseStallDetection =<< getmaybe "stalldetection" + , remoteAnnexBwLimit = do + sz <- readSize dataUnits =<< getmaybe "bwlimit" + return (BwRate sz (Duration 1)) , remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $ getmaybe ("security-allow-unverified-downloads") , remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid" diff --git a/Types/StallDetection.hs b/Types/StallDetection.hs index 1cfc098a5d..13d88699f2 100644 --- a/Types/StallDetection.hs +++ b/Types/StallDetection.hs @@ -1,4 +1,4 @@ -{- types for stall detection +{- types for stall detection and banwdith rates - - Copyright 2020-2021 Joey Hess - @@ -13,7 +13,7 @@ import Utility.Misc import Git.Config data StallDetection - = StallDetection ByteSize Duration + = StallDetection BwRate -- ^ Unless the given number of bytes have been sent over the given -- amount of time, there's a stall. | ProbeStallDetection @@ -22,21 +22,29 @@ data StallDetection | StallDetectionDisabled deriving (Show) +data BwRate = BwRate ByteSize Duration + deriving (Show) + -- Parse eg, "0KiB/60s" -- -- Also, it can be set to "true" (or other git config equivilants) -- to enable ProbeStallDetection. -- And "false" (and other git config equivilants) explicitly -- disable stall detection. -parseStallDetection :: String -> Either String (Maybe StallDetection) +parseStallDetection :: String -> Either String StallDetection parseStallDetection s = case isTrueFalse s of Nothing -> do - let (bs, ds) = separate (== '/') s - b <- maybe - (Left $ "Unable to parse stall detection amount " ++ bs) - Right - (readSize dataUnits bs) - d <- parseDuration ds - return (Just (StallDetection b d)) - Just True -> Right (Just ProbeStallDetection) - Just False -> Right (Just StallDetectionDisabled) + v <- parseBwRate s + Right (StallDetection v) + Just True -> Right ProbeStallDetection + Just False -> Right StallDetectionDisabled + +parseBwRate :: String -> Either String BwRate +parseBwRate s = do + let (bs, ds) = separate (== '/') s + b <- maybe + (Left $ "Unable to parse bandwidth amount " ++ bs) + Right + (readSize dataUnits bs) + d <- parseDuration ds + Right (BwRate b d) diff --git a/Utility/Metered.hs b/Utility/Metered.hs index a7c9c37d7f..1e12444d60 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -37,6 +37,7 @@ module Utility.Metered ( demeterCommandEnv, avoidProgress, rateLimitMeterUpdate, + bwLimitMeterUpdate, Meter, mkMeter, setMeterTotalSize, @@ -51,6 +52,7 @@ import Utility.Percentage import Utility.DataUnits import Utility.HumanTime import Utility.SimpleProtocol as Proto +import Utility.ThreadScheduler import qualified Data.ByteString.Lazy as L import qualified Data.ByteString as S @@ -380,6 +382,42 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do meterupdate n else putMVar lastupdate prev +-- | Bandwidth limiting by inserting a delay at the point that a meter is +-- updated. +-- +-- This will only work when the actions that use bandwidth are run in the +-- same process and thread as the call to the MeterUpdate. +-- +-- For example, if the desired bandwidth is 100kb/s, and over the past +-- 1/10th of a second, 30kb was sent, then the current bandwidth is +-- 300kb/s, 3x as fast as desired. So, after getting the next chunk, +-- pause for twice as long as it took to get it. +bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate +bwLimitMeterUpdate bwlimit duration meterupdate + | bwlimit <= 0 = return meterupdate + | otherwise = do + nowtime <- getPOSIXTime + mv <- newMVar (nowtime, 0) + return (mu mv) + where + mu mv n@(BytesProcessed i) = do + endtime <- getPOSIXTime + (starttime, previ) <- takeMVar mv + + let runtime = endtime - starttime + let currbw = fromIntegral (i - previ) / runtime + let pausescale = if currbw > bwlimit' + then (currbw / bwlimit') - 1 + else 0 + unboundDelay (floor (runtime * pausescale * msecs)) + meterupdate n + + nowtime <- getPOSIXTime + putMVar mv (nowtime, i) + + bwlimit' = fromIntegral (bwlimit * durationSeconds duration) + msecs = fromIntegral oneSecond + data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter data MeterState = MeterState diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn index 148d321bfd..def6ec1dc4 100644 --- a/doc/git-annex.mdwn +++ b/doc/git-annex.mdwn @@ -1384,6 +1384,18 @@ Remotes are configured using these settings in `.git/config`. When making multiple retries of the same transfer, the delay doubles after each retry. (default 1) +* `remote..annex-bwlimit`, `annex.bwlimit` + + This can be used to limit how much bandwidth is used for a transfer + from or to a remote. + + For example, to limit transfers to 1 mebibyte per second: + `git config annex.bwlimit "1MiB"` + + This will work with many remotes, including git remotes, but not + for remotes where the transfer is run by a separate program than + git-annex. + * `remote..annex-stalldetecton`, `annex.stalldetection` Configuring this lets stalled or too-slow transfers be detected, and diff --git a/doc/todo/bwlimit.mdwn b/doc/todo/bwlimit.mdwn index da1deffe69..675896cfe0 100644 --- a/doc/todo/bwlimit.mdwn +++ b/doc/todo/bwlimit.mdwn @@ -10,5 +10,26 @@ works, it will probably work to put the delay in there. --[[Joey]] [[confirmed]] -> Implmentation in progress in the `bwlimit` branch. Seems to work, but see -> commit message for what still needs to be done. --[[Joey]] +> Implemented and works well. +> +> A local git remote, when resuming an interrupted +> transfer, has to hash the file (with default annex.verify settings), +> and that hashing updates the progress bar, and so the bwlimit can kick +> in and slow down that initial hashing, before any data copying begins. +> This seems perhaps ok; if you've bwlimited a local git remote, +> remote you're wanting to limit disk IO. Only reason it might not be ok +> is if the intent is to limit IO to the disk containing the remote +> but not the one containing the annex repo. (This also probably +> holds for the directory special remote.) +> +> Other remotes, including git over ssh, when resuming don't have that +> problem. Looks like chunked special remotes narrowly avoid it, just +> because their implementation choose to not do incremental verification +> when resuming. It might be worthwhile to differentiate between progress +> updates for incremental verification setup and for actual transfers, and +> only rate limit the latter, just to avoid fragility in the code. +> I have not done so yet though, and am closing this.. +> --[[Joey]] + +[[done]] +