diff --git a/Annex/Import.hs b/Annex/Import.hs index 453965ba79..010808ef3f 100644 --- a/Annex/Import.hs +++ b/Annex/Import.hs @@ -461,8 +461,9 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec , providedMimeEncoding = Nothing , providedLinkType = Nothing } + bwlimit <- bwLimit (Remote.gitconfig remote) islargefile <- checkMatcher' matcher mi mempty - metered Nothing sz $ const $ if islargefile + metered Nothing sz bwlimit $ const $ if islargefile then doimportlarge importkey cidmap db loc cid sz f else doimportsmall cidmap db loc cid sz @@ -557,11 +558,12 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec Left e -> do warning (show e) return Nothing + bwlimit <- bwLimit (Remote.gitconfig remote) checkDiskSpaceToGet tmpkey Nothing $ notifyTransfer Download af $ download' (Remote.uuid remote) tmpkey af Nothing stdRetry $ \p -> withTmp tmpkey $ \tmpfile -> - metered (Just p) tmpkey $ + metered (Just p) tmpkey bwlimit $ const (rundownload tmpfile) where tmpkey = importKey cid sz diff --git a/Annex/StallDetection.hs b/Annex/StallDetection.hs index 02540a4732..9c095b4c86 100644 --- a/Annex/StallDetection.hs +++ b/Annex/StallDetection.hs @@ -22,7 +22,7 @@ import Control.Monad.IO.Class (MonadIO) detectStalls :: (Monad m, MonadIO m) => Maybe StallDetection -> TVar (Maybe BytesProcessed) -> m () -> m () detectStalls Nothing _ _ = noop detectStalls (Just StallDetectionDisabled) _ _ = noop -detectStalls (Just (StallDetection minsz duration)) metervar onstall = +detectStalls (Just (StallDetection (BwRate minsz duration))) metervar onstall = detectStalls' minsz duration metervar onstall Nothing detectStalls (Just ProbeStallDetection) metervar onstall = do -- Only do stall detection once the progress is confirmed to be diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index c57dbaf3ec..c2c3b582ec 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -20,6 +20,7 @@ module Annex.Transfer ( stdRetry, 
pickRemote, stallDetection, + bwLimit, ) where import Annex.Common @@ -406,3 +407,9 @@ stallDetection r = maybe globalcfg (pure . Just) remotecfg where globalcfg = annexStallDetection <$> Annex.getGitConfig remotecfg = remoteAnnexStallDetection $ Remote.gitconfig r + +bwLimit :: RemoteGitConfig -> Annex (Maybe BwRate) +bwLimit gc = maybe globalcfg (pure . Just) remotecfg + where + globalcfg = annexBwLimit <$> Annex.getGitConfig + remotecfg = remoteAnnexBwLimit gc diff --git a/Annex/YoutubeDl.hs b/Annex/YoutubeDl.hs index 5cbc9e7f3b..52219827ae 100644 --- a/Annex/YoutubeDl.hs +++ b/Annex/YoutubeDl.hs @@ -96,7 +96,7 @@ youtubeDl' url workdir p uo -- with the size, which is why it's important the -- meter is passed into commandMeter' let unknownsize = Nothing :: Maybe FileSize - ok <- metered (Just p) unknownsize $ \meter meterupdate -> + ok <- metered (Just p) unknownsize Nothing $ \meter meterupdate -> liftIO $ commandMeter' parseYoutubeDlProgress oh (Just meter) meterupdate cmd opts (\pr -> pr { cwd = Just workdir }) diff --git a/CHANGELOG b/CHANGELOG index 538ae62a49..fd9f1d06ef 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,7 @@ git-annex (8.20210904) UNRELEASED; urgency=medium + * Added annex.bwlimit and remote.name.annex-bwlimit config that works + for git remotes and many but not all special remotes. * borg: Avoid trying to extract xattrs, ACLS, and bsdflags when retrieving from a borg repository. 
diff --git a/Command/Add.hs b/Command/Add.hs index 0fe4351ab0..4da6f7354f 100644 --- a/Command/Add.hs +++ b/Command/Add.hs @@ -192,7 +192,7 @@ perform o file addunlockedmatcher = withOtherTmp $ \tmpdir -> do } ld <- lockDown cfg (fromRawFilePath file) let sizer = keySource <$> ld - v <- metered Nothing sizer $ \_meter meterupdate -> + v <- metered Nothing sizer Nothing $ \_meter meterupdate -> ingestAdd (checkGitIgnoreOption o) meterupdate ld finish v where diff --git a/Messages/Progress.hs b/Messages/Progress.hs index 397aebbb37..832dd9e0f3 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -1,6 +1,6 @@ {- git-annex progress output - - - Copyright 2010-2020 Joey Hess + - Copyright 2010-2021 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -18,6 +18,7 @@ import Types import Types.Messages import Types.Key import Types.KeySource +import Types.StallDetection (BwRate(..)) import Utility.InodeCache import qualified Messages.JSON as JSON import Messages.Concurrent @@ -72,11 +73,12 @@ metered :: MeterSize sizer => Maybe MeterUpdate -> sizer + -> Maybe BwRate -> (Meter -> MeterUpdate -> Annex a) -> Annex a -metered othermeter sizer a = withMessageState $ \st -> do +metered othermeterupdate sizer bwlimit a = withMessageState $ \st -> do sz <- getMeterSize sizer - metered' st setclear othermeter sz showOutput a + metered' st setclear othermeterupdate sz bwlimit showOutput a where setclear c = Annex.changeState $ \st -> st { Annex.output = (Annex.output st) { clearProgressMeter = c } } @@ -90,11 +92,12 @@ metered' -- NormalOutput. 
-> Maybe MeterUpdate -> Maybe TotalSize + -> Maybe BwRate -> m () -- ^ this should run showOutput -> (Meter -> MeterUpdate -> m a) -> m a -metered' st setclear othermeter msize showoutput a = go st +metered' st setclear othermeterupdate msize bwlimit showoutput a = go st where go (MessageState { outputType = QuietOutput }) = nometer go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do @@ -105,7 +108,7 @@ metered' st setclear othermeter msize showoutput a = go st setclear clear m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ updateMeter meter - r <- a meter (combinemeter m) + r <- a meter =<< mkmeterupdate m setclear noop liftIO clear return r @@ -116,7 +119,7 @@ metered' st setclear othermeter msize showoutput a = go st in Regions.setConsoleRegion r ('\n' : s) m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ updateMeter meter - a meter (combinemeter m) + a meter =<< mkmeterupdate m go (MessageState { outputType = JSONOutput jsonoptions }) | jsonProgress jsonoptions = do let buf = jsonBuffer st @@ -124,7 +127,7 @@ metered' st setclear othermeter msize showoutput a = go st JSON.progress buf msize' (meterBytesProcessed new) m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $ updateMeter meter - a meter (combinemeter m) + a meter =<< mkmeterupdate m | otherwise = nometer go (MessageState { outputType = SerializedOutput h _ }) = do liftIO $ outputSerialized h BeginProgressMeter @@ -144,16 +147,21 @@ metered' st setclear othermeter msize showoutput a = go st meterBytesProcessed new m <- liftIO $ rateLimitMeterUpdate minratelimit meter $ updateMeter meter - a meter (combinemeter m) + (a meter =<< mkmeterupdate m) `finally` (liftIO $ outputSerialized h EndProgressMeter) nometer = do dummymeter <- liftIO $ mkMeter Nothing $ \_ _ _ _ -> return () - a dummymeter (combinemeter (const noop)) + a dummymeter =<< mkmeterupdate (const noop) - combinemeter m = case othermeter of - Nothing -> m - Just om -> 
combineMeterUpdate m om + mkmeterupdate m = + let mu = case othermeterupdate of + Nothing -> m + Just om -> combineMeterUpdate m om + in case bwlimit of + Nothing -> return mu + Just (BwRate sz duration) -> liftIO $ + bwLimitMeterUpdate sz duration mu consoleratelimit = 0.2 @@ -164,7 +172,7 @@ metered' st setclear othermeter msize showoutput a = go st {- Poll file size to display meter. -} meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a meteredFile file combinemeterupdate key a = - metered combinemeterupdate key $ \_ p -> + metered combinemeterupdate key Nothing $ \_ p -> watchFileSize file p a {- Progress dots. -} diff --git a/Messages/Serialized.hs b/Messages/Serialized.hs index 0f5faddc9f..3f20d1a27d 100644 --- a/Messages/Serialized.hs +++ b/Messages/Serialized.hs @@ -68,7 +68,7 @@ relaySerializedOutput getso sendsor meterreport runannex = go Nothing let setclear = const noop -- Display a progress meter while running, until -- the meter ends or a final value is returned. 
- metered' ost setclear Nothing Nothing (runannex showOutput) + metered' ost setclear Nothing Nothing Nothing (runannex showOutput) (\meter meterupdate -> loop (Just (meter, meterupdate))) >>= \case Right r -> return (Right r) diff --git a/Remote/Git.hs b/Remote/Git.hs index 81a6bc5fdf..65d388bf92 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -550,6 +550,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met | not $ Git.repoIsUrl repo = guardUsable repo (giveup "cannot access remote") $ do u <- getUUID hardlink <- wantHardLink + bwlimit <- bwLimit (gitconfig r) -- run copy from perspective of remote onLocalFast st $ Annex.Content.prepSendAnnex' key >>= \case Just (object, check) -> do @@ -559,7 +560,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met copier <- mkFileCopier hardlink st (ok, v) <- runTransfer (Transfer Download u (fromKey id key)) file Nothing stdRetry $ \p -> - metered (Just (combineMeterUpdate p meterupdate)) key $ \_ p' -> + metered (Just (combineMeterUpdate p meterupdate)) key bwlimit $ \_ p' -> copier object dest key p' checksuccess vc if ok then return v @@ -572,6 +573,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met then return v else giveup "failed to retrieve content from remote" else P2PHelper.retrieve + (gitconfig r) (\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p)) key file dest meterupdate vc | otherwise = giveup "copying from non-ssh, non-http remote not supported" @@ -680,7 +682,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate , giveup "remote does not have expected annex.uuid value" ) | Git.repoIsSsh repo = commitOnCleanup repo r st $ - P2PHelper.store + P2PHelper.store (gitconfig r) (Ssh.runProto r connpool (return False) . 
copyremotefallback) key file meterupdate @@ -694,6 +696,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate checkio <- Annex.withCurrentState check u <- getUUID hardlink <- wantHardLink + bwlimit <- bwLimit (gitconfig r) -- run copy from perspective of remote res <- onLocalFast st $ ifM (Annex.Content.inAnnex key) ( return True @@ -705,7 +708,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate Just err -> giveup err Nothing -> return True res <- logStatusAfter key $ Annex.Content.getViaTmp rsp verify key file $ \dest -> - metered (Just (combineMeterUpdate meterupdate p)) key $ \_ p' -> + metered (Just (combineMeterUpdate meterupdate p)) key bwlimit $ \_ p' -> copier object (fromRawFilePath dest) key p' checksuccess verify Annex.Content.saveState True return res diff --git a/Remote/Helper/P2P.hs b/Remote/Helper/P2P.hs index 647bc6b016..0a9d41b9de 100644 --- a/Remote/Helper/P2P.hs +++ b/Remote/Helper/P2P.hs @@ -18,6 +18,7 @@ import Messages.Progress import Utility.Metered import Types.NumCopies import Annex.Verify +import Annex.Transfer import Control.Concurrent @@ -31,19 +32,21 @@ type ProtoConnRunner c = forall a. P2P.Proto a -> ClosableConnection c -> Annex -- the pool when done. type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) -> Annex a -store :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex () -store runner k af p = do +store :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex () +store gc runner k af p = do let sizer = KeySizer k (fmap (toRawFilePath . 
fst) <$> prepSendAnnex k) - metered (Just p) sizer $ \_ p' -> + bwlimit <- bwLimit gc + metered (Just p) sizer bwlimit $ \_ p' -> runner p' (P2P.put k af p') >>= \case Just True -> return () Just False -> giveup "Transfer failed" Nothing -> remoteUnavail -retrieve :: (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification -retrieve runner k af dest p verifyconfig = do +retrieve :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification +retrieve gc runner k af dest p verifyconfig = do iv <- startVerifyKeyContentIncrementally verifyconfig k - metered (Just p) k $ \m p' -> + bwlimit <- bwLimit gc + metered (Just p) k bwlimit $ \m p' -> runner p' (P2P.get dest k iv af m p') >>= \case Just (True, v) -> return v Just (False, _) -> giveup "Transfer failed" diff --git a/Remote/Helper/Special.hs b/Remote/Helper/Special.hs index 86ba114d8c..d221be040e 100644 --- a/Remote/Helper/Special.hs +++ b/Remote/Helper/Special.hs @@ -42,6 +42,7 @@ import Types.StoreRetrieve import Types.Remote import Annex.Verify import Annex.UUID +import Annex.Transfer import Config import Config.Cost import Utility.Metered @@ -261,8 +262,9 @@ specialRemote' cfg c storer retriever remover checkpresent baser = encr chunkconfig = chunkConfig cfg displayprogress p k srcfile a - | displayProgress cfg = - metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) (const a) + | displayProgress cfg = do + bwlimit <- bwLimit (gitconfig baser) + metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) bwlimit (const a) | otherwise = a p withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a diff --git a/Remote/P2P.hs b/Remote/P2P.hs index 21cf5b42e1..7f3817c1bf 100644 --- a/Remote/P2P.hs +++ b/Remote/P2P.hs @@ -55,8 +55,8 @@ chainGen addr r u rc gc rs = do { uuid = u , cost = cst , name = 
Git.repoDescribe r - , storeKey = store (const protorunner) - , retrieveKeyFile = retrieve (const protorunner) + , storeKey = store gc (const protorunner) + , retrieveKeyFile = retrieve gc (const protorunner) , retrieveKeyFileCheap = Nothing , retrievalSecurityPolicy = RetrievalAllKeysSecure , removeKey = remove protorunner diff --git a/Types/GitConfig.hs b/Types/GitConfig.hs index ab1060e060..64585cdffc 100644 --- a/Types/GitConfig.hs +++ b/Types/GitConfig.hs @@ -124,6 +124,7 @@ data GitConfig = GitConfig , annexForwardRetry :: Maybe Integer , annexRetryDelay :: Maybe Seconds , annexStallDetection :: Maybe StallDetection + , annexBwLimit :: Maybe BwRate , annexAllowedUrlSchemes :: S.Set Scheme , annexAllowedIPAddresses :: String , annexAllowUnverifiedDownloads :: Bool @@ -218,8 +219,11 @@ extractGitConfig configsource r = GitConfig , annexRetryDelay = Seconds <$> getmayberead (annexConfig "retrydelay") , annexStallDetection = - either (const Nothing) id . parseStallDetection + either (const Nothing) Just . parseStallDetection =<< getmaybe (annexConfig "stalldetection") + , annexBwLimit = + either (const Nothing) Just . parseBwRate + =<< getmaybe (annexConfig "bwlimit") , annexAllowedUrlSchemes = S.fromList $ map mkScheme $ maybe ["http", "https", "ftp"] words $ getmaybe (annexConfig "security.allowed-url-schemes") @@ -343,6 +347,7 @@ data RemoteGitConfig = RemoteGitConfig , remoteAnnexForwardRetry :: Maybe Integer , remoteAnnexRetryDelay :: Maybe Seconds , remoteAnnexStallDetection :: Maybe StallDetection + , remoteAnnexBwLimit :: Maybe BwRate , remoteAnnexAllowUnverifiedDownloads :: Bool , remoteAnnexConfigUUID :: Maybe UUID @@ -408,8 +413,11 @@ extractRemoteGitConfig r remotename = do , remoteAnnexRetryDelay = Seconds <$> getmayberead "retrydelay" , remoteAnnexStallDetection = - either (const Nothing) id . parseStallDetection + either (const Nothing) Just . parseStallDetection =<< getmaybe "stalldetection" + , remoteAnnexBwLimit = + either (const Nothing) Just . 
parseBwRate + =<< getmaybe "bwlimit" , remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $ getmaybe ("security-allow-unverified-downloads") , remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid" diff --git a/Types/StallDetection.hs b/Types/StallDetection.hs index 1cfc098a5d..13d88699f2 100644 --- a/Types/StallDetection.hs +++ b/Types/StallDetection.hs @@ -1,4 +1,4 @@ -{- types for stall detection +{- types for stall detection and bandwidth rates - - Copyright 2020-2021 Joey Hess - @@ -13,7 +13,7 @@ import Utility.Misc import Git.Config data StallDetection - = StallDetection ByteSize Duration + = StallDetection BwRate -- ^ Unless the given number of bytes have been sent over the given -- amount of time, there's a stall. | ProbeStallDetection @@ -22,21 +22,29 @@ data StallDetection | StallDetectionDisabled deriving (Show) +data BwRate = BwRate ByteSize Duration + deriving (Show) + -- Parse eg, "0KiB/60s" -- -- Also, it can be set to "true" (or other git config equivilants) -- to enable ProbeStallDetection. -- And "false" (and other git config equivilants) explicitly -- disable stall detection. 
-parseStallDetection :: String -> Either String (Maybe StallDetection) +parseStallDetection :: String -> Either String StallDetection parseStallDetection s = case isTrueFalse s of Nothing -> do - let (bs, ds) = separate (== '/') s - b <- maybe - (Left $ "Unable to parse stall detection amount " ++ bs) - Right - (readSize dataUnits bs) - d <- parseDuration ds - return (Just (StallDetection b d)) - Just True -> Right (Just ProbeStallDetection) - Just False -> Right (Just StallDetectionDisabled) + v <- parseBwRate s + Right (StallDetection v) + Just True -> Right ProbeStallDetection + Just False -> Right StallDetectionDisabled + +parseBwRate :: String -> Either String BwRate +parseBwRate s = do + let (bs, ds) = separate (== '/') s + b <- maybe + (Left $ "Unable to parse bandwidth amount " ++ bs) + Right + (readSize dataUnits bs) + d <- parseDuration ds + Right (BwRate b d) diff --git a/Utility/Metered.hs b/Utility/Metered.hs index a7c9c37d7f..ea391edb5c 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -37,6 +37,7 @@ module Utility.Metered ( demeterCommandEnv, avoidProgress, rateLimitMeterUpdate, + bwLimitMeterUpdate, Meter, mkMeter, setMeterTotalSize, @@ -51,6 +52,7 @@ import Utility.Percentage import Utility.DataUnits import Utility.HumanTime import Utility.SimpleProtocol as Proto +import Utility.ThreadScheduler import qualified Data.ByteString.Lazy as L import qualified Data.ByteString as S @@ -380,6 +382,47 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do meterupdate n else putMVar lastupdate prev +-- | Bandwidth limiting by inserting a delay at the point that a meter is +-- updated. +-- +-- This will only work when the actions that use bandwidth are run in the +-- same process and thread as the call to the MeterUpdate. +-- +-- For example, if the desired bandwidth is 100kb/s, and over the past +-- second, 200kb was sent, then pausing for half a second, and then +-- running for half a second should result in the desired bandwidth. 
+-- But, if after that pause, only 75kb is sent over the next half a +-- second, then the next pause should be 2/3rds of a second. +bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate +bwLimitMeterUpdate sz duration meterupdate = do + nowtime <- getPOSIXTime + lastpause <- newMVar (nowtime, toEnum 0 :: POSIXTime, 0) + return $ mu lastpause + where + mu lastpause n@(BytesProcessed i) = do + nowtime <- getPOSIXTime + meterupdate n + lastv@(prevtime, prevpauselength, previ) <- takeMVar lastpause + let timedelta = nowtime - prevtime + if timedelta >= durationsecs + then do + let sz' = i - previ + let runtime = timedelta - prevpauselength + let pauselength = calcpauselength sz' runtime + if pauselength > 0 + then do + unboundDelay (floor (pauselength * fromIntegral oneSecond)) + putMVar lastpause (nowtime, pauselength, i) + else putMVar lastpause lastv + else putMVar lastpause lastv + + calcpauselength sz' runtime + | sz' > sz && sz' > 0 && runtime > 0 = + durationsecs - (fromIntegral sz / fromIntegral sz') * runtime + | otherwise = 0 + + durationsecs = fromIntegral (durationSeconds duration) + data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter data MeterState = MeterState diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn index 148d321bfd..f781c09b09 100644 --- a/doc/git-annex.mdwn +++ b/doc/git-annex.mdwn @@ -1384,6 +1384,27 @@ Remotes are configured using these settings in `.git/config`. When making multiple retries of the same transfer, the delay doubles after each retry. (default 1) +* `remote..annex-bwlimit`, `annex.bwlimit` + + This can be used to limit how much bandwidth is used for a transfer + from or to a remote. + + For example, to limit transfers to 1 gigabyte per second: + `git config annex.bwlimit "1GB/1s"` + + This will work with many remotes, including git remotes, but not + for remotes where the transfer is run by a separate program than + git-annex. 
+ + The bandwidth limiting is implemented by pausing when + the transfer is running too fast, so it may use more bandwidth + than configured before being slowed down, either at the beginning + or if the available bandwidth changes while it is running. + + Using "1GB/1s" is different from using "10GB/10s". git-annex will + track how much data was transferred over the time period, and then + pause as needed. So usually 1s is the best time period to use. + * `remote..annex-stalldetecton`, `annex.stalldetection` Configuring this lets stalled or too-slow transfers be detected, and