diff --git a/Assistant/Threads/Transferrer.hs b/Assistant/Threads/Transferrer.hs index 9a772d628b..d4c00afd8e 100644 --- a/Assistant/Threads/Transferrer.hs +++ b/Assistant/Threads/Transferrer.hs @@ -33,21 +33,23 @@ maxTransfers = 1 transfererThread :: ThreadState -> DaemonStatusHandle -> TransferQueue -> TransferSlots -> IO () transfererThread st dstatus transferqueue slots = go =<< readProgramFile where - go program = getNextTransfer transferqueue dstatus notrunning >>= handle program - handle program Nothing = go program - handle program (Just (t, info)) = do - ifM (runThreadState st $ shouldTransfer t info) - ( do - debug thisThread [ "Transferring:" , show t ] - notifyTransfer dstatus - transferThread dstatus slots t info inTransferSlot program - , do - debug thisThread [ "Skipping unnecessary transfer:" , show t ] - -- getNextTransfer added t to the - -- daemonstatus's transfer map. - void $ removeTransfer dstatus t - ) - go program + go program = forever $ inTransferSlot dstatus slots $ + getNextTransfer transferqueue dstatus notrunning + >>= handle program + handle _ Nothing = return Nothing + handle program (Just (t, info)) = ifM (runThreadState st $ shouldTransfer t info) + ( do + debug thisThread [ "Transferring:" , show t ] + notifyTransfer dstatus + let a = doTransfer dstatus t info program + return $ Just (t, info, a) + , do + debug thisThread [ "Skipping unnecessary transfer:" , show t ] + -- getNextTransfer added t to the + -- daemonstatus's transfer map. + void $ removeTransfer dstatus t + return Nothing + ) {- Skip transfers that are already running. -} notrunning i = startedTime i == Nothing @@ -70,24 +72,11 @@ shouldTransfer t info where key = transferKey t -{- A sepeate git-annex process is forked off to run a transfer, - - running in its own process group. This allows killing it and all its - - children if the user decides to cancel the transfer. - - - - A thread is forked off to run the process, and the thread - - occupies one of the transfer slots. If all slots are in use, this will - - block until one becomes available. The thread's id is also recorded in - - the transfer info; the thread will also be killed when a transfer is - - stopped, to avoid it displaying any alert about the transfer having - - failed. -} -transferThread :: DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> TransferSlotRunner -> FilePath -> IO () -transferThread dstatus slots t info runner program = case (transferRemote info, associatedFile info) of +doTransfer :: DaemonStatusHandle -> Transfer -> TransferInfo -> FilePath -> IO () +doTransfer dstatus t info program = case (transferRemote info, associatedFile info) of (Nothing, _) -> noop (_, Nothing) -> noop - (Just remote, Just file) -> do - tid <- runner slots $ - transferprocess remote file - updateTransferInfo dstatus t $ info { transferTid = Just tid } + (Just remote, Just file) -> transferprocess remote file where direction = transferDirection t isdownload = direction == Download diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 27b869f1df..8e24d730cb 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -9,13 +9,15 @@ module Assistant.TransferSlots where +import Common.Annex +import Utility.ThreadScheduler +import Assistant.DaemonStatus +import Logs.Transfer + import qualified Control.Exception as E import Control.Concurrent import Data.Typeable -import Common.Annex -import Utility.ThreadScheduler - type TransferSlots = QSemN {- A special exception that can be thrown to pause or resume a transfer, while @@ -25,7 +27,8 @@ data TransferException = PauseTransfer | ResumeTransfer instance E.Exception TransferException -type TransferSlotRunner = TransferSlots -> IO () -> IO ThreadId +type TransferSlotRunner = DaemonStatusHandle -> TransferSlots -> TransferGenerator -> IO () +type TransferGenerator = IO (Maybe (Transfer, TransferInfo, IO ())) {- Number of concurrent transfers allowed to be run from the assistant. - @@ -38,31 +41,40 @@ numSlots = 1 newTransferSlots :: IO TransferSlots newTransferSlots = newQSemN numSlots -{- Waits until a transfer slot becomes available, and runs a transfer - - action in the slot, in its own thread. +{- Waits until a transfer slot becomes available, then runs a + - TransferGenerator, and then runs the transfer action in its own thread. -} inTransferSlot :: TransferSlotRunner -inTransferSlot = runTransferSlot (\s -> waitQSemN s 1) +inTransferSlot dstatus s gen = do + waitQSemN s 1 + runTransferThread dstatus s =<< gen -{- Runs a transfer action, without waiting for a slot to become available. -} +{- Runs a TransferGenerator, and its transfer action, + - without waiting for a slot to become available. -} inImmediateTransferSlot :: TransferSlotRunner -inImmediateTransferSlot = runTransferSlot (\s -> signalQSemN s (-1)) +inImmediateTransferSlot dstatus s gen = do + signalQSemN s (-1) + runTransferThread dstatus s =<< gen -{- Note that the action is subject to being killed when the transfer +{- Runs a transfer action, in an already allocated transfer slot. + - Once it finishes, frees the transfer slot. + - + - Note that the action is subject to being killed when the transfer - is canceled or paused. - - A PauseTransfer exception is handled by letting the action be killed, - then pausing the thread until a ResumeTransfer exception is raised, - then rerunning the action. -} -runTransferSlot :: (QSemN -> IO ()) -> TransferSlotRunner -runTransferSlot allocator s transfer = do - allocator s - forkIO $ E.bracket_ noop (signalQSemN s 1) go +runTransferThread :: DaemonStatusHandle -> TransferSlots -> Maybe (Transfer, TransferInfo, IO ()) -> IO () +runTransferThread _ s Nothing = signalQSemN s 1 +runTransferThread dstatus s (Just (t, info, a)) = do + tid <- forkIO $ E.bracket_ noop (signalQSemN s 1) go + updateTransferInfo dstatus t $ info { transferTid = Just tid } where - go = catchPauseResume transfer + go = catchPauseResume a pause = catchPauseResume $ runEvery (Seconds 86400) noop - catchPauseResume a = E.catch a handlePauseResume + catchPauseResume a' = E.catch a' handlePauseResume handlePauseResume PauseTransfer = do putStrLn "pause" pause diff --git a/Assistant/WebApp/DashBoard.hs b/Assistant/WebApp/DashBoard.hs index 949793121a..e51708d640 100644 --- a/Assistant/WebApp/DashBoard.hs +++ b/Assistant/WebApp/DashBoard.hs @@ -210,9 +210,10 @@ startTransfer t = do - forget that old pid, and start a new one. -} liftIO $ updateTransferInfo dstatus t $ info { transferPid = Nothing } - liftIO $ Transferrer.transferThread - dstatus slots t info inImmediateTransferSlot - =<< readProgramFile + liftIO $ inImmediateTransferSlot dstatus slots $ do + program <- readProgramFile + let a = Transferrer.doTransfer dstatus t info program + return $ Just (t, info, a) getCurrentTransfers :: Handler TransferMap getCurrentTransfers = currentTransfers