keep logs of failed transfers, and requeue them when doing a non-full scan of a remote
This commit is contained in:
Joey Hess 2012-08-23 15:22:23 -04:00
parent 487bdf0e24
commit 715a9a2f8e
9 changed files with 132 additions and 64 deletions

View file

@ -14,9 +14,12 @@ import Data.Function
import Control.Concurrent.STM import Control.Concurrent.STM
import qualified Data.Map as M import qualified Data.Map as M
type Priority = Int data ScanInfo = ScanInfo
{ scanPriority :: Int
, fullScan :: Bool
}
type ScanRemoteMap = TMVar (M.Map Remote Priority) type ScanRemoteMap = TMVar (M.Map Remote ScanInfo)
{- The TMVar starts empty, and is left empty when there are no remotes {- The TMVar starts empty, and is left empty when there are no remotes
- to scan. -} - to scan. -}
@ -25,21 +28,23 @@ newScanRemoteMap = atomically newEmptyTMVar
{- Blocks until there is a remote that needs to be scanned. {- Blocks until there is a remote that needs to be scanned.
- Processes higher priority remotes first. -} - Processes higher priority remotes first. -}
getScanRemote :: ScanRemoteMap -> IO Remote getScanRemote :: ScanRemoteMap -> IO (Remote, ScanInfo)
getScanRemote v = atomically $ do getScanRemote v = atomically $ do
m <- takeTMVar v m <- takeTMVar v
let l = reverse $ map fst $ sortBy (compare `on` snd) $ M.toList m let l = reverse $ sortBy (compare `on` scanPriority . snd) $ M.toList m
case l of case l of
[] -> retry -- should never happen [] -> retry -- should never happen
(newest:_) -> do (ret@(r, _):_) -> do
let m' = M.delete newest m let m' = M.delete r m
unless (M.null m') $ unless (M.null m') $
putTMVar v m' putTMVar v m'
return newest return ret
{- Adds new remotes that need scanning to the map. -} {- Adds new remotes that need scanning to the map. -}
addScanRemotes :: ScanRemoteMap -> [Remote] -> IO () addScanRemotes :: ScanRemoteMap -> [Remote] -> Bool -> IO ()
addScanRemotes _ [] = noop addScanRemotes _ [] _ = noop
addScanRemotes v rs = atomically $ do addScanRemotes v rs full = atomically $ do
m <- fromMaybe M.empty <$> tryTakeTMVar v m <- fromMaybe M.empty <$> tryTakeTMVar v
putTMVar v $ M.union m $ M.fromList $ map (\r -> (r, Remote.cost r)) rs putTMVar v $ M.union (M.fromList $ zip rs (map info rs)) m
where
info r = ScanInfo (Remote.cost r) full

View file

