transfer canceling
Should work (untested) for transfers being run by other processes. Not yet by transfers being run by the assistant. killThread does not kill processes forked off by a thread. To fix this, will probably need to make `git annex getkey` and `git annex sendkey` commands that operate on keys, and write their own transfer info. Then the assistant can run them, and kill them, as needed.
This commit is contained in:
parent
09449792fa
commit
20203b45b9
3 changed files with 34 additions and 5 deletions
|
@ -13,7 +13,8 @@ module Assistant.TransferQueue (
|
||||||
queueTransfers,
|
queueTransfers,
|
||||||
queueTransfer,
|
queueTransfer,
|
||||||
queueTransferAt,
|
queueTransferAt,
|
||||||
getNextTransfer
|
getNextTransfer,
|
||||||
|
dequeueTransfer,
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
|
@ -30,7 +31,7 @@ import qualified Data.Map as M
|
||||||
- in parallel to allow for reading. -}
|
- in parallel to allow for reading. -}
|
||||||
data TransferQueue = TransferQueue
|
data TransferQueue = TransferQueue
|
||||||
{ queue :: TChan (Transfer, TransferInfo)
|
{ queue :: TChan (Transfer, TransferInfo)
|
||||||
, queuesize :: TVar Integer
|
, queuesize :: TVar Int
|
||||||
, queuelist :: TVar [(Transfer, TransferInfo)]
|
, queuelist :: TVar [(Transfer, TransferInfo)]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +105,7 @@ queueTransfer schedule q dstatus f t remote =
|
||||||
|
|
||||||
{- Blocks until the queue is no larger than a given size, and then adds a
|
{- Blocks until the queue is no larger than a given size, and then adds a
|
||||||
- transfer to the queue. -}
|
- transfer to the queue. -}
|
||||||
queueTransferAt :: Integer -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
|
queueTransferAt :: Int -> Schedule -> TransferQueue -> DaemonStatusHandle -> AssociatedFile -> Transfer -> Remote -> IO ()
|
||||||
queueTransferAt wantsz schedule q dstatus f t remote = do
|
queueTransferAt wantsz schedule q dstatus f t remote = do
|
||||||
atomically $ do
|
atomically $ do
|
||||||
sz <- readTVar (queuesize q)
|
sz <- readTVar (queuesize q)
|
||||||
|
@ -132,3 +133,12 @@ getNextTransfer q dstatus acceptable = atomically $ do
|
||||||
M.insertWith' const t info
|
M.insertWith' const t info
|
||||||
return $ Just r
|
return $ Just r
|
||||||
else return Nothing
|
else return Nothing
|
||||||
|
|
||||||
|
{- Removes a transfer from the queue, if present, and returns True if it
|
||||||
|
- was present. -}
|
||||||
|
dequeueTransfer :: TransferQueue -> Transfer -> IO Bool
|
||||||
|
dequeueTransfer q t = atomically $ do
|
||||||
|
(l, removed) <- partition (\i -> fst i /= t) <$> readTVar (queuelist q)
|
||||||
|
void $ writeTVar (queuesize q) (length l)
|
||||||
|
void $ writeTVar (queuelist q) l
|
||||||
|
return $ not $ null removed
|
||||||
|
|
|
@ -27,7 +27,8 @@ newTransferSlots :: IO TransferSlots
|
||||||
newTransferSlots = newQSemN numSlots
|
newTransferSlots = newQSemN numSlots
|
||||||
|
|
||||||
{- Waits until a transfer slot becomes available, and runs a transfer
|
{- Waits until a transfer slot becomes available, and runs a transfer
|
||||||
- action in the slot, in its own thread. -}
|
- action in the slot, in its own thread. Note that this thread is
|
||||||
|
- subject to being killed when the transfer is canceled. -}
|
||||||
inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId
|
inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId
|
||||||
inTransferSlot s st a = do
|
inTransferSlot s st a = do
|
||||||
waitQSemN s 1
|
waitQSemN s 1
|
||||||
|
|
|
@ -29,6 +29,7 @@ import Yesod
|
||||||
import Text.Hamlet
|
import Text.Hamlet
|
||||||
import qualified Data.Map as M
|
import qualified Data.Map as M
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import System.Posix.Signals (signalProcess, sigTERM, sigKILL)
|
||||||
|
|
||||||
{- A display of currently running and queued transfers.
|
{- A display of currently running and queued transfers.
|
||||||
-
|
-
|
||||||
|
@ -161,4 +162,21 @@ startTransfer :: Transfer -> Handler ()
|
||||||
startTransfer t = liftIO $ putStrLn "start"
|
startTransfer t = liftIO $ putStrLn "start"
|
||||||
|
|
||||||
cancelTransfer :: Transfer -> Handler ()
|
cancelTransfer :: Transfer -> Handler ()
|
||||||
cancelTransfer t = liftIO $ putStrLn "cancel"
|
cancelTransfer t = do
|
||||||
|
webapp <- getYesod
|
||||||
|
{- Remove if queued. -}
|
||||||
|
void $ liftIO $ dequeueTransfer (transferQueue webapp) t
|
||||||
|
{- When the transfer is running, don't directly remove it from the
|
||||||
|
- map, instead signal to end the transfer, and rely on the
|
||||||
|
- TransferWatcher to notice it's done and update the map. -}
|
||||||
|
mi <- liftIO $ M.lookup t . currentTransfers
|
||||||
|
<$> getDaemonStatus (daemonStatus webapp)
|
||||||
|
case mi of
|
||||||
|
Just (TransferInfo { transferTid = Just tid } ) -> do
|
||||||
|
-- TODO
|
||||||
|
error "TODO"
|
||||||
|
Just (TransferInfo { transferPid = Just pid } ) -> liftIO $ do
|
||||||
|
signalProcess sigTERM pid
|
||||||
|
threadDelay 500000 -- half a second grace period
|
||||||
|
signalProcess sigKILL pid
|
||||||
|
_ -> noop
|
||||||
|
|
Loading…
Add table
Reference in a new issue