196 lines
6.6 KiB
Haskell
196 lines
6.6 KiB
Haskell
{- git-annex assistant thread to scan remotes to find needed transfers
|
|
-
|
|
- Copyright 2012 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
module Assistant.Threads.TransferScanner where
|
|
|
|
import Assistant.Common
|
|
import Assistant.Types.ScanRemotes
|
|
import Assistant.ScanRemotes
|
|
import Assistant.TransferQueue
|
|
import Assistant.DaemonStatus
|
|
import Assistant.Drop
|
|
import Assistant.Sync
|
|
import Assistant.DeleteRemote
|
|
import Assistant.Types.UrlRenderer
|
|
import Types.Transfer
|
|
import Logs.Transfer
|
|
import Logs.Location
|
|
import Logs.Group
|
|
import qualified Remote
|
|
import qualified Types.Remote as Remote
|
|
import Utility.ThreadScheduler
|
|
import Utility.NotificationBroadcaster
|
|
import Utility.Batch
|
|
import qualified Git.LsFiles as LsFiles
|
|
import Annex.WorkTree
|
|
import Annex.Content
|
|
import Annex.Wanted
|
|
import CmdLine.Action
|
|
import Types.Command
|
|
|
|
import qualified Data.Set as S
|
|
import Control.Concurrent
|
|
|
|
{- This thread waits until a remote needs to be scanned, to find transfers
|
|
- that need to be made, to keep data in sync.
|
|
-}
|
|
transferScannerThread :: UrlRenderer -> NamedThread
|
|
transferScannerThread urlrenderer = namedThread "TransferScanner" $ do
|
|
startupScan
|
|
go S.empty
|
|
where
|
|
go scanned = do
|
|
scanrunning False
|
|
liftIO $ threadDelaySeconds (Seconds 2)
|
|
(rs, infos) <- unzip <$> getScanRemote
|
|
scanrunning True
|
|
if any fullScan infos || any (`S.notMember` scanned) rs
|
|
then do
|
|
expensiveScan urlrenderer rs
|
|
go $ scanned `S.union` S.fromList rs
|
|
else do
|
|
mapM_ failedTransferScan rs
|
|
go scanned
|
|
scanrunning b = do
|
|
ds <- modifyDaemonStatus $ \s ->
|
|
(s { transferScanRunning = b }, s)
|
|
liftIO $ sendNotification $ transferNotifier ds
|
|
|
|
{- All git remotes are synced, all exports are updated,
|
|
- and all available remotes are scanned in full on startup,
|
|
- for multiple reasons, including:
|
|
-
|
|
- * This may be the first run, and there may be remotes
|
|
- already in place, that need to be synced.
|
|
- * Changes may have been made last time we run, but remotes were
|
|
- not available to be synced with.
|
|
- * Changes may have been made to remotes while we were down.
|
|
- * We may have run before, and scanned a remote, but
|
|
- only been in a subdirectory of the git remote, and so
|
|
- not synced it all.
|
|
- * We may have run before, and had transfers queued,
|
|
- and then the system (or us) crashed, and that info was
|
|
- lost.
|
|
- * A remote may be in the unwanted group, and this is a chance
|
|
- to determine if the remote has been emptied.
|
|
-}
|
|
startupScan = do
|
|
reconnectRemotes =<< syncGitRemotes <$> getDaemonStatus
|
|
addScanRemotes True =<< scannableRemotes
|
|
|
|
{- This is a cheap scan for failed transfers involving a remote. -}
|
|
failedTransferScan :: Remote -> Assistant ()
|
|
failedTransferScan r = do
|
|
failed <- liftAnnex $ clearFailedTransfers (Remote.uuid r)
|
|
mapM_ retry failed
|
|
where
|
|
retry (t, info)
|
|
| transferDirection t == Download =
|
|
{- Check if the remote still has the key.
|
|
- If not, relies on the expensiveScan to
|
|
- get it queued from some other remote. -}
|
|
whenM (liftAnnex $ remoteHas r $ transferKey t) $
|
|
requeue t info
|
|
| otherwise =
|
|
{- The Transferrer checks when uploading
|
|
- that the remote doesn't already have the
|
|
- key, so it's not redundantly checked here. -}
|
|
requeue t info
|
|
requeue t info = queueTransferWhenSmall "retrying failed transfer" (associatedFile info) t r
|
|
|
|
{- This is a expensive scan through the full git work tree, finding
|
|
- files to transfer. The scan is blocked when the transfer queue gets
|
|
- too large.
|
|
-
|
|
- This also finds files that are present either here or on a remote
|
|
- but that are not preferred content, and drops them. Searching for files
|
|
- to drop is done concurrently with the scan for transfers.
|
|
-
|
|
- TODO: It would be better to first drop as much as we can, before
|
|
- transferring much, to minimise disk use.
|
|
-
|
|
- During the scan, we'll also check if any unwanted repositories are empty,
|
|
- and can be removed. While unrelated, this is a cheap place to do it,
|
|
- since we need to look at the locations of all keys anyway.
|
|
-}
|
|
expensiveScan :: UrlRenderer -> [Remote] -> Assistant ()
|
|
expensiveScan urlrenderer rs = batch <~> do
|
|
debug ["starting scan of", show visiblers]
|
|
|
|
let us = map Remote.uuid rs
|
|
|
|
mapM_ (liftAnnex . clearFailedTransfers) us
|
|
|
|
unwantedrs <- liftAnnex $ S.fromList
|
|
<$> filterM inUnwantedGroup us
|
|
|
|
g <- liftAnnex gitRepo
|
|
(files, cleanup) <- liftIO $ LsFiles.inRepo [] [] g
|
|
removablers <- scan unwantedrs files
|
|
void $ liftIO cleanup
|
|
|
|
debug ["finished scan of", show visiblers]
|
|
|
|
remove <- asIO1 $ removableRemote urlrenderer
|
|
liftIO $ mapM_ (void . tryNonAsync . remove) $ S.toList removablers
|
|
where
|
|
visiblers = let rs' = filter (not . Remote.readonly) rs
|
|
in if null rs' then rs else rs'
|
|
|
|
scan unwanted [] = return unwanted
|
|
scan unwanted (f:fs) = do
|
|
(unwanted', ts) <- maybe
|
|
(return (unwanted, []))
|
|
(findtransfers f unwanted)
|
|
=<< liftAnnex (lookupKey f)
|
|
mapM_ (enqueue f) ts
|
|
|
|
{- Delay for a short time to avoid using too much CPU. -}
|
|
liftIO $ threadDelay $ fromIntegral $ oneSecond `div` 200
|
|
|
|
scan unwanted' fs
|
|
|
|
enqueue f (r, t) =
|
|
queueTransferWhenSmall "expensive scan found missing object"
|
|
(AssociatedFile (Just f)) t r
|
|
findtransfers f unwanted key = do
|
|
let af = AssociatedFile (Just f)
|
|
locs <- liftAnnex $ loggedLocations key
|
|
present <- liftAnnex $ inAnnex key
|
|
let slocs = S.fromList locs
|
|
|
|
{- The remotes may have changed since this scan began. -}
|
|
syncrs <- syncDataRemotes <$> getDaemonStatus
|
|
let use l a = mapMaybe (a key slocs) . l <$> getDaemonStatus
|
|
|
|
liftAnnex $ handleDropsFrom locs syncrs
|
|
"expensive scan found too many copies of object"
|
|
present key af (SeekInput []) [] callCommandAction
|
|
ts <- if present
|
|
then liftAnnex . filterM (wantGetBy NoLiveUpdate True (Just key) af . Remote.uuid . fst)
|
|
=<< use syncDataRemotes (genTransfer Upload False)
|
|
else ifM (liftAnnex $ wantGet NoLiveUpdate True (Just key) af)
|
|
( use downloadRemotes (genTransfer Download True) , return [] )
|
|
let unwanted' = S.difference unwanted slocs
|
|
return (unwanted', ts)
|
|
|
|
-- Both syncDataRemotes and exportRemotes can be scanned.
|
|
-- The downloadRemotes list contains both.
|
|
scannableRemotes :: Assistant [Remote]
|
|
scannableRemotes = downloadRemotes <$> getDaemonStatus
|
|
|
|
genTransfer :: Direction -> Bool -> Key -> S.Set UUID -> Remote -> Maybe (Remote, Transfer)
|
|
genTransfer direction want key slocs r
|
|
| direction == Upload && Remote.readonly r = Nothing
|
|
| S.member (Remote.uuid r) slocs == want = Just
|
|
(r, Transfer direction (Remote.uuid r) (fromKey id key))
|
|
| otherwise = Nothing
|
|
|
|
remoteHas :: Remote -> Key -> Annex Bool
|
|
remoteHas r key = elem
|
|
<$> pure (Remote.uuid r)
|
|
<*> loggedLocations key
|