better dup key with -J fix

This avoids all the complication about redundant work discussed in
the previous try at fixing this. At the expense of needing each command
that could have the problem to be patched to simply wrap the action in
onlyActionOn once the key is known. But there do not seem to be many
such commands.

onlyActionOn' should not be used with a CommandStart (or CommandPerform),
although the types do allow it. onlyActionOn handles running the whole
CommandStart chain. I couldn't immediately see a way to avoid mistken
use of onlyActionOn'.

This commit was supported by the NSF-funded DataLad project.
This commit is contained in:
Joey Hess 2017-10-17 17:54:38 -04:00
parent 68a49adcda
commit e1ac299ad0
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
16 changed files with 82 additions and 77 deletions

View file

@ -61,7 +61,6 @@ import Types.UUID
import Types.FileMatcher import Types.FileMatcher
import Types.NumCopies import Types.NumCopies
import Types.LockCache import Types.LockCache
import Types.Transfer
import Types.DesktopNotify import Types.DesktopNotify
import Types.CleanupActions import Types.CleanupActions
import qualified Database.Keys.Handle as Keys import qualified Database.Keys.Handle as Keys
@ -126,7 +125,6 @@ data AnnexState = AnnexState
, groupmap :: Maybe GroupMap , groupmap :: Maybe GroupMap
, ciphers :: M.Map StorableCipher Cipher , ciphers :: M.Map StorableCipher Cipher
, lockcache :: LockCache , lockcache :: LockCache
, currentprocesstransfers :: TVar (S.Set Transfer)
, sshstalecleaned :: TMVar Bool , sshstalecleaned :: TMVar Bool
, flags :: M.Map String Bool , flags :: M.Map String Bool
, fields :: M.Map String String , fields :: M.Map String String
@ -140,6 +138,7 @@ data AnnexState = AnnexState
, existinghooks :: M.Map Git.Hook.Hook Bool , existinghooks :: M.Map Git.Hook.Hook Bool
, desktopnotify :: DesktopNotify , desktopnotify :: DesktopNotify
, workers :: [Either AnnexState (Async AnnexState)] , workers :: [Either AnnexState (Async AnnexState)]
, activekeys :: TVar (M.Map Key ThreadId)
, activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer) , activeremotes :: MVar (M.Map (Types.Remote.RemoteA Annex) Integer)
, keysdbhandle :: Maybe Keys.DbHandle , keysdbhandle :: Maybe Keys.DbHandle
, cachedcurrentbranch :: Maybe Git.Branch , cachedcurrentbranch :: Maybe Git.Branch
@ -149,9 +148,9 @@ data AnnexState = AnnexState
newState :: GitConfig -> Git.Repo -> IO AnnexState newState :: GitConfig -> Git.Repo -> IO AnnexState
newState c r = do newState c r = do
emptyactiveremotes <- newMVar M.empty emptyactiveremotes <- newMVar M.empty
emptyactivekeys <- newTVarIO M.empty
o <- newMessageState o <- newMessageState
sc <- newTMVarIO False sc <- newTMVarIO False
cpt <- newTVarIO S.empty
return $ AnnexState return $ AnnexState
{ repo = r { repo = r
, repoadjustment = return , repoadjustment = return
@ -182,7 +181,6 @@ newState c r = do
, groupmap = Nothing , groupmap = Nothing
, ciphers = M.empty , ciphers = M.empty
, lockcache = M.empty , lockcache = M.empty
, currentprocesstransfers = cpt
, sshstalecleaned = sc , sshstalecleaned = sc
, flags = M.empty , flags = M.empty
, fields = M.empty , fields = M.empty
@ -196,6 +194,7 @@ newState c r = do
, existinghooks = M.empty , existinghooks = M.empty
, desktopnotify = mempty , desktopnotify = mempty
, workers = [] , workers = []
, activekeys = emptyactivekeys
, activeremotes = emptyactiveremotes , activeremotes = emptyactiveremotes
, keysdbhandle = Nothing , keysdbhandle = Nothing
, cachedcurrentbranch = Nothing , cachedcurrentbranch = Nothing

View file

@ -32,9 +32,7 @@ import qualified Types.Remote as Remote
import Types.Concurrency import Types.Concurrency
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Set as S
import Data.Ord import Data.Ord
class Observable a where class Observable a where
@ -91,8 +89,7 @@ alwaysRunTransfer :: Observable v => Transfer -> AssociatedFile -> RetryDecider
alwaysRunTransfer = runTransfer' True alwaysRunTransfer = runTransfer' True
runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v
runTransfer' ignorelock t afile shouldretry transferaction = runTransfer' ignorelock t afile shouldretry transferaction = checkSecureHashes t $ do
checkSecureHashes t $ currentProcessTransfer t $ do
info <- liftIO $ startTransferInfo afile info <- liftIO $ startTransferInfo afile
(meter, tfile, metervar) <- mkProgressUpdater t info (meter, tfile, metervar) <- mkProgressUpdater t info
mode <- annexFileMode mode <- annexFileMode
@ -102,7 +99,7 @@ runTransfer' ignorelock t afile shouldretry transferaction =
showNote "transfer already in progress, or unable to take transfer lock" showNote "transfer already in progress, or unable to take transfer lock"
return observeFailure return observeFailure
else do else do
v <- handleretry info metervar $ transferaction meter v <- retry info metervar $ transferaction meter
liftIO $ cleanup tfile lck liftIO $ cleanup tfile lck
if observeBool v if observeBool v
then removeFailedTransfer t then removeFailedTransfer t
@ -156,7 +153,7 @@ runTransfer' ignorelock t afile shouldretry transferaction =
dropLock lockhandle dropLock lockhandle
void $ tryIO $ removeFile lck void $ tryIO $ removeFile lck
#endif #endif
handleretry oldinfo metervar run = do retry oldinfo metervar run = do
v <- tryNonAsync run v <- tryNonAsync run
case v of case v of
Right b -> return b Right b -> return b
@ -165,7 +162,7 @@ runTransfer' ignorelock t afile shouldretry transferaction =
b <- getbytescomplete metervar b <- getbytescomplete metervar
let newinfo = oldinfo { bytesComplete = Just b } let newinfo = oldinfo { bytesComplete = Just b }
if shouldretry oldinfo newinfo if shouldretry oldinfo newinfo
then handleretry newinfo metervar run then retry newinfo metervar run
else return observeFailure else return observeFailure
getbytescomplete metervar getbytescomplete metervar
| transferDirection t == Upload = | transferDirection t == Upload =
@ -259,20 +256,3 @@ lessActiveFirst :: M.Map Remote Integer -> Remote -> Remote -> Ordering
lessActiveFirst active a b lessActiveFirst active a b
| Remote.cost a == Remote.cost b = comparing (`M.lookup` active) a b | Remote.cost a == Remote.cost b = comparing (`M.lookup` active) a b
| otherwise = compare a b | otherwise = compare a b
{- Runs a transfer action. Only one thread can run for a given Transfer
- at a time; other threads will block. -}
currentProcessTransfer :: Transfer -> Annex a -> Annex a
currentProcessTransfer t a = go =<< Annex.getState Annex.concurrency
where
go NonConcurrent = a
go (Concurrent _) = do
tv <- Annex.getState Annex.currentprocesstransfers
bracket_ (setup tv) (cleanup tv) a
setup tv = liftIO $ atomically $ do
s <- readTVar tv
if S.member t s
then retry
else writeTVar tv $! S.insert t s
cleanup tv = liftIO $ atomically $
modifyTVar' tv $ S.delete t

View file

@ -38,8 +38,7 @@ data DaemonStatus = DaemonStatus
, lastSanityCheck :: Maybe POSIXTime , lastSanityCheck :: Maybe POSIXTime
-- True when a scan for file transfers is running -- True when a scan for file transfers is running
, transferScanRunning :: Bool , transferScanRunning :: Bool
-- Currently running file content transfers, for both this process -- Currently running file content transfers
-- and other processes.
, currentTransfers :: TransferMap , currentTransfers :: TransferMap
-- Messages to display to the user. -- Messages to display to the user.
, alertMap :: AlertMap , alertMap :: AlertMap

View file

@ -8,6 +8,7 @@
module Assistant.Types.TransferQueue where module Assistant.Types.TransferQueue where
import Annex.Common import Annex.Common
import Types.Transfer
import Control.Concurrent.STM import Control.Concurrent.STM
import Utility.TList import Utility.TList

View file

@ -22,6 +22,7 @@ import Utility.NotificationBroadcaster
import Utility.AuthToken import Utility.AuthToken
import Utility.WebApp import Utility.WebApp
import Utility.Yesod import Utility.Yesod
import Types.Transfer
import Utility.Gpg (KeyId) import Utility.Gpg (KeyId)
import Build.SysConfig (packageversion) import Build.SysConfig (packageversion)
import Types.ScheduledActivity import Types.ScheduledActivity

View file

@ -11,8 +11,8 @@ git-annex (6.20171004) UNRELEASED; urgency=medium
where interrupting an add could result in the file being where interrupting an add could result in the file being
moved into the annex, with no symlink yet created. moved into the annex, with no symlink yet created.
* Avoid repeated checking that files passed on the command line exist. * Avoid repeated checking that files passed on the command line exist.
* Improve behavior when -J transfers multiple files that point to the * get -J/move -J/copy -J/mirror -J/sync -J: Avoid "transfer already in
same key. progress" errors when two files use the same key.
* stack.yaml: Update to lts-9.9. * stack.yaml: Update to lts-9.9.
-- Joey Hess <id@joeyh.name> Sat, 07 Oct 2017 14:11:00 -0400 -- Joey Hess <id@joeyh.name> Sat, 07 Oct 2017 14:11:00 -0400

View file

@ -1,6 +1,6 @@
{- git-annex command-line actions {- git-annex command-line actions
- -
- Copyright 2010-2015 Joey Hess <id@joeyh.name> - Copyright 2010-2017 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU GPL version 3 or higher. - Licensed under the GNU GPL version 3 or higher.
-} -}
@ -18,9 +18,12 @@ import Messages.Concurrent
import Types.Messages import Types.Messages
import Remote.List import Remote.List
import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (throwIO) import Control.Exception (throwIO)
import Data.Either import Data.Either
import qualified Data.Map.Strict as M
#ifdef WITH_CONCURRENTOUTPUT #ifdef WITH_CONCURRENTOUTPUT
import qualified System.Console.Regions as Regions import qualified System.Console.Regions as Regions
@ -177,3 +180,36 @@ allowConcurrentOutput a = go =<< Annex.getState Annex.concurrency
#else #else
allowConcurrentOutput = id allowConcurrentOutput = id
#endif #endif
{- Ensures that only one thread processes a key at a time.
- Other threads will block until it's done. -}
onlyActionOn :: Key -> CommandStart -> CommandStart
onlyActionOn k a = onlyActionOn' k run
where
run = do
-- Run whole action, not just start stage, so other threads
-- block until it's done.
r <- callCommandAction' a
case r of
Nothing -> return Nothing
Just r' -> return $ Just $ return $ Just $ return r'
onlyActionOn' :: Key -> Annex a -> Annex a
onlyActionOn' k a = go =<< Annex.getState Annex.concurrency
where
go NonConcurrent = a
go (Concurrent _) = do
tv <- Annex.getState Annex.activekeys
bracket (setup tv) id (const a)
setup tv = liftIO $ do
mytid <- myThreadId
atomically $ do
m <- readTVar tv
case M.lookup k m of
Just tid
| tid /= mytid -> retry
| otherwise -> return (return ())
Nothing -> do
writeTVar tv $! M.insert k mytid m
return $ liftIO $ atomically $
modifyTVar tv $ M.delete k

View file

@ -62,8 +62,8 @@ startKeys from key ai = checkFailedTransferDirection ai Download $
start' (return True) from key (AssociatedFile Nothing) ai start' (return True) from key (AssociatedFile Nothing) ai
start' :: Annex Bool -> Maybe Remote -> Key -> AssociatedFile -> ActionItem -> CommandStart start' :: Annex Bool -> Maybe Remote -> Key -> AssociatedFile -> ActionItem -> CommandStart
start' expensivecheck from key afile ai = stopUnless (not <$> inAnnex key) $ start' expensivecheck from key afile ai = onlyActionOn key $
stopUnless expensivecheck $ stopUnless (not <$> inAnnex key) $ stopUnless expensivecheck $
case from of case from of
Nothing -> go $ perform key afile Nothing -> go $ perform key afile
Just src -> Just src ->
@ -109,10 +109,9 @@ getKey' key afile = dispatch
| Remote.hasKeyCheap r = | Remote.hasKeyCheap r =
either (const False) id <$> Remote.hasKey r key either (const False) id <$> Remote.hasKey r key
| otherwise = return True | otherwise = return True
docopy r = download (Remote.uuid r) key afile forwardRetry $ \p -> docopy r witness = getViaTmp (RemoteVerify r) key $ \dest ->
ifM (inAnnex key) download (Remote.uuid r) key afile forwardRetry
( return True (\p -> do
, getViaTmp (RemoteVerify r) key $ \dest -> do
showAction $ "from " ++ Remote.name r showAction $ "from " ++ Remote.name r
Remote.retrieveKeyFile r key afile dest p Remote.retrieveKeyFile r key afile dest p
) ) witness

View file

@ -53,7 +53,7 @@ start o file k = startKey o afile k (mkActionItem afile)
afile = AssociatedFile (Just file) afile = AssociatedFile (Just file)
startKey :: MirrorOptions -> AssociatedFile -> Key -> ActionItem -> CommandStart startKey :: MirrorOptions -> AssociatedFile -> Key -> ActionItem -> CommandStart
startKey o afile key ai = case fromToOptions o of startKey o afile key ai = onlyActionOn key $ case fromToOptions o of
ToRemote r -> checkFailedTransferDirection ai Upload $ ifM (inAnnex key) ToRemote r -> checkFailedTransferDirection ai Upload $ ifM (inAnnex key)
( Command.Move.toStart False afile key ai =<< getParsed r ( Command.Move.toStart False afile key ai =<< getParsed r
, do , do

View file

@ -74,7 +74,7 @@ startKey :: MoveOptions -> Bool -> Key -> ActionItem -> CommandStart
startKey o move = start' o move (AssociatedFile Nothing) startKey o move = start' o move (AssociatedFile Nothing)
start' :: MoveOptions -> Bool -> AssociatedFile -> Key -> ActionItem -> CommandStart start' :: MoveOptions -> Bool -> AssociatedFile -> Key -> ActionItem -> CommandStart
start' o move afile key ai = start' o move afile key ai = onlyActionOn key $
case fromToOptions o of case fromToOptions o of
Right (FromRemote src) -> Right (FromRemote src) ->
checkFailedTransferDirection ai Download $ checkFailedTransferDirection ai Download $
@ -200,11 +200,8 @@ fromPerform src move key afile = do
where where
go = notifyTransfer Download afile $ go = notifyTransfer Download afile $
download (Remote.uuid src) key afile forwardRetry $ \p -> download (Remote.uuid src) key afile forwardRetry $ \p ->
ifM (inAnnex key) getViaTmp (RemoteVerify src) key $ \t ->
( return True
, getViaTmp (RemoteVerify src) key $ \t ->
Remote.retrieveKeyFile src key afile t p Remote.retrieveKeyFile src key afile t p
)
dispatch _ False = stop -- failed dispatch _ False = stop -- failed
dispatch False True = next $ return True -- copy complete dispatch False True = next $ return True -- copy complete
-- Finish by dropping from remote, taking care to verify that -- Finish by dropping from remote, taking care to verify that

View file

@ -609,7 +609,7 @@ seekSyncContent o rs = do
- Returns True if any file transfers were made. - Returns True if any file transfers were made.
-} -}
syncFile :: Either (Maybe (Bloom Key)) (Key -> Annex ()) -> [Remote] -> AssociatedFile -> Key -> Annex Bool syncFile :: Either (Maybe (Bloom Key)) (Key -> Annex ()) -> [Remote] -> AssociatedFile -> Key -> Annex Bool
syncFile ebloom rs af k = do syncFile ebloom rs af k = onlyActionOn' k $ do
locs <- Remote.keyLocations k locs <- Remote.keyLocations k
let (have, lack) = partition (\r -> Remote.uuid r `elem` locs) rs let (have, lack) = partition (\r -> Remote.uuid r `elem` locs) rs

View file

@ -9,7 +9,6 @@
module Logs.Transfer where module Logs.Transfer where
import Types
import Types.Transfer import Types.Transfer
import Types.ActionItem import Types.ActionItem
import Annex.Common import Annex.Common

View file

@ -15,8 +15,6 @@ module Types (
RemoteGitConfig(..), RemoteGitConfig(..),
Remote, Remote,
RemoteType, RemoteType,
Transfer,
TransferInfo,
) where ) where
import Annex import Annex
@ -25,9 +23,7 @@ import Types.GitConfig
import Types.Key import Types.Key
import Types.UUID import Types.UUID
import Types.Remote import Types.Remote
import Types.Transfer
type Backend = BackendA Annex type Backend = BackendA Annex
type Remote = RemoteA Annex type Remote = RemoteA Annex
type RemoteType = RemoteTypeA Annex type RemoteType = RemoteTypeA Annex
type TransferInfo = TransferInfoA Annex

View file

@ -10,7 +10,6 @@
module Types.ActionItem where module Types.ActionItem where
import Key import Key
import Types
import Types.Transfer import Types.Transfer
import Git.FilePath import Git.FilePath

View file

@ -7,13 +7,10 @@
module Types.Transfer where module Types.Transfer where
import Types.Remote import Types
import Types.Key
import Types.UUID
import Utility.PID import Utility.PID
import Utility.QuickCheck import Utility.QuickCheck
import Control.Concurrent.STM
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Control.Concurrent import Control.Concurrent
import Control.Applicative import Control.Applicative
@ -33,18 +30,18 @@ data Transfer = Transfer
- git repository. It's some file, possibly relative to some directory, - git repository. It's some file, possibly relative to some directory,
- of some repository, that was acted on to initiate the transfer. - of some repository, that was acted on to initiate the transfer.
-} -}
data TransferInfoA a = TransferInfo data TransferInfo = TransferInfo
{ startedTime :: Maybe POSIXTime { startedTime :: Maybe POSIXTime
, transferPid :: Maybe PID , transferPid :: Maybe PID
, transferTid :: Maybe ThreadId , transferTid :: Maybe ThreadId
, transferRemote :: Maybe (RemoteA a) , transferRemote :: Maybe Remote
, bytesComplete :: Maybe Integer , bytesComplete :: Maybe Integer
, associatedFile :: AssociatedFile , associatedFile :: AssociatedFile
, transferPaused :: Bool , transferPaused :: Bool
} }
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
stubTransferInfo :: TransferInfoA a stubTransferInfo :: TransferInfo
stubTransferInfo = TransferInfo Nothing Nothing Nothing Nothing Nothing (AssociatedFile Nothing) False stubTransferInfo = TransferInfo Nothing Nothing Nothing Nothing Nothing (AssociatedFile Nothing) False
data Direction = Upload | Download data Direction = Upload | Download
@ -59,7 +56,7 @@ parseDirection "upload" = Just Upload
parseDirection "download" = Just Download parseDirection "download" = Just Download
parseDirection _ = Nothing parseDirection _ = Nothing
instance Arbitrary (TransferInfoA a) where instance Arbitrary TransferInfo where
arbitrary = TransferInfo arbitrary = TransferInfo
<$> arbitrary <$> arbitrary
<*> arbitrary <*> arbitrary

View file

@ -47,4 +47,6 @@ I wondered if annex should first analyze passed paths to get actual keys to be f
[[!meta author=yoh]] [[!meta author=yoh]]
> [[fixed|done]] --[[Joey]] > [[fixed|done]]; also fixed for several other commands, but the final
> fix needed each command that could have the problem to be modified, so
> there could possibly be some I missed.. --[[Joey]]