make start button work on queued transfers
When multiple downloads of a key are queued, it starts the first, but leaves the other downloads in the queue. This ensures that we don't lose a queued download if the one that got started failed.
This commit is contained in:
parent
c21a9fe04a
commit
8d32d54320
3 changed files with 66 additions and 51 deletions
|
@ -34,49 +34,26 @@ transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> Transf
|
|||
transfererThread st dstatus transferqueue slots = go =<< readProgramFile
|
||||
where
|
||||
go program = forever $ inTransferSlot dstatus slots $
|
||||
getNextTransfer transferqueue dstatus notrunning
|
||||
>>= handle program
|
||||
handle _ Nothing = return Nothing
|
||||
handle program (Just (t, info)) = ifM (runThreadState st $ shouldTransfer t info)
|
||||
( do
|
||||
debug thisThread [ "Transferring:" , show t ]
|
||||
notifyTransfer dstatus
|
||||
let a = doTransfer dstatus t info program
|
||||
return $ Just (t, info, a)
|
||||
, do
|
||||
debug thisThread [ "Skipping unnecessary transfer:" , show t ]
|
||||
-- getNextTransfer added t to the
|
||||
-- daemonstatus's transfer map.
|
||||
void $ removeTransfer dstatus t
|
||||
return Nothing
|
||||
)
|
||||
maybe (return Nothing) (uncurry $ startTransfer st dstatus program)
|
||||
=<< getNextTransfer transferqueue dstatus notrunning
|
||||
{- Skip transfers that are already running. -}
|
||||
notrunning i = startedTime i == Nothing
|
||||
|
||||
{- Checks if the file to download is already present, or the remote
|
||||
- being uploaded to isn't known to have the file. -}
|
||||
shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
|
||||
shouldTransfer t info
|
||||
| transferDirection t == Download =
|
||||
not <$> inAnnex key
|
||||
| transferDirection t == Upload =
|
||||
{- Trust the location log to check if the
|
||||
- remote already has the key. This avoids
|
||||
- a roundtrip to the remote. -}
|
||||
case transferRemote info of
|
||||
Nothing -> return False
|
||||
Just remote ->
|
||||
notElem (Remote.uuid remote)
|
||||
<$> loggedLocations key
|
||||
| otherwise = return False
|
||||
where
|
||||
key = transferKey t
|
||||
|
||||
doTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> FilePath -> IO ()
|
||||
doTransfer dstatus t info program = case (transferRemote info, associatedFile info) of
|
||||
(Nothing, _) -> noop
|
||||
(_, Nothing) -> noop
|
||||
(Just remote, Just file) -> transferprocess remote file
|
||||
{- By the time this is called, the daemonstatis's transfer map should
|
||||
- already have been updated to include the transfer. -}
|
||||
startTransfer :: ThreadState -> DaemonStatusHandle -> FilePath -> Transfer -> TransferInfo -> TransferGenerator
|
||||
startTransfer st dstatus program t info = case (transferRemote info, associatedFile info) of
|
||||
(Just remote, Just file) -> ifM (runThreadState st $ shouldTransfer t info)
|
||||
( do
|
||||
debug thisThread [ "Transferring:" , show t ]
|
||||
notifyTransfer dstatus
|
||||
return $ Just (t, info, transferprocess remote file)
|
||||
, do
|
||||
debug thisThread [ "Skipping unnecessary transfer:" , show t ]
|
||||
void $ removeTransfer dstatus t
|
||||
return Nothing
|
||||
)
|
||||
_ -> return Nothing
|
||||
where
|
||||
direction = transferDirection t
|
||||
isdownload = direction == Download
|
||||
|
@ -101,3 +78,22 @@ doTransfer dstatus t info program = case (transferRemote info, associatedFile in
|
|||
, Param "--file"
|
||||
, File file
|
||||
]
|
||||
|
||||
{- Checks if the file to download is already present, or the remote
|
||||
- being uploaded to isn't known to have the file. -}
|
||||
shouldTransfer :: Transfer -> TransferInfo -> Annex Bool
|
||||
shouldTransfer t info
|
||||
| transferDirection t == Download =
|
||||
not <$> inAnnex key
|
||||
| transferDirection t == Upload =
|
||||
{- Trust the location log to check if the
|
||||
- remote already has the key. This avoids
|
||||
- a roundtrip to the remote. -}
|
||||
case transferRemote info of
|
||||
Nothing -> return False
|
||||
Just remote ->
|
||||
notElem (Remote.uuid remote)
|
||||
<$> loggedLocations key
|
||||
| otherwise = return False
|
||||
where
|
||||
key = transferKey t
|
||||
|
|
|
@ -15,6 +15,7 @@ module Assistant.TransferQueue (
|
|||
queueTransferAt,
|
||||
queueTransferWhenSmall,
|
||||
getNextTransfer,
|
||||
getMatchingTransfers,
|
||||
dequeueTransfers,
|
||||
) where
|
||||
|
||||
|
@ -140,20 +141,32 @@ getNextTransfer q dstatus acceptable = atomically $ do
|
|||
return $ Just r
|
||||
else return Nothing
|
||||
|
||||
{- Moves transfers matching a condition from the queue, to the
|
||||
- currentTransfers map. -}
|
||||
getMatchingTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)]
|
||||
getMatchingTransfers q dstatus c = atomically $ do
|
||||
ts <- dequeueTransfersSTM q c
|
||||
unless (null ts) $
|
||||
adjustTransfersSTM dstatus $ \m -> M.union m $ M.fromList ts
|
||||
return ts
|
||||
|
||||
{- Removes transfers matching a condition from the queue, and returns the
|
||||
- removed transfers. -}
|
||||
dequeueTransfers :: TransferQueue -> DaemonStatusHandle -> (Transfer -> Bool) -> IO [(Transfer, TransferInfo)]
|
||||
dequeueTransfers q dstatus c = do
|
||||
removed <- atomically $ do
|
||||
(removed, ls) <- partition (c . fst)
|
||||
<$> readTVar (queuelist q)
|
||||
void $ writeTVar (queuesize q) (length ls)
|
||||
void $ writeTVar (queuelist q) ls
|
||||
drain
|
||||
forM_ ls $ unGetTChan (queue q)
|
||||
return removed
|
||||
removed <- atomically $ dequeueTransfersSTM q c
|
||||
unless (null removed) $
|
||||
notifyTransfer dstatus
|
||||
return removed
|
||||
|
||||
dequeueTransfersSTM :: TransferQueue -> (Transfer -> Bool) -> STM [(Transfer, TransferInfo)]
|
||||
dequeueTransfersSTM q c = do
|
||||
(removed, ts) <- partition (c . fst)
|
||||
<$> readTVar (queuelist q)
|
||||
void $ writeTVar (queuesize q) (length ts)
|
||||
void $ writeTVar (queuelist q) ts
|
||||
drain
|
||||
forM_ ts $ unGetTChan (queue q)
|
||||
return removed
|
||||
where
|
||||
drain = maybe noop (const drain) =<< tryReadTChan (queue q)
|
||||
|
|
|
@ -209,8 +209,14 @@ cancelTransfer pause t = do
|
|||
startTransfer :: Transfer -> Handler ()
|
||||
startTransfer t = do
|
||||
m <- getCurrentTransfers
|
||||
webapp <- getYesod
|
||||
let dstatus = daemonStatus webapp
|
||||
let q = transferQueue webapp
|
||||
{- resume a paused transfer -}
|
||||
maybe noop go (M.lookup t m)
|
||||
-- TODO: handle starting a queued transfer
|
||||
{- start a queued transfer -}
|
||||
is <- liftIO $ map snd <$> getMatchingTransfers q dstatus (== t)
|
||||
maybe noop start $ headMaybe is
|
||||
where
|
||||
go info = maybe (start info) (resume info) $ transferTid info
|
||||
resume info tid = do
|
||||
|
@ -222,6 +228,7 @@ startTransfer t = do
|
|||
throwTo tid ResumeTransfer
|
||||
start info = do
|
||||
webapp <- getYesod
|
||||
let st = fromJust $ threadState webapp
|
||||
let dstatus = daemonStatus webapp
|
||||
let slots = transferSlots webapp
|
||||
{- This transfer was being run by another process,
|
||||
|
@ -230,8 +237,7 @@ startTransfer t = do
|
|||
{ transferPid = Nothing, transferPaused = False }
|
||||
liftIO $ inImmediateTransferSlot dstatus slots $ do
|
||||
program <- readProgramFile
|
||||
let a = Transferrer.doTransfer dstatus t info program
|
||||
return $ Just (t, info, a)
|
||||
Transferrer.startTransfer st dstatus program t info
|
||||
|
||||
getCurrentTransfers :: Handler TransferMap
|
||||
getCurrentTransfers = currentTransfers
|
||||
|
|
Loading…
Add table
Reference in a new issue