Improve behavior when -J transfers multiple files that point to the same key
After a false start, I found a fairly non-intrusive way to deal with it. Although it only handles transfers -- there may be issues with eg concurrent dropping of the same key, or other operations. There is no added overhead when -J is not used, other than an added inAnnex check. When -J is used, it has to maintain and check a small Set, which should be negligible overhead. It could output some message saying that the transfer is being done by another thread. Or it could even display the same progress info for both files that are being downloaded since they have the same content. But I opted to keep it simple, since this is rather an edge case, so it just doesn't say anything about the transfer of the file until the other thread finishes. Since the deferred transfer action still runs, actions that do more than transfer content will still get a chance to do their other work. (An example of something that needs to do such other work is P2P.Annex, where the download always needs to receive the content from the peer.) And, if the first thread fails to complete a transfer, the second thread can resume it. But, this unfortunately means that there's a risk of redundant work being done to transfer a key that just got transferred. That's not ideal, but should never cause breakage; the same thing can occur when running two separate git-annex processes. The get/move/copy/mirror --from commands had extra inAnnex checks added, inside the download actions. Without those checks, the first thread downloaded the content, and then the second thread woke up and downloaded the same content redundantly. move/copy/mirror --to is left doing redundant uploads for now. It would need a second checkPresent of the remote inside the upload to avoid them, which would be expensive. A better way to avoid redundant work needs to be found.. This commit was supported by the NSF-funded DataLad project.
This commit is contained in:
parent
594359351c
commit
68a49adcda
13 changed files with 72 additions and 32 deletions
4
Annex.hs
4
Annex.hs
|
@ -61,6 +61,7 @@ import Types.UUID
|
|||
import Types.FileMatcher
|
||||
import Types.NumCopies
|
||||
import Types.LockCache
|
||||
import Types.Transfer
|
||||
import Types.DesktopNotify
|
||||
import Types.CleanupActions
|
||||
import qualified Database.Keys.Handle as Keys
|
||||
|
@ -125,6 +126,7 @@ data AnnexState = AnnexState
|
|||
, groupmap :: Maybe GroupMap
|
||||
, ciphers :: M.Map StorableCipher Cipher
|
||||
, lockcache :: LockCache
|
||||
, currentprocesstransfers :: TVar (S.Set Transfer)
|
||||
, sshstalecleaned :: TMVar Bool
|
||||
, flags :: M.Map String Bool
|
||||
, fields :: M.Map String String
|
||||
|
@ -149,6 +151,7 @@ newState c r = do
|
|||
emptyactiveremotes <- newMVar M.empty
|
||||
o <- newMessageState
|
||||
sc <- newTMVarIO False
|
||||
cpt <- newTVarIO S.empty
|
||||
return $ AnnexState
|
||||
{ repo = r
|
||||
, repoadjustment = return
|
||||
|
@ -179,6 +182,7 @@ newState c r = do
|
|||
, groupmap = Nothing
|
||||
, ciphers = M.empty
|
||||
, lockcache = M.empty
|
||||
, currentprocesstransfers = cpt
|
||||
, sshstalecleaned = sc
|
||||
, flags = M.empty
|
||||
, fields = M.empty
|
||||
|
|
|
@ -32,7 +32,9 @@ import qualified Types.Remote as Remote
|
|||
import Types.Concurrency
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.STM
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Data.Set as S
|
||||
import Data.Ord
|
||||
|
||||
class Observable a where
|
||||
|
@ -89,7 +91,8 @@ alwaysRunTransfer :: Observable v => Transfer -> AssociatedFile -> RetryDecider
|
|||
alwaysRunTransfer = runTransfer' True
|
||||
|
||||
runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v
|
||||
runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t $ do
|
||||
runTransfer' ignorelock t afile shouldretry transferaction =
|
||||
checkSecureHashes t $ currentProcessTransfer t $ do
|
||||
info <- liftIO $ startTransferInfo afile
|
||||
(meter, tfile, metervar) <- mkProgressUpdater t info
|
||||
mode <- annexFileMode
|
||||
|
@ -99,7 +102,7 @@ runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t
|
|||
showNote "transfer already in progress, or unable to take transfer lock"
|
||||
return observeFailure
|
||||
else do
|
||||
v <- retry info metervar $ transferaction meter
|
||||
v <- handleretry info metervar $ transferaction meter
|
||||
liftIO $ cleanup tfile lck
|
||||
if observeBool v
|
||||
then removeFailedTransfer t
|
||||
|
@ -153,7 +156,7 @@ runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t
|
|||
dropLock lockhandle
|
||||
void $ tryIO $ removeFile lck
|
||||
#endif
|
||||
retry oldinfo metervar run = do
|
||||
handleretry oldinfo metervar run = do
|
||||
v <- tryNonAsync run
|
||||
case v of
|
||||
Right b -> return b
|
||||
|
@ -162,7 +165,7 @@ runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t
|
|||
b <- getbytescomplete metervar
|
||||
let newinfo = oldinfo { bytesComplete = Just b }
|
||||
if shouldretry oldinfo newinfo
|
||||
then retry newinfo metervar run
|
||||
then handleretry newinfo metervar run
|
||||
else return observeFailure
|
||||
getbytescomplete metervar
|
||||
| transferDirection t == Upload =
|
||||
|
@ -256,3 +259,20 @@ lessActiveFirst :: M.Map Remote Integer -> Remote -> Remote -> Ordering
|
|||
lessActiveFirst active a b
|
||||
| Remote.cost a == Remote.cost b = comparing (`M.lookup` active) a b
|
||||
| otherwise = compare a b
|
||||
|
||||
{- Runs a transfer action. Only one thread can run for a given Transfer
|
||||
- at a time; other threads will block. -}
|
||||
currentProcessTransfer :: Transfer -> Annex a -> Annex a
|
||||
currentProcessTransfer t a = go =<< Annex.getState Annex.concurrency
|
||||
where
|
||||
go NonConcurrent = a
|
||||
go (Concurrent _) = do
|
||||
tv <- Annex.getState Annex.currentprocesstransfers
|
||||
bracket_ (setup tv) (cleanup tv) a
|
||||
setup tv = liftIO $ atomically $ do
|
||||
s <- readTVar tv
|
||||
if S.member t s
|
||||
then retry
|
||||
else writeTVar tv $! S.insert t s
|
||||
cleanup tv = liftIO $ atomically $
|
||||
modifyTVar' tv $ S.delete t
|
||||
|
|
|
@ -38,7 +38,8 @@ data DaemonStatus = DaemonStatus
|
|||
, lastSanityCheck :: Maybe POSIXTime
|
||||
-- True when a scan for file transfers is running
|
||||
, transferScanRunning :: Bool
|
||||
-- Currently running file content transfers
|
||||
-- Currently running file content transfers, for both this process
|
||||
-- and other processes.
|
||||
, currentTransfers :: TransferMap
|
||||
-- Messages to display to the user.
|
||||
, alertMap :: AlertMap
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
module Assistant.Types.TransferQueue where
|
||||
|
||||
import Annex.Common
|
||||
import Types.Transfer
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Utility.TList
|
||||
|
|
|
@ -22,7 +22,6 @@ import Utility.NotificationBroadcaster
|
|||
import Utility.AuthToken
|
||||
import Utility.WebApp
|
||||
import Utility.Yesod
|
||||
import Types.Transfer
|
||||
import Utility.Gpg (KeyId)
|
||||
import Build.SysConfig (packageversion)
|
||||
import Types.ScheduledActivity
|
||||
|
|
|
@ -11,6 +11,8 @@ git-annex (6.20171004) UNRELEASED; urgency=medium
|
|||
where interrupting an add could result in the file being
|
||||
moved into the annex, with no symlink yet created.
|
||||
* Avoid repeated checking that files passed on the command line exist.
|
||||
* Improve behavior when -J transfers multiple files that point to the
|
||||
same key.
|
||||
* stack.yaml: Update to lts-9.9.
|
||||
|
||||
-- Joey Hess <id@joeyh.name> Sat, 07 Oct 2017 14:11:00 -0400
|
||||
|
|
|
@ -109,9 +109,10 @@ getKey' key afile = dispatch
|
|||
| Remote.hasKeyCheap r =
|
||||
either (const False) id <$> Remote.hasKey r key
|
||||
| otherwise = return True
|
||||
docopy r witness = getViaTmp (RemoteVerify r) key $ \dest ->
|
||||
download (Remote.uuid r) key afile forwardRetry
|
||||
(\p -> do
|
||||
docopy r = download (Remote.uuid r) key afile forwardRetry $ \p ->
|
||||
ifM (inAnnex key)
|
||||
( return True
|
||||
, getViaTmp (RemoteVerify r) key $ \dest -> do
|
||||
showAction $ "from " ++ Remote.name r
|
||||
Remote.retrieveKeyFile r key afile dest p
|
||||
) witness
|
||||
)
|
||||
|
|
|
@ -200,8 +200,11 @@ fromPerform src move key afile = do
|
|||
where
|
||||
go = notifyTransfer Download afile $
|
||||
download (Remote.uuid src) key afile forwardRetry $ \p ->
|
||||
getViaTmp (RemoteVerify src) key $ \t ->
|
||||
ifM (inAnnex key)
|
||||
( return True
|
||||
, getViaTmp (RemoteVerify src) key $ \t ->
|
||||
Remote.retrieveKeyFile src key afile t p
|
||||
)
|
||||
dispatch _ False = stop -- failed
|
||||
dispatch False True = next $ return True -- copy complete
|
||||
-- Finish by dropping from remote, taking care to verify that
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
|
||||
module Logs.Transfer where
|
||||
|
||||
import Types
|
||||
import Types.Transfer
|
||||
import Types.ActionItem
|
||||
import Annex.Common
|
||||
|
|
4
Types.hs
4
Types.hs
|
@ -15,6 +15,8 @@ module Types (
|
|||
RemoteGitConfig(..),
|
||||
Remote,
|
||||
RemoteType,
|
||||
Transfer,
|
||||
TransferInfo,
|
||||
) where
|
||||
|
||||
import Annex
|
||||
|
@ -23,7 +25,9 @@ import Types.GitConfig
|
|||
import Types.Key
|
||||
import Types.UUID
|
||||
import Types.Remote
|
||||
import Types.Transfer
|
||||
|
||||
type Backend = BackendA Annex
|
||||
type Remote = RemoteA Annex
|
||||
type RemoteType = RemoteTypeA Annex
|
||||
type TransferInfo = TransferInfoA Annex
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
module Types.ActionItem where
|
||||
|
||||
import Key
|
||||
import Types
|
||||
import Types.Transfer
|
||||
import Git.FilePath
|
||||
|
||||
|
|
|
@ -7,10 +7,13 @@
|
|||
|
||||
module Types.Transfer where
|
||||
|
||||
import Types
|
||||
import Types.Remote
|
||||
import Types.Key
|
||||
import Types.UUID
|
||||
import Utility.PID
|
||||
import Utility.QuickCheck
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Data.Time.Clock.POSIX
|
||||
import Control.Concurrent
|
||||
import Control.Applicative
|
||||
|
@ -30,18 +33,18 @@ data Transfer = Transfer
|
|||
- git repository. It's some file, possibly relative to some directory,
|
||||
- of some repository, that was acted on to initiate the transfer.
|
||||
-}
|
||||
data TransferInfo = TransferInfo
|
||||
data TransferInfoA a = TransferInfo
|
||||
{ startedTime :: Maybe POSIXTime
|
||||
, transferPid :: Maybe PID
|
||||
, transferTid :: Maybe ThreadId
|
||||
, transferRemote :: Maybe Remote
|
||||
, transferRemote :: Maybe (RemoteA a)
|
||||
, bytesComplete :: Maybe Integer
|
||||
, associatedFile :: AssociatedFile
|
||||
, transferPaused :: Bool
|
||||
}
|
||||
deriving (Show, Eq, Ord)
|
||||
|
||||
stubTransferInfo :: TransferInfo
|
||||
stubTransferInfo :: TransferInfoA a
|
||||
stubTransferInfo = TransferInfo Nothing Nothing Nothing Nothing Nothing (AssociatedFile Nothing) False
|
||||
|
||||
data Direction = Upload | Download
|
||||
|
@ -56,7 +59,7 @@ parseDirection "upload" = Just Upload
|
|||
parseDirection "download" = Just Download
|
||||
parseDirection _ = Nothing
|
||||
|
||||
instance Arbitrary TransferInfo where
|
||||
instance Arbitrary (TransferInfoA a) where
|
||||
arbitrary = TransferInfo
|
||||
<$> arbitrary
|
||||
<*> arbitrary
|
||||
|
|
|
@ -46,3 +46,5 @@ so at the end we get a run of git-annex which exits with error 1... and in json
|
|||
I wondered if annex should first analyze passed paths to get actual keys to be fetched?
|
||||
|
||||
[[!meta author=yoh]]
|
||||
|
||||
> [[fixed|done]] --[[Joey]]
|
||||
|
|
Loading…
Reference in a new issue