concurrency and status messages when downloading from import
This commit is contained in:
parent
ee5f1422df
commit
e412129523
10 changed files with 57 additions and 27 deletions
|
@ -16,7 +16,6 @@ import qualified Remote
|
||||||
import qualified Command.Drop
|
import qualified Command.Drop
|
||||||
import Command
|
import Command
|
||||||
import Annex.Wanted
|
import Annex.Wanted
|
||||||
import Annex.Export
|
|
||||||
import Config
|
import Config
|
||||||
import Annex.Content.Direct
|
import Annex.Content.Direct
|
||||||
import qualified Database.Keys
|
import qualified Database.Keys
|
||||||
|
|
|
@ -13,11 +13,9 @@ import Types
|
||||||
import Types.Key
|
import Types.Key
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
import Config
|
|
||||||
import Messages
|
import Messages
|
||||||
import Utility.FileSystemEncoding
|
import Utility.FileSystemEncoding
|
||||||
|
|
||||||
import qualified Data.Map as M
|
|
||||||
import Control.Applicative
|
import Control.Applicative
|
||||||
import Data.Maybe
|
import Data.Maybe
|
||||||
import Prelude
|
import Prelude
|
||||||
|
@ -44,9 +42,6 @@ exportKey sha = mk <$> catKey sha
|
||||||
, keyChunkNum = Nothing
|
, keyChunkNum = Nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
exportTree :: Remote.RemoteConfig -> Bool
|
|
||||||
exportTree c = fromMaybe False $ yesNo =<< M.lookup "exporttree" c
|
|
||||||
|
|
||||||
warnExportConflict :: Remote -> Annex ()
|
warnExportConflict :: Remote -> Annex ()
|
||||||
warnExportConflict r = toplevelWarning True $
|
warnExportConflict r = toplevelWarning True $
|
||||||
"Export conflict detected. Different trees have been exported to " ++
|
"Export conflict detected. Different trees have been exported to " ++
|
||||||
|
|
|
@ -30,6 +30,7 @@ import Annex.Link
|
||||||
import Annex.LockFile
|
import Annex.LockFile
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
import Annex.Export
|
import Annex.Export
|
||||||
|
import Command
|
||||||
import Backend
|
import Backend
|
||||||
import Config
|
import Config
|
||||||
import Types.Key
|
import Types.Key
|
||||||
|
@ -44,9 +45,7 @@ import qualified Logs.ContentIdentifier as CIDLog
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import qualified Data.Map.Strict as M
|
import qualified Data.Map.Strict as M
|
||||||
|
import qualified Data.Set as S
|
||||||
importTree :: Remote.RemoteConfig -> Bool
|
|
||||||
importTree c = fromMaybe False $ yesNo =<< M.lookup "importtree" c
|
|
||||||
|
|
||||||
{- Configures how to build an import tree. -}
|
{- Configures how to build an import tree. -}
|
||||||
data ImportTreeConfig
|
data ImportTreeConfig
|
||||||
|
@ -217,20 +216,24 @@ downloadImport remote importtreeconfig importablecontents = do
|
||||||
-- importablecontents (eg when it has a history),
|
-- importablecontents (eg when it has a history),
|
||||||
-- they will only be downloaded once.
|
-- they will only be downloaded once.
|
||||||
cidmap <- liftIO $ newTVarIO M.empty
|
cidmap <- liftIO $ newTVarIO M.empty
|
||||||
|
-- When concurrency is enabled, this set is needed to
|
||||||
|
-- avoid two threads both downloading the same content identifier.
|
||||||
|
downloading <- liftIO $ newTVarIO S.empty
|
||||||
withExclusiveLock gitAnnexContentIdentifierLock $
|
withExclusiveLock gitAnnexContentIdentifierLock $
|
||||||
bracket CIDDb.openDb CIDDb.closeDb $ \db -> do
|
bracket CIDDb.openDb CIDDb.closeDb $ \db -> do
|
||||||
CIDDb.needsUpdateFromLog db
|
CIDDb.needsUpdateFromLog db
|
||||||
>>= maybe noop (CIDDb.updateFromLog db)
|
>>= maybe noop (CIDDb.updateFromLog db)
|
||||||
go cidmap importablecontents db
|
go cidmap downloading importablecontents db
|
||||||
-- TODO really support concurrency; avoid donwloading the same
|
|
||||||
-- ContentIdentifier twice.
|
|
||||||
where
|
where
|
||||||
go cidmap (ImportableContents l h) db = do
|
go cidmap downloading (ImportableContents l h) db = do
|
||||||
l' <- mapM (download cidmap db) l
|
jobs <- forM l $ \i ->
|
||||||
|
startdownload cidmap downloading db i
|
||||||
|
l' <- liftIO $ forM jobs $
|
||||||
|
either pure (atomically . takeTMVar)
|
||||||
if any isNothing l'
|
if any isNothing l'
|
||||||
then return Nothing
|
then return Nothing
|
||||||
else do
|
else do
|
||||||
h' <- mapM (\ic -> go cidmap ic db) h
|
h' <- mapM (\ic -> go cidmap downloading ic db) h
|
||||||
if any isNothing h'
|
if any isNothing h'
|
||||||
then return Nothing
|
then return Nothing
|
||||||
else return $ Just $
|
else return $ Just $
|
||||||
|
@ -238,9 +241,40 @@ downloadImport remote importtreeconfig importablecontents = do
|
||||||
(catMaybes l')
|
(catMaybes l')
|
||||||
(catMaybes h')
|
(catMaybes h')
|
||||||
|
|
||||||
download cidmap db (loc, (cid, sz)) = getcidkey cidmap db cid >>= \case
|
waitstart downloading cid = liftIO $ atomically $ do
|
||||||
(k:_) -> return $ Just (loc, k)
|
s <- readTVar downloading
|
||||||
[] -> checkDiskSpaceToGet tmpkey Nothing $
|
if S.member cid s
|
||||||
|
then retry
|
||||||
|
else writeTVar downloading $ S.insert cid s
|
||||||
|
|
||||||
|
signaldone downloading cid = liftIO $ atomically $ do
|
||||||
|
s <- readTVar downloading
|
||||||
|
writeTVar downloading $ S.delete cid s
|
||||||
|
|
||||||
|
startdownload cidmap downloading db i@(loc, (cid, _sz)) = getcidkey cidmap db cid >>= \case
|
||||||
|
(k:_) -> return $ Left $ Just (loc, k)
|
||||||
|
[] -> do
|
||||||
|
job <- liftIO $ newEmptyTMVarIO
|
||||||
|
let rundownload = do
|
||||||
|
showStart "import" (fromImportLocation loc)
|
||||||
|
next $ tryNonAsync (download cidmap db i) >>= \case
|
||||||
|
Left e -> next $ do
|
||||||
|
warning (show e)
|
||||||
|
liftIO $ atomically $
|
||||||
|
putTMVar job Nothing
|
||||||
|
return False
|
||||||
|
Right r -> next $ do
|
||||||
|
liftIO $ atomically $
|
||||||
|
putTMVar job r
|
||||||
|
return True
|
||||||
|
commandAction $ bracket_
|
||||||
|
(waitstart downloading cid)
|
||||||
|
(signaldone downloading cid)
|
||||||
|
rundownload
|
||||||
|
return (Right job)
|
||||||
|
|
||||||
|
download cidmap db (loc, (cid, sz)) =
|
||||||
|
checkDiskSpaceToGet tmpkey Nothing $
|
||||||
withTmp tmpkey $ \tmpfile ->
|
withTmp tmpkey $ \tmpfile ->
|
||||||
Remote.retrieveExportWithContentIdentifier ia loc cid tmpfile (mkkey loc tmpfile) p >>= \case
|
Remote.retrieveExportWithContentIdentifier ia loc cid tmpfile (mkkey loc tmpfile) p >>= \case
|
||||||
Just k -> tryNonAsync (moveAnnex k tmpfile) >>= \case
|
Just k -> tryNonAsync (moveAnnex k tmpfile) >>= \case
|
||||||
|
|
|
@ -19,8 +19,8 @@ import Logs.Trust
|
||||||
import Utility.TimeStamp
|
import Utility.TimeStamp
|
||||||
import qualified Remote
|
import qualified Remote
|
||||||
import qualified Types.Remote as Remote
|
import qualified Types.Remote as Remote
|
||||||
|
import Config
|
||||||
import Config.DynamicConfig
|
import Config.DynamicConfig
|
||||||
import Annex.Export
|
|
||||||
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import System.Posix.Types
|
import System.Posix.Types
|
||||||
|
|
|
@ -261,7 +261,7 @@ seekRemote remote branch msubdir = allowConcurrentOutput $ do
|
||||||
parentcommit <- fromtrackingbranch Git.Ref.sha
|
parentcommit <- fromtrackingbranch Git.Ref.sha
|
||||||
let importcommitconfig = ImportCommitConfig parentcommit ManualCommit importmessage
|
let importcommitconfig = ImportCommitConfig parentcommit ManualCommit importmessage
|
||||||
|
|
||||||
importable <- download importtreeconfig =<< enumerate
|
importable <- download importtreeconfig =<< listcontents
|
||||||
void $ includeCommandAction $
|
void $ includeCommandAction $
|
||||||
commitRemote remote branch tb parentcommit importtreeconfig importcommitconfig importable
|
commitRemote remote branch tb parentcommit importtreeconfig importcommitconfig importable
|
||||||
where
|
where
|
||||||
|
@ -271,8 +271,8 @@ seekRemote remote branch msubdir = allowConcurrentOutput $ do
|
||||||
|
|
||||||
fromtrackingbranch a = inRepo $ a (fromRemoteTrackingBranch tb)
|
fromtrackingbranch a = inRepo $ a (fromRemoteTrackingBranch tb)
|
||||||
|
|
||||||
enumerate = do
|
listcontents = do
|
||||||
showStart' "import" (Just (Remote.name remote))
|
showStart' "list" (Just (Remote.name remote))
|
||||||
Remote.listImportableContents (Remote.importActions remote) >>= \case
|
Remote.listImportableContents (Remote.importActions remote) >>= \case
|
||||||
Nothing -> do
|
Nothing -> do
|
||||||
showEndFail
|
showEndFail
|
||||||
|
|
|
@ -94,6 +94,12 @@ setRemoteIgnore r b = setConfig (remoteConfig r "ignore") (Git.Config.boolConfig
|
||||||
setRemoteBare :: Git.Repo -> Bool -> Annex ()
|
setRemoteBare :: Git.Repo -> Bool -> Annex ()
|
||||||
setRemoteBare r b = setConfig (remoteConfig r "bare") (Git.Config.boolConfig b)
|
setRemoteBare r b = setConfig (remoteConfig r "bare") (Git.Config.boolConfig b)
|
||||||
|
|
||||||
|
exportTree :: Remote.RemoteConfig -> Bool
|
||||||
|
exportTree c = fromMaybe False $ yesNo =<< M.lookup "exporttree" c
|
||||||
|
|
||||||
|
importTree :: Remote.RemoteConfig -> Bool
|
||||||
|
importTree c = fromMaybe False $ yesNo =<< M.lookup "importtree" c
|
||||||
|
|
||||||
isBareRepo :: Annex Bool
|
isBareRepo :: Annex Bool
|
||||||
isBareRepo = fromRepo Git.repoIsLocalBare
|
isBareRepo = fromRepo Git.repoIsLocalBare
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ import Git.Config (isTrue, boolConfig)
|
||||||
import Git.Env
|
import Git.Env
|
||||||
import Remote.Helper.Special
|
import Remote.Helper.Special
|
||||||
import Remote.Helper.ExportImport
|
import Remote.Helper.ExportImport
|
||||||
import Annex.Export
|
|
||||||
import Remote.Helper.ReadOnly
|
import Remote.Helper.ReadOnly
|
||||||
import Remote.Helper.Messages
|
import Remote.Helper.Messages
|
||||||
import Utility.Metered
|
import Utility.Metered
|
||||||
|
|
|
@ -18,7 +18,6 @@ import Remote.Helper.Encryptable (isEncrypted)
|
||||||
import qualified Database.Export as Export
|
import qualified Database.Export as Export
|
||||||
import qualified Database.ContentIdentifier as ContentIdentifier
|
import qualified Database.ContentIdentifier as ContentIdentifier
|
||||||
import Annex.Export
|
import Annex.Export
|
||||||
import Annex.Import
|
|
||||||
import Annex.LockFile
|
import Annex.LockFile
|
||||||
import Config
|
import Config
|
||||||
import Git.Types (fromRef)
|
import Git.Types (fromRef)
|
||||||
|
|
|
@ -39,7 +39,6 @@ import Control.Concurrent.STM.TVar
|
||||||
import Annex.Common
|
import Annex.Common
|
||||||
import Types.Remote
|
import Types.Remote
|
||||||
import Types.Export
|
import Types.Export
|
||||||
import Annex.Export
|
|
||||||
import qualified Git
|
import qualified Git
|
||||||
import Config
|
import Config
|
||||||
import Config.Cost
|
import Config.Cost
|
||||||
|
|
|
@ -13,8 +13,7 @@ this.
|
||||||
* Need to support annex-tracking-branch configuration, which documentation
|
* Need to support annex-tracking-branch configuration, which documentation
|
||||||
says makes git-annex sync and assistant do imports.
|
says makes git-annex sync and assistant do imports.
|
||||||
|
|
||||||
* git-annex import needs to say when it's downloading files, display
|
* progress bars when downloading from import
|
||||||
progress bars, and support concurrent downloads.
|
|
||||||
|
|
||||||
* When on an adjusted unlocked branch, need to import the files unlocked.
|
* When on an adjusted unlocked branch, need to import the files unlocked.
|
||||||
Also, the tracking branch code needs to know about such branches,
|
Also, the tracking branch code needs to know about such branches,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue