300 lines
10 KiB
Haskell
300 lines
10 KiB
Haskell
{- A pool of "git-annex transferrer" processes
|
|
-
|
|
- Copyright 2013-2022 Joey Hess <id@joeyh.name>
|
|
-
|
|
- Licensed under the GNU AGPL version 3 or higher.
|
|
-}
|
|
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
{-# LANGUAGE RankNTypes #-}
|
|
{-# LANGUAGE CPP #-}
|
|
|
|
module Annex.TransferrerPool where
|
|
|
|
import Annex.Common
|
|
import qualified Annex
|
|
import Types.TransferrerPool
|
|
import Types.Transferrer
|
|
import Types.Transfer
|
|
import qualified Types.Remote as Remote
|
|
import Types.Messages
|
|
import Types.CleanupActions
|
|
import Messages.Serialized
|
|
import Annex.Path
|
|
import Annex.StallDetection
|
|
import Annex.Link
|
|
import Utility.Batch
|
|
import Utility.Metered
|
|
import qualified Utility.SimpleProtocol as Proto
|
|
|
|
import Control.Concurrent
|
|
import Control.Concurrent.Async
|
|
import Control.Concurrent.STM hiding (check)
|
|
import Control.Monad.IO.Class (MonadIO)
|
|
import qualified Data.Map as M
|
|
#ifndef mingw32_HOST_OS
|
|
import System.Posix.Signals
|
|
import System.Posix.Process (getProcessGroupIDOf)
|
|
#endif
|
|
|
|
type SignalActionsVar = TVar (M.Map SignalAction (Int -> IO ()))
|
|
|
|
data RunTransferrer = RunTransferrer String [CommandParam] BatchCommandMaker
|
|
|
|
mkRunTransferrer :: BatchCommandMaker -> Annex RunTransferrer
|
|
mkRunTransferrer batchmaker = RunTransferrer
|
|
<$> liftIO programPath
|
|
<*> gitAnnexChildProcessParams "transferrer" []
|
|
<*> pure batchmaker
|
|
|
|
{- Runs an action with a Transferrer from the pool. -}
|
|
withTransferrer :: (Transferrer -> Annex a) -> Annex a
|
|
withTransferrer a = do
|
|
rt <- mkRunTransferrer nonBatchCommandMaker
|
|
pool <- Annex.getRead Annex.transferrerpool
|
|
let nocheck = pure (pure True)
|
|
signalactonsvar <- Annex.getRead Annex.signalactions
|
|
withTransferrer' False signalactonsvar nocheck rt pool a
|
|
|
|
withTransferrer'
|
|
:: (MonadIO m, MonadMask m)
|
|
=> Bool
|
|
-- ^ When minimizeprocesses is True, only one Transferrer is left
|
|
-- running in the pool at a time. So if this needed to start a
|
|
-- new Transferrer, it's stopped when done. Otherwise, idle
|
|
-- processes are left in the pool for use later.
|
|
-> SignalActionsVar
|
|
-> MkCheckTransferrer
|
|
-> RunTransferrer
|
|
-> TransferrerPool
|
|
-> (Transferrer -> m a)
|
|
-> m a
|
|
withTransferrer' minimizeprocesses signalactonsvar mkcheck rt pool a = do
|
|
(mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool)
|
|
(i@(TransferrerPoolItem _ check), t) <- liftIO $ case mi of
|
|
Nothing -> do
|
|
t <- mkTransferrer signalactonsvar rt
|
|
i <- mkTransferrerPoolItem mkcheck t
|
|
return (i, t)
|
|
Just i -> checkTransferrerPoolItem signalactonsvar rt i
|
|
a t `finally` returntopool leftinpool check t i
|
|
where
|
|
returntopool leftinpool check t i
|
|
| not minimizeprocesses || leftinpool == 0 =
|
|
-- If the transferrer got killed, the handles will
|
|
-- be closed, so it should not be returned to the
|
|
-- pool.
|
|
liftIO $ whenM (hIsOpen (transferrerWrite t)) $
|
|
liftIO $ atomically $ pushTransferrerPool pool i
|
|
| otherwise = liftIO $ do
|
|
void $ forkIO $ transferrerShutdown t
|
|
atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check
|
|
|
|
{- Check if a Transferrer from the pool is still ok to be used.
|
|
- If not, stop it and start a new one. -}
|
|
checkTransferrerPoolItem :: SignalActionsVar -> RunTransferrer -> TransferrerPoolItem -> IO (TransferrerPoolItem, Transferrer)
|
|
checkTransferrerPoolItem signalactonsvar rt i = case i of
|
|
TransferrerPoolItem (Just t) check -> ifM check
|
|
( return (i, t)
|
|
, do
|
|
transferrerShutdown t
|
|
new check
|
|
)
|
|
TransferrerPoolItem Nothing check -> new check
|
|
where
|
|
new check = do
|
|
t <- mkTransferrer signalactonsvar rt
|
|
return (TransferrerPoolItem (Just t) check, t)
|
|
|
|
data TransferRequestLevel = AnnexLevel | AssistantLevel
|
|
deriving (Show)
|
|
|
|
{- Requests that a Transferrer perform a Transfer, and waits for it to
|
|
- finish.
|
|
-
|
|
- When a stall is detected, kills the Transferrer.
|
|
-
|
|
- If the transfer failed or stalled, returns TransferInfo with an
|
|
- updated bytesComplete reflecting how much data has been transferred.
|
|
-}
|
|
performTransfer
|
|
:: (Monad m, MonadIO m, MonadMask m)
|
|
=> Maybe StallDetection
|
|
-> TransferRequestLevel
|
|
-> (forall a. Annex a -> m a)
|
|
-- ^ Run an annex action in the monad. Will not be used with
|
|
-- actions that block for a long time.
|
|
-> Maybe Remote
|
|
-> Transfer
|
|
-> TransferInfo
|
|
-> Transferrer
|
|
-> m (Either TransferInfo ())
|
|
performTransfer stalldetection level runannex r t info transferrer = do
|
|
bpv <- liftIO $ newTVarIO zeroBytesProcessed
|
|
ifM (catchBoolIO $ bracket setup cleanup (go bpv))
|
|
( return (Right ())
|
|
, do
|
|
n <- liftIO $ atomically $
|
|
fromBytesProcessed <$> readTVar bpv
|
|
return $ Left $ info { bytesComplete = Just n }
|
|
)
|
|
where
|
|
setup = do
|
|
liftIO $ sendRequest level t r
|
|
(associatedFile info)
|
|
(transferrerWrite transferrer)
|
|
metervar <- liftIO $ newTVarIO Nothing
|
|
stalledvar <- liftIO $ newTVarIO False
|
|
tid <- liftIO $ async $
|
|
detectStalls stalldetection metervar $ do
|
|
atomically $ writeTVar stalledvar True
|
|
killTransferrer transferrer
|
|
return (metervar, tid, stalledvar)
|
|
|
|
cleanup (_, tid, stalledvar) = do
|
|
liftIO $ uninterruptibleCancel tid
|
|
whenM (liftIO $ atomically $ readTVar stalledvar) $ do
|
|
runannex $ showLongNote "Transfer stalled"
|
|
-- Close handles, to prevent the transferrer being
|
|
-- reused since the process was killed.
|
|
liftIO $ hClose $ transferrerRead transferrer
|
|
liftIO $ hClose $ transferrerWrite transferrer
|
|
|
|
go bpv (metervar, _, _) = relaySerializedOutput
|
|
(liftIO $ readResponse (transferrerRead transferrer))
|
|
(liftIO . sendSerializedOutputResponse (transferrerWrite transferrer))
|
|
(updatemeter bpv metervar)
|
|
runannex
|
|
|
|
updatemeter bpv metervar (Just n) = liftIO $ do
|
|
atomically $ writeTVar metervar (Just n)
|
|
atomically $ writeTVar bpv n
|
|
updatemeter _bpv metervar Nothing = liftIO $
|
|
atomically $ writeTVar metervar Nothing
|
|
|
|
{- Starts a new git-annex transfer process, setting up handles
|
|
- that will be used to communicate with it. -}
|
|
mkTransferrer :: SignalActionsVar -> RunTransferrer -> IO Transferrer
|
|
#ifndef mingw32_HOST_OS
|
|
mkTransferrer signalactonsvar (RunTransferrer program params batchmaker) = do
|
|
#else
|
|
mkTransferrer _ (RunTransferrer program params batchmaker) = do
|
|
#endif
|
|
{- It runs as a batch job. -}
|
|
let (program', params') = batchmaker (program, params)
|
|
{- It's put into its own group so that the whole group can be
|
|
- killed to stop a transfer. -}
|
|
(Just writeh, Just readh, _, ph) <- createProcess
|
|
(proc program' $ toCommand params')
|
|
{ create_group = True
|
|
, std_in = CreatePipe
|
|
, std_out = CreatePipe
|
|
}
|
|
|
|
{- Set up signal propagation, so eg ctrl-c will also interrupt
|
|
- the processes in the transferrer's process group.
|
|
-
|
|
- There is a race between the process being created and this point.
|
|
- If a signal is received before this can run, it is not sent to
|
|
- the transferrer. This leaves the transferrer waiting for the
|
|
- first message on stdin to tell what to do. If the signal kills
|
|
- this parent process, the transferrer will then get a sigpipe
|
|
- and die too. If the signal suspends this parent process,
|
|
- it's ok to leave the transferrer running, as it's waiting on
|
|
- the pipe until this process wakes back up.
|
|
-}
|
|
#ifndef mingw32_HOST_OS
|
|
pid <- getPid ph
|
|
unregistersignalprop <- case pid of
|
|
Just p -> getProcessGroupIDOf p >>= \pgrp -> do
|
|
atomically $ modifyTVar' signalactonsvar $
|
|
M.insert (PropagateSignalProcessGroup p) $ \sig ->
|
|
signalProcessGroup (fromIntegral sig) pgrp
|
|
return $ atomically $ modifyTVar' signalactonsvar $
|
|
M.delete (PropagateSignalProcessGroup p)
|
|
Nothing -> return noop
|
|
#else
|
|
let unregistersignalprop = noop
|
|
#endif
|
|
|
|
return $ Transferrer
|
|
{ transferrerRead = readh
|
|
, transferrerWrite = writeh
|
|
, transferrerHandle = ph
|
|
, transferrerShutdown = do
|
|
-- The transferrer may write to stdout
|
|
-- as it's shutting down, so don't close
|
|
-- the readh right away. Instead, drain
|
|
-- anything sent to it.
|
|
drainer <- async $ void $ hGetContents readh
|
|
hClose writeh
|
|
void $ waitForProcess ph
|
|
wait drainer
|
|
hClose readh
|
|
unregistersignalprop
|
|
}
|
|
|
|
-- | Send a request to perform a transfer.
|
|
sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO ()
|
|
sendRequest level t mremote afile h = do
|
|
let tr = maybe
|
|
(TransferRemoteUUID (transferUUID t))
|
|
(TransferRemoteName . Remote.name)
|
|
mremote
|
|
let f = case (level, transferDirection t) of
|
|
(AnnexLevel, Upload) -> UploadRequest
|
|
(AnnexLevel, Download) -> DownloadRequest
|
|
(AssistantLevel, Upload) -> AssistantUploadRequest
|
|
(AssistantLevel, Download) -> AssistantDownloadRequest
|
|
let r = f tr (transferKey t) (TransferAssociatedFile afile)
|
|
let l = unwords $ Proto.formatMessage r
|
|
debug "Annex.TransferrerPool" ("> " ++ l)
|
|
hPutStrLn h l
|
|
hFlush h
|
|
|
|
sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO ()
|
|
sendSerializedOutputResponse h sor = do
|
|
let l = unwords $ Proto.formatMessage $
|
|
TransferSerializedOutputResponse sor
|
|
debug "Annex.TransferrerPool" ("> " ++ show l)
|
|
hPutStrLn h l
|
|
hFlush h
|
|
|
|
-- | Read a response to a transfer request.
|
|
--
|
|
-- Before the final response, this will return whatever SerializedOutput
|
|
-- should be displayed as the transfer is performed.
|
|
readResponse :: Handle -> IO (Either SerializedOutput Bool)
|
|
readResponse h = do
|
|
l <- liftIO $ hGetLine h
|
|
debug "Annex.TransferrerPool" ("< " ++ l)
|
|
case Proto.parseMessage l of
|
|
Just (TransferOutput so) -> return (Left so)
|
|
Just (TransferResult r) -> return (Right r)
|
|
Nothing -> transferrerProtocolError l
|
|
|
|
transferrerProtocolError :: String -> a
|
|
transferrerProtocolError l = giveup $ "transferrer protocol error: " ++ show l
|
|
|
|
{- Kill the transferrer, and all its child processes. -}
|
|
killTransferrer :: Transferrer -> IO ()
|
|
killTransferrer t = do
|
|
interruptProcessGroupOf $ transferrerHandle t
|
|
threadDelay 50000 -- 0.05 second grace period
|
|
terminateProcess $ transferrerHandle t
|
|
|
|
{- Stop all transferrers in the pool. -}
|
|
emptyTransferrerPool :: Annex ()
|
|
emptyTransferrerPool = do
|
|
poolvar <- Annex.getRead Annex.transferrerpool
|
|
pool <- liftIO $ atomically $ swapTVar poolvar []
|
|
liftIO $ forM_ pool $ \case
|
|
TransferrerPoolItem (Just t) _ -> transferrerShutdown t
|
|
TransferrerPoolItem Nothing _ -> noop
|
|
-- Transferrers usually restage pointer files themselves,
|
|
-- but when killTransferrer is used, a transferrer may have
|
|
-- pointer files it has not gotten around to restaging yet.
|
|
-- So, restage pointer files here in clean up from such killed
|
|
-- transferrers.
|
|
unless (null pool) $
|
|
restagePointerFiles =<< Annex.gitRepo
|