improve concurrency of move/copy --from --to

Use separate stages for download and upload. In the common case where
it downloads the file from one remote and then uploads to the other,
those are by far the most expensive operations, and there's a decent
chance the two remotes bottleneck on different resources.

Suppose it's being run with -J2 and a bunch of 10 mb files. Two threads
will be started both downloading from the src remote. They will probably
finish at the same time. Then two threads will be started uploading to
the dst remote. They will probably take the same time as well. Before
this change, it would alternate back and forth, bottlenecking on src and dst.
With this change, as soon as the two threads start uploading to dst, two
more threads are able to start, downloading from src. So bandwidth to
both remotes is saturated more often.

Other commands that use transferStages only send in one direction at a
time. So the worker threads for the other direction will sit idle, and
there will be no change in their behavior.

Sponsored-by: Dartmouth College's DANDI project
This commit is contained in:
Joey Hess 2023-01-24 13:45:01 -04:00
parent 57987ed2cd
commit 579d9b60c1
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
12 changed files with 74 additions and 35 deletions

View file

@ -120,7 +120,7 @@ alwaysRunTransfer = runTransfer' True
runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v
runTransfer' ignorelock t afile stalldetection retrydecider transferaction = runTransfer' ignorelock t afile stalldetection retrydecider transferaction =
enteringStage TransferStage $ enteringStage (TransferStage (transferDirection t)) $
debugLocks $ debugLocks $
preCheckSecureHashes (transferKey t) go preCheckSecureHashes (transferKey t) go
where where
@ -244,7 +244,7 @@ runTransferrer
-> NotifyWitness -> NotifyWitness
-> Annex Bool -> Annex Bool
runTransferrer sd r k afile retrydecider direction _witness = runTransferrer sd r k afile retrydecider direction _witness =
enteringStage TransferStage $ preCheckSecureHashes k $ do enteringStage (TransferStage direction) $ preCheckSecureHashes k $ do
info <- liftIO $ startTransferInfo afile info <- liftIO $ startTransferInfo afile
go 0 info go 0 info
where where

View file

@ -50,7 +50,7 @@ seek o = case fromToOptions o of
Nothing -> giveup "Specify --from or --to" Nothing -> giveup "Specify --from or --to"
seek' :: CopyOptions -> FromToHereOptions -> CommandSeek seek' :: CopyOptions -> FromToHereOptions -> CommandSeek
seek' o fto = startConcurrency commandStages $ do seek' o fto = startConcurrend stages $ do
case batchOption o of case batchOption o of
NoBatch -> withKeyOptions NoBatch -> withKeyOptions
(keyOptions o) (autoMode o) seeker (keyOptions o) (autoMode o) seeker
@ -73,6 +73,12 @@ seek' o fto = startConcurrency commandStages $ do
} }
keyaction = Command.Move.startKey fto Command.Move.RemoveNever keyaction = Command.Move.startKey fto Command.Move.RemoveNever
stages = case fto of
FromOrToRemote (FromRemote _) -> commandStages
FromOrToRemote (ToRemote _) -> commandStages
ToHere -> commandStages
FromRemoteToRemote _ _ -> transferStages
{- A copy is just a move that does not delete the source file. {- A copy is just a move that does not delete the source file.
- However, auto mode avoids unnecessary copies, and avoids getting or - However, auto mode avoids unnecessary copies, and avoids getting or
- sending non-preferred content. -} - sending non-preferred content. -}

View file

@ -38,7 +38,7 @@ optParser desc = GetOptions
<*> parseBatchOption True <*> parseBatchOption True
seek :: GetOptions -> CommandSeek seek :: GetOptions -> CommandSeek
seek o = startConcurrency downloadStages $ do seek o = startConcurrency transferStages $ do
from <- maybe (pure Nothing) (Just <$$> getParsed) (getFrom o) from <- maybe (pure Nothing) (Just <$$> getParsed) (getFrom o)
let seeker = AnnexedFileSeeker let seeker = AnnexedFileSeeker
{ startAction = start o from { startAction = start o from

View file

@ -48,7 +48,7 @@ seek o = startConcurrency stages $
=<< workTreeItems ww (mirrorFiles o) =<< workTreeItems ww (mirrorFiles o)
where where
stages = case fromToOptions o of stages = case fromToOptions o of
FromRemote _ -> downloadStages FromRemote _ -> transferStages
ToRemote _ -> commandStages ToRemote _ -> commandStages
ww = WarnUnmatchLsFiles ww = WarnUnmatchLsFiles
seeker = AnnexedFileSeeker seeker = AnnexedFileSeeker

View file

@ -84,10 +84,10 @@ seek' o fto = startConcurrency stages $ do
, usesLocationLog = True , usesLocationLog = True
} }
stages = case fto of stages = case fto of
FromOrToRemote (FromRemote _) -> downloadStages FromOrToRemote (FromRemote _) -> transferStages
FromOrToRemote (ToRemote _) -> commandStages FromOrToRemote (ToRemote _) -> commandStages
ToHere -> downloadStages ToHere -> transferStages
FromRemoteToRemote _ _ -> commandStages FromRemoteToRemote _ _ -> transferStages
keyaction = startKey fto (removeWhen o) keyaction = startKey fto (removeWhen o)
ww = WarnUnmatchLsFiles ww = WarnUnmatchLsFiles

View file

@ -212,7 +212,7 @@ instance DeferredParseClass SyncOptions where
seek :: SyncOptions -> CommandSeek seek :: SyncOptions -> CommandSeek
seek o = do seek o = do
prepMerge prepMerge
startConcurrency downloadStages (seek' o) startConcurrency transferStages (seek' o)
seek' :: SyncOptions -> CommandSeek seek' :: SyncOptions -> CommandSeek
seek' o = do seek' o = do

25
Types/Direction.hs Normal file
View file

@ -0,0 +1,25 @@
{- git-annex transfer direction types
-
- Copyright 2012 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE OverloadedStrings #-}
module Types.Direction where
import qualified Data.ByteString as B
data Direction = Upload | Download
deriving (Eq, Ord, Show, Read)
formatDirection :: Direction -> B.ByteString
formatDirection Upload = "upload"
formatDirection Download = "download"
parseDirection :: String -> Maybe Direction
parseDirection "upload" = Just Upload
parseDirection "download" = Just Download
parseDirection _ = Nothing

View file

@ -5,20 +5,22 @@
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleInstances #-}
module Types.Transfer where module Types.Transfer (
module Types.Transfer,
module Types.Direction
) where
import Types import Types
import Types.Remote (Verification(..)) import Types.Remote (Verification(..))
import Types.Key import Types.Key
import Types.Direction
import Utility.PID import Utility.PID
import Utility.QuickCheck import Utility.QuickCheck
import Utility.Url import Utility.Url
import Utility.FileSystemEncoding import Utility.FileSystemEncoding
import qualified Data.ByteString as B
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Control.Concurrent import Control.Concurrent
import Control.Applicative import Control.Applicative
@ -55,18 +57,6 @@ data TransferInfo = TransferInfo
stubTransferInfo :: TransferInfo stubTransferInfo :: TransferInfo
stubTransferInfo = TransferInfo Nothing Nothing Nothing Nothing Nothing (AssociatedFile Nothing) False stubTransferInfo = TransferInfo Nothing Nothing Nothing Nothing Nothing (AssociatedFile Nothing) False
data Direction = Upload | Download
deriving (Eq, Ord, Show, Read)
formatDirection :: Direction -> B.ByteString
formatDirection Upload = "upload"
formatDirection Download = "download"
parseDirection :: String -> Maybe Direction
parseDirection "upload" = Just Upload
parseDirection "download" = Just Download
parseDirection _ = Nothing
instance Arbitrary TransferInfo where instance Arbitrary TransferInfo where
arbitrary = TransferInfo arbitrary = TransferInfo
<$> arbitrary <$> arbitrary

View file

@ -1,12 +1,14 @@
{- Worker thread pool. {- Worker thread pool.
- -
- Copyright 2019 Joey Hess <id@joeyh.name> - Copyright 2019-2023 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
module Types.WorkerPool where module Types.WorkerPool where
import Types.Direction
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import qualified Data.Set as S import qualified Data.Set as S
@ -49,7 +51,7 @@ data WorkerStage
-- ^ Running a CommandPerform action. -- ^ Running a CommandPerform action.
| CleanupStage | CleanupStage
-- ^ Running a CommandCleanup action. -- ^ Running a CommandCleanup action.
| TransferStage | TransferStage Direction
-- ^ Transferring content to or from a remote. -- ^ Transferring content to or from a remote.
| VerifyStage | VerifyStage
-- ^ Verifying content, eg by calculating a checksum. -- ^ Verifying content, eg by calculating a checksum.
@ -82,15 +84,24 @@ commandStages = UsedStages
, stageSet = S.fromList [PerformStage, CleanupStage] , stageSet = S.fromList [PerformStage, CleanupStage]
} }
-- | When a command is downloading content, it can use this instead. -- | This is mostly useful for downloads, not for uploads. A download
-- Downloads are often bottlenecked on the network or another disk -- is often bottlenecked on the network or another disk than the one
-- than the one containing the repository, while verification bottlenecks -- containing the repository. When verification is not done incrementally,
-- on the disk containing the repository or on the CPU. So, run the -- it bottlenecks on the disk containing the repository or on the CPU.
-- transfer and verify stage separately. -- So it makes sense to run the download and verify stages separately.
downloadStages :: UsedStages --
downloadStages = UsedStages -- For uploads, there is no separate verify step to this is less likely
{ initialStage = TransferStage -- to be useful than commandStages. However, a separate stage is provided
, stageSet = S.fromList [TransferStage, VerifyStage] -- for Uploads. That can be useful when a command downloads from one remote
-- (eg using the network) and uploads to another remote (eg using a disk).
transferStages :: UsedStages
transferStages = UsedStages
{ initialStage = TransferStage Download
, stageSet = S.fromList
[ TransferStage Download
, TransferStage Upload
, VerifyStage
]
} }
workerStage :: Worker t -> WorkerStage workerStage :: Worker t -> WorkerStage

View file

@ -48,6 +48,9 @@ Paths of files or directories to operate on can be specified.
Setting this to "cpus" will run one job per CPU core. Setting this to "cpus" will run one job per CPU core.
Note that when using --from with --to, twice this many jobs will
run at once, evenly split between the two remotes.
* `--auto` * `--auto`
Rather than copying all specified files, only copy those that don't yet have Rather than copying all specified files, only copy those that don't yet have

View file

@ -55,6 +55,9 @@ Paths of files or directories to operate on can be specified.
Setting this to "cpus" will run one job per CPU core. Setting this to "cpus" will run one job per CPU core.
Note that when using --from with --to, twice this many jobs will
run at once, evenly split between the two remotes.
* `--all` `-A` * `--all` `-A`
Rather than specifying a filename or path to move, this option can be Rather than specifying a filename or path to move, this option can be

View file

@ -1009,6 +1009,7 @@ Executable git-annex
Types.DeferredParse Types.DeferredParse
Types.DesktopNotify Types.DesktopNotify
Types.Difference Types.Difference
Types.Direction
Types.Distribution Types.Distribution
Types.Export Types.Export
Types.FileMatcher Types.FileMatcher