remove the TChan component from the data structure

The code to maintain that TChan in parallel with the list was buggy,
the two were not always the same. And all that TChan was needed for was
blocking on the next transfer, which can be accomplished just as well by
checking the size and retrying, thanks to STM.

Also, this is faster, and uses less memory. Total win.
This commit is contained in:
Joey Hess 2012-08-31 12:57:24 -04:00
parent 4004baafaf
commit 8335a7ff7a

View file

@ -29,12 +29,8 @@ import qualified Types.Remote as Remote
import Control.Concurrent.STM import Control.Concurrent.STM
import qualified Data.Map as M import qualified Data.Map as M
{- The transfer queue consists of a channel listing the transfers to make;
- the size of the queue is also tracked, and a list is maintained
- in parallel to allow for reading. -}
data TransferQueue = TransferQueue data TransferQueue = TransferQueue
{ queue :: TChan (Transfer, TransferInfo) { queuesize :: TVar Int
, queuesize :: TVar Int
, queuelist :: TVar [(Transfer, TransferInfo)] , queuelist :: TVar [(Transfer, TransferInfo)]
} }
@ -43,8 +39,7 @@ data Schedule = Next | Later
newTransferQueue :: IO TransferQueue newTransferQueue :: IO TransferQueue
newTransferQueue = atomically $ TransferQueue newTransferQueue = atomically $ TransferQueue
<$> newTChan <$> newTVar 0
<*> newTVar 0
<*> newTVar [] <*> newTVar []
{- Reads the queue's content without blocking or changing it. -} {- Reads the queue's content without blocking or changing it. -}
@ -91,13 +86,12 @@ queueTransfers schedule q dstatus k f direction = do
enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO () enqueue :: Schedule -> TransferQueue -> DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
enqueue schedule q dstatus t info enqueue schedule q dstatus t info
| schedule == Next = go unGetTChan (new:) | schedule == Next = go (new:)
| otherwise = go writeTChan (\l -> l++[new]) | otherwise = go (\l -> l++[new])
where where
new = (t, info) new = (t, info)
go modqueue modlist = do go modlist = do
atomically $ do atomically $ do
void $ modqueue (queue q) new
void $ modifyTVar' (queuesize q) succ void $ modifyTVar' (queuesize q) succ
void $ modifyTVar' (queuelist q) modlist void $ modifyTVar' (queuelist q) modlist
void $ notifyTransfer dstatus void $ notifyTransfer dstatus
@ -121,7 +115,7 @@ queueTransferAt wantsz schedule q dstatus f t remote = do
queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO () queueTransferWhenSmall :: TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
queueTransferWhenSmall = queueTransferAt 10 Later queueTransferWhenSmall = queueTransferAt 10 Later
{- Blocks until a pending transfer is available from the queue, {- Blocks until a pending transfer is available in the queue,
- and removes it. - and removes it.
- -
- Checks that it's acceptable, before adding it to the - Checks that it's acceptable, before adding it to the
@ -131,15 +125,19 @@ queueTransferWhenSmall = queueTransferAt 10 Later
- where an observer sees an inconsistent status. -} - where an observer sees an inconsistent status. -}
getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo)) getNextTransfer :: TransferQueue -> DaemonStatusHandle -> (TransferInfo -> Bool) -> IO (Maybe (Transfer, TransferInfo))
getNextTransfer q dstatus acceptable = atomically $ do getNextTransfer q dstatus acceptable = atomically $ do
void $ modifyTVar' (queuesize q) pred sz <- readTVar (queuesize q)
void $ modifyTVar' (queuelist q) (drop 1) if sz < 1
r@(t, info) <- readTChan (queue q) then retry -- blocks until queuesize changes
if acceptable info else do
then do (r@(t,info):rest) <- readTVar (queuelist q)
adjustTransfersSTM dstatus $ writeTVar (queuelist q) rest
M.insertWith' const t info void $ modifyTVar' (queuesize q) pred
return $ Just r if acceptable info
else return Nothing then do
adjustTransfersSTM dstatus $
M.insertWith' const t info
return $ Just r
else return Nothing
{- Moves transfers matching a condition from the queue, to the {- Moves transfers matching a condition from the queue, to the
- currentTransfers map. -} - currentTransfers map. -}
@ -165,8 +163,4 @@ dequeueTransfersSTM q c = do
<$> readTVar (queuelist q) <$> readTVar (queuelist q)
void $ writeTVar (queuesize q) (length ts) void $ writeTVar (queuesize q) (length ts)
void $ writeTVar (queuelist q) ts void $ writeTVar (queuelist q) ts
drain
forM_ ts $ unGetTChan (queue q)
return removed return removed
where
drain = maybe noop (const drain) =<< tryReadTChan (queue q)