implement resuming of paused transfers

Currently waits for a new transfer slot to open up, which probably needs to
change..
This commit is contained in:
Joey Hess 2012-08-12 12:11:20 -04:00
parent 37eed5d8d0
commit b6b8f6da9c
5 changed files with 36 additions and 12 deletions

View file

@ -155,7 +155,7 @@ startAssistant assistant daemonize webappwaiter = do
mapM_ startthread mapM_ startthread
[ watch $ commitThread st changechan commitchan transferqueue dstatus [ watch $ commitThread st changechan commitchan transferqueue dstatus
#ifdef WITH_WEBAPP #ifdef WITH_WEBAPP
, assist $ webAppThread (Just st) dstatus scanremotes transferqueue Nothing webappwaiter , assist $ webAppThread (Just st) dstatus scanremotes transferqueue transferslots Nothing webappwaiter
#endif #endif
, assist $ pushThread st dstatus commitchan pushmap , assist $ pushThread st dstatus commitchan pushmap
, assist $ pushRetryThread st dstatus pushmap , assist $ pushRetryThread st dstatus pushmap

View file

@ -21,6 +21,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.ScanRemotes import Assistant.ScanRemotes
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.TransferSlots
import Utility.WebApp import Utility.WebApp
import Utility.FileMode import Utility.FileMode
import Utility.TempFile import Utility.TempFile
@ -43,15 +44,17 @@ webAppThread
-> DaemonStatusHandle -> DaemonStatusHandle
-> ScanRemoteMap -> ScanRemoteMap
-> TransferQueue -> TransferQueue
-> TransferSlots
-> Maybe (IO String) -> Maybe (IO String)
-> Maybe (Url -> FilePath -> IO ()) -> Maybe (Url -> FilePath -> IO ())
-> IO () -> IO ()
webAppThread mst dstatus scanremotes transferqueue postfirstrun onstartup = do webAppThread mst dstatus scanremotes transferqueue transferslots postfirstrun onstartup = do
webapp <- WebApp webapp <- WebApp
<$> pure mst <$> pure mst
<*> pure dstatus <*> pure dstatus
<*> pure scanremotes <*> pure scanremotes
<*> pure transferqueue <*> pure transferqueue
<*> pure transferslots
<*> (pack <$> genRandomToken) <*> (pack <$> genRandomToken)
<*> getreldir mst <*> getreldir mst
<*> pure $(embed "static") <*> pure $(embed "static")

View file

@ -15,6 +15,7 @@ import Assistant.ThreadedMonad
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.ScanRemotes import Assistant.ScanRemotes
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.TransferSlots
import Assistant.Alert import Assistant.Alert
import Utility.NotificationBroadcaster import Utility.NotificationBroadcaster
import Utility.WebApp import Utility.WebApp
@ -36,6 +37,7 @@ data WebApp = WebApp
, daemonStatus :: DaemonStatusHandle , daemonStatus :: DaemonStatusHandle
, scanRemotes :: ScanRemoteMap , scanRemotes :: ScanRemoteMap
, transferQueue :: TransferQueue , transferQueue :: TransferQueue
, transferSlots :: TransferSlots
, secretToken :: Text , secretToken :: Text
, relDir :: Maybe FilePath , relDir :: Maybe FilePath
, getStatic :: Static , getStatic :: Static

View file

@ -17,6 +17,7 @@ import Assistant.WebApp.Configurators
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.TransferSlots import Assistant.TransferSlots
import qualified Assistant.Threads.Transferrer as Transferrer
import Utility.NotificationBroadcaster import Utility.NotificationBroadcaster
import Utility.Yesod import Utility.Yesod
import Logs.Transfer import Logs.Transfer
@ -39,9 +40,7 @@ import System.Posix.Process (getProcessGroupIDOf)
transfersDisplay :: Bool -> Widget transfersDisplay :: Bool -> Widget
transfersDisplay warnNoScript = do transfersDisplay warnNoScript = do
webapp <- lift getYesod webapp <- lift getYesod
current <- lift $ runAnnex [] $ current <- lift $ M.toList <$> getCurrentTransfers
M.toList . currentTransfers
<$> liftIO (getDaemonStatus $ daemonStatus webapp)
queued <- liftIO $ getTransferQueue $ transferQueue webapp queued <- liftIO $ getTransferQueue $ transferQueue webapp
let ident = "transfers" let ident = "transfers"
autoUpdate ident NotifierTransfersR (10 :: Int) (10 :: Int) autoUpdate ident NotifierTransfersR (10 :: Int) (10 :: Int)
@ -155,9 +154,6 @@ getCancelTransferR t = cancelTransfer False t >> redirectBack
postCancelTransferR :: Transfer -> Handler () postCancelTransferR :: Transfer -> Handler ()
postCancelTransferR t = cancelTransfer False t postCancelTransferR t = cancelTransfer False t
startTransfer :: Transfer -> Handler ()
startTransfer t = liftIO $ putStrLn "start"
pauseTransfer :: Transfer -> Handler () pauseTransfer :: Transfer -> Handler ()
pauseTransfer = cancelTransfer True pauseTransfer = cancelTransfer True
@ -165,14 +161,13 @@ cancelTransfer :: Bool -> Transfer-> Handler ()
cancelTransfer pause t = do cancelTransfer pause t = do
webapp <- getYesod webapp <- getYesod
let dstatus = daemonStatus webapp let dstatus = daemonStatus webapp
m <- getCurrentTransfers
liftIO $ do liftIO $ do
{- remove queued transfer -} {- remove queued transfer -}
void $ dequeueTransfer (transferQueue webapp) dstatus t void $ dequeueTransfer (transferQueue webapp) dstatus t
{- stop running transfer -} {- stop running transfer -}
maybe noop (stop dstatus) =<< running dstatus maybe noop (stop dstatus) (M.lookup t m)
where where
running dstatus = M.lookup t . currentTransfers
<$> getDaemonStatus dstatus
stop dstatus info = do stop dstatus info = do
{- When there's a thread associated with the {- When there's a thread associated with the
- transfer, it's killed first, to avoid it - transfer, it's killed first, to avoid it
@ -197,3 +192,25 @@ cancelTransfer pause t = do
void $ tryIO $ signalProcessGroup sigTERM g void $ tryIO $ signalProcessGroup sigTERM g
threadDelay 100000 -- 0.1 second grace period threadDelay 100000 -- 0.1 second grace period
void $ tryIO $ signalProcessGroup sigKILL g void $ tryIO $ signalProcessGroup sigKILL g
startTransfer :: Transfer -> Handler ()
startTransfer t = do
m <- getCurrentTransfers
maybe noop resume (M.lookup t m)
-- TODO: handle starting a queued transfer
where
resume info = maybe (start info) signalthread $ transferTid info
signalthread tid = liftIO $ throwTo tid ResumeTransfer
start info = do
webapp <- getYesod
let dstatus = daemonStatus webapp
let slots = transferSlots webapp
{- This transfer was being run by another process,
- forget that old pid, and start a new one. -}
liftIO $ updateTransferInfo dstatus t $ info
{ transferPid = Nothing }
liftIO $ Transferrer.transferThread dstatus slots t info
getCurrentTransfers :: Handler TransferMap
getCurrentTransfers = currentTransfers
<$> (liftIO . getDaemonStatus =<< daemonStatus <$> getYesod)

View file

@ -13,6 +13,7 @@ import Assistant
import Assistant.DaemonStatus import Assistant.DaemonStatus
import Assistant.ScanRemotes import Assistant.ScanRemotes
import Assistant.TransferQueue import Assistant.TransferQueue
import Assistant.TransferSlots
import Assistant.Threads.WebApp import Assistant.Threads.WebApp
import Utility.WebApp import Utility.WebApp
import Utility.Daemon (checkDaemon, lockPidFile) import Utility.Daemon (checkDaemon, lockPidFile)
@ -89,9 +90,10 @@ firstRun = do
dstatus <- atomically . newTMVar =<< newDaemonStatus dstatus <- atomically . newTMVar =<< newDaemonStatus
scanremotes <- newScanRemoteMap scanremotes <- newScanRemoteMap
transferqueue <- newTransferQueue transferqueue <- newTransferQueue
transferslots <- newTransferSlots
v <- newEmptyMVar v <- newEmptyMVar
let callback a = Just $ a v let callback a = Just $ a v
webAppThread Nothing dstatus scanremotes transferqueue webAppThread Nothing dstatus scanremotes transferqueue transferslots
(callback signaler) (callback mainthread) (callback signaler) (callback mainthread)
where where
signaler v = do signaler v = do