{- git-annex transfers - - Copyright 2012-2024 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} {-# LANGUAGE CPP, BangPatterns, OverloadedStrings #-} module Annex.Transfer ( module X, upload, upload', alwaysUpload, download, download', runTransfer, alwaysRunTransfer, noRetry, stdRetry, pickRemote, ) where import Annex.Common import qualified Annex import Logs.Transfer as X import Types.Transfer as X import Annex.Notification as X import Annex.Content import Annex.Perms import Annex.Action import Utility.Metered import Utility.ThreadScheduler import Utility.FileMode import Annex.LockPool import Types.Key import qualified Types.Remote as Remote import qualified Types.Backend import Types.Concurrency import Annex.Concurrent import Types.WorkerPool import Annex.WorkerPool import Annex.TransferrerPool import Annex.StallDetection import Backend (isCryptographicallySecureKey) import Types.StallDetection import qualified Utility.RawFilePath as R import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM hiding (retry) import qualified Data.Map.Strict as M import qualified System.FilePath.ByteString as P import Data.Ord -- Upload, supporting canceling detected stalls. upload :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool upload r key f d witness = case getStallDetection Upload r of Nothing -> go (Just ProbeStallDetection) Just StallDetectionDisabled -> go Nothing Just sd -> runTransferrer sd r key f d Upload witness where go sd = upload' (Remote.uuid r) key f sd d (action . Remote.storeKey r key f) witness -- Upload, not supporting canceling detected stalls upload' :: Observable v => UUID -> Key -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v upload' u key f sd d a _witness = guardHaveUUID u $ runTransfer (Transfer Upload u (fromKey id key)) Nothing f sd d a alwaysUpload :: Observable v => UUID -> Key -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v alwaysUpload u key f sd d a _witness = guardHaveUUID u $ alwaysRunTransfer (Transfer Upload u (fromKey id key)) Nothing f sd d a -- Download, supporting canceling detected stalls. download :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool download r key f d witness = case getStallDetection Download r of Nothing -> go (Just ProbeStallDetection) Just StallDetectionDisabled -> go Nothing Just sd -> runTransferrer sd r key f d Download witness where go sd = getViaTmp (Remote.retrievalSecurityPolicy r) vc key f Nothing $ \dest -> download' (Remote.uuid r) key f sd d (go' dest) witness go' dest p = verifiedAction $ Remote.retrieveKeyFile r key f (fromRawFilePath dest) p vc vc = Remote.RemoteVerify r -- Download, not supporting canceling detected stalls. download' :: Observable v => UUID -> Key -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v download' u key f sd d a _witness = guardHaveUUID u $ runTransfer (Transfer Download u (fromKey id key)) Nothing f sd d a guardHaveUUID :: Observable v => UUID -> Annex v -> Annex v guardHaveUUID u a | u == NoUUID = return observeFailure | otherwise = a {- Runs a transfer action. Creates and locks the lock file while the - action is running, and stores info in the transfer information - file. - - If the transfer action returns False, the transfer info is - left in the failedTransferDir. - - If the transfer is already in progress, returns False. - - An upload can be run from a read-only filesystem, and in this case - no transfer information or lock file is used. - - Cannot cancel stalls, but when a likely stall is detected, - suggests to the user that they enable stall detection handling. -} runTransfer :: Observable v => Transfer -> Maybe Backend -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v runTransfer = runTransfer' False {- Like runTransfer, but ignores any existing transfer lock file for the - transfer, allowing re-running a transfer that is already in progress. -} alwaysRunTransfer :: Observable v => Transfer -> Maybe Backend -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v alwaysRunTransfer = runTransfer' True runTransfer' :: Observable v => Bool -> Transfer -> Maybe Backend -> AssociatedFile -> Maybe StallDetection -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v runTransfer' ignorelock t eventualbackend afile stalldetection retrydecider transferaction = enteringStage (TransferStage (transferDirection t)) $ debugLocks $ preCheckSecureHashes (transferKey t) eventualbackend go where go = do info <- liftIO $ startTransferInfo afile (tfile, lckfile, moldlckfile) <- fromRepo $ transferFileAndLockFile t (meter, createtfile, metervar) <- mkProgressUpdater t info tfile mode <- annexFileMode (lck, inprogress) <- prep lckfile moldlckfile createtfile mode if inprogress && not ignorelock then do warning "transfer already in progress, or unable to take transfer lock" return observeFailure else do v <- retry 0 info metervar $ detectStallsAndSuggestConfig stalldetection metervar $ transferaction meter liftIO $ cleanup tfile lckfile moldlckfile lck if observeBool v then removeFailedTransfer t else recordFailedTransfer t info return v prep :: RawFilePath -> Maybe RawFilePath -> Annex () -> ModeSetter -> Annex (Maybe (LockHandle, Maybe LockHandle), Bool) #ifndef mingw32_HOST_OS prep lckfile moldlckfile createtfile mode = catchPermissionDenied (const prepfailed) $ do createAnnexDirectory $ P.takeDirectory lckfile tryLockExclusive (Just mode) lckfile >>= \case Nothing -> return (Nothing, True) -- Since the lock file is removed in cleanup, -- there's a race where different processes -- may have a deleted and a new version of the same -- lock file open. checkSaneLock guards against -- that. Just lockhandle -> ifM (checkSaneLock lckfile lockhandle) ( case moldlckfile of Nothing -> do createtfile return (Just (lockhandle, Nothing), False) Just oldlckfile -> do createAnnexDirectory $ P.takeDirectory oldlckfile tryLockExclusive (Just mode) oldlckfile >>= \case Nothing -> do liftIO $ dropLock lockhandle return (Nothing, True) Just oldlockhandle -> ifM (checkSaneLock oldlckfile oldlockhandle) ( do createtfile return (Just (lockhandle, Just oldlockhandle), False) , do liftIO $ dropLock oldlockhandle liftIO $ dropLock lockhandle return (Nothing, True) ) , do liftIO $ dropLock lockhandle return (Nothing, True) ) #else prep lckfile moldlckfile createtfile _mode = catchPermissionDenied (const prepfailed) $ do createAnnexDirectory $ P.takeDirectory lckfile catchMaybeIO (liftIO $ lockExclusive lckfile) >>= \case Just (Just lockhandle) -> case moldlckfile of Nothing -> do createtfile return (Just (lockhandle, Nothing), False) Just oldlckfile -> do createAnnexDirectory $ P.takeDirectory oldlckfile catchMaybeIO (liftIO $ lockExclusive oldlckfile) >>= \case Just (Just oldlockhandle) -> do createtfile return (Just (lockhandle, Just oldlockhandle), False) _ -> do liftIO $ dropLock lockhandle return (Nothing, False) _ -> return (Nothing, False) #endif prepfailed = return (Nothing, False) cleanup _ _ _ Nothing = noop cleanup tfile lckfile moldlckfile (Just (lockhandle, moldlockhandle)) = do void $ tryIO $ R.removeLink tfile #ifndef mingw32_HOST_OS void $ tryIO $ R.removeLink lckfile maybe noop (void . tryIO . R.removeLink) moldlckfile maybe noop dropLock moldlockhandle dropLock lockhandle #else {- Windows cannot delete the lockfile until the lock - is closed. So it's possible to race with another - process that takes the lock before it's removed, - so ignore failure to remove. -} maybe noop dropLock moldlockhandle dropLock lockhandle void $ tryIO $ R.removeLink lckfile maybe noop (void . tryIO . R.removeLink) moldlckfile #endif retry numretries oldinfo metervar run = tryNonAsync run >>= \case Right v | observeBool v -> return v | otherwise -> checkretry Left e -> do warning (UnquotedString (show e)) checkretry where checkretry = do b <- getbytescomplete metervar let newinfo = oldinfo { bytesComplete = Just b } let !numretries' = succ numretries ifM (retrydecider numretries' oldinfo newinfo) ( retry numretries' newinfo metervar run , return observeFailure ) getbytescomplete metervar = liftIO $ maybe 0 fromBytesProcessed <$> readTVarIO metervar detectStallsAndSuggestConfig :: Maybe StallDetection -> TVar (Maybe BytesProcessed) -> Annex a -> Annex a detectStallsAndSuggestConfig Nothing _ a = a detectStallsAndSuggestConfig sd@(Just _) metervar a = bracket setup cleanup (const a) where setup = do v <- liftIO newEmptyTMVarIO sdt <- liftIO $ async $ detectStalls sd metervar $ void $ atomically $ tryPutTMVar v True wt <- liftIO . async =<< forkState (warnonstall v) return (v, sdt, wt) cleanup (v, sdt, wt) = do liftIO $ uninterruptibleCancel sdt void $ liftIO $ atomically $ tryPutTMVar v False join (liftIO (wait wt)) warnonstall v = whenM (liftIO (atomically (takeTMVar v))) $ warning "Transfer seems to have stalled. To restart stalled transfers, configure annex.stalldetection" {- Runs a transfer using a separate process, which lets detected stalls be - canceled. -} runTransferrer :: StallDetection -> Remote -> Key -> AssociatedFile -> RetryDecider -> Direction -> NotifyWitness -> Annex Bool runTransferrer sd r k afile retrydecider direction _witness = enteringStage (TransferStage direction) $ preCheckSecureHashes k Nothing $ do info <- liftIO $ startTransferInfo afile go 0 info where go numretries info = withTransferrer (performTransfer (Just sd) AnnexLevel id (Just r) t info) >>= \case Right () -> return True Left newinfo -> do let !numretries' = succ numretries ifM (retrydecider numretries' info newinfo) ( go numretries' newinfo , return False ) t = Transfer direction (Remote.uuid r) (fromKey id k) {- Avoid download and upload of keys with insecure content when - annex.securehashesonly is configured. - - This is not a security check. Even if this let the content be - downloaded, the actual security checks would prevent the content from - being added to the repository. The only reason this is done here is to - avoid transferring content that's going to be rejected anyway. - - We assume that, if annex.securehashesonly is set and the local repo - still contains content using an insecure hash, remotes will likewise - tend to be configured to reject it, so Upload is also prevented. -} preCheckSecureHashes :: Observable v => Key -> Maybe Backend -> Annex v -> Annex v preCheckSecureHashes k meventualbackend a = case meventualbackend of Just eventualbackend -> go (pure (Types.Backend.isCryptographicallySecure eventualbackend)) (Types.Backend.backendVariety eventualbackend) Nothing -> go (isCryptographicallySecureKey k) (fromKey keyVariety k) where go checksecure variety = ifM checksecure ( a , ifM (annexSecureHashesOnly <$> Annex.getGitConfig) ( blocked variety , a ) ) blocked variety = do warning $ UnquotedString $ "annex.securehashesonly blocked transfer of " ++ decodeBS (formatKeyVariety variety) ++ " key" return observeFailure type NumRetries = Integer type RetryDecider = NumRetries -> TransferInfo -> TransferInfo -> Annex Bool {- Both retry deciders are checked together, so if one chooses to delay, - it will always take effect. -} combineRetryDeciders :: RetryDecider -> RetryDecider -> RetryDecider combineRetryDeciders a b = \n old new -> do ar <- a n old new br <- b n old new return (ar || br) noRetry :: RetryDecider noRetry _ _ _ = pure False stdRetry :: RetryDecider stdRetry = combineRetryDeciders forwardRetry configuredRetry {- Keep retrying failed transfers, as long as forward progress is being - made. - - Up to a point -- while some remotes can resume where the previous - transfer left off, and so it would make sense to keep retrying forever, - other remotes restart each transfer from the beginning, and so even if - forward progress is being made, it's not real progress. So, retry a - maximum of 5 times by default. -} forwardRetry :: RetryDecider forwardRetry numretries old new | fromMaybe 0 (bytesComplete old) < fromMaybe 0 (bytesComplete new) = (numretries <=) <$> maybe globalretrycfg pure remoteretrycfg | otherwise = return False where globalretrycfg = fromMaybe 5 . annexForwardRetry <$> Annex.getGitConfig remoteretrycfg = remoteAnnexRetry =<< (Remote.gitconfig <$> transferRemote new) {- Retries a number of times with growing delays in between when enabled - by git configuration. -} configuredRetry :: RetryDecider configuredRetry numretries _old new = do (maxretries, Seconds initretrydelay) <- getcfg $ Remote.gitconfig <$> transferRemote new if numretries < maxretries then do let retrydelay = Seconds (initretrydelay * 2^(numretries-1)) showSideAction $ UnquotedString $ "Delaying " ++ show (fromSeconds retrydelay) ++ "s before retrying." liftIO $ threadDelaySeconds retrydelay return True else return False where globalretrycfg = fromMaybe 0 . annexRetry <$> Annex.getGitConfig globalretrydelaycfg = fromMaybe (Seconds 1) . annexRetryDelay <$> Annex.getGitConfig getcfg Nothing = (,) <$> globalretrycfg <*> globalretrydelaycfg getcfg (Just gc) = (,) <$> maybe globalretrycfg return (remoteAnnexRetry gc) <*> maybe globalretrydelaycfg return (remoteAnnexRetryDelay gc) {- Picks a remote from the list and tries a transfer to it. If the transfer - does not succeed, goes on to try other remotes from the list. - - The list should already be ordered by remote cost, and is normally - tried in order. However, when concurrent jobs are running, they will - be assigned different remotes of the same cost when possible. This can - increase total transfer speed. -} pickRemote :: Observable v => [Remote] -> (Remote -> Annex v) -> Annex v pickRemote l a = debugLocks $ go l =<< getConcurrency where go [] _ = return observeFailure go (r:[]) _ = a r go rs NonConcurrent = gononconcurrent rs go rs (Concurrent n) | n <= 1 = gononconcurrent rs | otherwise = goconcurrent rs go rs ConcurrentPerCpu = goconcurrent rs gononconcurrent [] = return observeFailure gononconcurrent (r:rs) = do ok <- a r if observeBool ok then return ok else gononconcurrent rs goconcurrent rs = do mv <- Annex.getRead Annex.activeremotes active <- liftIO $ takeMVar mv let rs' = sortBy (lessActiveFirst active) rs goconcurrent' mv active rs' goconcurrent' mv active [] = do liftIO $ putMVar mv active return observeFailure goconcurrent' mv active (r:rs) = do let !active' = M.insertWith (+) r 1 active liftIO $ putMVar mv active' let getnewactive = do active'' <- liftIO $ takeMVar mv let !active''' = M.update (\n -> if n > 1 then Just (n-1) else Nothing) r active'' return active''' let removeactive = liftIO . putMVar mv =<< getnewactive ok <- a r `onException` removeactive if observeBool ok then do removeactive return ok else do active'' <- getnewactive -- Re-sort the remaining rs -- because other threads could have -- been assigned them in the meantime. let rs' = sortBy (lessActiveFirst active'') rs goconcurrent' mv active'' rs' lessActiveFirst :: M.Map Remote Integer -> Remote -> Remote -> Ordering lessActiveFirst active a b | Remote.cost a == Remote.cost b = comparing (`M.lookup` active) a b | otherwise = comparing Remote.cost a b