git-annex/Assistant/TransferQueue.hs

235 lines
7.6 KiB
Haskell
Raw Normal View History

2012-07-02 20:07:39 +00:00
{- git-annex assistant pending transfer queue
-
- Copyright 2012-2014 Joey Hess <id@joeyh.name>
2012-07-02 20:07:39 +00:00
-
- Licensed under the GNU AGPL version 3 or higher.
2012-07-02 20:07:39 +00:00
-}
{-# LANGUAGE BangPatterns #-}
module Assistant.TransferQueue (
TransferQueue,
Schedule(..),
newTransferQueue,
getTransferQueue,
queueTransfers,
queueTransfersMatching,
queueDeferredDownloads,
queueTransfer,
queueTransferAt,
queueTransferWhenSmall,
getNextTransfer,
getMatchingTransfers,
2012-08-29 19:56:47 +00:00
dequeueTransfers,
) where
2012-07-02 20:07:39 +00:00
import Assistant.Common
import Assistant.DaemonStatus
2012-10-30 18:34:48 +00:00
import Assistant.Types.TransferQueue
import Types.Transfer
2012-07-02 20:07:39 +00:00
import Logs.Transfer
import Types.Remote
import qualified Remote
import qualified Types.Remote as Remote
import Annex.Wanted
import Utility.TList
2012-07-02 20:07:39 +00:00
import Control.Concurrent.STM
import qualified Data.Map.Strict as M
import qualified Data.Set as S
2012-07-02 20:07:39 +00:00
type Reason = String
{- Reads the queue's content without blocking or changing it. -}
getTransferQueue :: Assistant [(Transfer, TransferInfo)]
getTransferQueue = (atomically . readTList . queuelist) <<~ transferQueue
2012-07-02 20:07:39 +00:00
stubInfo :: AssociatedFile -> Remote -> TransferInfo
stubInfo f r = stubTransferInfo
{ transferRemote = Just r
2012-07-02 20:07:39 +00:00
, associatedFile = f
}
{- Adds transfers to queue for some of the known remotes.
- Honors preferred content settings, only transferring wanted files. -}
queueTransfers :: Reason -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant Bool
queueTransfers = queueTransfersMatching (const True)
{- Adds transfers to queue for some of the known remotes, that match a
- condition. Honors preferred content settings. -}
queueTransfersMatching :: (UUID -> Bool) -> Reason -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant Bool
queueTransfersMatching matching reason schedule k f direction
| direction == Download = ifM (liftAnnex $ wantGet True (Just k) f)
( go
, return False
)
| otherwise = go
where
go = do
rs <- liftAnnex . selectremotes =<< getDaemonStatus
let matchingrs = filter (matching . Remote.uuid) rs
if null matchingrs
then do
defer
return False
else do
forM_ matchingrs $ \r ->
enqueue reason schedule (gentransfer r) (stubInfo f r)
return True
selectremotes st
{- Queue downloads from all remotes that
- have the key. The list of remotes is ordered with
- cheapest first. More expensive ones will only be tried
- if downloading from a cheap one fails. -}
| direction == Download = do
s <- locs
return $ filter (inset s) (downloadRemotes st)
{- Upload to all remotes that want the content and don't
- already have it. -}
| otherwise = do
s <- locs
filterM (wantGetBy True (Just k) f . Remote.uuid) $
filter (\r -> not (inset s r || Remote.readonly r))
(syncDataRemotes st)
where
locs = S.fromList . map Remote.uuid <$> Remote.keyPossibilities k
inset s r = S.member (Remote.uuid r) s
gentransfer r = Transfer
{ transferDirection = direction
, transferKeyData = fromKey id k
, transferUUID = Remote.uuid r
}
defer
{- Defer this download, as no known remote has the key. -}
| direction == Download = do
q <- getAssistant transferQueue
void $ liftIO $ atomically $
consTList (deferreddownloads q) (k, f)
| otherwise = noop
{- Queues any deferred downloads that can now be accomplished, leaving
- any others in the list to try again later. -}
queueDeferredDownloads :: Reason -> Schedule -> Assistant ()
queueDeferredDownloads reason schedule = do
q <- getAssistant transferQueue
l <- liftIO $ atomically $ readTList (deferreddownloads q)
rs <- downloadRemotes <$> getDaemonStatus
left <- filterM (queue rs) l
unless (null left) $
liftIO $ atomically $ appendTList (deferreddownloads q) left
where
queue rs (k, f) = do
uuids <- liftAnnex $ Remote.keyLocations k
let sources = filter (\r -> uuid r `elem` uuids) rs
unless (null sources) $
forM_ sources $ \r ->
2013-03-13 17:05:30 +00:00
enqueue reason schedule
(gentransfer r) (stubInfo f r)
return $ null sources
where
gentransfer r = Transfer
{ transferDirection = Download
, transferKeyData = fromKey id k
, transferUUID = Remote.uuid r
}
enqueue :: Reason -> Schedule -> Transfer -> TransferInfo -> Assistant ()
enqueue reason schedule t info
| schedule == Next = go consTList
| otherwise = go snocTList
where
go modlist = whenM (add modlist) $ do
debug [ "queued", describeTransfer t info, ": " ++ reason ]
notifyTransfer
add modlist = do
q <- getAssistant transferQueue
dstatus <- getAssistant daemonStatusHandle
liftIO $ atomically $ ifM (checkRunningTransferSTM dstatus t)
( return False
, do
l <- readTList (queuelist q)
if (t `notElem` map fst l)
then do
void $ modifyTVar' (queuesize q) succ
void $ modlist (queuelist q) (t, info)
return True
else return False
)
{- Adds a transfer to the queue. -}
queueTransfer :: Reason -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant ()
queueTransfer reason schedule f t remote =
enqueue reason schedule t (stubInfo f remote)
2012-07-02 20:07:39 +00:00
{- Blocks until the queue is no larger than a given size, and then adds a
- transfer to the queue. -}
queueTransferAt :: Int -> Reason -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant ()
queueTransferAt wantsz reason schedule f t remote = do
q <- getAssistant transferQueue
liftIO $ atomically $ do
sz <- readTVar (queuesize q)
2012-09-13 04:57:52 +00:00
unless (sz <= wantsz) $
retry -- blocks until queuesize changes
enqueue reason schedule t (stubInfo f remote)
2012-07-02 20:07:39 +00:00
queueTransferWhenSmall :: Reason -> AssociatedFile -> Transfer -> Remote -> Assistant ()
queueTransferWhenSmall reason = queueTransferAt 10 reason Later
{- Blocks until a pending transfer is available in the queue,
- and removes it.
-
- Checks that it's acceptable, before adding it to the
- currentTransfers map. If it's not acceptable, it's discarded.
-
- This is done in a single STM transaction, so there is no window
- where an observer sees an inconsistent status. -}
getNextTransfer :: (TransferInfo -> Bool) -> Assistant (Maybe (Transfer, TransferInfo))
getNextTransfer acceptable = do
q <- getAssistant transferQueue
dstatus <- getAssistant daemonStatusHandle
liftIO $ atomically $ do
sz <- readTVar (queuesize q)
if sz < 1
then retry -- blocks until queuesize changes
else readTList (queuelist q) >>= \case
[] -> retry -- blocks until something is queued
(r@(t,info):rest) -> do
void $ modifyTVar' (queuesize q) pred
setTList (queuelist q) rest
if acceptable info
then do
adjustTransfersSTM dstatus $
M.insert t info
return $ Just r
else return Nothing
{- Moves transfers matching a condition from the queue, to the
- currentTransfers map. -}
getMatchingTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)]
getMatchingTransfers c = do
q <- getAssistant transferQueue
dstatus <- getAssistant daemonStatusHandle
liftIO $ atomically $ do
ts <- dequeueTransfersSTM q c
unless (null ts) $
adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts
return ts
2012-08-29 19:56:47 +00:00
{- Removes transfers matching a condition from the queue, and returns the
- removed transfers. -}
dequeueTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)]
dequeueTransfers c = do
q <- getAssistant transferQueue
removed <- liftIO $ atomically $ dequeueTransfersSTM q c
2012-08-29 19:56:47 +00:00
unless (null removed) $
notifyTransfer
2012-08-29 19:56:47 +00:00
return removed
dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)]
dequeueTransfersSTM q c = do
!(removed, ts) <- partition (c . fst) <$> readTList (queuelist q)
let !len = length ts
void $ writeTVar (queuesize q) len
setTList (queuelist q) ts
return removed