use fine-grained WorkerStages when transferring and verifying
This means that Command.Move and Command.Get don't need to manually set the stage, which is a lot cleaner conceptually. Also, this makes Command.Sync.syncFile use the worker pool better. In the scenario where it first downloads content and then uploads it to some other remotes, it will start in TransferStage, then enter VerifyStage, and then go back to TransferStage for each transfer to the remotes. Before, it entered CleanupStage after the download and stayed in it for the upload, so too many transfer jobs could run at the same time. Note that Remote.Git uses runTransfer and also verifyKeyContent inside onLocal. That runs with an Annex state for the remote, which has no worker pool, so the resulting calls to enteringStage won't block in there. While Remote.Git.copyToRemote does do checksum verification, I realized that it should not use a verification slot in the WorkerPool to do it, because it's reading back from, e.g., a removable disk to checksum, and that will contend with other writes to that disk. It's best to treat that checksum verification as just part of the transfer. So, removed the todo item about that, as there's nothing needing to be done.
This commit is contained in:
parent
53882ab4a7
commit
9d36c826c0
9 changed files with 13 additions and 34 deletions
|
@ -91,6 +91,7 @@ import Utility.InodeCache
|
|||
import Annex.Content.LowLevel
|
||||
import Annex.Content.PointerFile
|
||||
import Annex.Concurrent
|
||||
import Types.WorkerPool
|
||||
|
||||
{- Checks if a given key's content is currently present. -}
|
||||
inAnnex :: Key -> Annex Bool
|
||||
|
@ -388,7 +389,7 @@ verifyKeyContent rsp v verification k f = case (rsp, verification) of
|
|||
)
|
||||
(_, MustVerify) -> verify
|
||||
where
|
||||
verify = verifysize <&&> verifycontent
|
||||
verify = enteringStage VerifyStage $ verifysize <&&> verifycontent
|
||||
verifysize = case keySize k of
|
||||
Nothing -> return True
|
||||
Just size -> do
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{- git-annex transfers
|
||||
-
|
||||
- Copyright 2012-2018 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2012-2019 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
@ -30,7 +30,9 @@ import Utility.ThreadScheduler
|
|||
import Annex.LockPool
|
||||
import Types.Key
|
||||
import qualified Types.Remote as Remote
|
||||
import Annex.Concurrent
|
||||
import Types.Concurrency
|
||||
import Types.WorkerPool
|
||||
|
||||
import Control.Concurrent
|
||||
import qualified Data.Map.Strict as M
|
||||
|
@ -77,7 +79,7 @@ alwaysRunTransfer :: Observable v => Transfer -> AssociatedFile -> RetryDecider
|
|||
alwaysRunTransfer = runTransfer' True
|
||||
|
||||
runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v
|
||||
runTransfer' ignorelock t afile retrydecider transferaction = debugLocks $ checkSecureHashes t $ do
|
||||
runTransfer' ignorelock t afile retrydecider transferaction = enteringStage TransferStage $ debugLocks $ checkSecureHashes t $ do
|
||||
shouldretry <- retrydecider
|
||||
info <- liftIO $ startTransferInfo afile
|
||||
(meter, tfile, createtfile, metervar) <- mkProgressUpdater t info
|
||||
|
|
18
Command.hs
18
Command.hs
|
@ -105,24 +105,6 @@ stop = return Nothing
|
|||
stopUnless :: Annex Bool -> Annex (Maybe a) -> Annex (Maybe a)
|
||||
stopUnless c a = ifM c ( a , stop )
|
||||
|
||||
{- This can be used in the perform stage to run the action that is the bulk
|
||||
- of the work to do in that stage. If the action succeeds, then any actions
|
||||
- run after it will be scheduled as if they were run in the cleanup stage
|
||||
- instead of the perform stage.
|
||||
-
|
||||
- This is not needed for a perform stage that uses `next` to run the
|
||||
- cleanup stage action. But sometimes a larger action is being built up
|
||||
- and it's not practical to separate out the cleanup stage part from the
|
||||
- rest of the action.
|
||||
-}
|
||||
performJob :: Observable a => Annex a -> Annex a
|
||||
performJob a = do
|
||||
r <- a
|
||||
if observeBool r
|
||||
then changeStageTo CleanupStage
|
||||
else noop
|
||||
return r
|
||||
|
||||
{- When acting on a failed transfer, stops unless it was in the specified
|
||||
- direction. -}
|
||||
checkFailedTransferDirection :: ActionItem -> Direction -> Annex (Maybe a) -> Annex (Maybe a)
|
||||
|
|
|
@ -52,7 +52,7 @@ parseDropFromOption = parseRemoteOption <$> strOption
|
|||
)
|
||||
|
||||
seek :: DropOptions -> CommandSeek
|
||||
seek o = startConcurrency commandStages $
|
||||
seek o = startConcurrency transferStages $
|
||||
case batchOption o of
|
||||
Batch fmt -> batchFilesMatching fmt go
|
||||
NoBatch -> withKeyOptions (keyOptions o) (autoMode o)
|
||||
|
|
|
@ -38,7 +38,7 @@ optParser desc = GetOptions
|
|||
<*> parseBatchOption
|
||||
|
||||
seek :: GetOptions -> CommandSeek
|
||||
seek o = startConcurrency commandStages $ do
|
||||
seek o = startConcurrency transferStages $ do
|
||||
from <- maybe (pure Nothing) (Just <$$> getParsed) (getFrom o)
|
||||
let go = whenAnnexed $ start o from
|
||||
case batchOption o of
|
||||
|
@ -108,7 +108,7 @@ getKey' key afile = dispatch
|
|||
| Remote.hasKeyCheap r =
|
||||
either (const False) id <$> Remote.hasKey r key
|
||||
| otherwise = return True
|
||||
docopy r witness = performJob $ getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key $ \dest ->
|
||||
docopy r witness = getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key $ \dest ->
|
||||
download (Remote.uuid r) key afile stdRetry
|
||||
(\p -> do
|
||||
showAction $ "from " ++ Remote.name r
|
||||
|
|
|
@ -41,7 +41,7 @@ instance DeferredParseClass MirrorOptions where
|
|||
<*> pure (keyOptions v)
|
||||
|
||||
seek :: MirrorOptions -> CommandSeek
|
||||
seek o = startConcurrency commandStages $
|
||||
seek o = startConcurrency transferStages $
|
||||
withKeyOptions (keyOptions o) False
|
||||
(commandAction . startKey o (AssociatedFile Nothing))
|
||||
(withFilesInGit (commandAction . (whenAnnexed $ start o)))
|
||||
|
|
|
@ -54,7 +54,7 @@ data RemoveWhen = RemoveSafe | RemoveNever
|
|||
deriving (Show, Eq)
|
||||
|
||||
seek :: MoveOptions -> CommandSeek
|
||||
seek o = startConcurrency commandStages $ do
|
||||
seek o = startConcurrency transferStages $ do
|
||||
let go = whenAnnexed $ start (fromToOptions o) (removeWhen o)
|
||||
case batchOption o of
|
||||
Batch fmt -> batchFilesMatching fmt go
|
||||
|
@ -208,7 +208,7 @@ fromPerform src removewhen key afile = do
|
|||
where
|
||||
go = notifyTransfer Download afile $
|
||||
download (Remote.uuid src) key afile stdRetry $ \p ->
|
||||
performJob $ getViaTmp (Remote.retrievalSecurityPolicy src) (RemoteVerify src) key $ \t ->
|
||||
getViaTmp (Remote.retrievalSecurityPolicy src) (RemoteVerify src) key $ \t ->
|
||||
Remote.retrieveKeyFile src key afile t p
|
||||
dispatch _ _ False = stop -- failed
|
||||
dispatch RemoveNever _ True = next $ return True -- copy complete
|
||||
|
|
|
@ -164,7 +164,7 @@ instance DeferredParseClass SyncOptions where
|
|||
seek :: SyncOptions -> CommandSeek
|
||||
seek o = do
|
||||
prepMerge
|
||||
startConcurrency commandStages (seek' o)
|
||||
startConcurrency transferStages (seek' o)
|
||||
|
||||
seek' :: SyncOptions -> CommandSeek
|
||||
seek' o = do
|
||||
|
|
|
@ -7,9 +7,3 @@ are still some things that could be improved, tracked here:
|
|||
* Maybe extend --jobs/annex.jobs for more control. `--jobs=cpus` is already
|
||||
supported; it might be good to have `--jobs=cpus-1` to leave a spare
|
||||
cpu to avoid contention, or `--jobs=remotes*2` to run 2 jobs per remote.
|
||||
|
||||
* Checksum verification is done in the cleanup stage job pool now for
|
||||
`git-annex get`, and `git-annex move --from` etc. But only for downloads.
|
||||
When an upload involves checksum verification, eg `git annex move --to` a
|
||||
removable drive, that checksum verification is done inside Remote.Git,
|
||||
and still runs in the perform stage job pool.
|
||||
|
|
Loading…
Reference in a new issue