implement pausing of transfers
A paused transfer's thread keeps running, keeping the slot in use. This is intentional; pausing a transfer should not let other queued transfers to run in its place.
This commit is contained in:
parent
21bd92f077
commit
8ba9830653
5 changed files with 62 additions and 26 deletions
|
@ -183,17 +183,19 @@ adjustTransfersSTM dstatus a = do
|
|||
s <- takeTMVar dstatus
|
||||
putTMVar dstatus $ s { currentTransfers = a (currentTransfers s) }
|
||||
|
||||
{- Updates a transfer's info. Preserves any transferTid value, which is not
|
||||
- written to disk. -}
|
||||
{- Updates a transfer's info.
|
||||
- Preserves the transferTid and transferPaused values,
|
||||
- which are not written to disk. -}
|
||||
updateTransferInfo :: DaemonStatusHandle -> Transfer -> TransferInfo -> IO ()
|
||||
updateTransferInfo dstatus t info =
|
||||
notifyTransfer dstatus `after` modifyDaemonStatus_ dstatus go
|
||||
where
|
||||
go s = s { currentTransfers = update (currentTransfers s) }
|
||||
update m = M.insertWith' merge t info m
|
||||
merge new old = case transferTid old of
|
||||
Nothing -> new
|
||||
Just _ -> new { transferTid = transferTid old }
|
||||
merge new old = new
|
||||
{ transferTid = maybe (transferTid new) Just (transferTid old)
|
||||
, transferPaused = transferPaused new || transferPaused old
|
||||
}
|
||||
|
||||
{- Removes a transfer from the map, and returns its info. -}
|
||||
removeTransfer :: DaemonStatusHandle -> Transfer -> IO (Maybe TransferInfo)
|
||||
|
|
|
@ -56,6 +56,7 @@ stubInfo f r = TransferInfo
|
|||
, transferRemote = Just r
|
||||
, bytesComplete = Nothing
|
||||
, associatedFile = f
|
||||
, transferPaused = False
|
||||
}
|
||||
|
||||
{- Adds transfers to queue for some of the known remotes. -}
|
||||
|
|
|
@ -5,15 +5,26 @@
|
|||
- Licensed under the GNU GPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE DeriveDataTypeable #-}
|
||||
|
||||
module Assistant.TransferSlots where
|
||||
|
||||
import Control.Exception
|
||||
import qualified Control.Exception as E
|
||||
import Control.Concurrent
|
||||
import Data.Typeable
|
||||
|
||||
import Common.Annex
|
||||
import Utility.ThreadScheduler
|
||||
|
||||
type TransferSlots = QSemN
|
||||
|
||||
{- A special exception that can be thrown to pause or resume a transfer, while
|
||||
- keeping its slot in use. -}
|
||||
data TransferException = PauseTransfer | ResumeTransfer
|
||||
deriving (Show, Eq, Typeable)
|
||||
|
||||
instance E.Exception TransferException
|
||||
|
||||
{- Number of concurrent transfers allowed to be run from the assistant.
|
||||
-
|
||||
- Transfers launched by other means, including by remote assistants,
|
||||
|
@ -26,15 +37,26 @@ newTransferSlots :: IO TransferSlots
|
|||
newTransferSlots = newQSemN numSlots
|
||||
|
||||
{- Waits until a transfer slot becomes available, and runs a transfer
|
||||
- action in the slot, in its own thread. Note that this thread is
|
||||
- subject to being killed when the transfer is canceled. -}
|
||||
- action in the slot, in its own thread.
|
||||
-
|
||||
- Note that the action is subject to being killed when the transfer
|
||||
- is canceled or paused.
|
||||
-
|
||||
- A PauseTransfer exception is handled by letting the action be killed,
|
||||
- then pausing the thread until a ResumeTransfer exception is raised,
|
||||
- then rerunning the action.
|
||||
-}
|
||||
inTransferSlot :: TransferSlots -> IO () -> IO ThreadId
|
||||
inTransferSlot s a = do
|
||||
inTransferSlot s transfer = do
|
||||
waitQSemN s 1
|
||||
forkIO $ bracket_ noop done a
|
||||
forkIO $ E.bracket_ noop (signalQSemN s 1) go
|
||||
where
|
||||
done = transferComplete s
|
||||
|
||||
{- Call when a transfer is complete. -}
|
||||
transferComplete :: TransferSlots -> IO ()
|
||||
transferComplete s = signalQSemN s 1
|
||||
go = catchPauseResume transfer
|
||||
pause = catchPauseResume $ runEvery (Seconds 86400) noop
|
||||
catchPauseResume a = E.catch a handlePauseResume
|
||||
handlePauseResume PauseTransfer = do
|
||||
putStrLn "pause"
|
||||
pause
|
||||
handlePauseResume ResumeTransfer = do
|
||||
putStrLn "resume"
|
||||
go
|
||||
|
|
|
@ -16,6 +16,7 @@ import Assistant.WebApp.Notifications
|
|||
import Assistant.WebApp.Configurators
|
||||
import Assistant.DaemonStatus
|
||||
import Assistant.TransferQueue
|
||||
import Assistant.TransferSlots
|
||||
import Utility.NotificationBroadcaster
|
||||
import Utility.Yesod
|
||||
import Logs.Transfer
|
||||
|
@ -147,18 +148,18 @@ getStartTransferR t = startTransfer t >> redirectBack
|
|||
postStartTransferR :: Transfer -> Handler ()
|
||||
postStartTransferR t = startTransfer t
|
||||
getCancelTransferR :: Transfer -> Handler ()
|
||||
getCancelTransferR t = cancelTransfer t >> redirectBack
|
||||
getCancelTransferR t = cancelTransfer False t >> redirectBack
|
||||
postCancelTransferR :: Transfer -> Handler ()
|
||||
postCancelTransferR t = cancelTransfer t
|
||||
|
||||
pauseTransfer :: Transfer -> Handler ()
|
||||
pauseTransfer t = liftIO $ putStrLn "pause"
|
||||
postCancelTransferR t = cancelTransfer False t
|
||||
|
||||
startTransfer :: Transfer -> Handler ()
|
||||
startTransfer t = liftIO $ putStrLn "start"
|
||||
|
||||
cancelTransfer :: Transfer -> Handler ()
|
||||
cancelTransfer t = do
|
||||
pauseTransfer :: Transfer -> Handler ()
|
||||
pauseTransfer = cancelTransfer True
|
||||
|
||||
cancelTransfer :: Bool -> Transfer-> Handler ()
|
||||
cancelTransfer pause t = do
|
||||
webapp <- getYesod
|
||||
let dstatus = daemonStatus webapp
|
||||
liftIO $ do
|
||||
|
@ -169,15 +170,22 @@ cancelTransfer t = do
|
|||
where
|
||||
running dstatus = M.lookup t . currentTransfers
|
||||
<$> getDaemonStatus dstatus
|
||||
stop dstatus info = void $ do
|
||||
putStrLn $ "stopping transfer " ++ show info
|
||||
stop dstatus info = do
|
||||
{- When there's a thread associated with the
|
||||
- transfer, it's killed first, to avoid it
|
||||
- displaying any alert about the transfer having
|
||||
- failed when the transfer process is killed. -}
|
||||
maybe noop killThread $ transferTid info
|
||||
maybe noop signalthread $ transferTid info
|
||||
maybe noop killproc $ transferPid info
|
||||
removeTransfer dstatus t
|
||||
if pause
|
||||
then void $
|
||||
updateTransferInfo dstatus t $ info
|
||||
{ transferPaused = True }
|
||||
else void $
|
||||
removeTransfer dstatus t
|
||||
signalthread tid
|
||||
| pause = throwTo tid PauseTransfer
|
||||
| otherwise = killThread tid
|
||||
{- In order to stop helper processes like rsync,
|
||||
- kill the whole process group of the process running the
|
||||
- transfer. -}
|
||||
|
|
|
@ -45,6 +45,7 @@ data TransferInfo = TransferInfo
|
|||
, transferRemote :: Maybe Remote
|
||||
, bytesComplete :: Maybe Integer
|
||||
, associatedFile :: Maybe FilePath
|
||||
, transferPaused :: Bool
|
||||
}
|
||||
deriving (Show, Eq, Ord)
|
||||
|
||||
|
@ -93,6 +94,7 @@ runTransfer t file a = do
|
|||
<*> pure Nothing -- not 0; transfer may be resuming
|
||||
<*> pure Nothing
|
||||
<*> pure file
|
||||
<*> pure False
|
||||
bracketIO (prep tfile mode info) (cleanup tfile) a
|
||||
where
|
||||
prep tfile mode info = do
|
||||
|
@ -185,6 +187,7 @@ readTransferInfo pid s =
|
|||
<*> pure Nothing
|
||||
<*> pure Nothing
|
||||
<*> pure (if null filename then Nothing else Just filename)
|
||||
<*> pure False
|
||||
_ -> Nothing
|
||||
where
|
||||
(bits, filebits) = splitAt 1 $ lines s
|
||||
|
|
Loading…
Add table
Reference in a new issue