2012-07-02 20:07:39 +00:00
|
|
|
{- git-annex assistant pending transfer queue
|
|
|
|
-
|
|
|
|
- Copyright 2012 Joey Hess <joey@kitenet.net>
|
|
|
|
-
|
|
|
|
- Licensed under the GNU GPL version 3 or higher.
|
|
|
|
-}
|
|
|
|
|
2012-07-25 18:54:09 +00:00
|
|
|
module Assistant.TransferQueue (
|
|
|
|
TransferQueue,
|
|
|
|
Schedule(..),
|
|
|
|
newTransferQueue,
|
|
|
|
queueTransfers,
|
|
|
|
queueTransfer,
|
|
|
|
queueTransferAt,
|
|
|
|
getNextTransfer
|
|
|
|
) where
|
2012-07-02 20:07:39 +00:00
|
|
|
|
|
|
|
import Common.Annex
|
2012-07-05 16:21:22 +00:00
|
|
|
import Assistant.DaemonStatus
|
2012-07-02 20:07:39 +00:00
|
|
|
import Logs.Transfer
|
|
|
|
import Types.Remote
|
2012-07-05 16:21:22 +00:00
|
|
|
import qualified Remote
|
2012-07-02 20:07:39 +00:00
|
|
|
|
|
|
|
import Control.Concurrent.STM
|
|
|
|
|
2012-07-25 17:12:34 +00:00
|
|
|
{- A queue of pending transfers.
 -
 - The pending transfers live in a channel, and a separate counter keeps
 - track of how many entries are currently queued. -}
data TransferQueue = TransferQueue
  { -- The pending transfers, each paired with its TransferInfo.
    queue :: TChan (Transfer, TransferInfo)
    -- Number of entries queued; incremented by enqueue and decremented
    -- by getNextTransfer.
  , queuesize :: TVar Integer
  }
|
|
|
|
|
|
|
|
{- Where in the queue a new transfer is placed.
 -
 - Next puts the transfer at the front of the queue, so it is the next
 - one read; Later appends it to the end. -}
data Schedule
  = Next
  | Later
  deriving (Eq)
|
2012-07-02 20:07:39 +00:00
|
|
|
|
|
|
|
{- Creates an empty transfer queue, with its size counter set to 0. -}
newTransferQueue :: IO TransferQueue
newTransferQueue = atomically $ do
  chan <- newTChan
  size <- newTVar 0
  return $ TransferQueue chan size
|
2012-07-02 20:07:39 +00:00
|
|
|
|
2012-07-25 18:02:50 +00:00
|
|
|
{- Builds the TransferInfo for a transfer that has only been queued.
 -
 - Only the associated file and the remote are filled in; every other
 - field is left as Nothing. -}
stubInfo :: AssociatedFile -> Remote -> TransferInfo
stubInfo file remote = TransferInfo
  { associatedFile = file
  , transferRemote = Just remote
  , startedTime = Nothing
  , transferPid = Nothing
  , transferTid = Nothing
  , bytesComplete = Nothing
  }
|
|
|
|
|
2012-07-25 18:02:50 +00:00
|
|
|
{- Adds transfers to queue for some of the known remotes.
 -
 - The known remotes are read from the daemon status. For a Download,
 - only remotes that are recorded as having the key are used; for any
 - other direction, every known remote is used.
 -
 - All the transfers are added in a single STM transaction, and are
 - arranged so that they will be read from the queue in the same order
 - as the list of remotes, whichever Schedule is used. -}
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
queueTransfers schedule q daemonstatus k f direction = do
  rs <- knownRemotes <$> getDaemonStatus daemonstatus
  selected <- sufficientremotes rs
  -- One transaction for the whole batch, so a reader of the queue
  -- never sees it with only some of the remotes added.
  liftIO $ atomically $ mapM_ go (insertionorder selected)
  where
    sufficientremotes rs
      -- Queue downloads from all remotes that
      -- have the key, with the cheapest ones first.
      -- More expensive ones will only be tried if
      -- downloading from a cheap one fails.
      -- (Assumes knownRemotes is already ordered by cost,
      -- since filter preserves that order -- TODO confirm.)
      | direction == Download = do
          uuids <- Remote.keyLocations k
          return $ filter (\r -> Remote.uuid r `elem` uuids) rs
      -- TODO: Determine a smaller set of remotes that
      -- can be uploaded to, in order to ensure all
      -- remotes can access the content. Currently,
      -- send to every remote we can.
      | otherwise = return rs
    -- Next pushes each transfer onto the front of the queue, so
    -- the last one pushed is the first one read. Push in reverse
    -- to keep the first remote in the list at the head.
    insertionorder
      | schedule == Next = reverse
      | otherwise = id
    gentransfer r = Transfer
      { transferDirection = direction
      , transferKey = k
      , transferUUID = Remote.uuid r
      }
    go r = enqueue schedule q (gentransfer r) (stubInfo f r)
|
2012-07-25 17:12:34 +00:00
|
|
|
|
|
|
|
{- Adds a transfer and its info to the queue, and bumps the queue size.
 -
 - With Next, the entry is put back at the front of the channel; with
 - Later, it is written to the end. -}
enqueue :: Schedule -> TransferQueue -> Transfer -> TransferInfo -> STM ()
enqueue schedule q t info = do
  insert (queue q) (t, info)
  modifyTVar' (queuesize q) succ
  where
    insert = case schedule of
      Next -> unGetTChan
      Later -> writeTChan
|
2012-07-05 16:21:22 +00:00
|
|
|
|
2012-07-25 17:12:34 +00:00
|
|
|
{- Queues a single transfer involving the given remote, without
 - waiting; the transfer's info is a stub built from the associated
 - file and the remote. -}
queueTransfer :: Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
queueTransfer schedule q f t remote =
  atomically (enqueue schedule q t info)
  where
    info = stubInfo f remote
|
2012-07-02 20:07:39 +00:00
|
|
|
|
2012-07-25 17:12:34 +00:00
|
|
|
{- Waits for the queue size to be at most the wanted size, and then
 - queues the transfer. The check and the addition happen in the same
 - STM transaction. -}
queueTransferAt :: Integer -> Schedule -> TransferQueue -> AssociatedFile -> Transfer -> Remote -> IO ()
queueTransferAt wantsz schedule q f t remote = atomically $ do
  waitforroom
  enqueue schedule q t (stubInfo f remote)
  where
    waitforroom = do
      sz <- readTVar (queuesize q)
      if sz > wantsz
        then retry -- blocks until queuesize changes
        else return ()
|
2012-07-02 20:07:39 +00:00
|
|
|
|
2012-07-25 17:12:34 +00:00
|
|
|
{- Takes the next pending transfer off the queue, waiting until there
 - is one. The queue size is decremented in the same STM transaction
 - that removes the transfer. -}
getNextTransfer :: TransferQueue -> IO (Transfer, TransferInfo)
getNextTransfer q = atomically $ do
  next <- readTChan (queue q)
  modifyTVar' (queuesize q) pred
  return next
|