finished pushing Assistant monad into all relevant files
All temporary and old functions are removed.
This commit is contained in:
parent
47d94eb9a4
commit
93ffd47d76
26 changed files with 262 additions and 301 deletions
|
@ -21,9 +21,8 @@ module Assistant.TransferQueue (
|
|||
dequeueTransfers,
|
||||
) where
|
||||
|
||||
import Common.Annex
|
||||
import Assistant.Common
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.Types.DaemonStatus
|
||||
import Assistant.Types.TransferQueue
|
||||
import Logs.Transfer
|
||||
import Types.Remote
|
||||
|
@ -35,8 +34,8 @@ import Control.Concurrent.STM
|
|||
import qualified Data.Map as M
|
||||
|
||||
{- Reads the queue's content without blocking or changing it. -}
|
||||
getTransferQueue :: TransferQueue -> IO [(Transfer, TransferInfo)]
|
||||
getTransferQueue q = atomically $ readTVar $ queuelist q
|
||||
getTransferQueue :: Assistant [(Transfer, TransferInfo)]
|
||||
getTransferQueue = (atomically . readTVar . queuelist) <<~ transferQueue
|
||||
|
||||
stubInfo :: AssociatedFile -> Remote -> TransferInfo
|
||||
stubInfo f r = stubTransferInfo
|
||||
|
@ -46,101 +45,104 @@ stubInfo f r = stubTransferInfo
|
|||
|
||||
{- Adds transfers to queue for some of the known remotes.
|
||||
- Honors preferred content settings, only transferring wanted files. -}
|
||||
queueTransfers :: Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
|
||||
queueTransfers :: Schedule -> Key -> AssociatedFile -> Direction -> Assistant ()
|
||||
queueTransfers = queueTransfersMatching (const True)
|
||||
|
||||
{- Adds transfers to queue for some of the known remotes, that match a
|
||||
- condition. Honors preferred content settings. -}
|
||||
queueTransfersMatching :: (UUID -> Bool) -> Schedule -> TransferQueue -> DaemonStatusHandle -> Key -> AssociatedFile -> Direction -> Annex ()
|
||||
queueTransfersMatching matching schedule q dstatus k f direction
|
||||
| direction == Download = whenM (wantGet f) go
|
||||
queueTransfersMatching :: (UUID -> Bool) -> Schedule -> Key -> AssociatedFile -> Direction -> Assistant ()
|
||||
queueTransfersMatching matching schedule k f direction
|
||||
| direction == Download = whenM (liftAnnex $ wantGet f) go
|
||||
| otherwise = go
|
||||
where
|
||||
go = do
|
||||
rs <- sufficientremotes
|
||||
=<< syncRemotes <$> liftIO (getDaemonStatusOld dstatus)
|
||||
let matchingrs = filter (matching . Remote.uuid) rs
|
||||
if null matchingrs
|
||||
then defer
|
||||
else forM_ matchingrs $ \r -> liftIO $
|
||||
enqueue schedule q dstatus (gentransfer r) (stubInfo f r)
|
||||
sufficientremotes rs
|
||||
{- Queue downloads from all remotes that
|
||||
- have the key, with the cheapest ones first.
|
||||
- More expensive ones will only be tried if
|
||||
- downloading from a cheap one fails. -}
|
||||
| direction == Download = do
|
||||
uuids <- Remote.keyLocations k
|
||||
return $ filter (\r -> uuid r `elem` uuids) rs
|
||||
{- Upload to all remotes that want the content. -}
|
||||
| otherwise = filterM (wantSend f . Remote.uuid) $
|
||||
filter (not . Remote.readonly) rs
|
||||
gentransfer r = Transfer
|
||||
{ transferDirection = direction
|
||||
, transferKey = k
|
||||
, transferUUID = Remote.uuid r
|
||||
}
|
||||
defer
|
||||
{- Defer this download, as no known remote has the key. -}
|
||||
| direction == Download = void $ liftIO $ atomically $
|
||||
modifyTVar' (deferreddownloads q) $
|
||||
\l -> (k, f):l
|
||||
| otherwise = noop
|
||||
where
|
||||
go = do
|
||||
rs <- liftAnnex . sufficientremotes
|
||||
=<< syncRemotes <$> getDaemonStatus
|
||||
let matchingrs = filter (matching . Remote.uuid) rs
|
||||
if null matchingrs
|
||||
then defer
|
||||
else forM_ matchingrs $ \r ->
|
||||
enqueue schedule (gentransfer r) (stubInfo f r)
|
||||
sufficientremotes rs
|
||||
{- Queue downloads from all remotes that
|
||||
- have the key, with the cheapest ones first.
|
||||
- More expensive ones will only be tried if
|
||||
- downloading from a cheap one fails. -}
|
||||
| direction == Download = do
|
||||
uuids <- Remote.keyLocations k
|
||||
return $ filter (\r -> uuid r `elem` uuids) rs
|
||||
{- Upload to all remotes that want the content. -}
|
||||
| otherwise = filterM (wantSend f . Remote.uuid) $
|
||||
filter (not . Remote.readonly) rs
|
||||
gentransfer r = Transfer
|
||||
{ transferDirection = direction
|
||||
, transferKey = k
|
||||
, transferUUID = Remote.uuid r
|
||||
}
|
||||
defer
|
||||
{- Defer this download, as no known remote has the key. -}
|
||||
| direction == Download = do
|
||||
q <- getAssistant transferQueue
|
||||
void $ liftIO $ atomically $
|
||||
modifyTVar' (deferreddownloads q) $
|
||||
\l -> (k, f):l
|
||||
| otherwise = noop
|
||||
|
||||
{- Queues any deferred downloads that can now be accomplished, leaving
|
||||
- any others in the list to try again later. -}
|
||||
queueDeferredDownloads :: Schedule -> TransferQueue -> DaemonStatusHandle -> Annex ()
|
||||
queueDeferredDownloads schedule q dstatus = do
|
||||
queueDeferredDownloads :: Schedule -> Assistant ()
|
||||
queueDeferredDownloads schedule = do
|
||||
q <- getAssistant transferQueue
|
||||
l <- liftIO $ atomically $ swapTVar (deferreddownloads q) []
|
||||
rs <- syncRemotes <$> liftIO (getDaemonStatusOld dstatus)
|
||||
rs <- syncRemotes <$> getDaemonStatus
|
||||
left <- filterM (queue rs) l
|
||||
unless (null left) $
|
||||
liftIO $ atomically $ modifyTVar' (deferreddownloads q) $
|
||||
\new -> new ++ left
|
||||
where
|
||||
queue rs (k, f) = do
|
||||
uuids <- Remote.keyLocations k
|
||||
let sources = filter (\r -> uuid r `elem` uuids) rs
|
||||
unless (null sources) $
|
||||
forM_ sources $ \r -> liftIO $
|
||||
enqueue schedule q dstatus
|
||||
(gentransfer r) (stubInfo f r)
|
||||
return $ null sources
|
||||
where
|
||||
gentransfer r = Transfer
|
||||
{ transferDirection = Download
|
||||
, transferKey = k
|
||||
, transferUUID = Remote.uuid r
|
||||
}
|
||||
where
|
||||
queue rs (k, f) = do
|
||||
uuids <- liftAnnex $ Remote.keyLocations k
|
||||
let sources = filter (\r -> uuid r `elem` uuids) rs
|
||||
unless (null sources) $
|
||||
forM_ sources $ \r ->
|
||||
enqueue schedule (gentransfer r) (stubInfo f r)
|
||||
return $ null sources
|
||||
where
|
||||
gentransfer r = Transfer
|
||||
{ transferDirection = Download
|
||||
, transferKey = k
|
||||
, transferUUID = Remote.uuid r
|
||||
}
|
||||
|
||||
enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
|
||||
enqueue schedule q dstatus t info
|
||||
enqueue :: Schedule -> Transfer -> TransferInfo -> Assistant ()
|
||||
enqueue schedule t info
|
||||
| schedule == Next = go (new:)
|
||||
| otherwise = go (\l -> l++[new])
|
||||
where
|
||||
new = (t, info)
|
||||
go modlist = do
|
||||
atomically $ do
|
||||
void $ modifyTVar' (queuesize q) succ
|
||||
void $ modifyTVar' (queuelist q) modlist
|
||||
void $ notifyTransferOld dstatus
|
||||
where
|
||||
new = (t, info)
|
||||
go modlist = do
|
||||
q <- getAssistant transferQueue
|
||||
liftIO $ atomically $ do
|
||||
void $ modifyTVar' (queuesize q) succ
|
||||
void $ modifyTVar' (queuelist q) modlist
|
||||
notifyTransfer
|
||||
|
||||
{- Adds a transfer to the queue. -}
|
||||
queueTransfer :: Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
|
||||
queueTransfer schedule q dstatus f t remote =
|
||||
enqueue schedule q dstatus t (stubInfo f remote)
|
||||
queueTransfer :: Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant ()
|
||||
queueTransfer schedule f t remote = enqueue schedule t (stubInfo f remote)
|
||||
|
||||
{- Blocks until the queue is no larger than a given size, and then adds a
|
||||
- transfer to the queue. -}
|
||||
queueTransferAt :: Int -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
|
||||
queueTransferAt wantsz schedule q dstatus f t remote = do
|
||||
atomically $ do
|
||||
queueTransferAt :: Int -> Schedule -> AssociatedFile -> Transfer -> Remote -> Assistant ()
|
||||
queueTransferAt wantsz schedule f t remote = do
|
||||
q <- getAssistant transferQueue
|
||||
liftIO $ atomically $ do
|
||||
sz <- readTVar (queuesize q)
|
||||
unless (sz <= wantsz) $
|
||||
retry -- blocks until queuesize changes
|
||||
enqueue schedule q dstatus t (stubInfo f remote)
|
||||
enqueue schedule t (stubInfo f remote)
|
||||
|
||||
queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
|
||||
queueTransferWhenSmall :: AssociatedFile -> Transfer -> Remote -> Assistant ()
|
||||
queueTransferWhenSmall = queueTransferAt 10 Later
|
||||
|
||||
{- Blocks until a pending transfer is available in the queue,
|
||||
|
@ -151,38 +153,45 @@ queueTransferWhenSmall = queueTransferAt 10 Later
|
|||
-
|
||||
- This is done in a single STM transaction, so there is no window
|
||||
- where an observer sees an inconsistent status. -}
|
||||
getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo))
|
||||
getNextTransfer q dstatus acceptable = atomically $ do
|
||||
sz <- readTVar (queuesize q)
|
||||
if sz < 1
|
||||
then retry -- blocks until queuesize changes
|
||||
else do
|
||||
(r@(t,info):rest) <- readTVar (queuelist q)
|
||||
writeTVar (queuelist q) rest
|
||||
void $ modifyTVar' (queuesize q) pred
|
||||
if acceptable info
|
||||
then do
|
||||
adjustTransfersSTM dstatus $
|
||||
M.insertWith' const t info
|
||||
return $ Just r
|
||||
else return Nothing
|
||||
getNextTransfer :: (TransferInfo -> Bool) -> Assistant (Maybe (Transfer, TransferInfo))
|
||||
getNextTransfer acceptable = do
|
||||
q <- getAssistant transferQueue
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ atomically $ do
|
||||
sz <- readTVar (queuesize q)
|
||||
if sz < 1
|
||||
then retry -- blocks until queuesize changes
|
||||
else do
|
||||
(r@(t,info):rest) <- readTVar (queuelist q)
|
||||
writeTVar (queuelist q) rest
|
||||
void $ modifyTVar' (queuesize q) pred
|
||||
if acceptable info
|
||||
then do
|
||||
adjustTransfersSTM dstatus $
|
||||
M.insertWith' const t info
|
||||
return $ Just r
|
||||
else return Nothing
|
||||
|
||||
{- Moves transfers matching a condition from the queue, to the
|
||||
- currentTransfers map. -}
|
||||
getMatchingTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)]
|
||||
getMatchingTransfers q dstatus c = atomically $ do
|
||||
ts <- dequeueTransfersSTM q c
|
||||
unless (null ts) $
|
||||
adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts
|
||||
return ts
|
||||
getMatchingTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)]
|
||||
getMatchingTransfers c = do
|
||||
q <- getAssistant transferQueue
|
||||
dstatus <- getAssistant daemonStatusHandle
|
||||
liftIO $ atomically $ do
|
||||
ts <- dequeueTransfersSTM q c
|
||||
unless (null ts) $
|
||||
adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts
|
||||
return ts
|
||||
|
||||
{- Removes transfers matching a condition from the queue, and returns the
|
||||
- removed transfers. -}
|
||||
dequeueTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)]
|
||||
dequeueTransfers q dstatus c = do
|
||||
removed <- atomically $ dequeueTransfersSTM q c
|
||||
dequeueTransfers :: (Transfer -> Bool) -> Assistant [(Transfer, TransferInfo)]
|
||||
dequeueTransfers c = do
|
||||
q <- getAssistant transferQueue
|
||||
removed <- liftIO $ atomically $ dequeueTransfersSTM q c
|
||||
unless (null removed) $
|
||||
notifyTransferOld dstatus
|
||||
notifyTransfer
|
||||
return removed
|
||||
|
||||
dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue