get -J: Download different files from different remotes when the remotes have the same costs.
Only done in -J mode because only if there's concurrency can downloading from two remotes be faster. Without concurrency, it's likely the case that sequential downloads from the same remote are faster than switching back and forth between two remotes. There is some hairy MVar code here, but basically it just keeps the activeremotes MVar full except when deciding which remote to assign to a thread. Also affects gets by sync --content -J This commit was sponsored by Jochen Bartl.
This commit is contained in:
parent
eb469bd139
commit
31289da691
6 changed files with 135 additions and 59 deletions
100
Annex.hs
100
Annex.hs
|
@ -77,7 +77,7 @@ import qualified Data.Set as S
|
|||
- The MVar is not exposed outside this module.
|
||||
-
|
||||
- Note that when an Annex action fails and the exception is caught,
|
||||
- ny changes the action has made to the AnnexState are retained,
|
||||
- any changes the action has made to the AnnexState are retained,
|
||||
- due to the use of the MVar to store the state.
|
||||
-}
|
||||
newtype Annex a = Annex { runAnnex :: ReaderT (MVar AnnexState) IO a }
|
||||
|
@ -135,56 +135,60 @@ data AnnexState = AnnexState
|
|||
, desktopnotify :: DesktopNotify
|
||||
, workers :: [Either AnnexState (Async AnnexState)]
|
||||
, concurrentjobs :: Maybe Int
|
||||
, activeremotes :: MVar (S.Set (Types.Remote.RemoteA Annex))
|
||||
, keysdbhandle :: Maybe Keys.DbHandle
|
||||
, cachedcurrentbranch :: Maybe Git.Branch
|
||||
}
|
||||
|
||||
newState :: GitConfig -> Git.Repo -> AnnexState
|
||||
newState c r = AnnexState
|
||||
{ repo = r
|
||||
, repoadjustment = return
|
||||
, gitconfig = c
|
||||
, backends = []
|
||||
, remotes = []
|
||||
, remoteannexstate = M.empty
|
||||
, output = def
|
||||
, force = False
|
||||
, fast = False
|
||||
, daemon = False
|
||||
, branchstate = startBranchState
|
||||
, repoqueue = Nothing
|
||||
, catfilehandles = M.empty
|
||||
, hashobjecthandle = Nothing
|
||||
, checkattrhandle = Nothing
|
||||
, checkignorehandle = Nothing
|
||||
, forcebackend = Nothing
|
||||
, globalnumcopies = Nothing
|
||||
, forcenumcopies = Nothing
|
||||
, limit = BuildingMatcher []
|
||||
, uuidmap = Nothing
|
||||
, preferredcontentmap = Nothing
|
||||
, requiredcontentmap = Nothing
|
||||
, forcetrust = M.empty
|
||||
, trustmap = Nothing
|
||||
, groupmap = Nothing
|
||||
, ciphers = M.empty
|
||||
, lockcache = M.empty
|
||||
, flags = M.empty
|
||||
, fields = M.empty
|
||||
, cleanup = M.empty
|
||||
, sentinalstatus = Nothing
|
||||
, useragent = Nothing
|
||||
, errcounter = 0
|
||||
, unusedkeys = Nothing
|
||||
, tempurls = M.empty
|
||||
, quviversion = Nothing
|
||||
, existinghooks = M.empty
|
||||
, desktopnotify = mempty
|
||||
, workers = []
|
||||
, concurrentjobs = Nothing
|
||||
, keysdbhandle = Nothing
|
||||
, cachedcurrentbranch = Nothing
|
||||
}
|
||||
newState :: GitConfig -> Git.Repo -> IO AnnexState
|
||||
newState c r = do
|
||||
emptyactiveremotes <- newMVar S.empty
|
||||
return $ AnnexState
|
||||
{ repo = r
|
||||
, repoadjustment = return
|
||||
, gitconfig = c
|
||||
, backends = []
|
||||
, remotes = []
|
||||
, remoteannexstate = M.empty
|
||||
, output = def
|
||||
, force = False
|
||||
, fast = False
|
||||
, daemon = False
|
||||
, branchstate = startBranchState
|
||||
, repoqueue = Nothing
|
||||
, catfilehandles = M.empty
|
||||
, hashobjecthandle = Nothing
|
||||
, checkattrhandle = Nothing
|
||||
, checkignorehandle = Nothing
|
||||
, forcebackend = Nothing
|
||||
, globalnumcopies = Nothing
|
||||
, forcenumcopies = Nothing
|
||||
, limit = BuildingMatcher []
|
||||
, uuidmap = Nothing
|
||||
, preferredcontentmap = Nothing
|
||||
, requiredcontentmap = Nothing
|
||||
, forcetrust = M.empty
|
||||
, trustmap = Nothing
|
||||
, groupmap = Nothing
|
||||
, ciphers = M.empty
|
||||
, lockcache = M.empty
|
||||
, flags = M.empty
|
||||
, fields = M.empty
|
||||
, cleanup = M.empty
|
||||
, sentinalstatus = Nothing
|
||||
, useragent = Nothing
|
||||
, errcounter = 0
|
||||
, unusedkeys = Nothing
|
||||
, tempurls = M.empty
|
||||
, quviversion = Nothing
|
||||
, existinghooks = M.empty
|
||||
, desktopnotify = mempty
|
||||
, workers = []
|
||||
, concurrentjobs = Nothing
|
||||
, activeremotes = emptyactiveremotes
|
||||
, keysdbhandle = Nothing
|
||||
, cachedcurrentbranch = Nothing
|
||||
}
|
||||
|
||||
{- Makes an Annex state object for the specified git repo.
|
||||
- Ensures the config is read, if it was not already, and performs
|
||||
|
@ -193,7 +197,7 @@ new :: Git.Repo -> IO AnnexState
|
|||
new r = do
|
||||
r' <- Git.Config.read =<< Git.relPath r
|
||||
let c = extractGitConfig r'
|
||||
newState c <$> fixupRepo r' c
|
||||
newState c =<< fixupRepo r' c
|
||||
|
||||
{- Performs an action in the Annex monad from a starting state,
|
||||
- returning a new state. -}
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
{- git-annex transfers
|
||||
-
|
||||
- Copyright 2012-2014 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2012-2016 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE CPP, FlexibleInstances #-}
|
||||
{-# LANGUAGE CPP, FlexibleInstances, BangPatterns #-}
|
||||
|
||||
module Annex.Transfer (
|
||||
module X,
|
||||
|
@ -15,9 +15,11 @@ module Annex.Transfer (
|
|||
alwaysRunTransfer,
|
||||
noRetry,
|
||||
forwardRetry,
|
||||
pickRemote,
|
||||
) where
|
||||
|
||||
import Annex.Common
|
||||
import qualified Annex
|
||||
import Logs.Transfer as X
|
||||
import Types.Transfer as X
|
||||
import Annex.Notification as X
|
||||
|
@ -25,8 +27,10 @@ import Annex.Perms
|
|||
import Utility.Metered
|
||||
import Annex.LockPool
|
||||
import Types.Remote (Verification(..))
|
||||
import qualified Types.Remote as Remote
|
||||
|
||||
import Control.Concurrent
|
||||
import qualified Data.Set as S
|
||||
|
||||
class Observable a where
|
||||
observeBool :: a -> Bool
|
||||
|
@ -166,3 +170,56 @@ noRetry _ _ = False
|
|||
- to send some data. -}
|
||||
forwardRetry :: RetryDecider
|
||||
forwardRetry old new = bytesComplete old < bytesComplete new
|
||||
|
||||
{- Picks a remote from the list and tries a transfer to it. If the transfer
|
||||
- does not succeed, goes on to try other remotes from the list.
|
||||
-
|
||||
- The list should already be ordered by remote cost, and is normally
|
||||
- tried in order. However, when concurrent jobs are running, they will
|
||||
- be assigned different remotes of the same cost when possible. This can
|
||||
- increase total transfer speed.
|
||||
-}
|
||||
pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v
|
||||
pickRemote l a = go l =<< Annex.getState Annex.concurrentjobs
|
||||
where
|
||||
go [] _ = return observeFailure
|
||||
go (r:[]) _ = a r
|
||||
go rs (Just n) | n > 1 = do
|
||||
mv <- Annex.getState Annex.activeremotes
|
||||
active <- liftIO $ takeMVar mv
|
||||
let rs' = sortBy (inactiveFirst active) rs
|
||||
goconcurrent mv active rs'
|
||||
go (r:rs) _ = do
|
||||
ok <- a r
|
||||
if observeBool ok
|
||||
then return ok
|
||||
else go rs Nothing
|
||||
goconcurrent mv active [] = do
|
||||
liftIO $ putMVar mv active
|
||||
return observeFailure
|
||||
goconcurrent mv active (r:rs) = do
|
||||
let !active' = S.insert r active
|
||||
liftIO $ putMVar mv active'
|
||||
let getnewactive = do
|
||||
active'' <- liftIO $ takeMVar mv
|
||||
let !active''' = S.delete r active''
|
||||
return active'''
|
||||
let removeactive = liftIO . putMVar mv =<< getnewactive
|
||||
ok <- a r `onException` removeactive
|
||||
if observeBool ok
|
||||
then do
|
||||
removeactive
|
||||
return ok
|
||||
else do
|
||||
active'' <- getnewactive
|
||||
-- Re-sort the remaining rs
|
||||
-- because other threads could have
|
||||
-- been assigned them in the meantime.
|
||||
let rs' = sortBy (inactiveFirst active'') rs
|
||||
goconcurrent mv active'' rs'
|
||||
|
||||
inactiveFirst :: S.Set Remote -> Remote -> Remote -> Ordering
|
||||
inactiveFirst active a b
|
||||
| Remote.cost a == Remote.cost b =
|
||||
if a `S.member` active then GT else LT
|
||||
| otherwise = compare a b
|
||||
|
|
|
@ -11,6 +11,8 @@ git-annex (6.20160809) UNRELEASED; urgency=medium
|
|||
Fixes a reversion introduced in version 5.20150727.
|
||||
* Assistant, repair: Filter out git fsck lines about duplicate file
|
||||
entries in tree objects.
|
||||
* get -J, sync --content -J: Download different files from different
|
||||
remotes when the remotes have the same costs.
|
||||
|
||||
-- Joey Hess <id@joeyh.name> Mon, 05 Sep 2016 11:51:49 -0400
|
||||
|
||||
|
|
|
@ -89,16 +89,17 @@ getKey' key afile = dispatch
|
|||
showNote "not available"
|
||||
showlocs
|
||||
return False
|
||||
dispatch remotes = notifyTransfer Download afile $ trycopy remotes remotes
|
||||
trycopy full [] _ = do
|
||||
Remote.showTriedRemotes full
|
||||
showlocs
|
||||
return False
|
||||
trycopy full (r:rs) witness =
|
||||
ifM (probablyPresent r)
|
||||
( docopy r witness <||> trycopy full rs witness
|
||||
, trycopy full rs witness
|
||||
dispatch remotes = notifyTransfer Download afile $ \witness -> do
|
||||
ok <- pickRemote remotes $ \r -> ifM (probablyPresent r)
|
||||
( docopy r witness
|
||||
, return False
|
||||
)
|
||||
if ok
|
||||
then return ok
|
||||
else do
|
||||
Remote.showTriedRemotes remotes
|
||||
showlocs
|
||||
return False
|
||||
showlocs = Remote.showLocations False key []
|
||||
"No other repository is known to contain the file."
|
||||
-- This check is to avoid an ugly message if a remote is a
|
||||
|
|
|
@ -20,3 +20,12 @@ it might not really be desirable when not run in parallel. In particular,
|
|||
if A and B are on different spinning disks, then an access pattern of
|
||||
A,B,A,B might keep the disks idle enough that they spin down in-between
|
||||
access.
|
||||
|
||||
> done for `git annex get -JN` where two remotes have the same cost.
|
||||
>
|
||||
> Also for `git annex sync --content -JN` when downloading and two remotes
|
||||
> have the same cost.
|
||||
>
|
||||
> Can't think of any other places that apply, except perhaps the assistant,
|
||||
> but it does its own much different queuing of transfers. [[done]]
|
||||
> --[[Joey]]
|
||||
|
|
|
@ -5,3 +5,6 @@ This of course assumes that we like the idea of "parallel" launching and running
|
|||
This wish item is probably only useful for the paranoid people who store more than 1 copy of their data.
|
||||
|
||||
See [[get_round_robin]] --[[Joey]]
|
||||
|
||||
> [[done]], at least for `git annex get -JN` when two remotes have the same
|
||||
> cost..
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue