automatically adjust stall detection period

Improve annex.stalldetection to handle remotes that update progress less
frequently than the configured time period.

In particular, this makes remotes that don't report progress but are
chunked work when transferring a single chunk takes longer than the
specified time period.

Any remotes that just have very low update granulatity would also be
handled by this.

The change to Remote.Helper.Chunked avoids an extra progress update when
resuming an interrupted upload. In that case, the code saw first Nothing
and then Just the already transferred number of bytes, which defeated this
new heuristic. This change will mean that, when resuming an interrupted
upload to a chunked remote that does not do its own progress reporting, the
progress display does not start out displaying the amount sent so far,
until after the first chunk is sent. This behavior change does not seem
like a major problem.

About the scalefudgefactor, it seems reasonable to expect subsequent chunks
to take no more than 1.5 times as long as the first chunk to transfer.
Could set it to 1, but then any chunk taking a little longer would be
treated as a stall. 2 also seems a likely value. Even 10 might be fine?

Sponsored-by: Dartmouth College's DANDI project
This commit is contained in:
Joey Hess 2024-01-18 17:11:56 -04:00
parent 8f655f7953
commit c2634e7df2
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
7 changed files with 153 additions and 19 deletions

View file

@ -1,6 +1,6 @@
{- Stall detection for transfers.
-
- Copyright 2020-2021 Joey Hess <id@joeyh.name>
- Copyright 2020-2024 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
@ -16,41 +16,74 @@ import Utility.ThreadScheduler
import Control.Concurrent.STM
import Control.Monad.IO.Class (MonadIO)
import Data.Time.Clock
{- This may be safely canceled (with eg uninterruptibleCancel),
- as long as the passed action can be safely canceled. -}
detectStalls :: (Monad m, MonadIO m) => Maybe StallDetection -> TVar (Maybe BytesProcessed) -> m () -> m ()
detectStalls Nothing _ _ = noop
detectStalls (Just StallDetectionDisabled) _ _ = noop
detectStalls (Just (StallDetection (BwRate minsz duration))) metervar onstall =
detectStalls' minsz duration metervar onstall Nothing
detectStalls (Just (StallDetection (BwRate minsz duration))) metervar onstall = do
-- If the progress is being updated, but less frequently than
-- the specified duration, a stall would be incorrectly detected.
--
-- For example, consider the case of a remote that does
-- not support progress updates, but is chunked with a large chunk
-- size. In that case, progress is only updated after each chunk.
--
-- So, wait for the first update, and see how long it takes.
-- It's longer than the duration, upscale the duration and minsz
-- accordingly.
starttime <- liftIO getCurrentTime
v <- waitforfirstupdate =<< readMeterVar metervar
endtime <- liftIO getCurrentTime
let timepassed = floor (endtime `diffUTCTime` starttime)
let (scaledminsz, scaledduration) = upscale timepassed
detectStalls' scaledminsz scaledduration metervar onstall v
where
upscale timepassed
| timepassed > dsecs =
let scale = scaleamount timepassed
in (minsz * scale, Duration (dsecs * scale))
| otherwise = (minsz, duration)
scaleamount timepassed = max 1 $ floor $
(fromIntegral timepassed / fromIntegral (max dsecs 1))
* scalefudgefactor
scalefudgefactor = 1.5 :: Double
dsecs = durationSeconds duration
minwaitsecs = Seconds $
min 60 (fromIntegral (durationSeconds duration))
waitforfirstupdate startval = do
liftIO $ threadDelaySeconds minwaitsecs
v <- readMeterVar metervar
if v > startval
then return v
else waitforfirstupdate startval
detectStalls (Just ProbeStallDetection) metervar onstall = do
-- Only do stall detection once the progress is confirmed to be
-- consistently updating. After the first update, it needs to
-- advance twice within 30 seconds. With that established,
-- if no data at all is sent for a 60 second period, it's
-- assumed to be a stall.
v <- getval >>= waitforfirstupdate
v <- readMeterVar metervar >>= waitforfirstupdate
ontimelyadvance v $ \v' -> ontimelyadvance v' $
detectStalls' 1 duration metervar onstall
where
getval = liftIO $ atomically $ fmap fromBytesProcessed
<$> readTVar metervar
duration = Duration 60
delay = Seconds (fromIntegral (durationSeconds duration) `div` 2)
waitforfirstupdate startval = do
liftIO $ threadDelaySeconds delay
v <- getval
v <- readMeterVar metervar
if v > startval
then return v
else waitforfirstupdate startval
ontimelyadvance v cont = do
liftIO $ threadDelaySeconds delay
v' <- getval
v' <- readMeterVar metervar
when (v' > v) $
cont v'
@ -65,8 +98,7 @@ detectStalls'
detectStalls' minsz duration metervar onstall st = do
liftIO $ threadDelaySeconds delay
-- Get whatever progress value was reported most recently, if any.
v <- liftIO $ atomically $ fmap fromBytesProcessed
<$> readTVar metervar
v <- readMeterVar metervar
let cont = detectStalls' minsz duration metervar onstall v
case (st, v) of
(Nothing, _) -> cont
@ -81,3 +113,10 @@ detectStalls' minsz duration metervar onstall st = do
| otherwise -> cont
where
delay = Seconds (fromIntegral (durationSeconds duration))
readMeterVar
:: MonadIO m
=> TVar (Maybe BytesProcessed)
-> m (Maybe ByteSize)
readMeterVar metervar = liftIO $ atomically $
fmap fromBytesProcessed <$> readTVar metervar