@ -26,10 +26,11 @@ import qualified Data.Map as M
{- Syncs with remotes that may have been disconnected for a while. {- Syncs with remotes that may have been disconnected for a while.
- -
- After getting git in sync, queues a scan for file transfers. - First gets git in sync, and then prepares any necessary file transfers.
- To avoid doing that expensive scan unnecessarily, it's only run -
- if the git-annex branches of the remotes have diverged from the - An expensive full scan is queued when the git-annex branches of the
- local git-annex branch. - remotes have diverged from the local git-annex branch. Otherwise,
- it's sufficient to requeue failed transfers.
-} -}
reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO () reconnectRemotes :: ThreadName -> ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> [Remote] -> IO ()
reconnectRemotes _ _ _ _ [] = noop reconnectRemotes _ _ _ _ [] = noop
@ -38,16 +39,14 @@ reconnectRemotes threadname st dstatus scanremotes rs = void $
sync =<< runThreadState st (inRepo Git.Branch.current) sync =<< runThreadState st (inRepo Git.Branch.current)
where where
sync (Just branch) = do sync (Just branch) = do
haddiverged <- manualPull st (Just branch) rs diverged <- manualPull st (Just branch) rs
when haddiverged $ addScanRemotes scanremotes rs diverged
addScanRemotes scanremotes rs
now <- getCurrentTime now <- getCurrentTime
pushToRemotes threadname now st Nothing rs pushToRemotes threadname now st Nothing rs
{- No local branch exists yet, but we can try pulling. -} {- No local branch exists yet, but we can try pulling. -}
sync Nothing = do sync Nothing = do
haddiverged <- manualPull st Nothing rs diverged <- manualPull st Nothing rs
when haddiverged $ addScanRemotes scanremotes rs diverged
addScanRemotes scanremotes rs
return True return True
{- Updates the local sync branch, then pushes it to all remotes, in {- Updates the local sync branch, then pushes it to all remotes, in

View file

@ -30,24 +30,45 @@ thisThread = "TransferScanner"
transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO () transferScannerThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> TransferQueue -> IO ()
transferScannerThread st dstatus scanremotes transferqueue = do transferScannerThread st dstatus scanremotes transferqueue = do
runEvery (Seconds 2) $ do runEvery (Seconds 2) $ do
r <- getScanRemote scanremotes (r, info) <- getScanRemote scanremotes
liftIO $ debug thisThread ["starting scan of", show r] scanned <- runThreadState st $ inRepo $
void $ alertWhile dstatus (scanAlert r) $ checkTransferScanned $ Remote.uuid r
scan st dstatus transferqueue r if not scanned || fullScan info
liftIO $ debug thisThread ["finished scan of", show r] then do
liftIO $ debug thisThread ["starting scan of", show r]
void $ alertWhile dstatus (scanAlert r) $
expensiveScan st dstatus transferqueue r
liftIO $ debug thisThread ["finished scan of", show r]
runThreadState st $ inRepo $
transferScanned $ Remote.uuid r
else failedTransferScan st dstatus transferqueue r
{- This is a naive scan through the git work tree. {- This is a cheap scan for failed transfers involving a remote. -}
{- Requeues every failed transfer recorded for the remote. After a transfer
 - has been queued again, removal of its failure log is attempted and the
 - result of that attempt is discarded. -}
failedTransferScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO ()
failedTransferScan st dstatus transferqueue r =
    mapM_ requeue =<< runThreadState st (getFailedTransfers $ Remote.uuid r)
  where
    requeue (t, info) = do
        queueTransferWhenSmall
            transferqueue dstatus (associatedFile info) t r
        void $ runThreadState st $ inRepo $
            liftIO . tryIO . removeFile . failedTransferFile t
{- This is an expensive scan through the full git work tree.
- -
- The scan is blocked when the transfer queue gets too large. -} - The scan is blocked when the transfer queue gets too large. -}
scan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO Bool expensiveScan :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Remote -> IO Bool
scan st dstatus transferqueue r = do expensiveScan st dstatus transferqueue r = do
g <- runThreadState st $ fromRepo id g <- runThreadState st $ fromRepo id
files <- LsFiles.inRepo [] g files <- LsFiles.inRepo [] g
go files go files
inRepo $ transferScanned $ uuid r
return True return True
where where
go [] = return () go [] = noop
go (f:fs) = do go (f:fs) = do
v <- runThreadState st $ whenAnnexed check f v <- runThreadState st $ whenAnnexed check f
case v of case v of
@ -67,8 +88,7 @@ scan st dstatus transferqueue r = do
| otherwise = return Nothing | otherwise = return Nothing
u = Remote.uuid r u = Remote.uuid r
enqueue f t = queueTransferAt smallsize Later transferqueue dstatus (Just f) t r enqueue f t = queueTransferWhenSmall transferqueue dstatus (Just f) t r
smallsize = 10
{- Look directly in remote for the key when it's cheap; {- Look directly in remote for the key when it's cheap;
- otherwise rely on the location log. -} - otherwise rely on the location log. -}

View file

@ -13,6 +13,7 @@ module Assistant.TransferQueue (
queueTransfers, queueTransfers,
queueTransfer, queueTransfer,
queueTransferAt, queueTransferAt,
queueTransferWhenSmall,
getNextTransfer, getNextTransfer,
dequeueTransfer, dequeueTransfer,
) where ) where
@ -115,6 +116,9 @@ queueTransferAt wantsz schedule q dstatus f t remote = do
else retry -- blocks until queuesize changes else retry -- blocks until queuesize changes
enqueue schedule q dstatus t (stubInfo f remote) enqueue schedule q dstatus t (stubInfo f remote)
{- Queues a transfer with the Later schedule, by way of queueTransferAt
 - with a wanted queue size of 10. -}
queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
queueTransferWhenSmall = queueTransferAt smallsize Later
  where
    smallsize = 10
{- Blocks until a pending transfer is available from the queue, {- Blocks until a pending transfer is available from the queue,
- and removes it. - and removes it.
- -

View file

@ -135,13 +135,12 @@ fromPerform :: Remote -> Bool -> Key -> FilePath -> CommandPerform
fromPerform src move key file = moveLock move key $ fromPerform src move key file = moveLock move key $
ifM (inAnnex key) ifM (inAnnex key)
( handle move True ( handle move True
, download (Remote.uuid src) key (Just file) $ do , handle move =<< go
showAction $ "from " ++ Remote.name src
ok <- getViaTmp key $
Remote.retrieveKeyFile src key (Just file)
handle move ok
) )
where where
go = download (Remote.uuid src) key (Just file) $ do
showAction $ "from " ++ Remote.name src
getViaTmp key $ Remote.retrieveKeyFile src key (Just file)
handle _ False = stop -- failed handle _ False = stop -- failed
handle False True = next $ return True -- copy complete handle False True = next $ return True -- copy complete
handle True True = do -- finish moving handle True True = do -- finish moving

View file

@ -13,6 +13,7 @@ import CmdLine
import Annex.Content import Annex.Content
import Utility.RsyncFile import Utility.RsyncFile
import Logs.Transfer import Logs.Transfer
import Command.SendKey (fieldTransfer)
def :: [Command] def :: [Command]
def = [oneShot $ command "recvkey" paramKey seek def = [oneShot $ command "recvkey" paramKey seek
@ -30,7 +31,7 @@ start key = ifM (inAnnex key)
-- forcibly quit after receiving one key, -- forcibly quit after receiving one key,
-- and shutdown cleanly -- and shutdown cleanly
_ <- shutdown True _ <- shutdown True
liftIO exitSuccess return True
, liftIO exitFailure , return False
) )
) )

View file

@ -12,6 +12,7 @@ import Command
import Annex.Content import Annex.Content
import Utility.RsyncFile import Utility.RsyncFile
import Logs.Transfer import Logs.Transfer
import qualified Fields
def :: [Command] def :: [Command]
def = [oneShot $ command "sendkey" paramKey seek def = [oneShot $ command "sendkey" paramKey seek
@ -24,9 +25,17 @@ start :: Key -> CommandStart
start key = ifM (inAnnex key) start key = ifM (inAnnex key)
( fieldTransfer Upload key $ do ( fieldTransfer Upload key $ do
file <- inRepo $ gitAnnexLocation key file <- inRepo $ gitAnnexLocation key
liftIO $ ifM (rsyncServerSend file) liftIO $ rsyncServerSend file
( exitSuccess , exitFailure )
, do , do
warning "requested key is not present" warning "requested key is not present"
liftIO exitFailure liftIO exitFailure
) )
{- Runs a transfer action, then exits the program: successfully when the
 - action returned True, with failure otherwise.
 -
 - When the remoteUUID field is set, the action is run through runTransfer
 - for a Transfer built from that UUID; when it is not set, the action is
 - run directly. -}
fieldTransfer :: Direction -> Key -> Annex Bool -> CommandStart
fieldTransfer direction key a = do
    afile <- Fields.getField Fields.associatedFile
    muuid <- Fields.getField Fields.remoteUUID
    ok <- case muuid of
        Nothing -> a
        Just u -> runTransfer (Transfer direction (toUUID u) key) afile a
    liftIO $ if ok then exitSuccess else exitFailure

View file

@ -18,6 +18,9 @@ data Field = Field
, fieldCheck :: String -> Bool , fieldCheck :: String -> Bool
} }
{- Looks up the value of a Field, by its fieldName, using Annex.getField. -}
getField :: Field -> Annex (Maybe String)
getField field = Annex.getField $ fieldName field
remoteUUID :: Field remoteUUID :: Field
remoteUUID = Field "remoteuuid" $ remoteUUID = Field "remoteuuid" $
-- does it look like a UUID? -- does it look like a UUID?
@ -27,6 +30,3 @@ associatedFile :: Field
associatedFile = Field "associatedfile" $ \f -> associatedFile = Field "associatedfile" $ \f ->
-- is the file a safe relative filename? -- is the file a safe relative filename?
not (isAbsolute f) && not ("../" `isPrefixOf` f) not (isAbsolute f) && not ("../" `isPrefixOf` f)
getField :: Field -> Annex (Maybe String)
getField = Annex.getField . fieldName

View file

