dd39e9e255
When annex.stalldetection is not enabled, and a likely stall is detected, display a suggestion to enable it. Note that the progress meter display is not taken down when displaying the message, so it will display like this: 0% 8 B 0 B/s Transfer seems to have stalled. To handle stalling transfers, configure annex.stalldetection 0% 10 B 0 B/s Although of course if it's really stalled, it will never update again after the message. Taking down the progress meter and starting a new one doesn't seem too necessary given how unusual this is, also this does help show the state it was at when it stalled. Use of uninterruptibleCancel here is ok, the thread it's canceling only does STM transactions and sleeps. The annex thread that gets forked off is separate to avoid it being canceled, so that it can be joined back at the end. A module cycle required moving from dupState the precaching of the remote list. Doing it at startConcurrency should cover all the cases where the remote list is used in concurrent actions. This commit was sponsored by Kevin Mueller on Patreon.
97 lines
2.9 KiB
Haskell
97 lines
2.9 KiB
Haskell
{- git-annex concurrent state
|
|
-
|
|
- Copyright 2015-2020 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
module Annex.Concurrent (
|
|
module Annex.Concurrent,
|
|
module Annex.Concurrent.Utility
|
|
) where
|
|
|
|
import Annex
|
|
import Annex.Common
|
|
import Annex.Concurrent.Utility
|
|
import qualified Annex.Queue
|
|
import Annex.Action
|
|
import Types.Concurrency
|
|
import Types.CatFileHandles
|
|
import Annex.CheckAttr
|
|
import Annex.CheckIgnore
|
|
|
|
import qualified Data.Map as M
|
|
|
|
setConcurrency :: ConcurrencySetting -> Annex ()
|
|
setConcurrency (ConcurrencyCmdLine s) = setConcurrency' s ConcurrencyCmdLine
|
|
setConcurrency (ConcurrencyGitConfig s) = setConcurrency' s ConcurrencyGitConfig
|
|
|
|
setConcurrency' :: Concurrency -> (Concurrency -> ConcurrencySetting) -> Annex ()
|
|
setConcurrency' NonConcurrent f =
|
|
Annex.changeState $ \s -> s
|
|
{ Annex.concurrency = f NonConcurrent
|
|
}
|
|
setConcurrency' c f = do
|
|
cfh <- getState Annex.catfilehandles
|
|
cfh' <- case cfh of
|
|
CatFileHandlesNonConcurrent _ -> liftIO catFileHandlesPool
|
|
CatFileHandlesPool _ -> pure cfh
|
|
cah <- mkConcurrentCheckAttrHandle c
|
|
cih <- mkConcurrentCheckIgnoreHandle c
|
|
Annex.changeState $ \s -> s
|
|
{ Annex.concurrency = f c
|
|
, Annex.catfilehandles = cfh'
|
|
, Annex.checkattrhandle = Just cah
|
|
, Annex.checkignorehandle = Just cih
|
|
}
|
|
|
|
{- Allows forking off a thread that uses a copy of the current AnnexState
|
|
- to run an Annex action.
|
|
-
|
|
- The returned IO action can be used to start the thread.
|
|
- It returns an Annex action that must be run in the original
|
|
- calling context to merge the forked AnnexState back into the
|
|
- current AnnexState.
|
|
-}
|
|
forkState :: Annex a -> Annex (IO (Annex a))
|
|
forkState a = do
|
|
st <- dupState
|
|
return $ do
|
|
(ret, newst) <- run st a
|
|
return $ do
|
|
mergeState newst
|
|
return ret
|
|
|
|
{- Returns a copy of the current AnnexState that is safe to be
|
|
- used when forking off a thread.
|
|
-
|
|
- After an Annex action is run using this AnnexState, it
|
|
- should be merged back into the current Annex's state,
|
|
- by calling mergeState.
|
|
-}
|
|
dupState :: Annex AnnexState
|
|
dupState = do
|
|
st <- Annex.getState id
|
|
-- Make sure that concurrency is enabled, if it was not already,
|
|
-- so the concurrency-safe resource pools are set up.
|
|
st' <- case getConcurrency' (Annex.concurrency st) of
|
|
NonConcurrent -> do
|
|
setConcurrency (ConcurrencyCmdLine (Concurrent 1))
|
|
Annex.getState id
|
|
_ -> return st
|
|
return $ st'
|
|
-- each thread has its own repoqueue
|
|
{ Annex.repoqueue = Nothing
|
|
-- no errors from this thread yet
|
|
, Annex.errcounter = 0
|
|
}
|
|
|
|
{- Merges the passed AnnexState into the current Annex state.
|
|
- Also closes various handles in it. -}
|
|
mergeState :: AnnexState -> Annex ()
|
|
mergeState st = do
|
|
st' <- liftIO $ snd <$> run st stopNonConcurrentSafeCoProcesses
|
|
forM_ (M.toList $ Annex.cleanupactions st') $
|
|
uncurry addCleanupAction
|
|
Annex.Queue.mergeFrom st'
|
|
changeState $ \s -> s { errcounter = errcounter s + errcounter st' }
|