From 715a9a2f8e788ffe0bc92bc02919a1825bda49a7 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 23 Aug 2012 15:22:23 -0400 Subject: [PATCH] keep logs of failed transfers, and requeue them when doing a non-full scan of a remote --- Assistant/ScanRemotes.hs | 27 ++++++----- Assistant/Sync.hs | 19 ++++---- Assistant/Threads/TransferScanner.hs | 44 +++++++++++++----- Assistant/TransferQueue.hs | 4 ++ Command/Move.hs | 9 ++-- Command/RecvKey.hs | 5 +- Command/SendKey.hs | 13 +++++- Fields.hs | 6 +-- Logs/Transfer.hs | 69 ++++++++++++++++++++-------- 9 files changed, 132 insertions(+), 64 deletions(-) diff --git a/Assistant/ScanRemotes.hs b/Assistant/ScanRemotes.hs index 524dc200b5..2920e89c38 100644 --- a/Assistant/ScanRemotes.hs +++ b/Assistant/ScanRemotes.hs @@ -14,9 +14,12 @@ import Data.Function import Control.Concurrent.STM import qualified Data.Map as M -type Priority = Int +data ScanInfo = ScanInfo + { scanPriority :: Int + , fullScan :: Bool + } -type ScanRemoteMap = TMVar (M.Map Remote Priority) +type ScanRemoteMap = TMVar (M.Map Remote ScanInfo) {- The TMVar starts empty, and is left empty when there are no remotes - to scan. -} @@ -25,21 +28,23 @@ newScanRemoteMap = atomically newEmptyTMVar {- Blocks until there is a remote that needs to be scanned. - Processes higher priority remotes first. -} -getScanRemote :: ScanRemoteMap -> IO Remote +getScanRemote :: ScanRemoteMap -> IO (Remote, ScanInfo) getScanRemote v = atomically $ do m <- takeTMVar v - let l = reverse $ map fst $ sortBy (compare `on` snd) $ M.toList m + let l = reverse $ sortBy (compare `on` scanPriority . snd) $ M.toList m case l of [] -> retry -- should never happen - (newest:_) -> do - let m' = M.delete newest m + (ret@(r, _):_) -> do + let m' = M.delete r m unless (M.null m') $ putTMVar v m' - return newest + return ret {- Adds new remotes that need scanning to the map. -} -addScanRemotes :: ScanRemoteMap -> [Remote] -> IO () -addScanRemotes _ [] = noop -addScanRemotes v rs = atomically $ do +addScanRemotes :: ScanRemoteMap -> [Remote] -> Bool -> IO () +addScanRemotes _ [] _ = noop +addScanRemotes v rs full = atomically $ do m <- fromMaybe M.empty <$> tryTakeTMVar v - putTMVar v $ M.union m $ M.fromList $ map (\r -> (r, Remote.cost r)) rs + putTMVar v $ M.union (M.fromList $ zip rs (map info rs)) m + where + info r = ScanInfo (Remote.cost r) full diff --git a/Assistant/Sync.hs b/Assistant/Sync.hs index 42f82e9ab2..6a586e0976 100644 --- a/Assistant/Sync.hs +++ b/Assistant/Sync.hs @@ -26,10 +26,11 @@ import qualified Data.Map as M {- Syncs with remotes that may have been disconnected for a while. - - - After getting git in sync, queues a scan for file transfers. - - To avoid doing that expensive scan unnecessarily, it's only run - - if the git-annex branches of the remotes have diverged from the - - local git-annex branch. + - First gets git in sync, and then prepares any necessary file transfers. + - + - An expensive full scan is queued when the git-annex branches of the + - remotes have diverged from the local git-annex branch. Otherwise, + - it's sufficient to requeue failed transfers. -} reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () reconnectRemotes _ _ _ _ [] = noop @@ -38,16 +39,14 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $ sync =<< runThreadState st (inRepo Git.Branch.current) where sync (Just branch) = do - haddiverged <- manualPull st (Just branch) rs - when haddiverged $ - addScanRemotes scanremotes rs + diverged <- manualPull st (Just branch) rs + addScanRemotes scanremotes rs diverged now <- getCurrentTime pushToRemotes threadname now st Nothing rs {- No local branch exists yet, but we can try pulling. -} sync Nothing = do - haddiverged <- manualPull st Nothing rs - when haddiverged $ - addScanRemotes scanremotes rs + diverged <- manualPull st Nothing rs + addScanRemotes scanremotes rs diverged return True {- Updates the local sync branch, then pushes it to all remotes, in diff --git a/Assistant/Threads/TransferScanner.hs b/Assistant/Threads/TransferScanner.hs index 6bef2a6f10..b3222edb4c 100644 --- a/Assistant/Threads/TransferScanner.hs +++ b/Assistant/Threads/TransferScanner.hs @@ -30,24 +30,45 @@ thisThread = "TransferScanner" transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO () transferScannerThread st dstatus scanremotes transferqueue = do runEvery (Seconds 2) $ do - r <- getScanRemote scanremotes - liftIO $ debug thisThread ["starting scan of", show r] - void $ alertWhile dstatus (scanAlert r) $ - scan st dstatus transferqueue r - liftIO $ debug thisThread ["finished scan of", show r] + (r, info) <- getScanRemote scanremotes + scanned <- runThreadState st $ inRepo $ + checkTransferScanned $ Remote.uuid r + if not scanned || fullScan info + then do + liftIO $ debug thisThread ["starting scan of", show r] + void $ alertWhile dstatus (scanAlert r) $ + expensiveScan st dstatus transferqueue r + liftIO $ debug thisThread ["finished scan of", show r] + runThreadState st $ inRepo $ + transferScanned $ Remote.uuid r + else failedTransferScan st dstatus transferqueue r -{- This is a naive scan through the git work tree. +{- This is a cheap scan for failed transfers involving a remote. -} +failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO () +failedTransferScan st dstatus transferqueue r = do + ts <- runThreadState st $ + getFailedTransfers $ Remote.uuid r + go ts + where + go [] = noop + go ((t, info):ts) = do + queueTransferWhenSmall + transferqueue dstatus (associatedFile info) t r + void $ runThreadState st $ inRepo $ + liftIO . tryIO . removeFile . failedTransferFile t + go ts + +{- This is a expensive scan through the full git work tree. - - The scan is blocked when the transfer queue gets too large. -} -scan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO Bool -scan st dstatus transferqueue r = do +expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO Bool +expensiveScan st dstatus transferqueue r = do g <- runThreadState st $ fromRepo id files <- LsFiles.inRepo [] g go files - inRepo $ transferScanned $ uuid r return True where - go [] = return () + go [] = noop go (f:fs) = do v <- runThreadState st $ whenAnnexed check f case v of @@ -67,8 +88,7 @@ scan st dstatus transferqueue r = do | otherwise = return Nothing u = Remote.uuid r - enqueue f t = queueTransferAt smallsize Later transferqueue dstatus (Just f) t r - smallsize = 10 + enqueue f t = queueTransferWhenSmall transferqueue dstatus (Just f) t r {- Look directly in remote for the key when it's cheap; - otherwise rely on the location log. -} diff --git a/Assistant/TransferQueue.hs b/Assistant/TransferQueue.hs index aa61925278..18719de8ee 100644 --- a/Assistant/TransferQueue.hs +++ b/Assistant/TransferQueue.hs @@ -13,6 +13,7 @@ module Assistant.TransferQueue ( queueTransfers, queueTransfer, queueTransferAt, + queueTransferWhenSmall, getNextTransfer, dequeueTransfer, ) where @@ -115,6 +116,9 @@ queueTransferAt wantsz schedule q dstatus f t remote = do else retry -- blocks until queuesize changes enqueue schedule q dstatus t (stubInfo f remote) +queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () +queueTransferWhenSmall = queueTransferAt 10 Later + {- Blocks until a pending transfer is available from the queue, - and removes it. - diff --git a/Command/Move.hs b/Command/Move.hs index e7c11e80d3..7955cecd3c 100644 --- a/Command/Move.hs +++ b/Command/Move.hs @@ -135,13 +135,12 @@ fromPerform :: Remote -> Bool -> Key -> FilePath -> CommandPerform fromPerform src move key file = moveLock move key $ ifM (inAnnex key) ( handle move True - , download (Remote.uuid src) key (Just file) $ do - showAction $ "from " ++ Remote.name src - ok <- getViaTmp key $ - Remote.retrieveKeyFile src key (Just file) - handle move ok + , handle move =<< go ) where + go = download (Remote.uuid src) key (Just file) $ do + showAction $ "from " ++ Remote.name src + getViaTmp key $ Remote.retrieveKeyFile src key (Just file) handle _ False = stop -- failed handle False True = next $ return True -- copy complete handle True True = do -- finish moving diff --git a/Command/RecvKey.hs b/Command/RecvKey.hs index ce8bff9975..0606f9c510 100644 --- a/Command/RecvKey.hs +++ b/Command/RecvKey.hs @@ -13,6 +13,7 @@ import CmdLine import Annex.Content import Utility.RsyncFile import Logs.Transfer +import Command.SendKey (fieldTransfer) def :: [Command] def = [oneShot $ command "recvkey" paramKey seek @@ -30,7 +31,7 @@ start key = ifM (inAnnex key) -- forcibly quit after receiving one key, -- and shutdown cleanly _ <- shutdown True - liftIO exitSuccess - , liftIO exitFailure + return True + , return False ) ) diff --git a/Command/SendKey.hs b/Command/SendKey.hs index 5eca70d24c..8f914b5ed1 100644 --- a/Command/SendKey.hs +++ b/Command/SendKey.hs @@ -12,6 +12,7 @@ import Command import Annex.Content import Utility.RsyncFile import Logs.Transfer +import qualified Fields def :: [Command] def = [oneShot $ command "sendkey" paramKey seek @@ -24,9 +25,17 @@ start :: Key -> CommandStart start key = ifM (inAnnex key) ( fieldTransfer Upload key $ do file <- inRepo $ gitAnnexLocation key - liftIO $ ifM (rsyncServerSend file) - ( exitSuccess , exitFailure ) + liftIO $ rsyncServerSend file , do warning "requested key is not present" liftIO exitFailure ) + +fieldTransfer :: Direction -> Key -> Annex Bool -> CommandStart +fieldTransfer direction key a = do + afile <- Fields.getField Fields.associatedFile + ok <- maybe a (\u -> runTransfer (Transfer direction (toUUID u) key) afile a) + =<< Fields.getField Fields.remoteUUID + if ok + then liftIO exitSuccess + else liftIO exitFailure diff --git a/Fields.hs b/Fields.hs index 38427ad057..145a8adca1 100644 --- a/Fields.hs +++ b/Fields.hs @@ -18,6 +18,9 @@ data Field = Field , fieldCheck :: String -> Bool } +getField :: Field -> Annex (Maybe String) +getField = Annex.getField . fieldName + remoteUUID :: Field remoteUUID = Field "remoteuuid" $ -- does it look like a UUID? @@ -27,6 +30,3 @@ associatedFile :: Field associatedFile = Field "associatedfile" $ \f -> -- is the file a safe relative filename? not (isAbsolute f) && not ("../" `isPrefixOf` f) - -getField :: Field -> Annex (Maybe String) -getField = Annex.getField . fieldName diff --git a/Logs/Transfer.hs b/Logs/Transfer.hs index 4e43929fcf..b412ccd3ea 100644 --- a/Logs/Transfer.hs +++ b/Logs/Transfer.hs @@ -13,7 +13,6 @@ import Annex.Exception import qualified Git import Types.Remote import Types.Key -import qualified Fields import Utility.Percentage import System.Posix.Types @@ -66,23 +65,20 @@ percentComplete (Transfer { transferKey = key }) (TransferInfo { bytesComplete = (\size -> percentage size complete) <$> keySize key percentComplete _ _ = Nothing -upload :: UUID -> Key -> AssociatedFile -> Annex a -> Annex a +upload :: UUID -> Key -> AssociatedFile -> Annex Bool -> Annex Bool upload u key file a = runTransfer (Transfer Upload u key) file a -download :: UUID -> Key -> AssociatedFile -> Annex a -> Annex a +download :: UUID -> Key -> AssociatedFile -> Annex Bool -> Annex Bool download u key file a = runTransfer (Transfer Download u key) file a -fieldTransfer :: Direction -> Key -> Annex a -> Annex a -fieldTransfer direction key a = do - afile <- Fields.getField Fields.associatedFile - maybe a (\u -> runTransfer (Transfer direction (toUUID u) key) afile a) - =<< Fields.getField Fields.remoteUUID - {- Runs a transfer action. Creates and locks the lock file while the - action is running, and stores info in the transfer information - file. Will throw an error if the transfer is already in progress. + - + - If the transfer action returns False, the transfer info is + - left in the failedTransferDir. -} -runTransfer :: Transfer -> Maybe FilePath -> Annex a -> Annex a +runTransfer :: Transfer -> Maybe FilePath -> Annex Bool -> Annex Bool runTransfer t file a = do tfile <- fromRepo $ transferFile t createAnnexDirectory $ takeDirectory tfile @@ -95,21 +91,28 @@ runTransfer t file a = do <*> pure Nothing <*> pure file <*> pure False - bracketIO (prep tfile mode info) (cleanup tfile) a + let content = writeTransferInfo info + ok <- bracketIO (prep tfile mode content) (cleanup tfile) a + unless ok $ failed content + return ok where - prep tfile mode info = do + prep tfile mode content = do fd <- openFd (transferLockFile tfile) ReadWrite (Just mode) defaultFileFlags { trunc = True } locked <- catchMaybeIO $ setLock fd (WriteLock, AbsoluteSeek, 0, 0) when (locked == Nothing) $ error $ "transfer already in progress" - writeFile tfile $ writeTransferInfo info + writeFile tfile content return fd cleanup tfile fd = do void $ tryIO $ removeFile tfile void $ tryIO $ removeFile $ transferLockFile tfile closeFd fd + failed content = do + failedtfile <- fromRepo $ failedTransferFile t + createAnnexDirectory $ takeDirectory failedtfile + liftIO $ writeFile failedtfile content {- If a transfer is still running, returns its TransferInfo. -} checkTransfer :: Transfer -> Annex (Maybe TransferInfo) @@ -128,7 +131,7 @@ checkTransfer t = do Nothing -> return Nothing Just (pid, _) -> liftIO $ flip catchDefaultIO Nothing $ do - readTransferInfo pid + readTransferInfo (Just pid) <$> readFile tfile {- Gets all currently running transfers. -} @@ -140,15 +143,35 @@ getTransfers = do filter running $ zip transfers infos where findfiles = liftIO . mapM dirContentsRecursive - =<< mapM (fromRepo . transferDir) [Upload, Download] + =<< mapM (fromRepo . transferDir) + [Download, Upload] running (_, i) = isJust i +{- Gets failed transfers for a given remote UUID. -} +getFailedTransfers :: UUID -> Annex [(Transfer, TransferInfo)] +getFailedTransfers u = catMaybes <$> (liftIO . getpairs =<< concat <$> findfiles) + where + getpairs = mapM $ \f -> do + let mt = parseTransferFile f + mi <- readTransferInfo Nothing <$> readFile f + return $ case (mt, mi) of + (Just t, Just i) -> Just (t, i) + _ -> Nothing + findfiles = liftIO . mapM dirContentsRecursive + =<< mapM (fromRepo . failedTransferDir u) + [Download, Upload] + {- The transfer information file to use for a given Transfer. -} transferFile :: Transfer -> Git.Repo -> FilePath transferFile (Transfer direction u key) r = transferDir direction r fromUUID u keyFile key +{- The transfer information file to use to record a failed Transfer -} +failedTransferFile :: Transfer -> Git.Repo -> FilePath +failedTransferFile (Transfer direction u key) r = failedTransferDir u direction r + keyFile key + {- The transfer lock file corresponding to a given transfer info file. -} transferLockFile :: FilePath -> FilePath transferLockFile infofile = let (d,f) = splitFileName infofile in @@ -176,12 +199,12 @@ writeTransferInfo info = unlines , fromMaybe "" $ associatedFile info -- comes last; arbitrary content ] -readTransferInfo :: ProcessID -> String -> Maybe TransferInfo -readTransferInfo pid s = +readTransferInfo :: (Maybe ProcessID) -> String -> Maybe TransferInfo +readTransferInfo mpid s = case bits of [time] -> TransferInfo <$> (Just <$> parsePOSIXTime time) - <*> pure (Just pid) + <*> pure mpid <*> pure Nothing <*> pure Nothing <*> pure Nothing @@ -200,13 +223,21 @@ parsePOSIXTime s = utcTimeToPOSIXSeconds transferDir :: Direction -> Git.Repo -> FilePath transferDir direction r = gitAnnexTransferDir r showLcDirection direction +{- The directory holding failed transfer information files for a given + - Direction and UUID -} +failedTransferDir :: UUID -> Direction -> Git.Repo -> FilePath +failedTransferDir u direction r = gitAnnexTransferDir r + "failed" + showLcDirection direction + fromUUID u + {- The directory holding remote uuids that have been scanned for transfers. -} transferScannedDir :: Git.Repo -> FilePath transferScannedDir r = gitAnnexTransferDir r "scanned" {- The file indicating whether a remote uuid has been scanned. -} transferScannedFile :: UUID -> Git.Repo -> FilePath -transferScannedFile u r = transferScannedDir r show u +transferScannedFile u r = transferScannedDir r fromUUID u {- Checks if a given remote UUID has been scanned for transfers. -} checkTransferScanned :: UUID -> Git.Repo -> IO Bool