bwlimit
Added annex.bwlimit and remote.name.annex-bwlimit config that works for git remotes and many but not all special remotes. This nearly works, at least for a git remote on the same disk. With it set to 100kb/1s, the meter displays an actual bandwidth of 128 kb/s, with occasional spikes to 160 kb/s. So it needs to delay just a bit longer... I'm unsure why. However, at the beginning a lot of data flows before it determines the right bandwidth limit. A granularity of less than 1s would probably improve that. And, I don't know yet if it makes sense to have it be 100ks/1s rather than 100kb/s. Is there a situation where the user would want a larger granularity? Does granulatity need to be configurable at all? I only used that format for the config really in order to reuse an existing parser. This can't support for external special remotes, or for ones that themselves shell out to an external command. (Well, it could, but it would involve pausing and resuming the child process tree, which seems very hard to implement and very strange besides.) There could also be some built-in special remotes that it still doesn't work for, due to them not having a progress meter whose displays blocks the bandwidth using thread. But I don't think there are actually any that run a separate thread for downloads than the thread that displays the progress meter. Sponsored-by: Graham Spencer on Patreon
This commit is contained in:
parent
c9dd63d67d
commit
18e00500ce
16 changed files with 153 additions and 46 deletions
|
@ -461,8 +461,9 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
||||||
, providedMimeEncoding = Nothing
|
, providedMimeEncoding = Nothing
|
||||||
, providedLinkType = Nothing
|
, providedLinkType = Nothing
|
||||||
}
|
}
|
||||||
|
bwlimit <- bwLimit (Remote.gitconfig remote)
|
||||||
islargefile <- checkMatcher' matcher mi mempty
|
islargefile <- checkMatcher' matcher mi mempty
|
||||||
metered Nothing sz $ const $ if islargefile
|
metered Nothing sz bwlimit $ const $ if islargefile
|
||||||
then doimportlarge importkey cidmap db loc cid sz f
|
then doimportlarge importkey cidmap db loc cid sz f
|
||||||
else doimportsmall cidmap db loc cid sz
|
else doimportsmall cidmap db loc cid sz
|
||||||
|
|
||||||
|
@ -557,11 +558,12 @@ importKeys remote importtreeconfig importcontent thirdpartypopulated importablec
|
||||||
Left e -> do
|
Left e -> do
|
||||||
warning (show e)
|
warning (show e)
|
||||||
return Nothing
|
return Nothing
|
||||||
|
bwlimit <- bwLimit (Remote.gitconfig remote)
|
||||||
checkDiskSpaceToGet tmpkey Nothing $
|
checkDiskSpaceToGet tmpkey Nothing $
|
||||||
notifyTransfer Download af $
|
notifyTransfer Download af $
|
||||||
download' (Remote.uuid remote) tmpkey af Nothing stdRetry $ \p ->
|
download' (Remote.uuid remote) tmpkey af Nothing stdRetry $ \p ->
|
||||||
withTmp tmpkey $ \tmpfile ->
|
withTmp tmpkey $ \tmpfile ->
|
||||||
metered (Just p) tmpkey $
|
metered (Just p) tmpkey bwlimit $
|
||||||
const (rundownload tmpfile)
|
const (rundownload tmpfile)
|
||||||
where
|
where
|
||||||
tmpkey = importKey cid sz
|
tmpkey = importKey cid sz
|
||||||
|
|
|
@ -22,7 +22,7 @@ import Control.Monad.IO.Class (MonadIO)
|
||||||
detectStalls :: (Monad m, MonadIO m) => Maybe StallDetection -> TVar (Maybe BytesProcessed) -> m () -> m ()
|
detectStalls :: (Monad m, MonadIO m) => Maybe StallDetection -> TVar (Maybe BytesProcessed) -> m () -> m ()
|
||||||
detectStalls Nothing _ _ = noop
|
detectStalls Nothing _ _ = noop
|
||||||
detectStalls (Just StallDetectionDisabled) _ _ = noop
|
detectStalls (Just StallDetectionDisabled) _ _ = noop
|
||||||
detectStalls (Just (StallDetection minsz duration)) metervar onstall =
|
detectStalls (Just (StallDetection (BwRate minsz duration))) metervar onstall =
|
||||||
detectStalls' minsz duration metervar onstall Nothing
|
detectStalls' minsz duration metervar onstall Nothing
|
||||||
detectStalls (Just ProbeStallDetection) metervar onstall = do
|
detectStalls (Just ProbeStallDetection) metervar onstall = do
|
||||||
-- Only do stall detection once the progress is confirmed to be
|
-- Only do stall detection once the progress is confirmed to be
|
||||||
|
|
|
@ -20,6 +20,7 @@ module Annex.Transfer (
|
||||||
stdRetry,
|
stdRetry,
|
||||||
pickRemote,
|
pickRemote,
|
||||||
stallDetection,
|
stallDetection,
|
||||||
|
bwLimit,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
|
@ -406,3 +407,9 @@ stallDetection r = maybe globalcfg (pure . Just) remotecfg
|
||||||
where
|
where
|
||||||
globalcfg = annexStallDetection <$> Annex.getGitConfig
|
globalcfg = annexStallDetection <$> Annex.getGitConfig
|
||||||
remotecfg = remoteAnnexStallDetection $ Remote.gitconfig r
|
remotecfg = remoteAnnexStallDetection $ Remote.gitconfig r
|
||||||
|
|
||||||
|
bwLimit :: RemoteGitConfig -> Annex (Maybe BwRate)
|
||||||
|
bwLimit gc = maybe globalcfg (pure . Just) remotecfg
|
||||||
|
where
|
||||||
|
globalcfg = annexBwLimit <$> Annex.getGitConfig
|
||||||
|
remotecfg = remoteAnnexBwLimit gc
|
||||||
|
|
|
@ -96,7 +96,7 @@ youtubeDl' url workdir p uo
|
||||||
-- with the size, which is why it's important the
|
-- with the size, which is why it's important the
|
||||||
-- meter is passed into commandMeter'
|
-- meter is passed into commandMeter'
|
||||||
let unknownsize = Nothing :: Maybe FileSize
|
let unknownsize = Nothing :: Maybe FileSize
|
||||||
ok <- metered (Just p) unknownsize $ \meter meterupdate ->
|
ok <- metered (Just p) unknownsize Nothing $ \meter meterupdate ->
|
||||||
liftIO $ commandMeter'
|
liftIO $ commandMeter'
|
||||||
parseYoutubeDlProgress oh (Just meter) meterupdate cmd opts
|
parseYoutubeDlProgress oh (Just meter) meterupdate cmd opts
|
||||||
(\pr -> pr { cwd = Just workdir })
|
(\pr -> pr { cwd = Just workdir })
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
git-annex (8.20210904) UNRELEASED; urgency=medium
|
git-annex (8.20210904) UNRELEASED; urgency=medium
|
||||||
|
|
||||||
|
* Added annex.bwlimit and remote.name.annex-bwlimit config that works
|
||||||
|
for git remotes and many but not all special remotes.
|
||||||
* borg: Avoid trying to extract xattrs, ACLS, and bsdflags when
|
* borg: Avoid trying to extract xattrs, ACLS, and bsdflags when
|
||||||
retrieving from a borg repository.
|
retrieving from a borg repository.
|
||||||
|
|
||||||
|
|
|
@ -192,7 +192,7 @@ perform o file addunlockedmatcher = withOtherTmp $ \tmpdir -> do
|
||||||
}
|
}
|
||||||
ld <- lockDown cfg (fromRawFilePath file)
|
ld <- lockDown cfg (fromRawFilePath file)
|
||||||
let sizer = keySource <$> ld
|
let sizer = keySource <$> ld
|
||||||
v <- metered Nothing sizer $ \_meter meterupdate ->
|
v <- metered Nothing sizer Nothing $ \_meter meterupdate ->
|
||||||
ingestAdd (checkGitIgnoreOption o) meterupdate ld
|
ingestAdd (checkGitIgnoreOption o) meterupdate ld
|
||||||
finish v
|
finish v
|
||||||
where
|
where
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{- git-annex progress output
|
{- git-annex progress output
|
||||||
-
|
-
|
||||||
- Copyright 2010-2020 Joey Hess <id@joeyh.name>
|
- Copyright 2010-2021 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
- Licensed under the GNU AGPL version 3 or higher.
|
- Licensed under the GNU AGPL version 3 or higher.
|
||||||
-}
|
-}
|
||||||
|
@ -18,6 +18,7 @@ import Types
|
||||||
import Types.Messages
|
import Types.Messages
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import Types.KeySource
|
import Types.KeySource
|
||||||
|
import Types.StallDetection (BwRate(..))
|
||||||
import Utility.InodeCache
|
import Utility.InodeCache
|
||||||
import qualified Messages.JSON as JSON
|
import qualified Messages.JSON as JSON
|
||||||
import Messages.Concurrent
|
import Messages.Concurrent
|
||||||
|
@ -72,11 +73,12 @@ metered
|
||||||
:: MeterSize sizer
|
:: MeterSize sizer
|
||||||
=> Maybe MeterUpdate
|
=> Maybe MeterUpdate
|
||||||
-> sizer
|
-> sizer
|
||||||
|
-> Maybe BwRate
|
||||||
-> (Meter -> MeterUpdate -> Annex a)
|
-> (Meter -> MeterUpdate -> Annex a)
|
||||||
-> Annex a
|
-> Annex a
|
||||||
metered othermeter sizer a = withMessageState $ \st -> do
|
metered othermeterupdate sizer bwlimit a = withMessageState $ \st -> do
|
||||||
sz <- getMeterSize sizer
|
sz <- getMeterSize sizer
|
||||||
metered' st setclear othermeter sz showOutput a
|
metered' st setclear othermeterupdate sz bwlimit showOutput a
|
||||||
where
|
where
|
||||||
setclear c = Annex.changeState $ \st -> st
|
setclear c = Annex.changeState $ \st -> st
|
||||||
{ Annex.output = (Annex.output st) { clearProgressMeter = c } }
|
{ Annex.output = (Annex.output st) { clearProgressMeter = c } }
|
||||||
|
@ -90,11 +92,12 @@ metered'
|
||||||
-- NormalOutput.
|
-- NormalOutput.
|
||||||
-> Maybe MeterUpdate
|
-> Maybe MeterUpdate
|
||||||
-> Maybe TotalSize
|
-> Maybe TotalSize
|
||||||
|
-> Maybe BwRate
|
||||||
-> m ()
|
-> m ()
|
||||||
-- ^ this should run showOutput
|
-- ^ this should run showOutput
|
||||||
-> (Meter -> MeterUpdate -> m a)
|
-> (Meter -> MeterUpdate -> m a)
|
||||||
-> m a
|
-> m a
|
||||||
metered' st setclear othermeter msize showoutput a = go st
|
metered' st setclear othermeterupdate msize bwlimit showoutput a = go st
|
||||||
where
|
where
|
||||||
go (MessageState { outputType = QuietOutput }) = nometer
|
go (MessageState { outputType = QuietOutput }) = nometer
|
||||||
go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
|
go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
|
||||||
|
@ -105,7 +108,7 @@ metered' st setclear othermeter msize showoutput a = go st
|
||||||
setclear clear
|
setclear clear
|
||||||
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
|
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
r <- a meter (combinemeter m)
|
r <- a meter =<< mkmeterupdate m
|
||||||
setclear noop
|
setclear noop
|
||||||
liftIO clear
|
liftIO clear
|
||||||
return r
|
return r
|
||||||
|
@ -116,7 +119,7 @@ metered' st setclear othermeter msize showoutput a = go st
|
||||||
in Regions.setConsoleRegion r ('\n' : s)
|
in Regions.setConsoleRegion r ('\n' : s)
|
||||||
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
|
m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
a meter (combinemeter m)
|
a meter =<< mkmeterupdate m
|
||||||
go (MessageState { outputType = JSONOutput jsonoptions })
|
go (MessageState { outputType = JSONOutput jsonoptions })
|
||||||
| jsonProgress jsonoptions = do
|
| jsonProgress jsonoptions = do
|
||||||
let buf = jsonBuffer st
|
let buf = jsonBuffer st
|
||||||
|
@ -124,7 +127,7 @@ metered' st setclear othermeter msize showoutput a = go st
|
||||||
JSON.progress buf msize' (meterBytesProcessed new)
|
JSON.progress buf msize' (meterBytesProcessed new)
|
||||||
m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $
|
m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
a meter (combinemeter m)
|
a meter =<< mkmeterupdate m
|
||||||
| otherwise = nometer
|
| otherwise = nometer
|
||||||
go (MessageState { outputType = SerializedOutput h _ }) = do
|
go (MessageState { outputType = SerializedOutput h _ }) = do
|
||||||
liftIO $ outputSerialized h BeginProgressMeter
|
liftIO $ outputSerialized h BeginProgressMeter
|
||||||
|
@ -144,16 +147,21 @@ metered' st setclear othermeter msize showoutput a = go st
|
||||||
meterBytesProcessed new
|
meterBytesProcessed new
|
||||||
m <- liftIO $ rateLimitMeterUpdate minratelimit meter $
|
m <- liftIO $ rateLimitMeterUpdate minratelimit meter $
|
||||||
updateMeter meter
|
updateMeter meter
|
||||||
a meter (combinemeter m)
|
(a meter =<< mkmeterupdate m)
|
||||||
`finally` (liftIO $ outputSerialized h EndProgressMeter)
|
`finally` (liftIO $ outputSerialized h EndProgressMeter)
|
||||||
nometer = do
|
nometer = do
|
||||||
dummymeter <- liftIO $ mkMeter Nothing $
|
dummymeter <- liftIO $ mkMeter Nothing $
|
||||||
\_ _ _ _ -> return ()
|
\_ _ _ _ -> return ()
|
||||||
a dummymeter (combinemeter (const noop))
|
a dummymeter =<< mkmeterupdate (const noop)
|
||||||
|
|
||||||
combinemeter m = case othermeter of
|
mkmeterupdate m =
|
||||||
Nothing -> m
|
let mu = case othermeterupdate of
|
||||||
Just om -> combineMeterUpdate m om
|
Nothing -> m
|
||||||
|
Just om -> combineMeterUpdate m om
|
||||||
|
in case bwlimit of
|
||||||
|
Nothing -> return mu
|
||||||
|
Just (BwRate sz duration) -> liftIO $
|
||||||
|
bwLimitMeterUpdate sz duration mu
|
||||||
|
|
||||||
consoleratelimit = 0.2
|
consoleratelimit = 0.2
|
||||||
|
|
||||||
|
@ -164,7 +172,7 @@ metered' st setclear othermeter msize showoutput a = go st
|
||||||
{- Poll file size to display meter. -}
|
{- Poll file size to display meter. -}
|
||||||
meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a
|
meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a
|
||||||
meteredFile file combinemeterupdate key a =
|
meteredFile file combinemeterupdate key a =
|
||||||
metered combinemeterupdate key $ \_ p ->
|
metered combinemeterupdate key Nothing $ \_ p ->
|
||||||
watchFileSize file p a
|
watchFileSize file p a
|
||||||
|
|
||||||
{- Progress dots. -}
|
{- Progress dots. -}
|
||||||
|
|
|
@ -68,7 +68,7 @@ relaySerializedOutput getso sendsor meterreport runannex = go Nothing
|
||||||
let setclear = const noop
|
let setclear = const noop
|
||||||
-- Display a progress meter while running, until
|
-- Display a progress meter while running, until
|
||||||
-- the meter ends or a final value is returned.
|
-- the meter ends or a final value is returned.
|
||||||
metered' ost setclear Nothing Nothing (runannex showOutput)
|
metered' ost setclear Nothing Nothing Nothing (runannex showOutput)
|
||||||
(\meter meterupdate -> loop (Just (meter, meterupdate)))
|
(\meter meterupdate -> loop (Just (meter, meterupdate)))
|
||||||
>>= \case
|
>>= \case
|
||||||
Right r -> return (Right r)
|
Right r -> return (Right r)
|
||||||
|
|
|
@ -550,6 +550,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
|
||||||
| not $ Git.repoIsUrl repo = guardUsable repo (giveup "cannot access remote") $ do
|
| not $ Git.repoIsUrl repo = guardUsable repo (giveup "cannot access remote") $ do
|
||||||
u <- getUUID
|
u <- getUUID
|
||||||
hardlink <- wantHardLink
|
hardlink <- wantHardLink
|
||||||
|
bwlimit <- bwLimit (gitconfig r)
|
||||||
-- run copy from perspective of remote
|
-- run copy from perspective of remote
|
||||||
onLocalFast st $ Annex.Content.prepSendAnnex' key >>= \case
|
onLocalFast st $ Annex.Content.prepSendAnnex' key >>= \case
|
||||||
Just (object, check) -> do
|
Just (object, check) -> do
|
||||||
|
@ -559,7 +560,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
|
||||||
copier <- mkFileCopier hardlink st
|
copier <- mkFileCopier hardlink st
|
||||||
(ok, v) <- runTransfer (Transfer Download u (fromKey id key))
|
(ok, v) <- runTransfer (Transfer Download u (fromKey id key))
|
||||||
file Nothing stdRetry $ \p ->
|
file Nothing stdRetry $ \p ->
|
||||||
metered (Just (combineMeterUpdate p meterupdate)) key $ \_ p' ->
|
metered (Just (combineMeterUpdate p meterupdate)) key bwlimit $ \_ p' ->
|
||||||
copier object dest key p' checksuccess vc
|
copier object dest key p' checksuccess vc
|
||||||
if ok
|
if ok
|
||||||
then return v
|
then return v
|
||||||
|
@ -572,6 +573,7 @@ copyFromRemote'' repo forcersync r st@(State connpool _ _ _ _) key file dest met
|
||||||
then return v
|
then return v
|
||||||
else giveup "failed to retrieve content from remote"
|
else giveup "failed to retrieve content from remote"
|
||||||
else P2PHelper.retrieve
|
else P2PHelper.retrieve
|
||||||
|
(gitconfig r)
|
||||||
(\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p))
|
(\p -> Ssh.runProto r connpool (return (False, UnVerified)) (fallback p))
|
||||||
key file dest meterupdate vc
|
key file dest meterupdate vc
|
||||||
| otherwise = giveup "copying from non-ssh, non-http remote not supported"
|
| otherwise = giveup "copying from non-ssh, non-http remote not supported"
|
||||||
|
@ -680,7 +682,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
|
||||||
, giveup "remote does not have expected annex.uuid value"
|
, giveup "remote does not have expected annex.uuid value"
|
||||||
)
|
)
|
||||||
| Git.repoIsSsh repo = commitOnCleanup repo r st $
|
| Git.repoIsSsh repo = commitOnCleanup repo r st $
|
||||||
P2PHelper.store
|
P2PHelper.store (gitconfig r)
|
||||||
(Ssh.runProto r connpool (return False) . copyremotefallback)
|
(Ssh.runProto r connpool (return False) . copyremotefallback)
|
||||||
key file meterupdate
|
key file meterupdate
|
||||||
|
|
||||||
|
@ -694,6 +696,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
|
||||||
checkio <- Annex.withCurrentState check
|
checkio <- Annex.withCurrentState check
|
||||||
u <- getUUID
|
u <- getUUID
|
||||||
hardlink <- wantHardLink
|
hardlink <- wantHardLink
|
||||||
|
bwlimit <- bwLimit (gitconfig r)
|
||||||
-- run copy from perspective of remote
|
-- run copy from perspective of remote
|
||||||
res <- onLocalFast st $ ifM (Annex.Content.inAnnex key)
|
res <- onLocalFast st $ ifM (Annex.Content.inAnnex key)
|
||||||
( return True
|
( return True
|
||||||
|
@ -705,7 +708,7 @@ copyToRemote' repo r st@(State connpool duc _ _ _) key file meterupdate
|
||||||
Just err -> giveup err
|
Just err -> giveup err
|
||||||
Nothing -> return True
|
Nothing -> return True
|
||||||
res <- logStatusAfter key $ Annex.Content.getViaTmp rsp verify key file $ \dest ->
|
res <- logStatusAfter key $ Annex.Content.getViaTmp rsp verify key file $ \dest ->
|
||||||
metered (Just (combineMeterUpdate meterupdate p)) key $ \_ p' ->
|
metered (Just (combineMeterUpdate meterupdate p)) key bwlimit $ \_ p' ->
|
||||||
copier object (fromRawFilePath dest) key p' checksuccess verify
|
copier object (fromRawFilePath dest) key p' checksuccess verify
|
||||||
Annex.Content.saveState True
|
Annex.Content.saveState True
|
||||||
return res
|
return res
|
||||||
|
|
|
@ -18,6 +18,7 @@ import Messages.Progress
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
import Types.NumCopies
|
import Types.NumCopies
|
||||||
import Annex.Verify
|
import Annex.Verify
|
||||||
|
import Annex.Transfer
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
|
||||||
|
@ -31,19 +32,21 @@ type ProtoConnRunner c = forall a. P2P.Proto a -> ClosableConnection c -> Annex
|
||||||
-- the pool when done.
|
-- the pool when done.
|
||||||
type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) -> Annex a
|
type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) -> Annex a
|
||||||
|
|
||||||
store :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex ()
|
store :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex ()
|
||||||
store runner k af p = do
|
store gc runner k af p = do
|
||||||
let sizer = KeySizer k (fmap (toRawFilePath . fst) <$> prepSendAnnex k)
|
let sizer = KeySizer k (fmap (toRawFilePath . fst) <$> prepSendAnnex k)
|
||||||
metered (Just p) sizer $ \_ p' ->
|
bwlimit <- bwLimit gc
|
||||||
|
metered (Just p) sizer bwlimit $ \_ p' ->
|
||||||
runner p' (P2P.put k af p') >>= \case
|
runner p' (P2P.put k af p') >>= \case
|
||||||
Just True -> return ()
|
Just True -> return ()
|
||||||
Just False -> giveup "Transfer failed"
|
Just False -> giveup "Transfer failed"
|
||||||
Nothing -> remoteUnavail
|
Nothing -> remoteUnavail
|
||||||
|
|
||||||
retrieve :: (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
retrieve :: RemoteGitConfig -> (MeterUpdate -> ProtoRunner (Bool, Verification)) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> VerifyConfig -> Annex Verification
|
||||||
retrieve runner k af dest p verifyconfig = do
|
retrieve gc runner k af dest p verifyconfig = do
|
||||||
iv <- startVerifyKeyContentIncrementally verifyconfig k
|
iv <- startVerifyKeyContentIncrementally verifyconfig k
|
||||||
metered (Just p) k $ \m p' ->
|
bwlimit <- bwLimit gc
|
||||||
|
metered (Just p) k bwlimit $ \m p' ->
|
||||||
runner p' (P2P.get dest k iv af m p') >>= \case
|
runner p' (P2P.get dest k iv af m p') >>= \case
|
||||||
Just (True, v) -> return v
|
Just (True, v) -> return v
|
||||||
Just (False, _) -> giveup "Transfer failed"
|
Just (False, _) -> giveup "Transfer failed"
|
||||||
|
|
|
@ -42,6 +42,7 @@ import Types.StoreRetrieve
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Annex.Verify
|
import Annex.Verify
|
||||||
import Annex.UUID
|
import Annex.UUID
|
||||||
|
import Annex.Transfer
|
||||||
import Config
|
import Config
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
|
@ -261,8 +262,9 @@ specialRemote' cfg c storer retriever remover checkpresent baser = encr
|
||||||
chunkconfig = chunkConfig cfg
|
chunkconfig = chunkConfig cfg
|
||||||
|
|
||||||
displayprogress p k srcfile a
|
displayprogress p k srcfile a
|
||||||
| displayProgress cfg =
|
| displayProgress cfg = do
|
||||||
metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) (const a)
|
bwlimit <- bwLimit (gitconfig baser)
|
||||||
|
metered (Just p) (KeySizer k (pure (fmap toRawFilePath srcfile))) bwlimit (const a)
|
||||||
| otherwise = a p
|
| otherwise = a p
|
||||||
|
|
||||||
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
|
withBytes :: ContentSource -> (L.ByteString -> Annex a) -> Annex a
|
||||||
|
|
|
@ -55,8 +55,8 @@ chainGen addr r u rc gc rs = do
|
||||||
{ uuid = u
|
{ uuid = u
|
||||||
, cost = cst
|
, cost = cst
|
||||||
, name = Git.repoDescribe r
|
, name = Git.repoDescribe r
|
||||||
, storeKey = store (const protorunner)
|
, storeKey = store gc (const protorunner)
|
||||||
, retrieveKeyFile = retrieve (const protorunner)
|
, retrieveKeyFile = retrieve gc (const protorunner)
|
||||||
, retrieveKeyFileCheap = Nothing
|
, retrieveKeyFileCheap = Nothing
|
||||||
, retrievalSecurityPolicy = RetrievalAllKeysSecure
|
, retrievalSecurityPolicy = RetrievalAllKeysSecure
|
||||||
, removeKey = remove protorunner
|
, removeKey = remove protorunner
|
||||||
|
|
|
@ -124,6 +124,7 @@ data GitConfig = GitConfig
|
||||||
, annexForwardRetry :: Maybe Integer
|
, annexForwardRetry :: Maybe Integer
|
||||||
, annexRetryDelay :: Maybe Seconds
|
, annexRetryDelay :: Maybe Seconds
|
||||||
, annexStallDetection :: Maybe StallDetection
|
, annexStallDetection :: Maybe StallDetection
|
||||||
|
, annexBwLimit :: Maybe BwRate
|
||||||
, annexAllowedUrlSchemes :: S.Set Scheme
|
, annexAllowedUrlSchemes :: S.Set Scheme
|
||||||
, annexAllowedIPAddresses :: String
|
, annexAllowedIPAddresses :: String
|
||||||
, annexAllowUnverifiedDownloads :: Bool
|
, annexAllowUnverifiedDownloads :: Bool
|
||||||
|
@ -218,8 +219,11 @@ extractGitConfig configsource r = GitConfig
|
||||||
, annexRetryDelay = Seconds
|
, annexRetryDelay = Seconds
|
||||||
<$> getmayberead (annexConfig "retrydelay")
|
<$> getmayberead (annexConfig "retrydelay")
|
||||||
, annexStallDetection =
|
, annexStallDetection =
|
||||||
either (const Nothing) id . parseStallDetection
|
either (const Nothing) Just . parseStallDetection
|
||||||
=<< getmaybe (annexConfig "stalldetection")
|
=<< getmaybe (annexConfig "stalldetection")
|
||||||
|
, annexBwLimit =
|
||||||
|
either (const Nothing) Just . parseBwRate
|
||||||
|
=<< getmaybe (annexConfig "bwlimit")
|
||||||
, annexAllowedUrlSchemes = S.fromList $ map mkScheme $
|
, annexAllowedUrlSchemes = S.fromList $ map mkScheme $
|
||||||
maybe ["http", "https", "ftp"] words $
|
maybe ["http", "https", "ftp"] words $
|
||||||
getmaybe (annexConfig "security.allowed-url-schemes")
|
getmaybe (annexConfig "security.allowed-url-schemes")
|
||||||
|
@ -343,6 +347,7 @@ data RemoteGitConfig = RemoteGitConfig
|
||||||
, remoteAnnexForwardRetry :: Maybe Integer
|
, remoteAnnexForwardRetry :: Maybe Integer
|
||||||
, remoteAnnexRetryDelay :: Maybe Seconds
|
, remoteAnnexRetryDelay :: Maybe Seconds
|
||||||
, remoteAnnexStallDetection :: Maybe StallDetection
|
, remoteAnnexStallDetection :: Maybe StallDetection
|
||||||
|
, remoteAnnexBwLimit :: Maybe BwRate
|
||||||
, remoteAnnexAllowUnverifiedDownloads :: Bool
|
, remoteAnnexAllowUnverifiedDownloads :: Bool
|
||||||
, remoteAnnexConfigUUID :: Maybe UUID
|
, remoteAnnexConfigUUID :: Maybe UUID
|
||||||
|
|
||||||
|
@ -408,8 +413,11 @@ extractRemoteGitConfig r remotename = do
|
||||||
, remoteAnnexRetryDelay = Seconds
|
, remoteAnnexRetryDelay = Seconds
|
||||||
<$> getmayberead "retrydelay"
|
<$> getmayberead "retrydelay"
|
||||||
, remoteAnnexStallDetection =
|
, remoteAnnexStallDetection =
|
||||||
either (const Nothing) id . parseStallDetection
|
either (const Nothing) Just . parseStallDetection
|
||||||
=<< getmaybe "stalldetection"
|
=<< getmaybe "stalldetection"
|
||||||
|
, remoteAnnexBwLimit =
|
||||||
|
either (const Nothing) Just . parseBwRate
|
||||||
|
=<< getmaybe "bwlimit"
|
||||||
, remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $
|
, remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $
|
||||||
getmaybe ("security-allow-unverified-downloads")
|
getmaybe ("security-allow-unverified-downloads")
|
||||||
, remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid"
|
, remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid"
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
{- types for stall detection
|
{- types for stall detection and banwdith rates
|
||||||
-
|
-
|
||||||
- Copyright 2020-2021 Joey Hess <id@joeyh.name>
|
- Copyright 2020-2021 Joey Hess <id@joeyh.name>
|
||||||
-
|
-
|
||||||
|
@ -13,7 +13,7 @@ import Utility.Misc
|
||||||
import Git.Config
|
import Git.Config
|
||||||
|
|
||||||
data StallDetection
|
data StallDetection
|
||||||
= StallDetection ByteSize Duration
|
= StallDetection BwRate
|
||||||
-- ^ Unless the given number of bytes have been sent over the given
|
-- ^ Unless the given number of bytes have been sent over the given
|
||||||
-- amount of time, there's a stall.
|
-- amount of time, there's a stall.
|
||||||
| ProbeStallDetection
|
| ProbeStallDetection
|
||||||
|
@ -22,21 +22,29 @@ data StallDetection
|
||||||
| StallDetectionDisabled
|
| StallDetectionDisabled
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
|
data BwRate = BwRate ByteSize Duration
|
||||||
|
deriving (Show)
|
||||||
|
|
||||||
-- Parse eg, "0KiB/60s"
|
-- Parse eg, "0KiB/60s"
|
||||||
--
|
--
|
||||||
-- Also, it can be set to "true" (or other git config equivilants)
|
-- Also, it can be set to "true" (or other git config equivilants)
|
||||||
-- to enable ProbeStallDetection.
|
-- to enable ProbeStallDetection.
|
||||||
-- And "false" (and other git config equivilants) explicitly
|
-- And "false" (and other git config equivilants) explicitly
|
||||||
-- disable stall detection.
|
-- disable stall detection.
|
||||||
parseStallDetection :: String -> Either String (Maybe StallDetection)
|
parseStallDetection :: String -> Either String StallDetection
|
||||||
parseStallDetection s = case isTrueFalse s of
|
parseStallDetection s = case isTrueFalse s of
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
let (bs, ds) = separate (== '/') s
|
v <- parseBwRate s
|
||||||
b <- maybe
|
Right (StallDetection v)
|
||||||
(Left $ "Unable to parse stall detection amount " ++ bs)
|
Just True -> Right ProbeStallDetection
|
||||||
Right
|
Just False -> Right StallDetectionDisabled
|
||||||
(readSize dataUnits bs)
|
|
||||||
d <- parseDuration ds
|
parseBwRate :: String -> Either String BwRate
|
||||||
return (Just (StallDetection b d))
|
parseBwRate s = do
|
||||||
Just True -> Right (Just ProbeStallDetection)
|
let (bs, ds) = separate (== '/') s
|
||||||
Just False -> Right (Just StallDetectionDisabled)
|
b <- maybe
|
||||||
|
(Left $ "Unable to parse bandwidth amount " ++ bs)
|
||||||
|
Right
|
||||||
|
(readSize dataUnits bs)
|
||||||
|
d <- parseDuration ds
|
||||||
|
Right (BwRate b d)
|
||||||
|
|
|
@ -37,6 +37,7 @@ module Utility.Metered (
|
||||||
demeterCommandEnv,
|
demeterCommandEnv,
|
||||||
avoidProgress,
|
avoidProgress,
|
||||||
rateLimitMeterUpdate,
|
rateLimitMeterUpdate,
|
||||||
|
bwLimitMeterUpdate,
|
||||||
Meter,
|
Meter,
|
||||||
mkMeter,
|
mkMeter,
|
||||||
setMeterTotalSize,
|
setMeterTotalSize,
|
||||||
|
@ -51,6 +52,7 @@ import Utility.Percentage
|
||||||
import Utility.DataUnits
|
import Utility.DataUnits
|
||||||
import Utility.HumanTime
|
import Utility.HumanTime
|
||||||
import Utility.SimpleProtocol as Proto
|
import Utility.SimpleProtocol as Proto
|
||||||
|
import Utility.ThreadScheduler
|
||||||
|
|
||||||
import qualified Data.ByteString.Lazy as L
|
import qualified Data.ByteString.Lazy as L
|
||||||
import qualified Data.ByteString as S
|
import qualified Data.ByteString as S
|
||||||
|
@ -380,6 +382,47 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
|
||||||
meterupdate n
|
meterupdate n
|
||||||
else putMVar lastupdate prev
|
else putMVar lastupdate prev
|
||||||
|
|
||||||
|
-- | Bandwidth limiting by inserting a delay at the point that a meter is
|
||||||
|
-- updated.
|
||||||
|
--
|
||||||
|
-- This will only work when the actions that use bandwidth are run in the
|
||||||
|
-- same process and thread as the call to the MeterUpdate.
|
||||||
|
--
|
||||||
|
-- For example, if the desired bandwidth is 100kb/s, and over the past
|
||||||
|
-- second, 200kb was sent, then pausing for half a second, and then
|
||||||
|
-- running for half a second should result in the desired bandwidth.
|
||||||
|
-- But, if after that pause, only 75kb is sent over the next half a
|
||||||
|
-- second, then the next pause should be 2/3rds of a second.
|
||||||
|
bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate
|
||||||
|
bwLimitMeterUpdate sz duration meterupdate = do
|
||||||
|
nowtime <- getPOSIXTime
|
||||||
|
lastpause <- newMVar (nowtime, toEnum 0 :: POSIXTime, 0)
|
||||||
|
return $ mu lastpause
|
||||||
|
where
|
||||||
|
mu lastpause n@(BytesProcessed i) = do
|
||||||
|
nowtime <- getPOSIXTime
|
||||||
|
meterupdate n
|
||||||
|
lastv@(prevtime, prevpauselength, previ) <- takeMVar lastpause
|
||||||
|
let timedelta = nowtime - prevtime
|
||||||
|
if timedelta >= durationsecs
|
||||||
|
then do
|
||||||
|
let sz' = i - previ
|
||||||
|
let runtime = timedelta - prevpauselength
|
||||||
|
let pauselength = calcpauselength sz' runtime
|
||||||
|
if pauselength > 0
|
||||||
|
then do
|
||||||
|
unboundDelay (floor (pauselength * fromIntegral oneSecond))
|
||||||
|
putMVar lastpause (nowtime, pauselength, i)
|
||||||
|
else putMVar lastpause lastv
|
||||||
|
else putMVar lastpause lastv
|
||||||
|
|
||||||
|
calcpauselength sz' runtime
|
||||||
|
| sz' > sz && sz' > 0 && runtime > 0 =
|
||||||
|
durationsecs - (fromIntegral sz / fromIntegral sz') * runtime
|
||||||
|
| otherwise = 0
|
||||||
|
|
||||||
|
durationsecs = fromIntegral (durationSeconds duration)
|
||||||
|
|
||||||
data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter
|
data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter
|
||||||
|
|
||||||
data MeterState = MeterState
|
data MeterState = MeterState
|
||||||
|
|
|
@ -1384,6 +1384,27 @@ Remotes are configured using these settings in `.git/config`.
|
||||||
When making multiple retries of the same transfer, the delay
|
When making multiple retries of the same transfer, the delay
|
||||||
doubles after each retry. (default 1)
|
doubles after each retry. (default 1)
|
||||||
|
|
||||||
|
* `remote.<name>.annex-bwlimit`, `annex.bwlimit`
|
||||||
|
|
||||||
|
This can be used to limit how much bandwidth is used for a transfer
|
||||||
|
from or to a remote.
|
||||||
|
|
||||||
|
For example, to limit transfers to 1 gigabyte per second:
|
||||||
|
`git config annex.bwlimit "1GB/1s"`
|
||||||
|
|
||||||
|
This will work with many remotes, including git remotes, but not
|
||||||
|
for remotes where the transfer is run by a separate program than
|
||||||
|
git-annex.
|
||||||
|
|
||||||
|
The bandwidth limiting is implemented by pausing when
|
||||||
|
the transfer is running too fast, so it may use more bandwidth
|
||||||
|
than configured before being slowed down, either at the beginning
|
||||||
|
or if the available bandwidth changes while it is running.
|
||||||
|
|
||||||
|
It is different to use "1GB/1s" than "10GB/10s". git-annex will
|
||||||
|
track how much data was transferred over the time period, and then
|
||||||
|
pausing. So usually 1s is the best time period to use.
|
||||||
|
|
||||||
* `remote.<name>.annex-stalldetecton`, `annex.stalldetection`
|
* `remote.<name>.annex-stalldetecton`, `annex.stalldetection`
|
||||||
|
|
||||||
Configuring this lets stalled or too-slow transfers be detected, and
|
Configuring this lets stalled or too-slow transfers be detected, and
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue