get, move, copy, mirror: Concurrent downloads and uploads are now supported!

This works, and seems fairly robust. Clean get of 20 files at -J3. At -J10,
there are some messages about ssh multiplexing, probably due to a race
spinning up the ssh connection cacher. But, it manages to get all the files
ok regardless.

The progress bars are a scrambled mess though, due to bugs in
ascii-progress, which I've already filed. Particularly this one:
https://github.com/yamadapc/haskell-ascii-progress/issues/8
This commit is contained in:
Joey Hess 2015-04-10 17:08:07 -04:00
parent 75b6b5cbc7
commit 8077ccbd54
14 changed files with 138 additions and 13 deletions

View file

@ -68,6 +68,7 @@ import Utility.Url
import "mtl" Control.Monad.Reader import "mtl" Control.Monad.Reader
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Map as M import qualified Data.Map as M
import qualified Data.Set as S import qualified Data.Set as S
@ -133,6 +134,7 @@ data AnnexState = AnnexState
#endif #endif
, existinghooks :: M.Map Git.Hook.Hook Bool , existinghooks :: M.Map Git.Hook.Hook Bool
, desktopnotify :: DesktopNotify , desktopnotify :: DesktopNotify
, workers :: [Either AnnexState (Async AnnexState)]
} }
newState :: GitConfig -> Git.Repo -> AnnexState newState :: GitConfig -> Git.Repo -> AnnexState
@ -178,6 +180,7 @@ newState c r = AnnexState
#endif #endif
, existinghooks = M.empty , existinghooks = M.empty
, desktopnotify = mempty , desktopnotify = mempty
, workers = []
} }
{- Makes an Annex state object for the specified git repo. {- Makes an Annex state object for the specified git repo.

View file

@ -42,7 +42,7 @@ type Reason = String
- The runner is used to run commands, and so can be either callCommand - The runner is used to run commands, and so can be either callCommand
- or commandAction. - or commandAction.
-} -}
handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> CommandActionRunner -> Annex () handleDropsFrom :: [UUID] -> [Remote] -> Reason -> Bool -> Key -> AssociatedFile -> Maybe Remote -> (CommandStart -> CommandCleanup) -> Annex ()
handleDropsFrom locs rs reason fromhere key afile knownpresentremote runner = do handleDropsFrom locs rs reason fromhere key afile knownpresentremote runner = do
fs <- ifM isDirect fs <- ifM isDirect
( do ( do

View file

@ -1,6 +1,6 @@
{- git-annex command-line actions {- git-annex command-line actions
- -
- Copyright 2010-2014 Joey Hess <id@joeyh.name> - Copyright 2010-2015 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU GPL version 3 or higher. - Licensed under the GNU GPL version 3 or higher.
-} -}
@ -13,6 +13,13 @@ import Common.Annex
import qualified Annex import qualified Annex
import Types.Command import Types.Command
import qualified Annex.Queue import qualified Annex.Queue
import Messages.Internal
import Types.Messages
import Control.Concurrent.Async
import Control.Exception (throwIO)
import qualified Data.Map as M
import Data.Either
type CommandActionRunner = CommandStart -> CommandCleanup type CommandActionRunner = CommandStart -> CommandCleanup
@ -24,6 +31,7 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa
mapM_ runCheck c mapM_ runCheck c
Annex.changeState $ \s -> s { Annex.errcounter = 0 } Annex.changeState $ \s -> s { Annex.errcounter = 0 }
seek params seek params
finishCommandActions
cont cont
showerrcount =<< Annex.getState Annex.errcounter showerrcount =<< Annex.getState Annex.errcounter
where where
@ -35,9 +43,93 @@ performCommandAction Command { cmdseek = seek, cmdcheck = c, cmdname = name } pa
- including by throwing IO errors (but other errors terminate the whole - including by throwing IO errors (but other errors terminate the whole
- command). - command).
- -
- This should only be run in the seek stage. -} - When concurrency is enabled, a thread is forked off to run the action
commandAction :: CommandActionRunner - in the background, as soon as a free slot is available.
commandAction a = account =<< tryIO go
- This should only be run in the seek stage.
-}
commandAction :: CommandStart -> Annex ()
commandAction a = withOutputType go
where
go (ParallelOutput n) = do
ws <- Annex.getState Annex.workers
(st, ws') <- if null ws
then do
st <- newWorkerState
return (st, replicate (n-1) (Left st))
else do
l <- liftIO $ drainTo (n-1) ws
findFreeSlot l
w <- liftIO $ async $ snd <$> Annex.run st run
Annex.changeState $ \s -> s { Annex.workers = Right w:ws' }
go _ = run
run = void $ includeCommandAction a
{- Waits for any forked off command actions to finish.
-
- Merge together the cleanup actions of all the AnnexStates used by
- threads, into the current Annex's state, so they'll run at shutdown.
-
- Also merge together the errcounters of the AnnexStates.
-}
finishCommandActions :: Annex ()
finishCommandActions = do
l <- liftIO . drainTo 0 =<< Annex.getState Annex.workers
forM_ (lefts l) $ \st -> do
forM_ (M.toList $ Annex.cleanup st) $
uncurry Annex.addCleanup
Annex.changeState $ \s ->
s { Annex.errcounter = Annex.errcounter s + Annex.errcounter st }
{- Wait for Asyncs from the list to finish, replacing them with their
- final AnnexStates, until the list of remaining Asyncs is not larger
- than the specified size, then returns the new list.
-
- If the action throws an exception, it is propigated, but first
- all other actions are waited for, to allow for a clean shutdown.
-}
drainTo
:: Int
-> [Either Annex.AnnexState (Async Annex.AnnexState)]
-> IO [Either Annex.AnnexState (Async Annex.AnnexState)]
drainTo sz l
| null as || sz >= length as = return l
| otherwise = do
(done, ret) <- waitAnyCatch as
let as' = filter (/= done) as
case ret of
Left e -> do
void $ drainTo 0 (map Left sts ++ map Right as')
throwIO e
Right st -> do
drainTo sz $ map Left (st:sts) ++ map Right as'
where
(sts, as) = partitionEithers l
findFreeSlot :: [Either Annex.AnnexState (Async Annex.AnnexState)] -> Annex (Annex.AnnexState, [Either Annex.AnnexState (Async Annex.AnnexState)])
findFreeSlot = go []
where
go c [] = do
st <- newWorkerState
return (st, c)
go c (Left st:rest) = return (st, c ++ rest)
go c (v:rest) = go (v:c) rest
{- From the current Annex state, get a state that is suitable for being
- used for a worker thread. Avoid sharing eg, open file handles. -}
newWorkerState :: Annex Annex.AnnexState
newWorkerState = do
st <- Annex.getState id
return $ st
{ Annex.workers = []
, Annex.catfilehandles = M.empty
, Annex.checkattrhandle = Nothing
, Annex.checkignorehandle = Nothing
}
{- Like commandAction, but without the concurrency. -}
includeCommandAction :: CommandStart -> CommandCleanup
includeCommandAction a = account =<< tryIO go
where where
go = do go = do
Annex.Queue.flushWhenFull Annex.Queue.flushWhenFull
@ -58,7 +150,7 @@ commandAction a = account =<< tryIO go
{- Runs a single command action through the start, perform and cleanup {- Runs a single command action through the start, perform and cleanup
- stages, without catching errors. Useful if one command wants to run - stages, without catching errors. Useful if one command wants to run
- part of another command. -} - part of another command. -}
callCommandAction :: CommandActionRunner callCommandAction :: CommandStart -> CommandCleanup
callCommandAction = start callCommandAction = start
where where
start = stage $ maybe skip perform start = stage $ maybe skip perform

View file

@ -138,6 +138,14 @@ jsonOption :: Option
jsonOption = Option ['j'] ["json"] (NoArg (Annex.setOutput JSONOutput)) jsonOption = Option ['j'] ["json"] (NoArg (Annex.setOutput JSONOutput))
"enable JSON output" "enable JSON output"
jobsOption :: Option
jobsOption = Option ['J'] ["jobs"] (ReqArg set paramNumber)
"enable concurrent jobs"
where
set s = case readish s of
Nothing -> error "Bad --jobs number"
Just n -> Annex.setOutput (ParallelOutput n)
timeLimitOption :: Option timeLimitOption :: Option
timeLimitOption = Option ['T'] ["time-limit"] timeLimitOption = Option ['T'] ["time-limit"]
(ReqArg Limit.addTimeLimit paramTime) (ReqArg Limit.addTimeLimit paramTime)

View file

@ -84,7 +84,7 @@ withFilesInRefs a = mapM_ go
case v of case v of
Nothing -> noop Nothing -> noop
Just k -> whenM (matcher $ MatchingKey k) $ Just k -> whenM (matcher $ MatchingKey k) $
void $ commandAction $ a f k commandAction $ a f k
withPathContents :: ((FilePath, FilePath) -> CommandStart) -> CommandSeek withPathContents :: ((FilePath, FilePath) -> CommandStart) -> CommandSeek
withPathContents a params = do withPathContents a params = do

View file

@ -21,7 +21,7 @@ cmd = [withOptions getOptions $ command "get" paramPaths seek
SectionCommon "make content of annexed files available"] SectionCommon "make content of annexed files available"]
getOptions :: [Option] getOptions :: [Option]
getOptions = fromOption : annexedMatchingOptions ++ keyOptions ++ [autoOption] getOptions = fromOption : autoOption : jobsOption : annexedMatchingOptions ++ keyOptions
seek :: CommandSeek seek :: CommandSeek
seek ps = do seek ps = do

View file

@ -21,7 +21,7 @@ cmd = [withOptions mirrorOptions $ command "mirror" paramPaths seek
SectionCommon "mirror content of files to/from another repository"] SectionCommon "mirror content of files to/from another repository"]
mirrorOptions :: [Option] mirrorOptions :: [Option]
mirrorOptions = fromToOptions ++ annexedMatchingOptions ++ keyOptions mirrorOptions = fromToOptions ++ [jobsOption] ++ annexedMatchingOptions ++ keyOptions
seek :: CommandSeek seek :: CommandSeek
seek ps = do seek ps = do

View file

@ -22,7 +22,7 @@ cmd = [withOptions moveOptions $ command "move" paramPaths seek
SectionCommon "move content of files to/from another repository"] SectionCommon "move content of files to/from another repository"]
moveOptions :: [Option] moveOptions :: [Option]
moveOptions = fromToOptions ++ keyOptions ++ annexedMatchingOptions moveOptions = fromToOptions ++ [jobsOption] ++ keyOptions ++ annexedMatchingOptions
seek :: CommandSeek seek :: CommandSeek
seek ps = do seek ps = do

View file

@ -389,7 +389,7 @@ syncFile rs f k = do
u <- getUUID u <- getUUID
let locs' = concat [[u | got], putrs, locs] let locs' = concat [[u | got], putrs, locs]
-- Using callCommandAction rather than commandAction for drops, -- Using callCommandAction rather than includeCommandAction for drops,
-- because a failure to drop does not mean the sync failed. -- because a failure to drop does not mean the sync failed.
handleDropsFrom locs' rs "unwanted" True k (Just f) handleDropsFrom locs' rs "unwanted" True k (Just f)
Nothing callCommandAction Nothing callCommandAction
@ -403,7 +403,7 @@ syncFile rs f k = do
( return [ get have ] ( return [ get have ]
, return [] , return []
) )
get have = commandAction $ do get have = includeCommandAction $ do
showStart "get" f showStart "get" f
next $ next $ getViaTmp k $ \dest -> getKeyFile' k (Just f) dest have next $ next $ getViaTmp k $ \dest -> getKeyFile' k (Just f) dest have
@ -415,7 +415,7 @@ syncFile rs f k = do
, return [] , return []
) )
put dest = do put dest = do
ok <- commandAction $ do ok <- includeCommandAction $ do
showStart "copy" f showStart "copy" f
Command.Move.toStart' dest False (Just f) k Command.Move.toStart' dest False (Just f) k
return (ok, if ok then Just (Remote.uuid dest) else Nothing) return (ok, if ok then Just (Remote.uuid dest) else Nothing)

