fork off git-annex copy for transfers
This doesn't quite work, because canceling a transfer sends a signal to git-annex, but not to rsync (etc). Looked at making git-annex run in its own process group, which could then be killed, and would kill child processes. But, rsync checks if it's process group is the foreground process group and doesn't show progress if not, and when git has run git-annex, if git-annex makes a new process group, that is not the case. Also, if git has run git-annex, ctrl-c wouldn't be propigated to it if it made a new process group. So this seems like a blind alley, but recording it here just in case.
This commit is contained in:
parent
2e1f3a86ae
commit
d5e06e7b89
4 changed files with 44 additions and 63 deletions
|
@ -36,14 +36,3 @@ withThreadState a = do
|
||||||
- time. -}
|
- time. -}
|
||||||
runThreadState :: ThreadState -> Annex a -> IO a
|
runThreadState :: ThreadState -> Annex a -> IO a
|
||||||
runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
|
runThreadState mvar a = modifyMVar mvar $ \state -> swap <$> Annex.run state a
|
||||||
|
|
||||||
{- Runs an Annex action, using a copy of the state from the MVar.
|
|
||||||
-
|
|
||||||
- It's up to the action to perform any necessary shutdown tasks in order
|
|
||||||
- for state to not be lost. And it's up to the caller to resynchronise
|
|
||||||
- with any changes the action makes to eg, the git-annex branch.
|
|
||||||
-}
|
|
||||||
unsafeRunThreadState :: ThreadState -> Annex a -> IO ()
|
|
||||||
unsafeRunThreadState mvar a = do
|
|
||||||
state <- readMVar mvar
|
|
||||||
void $ Annex.eval state a
|
|
||||||
|
|
|
@ -14,7 +14,6 @@ import Assistant.TransferQueue
|
||||||
import Assistant.TransferSlots
|
import Assistant.TransferSlots
|
||||||
import Assistant.Alert
|
import Assistant.Alert
|
||||||
import Logs.Transfer
|
import Logs.Transfer
|
||||||
import Logs.Presence
|
|
||||||
import Logs.Location
|
import Logs.Location
|
||||||
import Annex.Content
|
import Annex.Content
|
||||||
import qualified Remote
|
import qualified Remote
|
||||||
|
@ -41,7 +40,7 @@ transfererThread st dstatus transferqueue slots = go
|
||||||
( do
|
( do
|
||||||
debug thisThread [ "Transferring:" , show t ]
|
debug thisThread [ "Transferring:" , show t ]
|
||||||
notifyTransfer dstatus
|
notifyTransfer dstatus
|
||||||
transferThread st dstatus slots t info
|
transferThread dstatus slots t info
|
||||||
, do
|
, do
|
||||||
debug thisThread [ "Skipping unnecessary transfer:" , show t ]
|
debug thisThread [ "Skipping unnecessary transfer:" , show t ]
|
||||||
-- getNextTransfer added t to the
|
-- getNextTransfer added t to the
|
||||||
|
@ -71,22 +70,22 @@ shouldTransfer t info
|
||||||
where
|
where
|
||||||
key = transferKey t
|
key = transferKey t
|
||||||
|
|
||||||
{- A transfer is run in a separate thread, with a *copy* of the Annex
|
{- A sepeate git-annex process is forked off to run a transfer.
|
||||||
- state. This is necessary to avoid blocking the rest of the assistant
|
- This allows killing the process if the user decides to cancel the
|
||||||
- on the transfer completing, and also to allow multiple transfers to run
|
- transfer.
|
||||||
- at once. This requires GHC's threaded runtime to work!
|
|
||||||
-
|
-
|
||||||
- The copy of state means that the transfer processes are responsible
|
- A thread is forked off to run the process, and the thread
|
||||||
- for doing any necessary shutdown cleanups, and that the parent
|
- occupys one of the transfer slots. If all slots are in use, this will
|
||||||
- thread's cache must be invalidated once a transfer completes, as
|
- block until one becomes available. The thread's id is also recorded in
|
||||||
- changes may have been made to the git-annex branch.
|
- the transfer info; the thread will also be killed when a transfer is
|
||||||
-}
|
- stopped, to avoid it displaying any alert about the transfer having
|
||||||
transferThread :: ThreadState -> DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> IO ()
|
- failed. -}
|
||||||
transferThread st dstatus slots t info = case (transferRemote info, associatedFile info) of
|
transferThread :: DaemonStatusHandle -> TransferSlots -> Transfer -> TransferInfo -> IO ()
|
||||||
|
transferThread dstatus slots t info = case (transferRemote info, associatedFile info) of
|
||||||
(Nothing, _) -> noop
|
(Nothing, _) -> noop
|
||||||
(_, Nothing) -> noop
|
(_, Nothing) -> noop
|
||||||
(Just remote, Just file) -> do
|
(Just remote, Just file) -> do
|
||||||
tid <- inTransferSlot slots st $
|
tid <- inTransferSlot slots $
|
||||||
transferprocess remote file
|
transferprocess remote file
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
adjustTransfers dstatus $
|
adjustTransfers dstatus $
|
||||||
|
@ -97,24 +96,15 @@ transferThread st dstatus slots t info = case (transferRemote info, associatedFi
|
||||||
where
|
where
|
||||||
direction = transferDirection t
|
direction = transferDirection t
|
||||||
isdownload = direction == Download
|
isdownload = direction == Download
|
||||||
tofrom
|
|
||||||
| isdownload = "from"
|
|
||||||
| otherwise = "to"
|
|
||||||
key = transferKey t
|
|
||||||
|
|
||||||
transferprocess remote file = do
|
transferprocess remote file = void $ do
|
||||||
showStart "copy" file
|
ok <- boolSystem "git-annex"
|
||||||
showAction $ tofrom ++ " " ++ Remote.name remote
|
[ Param "copy"
|
||||||
ok <- runTransfer t (Just file) $
|
, Param "--fast"
|
||||||
if isdownload
|
, Param $ if isdownload then "--from" else "--to"
|
||||||
then getViaTmp key $
|
, Param $ Remote.name remote
|
||||||
Remote.retrieveKeyFile remote key (Just file)
|
, File file
|
||||||
else do
|
]
|
||||||
ok <- Remote.storeKey remote key $ Just file
|
addAlert dstatus $
|
||||||
when ok $
|
|
||||||
Remote.logStatus remote key InfoPresent
|
|
||||||
return ok
|
|
||||||
showEndResult ok
|
|
||||||
liftIO $ addAlert dstatus $
|
|
||||||
makeAlertFiller ok $
|
makeAlertFiller ok $
|
||||||
transferFileAlert direction file
|
transferFileAlert direction file
|
||||||
|
|
|
@ -11,7 +11,6 @@ import Control.Exception
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
|
||||||
import Common.Annex
|
import Common.Annex
|
||||||
import Assistant.ThreadedMonad
|
|
||||||
|
|
||||||
type TransferSlots = QSemN
|
type TransferSlots = QSemN
|
||||||
|
|
||||||
|
@ -29,13 +28,12 @@ newTransferSlots = newQSemN numSlots
|
||||||
{- Waits until a transfer slot becomes available, and runs a transfer
|
{- Waits until a transfer slot becomes available, and runs a transfer
|
||||||
- action in the slot, in its own thread. Note that this thread is
|
- action in the slot, in its own thread. Note that this thread is
|
||||||
- subject to being killed when the transfer is canceled. -}
|
- subject to being killed when the transfer is canceled. -}
|
||||||
inTransferSlot :: TransferSlots -> ThreadState -> Annex a -> IO ThreadId
|
inTransferSlot :: TransferSlots -> IO () -> IO ThreadId
|
||||||
inTransferSlot s st a = do
|
inTransferSlot s a = do
|
||||||
waitQSemN s 1
|
waitQSemN s 1
|
||||||
forkIO $ bracket_ noop done run
|
forkIO $ bracket_ noop done a
|
||||||
where
|
where
|
||||||
done = transferComplete s
|
done = transferComplete s
|
||||||
run = unsafeRunThreadState st a
|
|
||||||
|
|
||||||
{- Call when a transfer is complete. -}
|
{- Call when a transfer is complete. -}
|
||||||
transferComplete :: TransferSlots -> IO ()
|
transferComplete :: TransferSlots -> IO ()
|
||||||
|
|
|
@ -164,19 +164,23 @@ startTransfer t = liftIO $ putStrLn "start"
|
||||||
cancelTransfer :: Transfer -> Handler ()
|
cancelTransfer :: Transfer -> Handler ()
|
||||||
cancelTransfer t = do
|
cancelTransfer t = do
|
||||||
webapp <- getYesod
|
webapp <- getYesod
|
||||||
{- Remove if queued. -}
|
{- remove queued transfer -}
|
||||||
void $ liftIO $ dequeueTransfer (transferQueue webapp) t
|
void $ liftIO $ dequeueTransfer (transferQueue webapp) t
|
||||||
{- When the transfer is running, don't directly remove it from the
|
{- stop running transfer -}
|
||||||
- map, instead signal to end the transfer, and rely on the
|
maybe noop (void . liftIO . stop webapp) =<< running webapp
|
||||||
- TransferWatcher to notice it's done and update the map. -}
|
where
|
||||||
mi <- liftIO $ M.lookup t . currentTransfers
|
running webapp = liftIO $ M.lookup t . currentTransfers
|
||||||
<$> getDaemonStatus (daemonStatus webapp)
|
<$> getDaemonStatus (daemonStatus webapp)
|
||||||
case mi of
|
stop webapp info = do
|
||||||
Just (TransferInfo { transferTid = Just tid } ) -> do
|
putStrLn $ "stopping transfer " ++ show info
|
||||||
-- TODO
|
{- When there's a thread associated with the
|
||||||
error "TODO"
|
- transfer, it's killed first, to avoid it
|
||||||
Just (TransferInfo { transferPid = Just pid } ) -> liftIO $ do
|
- displaying any alert about the transfer having
|
||||||
signalProcess sigTERM pid
|
- failed when the transfer process is killed. -}
|
||||||
threadDelay 500000 -- half a second grace period
|
maybe noop killThread $ transferTid info
|
||||||
signalProcess sigKILL pid
|
maybe noop killproc $ transferPid info
|
||||||
_ -> noop
|
removeTransfer (daemonStatus webapp) t
|
||||||
|
killproc pid = do
|
||||||
|
void $ tryIO $ signalProcess sigTERM pid
|
||||||
|
threadDelay 100000 -- 0.1 second grace period
|
||||||
|
void $ tryIO $ signalProcess sigKILL pid
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue