propagate signals to the transferrer process group

Done on unix, could not implement it on windows quite.

The signal library gets part of the way needed for windows.
But I had to open https://github.com/pmlodawski/signal/issues/1 because
it lacks raiseSignal.

Also, I don't know what the equivilant of getProcessGroupIDOf is on
windows. And System.Process does not provide a way to send any signal to
a process group except for SIGINT.

This commit was sponsored by Boyd Stephen Smith Jr. on Patreon.
This commit is contained in:
Joey Hess 2020-12-11 15:28:58 -04:00
parent 79c765b727
commit d3f78da0ed
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
17 changed files with 150 additions and 48 deletions

View file

@ -22,7 +22,7 @@ module Annex (
setOutput,
getFlag,
getField,
addCleanup,
addCleanupAction,
gitRepo,
inRepo,
fromRepo,
@ -140,7 +140,8 @@ data AnnexState = AnnexState
, sshstalecleaned :: TMVar Bool
, flags :: M.Map String Bool
, fields :: M.Map String String
, cleanup :: M.Map CleanupAction (Annex ())
, cleanupactions :: M.Map CleanupAction (Annex ())
, signalactions :: TVar (M.Map SignalAction (Int -> IO ()))
, sentinalstatus :: Maybe SentinalStatus
, useragent :: Maybe String
, errcounter :: Integer
@ -164,6 +165,7 @@ newState :: GitConfig -> Git.Repo -> IO AnnexState
newState c r = do
emptyactiveremotes <- newMVar M.empty
emptyactivekeys <- newTVarIO M.empty
si <- newTVarIO M.empty
o <- newMessageState
sc <- newTMVarIO False
kh <- Keys.newDbHandle
@ -203,7 +205,8 @@ newState c r = do
, sshstalecleaned = sc
, flags = M.empty
, fields = M.empty
, cleanup = M.empty
, cleanupactions = M.empty
, signalactions = si
, sentinalstatus = Nothing
, useragent = Nothing
, errcounter = 0
@ -289,9 +292,9 @@ setField field value = changeState $ \s ->
s { fields = M.insert field value $ fields s }
{- Adds a cleanup action to perform. -}
addCleanup :: CleanupAction -> Annex () -> Annex ()
addCleanup k a = changeState $ \s ->
s { cleanup = M.insert k a $ cleanup s }
addCleanupAction :: CleanupAction -> Annex () -> Annex ()
addCleanupAction k a = changeState $ \s ->
s { cleanupactions = M.insert k a $ cleanupactions s }
{- Sets the type of output to emit. -}
setOutput :: OutputType -> Annex ()

View file

@ -5,6 +5,8 @@
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE CPP #-}
module Annex.Action (
action,
verifiedAction,
@ -25,6 +27,11 @@ import Annex.HashObject
import Annex.CheckIgnore
import Annex.TransferrerPool
import Control.Concurrent.STM
#ifndef mingw32_HOST_OS
import System.Posix.Signals
#endif
{- Runs an action that may throw exceptions, catching and displaying them. -}
action :: Annex () -> Annex Bool
action a = tryNonAsync a >>= \case
@ -43,13 +50,36 @@ verifiedAction a = tryNonAsync a >>= \case
{- Actions to perform each time ran. -}
startup :: Annex ()
startup = return ()
startup = do
#ifndef mingw32_HOST_OS
av <- Annex.getState Annex.signalactions
let propagate sig = liftIO $ installhandleronce sig av
propagate sigINT
propagate sigQUIT
propagate sigTERM
propagate sigTSTP
propagate sigCONT
propagate sigHUP
-- sigWINCH is not propagated; it should not be needed,
-- and the concurrent-output library installs its own signal
-- handler for it.
-- sigSTOP and sigKILL cannot be caught, so will not be propagated.
where
installhandleronce sig av = void $
installHandler sig (CatchOnce (gotsignal sig av)) Nothing
gotsignal sig av = do
mapM_ (\a -> a (fromIntegral sig)) =<< atomically (readTVar av)
raiseSignal sig
installhandleronce sig av
#else
return ()
#endif
{- Cleanup actions. -}
shutdown :: Bool -> Annex ()
shutdown nocommit = do
saveState nocommit
sequence_ =<< M.elems <$> Annex.getState Annex.cleanup
sequence_ =<< M.elems <$> Annex.getState Annex.cleanupactions
stopCoProcesses
{- Stops all long-running child processes, including git query processes. -}

View file

@ -296,7 +296,7 @@ adjustedBranchRefresh _af a = do
unless (adjustmentIsStable adj) $
ifM (checkcounter n)
( update adj origbranch
, Annex.addCleanup AdjustedBranchUpdate $
, Annex.addCleanupAction AdjustedBranchUpdate $
adjustedBranchRefreshFull adj origbranch
)
_ -> return ()

View file

@ -100,8 +100,8 @@ dupState = do
mergeState :: AnnexState -> Annex ()
mergeState st = do
st' <- liftIO $ snd <$> run st stopNonConcurrentSafeCoProcesses
forM_ (M.toList $ Annex.cleanup st') $
uncurry addCleanup
forM_ (M.toList $ Annex.cleanupactions st') $
uncurry addCleanupAction
Annex.Queue.mergeFrom st'
changeState $ \s -> s { errcounter = errcounter s + errcounter st' }

View file

@ -230,7 +230,7 @@ prepSocket socketfile sshhost sshparams = do
sshCleanup
liftIO $ atomically $ putTMVar tv True
-- Cleanup at shutdown.
Annex.addCleanup SshCachingCleanup sshCleanup
Annex.addCleanupAction SshCachingCleanup sshCleanup
let socketlock = socket2lock socketfile

View file

@ -24,7 +24,7 @@ import Data.Time.Clock.POSIX
-- any time.
withOtherTmp :: (RawFilePath -> Annex a) -> Annex a
withOtherTmp a = do
Annex.addCleanup OtherTmpCleanup cleanupOtherTmp
Annex.addCleanupAction OtherTmpCleanup cleanupOtherTmp
tmpdir <- fromRepo gitAnnexTmpOtherDir
tmplck <- fromRepo gitAnnexTmpOtherLock
withSharedLock (const tmplck) $ do

View file

@ -6,6 +6,7 @@
-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
module Annex.TransferrerPool where
@ -17,6 +18,7 @@ import Types.Transfer
import qualified Types.Remote as Remote
import Types.StallDetection
import Types.Messages
import Types.CleanupActions
import Messages.Serialized
import Annex.Path
import Utility.Batch
@ -31,6 +33,13 @@ import Control.Concurrent.STM hiding (check)
import Control.Monad.IO.Class (MonadIO)
import Data.Time.Clock.POSIX
import System.Log.Logger (debugM)
import qualified Data.Map as M
#ifndef mingw32_HOST_OS
import System.Posix.Signals
import System.Posix.Process (getProcessGroupIDOf)
#endif
type SignalActionsVar = TVar (M.Map SignalAction (Int -> IO ()))
{- Runs an action with a Transferrer from the pool. -}
withTransferrer :: (Transferrer -> Annex a) -> Annex a
@ -38,7 +47,8 @@ withTransferrer a = do
program <- liftIO programPath
pool <- Annex.getState Annex.transferrerpool
let nocheck = pure (pure True)
withTransferrer' False nocheck program nonBatchCommandMaker pool a
signalactonsvar <- Annex.getState Annex.signalactions
withTransferrer' False signalactonsvar nocheck program nonBatchCommandMaker pool a
withTransferrer'
:: (MonadIO m, MonadMask m)
@ -47,20 +57,21 @@ withTransferrer'
-- running in the pool at a time. So if this needed to start a
-- new Transferrer, it's stopped when done. Otherwise, idle
-- processes are left in the pool for use later.
-> SignalActionsVar
-> MkCheckTransferrer
-> FilePath
-> BatchCommandMaker
-> TransferrerPool
-> (Transferrer -> m a)
-> m a
withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do
withTransferrer' minimizeprocesses signalactonsvar mkcheck program batchmaker pool a = do
(mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool)
(i@(TransferrerPoolItem _ check), t) <- liftIO $ case mi of
Nothing -> do
t <- mkTransferrer program batchmaker
t <- mkTransferrer signalactonsvar program batchmaker
i <- mkTransferrerPoolItem mkcheck t
return (i, t)
Just i -> checkTransferrerPoolItem program batchmaker i
Just i -> checkTransferrerPoolItem signalactonsvar program batchmaker i
a t `finally` returntopool leftinpool check t i
where
returntopool leftinpool check t i
@ -71,23 +82,23 @@ withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do
liftIO $ whenM (hIsOpen (transferrerWrite t)) $
liftIO $ atomically $ pushTransferrerPool pool i
| otherwise = liftIO $ do
void $ forkIO $ shutdownTransferrer t
void $ forkIO $ transferrerShutdown t
atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check
{- Check if a Transferrer from the pool is still ok to be used.
- If not, stop it and start a new one. -}
checkTransferrerPoolItem :: FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO (TransferrerPoolItem, Transferrer)
checkTransferrerPoolItem program batchmaker i = case i of
checkTransferrerPoolItem :: SignalActionsVar -> FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO (TransferrerPoolItem, Transferrer)
checkTransferrerPoolItem signalactonsvar program batchmaker i = case i of
TransferrerPoolItem (Just t) check -> ifM check
( return (i, t)
, do
shutdownTransferrer t
transferrerShutdown t
new check
)
TransferrerPoolItem Nothing check -> new check
where
new check = do
t <- mkTransferrer program batchmaker
t <- mkTransferrer signalactonsvar program batchmaker
return (TransferrerPoolItem (Just t) check, t)
data TransferRequestLevel = AnnexLevel | AssistantLevel
@ -193,22 +204,54 @@ detectStalls (Just (StallDetection minsz duration)) metervar onstall = go Nothin
{- Starts a new git-annex transfer process, setting up handles
- that will be used to communicate with it. -}
mkTransferrer :: FilePath -> BatchCommandMaker -> IO Transferrer
mkTransferrer program batchmaker = do
mkTransferrer :: SignalActionsVar -> FilePath -> BatchCommandMaker -> IO Transferrer
mkTransferrer signalactonsvar program batchmaker = do
{- It runs as a batch job. -}
let (program', params') = batchmaker (program, [Param "transferrer"])
{- It's put into its own group so that the whole group can be
- killed to stop a transfer. -}
(Just writeh, Just readh, _, pid) <- createProcess
(Just writeh, Just readh, _, ph) <- createProcess
(proc program' $ toCommand params')
{ create_group = True
, std_in = CreatePipe
, std_out = CreatePipe
}
{- Set up signal propagation, so eg ctrl-c will also interrupt
- the processes in the transferrer's process group.
-
- There is a race between the process being created and this point.
- If a signal is received before this can run, it is not sent to
- the transferrer. This leaves the transferrer waiting for the
- first message on stdin to tell what to do. If the signal kills
- this parent process, the transferrer will then get a sigpipe
- and die too. If the signal suspends this parent process,
- it's ok to leave the transferrer running, as it's waiting on
- the pipe until this process wakes back up.
-}
#ifndef mingw32_HOST_OS
pid <- getPid ph
unregistersignalprop <- case pid of
Just p -> getProcessGroupIDOf p >>= \pgrp -> do
atomically $ modifyTVar' signalactonsvar $
M.insert (PropagateSignalProcessGroup p) $ \sig ->
signalProcessGroup (fromIntegral sig) pgrp
return $ atomically $ modifyTVar' signalactonsvar $
M.delete (PropagateSignalProcessGroup p)
Nothing -> return noop
#else
let unregistersignalprop = noop
#endif
return $ Transferrer
{ transferrerRead = readh
, transferrerWrite = writeh
, transferrerHandle = pid
, transferrerHandle = ph
, transferrerShutdown = do
hClose readh
hClose writeh
void $ waitForProcess ph
unregistersignalprop
}
-- | Send a request to perform a transfer.
@ -237,7 +280,7 @@ sendSerializedOutputResponse h sor = do
hPutStrLn h l
hFlush h
-- | Read a response to a transfer requests.
-- | Read a response to a transfer request.
--
-- Before the final response, this will return whatever SerializedOutput
-- should be displayed as the transfer is performed.
@ -253,14 +296,6 @@ readResponse h = do
transferrerProtocolError :: String -> a
transferrerProtocolError l = giveup $ "transferrer protocol error: " ++ show l
{- Closing the fds will shut down the transferrer, but only when it's
- in between transfers. -}
shutdownTransferrer :: Transferrer -> IO ()
shutdownTransferrer t = do
hClose $ transferrerRead t
hClose $ transferrerWrite t
void $ waitForProcess $ transferrerHandle t
{- Kill the transferrer, and all its child processes. -}
killTransferrer :: Transferrer -> IO ()
killTransferrer t = do
@ -274,5 +309,5 @@ emptyTransferrerPool = do
poolvar <- Annex.getState Annex.transferrerpool
pool <- liftIO $ atomically $ swapTVar poolvar []
liftIO $ forM_ pool $ \case
TransferrerPoolItem (Just t) _ -> shutdownTransferrer t
TransferrerPoolItem (Just t) _ -> transferrerShutdown t
TransferrerPoolItem Nothing _ -> noop

View file

@ -93,7 +93,9 @@ runTransferThread' mkcheck program batchmaker d run = go
go = catchPauseResume $ do
p <- runAssistant d $ liftAnnex $
Annex.getState Annex.transferrerpool
withTransferrer' True mkcheck program batchmaker p run
signalactonsvar <- runAssistant d $ liftAnnex $
Annex.getState Annex.signalactions
withTransferrer' True signalactonsvar mkcheck program batchmaker p run
pause = catchPauseResume $
runEvery (Seconds 86400) noop
{- Note: This must use E.try, rather than E.catch.

View file

@ -662,7 +662,7 @@ cleanupIncremental _ = return ()
openFsckDb :: UUID -> Annex FsckDb.FsckHandle
openFsckDb u = do
h <- FsckDb.openDb u
Annex.addCleanup FsckCleanup $
Annex.addCleanupAction FsckCleanup $
FsckDb.closeDb h
return h

View file

@ -34,7 +34,7 @@ start = do
(readh, writeh) <- liftIO dupIoHandles
let outputwriter = sendTransferResponse writeh . TransferOutput
let outputresponsereader = do
l <- hGetLine readh
l <- getNextLine readh
return $ case Proto.parseMessage l of
Just (TransferSerializedOutputResponse r) -> Just r
Nothing -> Nothing
@ -90,7 +90,7 @@ runRequests
runRequests readh writeh a = go Nothing Nothing
where
go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do
l <- liftIO $ hGetLine readh
l <- liftIO $ getNextLine readh
case Proto.parseMessage l of
Just tr -> do
let remoteoruuid = transferRequestRemote tr
@ -114,6 +114,24 @@ runRequests readh writeh a = go Nothing Nothing
sendresult = liftIO . sendTransferResponse writeh . TransferResult
sendTransferResponse :: Handle -> TransferResponse -> IO ()
sendTransferResponse h r = do
sendTransferResponse h r = silenceIOErrors $ do
hPutStrLn h $ unwords $ Proto.formatMessage r
hFlush h
getNextLine :: Handle -> IO String
getNextLine = silenceIOErrors . hGetLine
{- If the pipe we're talking to gets closed due to the parent git-annex
- having exited, read/write would throw an exception due to sigpipe,
- which gets displayed on the console in an ugly way. This silences that
- display, and exits on exception instead.
-
- Normally signals like SIGINT get propagated to this process
- from the parent process. However, since this process is run in its own
- process group, that propagation requires the parent to actively
- propagate the signal. One way that could not happen is if the parent
- gets a signal it cannot catch. Another way is if the parent is hit by
- the signal before it can set up the signal propagation.
-}
silenceIOErrors :: IO a -> IO a
silenceIOErrors a = catchIO a (const exitFailure)

View file

@ -181,7 +181,7 @@ tmpTorrentFile u = fromRepo . gitAnnexTmpObjectLocation =<< torrentUrlKey u
- torrent file once.
-}
registerTorrentCleanup :: URLString -> Annex ()
registerTorrentCleanup u = Annex.addCleanup (TorrentCleanup u) $
registerTorrentCleanup u = Annex.addCleanupAction (TorrentCleanup u) $
liftIO . removeWhenExistsWith R.removeLink =<< tmpTorrentFile u
{- Downloads the torrent file. (Not its contents.) -}

View file

@ -81,7 +81,7 @@ gen r u rc gc rs
| otherwise = do
c <- parsedRemoteConfig remote rc
external <- newExternal externaltype (Just u) c (Just gc) (Just rs)
Annex.addCleanup (RemoteCleanup u) $ stopExternal external
Annex.addCleanupAction (RemoteCleanup u) $ stopExternal external
cst <- getCost external r gc
avail <- getAvailability external r gc
exportsupported <- if exportTree c

View file

@ -832,7 +832,7 @@ rsyncOrCopyFile st rsyncparams src dest p =
commitOnCleanup :: Git.Repo -> Remote -> State -> Annex a -> Annex a
commitOnCleanup repo r st a = go `after` a
where
go = Annex.addCleanup (RemoteCleanup $ uuid r) cleanup
go = Annex.addCleanupAction (RemoteCleanup $ uuid r) cleanup
cleanup
| not $ Git.repoIsUrl repo = onLocalFast st $
doQuietSideAction $

View file

@ -76,7 +76,7 @@ runHooks r starthook stophook a = do
-- So, requiring idempotency is the right approach.
run starthook
Annex.addCleanup (StopHook $ uuid r) $ runstop lck
Annex.addCleanupAction (StopHook $ uuid r) $ runstop lck
runstop lck = do
-- Drop any shared lock we have, and take an
-- exclusive lock, without blocking. If the lock

View file

@ -1,6 +1,6 @@
{- Enumeration of cleanup actions
-
- Copyright 2014 Joey Hess <id@joeyh.name>
- Copyright 2014-2020 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
@ -8,9 +8,10 @@
module Types.CleanupActions where
import Types.UUID
import Utility.Url
import System.Process (Pid)
data CleanupAction
= RemoteCleanup UUID
| StopHook UUID
@ -20,3 +21,7 @@ data CleanupAction
| TorrentCleanup URLString
| OtherTmpCleanup
deriving (Eq, Ord)
data SignalAction
= PropagateSignalProcessGroup Pid
deriving (Eq, Ord)

View file

@ -25,6 +25,10 @@ data Transferrer = Transferrer
{ transferrerRead :: Handle
, transferrerWrite :: Handle
, transferrerHandle :: ProcessHandle
, transferrerShutdown :: IO ()
-- ^ Closes the FDs and waits for the process to exit.
-- Should be used when the transferrer is in between transfers,
-- as otherwise it may not shutdown promptly.
}
newTransferrerPool :: IO TransferrerPool

View file

@ -11,7 +11,12 @@ along with it when a stall is detected.
Maybe what's needed is a SIGINT handler in the main git-annex that
signals all the transferrer processes with SIGINT and waits on them
exiting. Unsure if that can be implemented in haskell?
exiting. And other signals, eg SIGTSTP for ctrl-z.
> Implemented this, but not for windows (yet). But not gonna leave open
> for something that on windows in my experience does not work very
> reliably in general. (I've many times hit ctrl-c in a windows terminal and
> had the whole terminal lock up.) So, [[done]] --[[Joey]]
Or, note that it would suffice to remove the child process group stuff,
if we assume that all child processes started by git-annex transferrer are