2
debian/changelog vendored
View file

@ -4,6 +4,8 @@ git-annex (5.20150410) UNRELEASED; urgency=medium
activity from other uuids. activity from other uuids.
* Union merge could fall over if there was a file in the repository * Union merge could fall over if there was a file in the repository
with the same name as a git ref. Now fixed. with the same name as a git ref. Now fixed.
* get, move, copy, mirror: Concurrent downloads and uploads are
now supported! For example: git-annex get -J10
-- Joey Hess <id@joeyh.name> Thu, 09 Apr 2015 20:59:43 -0400 -- Joey Hess <id@joeyh.name> Thu, 09 Apr 2015 20:59:43 -0400

View file

@ -22,6 +22,11 @@ Copies the content of files from or to another remote.
Use this option to copy the content of files from the local repository Use this option to copy the content of files from the local repository
to the specified remote. to the specified remote.
* `--jobs=N` `-JN`
Enables parallel transfers with up to the specified number of jobs
running at once. For example: `-J10`
* `--auto` * `--auto`
Rather than copying all files, only copy files that don't yet have Rather than copying all files, only copy files that don't yet have

View file

@ -25,6 +25,11 @@ or transferring them from some kind of key-value store.
Normally git-annex will choose which remotes to get the content Normally git-annex will choose which remotes to get the content
from. Use this option to specify which remote to use. from. Use this option to specify which remote to use.
* `--jobs=N` `-JN`
Enables parallel download with up to the specified number of jobs
running at once. For example: `-J10`
* `--all` * `--all`
Rather than specifying a filename or path to get, this option can be Rather than specifying a filename or path to get, this option can be

View file

@ -31,6 +31,11 @@ contents.
Use the remote as the source repository, and mirror its contents to the local Use the remote as the source repository, and mirror its contents to the local
repository. repository.
* `--jobs=N` `-JN`
Enables parallel transfers with up to the specified number of jobs
running at once. For example: `-J10`
* `--all` * `--all`
Mirror all objects stored in the git annex, not only objects used by Mirror all objects stored in the git annex, not only objects used by

View file

@ -22,6 +22,11 @@ Moves the content of files from or to another remote.
Use this option to move the content of files from the local repository Use this option to move the content of files from the local repository
to the specified remote. to the specified remote.
* `--jobs=N` `-JN`
Enables parallel transfers with up to the specified number of jobs
running at once. For example: `-J10`
* `--all` * `--all`
Rather than specifying a filename or path to move, this option can be Rather than specifying a filename or path to move, this option can be