@ -13,7 +13,6 @@ import Annex.Exception
import qualified Git import qualified Git
import Types.Remote import Types.Remote
import Types.Key import Types.Key
import qualified Fields
import Utility.Percentage import Utility.Percentage
import System.Posix.Types import System.Posix.Types
@ -66,23 +65,20 @@ percentComplete (Transfer { transferKey = key }) (TransferInfo { bytesComplete =
(\size -> percentage size complete) <$> keySize key (\size -> percentage size complete) <$> keySize key
percentComplete _ _ = Nothing percentComplete _ _ = Nothing
upload :: UUID -> Key -> AssociatedFile -> Annex a -> Annex a upload :: UUID -> Key -> AssociatedFile -> Annex Bool -> Annex Bool
upload u key file a = runTransfer (Transfer Upload u key) file a upload u key file a = runTransfer (Transfer Upload u key) file a
download :: UUID -> Key -> AssociatedFile -> Annex a -> Annex a download :: UUID -> Key -> AssociatedFile -> Annex Bool -> Annex Bool
download u key file a = runTransfer (Transfer Download u key) file a download u key file a = runTransfer (Transfer Download u key) file a
fieldTransfer :: Direction -> Key -> Annex a -> Annex a
fieldTransfer direction key a = do
afile <- Fields.getField Fields.associatedFile
maybe a (\u -> runTransfer (Transfer direction (toUUID u) key) afile a)
=<< Fields.getField Fields.remoteUUID
{- Runs a transfer action. Creates and locks the lock file while the {- Runs a transfer action. Creates and locks the lock file while the
- action is running, and stores info in the transfer information - action is running, and stores info in the transfer information
- file. Will throw an error if the transfer is already in progress. - file. Will throw an error if the transfer is already in progress.
-
- If the transfer action returns False, the transfer info is
- left in the failedTransferDir.
-} -}
runTransfer :: Transfer -> Maybe FilePath -> Annex a -> Annex a runTransfer :: Transfer -> Maybe FilePath -> Annex Bool -> Annex Bool
runTransfer t file a = do runTransfer t file a = do
tfile <- fromRepo $ transferFile t tfile <- fromRepo $ transferFile t
createAnnexDirectory $ takeDirectory tfile createAnnexDirectory $ takeDirectory tfile
@ -95,21 +91,28 @@ runTransfer t file a = do
<*> pure Nothing <*> pure Nothing
<*> pure file <*> pure file
<*> pure False <*> pure False
bracketIO (prep tfile mode info) (cleanup tfile) a let content = writeTransferInfo info
ok <- bracketIO (prep tfile mode content) (cleanup tfile) a
unless ok $ failed content
return ok
where where
prep tfile mode info = do prep tfile mode content = do
fd <- openFd (transferLockFile tfile) ReadWrite (Just mode) fd <- openFd (transferLockFile tfile) ReadWrite (Just mode)
defaultFileFlags { trunc = True } defaultFileFlags { trunc = True }
locked <- catchMaybeIO $ locked <- catchMaybeIO $
setLock fd (WriteLock, AbsoluteSeek, 0, 0) setLock fd (WriteLock, AbsoluteSeek, 0, 0)
when (locked == Nothing) $ when (locked == Nothing) $
error $ "transfer already in progress" error $ "transfer already in progress"
writeFile tfile $ writeTransferInfo info writeFile tfile content
return fd return fd
cleanup tfile fd = do cleanup tfile fd = do
void $ tryIO $ removeFile tfile void $ tryIO $ removeFile tfile
void $ tryIO $ removeFile $ transferLockFile tfile void $ tryIO $ removeFile $ transferLockFile tfile
closeFd fd closeFd fd
failed content = do
failedtfile <- fromRepo $ failedTransferFile t
createAnnexDirectory $ takeDirectory failedtfile
liftIO $ writeFile failedtfile content
{- If a transfer is still running, returns its TransferInfo. -} {- If a transfer is still running, returns its TransferInfo. -}
checkTransfer :: Transfer -> Annex (Maybe TransferInfo) checkTransfer :: Transfer -> Annex (Maybe TransferInfo)
@ -128,7 +131,7 @@ checkTransfer t = do
Nothing -> return Nothing Nothing -> return Nothing
Just (pid, _) -> liftIO $ Just (pid, _) -> liftIO $
flip catchDefaultIO Nothing $ do flip catchDefaultIO Nothing $ do
readTransferInfo pid readTransferInfo (Just pid)
<$> readFile tfile <$> readFile tfile
{- Gets all currently running transfers. -} {- Gets all currently running transfers. -}
@ -140,15 +143,35 @@ getTransfers = do
filter running $ zip transfers infos filter running $ zip transfers infos
where where
findfiles = liftIO . mapM dirContentsRecursive findfiles = liftIO . mapM dirContentsRecursive
=<< mapM (fromRepo . transferDir) [Upload, Download] =<< mapM (fromRepo . transferDir)
[Download, Upload]
running (_, i) = isJust i running (_, i) = isJust i
{- Gets failed transfers for a given remote UUID.
 -
 - Looks through the failed transfer directories of both directions.
 - A failure log is skipped when its filename does not parse as a
 - Transfer, when its content does not parse as a TransferInfo, or when
 - reading it fails (handled with catchDefaultIO, the same way
 - checkTransfer reads its info file). -}
getFailedTransfers :: UUID -> Annex [(Transfer, TransferInfo)]
getFailedTransfers u = catMaybes <$> (liftIO . getpairs =<< concat <$> findfiles)
  where
    getpairs = mapM $ \f -> do
        mi <- flip catchDefaultIO Nothing $ do
            s <- readFile f
            -- readFile is lazy; force the whole content now, so that a
            -- read error is raised inside the handler and the file
            -- handle is closed before the next file is opened.
            length s `seq` return (readTransferInfo Nothing s)
        return $ (,) <$> parseTransferFile f <*> mi
    findfiles = liftIO . mapM dirContentsRecursive
        =<< mapM (fromRepo . failedTransferDir u)
            [Download, Upload]
{- The transfer information file to use for a given Transfer. -} {- The transfer information file to use for a given Transfer. -}
transferFile :: Transfer -> Git.Repo -> FilePath transferFile :: Transfer -> Git.Repo -> FilePath
transferFile (Transfer direction u key) r = transferDir direction r transferFile (Transfer direction u key) r = transferDir direction r
</> fromUUID u </> fromUUID u
</> keyFile key </> keyFile key
{- The file used to record a failed Transfer: named after the key, inside
 - the failedTransferDir for the transfer's UUID and direction. -}
failedTransferFile :: Transfer -> Git.Repo -> FilePath
failedTransferFile (Transfer direction u key) r = dir </> keyFile key
  where
    dir = failedTransferDir u direction r
{- The transfer lock file corresponding to a given transfer info file. -} {- The transfer lock file corresponding to a given transfer info file. -}
transferLockFile :: FilePath -> FilePath transferLockFile :: FilePath -> FilePath
transferLockFile infofile = let (d,f) = splitFileName infofile in transferLockFile infofile = let (d,f) = splitFileName infofile in
@ -176,12 +199,12 @@ writeTransferInfo info = unlines
, fromMaybe "" $ associatedFile info -- comes last; arbitrary content , fromMaybe "" $ associatedFile info -- comes last; arbitrary content
] ]
readTransferInfo :: ProcessID -> String -> Maybe TransferInfo readTransferInfo :: (Maybe ProcessID) -> String -> Maybe TransferInfo
readTransferInfo pid s = readTransferInfo mpid s =
case bits of case bits of
[time] -> TransferInfo [time] -> TransferInfo
<$> (Just <$> parsePOSIXTime time) <$> (Just <$> parsePOSIXTime time)
<*> pure (Just pid) <*> pure mpid
<*> pure Nothing <*> pure Nothing
<*> pure Nothing <*> pure Nothing
<*> pure Nothing <*> pure Nothing
@ -200,13 +223,21 @@ parsePOSIXTime s = utcTimeToPOSIXSeconds
transferDir :: Direction -> Git.Repo -> FilePath transferDir :: Direction -> Git.Repo -> FilePath
transferDir direction r = gitAnnexTransferDir r </> showLcDirection direction transferDir direction r = gitAnnexTransferDir r </> showLcDirection direction
{- The directory holding failed transfer information files for a given
 - Direction and UUID. It lives below the "failed" subdirectory of
 - gitAnnexTransferDir, split first by direction and then by UUID. -}
failedTransferDir :: UUID -> Direction -> Git.Repo -> FilePath
failedTransferDir u direction r =
    failedbase </> showLcDirection direction </> fromUUID u
  where
    failedbase = gitAnnexTransferDir r </> "failed"
{- The directory holding remote uuids that have been scanned for transfers. -} {- The directory holding remote uuids that have been scanned for transfers. -}
transferScannedDir :: Git.Repo -> FilePath transferScannedDir :: Git.Repo -> FilePath
transferScannedDir r = gitAnnexTransferDir r </> "scanned" transferScannedDir r = gitAnnexTransferDir r </> "scanned"
{- The file indicating whether a remote uuid has been scanned. -} {- The file indicating whether a remote uuid has been scanned. -}
transferScannedFile :: UUID -> Git.Repo -> FilePath transferScannedFile :: UUID -> Git.Repo -> FilePath
transferScannedFile u r = transferScannedDir r </> show u transferScannedFile u r = transferScannedDir r </> fromUUID u
{- Checks if a given remote UUID has been scanned for transfers. -} {- Checks if a given remote UUID has been scanned for transfers. -}
checkTransferScanned :: UUID -> Git.Repo -> IO Bool checkTransferScanned :: UUID -> Git.Repo -> IO Bool