better transfer queue management

Allow transfers to be added with blocking until the queue is sufficiently
small.

Better control over which end of the queue to add a transfer to.
This commit is contained in:
Joey Hess 2012-07-25 13:12:34 -04:00
parent 6107328a6b
commit a9dbfdf28d
4 changed files with 52 additions and 25 deletions

View file

@ -161,7 +161,7 @@ handleAdds st changechan transferqueue dstatus cs = returnWhen (null pendingadds
sha <- inRepo $ sha <- inRepo $
Git.HashObject.hashObject BlobObject link Git.HashObject.hashObject BlobObject link
stageSymlink file sha stageSymlink file sha
queueTransfers transferqueue dstatus key (Just file) Upload queueTransfers Next transferqueue dstatus key (Just file) Upload
showEndOk showEndOk
return $ Just change return $ Just change

View file

@ -18,16 +18,23 @@ import Utility.ThreadScheduler
thisThread :: ThreadName thisThread :: ThreadName
thisThread = "TransferScanner" thisThread = "TransferScanner"
{- This thread scans remotes, to find transfers that need to be made to {- This thread waits until a remote needs to be scanned, to find transfers
- keep their data in sync. The transfers are queued with low priority. -} - that need to be made, to keep data in sync.
-
- Remotes are scanned in the background; the scan is blocked when the
- transfer queue gets too large.
-}
transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO () transferScannerThread :: ThreadState -> ScanRemoteMap -> TransferQueue -> IO ()
transferScannerThread st scanremotes transferqueue = do transferScannerThread st scanremotes transferqueue = do
runEvery (Seconds 2) $ do runEvery (Seconds 2) $ do
r <- getScanRemote scanremotes r <- getScanRemote scanremotes
needtransfer <- scan st r needtransfer <- scan st r
forM_ needtransfer $ \(f, t) -> forM_ needtransfer $ \(f, t) ->
queueLaterTransfer transferqueue f t queueTransferAt smallsize Later transferqueue f t
where
smallsize = 10
{- -}
scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)] scan :: ThreadState -> Remote -> IO [(AssociatedFile, Transfer)]
scan st r = do scan st r = do
debug thisThread ["scanning", show r] debug thisThread ["scanning", show r]

View file

@ -206,7 +206,7 @@ onAddSymlink threadname file filestatus dstatus transferqueue = go =<< Backend.l
- try to get the key's content. -} - try to get the key's content. -}
checkcontent key daemonstatus checkcontent key daemonstatus
| scanComplete daemonstatus = unlessM (inAnnex key) $ | scanComplete daemonstatus = unlessM (inAnnex key) $
queueTransfers transferqueue dstatus queueTransfers Next transferqueue dstatus
key (Just file) Download key (Just file) Download
| otherwise = noop | otherwise = noop

View file

@ -15,10 +15,18 @@ import qualified Remote
import Control.Concurrent.STM import Control.Concurrent.STM
type TransferQueue = TChan (Transfer, TransferInfo) {- The transfer queue consists of a channel listing the transfers to make;
- the size of the queue is also tracked -}
data TransferQueue = TransferQueue
{ queue :: TChan (Transfer, TransferInfo)
, queuesize :: TVar Integer
}
data Schedule = Next | Later
deriving (Eq)
newTransferQueue :: IO TransferQueue newTransferQueue :: IO TransferQueue
newTransferQueue = atomically newTChan newTransferQueue = atomically $ TransferQueue <$> newTChan <*> newTVar 0
stubInfo :: AssociatedFile -> TransferInfo stubInfo :: AssociatedFile -> TransferInfo
stubInfo f = TransferInfo stubInfo f = TransferInfo
@ -30,13 +38,11 @@ stubInfo f = TransferInfo
, associatedFile = f , associatedFile = f
} }
{- Adds pending transfers to the end of the queue for some of the known {- Adds pending transfers to queue for some of the known remotes. -}
- remotes. -} queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
queueTransfers :: TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex () queueTransfers schedule q daemonstatus k f direction = do
queueTransfers q daemonstatus k f direction = do
rs <- knownRemotes <$> getDaemonStatus daemonstatus rs <- knownRemotes <$> getDaemonStatus daemonstatus
mapM_ (\r -> queue r $ gentransfer r) mapM_ go =<< sufficientremotes rs
=<< sufficientremotes rs
where where
sufficientremotes rs sufficientremotes rs
-- Queue downloads from all remotes that -- Queue downloads from all remotes that
@ -56,20 +62,34 @@ queueTransfers q daemonstatus k f direction = do
, transferKey = k , transferKey = k
, transferUUID = Remote.uuid r , transferUUID = Remote.uuid r
} }
queue r t = liftIO $ void $ atomically $ do go r = liftIO $ atomically $ do
let info = (stubInfo f) { transferRemote = Just r } let info = (stubInfo f) { transferRemote = Just r }
writeTChan q (t, info) enqueue schedule q (gentransfer r) info
{- Adds a transfer to the end of the queue, to be processed later. -} enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM ()
queueLaterTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () enqueue schedule q t info
queueLaterTransfer q f t = void $ atomically $ | schedule == Next = go unGetTChan
writeTChan q (t, stubInfo f) | otherwise = go writeTChan
where
go a = do
void $ a (queue q) (t, info)
void $ modifyTVar' (queuesize q) succ
{- Adds a transfer to the start of the queue, to be processed next. -} {- Adds a transfer to the queue. -}
queueNextTransfer :: TransferQueue -> AssociatedFile -> Transfer -> IO () queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueNextTransfer q f t = void $ atomically $ queueTransfer schedule q f t = atomically $ enqueue schedule q t (stubInfo f)
unGetTChan q (t, stubInfo f)
{- Blocks until a pending transfer is available in the queue. -} {- Blocks until the queue is no larger than a given size, and then adds a
- transfer to the queue. -}
queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> IO ()
queueTransferAt wantsz schedule q f t = atomically $ do
sz <- readTVar (queuesize q)
if sz <= wantsz
then enqueue schedule q t (stubInfo f)
else retry -- blocks until queuesize changes
{- Blocks until a pending transfer is available from the queue. -}
getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo) getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
getNextTransfer = atomically . readTChan getNextTransfer q = atomically $ do
void $ modifyTVar' (queuesize q) pred
readTChan (queue q)