run download checksum verification in separate job pool
get, move, copy, sync: When -J or annex.jobs has enabled concurrency, checksum verification uses a separate job pool from the one used for downloads, to keep bandwidth saturated. This is not yet done for upload checksum verification, but that only affects remotes on local disks.
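The idea in the commit message can be illustrated with a minimal sketch that uses only `base`. It is not git-annex's implementation (which moves a worker between stages of its WorkerPool); it assumes two plain semaphores, and the names `Pools`, `withSlot`, `transferOne` and `transferAll` are hypothetical. The point it shows is that a download slot is freed before verification starts, so another download can begin while checksums are still being computed.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)
import Control.Monad (forM)

-- | Hypothetical pair of pools: one bounds concurrent downloads,
-- the other bounds concurrent checksum verifications.
data Pools = Pools
    { downloadSlots :: QSem
    , verifySlots :: QSem
    }

-- | Allocate n slots of each kind, loosely like -J n.
newPools :: Int -> IO Pools
newPools n = Pools <$> newQSem n <*> newQSem n

-- | Run an action while holding one slot of the given pool.
withSlot :: QSem -> IO a -> IO a
withSlot sem = bracket_ (waitQSem sem) (signalQSem sem)

-- | Download while holding a download slot, then verify while holding a
-- verification slot. The download slot is released before verification
-- begins, so verification never blocks the next download.
transferOne :: Pools -> IO Bool -> IO Bool -> IO Bool
transferOne pools download verify = do
    ok <- withSlot (downloadSlots pools) download
    if ok
        then withSlot (verifySlots pools) verify
        else return False

-- | Run every (download, verify) pair in its own thread and collect the
-- results in input order. Assumes the actions do not throw exceptions.
transferAll :: Pools -> [(IO Bool, IO Bool)] -> IO [Bool]
transferAll pools jobs = do
    vars <- forM jobs $ \(download, verify) -> do
        var <- newEmptyMVar
        _ <- forkIO (transferOne pools download verify >>= putMVar var)
        return var
    mapM takeMVar vars
```

With `newPools 2`, at most two downloads and two verifications run at once, but a slow verification does not hold a download slot.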
parent 5a9842d7ed
commit 04cc470201
8 changed files with 43 additions and 35 deletions
10
CHANGELOG
|
@@ -1,9 +1,11 @@
|
||||||
git-annex (7.20190616) UNRELEASED; urgency=medium
|
git-annex (7.20190616) UNRELEASED; urgency=medium
|
||||||
|
|
||||||
* When running multiple concurrent actions, the cleanup phase is run
|
* get, move, copy, sync: When -J or annex.jobs has enabled concurrency,
|
||||||
in a separate queue than the main action queue. This can make some
|
checksum verification uses a separate job pool than is used for
|
||||||
commands faster, because less time is spent on bookkeeping in
|
downloads, to keep bandwidth saturated.
|
||||||
between each file transfer.
|
* Other commands also run their cleanup phase using a separate job pool
|
||||||
|
than their perform phase, which may make some of them somewhat faster
|
||||||
|
when running concurrently as well.
|
||||||
|
|
||||||
-- Joey Hess <id@joeyh.name> Sat, 15 Jun 2019 12:38:25 -0400
|
-- Joey Hess <id@joeyh.name> Sat, 15 Jun 2019 12:38:25 -0400
|
||||||
|
|
||||||
|
|
19
Command.hs
|
@@ -28,6 +28,7 @@ import Config
|
||||||
import Utility.Daemon
|
import Utility.Daemon
|
||||||
import Types.Transfer
|
import Types.Transfer
|
||||||
import Types.ActionItem
|
import Types.ActionItem
|
||||||
|
import Types.WorkerPool
|
||||||
|
|
||||||
{- Generates a normal Command -}
|
{- Generates a normal Command -}
|
||||||
command :: String -> CommandSection -> String -> CmdParamsDesc -> (CmdParamsDesc -> CommandParser) -> Command
|
command :: String -> CommandSection -> String -> CmdParamsDesc -> (CmdParamsDesc -> CommandParser) -> Command
|
||||||
|
@@ -104,6 +105,24 @@ stop = return Nothing
|
||||||
stopUnless :: Annex Bool -> Annex (Maybe a) -> Annex (Maybe a)
|
stopUnless :: Annex Bool -> Annex (Maybe a) -> Annex (Maybe a)
|
||||||
stopUnless c a = ifM c ( a , stop )
|
stopUnless c a = ifM c ( a , stop )
|
||||||
|
|
||||||
|
{- This can be used in the perform stage to run the action that is the bulk
|
||||||
|
- of the work to do in that stage. If the action succeeds, then any actions
|
||||||
|
- run after it will be scheduled as if they were run in the cleanup stage
|
||||||
|
- instead of the perform stage.
|
||||||
|
-
|
||||||
|
- This is not needed for a perform stage that uses `next` to run the
|
||||||
|
- cleanup stage action. But sometimes a larger action is being built up
|
||||||
|
- and it's not practical to separate out the cleanup stage part from the
|
||||||
|
- rest of the action.
|
||||||
|
-}
|
||||||
|
performJob :: Observable a => Annex a -> Annex a
|
||||||
|
performJob a = do
|
||||||
|
r <- a
|
||||||
|
if observeBool r
|
||||||
|
then changeStageTo CleanupStage
|
||||||
|
else noop
|
||||||
|
return r
|
||||||
|
|
||||||
{- When acting on a failed transfer, stops unless it was in the specified
|
{- When acting on a failed transfer, stops unless it was in the specified
|
||||||
- direction. -}
|
- direction. -}
|
||||||
checkFailedTransferDirection :: ActionItem -> Direction -> Annex (Maybe a) -> Annex (Maybe a)
|
checkFailedTransferDirection :: ActionItem -> Direction -> Annex (Maybe a) -> Annex (Maybe a)
|
||||||
|
|
|
@@ -108,7 +108,7 @@ getKey' key afile = dispatch
|
||||||
| Remote.hasKeyCheap r =
|
| Remote.hasKeyCheap r =
|
||||||
either (const False) id <$> Remote.hasKey r key
|
either (const False) id <$> Remote.hasKey r key
|
||||||
| otherwise = return True
|
| otherwise = return True
|
||||||
docopy r witness = getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key $ \dest ->
|
docopy r witness = performJob $ getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key $ \dest ->
|
||||||
download (Remote.uuid r) key afile stdRetry
|
download (Remote.uuid r) key afile stdRetry
|
||||||
(\p -> do
|
(\p -> do
|
||||||
showAction $ "from " ++ Remote.name r
|
showAction $ "from " ++ Remote.name r
|
||||||
|
|
|
@@ -208,7 +208,7 @@ fromPerform src removewhen key afile = do
|
||||||
where
|
where
|
||||||
go = notifyTransfer Download afile $
|
go = notifyTransfer Download afile $
|
||||||
download (Remote.uuid src) key afile stdRetry $ \p ->
|
download (Remote.uuid src) key afile stdRetry $ \p ->
|
||||||
getViaTmp (Remote.retrievalSecurityPolicy src) (RemoteVerify src) key $ \t ->
|
performJob $ getViaTmp (Remote.retrievalSecurityPolicy src) (RemoteVerify src) key $ \t ->
|
||||||
Remote.retrieveKeyFile src key afile t p
|
Remote.retrieveKeyFile src key afile t p
|
||||||
dispatch _ _ False = stop -- failed
|
dispatch _ _ False = stop -- failed
|
||||||
dispatch RemoveNever _ True = next $ return True -- copy complete
|
dispatch RemoveNever _ True = next $ return True -- copy complete
|
||||||
|
|
|
@@ -692,7 +692,8 @@ syncFile ebloom rs af k = do
|
||||||
, return []
|
, return []
|
||||||
)
|
)
|
||||||
get have = includeCommandAction $ starting "get" ai $
|
get have = includeCommandAction $ starting "get" ai $
|
||||||
next $ getKey' k af have
|
stopUnless (getKey' k af have) $
|
||||||
|
next $ return True
|
||||||
|
|
||||||
wantput r
|
wantput r
|
||||||
| Remote.readonly r || remoteAnnexReadOnly (Remote.gitconfig r) = return False
|
| Remote.readonly r || remoteAnnexReadOnly (Remote.gitconfig r) = return False
|
||||||
|
|
|
@@ -7,6 +7,9 @@ module Types.Concurrency where
|
||||||
|
|
||||||
import Utility.PartialPrelude
|
import Utility.PartialPrelude
|
||||||
|
|
||||||
|
-- Note that Concurrent 1 is not the same as NonConcurrent;
|
||||||
|
-- the former specifies 1 job of each particular kind, but there can be
|
||||||
|
-- more than one kind of job running concurrently.
|
||||||
data Concurrency = NonConcurrent | Concurrent Int | ConcurrentPerCpu
|
data Concurrency = NonConcurrent | Concurrent Int | ConcurrentPerCpu
|
||||||
|
|
||||||
parseConcurrency :: String -> Maybe Concurrency
|
parseConcurrency :: String -> Maybe Concurrency
|
||||||
|
|
|
@@ -14,6 +14,7 @@ import Control.Concurrent.Async
|
||||||
data WorkerPool t
|
data WorkerPool t
|
||||||
= UnallocatedWorkerPool
|
= UnallocatedWorkerPool
|
||||||
| WorkerPool [Worker t]
|
| WorkerPool [Worker t]
|
||||||
|
deriving (Show)
|
||||||
|
|
||||||
-- | A worker can either be idle or running an Async action.
|
-- | A worker can either be idle or running an Async action.
|
||||||
-- And it is used for some stage.
|
-- And it is used for some stage.
|
||||||
|
@@ -21,9 +22,13 @@ data Worker t
|
||||||
= IdleWorker t WorkerStage
|
= IdleWorker t WorkerStage
|
||||||
| ActiveWorker (Async t) WorkerStage
|
| ActiveWorker (Async t) WorkerStage
|
||||||
|
|
||||||
|
instance Show (Worker t) where
|
||||||
|
show (IdleWorker _ s) = "IdleWorker " ++ show s
|
||||||
|
show (ActiveWorker _ s) = "ActiveWorker " ++ show s
|
||||||
|
|
||||||
-- | These correspond to CommandPerform and CommandCleanup.
|
-- | These correspond to CommandPerform and CommandCleanup.
|
||||||
data WorkerStage = PerformStage | CleanupStage
|
data WorkerStage = PerformStage | CleanupStage
|
||||||
deriving (Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
workerStage :: Worker t -> WorkerStage
|
workerStage :: Worker t -> WorkerStage
|
||||||
workerStage (IdleWorker _ s) = s
|
workerStage (IdleWorker _ s) = s
|
||||||
|
|
|
@@ -8,30 +8,8 @@ are still some things that could be improved, tracked here:
|
||||||
supported; it might be good to have `--jobs=cpus-1` to leave a spare
|
supported; it might be good to have `--jobs=cpus-1` to leave a spare
|
||||||
cpu to avoid contention, or `--jobs=remotes*2` to run 2 jobs per remote.
|
cpu to avoid contention, or `--jobs=remotes*2` to run 2 jobs per remote.
|
||||||
|
|
||||||
* Parallelism is often used when the user wants to full saturate the pipe
|
* Checksum verification is done in the cleanup stage job pool now for
|
||||||
to a remote, since having some extra transfers running avoid being
|
`git-annex get`, and `git-annex move --from` etc. But only for downloads.
|
||||||
delayed while git-annex runs cleanup actions, checksum verification,
|
When an upload involves checksum verification, eg `git annex move --to` a
|
||||||
and other non-transfer stuff.
|
removable drive, that checksum verification is done inside Remote.Git,
|
||||||
|
and still runs in the perform stage job pool.
|
||||||
But, the user will sometimes be disappointed, because every job
|
|
||||||
can still end up stuck doing checksum verification at the same time,
|
|
||||||
so the pipe to the remote is not saturated.
|
|
||||||
|
|
||||||
Now that cleanup actions don't occupy space in the main worker queue,
|
|
||||||
all that needs to be done is make checksum verification be done as the
|
|
||||||
cleanup action. Currently, it's bundled into the same action that
|
|
||||||
transfers content.
|
|
||||||
|
|
||||||
> Had a closer look at moving the checksum verification to cleanup,
|
|
||||||
> and it's really quite difficult to do. Things like runTransfer
|
|
||||||
> and pickRemote expect to be able to run the entire transfer action,
|
|
||||||
> including verification, and if it fails may retry it or try to
|
|
||||||
> transfer from a different remote instead.
|
|
||||||
>
|
|
||||||
> It feels like inverting all that control to move verification to
|
|
||||||
> cleanup would introduce a lot of complexity if it's even possible to do
|
|
||||||
> cleanly at all.
|
|
||||||
>
|
|
||||||
> Makes me wonder about just calling changeStageTo once the transfer
|
|
||||||
> is complete and before verification. Feels like a hack, but I think it
|
|
||||||
> would just work.
|
|
||||||
|
|
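The `performJob` helper added in Command.hs can be tried out in isolation with a small stand-alone sketch. It runs in plain `IO` rather than the `Annex` monad, and everything around `performJob` is a toy stand-in: the `Observable` class here has only the one method the diff uses, its instances are chosen for illustration, and `changeStageTo` merely records the new stage in an `IORef` instead of moving the thread between worker pools.

```haskell
import Control.Monad (when)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | Same constructors as WorkerStage in the diff.
data WorkerStage = PerformStage | CleanupStage
    deriving (Show, Eq)

-- | Toy stand-in: results that can be viewed as success or failure.
class Observable a where
    observeBool :: a -> Bool

instance Observable Bool where
    observeBool = id

instance Observable (Maybe a) where
    observeBool (Just _) = True
    observeBool Nothing = False

-- | Hypothetical stage change: only records the stage the worker is in.
changeStageTo :: IORef WorkerStage -> WorkerStage -> IO ()
changeStageTo = writeIORef

-- | Run the bulk of the perform-stage work. If it succeeds, whatever runs
-- afterwards is accounted to the cleanup stage; on failure the stage is
-- left unchanged, so a retry still counts as perform-stage work.
performJob :: Observable a => IORef WorkerStage -> IO a -> IO a
performJob stage a = do
    r <- a
    when (observeBool r) $
        changeStageTo stage CleanupStage
    return r

-- | Run one job from the perform stage and report its result together
-- with the stage the worker ended up in.
runFromPerform :: Observable a => IO a -> IO (a, WorkerStage)
runFromPerform a = do
    stage <- newIORef PerformStage
    r <- performJob stage a
    s <- readIORef stage
    return (r, s)
```

A successful job leaves the worker in `CleanupStage`, while a failed one leaves it in `PerformStage`.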