add TransferScanner thread
Efficiently finding transfers that need to be done to get two repos back in sync seems like an interesting problem.
This commit is contained in:
6 changed files with 138 additions and 43 deletions
@ -36,8 +36,7 @@
- inotify threads associated with it, too.)
- Thread 9: transfer watcher
- Watches for transfer information files being created and removed,
- and maintains the DaemonStatus currentTransfers map and the
- TransferSlots QSemN.
- and maintains the DaemonStatus currentTransfers map.
- (This uses inotify on .git/annex/transfer/, so there are
- additional inotify threads associated with it, too.)
- Thread 10: transferrer
@ -49,8 +48,14 @@
- Thread 13: mount watcher
- Either uses dbus to watch for drive mount events, or, when
- there's no dbus, polls to find newly mounted filesystems.
- Once a filesystem that contains a remote is mounted, syncs
- with it.
- Once a filesystem that contains a remote is mounted, updates
- state about that remote, pulls from it, and queues a push to it,
- as well as an update, and queues it onto the
- ConnectedRemoteChan
- Thread 14: transfer scanner
- Does potentially expensive checks to find data that needs to be
- transferred from or to remotes, and queues Transfers.
- Uses the ScanRemotes map.
- ThreadState: (MVar)
- The Annex state is stored here, which allows resuscitating the
@ -78,6 +83,9 @@
- to block until a slot is available.
- This MVar should only be manipulated from inside the Annex monad,
- which ensures it's accessed only after the ThreadState MVar.
- ScanRemotes (STM TMVar)
- Remotes that have been disconnected, and should be scanned
- are indicated by writing to this TMVar.
module Assistant where
@ -88,6 +96,7 @@ import Assistant.DaemonStatus
import Assistant.Changes
import Assistant.Commits
import Assistant.Pushes
import Assistant.ScanRemotes
import Assistant.TransferQueue
import Assistant.TransferSlots
import Assistant.Threads.Watcher
@ -98,6 +107,7 @@ import Assistant.Threads.TransferWatcher
import Assistant.Threads.Transferrer
import Assistant.Threads.SanityChecker
import Assistant.Threads.MountWatcher
import Assistant.Threads.TransferScanner
import qualified Utility.Daemon
import Utility.LogFile
import Utility.ThreadScheduler
@ -124,6 +134,7 @@ startDaemon assistant foreground
pushmap <- newFailedPushMap
transferqueue <- newTransferQueue
transferslots <- newTransferSlots
scanremotes <- newScanRemoteMap
mapM_ forkIO
[ commitThread st changechan commitchan transferqueue dstatus
, pushThread st dstatus commitchan pushmap
@ -133,7 +144,8 @@ startDaemon assistant foreground
, transfererThread st dstatus transferqueue transferslots
, daemonStatusThread st dstatus
, sanityCheckerThread st dstatus transferqueue changechan
, mountWatcherThread st dstatus
, mountWatcherThread st dstatus scanremotes
, transferScannerThread st scanremotes transferqueue
, watchThread st dstatus transferqueue changechan
debug "assistant"
Normal file
Normal file
@ -0,0 +1,41 @@
{- git-annex assistant remotes needing scanning
- Copyright 2012 Joey Hess <>
- Licensed under the GNU GPL version 3 or higher.
module Assistant.ScanRemotes where
import Common.Annex
import Data.Function
import Control.Concurrent.STM
import Data.Time.Clock
import qualified Data.Map as M
type ScanRemoteMap = TMVar (M.Map Remote UTCTime)
{- The TMVar starts empty, and is left empty when there are no remotes
- to scan. -}
newScanRemoteMap :: IO ScanRemoteMap
newScanRemoteMap = atomically newEmptyTMVar
{- Blocks until there is a remote that needs to be scanned.
- Processes remotes added most recently first. -}
getScanRemote :: ScanRemoteMap -> IO Remote
getScanRemote v = atomically $ do
m <- takeTMVar v
let newest = Prelude.head $ reverse $
map fst $ sortBy (compare `on` snd) $ M.toList m
putTMVar v $ M.delete newest m
return newest
{- Adds new remotes that need scanning to the map. -}
addScanRemotes :: ScanRemoteMap -> [Remote] -> IO ()
addScanRemotes _ [] = return ()
addScanRemotes v rs = do
now <- getCurrentTime
atomically $ do
m <- fromMaybe M.empty <$> tryTakeTMVar v
putTMVar v $ foldr (`M.insert` now) m rs
@ -13,6 +13,8 @@ module Assistant.Threads.MountWatcher where
import Assistant.Common
import Assistant.ThreadedMonad
import Assistant.DaemonStatus
import Assistant.ScanRemotes
import Assistant.Threads.Pusher (pushToRemotes)
import qualified Annex
import qualified Git
import Utility.ThreadScheduler
@ -27,6 +29,7 @@ import Logs.Remote
import Control.Concurrent
import qualified Control.Exception as E
import qualified Data.Set as S
import Data.Time.Clock
import DBus.Client
@ -39,18 +42,18 @@ import Data.Word (Word32)
thisThread :: ThreadName
thisThread = "MountWatcher"
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> IO ()
mountWatcherThread st handle =
mountWatcherThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
mountWatcherThread st handle scanremotes =
dbusThread st handle
dbusThread st handle scanremotes
pollingThread st handle
pollingThread st handle scanremotes
dbusThread :: ThreadState -> DaemonStatusHandle -> IO ()
dbusThread st dstatus = E.catch (go =<< connectSession) onerr
dbusThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
dbusThread st dstatus scanremotes = E.catch (go =<< connectSession) onerr
go client = ifM (checkMountMonitor client)
( do
@ -63,7 +66,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr
listen client matcher $ \_event -> do
nowmounted <- currentMountPoints
wasmounted <- swapMVar mvar nowmounted
handleMounts st dstatus wasmounted nowmounted
handleMounts st dstatus scanremotes wasmounted nowmounted
, do
runThreadState st $
warning "No known volume monitor available through dbus; falling back to mtab polling"
@ -74,7 +77,7 @@ dbusThread st dstatus = E.catch (go =<< connectSession) onerr
runThreadState st $
warning $ "Failed to use dbus; falling back to mtab polling (" ++ show e ++ ")"
pollinstead = pollingThread st dstatus
pollinstead = pollingThread st dstatus scanremotes
type ServiceName = String
@ -140,30 +143,32 @@ mountAdded = [gvfs, kde]
pollingThread :: ThreadState -> DaemonStatusHandle -> IO ()
pollingThread st dstatus = go =<< currentMountPoints
pollingThread :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> IO ()
pollingThread st dstatus scanremotes = go =<< currentMountPoints
go wasmounted = do
threadDelaySeconds (Seconds 10)
nowmounted <- currentMountPoints
handleMounts st dstatus wasmounted nowmounted
handleMounts st dstatus scanremotes wasmounted nowmounted
go nowmounted
handleMounts :: ThreadState -> DaemonStatusHandle -> MountPoints -> MountPoints -> IO ()
handleMounts st dstatus wasmounted nowmounted = mapM_ (handleMount st dstatus) $
handleMounts :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> MountPoints -> MountPoints -> IO ()
handleMounts st dstatus scanremotes wasmounted nowmounted = mapM_ (handleMount st dstatus scanremotes) $
S.toList $ newMountPoints wasmounted nowmounted
handleMount :: ThreadState -> DaemonStatusHandle -> Mntent -> IO ()
handleMount st dstatus mntent = do
handleMount :: ThreadState -> DaemonStatusHandle -> ScanRemoteMap -> Mntent -> IO ()
handleMount st dstatus scanremotes mntent = do
debug thisThread ["detected mount of", mnt_dir mntent]
rs <- remotesUnder st dstatus mntent
unless (null rs) $ do
branch <- runThreadState st $ Command.Sync.currentBranch
let pullrs = filter Git.repoIsLocal rs
debug thisThread ["pulling from", show pullrs]
runThreadState st $ manualPull branch pullrs
-- TODO queue transfers for new files in both directions
let nonspecial = filter (Git.repoIsLocal . Remote.repo) rs
unless (null nonspecial) $ do
debug thisThread ["pulling from", show nonspecial]
runThreadState st $ manualPull branch nonspecial
now <- getCurrentTime
pushToRemotes thisThread now st Nothing nonspecial
addScanRemotes scanremotes rs
{- Finds remotes located underneath the mount point.
@ -1,4 +1,4 @@
{- git-annex assistant git pushing threads
{- git-annex assistant git pushing thread
- Copyright 2012 Joey Hess <>
@ -36,7 +36,7 @@ pushRetryThread st pushmap = runEvery (Seconds halfhour) $ do
, "failed pushes"
now <- getCurrentTime
pushToRemotes now st pushmap topush
pushToRemotes thisThread now st (Just pushmap) topush
halfhour = 1800
@ -53,7 +53,7 @@ pushThread st daemonstatus commitchan pushmap = do
then do
remotes <- runThreadState st $
knownRemotes <$> getDaemonStatus daemonstatus
pushToRemotes now st pushmap remotes
pushToRemotes thisThread now st (Just pushmap) remotes
else do
debug thisThread
[ "delaying push of"
@ -78,24 +78,27 @@ shouldPush _now commits
- Avoids running possibly long-duration commands in the Annex monad, so
- as not to block other threads. -}
pushToRemotes :: UTCTime -> ThreadState -> FailedPushMap -> [Remote] -> IO ()
pushToRemotes now st pushmap remotes = do
pushToRemotes :: ThreadName -> UTCTime -> ThreadState -> (Maybe FailedPushMap) -> [Remote] -> IO ()
pushToRemotes threadname now st mpushmap remotes = do
(g, branch) <- runThreadState st $
(,) <$> fromRepo id <*> Command.Sync.currentBranch
go True branch g remotes
go shouldretry branch g rs = do
debug thisThread
debug threadname
[ "pushing to"
, show rs
Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
(succeeded, failed) <- inParallel (push g branch) rs
changeFailedPushMap pushmap $ \m ->
M.union (makemap failed) $
M.difference m (makemap succeeded)
case mpushmap of
Nothing -> noop
Just pushmap ->
changeFailedPushMap pushmap $ \m ->
M.union (makemap failed) $
M.difference m (makemap succeeded)
unless (null failed) $
debug thisThread
debug threadname
[ "failed to push to"
, show failed
@ -109,6 +112,6 @@ pushToRemotes now st pushmap remotes = do
( exitSuccess, exitFailure)
retry branch g rs = do
debug thisThread [ "trying manual pull to resolve failed pushes" ]
debug threadname [ "trying manual pull to resolve failed pushes" ]
runThreadState st $ manualPull branch rs
go False branch g rs
Normal file
Normal file
@ -0,0 +1,34 @@
{- git-annex assistant thread to scan remotes to find needed transfers
- Copyright 2012 Joey Hess <>
- Licensed under the GNU GPL version 3 or higher.
module Assistant.Threads.TransferScanner where
import Assistant.Common
import Assistant.ScanRemotes
import Assistant.TransferQueue
import Assistant.ThreadedMonad
import Logs.Transfer
import Types.Remote
import Utility.ThreadScheduler
thisThread :: ThreadName
thisThread = "TransferScanner"
{- This thread scans remotes, to find transfers that need to be made to
- keep their data in sync. The transfers are queued with lot priority. -}
transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO ()
transferScannerThread st scanremotes transferqueue = do
runEvery (Seconds 2) $ do
r <- getScanRemote scanremotes
needtransfer <- scan st r
forM_ needtransfer $ \(f, t) ->
queueLaterTransfer transferqueue f t
scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)]
scan st r = do
debug thisThread ["scanning", show r]
return [] -- TODO
@ -38,19 +38,19 @@ queueTransfers q daemonstatus k f direction = do
mapM_ (\r -> queue r $ gentransfer r)
=<< sufficientremotes rs
sufficientremotes l
sufficientremotes rs
-- Queue downloads from all remotes that
-- have the key, with the cheapest ones first.
-- More expensive ones will only be tried if
-- downloading from a cheap one fails.
| direction == Download = do
uuids <- Remote.keyLocations k
return $ filter (\r -> uuid r `elem` uuids) l
return $ filter (\r -> uuid r `elem` uuids) rs
-- TODO: Determine a smaller set of remotes that
-- can be uploaded to, in order to ensure all
-- remotes can access the content. Currently,
-- send to every remote we can.
| otherwise = return l
| otherwise = return rs
gentransfer r = Transfer
{ transferDirection = direction
, transferKey = k
@ -60,12 +60,12 @@ queueTransfers q daemonstatus k f direction = do
let info = (stubInfo f) { transferRemote = Just r }
writeTChan q (t, info)
{- Adds a pending transfer to the end of the queue. -}
queueTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueTransfer q f t = void $ atomically $
{- Adds a transfer to the end of the queue, to be processed later. -}
queueLaterTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueLaterTransfer q f t = void $ atomically $
writeTChan q (t, stubInfo f)
{- Adds a pending transfer to the start of the queue, to be processed next. -}
{- Adds a transfer to the start of the queue, to be processed next. -}
queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueNextTransfer q f t = void $ atomically $
unGetTChan q (t, stubInfo f)
Reference in a new issue