From d3f78da0ed4e298572c698ea39a5fea03bcd3e07 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 11 Dec 2020 15:28:58 -0400 Subject: [PATCH] propagate signals to the transferrer process group Done on unix, could not implement it on windows quite. The signal library gets part of the way needed for windows. But I had to open https://github.com/pmlodawski/signal/issues/1 because it lacks raiseSignal. Also, I don't know what the equivilant of getProcessGroupIDOf is on windows. And System.Process does not provide a way to send any signal to a process group except for SIGINT. This commit was sponsored by Boyd Stephen Smith Jr. on Patreon. --- Annex.hs | 15 ++-- Annex/Action.hs | 34 +++++++- Annex/AdjustedBranch.hs | 2 +- Annex/Concurrent.hs | 4 +- Annex/Ssh.hs | 2 +- Annex/Tmp.hs | 2 +- Annex/TransferrerPool.hs | 81 +++++++++++++------ Assistant/TransferSlots.hs | 4 +- Command/Fsck.hs | 2 +- Command/Transferrer.hs | 24 +++++- Remote/BitTorrent.hs | 2 +- Remote/External.hs | 2 +- Remote/Git.hs | 2 +- Remote/Helper/Hooks.hs | 2 +- Types/CleanupActions.hs | 9 ++- Types/TransferrerPool.hs | 4 + ..._c_propagation_to_transferrer_process.mdwn | 7 +- 17 files changed, 150 insertions(+), 48 deletions(-) diff --git a/Annex.hs b/Annex.hs index ed2aaf011c..b38468ebd0 100644 --- a/Annex.hs +++ b/Annex.hs @@ -22,7 +22,7 @@ module Annex ( setOutput, getFlag, getField, - addCleanup, + addCleanupAction, gitRepo, inRepo, fromRepo, @@ -140,7 +140,8 @@ data AnnexState = AnnexState , sshstalecleaned :: TMVar Bool , flags :: M.Map String Bool , fields :: M.Map String String - , cleanup :: M.Map CleanupAction (Annex ()) + , cleanupactions :: M.Map CleanupAction (Annex ()) + , signalactions :: TVar (M.Map SignalAction (Int -> IO ())) , sentinalstatus :: Maybe SentinalStatus , useragent :: Maybe String , errcounter :: Integer @@ -164,6 +165,7 @@ newState :: GitConfig -> Git.Repo -> IO AnnexState newState c r = do emptyactiveremotes <- newMVar M.empty emptyactivekeys <- newTVarIO M.empty + si <- newTVarIO M.empty o <- newMessageState sc <- newTMVarIO False kh <- Keys.newDbHandle @@ -203,7 +205,8 @@ newState c r = do , sshstalecleaned = sc , flags = M.empty , fields = M.empty - , cleanup = M.empty + , cleanupactions = M.empty + , signalactions = si , sentinalstatus = Nothing , useragent = Nothing , errcounter = 0 @@ -289,9 +292,9 @@ setField field value = changeState $ \s -> s { fields = M.insert field value $ fields s } {- Adds a cleanup action to perform. -} -addCleanup :: CleanupAction -> Annex () -> Annex () -addCleanup k a = changeState $ \s -> - s { cleanup = M.insert k a $ cleanup s } +addCleanupAction :: CleanupAction -> Annex () -> Annex () +addCleanupAction k a = changeState $ \s -> + s { cleanupactions = M.insert k a $ cleanupactions s } {- Sets the type of output to emit. -} setOutput :: OutputType -> Annex () diff --git a/Annex/Action.hs b/Annex/Action.hs index b1ab48f9a2..f23a564194 100644 --- a/Annex/Action.hs +++ b/Annex/Action.hs @@ -5,6 +5,8 @@ - Licensed under the GNU AGPL version 3 or higher. -} +{-# LANGUAGE CPP #-} + module Annex.Action ( action, verifiedAction, @@ -25,6 +27,11 @@ import Annex.HashObject import Annex.CheckIgnore import Annex.TransferrerPool +import Control.Concurrent.STM +#ifndef mingw32_HOST_OS +import System.Posix.Signals +#endif + {- Runs an action that may throw exceptions, catching and displaying them. -} action :: Annex () -> Annex Bool action a = tryNonAsync a >>= \case @@ -43,13 +50,36 @@ verifiedAction a = tryNonAsync a >>= \case {- Actions to perform each time ran. -} startup :: Annex () -startup = return () +startup = do +#ifndef mingw32_HOST_OS + av <- Annex.getState Annex.signalactions + let propagate sig = liftIO $ installhandleronce sig av + propagate sigINT + propagate sigQUIT + propagate sigTERM + propagate sigTSTP + propagate sigCONT + propagate sigHUP + -- sigWINCH is not propagated; it should not be needed, + -- and the concurrent-output library installs its own signal + -- handler for it. + -- sigSTOP and sigKILL cannot be caught, so will not be propagated. + where + installhandleronce sig av = void $ + installHandler sig (CatchOnce (gotsignal sig av)) Nothing + gotsignal sig av = do + mapM_ (\a -> a (fromIntegral sig)) =<< atomically (readTVar av) + raiseSignal sig + installhandleronce sig av +#else + return () +#endif {- Cleanup actions. -} shutdown :: Bool -> Annex () shutdown nocommit = do saveState nocommit - sequence_ =<< M.elems <$> Annex.getState Annex.cleanup + sequence_ =<< M.elems <$> Annex.getState Annex.cleanupactions stopCoProcesses {- Stops all long-running child processes, including git query processes. -} diff --git a/Annex/AdjustedBranch.hs b/Annex/AdjustedBranch.hs index 794493846c..e8709422ae 100644 --- a/Annex/AdjustedBranch.hs +++ b/Annex/AdjustedBranch.hs @@ -296,7 +296,7 @@ adjustedBranchRefresh _af a = do unless (adjustmentIsStable adj) $ ifM (checkcounter n) ( update adj origbranch - , Annex.addCleanup AdjustedBranchUpdate $ + , Annex.addCleanupAction AdjustedBranchUpdate $ adjustedBranchRefreshFull adj origbranch ) _ -> return () diff --git a/Annex/Concurrent.hs b/Annex/Concurrent.hs index 6f10e33885..96ece9d135 100644 --- a/Annex/Concurrent.hs +++ b/Annex/Concurrent.hs @@ -100,8 +100,8 @@ dupState = do mergeState :: AnnexState -> Annex () mergeState st = do st' <- liftIO $ snd <$> run st stopNonConcurrentSafeCoProcesses - forM_ (M.toList $ Annex.cleanup st') $ - uncurry addCleanup + forM_ (M.toList $ Annex.cleanupactions st') $ + uncurry addCleanupAction Annex.Queue.mergeFrom st' changeState $ \s -> s { errcounter = errcounter s + errcounter st' } diff --git a/Annex/Ssh.hs b/Annex/Ssh.hs index 3abf8e3e0a..48578a4680 100644 --- a/Annex/Ssh.hs +++ b/Annex/Ssh.hs @@ -230,7 +230,7 @@ prepSocket socketfile sshhost sshparams = do sshCleanup liftIO $ atomically $ putTMVar tv True -- Cleanup at shutdown. - Annex.addCleanup SshCachingCleanup sshCleanup + Annex.addCleanupAction SshCachingCleanup sshCleanup let socketlock = socket2lock socketfile diff --git a/Annex/Tmp.hs b/Annex/Tmp.hs index b19daeb18d..720d1f4945 100644 --- a/Annex/Tmp.hs +++ b/Annex/Tmp.hs @@ -24,7 +24,7 @@ import Data.Time.Clock.POSIX -- any time. withOtherTmp :: (RawFilePath -> Annex a) -> Annex a withOtherTmp a = do - Annex.addCleanup OtherTmpCleanup cleanupOtherTmp + Annex.addCleanupAction OtherTmpCleanup cleanupOtherTmp tmpdir <- fromRepo gitAnnexTmpOtherDir tmplck <- fromRepo gitAnnexTmpOtherLock withSharedLock (const tmplck) $ do diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index 2d2aceebc5..e9476c8826 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -6,6 +6,7 @@ -} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE CPP #-} module Annex.TransferrerPool where @@ -17,6 +18,7 @@ import Types.Transfer import qualified Types.Remote as Remote import Types.StallDetection import Types.Messages +import Types.CleanupActions import Messages.Serialized import Annex.Path import Utility.Batch @@ -31,6 +33,13 @@ import Control.Concurrent.STM hiding (check) import Control.Monad.IO.Class (MonadIO) import Data.Time.Clock.POSIX import System.Log.Logger (debugM) +import qualified Data.Map as M +#ifndef mingw32_HOST_OS +import System.Posix.Signals +import System.Posix.Process (getProcessGroupIDOf) +#endif + +type SignalActionsVar = TVar (M.Map SignalAction (Int -> IO ())) {- Runs an action with a Transferrer from the pool. -} withTransferrer :: (Transferrer -> Annex a) -> Annex a @@ -38,7 +47,8 @@ withTransferrer a = do program <- liftIO programPath pool <- Annex.getState Annex.transferrerpool let nocheck = pure (pure True) - withTransferrer' False nocheck program nonBatchCommandMaker pool a + signalactonsvar <- Annex.getState Annex.signalactions + withTransferrer' False signalactonsvar nocheck program nonBatchCommandMaker pool a withTransferrer' :: (MonadIO m, MonadMask m) @@ -47,20 +57,21 @@ withTransferrer' -- running in the pool at a time. So if this needed to start a -- new Transferrer, it's stopped when done. Otherwise, idle -- processes are left in the pool for use later. + -> SignalActionsVar -> MkCheckTransferrer -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> m a) -> m a -withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do +withTransferrer' minimizeprocesses signalactonsvar mkcheck program batchmaker pool a = do (mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool) (i@(TransferrerPoolItem _ check), t) <- liftIO $ case mi of Nothing -> do - t <- mkTransferrer program batchmaker + t <- mkTransferrer signalactonsvar program batchmaker i <- mkTransferrerPoolItem mkcheck t return (i, t) - Just i -> checkTransferrerPoolItem program batchmaker i + Just i -> checkTransferrerPoolItem signalactonsvar program batchmaker i a t `finally` returntopool leftinpool check t i where returntopool leftinpool check t i @@ -71,23 +82,23 @@ withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do liftIO $ whenM (hIsOpen (transferrerWrite t)) $ liftIO $ atomically $ pushTransferrerPool pool i | otherwise = liftIO $ do - void $ forkIO $ shutdownTransferrer t + void $ forkIO $ transferrerShutdown t atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check {- Check if a Transferrer from the pool is still ok to be used. - If not, stop it and start a new one. -} -checkTransferrerPoolItem :: FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO (TransferrerPoolItem, Transferrer) -checkTransferrerPoolItem program batchmaker i = case i of +checkTransferrerPoolItem :: SignalActionsVar -> FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO (TransferrerPoolItem, Transferrer) +checkTransferrerPoolItem signalactonsvar program batchmaker i = case i of TransferrerPoolItem (Just t) check -> ifM check ( return (i, t) , do - shutdownTransferrer t + transferrerShutdown t new check ) TransferrerPoolItem Nothing check -> new check where new check = do - t <- mkTransferrer program batchmaker + t <- mkTransferrer signalactonsvar program batchmaker return (TransferrerPoolItem (Just t) check, t) data TransferRequestLevel = AnnexLevel | AssistantLevel @@ -193,22 +204,54 @@ detectStalls (Just (StallDetection minsz duration)) metervar onstall = go Nothin {- Starts a new git-annex transfer process, setting up handles - that will be used to communicate with it. -} -mkTransferrer :: FilePath -> BatchCommandMaker -> IO Transferrer -mkTransferrer program batchmaker = do +mkTransferrer :: SignalActionsVar -> FilePath -> BatchCommandMaker -> IO Transferrer +mkTransferrer signalactonsvar program batchmaker = do {- It runs as a batch job. -} let (program', params') = batchmaker (program, [Param "transferrer"]) {- It's put into its own group so that the whole group can be - killed to stop a transfer. -} - (Just writeh, Just readh, _, pid) <- createProcess + (Just writeh, Just readh, _, ph) <- createProcess (proc program' $ toCommand params') { create_group = True , std_in = CreatePipe , std_out = CreatePipe } + + {- Set up signal propagation, so eg ctrl-c will also interrupt + - the processes in the transferrer's process group. + - + - There is a race between the process being created and this point. + - If a signal is received before this can run, it is not sent to + - the transferrer. This leaves the transferrer waiting for the + - first message on stdin to tell what to do. If the signal kills + - this parent process, the transferrer will then get a sigpipe + - and die too. If the signal suspends this parent process, + - it's ok to leave the transferrer running, as it's waiting on + - the pipe until this process wakes back up. + -} +#ifndef mingw32_HOST_OS + pid <- getPid ph + unregistersignalprop <- case pid of + Just p -> getProcessGroupIDOf p >>= \pgrp -> do + atomically $ modifyTVar' signalactonsvar $ + M.insert (PropagateSignalProcessGroup p) $ \sig -> + signalProcessGroup (fromIntegral sig) pgrp + return $ atomically $ modifyTVar' signalactonsvar $ + M.delete (PropagateSignalProcessGroup p) + Nothing -> return noop +#else + let unregistersignalprop = noop +#endif + return $ Transferrer { transferrerRead = readh , transferrerWrite = writeh - , transferrerHandle = pid + , transferrerHandle = ph + , transferrerShutdown = do + hClose readh + hClose writeh + void $ waitForProcess ph + unregistersignalprop } -- | Send a request to perform a transfer. @@ -237,7 +280,7 @@ sendSerializedOutputResponse h sor = do hPutStrLn h l hFlush h --- | Read a response to a transfer requests. +-- | Read a response to a transfer request. -- -- Before the final response, this will return whatever SerializedOutput -- should be displayed as the transfer is performed. @@ -253,14 +296,6 @@ readResponse h = do transferrerProtocolError :: String -> a transferrerProtocolError l = giveup $ "transferrer protocol error: " ++ show l -{- Closing the fds will shut down the transferrer, but only when it's - - in between transfers. -} -shutdownTransferrer :: Transferrer -> IO () -shutdownTransferrer t = do - hClose $ transferrerRead t - hClose $ transferrerWrite t - void $ waitForProcess $ transferrerHandle t - {- Kill the transferrer, and all its child processes. -} killTransferrer :: Transferrer -> IO () killTransferrer t = do @@ -274,5 +309,5 @@ emptyTransferrerPool = do poolvar <- Annex.getState Annex.transferrerpool pool <- liftIO $ atomically $ swapTVar poolvar [] liftIO $ forM_ pool $ \case - TransferrerPoolItem (Just t) _ -> shutdownTransferrer t + TransferrerPoolItem (Just t) _ -> transferrerShutdown t TransferrerPoolItem Nothing _ -> noop diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index ee3e17e280..6d51b1a0b1 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -93,7 +93,9 @@ runTransferThread' mkcheck program batchmaker d run = go go = catchPauseResume $ do p <- runAssistant d $ liftAnnex $ Annex.getState Annex.transferrerpool - withTransferrer' True mkcheck program batchmaker p run + signalactonsvar <- runAssistant d $ liftAnnex $ + Annex.getState Annex.signalactions + withTransferrer' True signalactonsvar mkcheck program batchmaker p run pause = catchPauseResume $ runEvery (Seconds 86400) noop {- Note: This must use E.try, rather than E.catch. diff --git a/Command/Fsck.hs b/Command/Fsck.hs index 067300162f..439d68f4c1 100644 --- a/Command/Fsck.hs +++ b/Command/Fsck.hs @@ -662,7 +662,7 @@ cleanupIncremental _ = return () openFsckDb :: UUID -> Annex FsckDb.FsckHandle openFsckDb u = do h <- FsckDb.openDb u - Annex.addCleanup FsckCleanup $ + Annex.addCleanupAction FsckCleanup $ FsckDb.closeDb h return h diff --git a/Command/Transferrer.hs b/Command/Transferrer.hs index 9376aefecf..116c64c5a3 100644 --- a/Command/Transferrer.hs +++ b/Command/Transferrer.hs @@ -34,7 +34,7 @@ start = do (readh, writeh) <- liftIO dupIoHandles let outputwriter = sendTransferResponse writeh . TransferOutput let outputresponsereader = do - l <- hGetLine readh + l <- getNextLine readh return $ case Proto.parseMessage l of Just (TransferSerializedOutputResponse r) -> Just r Nothing -> Nothing @@ -90,7 +90,7 @@ runRequests runRequests readh writeh a = go Nothing Nothing where go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do - l <- liftIO $ hGetLine readh + l <- liftIO $ getNextLine readh case Proto.parseMessage l of Just tr -> do let remoteoruuid = transferRequestRemote tr @@ -114,6 +114,24 @@ runRequests readh writeh a = go Nothing Nothing sendresult = liftIO . sendTransferResponse writeh . TransferResult sendTransferResponse :: Handle -> TransferResponse -> IO () -sendTransferResponse h r = do +sendTransferResponse h r = silenceIOErrors $ do hPutStrLn h $ unwords $ Proto.formatMessage r hFlush h + +getNextLine :: Handle -> IO String +getNextLine = silenceIOErrors . hGetLine + +{- If the pipe we're talking to gets closed due to the parent git-annex + - having exited, read/write would throw an exception due to sigpipe, + - which gets displayed on the console in an ugly way. This silences that + - display, and exits on exception instead. + - + - Normally signals like SIGINT get propagated to this process + - from the parent process. However, since this process is run in its own + - process group, that propagation requires the parent to actively + - propagate the signal. One way that could not happen is if the parent + - gets a signal it cannot catch. Another way is if the parent is hit by + - the signal before it can set up the signal propagation. + -} +silenceIOErrors :: IO a -> IO a +silenceIOErrors a = catchIO a (const exitFailure) diff --git a/Remote/BitTorrent.hs b/Remote/BitTorrent.hs index ce19e7e188..a60b58506c 100644 --- a/Remote/BitTorrent.hs +++ b/Remote/BitTorrent.hs @@ -181,7 +181,7 @@ tmpTorrentFile u = fromRepo . gitAnnexTmpObjectLocation =<< torrentUrlKey u - torrent file once. -} registerTorrentCleanup :: URLString -> Annex () -registerTorrentCleanup u = Annex.addCleanup (TorrentCleanup u) $ +registerTorrentCleanup u = Annex.addCleanupAction (TorrentCleanup u) $ liftIO . removeWhenExistsWith R.removeLink =<< tmpTorrentFile u {- Downloads the torrent file. (Not its contents.) -} diff --git a/Remote/External.hs b/Remote/External.hs index 2ece936b8f..255c5e2456 100644 --- a/Remote/External.hs +++ b/Remote/External.hs @@ -81,7 +81,7 @@ gen r u rc gc rs | otherwise = do c <- parsedRemoteConfig remote rc external <- newExternal externaltype (Just u) c (Just gc) (Just rs) - Annex.addCleanup (RemoteCleanup u) $ stopExternal external + Annex.addCleanupAction (RemoteCleanup u) $ stopExternal external cst <- getCost external r gc avail <- getAvailability external r gc exportsupported <- if exportTree c diff --git a/Remote/Git.hs b/Remote/Git.hs index b790915a33..0021eb2d62 100644 --- a/Remote/Git.hs +++ b/Remote/Git.hs @@ -832,7 +832,7 @@ rsyncOrCopyFile st rsyncparams src dest p = commitOnCleanup :: Git.Repo -> Remote -> State -> Annex a -> Annex a commitOnCleanup repo r st a = go `after` a where - go = Annex.addCleanup (RemoteCleanup $ uuid r) cleanup + go = Annex.addCleanupAction (RemoteCleanup $ uuid r) cleanup cleanup | not $ Git.repoIsUrl repo = onLocalFast st $ doQuietSideAction $ diff --git a/Remote/Helper/Hooks.hs b/Remote/Helper/Hooks.hs index 2d0c62f197..c3f487516d 100644 --- a/Remote/Helper/Hooks.hs +++ b/Remote/Helper/Hooks.hs @@ -76,7 +76,7 @@ runHooks r starthook stophook a = do -- So, requiring idempotency is the right approach. run starthook - Annex.addCleanup (StopHook $ uuid r) $ runstop lck + Annex.addCleanupAction (StopHook $ uuid r) $ runstop lck runstop lck = do -- Drop any shared lock we have, and take an -- exclusive lock, without blocking. If the lock diff --git a/Types/CleanupActions.hs b/Types/CleanupActions.hs index 23dc7e748a..418472ef3e 100644 --- a/Types/CleanupActions.hs +++ b/Types/CleanupActions.hs @@ -1,6 +1,6 @@ {- Enumeration of cleanup actions - - - Copyright 2014 Joey Hess + - Copyright 2014-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -8,9 +8,10 @@ module Types.CleanupActions where import Types.UUID - import Utility.Url +import System.Process (Pid) + data CleanupAction = RemoteCleanup UUID | StopHook UUID @@ -20,3 +21,7 @@ data CleanupAction | TorrentCleanup URLString | OtherTmpCleanup deriving (Eq, Ord) + +data SignalAction + = PropagateSignalProcessGroup Pid + deriving (Eq, Ord) diff --git a/Types/TransferrerPool.hs b/Types/TransferrerPool.hs index e2150e3008..c8c560e682 100644 --- a/Types/TransferrerPool.hs +++ b/Types/TransferrerPool.hs @@ -25,6 +25,10 @@ data Transferrer = Transferrer { transferrerRead :: Handle , transferrerWrite :: Handle , transferrerHandle :: ProcessHandle + , transferrerShutdown :: IO () + -- ^ Closes the FDs and waits for the process to exit. + -- Should be used when the transferrer is in between transfers, + -- as otherwise it may not shutdown promptly. } newTransferrerPool :: IO TransferrerPool diff --git a/doc/todo/ctrl_c_propagation_to_transferrer_process.mdwn b/doc/todo/ctrl_c_propagation_to_transferrer_process.mdwn index 6cb74e5fbd..e0c41ce031 100644 --- a/doc/todo/ctrl_c_propagation_to_transferrer_process.mdwn +++ b/doc/todo/ctrl_c_propagation_to_transferrer_process.mdwn @@ -11,7 +11,12 @@ along with it when a stall is detected. Maybe what's needed is a SIGINT handler in the main git-annex that signals all the transferrer processes with SIGINT and waits on them -exiting. Unsure if that can be implemented in haskell? +exiting. And other signals, eg SIGTSTP for ctrl-z. + +> Implemented this, but not for windows (yet). But not gonna leave open +> for something that on windows in my experience does not work very +> reliably in general. (I've many times hit ctrl-c in a windows terminal and +> had the whole terminal lock up.) So, [[done]] --[[Joey]] Or, note that it would suffice to remove the child process group stuff, if we assume that all child processes started by git-annex transferrer are