From 5a41e46bd4c182f8165fc1e42972e2511ee178d3 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 3 Dec 2020 13:01:28 -0400 Subject: [PATCH 01/20] start on serializing Messages Json objects not yet handled, and some other special cases, but this is the bulk of the messages. For progress meters, POSIXTime does not have a Read instance (or a suitable Show instance), so had to switch to using a Double for progress meters. This commit was sponsored by Ethan Aubin on Patreon. --- Messages.hs | 3 ++- Messages/Internal.hs | 8 +++++++- Messages/Progress.hs | 26 +++++++++++++++++++------- Types/Messages.hs | 15 +++++++++++++-- Utility/Metered.hs | 35 +++++++++++++++++++++-------------- 5 files changed, 62 insertions(+), 25 deletions(-) diff --git a/Messages.hs b/Messages.hs index d3e63458ad..2940b3f539 100644 --- a/Messages.hs +++ b/Messages.hs @@ -285,9 +285,10 @@ debugEnabled = do commandProgressDisabled :: Annex Bool commandProgressDisabled = withMessageState $ \s -> return $ case outputType s of + NormalOutput -> concurrentOutputEnabled s QuietOutput -> True JSONOutput _ -> True - NormalOutput -> concurrentOutputEnabled s + SerializedOutput -> True jsonOutputEnabled :: Annex Bool jsonOutputEnabled = withMessageState $ \s -> return $ diff --git a/Messages/Internal.hs b/Messages/Internal.hs index 79829ac151..7c84b8431e 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -1,6 +1,6 @@ {- git-annex output messages, including concurrent output to display regions - - - Copyright 2010-2018 Joey Hess + - Copyright 2010-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -29,6 +29,7 @@ outputMessage' jsonoutputter jsonbuilder msg = withMessageState $ \s -> case out | otherwise -> liftIO $ flushed $ S.putStr msg JSONOutput _ -> void $ jsonoutputter jsonbuilder s QuietOutput -> q + SerializedOutput -> liftIO $ outputSerialized $ OutputMessage (decodeBS' msg) -- Buffer changes to JSON until end is reached and then emit it. 
bufferJSON :: JSONBuilder -> MessageState -> Annex Bool @@ -67,6 +68,8 @@ outputError msg = withMessageState $ \s -> case (outputType s, jsonBuffer s) of let jb' = Just (JSON.addErrorMessage (lines msg) jb) in Annex.changeState $ \st -> st { Annex.output = s { jsonBuffer = jb' } } + (SerializedOutput, _) -> + liftIO $ outputSerialized $ OutputError msg _ | concurrentOutputEnabled s -> concurrentMessage s True msg go | otherwise -> go @@ -81,3 +84,6 @@ q = noop flushed :: IO () -> IO () flushed a = a >> hFlush stdout + +outputSerialized :: SerializedOutput -> IO () +outputSerialized = print diff --git a/Messages/Progress.hs b/Messages/Progress.hs index a8588614c2..9dec9a9ca9 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -1,6 +1,6 @@ {- git-annex progress output - - - Copyright 2010-2019 Joey Hess + - Copyright 2010-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -20,6 +20,7 @@ import Types.KeySource import Utility.InodeCache import qualified Messages.JSON as JSON import Messages.Concurrent +import Messages.Internal import qualified System.Console.Regions as Regions import qualified System.Console.Concurrent as Console @@ -72,7 +73,7 @@ metered othermeter sizer a = withMessageState $ \st -> showOutput meter <- liftIO $ mkMeter msize $ displayMeterHandle stdout bandwidthMeter - m <- liftIO $ rateLimitMeterUpdate 0.2 meter $ + m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ updateMeter meter r <- a meter (combinemeter m) liftIO $ clearMeterHandle meter stdout @@ -82,19 +83,24 @@ metered othermeter sizer a = withMessageState $ \st -> meter <- liftIO $ mkMeter msize $ \_ msize' old new -> let s = bandwidthMeter msize' old new in Regions.setConsoleRegion r ('\n' : s) - m <- liftIO $ rateLimitMeterUpdate 0.2 meter $ + m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ updateMeter meter a meter (combinemeter m) go msize (MessageState { outputType = JSONOutput jsonoptions }) | jsonProgress jsonoptions = do buf 
<- withMessageState $ return . jsonBuffer - meter <- liftIO $ mkMeter msize $ \_ msize' _old (new, _now) -> - JSON.progress buf msize' new - m <- liftIO $ rateLimitMeterUpdate 0.1 meter $ + meter <- liftIO $ mkMeter msize $ \_ msize' _old new -> + JSON.progress buf msize' (meterBytesProcessed new) + m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $ updateMeter meter a meter (combinemeter m) | otherwise = nometer - + go msize (MessageState { outputType = SerializedOutput }) = do + meter <- liftIO $ mkMeter msize $ \_ msize' old new -> + outputSerialized $ ProgressMeter msize' old new + m <- liftIO $ rateLimitMeterUpdate minratelimit meter $ + updateMeter meter + a meter (combinemeter m) nometer = do dummymeter <- liftIO $ mkMeter Nothing $ \_ _ _ _ -> return () @@ -104,6 +110,12 @@ metered othermeter sizer a = withMessageState $ \st -> Nothing -> m Just om -> combineMeterUpdate m om + consoleratelimit = 0.2 + + jsonratelimit = 0.1 + + minratelimit = min consoleratelimit jsonratelimit + {- Poll file size to display meter. -} meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a meteredFile file combinemeterupdate key a = diff --git a/Types/Messages.hs b/Types/Messages.hs index f4319d9cfb..22346120c8 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -1,6 +1,6 @@ {- git-annex Messages data types - - - Copyright 2012-2018 Joey Hess + - Copyright 2012-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. 
-} @@ -8,11 +8,16 @@ module Types.Messages where import qualified Utility.Aeson as Aeson +import Utility.Metered import Control.Concurrent import System.Console.Regions (ConsoleRegion) -data OutputType = NormalOutput | QuietOutput | JSONOutput JSONOptions +data OutputType + = NormalOutput + | QuietOutput + | JSONOutput JSONOptions + | SerializedOutput deriving (Show) data JSONOptions = JSONOptions @@ -53,3 +58,9 @@ newMessageState = do , jsonBuffer = Nothing , promptLock = promptlock } + +data SerializedOutput + = OutputMessage String + | OutputError String + | ProgressMeter (Maybe Integer) MeterState MeterState + deriving (Show, Read) diff --git a/Utility/Metered.hs b/Utility/Metered.hs index 3e2333cc71..338bb0e6e0 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -9,6 +9,7 @@ module Utility.Metered ( MeterUpdate, + MeterState(..), nullMeterUpdate, combineMeterUpdate, TotalSize(..), @@ -77,7 +78,7 @@ combineMeterUpdate a b = \n -> a n >> b n {- Total number of bytes processed so far. -} newtype BytesProcessed = BytesProcessed Integer - deriving (Eq, Ord, Show) + deriving (Eq, Ord, Show, Read) class AsBytesProcessed a where toBytesProcessed :: a -> BytesProcessed @@ -379,19 +380,24 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do data Meter = Meter (MVar (Maybe Integer)) (MVar MeterState) (MVar String) DisplayMeter -type MeterState = (BytesProcessed, POSIXTime) +data MeterState = MeterState + { meterBytesProcessed :: BytesProcessed + , meterTimeStamp :: Double + } deriving (Show, Read) -type DisplayMeter = MVar String -> Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> IO () +type DisplayMeter = MVar String -> Maybe Integer -> MeterState -> MeterState -> IO () -type RenderMeter = Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> String +type RenderMeter = Maybe Integer -> MeterState -> MeterState -> String -- | Make a meter. Pass the total size, if it's known. 
mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter -mkMeter totalsize displaymeter = Meter - <$> newMVar totalsize - <*> ((\t -> newMVar (zeroBytesProcessed, t)) =<< getPOSIXTime) - <*> newMVar "" - <*> pure displaymeter +mkMeter totalsize displaymeter = do + ts <- realToFrac <$> getPOSIXTime + Meter + <$> newMVar totalsize + <*> newMVar (MeterState zeroBytesProcessed ts) + <*> newMVar "" + <*> pure displaymeter setMeterTotalSize :: Meter -> Integer -> IO () setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just @@ -399,11 +405,12 @@ setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just -- | Updates the meter, displaying it if necessary. updateMeter :: Meter -> MeterUpdate updateMeter (Meter totalsizev sv bv displaymeter) new = do - now <- getPOSIXTime - (old, before) <- swapMVar sv (new, now) - when (old /= new) $ do + now <- realToFrac <$> getPOSIXTime + let curms = MeterState new now + oldms <- swapMVar sv curms + when (meterBytesProcessed oldms /= new) $ do totalsize <- readMVar totalsizev - displaymeter bv totalsize (old, before) (new, now) + displaymeter bv totalsize oldms curms -- | Display meter to a Handle. displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter @@ -428,7 +435,7 @@ clearMeterHandle (Meter _ _ v _) h = do -- or when total size is not known: -- 1.3 MiB 300 KiB/s bandwidthMeter :: RenderMeter -bandwidthMeter mtotalsize (BytesProcessed old, before) (BytesProcessed new, now) = +bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) = unwords $ catMaybes [ Just percentamount -- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s" From e7f42e2ec7c269ff0377e4e92136a78a24a1b8a4 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 3 Dec 2020 14:47:04 -0400 Subject: [PATCH 02/20] when serializing messages, include json objects This is done always, it's up to the consumer to decide if it wants to output the json objects or the messages.
Messages.JSON.finalize changed to not need a JSONOptions. As far as I can see, this does not change its behavior, since addErrorMessage appends to any list that's already there. This commit was sponsored by Ethan Aubin. --- Messages/Internal.hs | 29 +++++++++++++++++++---------- Messages/JSON.hs | 12 +++++------- Types/Messages.hs | 5 ++++- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/Messages/Internal.hs b/Messages/Internal.hs index 7c84b8431e..b2ae380699 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -29,26 +29,32 @@ outputMessage' jsonoutputter jsonbuilder msg = withMessageState $ \s -> case out | otherwise -> liftIO $ flushed $ S.putStr msg JSONOutput _ -> void $ jsonoutputter jsonbuilder s QuietOutput -> q - SerializedOutput -> liftIO $ outputSerialized $ OutputMessage (decodeBS' msg) + SerializedOutput -> do + liftIO $ outputSerialized $ OutputMessage msg + void $ jsonoutputter jsonbuilder s -- Buffer changes to JSON until end is reached and then emit it. bufferJSON :: JSONBuilder -> MessageState -> Annex Bool bufferJSON jsonbuilder s = case outputType s of - JSONOutput jsonoptions - | endjson -> do + JSONOutput _ -> go (flushed . JSON.emit) + SerializedOutput -> go (outputSerialized . JSONObject . JSON.encode) + _ -> return False + where + go emitter + | endjson = do Annex.changeState $ \st -> st { Annex.output = s { jsonBuffer = Nothing } } - maybe noop (liftIO . flushed . JSON.emit . JSON.finalize jsonoptions) json + maybe noop (liftIO . emitter . JSON.finalize) json return True - | otherwise -> do + | otherwise = do Annex.changeState $ \st -> st { Annex.output = s { jsonBuffer = json } } return True - _ -> return False - where + (json, endjson) = case jsonbuilder i of Nothing -> (jsonBuffer s, False) (Just (j, e)) -> (Just j, e) + i = case jsonBuffer s of Nothing -> Nothing Just b -> Just (b, False) @@ -56,11 +62,14 @@ bufferJSON jsonbuilder s = case outputType s of -- Immediately output JSON. 
outputJSON :: JSONBuilder -> MessageState -> Annex Bool outputJSON jsonbuilder s = case outputType s of - JSONOutput _ -> do - maybe noop (liftIO . flushed . JSON.emit) + JSONOutput _ -> go (flushed . JSON.emit) + SerializedOutput -> go (outputSerialized . JSONObject . JSON.encode) + _ -> return False + where + go emitter = do + maybe noop (liftIO . emitter) (fst <$> jsonbuilder Nothing) return True - _ -> return False outputError :: String -> Annex () outputError msg = withMessageState $ \s -> case (outputType s, jsonBuffer s) of diff --git a/Messages/JSON.hs b/Messages/JSON.hs index 5c4726b2b2..4a6419b622 100644 --- a/Messages/JSON.hs +++ b/Messages/JSON.hs @@ -11,6 +11,7 @@ module Messages.JSON ( JSONBuilder, JSONChunk(..), emit, + encode, none, start, end, @@ -38,7 +39,6 @@ import Data.Maybe import Data.Monoid import Prelude -import Types.Messages import Types.Command (SeekInput(..)) import Key import Utility.Metered @@ -82,12 +82,10 @@ end :: Bool -> JSONBuilder end b (Just (o, _)) = Just (HM.insert "success" (toJSON' b) o, True) end _ Nothing = Nothing -finalize :: JSONOptions -> Object -> Object -finalize jsonoptions o - -- Always include error-messages field, even if empty, - -- to make the json be self-documenting. - | jsonErrorMessages jsonoptions = addErrorMessage [] o - | otherwise = o +-- Always include error-messages field, even if empty, +-- to make the json be self-documenting. 
+finalize :: Object -> Object +finalize o = addErrorMessage [] o addErrorMessage :: [String] -> Object -> Object addErrorMessage msg o = diff --git a/Types/Messages.hs b/Types/Messages.hs index 22346120c8..273fd713b2 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -12,6 +12,8 @@ import Utility.Metered import Control.Concurrent import System.Console.Regions (ConsoleRegion) +import qualified Data.ByteString as S +import qualified Data.ByteString.Lazy as L data OutputType = NormalOutput @@ -60,7 +62,8 @@ newMessageState = do } data SerializedOutput - = OutputMessage String + = OutputMessage S.ByteString | OutputError String | ProgressMeter (Maybe Integer) MeterState MeterState + | JSONObject L.ByteString deriving (Show, Read) From 82dbc4387ccb3bec27788d920c94145024f928f0 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 3 Dec 2020 14:57:22 -0400 Subject: [PATCH 03/20] comments --- Types/Messages.hs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Types/Messages.hs b/Types/Messages.hs index 273fd713b2..b61eec8cc9 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -61,9 +61,14 @@ newMessageState = do , promptLock = promptlock } +-- | When communicating with a child process over a pipe while it is +-- performing some action, this is used to pass back output that the child +-- would normally display to the console. data SerializedOutput = OutputMessage S.ByteString | OutputError String | ProgressMeter (Maybe Integer) MeterState MeterState | JSONObject L.ByteString + -- ^ This is always sent, it's up to the consumer to decide if it + -- wants to display JSON, or human-readable messages. 
deriving (Show, Read) From cad147cbbf26ece5c85b84237919b352acd5cdf8 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Thu, 3 Dec 2020 16:21:20 -0400 Subject: [PATCH 04/20] new protocol for transferkeys, with message serialization Necessarily threw out the old protocol, so if an old git-annex assistant is running, and starts a transferkeys from the new git-annex, it would fail. But, that seems unlikely; the assistant starts up transferkeys processes and then keeps them running. Still, may need to test that scenario. The new protocol is simple read/show and looks like this: TransferRequest Download (Right "origin") (Key {keyName = "f8f8766a836fb6120abf4d5328ce8761404e437529e997aaa0363bdd4fecd7bb", keyVariety = SHA2Key (HashSize 256) (HasExt True), keySize = Just 30, keyMtime = Nothing, keyChunkSize = Nothing, keyChunkNum = Nothing}) (AssociatedFile (Just "foo")) TransferOutput (ProgressMeter (Just 30) (MeterState {meterBytesProcessed = BytesProcessed 0, meterTimeStamp = 1.6070268727892535e9}) (MeterState {meterBytesProcessed = BytesProcessed 30, meterTimeStamp = 1.6070268728043e9})) TransferOutput (OutputMessage "(checksum...) ") TransferResult True Granted, this is not optimally fast, but it seems good enough, and is probably nearly as fast as the old protocol anyhow. emitSerializedOutput for ProgressMeter is not yet implemented. It needs to somehow start or update a progress meter. There may need to be a new message that allocates a progress meter, and then have ProgressMeter update it. 
This commit was sponsored by Ethan Aubin --- Assistant/TransferSlots.hs | 2 +- Assistant/TransferrerPool.hs | 4 +- Command/TransferKeys.hs | 130 +++++++++++++++----------------- Messages.hs | 2 +- Messages/Internal.hs | 31 ++++++-- Messages/JSON.hs | 8 +- Messages/Progress.hs | 4 +- Types/Key.hs | 3 +- Types/Messages.hs | 3 +- doc/git-annex-transferkeys.mdwn | 7 +- 10 files changed, 98 insertions(+), 96 deletions(-) diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 5b555548e7..12abd10b5d 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -155,7 +155,7 @@ genTransfer t info = case transferRemote info of - usual cleanup. However, first check if something else is - running the transfer, to avoid removing active transfers. -} - go remote transferrer = ifM (liftIO $ performTransfer transferrer t info) + go remote transferrer = ifM (liftAnnex $ performTransfer transferrer t info) ( do case associatedFile info of AssociatedFile Nothing -> noop diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs index edf60c6ff3..0e3ee71734 100644 --- a/Assistant/TransferrerPool.hs +++ b/Assistant/TransferrerPool.hs @@ -55,9 +55,9 @@ checkTransferrerPoolItem program batchmaker i = case i of {- Requests that a Transferrer perform a Transfer, and waits for it to - finish. 
-} -performTransfer :: Transferrer -> Transfer -> TransferInfo -> IO Bool +performTransfer :: Transferrer -> Transfer -> TransferInfo -> Annex Bool performTransfer transferrer t info = catchBoolIO $ do - T.sendRequest t info (transferrerWrite transferrer) + (liftIO $ T.sendRequest t info (transferrerWrite transferrer)) T.readResponse (transferrerRead transferrer) {- Starts a new git-annex transferkeys process, setting up handles diff --git a/Command/TransferKeys.hs b/Command/TransferKeys.hs index f0ac31be97..36db8ce18b 100644 --- a/Command/TransferKeys.hs +++ b/Command/TransferKeys.hs @@ -1,15 +1,14 @@ -{- git-annex command, used internally by assistant +{- git-annex command - - - Copyright 2012, 2013 Joey Hess + - Copyright 2012-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} -{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-} - module Command.TransferKeys where import Command +import qualified Annex import Annex.Content import Logs.Location import Annex.Transfer @@ -18,8 +17,19 @@ import Utility.SimpleProtocol (dupIoHandles) import Git.Types (RemoteName) import qualified Database.Keys import Annex.BranchState +import Types.Messages +import Types.Key +import Messages.Internal -data TransferRequest = TransferRequest Direction Remote Key AssociatedFile +import Text.Read (readMaybe) + +data TransferRequest = TransferRequest Direction (Either UUID RemoteName) KeyData AssociatedFile + deriving (Show, Read) + +data TransferResponse + = TransferOutput SerializedOutput + | TransferResult Bool + deriving (Show, Read) cmd :: Command cmd = command "transferkeys" SectionPlumbing "transfers keys" @@ -32,10 +42,12 @@ start :: CommandStart start = do enableInteractiveBranchAccess (readh, writeh) <- liftIO dupIoHandles + Annex.setOutput $ SerializedOutput $ + hPutStrLn writeh . show . 
TransferOutput runRequests readh writeh runner stop where - runner (TransferRequest direction remote key file) + runner (TransferRequest direction _ keydata file) remote | direction == Upload = notifyTransfer direction file $ upload (Remote.uuid remote) key file stdRetry $ \p -> do tryNonAsync (Remote.storeKey remote key file p) >>= \case @@ -58,82 +70,58 @@ start = do -- not old cached data. Database.Keys.closeDb return r + where + key = mkKey (const keydata) runRequests :: Handle -> Handle - -> (TransferRequest -> Annex Bool) + -> (TransferRequest -> Remote -> Annex Bool) -> Annex () -runRequests readh writeh a = do - liftIO $ hSetBuffering readh NoBuffering - go =<< readrequests +runRequests readh writeh a = go Nothing Nothing where - go (d:rn:k:f:rest) = do - case (deserialize d, deserialize rn, deserialize k, deserialize f) of - (Just direction, Just remotename, Just key, Just file) -> do - mremote <- Remote.byName' remotename + go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do + l <- liftIO $ hGetLine readh + case readMaybe l of + Just tr@(TransferRequest _ remoteoruuid _ _) -> do + -- Often the same remote will be used + -- repeatedly, so cache the last one to + -- avoid looking up repeatedly. 
+ mremote <- if lastremoteoruuid == Just remoteoruuid + then pure lastremote + else eitherToMaybe <$> Remote.byName' + (either fromUUID id remoteoruuid) case mremote of - Left _ -> sendresult False - Right remote -> sendresult =<< a - (TransferRequest direction remote key file) - _ -> sendresult False - go rest - go [] = noop - go [""] = noop - go v = error $ "transferkeys protocol error: " ++ show v + Just remote -> do + sendresult =<< a tr remote + go (Just remoteoruuid) mremote + Nothing -> protocolError l + Nothing -> protocolError l - readrequests = liftIO $ split fieldSep <$> hGetContents readh sendresult b = liftIO $ do - hPutStrLn writeh $ serialize b + hPutStrLn writeh $ show $ TransferResult b hFlush writeh sendRequest :: Transfer -> TransferInfo -> Handle -> IO () -sendRequest t tinfo h = do - hPutStr h $ intercalate fieldSep - [ serialize (transferDirection t) - , maybe (serialize ((fromUUID (transferUUID t)) :: String)) - (serialize . Remote.name) - (transferRemote tinfo) - , serialize (transferKey t) - , serialize (associatedFile tinfo) - , "" -- adds a trailing null - ] - hFlush h +sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest + (transferDirection t) + (maybe (Left (transferUUID t)) (Right . Remote.name) (transferRemote tinfo)) + (keyData (transferKey t)) + (associatedFile tinfo) -readResponse :: Handle -> IO Bool -readResponse h = fromMaybe False . deserialize <$> hGetLine h +-- | Read a response from this command. +-- +-- Each TransferOutput line that is read before the final TransferResult +-- will be output. 
+readResponse :: Handle -> Annex Bool +readResponse h = do + l <- liftIO $ hGetLine h + case readMaybe l of + Just (TransferOutput so) -> do + emitSerializedOutput so + readResponse h + Just (TransferResult r) -> return r + Nothing -> protocolError l -fieldSep :: String -fieldSep = "\0" - -class TCSerialized a where - serialize :: a -> String - deserialize :: String -> Maybe a - -instance TCSerialized Bool where - serialize True = "1" - serialize False = "0" - deserialize "1" = Just True - deserialize "0" = Just False - deserialize _ = Nothing - -instance TCSerialized Direction where - serialize Upload = "u" - serialize Download = "d" - deserialize "u" = Just Upload - deserialize "d" = Just Download - deserialize _ = Nothing - -instance TCSerialized AssociatedFile where - serialize (AssociatedFile (Just f)) = fromRawFilePath f - serialize (AssociatedFile Nothing) = "" - deserialize "" = Just (AssociatedFile Nothing) - deserialize f = Just (AssociatedFile (Just (toRawFilePath f))) - -instance TCSerialized RemoteName where - serialize n = n - deserialize n = Just n - -instance TCSerialized Key where - serialize = serializeKey - deserialize = deserializeKey +protocolError :: String -> a +protocolError l = error $ "transferkeys protocol error: " ++ show l diff --git a/Messages.hs b/Messages.hs index 2940b3f539..87911376e8 100644 --- a/Messages.hs +++ b/Messages.hs @@ -288,7 +288,7 @@ commandProgressDisabled = withMessageState $ \s -> return $ NormalOutput -> concurrentOutputEnabled s QuietOutput -> True JSONOutput _ -> True - SerializedOutput -> True + SerializedOutput _ -> True jsonOutputEnabled :: Annex Bool jsonOutputEnabled = withMessageState $ \s -> return $ diff --git a/Messages/Internal.hs b/Messages/Internal.hs index b2ae380699..3104a10a49 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -29,15 +29,15 @@ outputMessage' jsonoutputter jsonbuilder msg = withMessageState $ \s -> case out | otherwise -> liftIO $ flushed $ S.putStr msg JSONOutput _ -> 
void $ jsonoutputter jsonbuilder s QuietOutput -> q - SerializedOutput -> do - liftIO $ outputSerialized $ OutputMessage msg + SerializedOutput h -> do + liftIO $ outputSerialized h $ OutputMessage msg void $ jsonoutputter jsonbuilder s -- Buffer changes to JSON until end is reached and then emit it. bufferJSON :: JSONBuilder -> MessageState -> Annex Bool bufferJSON jsonbuilder s = case outputType s of JSONOutput _ -> go (flushed . JSON.emit) - SerializedOutput -> go (outputSerialized . JSONObject . JSON.encode) + SerializedOutput h -> go (outputSerialized h . JSONObject . JSON.encode) _ -> return False where go emitter @@ -63,7 +63,7 @@ bufferJSON jsonbuilder s = case outputType s of outputJSON :: JSONBuilder -> MessageState -> Annex Bool outputJSON jsonbuilder s = case outputType s of JSONOutput _ -> go (flushed . JSON.emit) - SerializedOutput -> go (outputSerialized . JSONObject . JSON.encode) + SerializedOutput h -> go (outputSerialized h . JSONObject . JSON.encode) _ -> return False where go emitter = do @@ -77,8 +77,8 @@ outputError msg = withMessageState $ \s -> case (outputType s, jsonBuffer s) of let jb' = Just (JSON.addErrorMessage (lines msg) jb) in Annex.changeState $ \st -> st { Annex.output = s { jsonBuffer = jb' } } - (SerializedOutput, _) -> - liftIO $ outputSerialized $ OutputError msg + (SerializedOutput h, _) -> + liftIO $ outputSerialized h $ OutputError msg _ | concurrentOutputEnabled s -> concurrentMessage s True msg go | otherwise -> go @@ -94,5 +94,20 @@ q = noop flushed :: IO () -> IO () flushed a = a >> hFlush stdout -outputSerialized :: SerializedOutput -> IO () -outputSerialized = print +outputSerialized :: (SerializedOutput -> IO ()) -> SerializedOutput -> IO () +outputSerialized = id + +emitSerializedOutput :: SerializedOutput -> Annex () +emitSerializedOutput (OutputMessage msg) = + outputMessage' nojsonoutputter nojsonbuilder msg + where + nojsonoutputter _ _ = return False + nojsonbuilder = id +emitSerializedOutput (OutputError msg) 
= outputError msg +emitSerializedOutput (ProgressMeter sz old new) = undefined -- TODO +emitSerializedOutput (JSONObject b) = + withMessageState $ \s -> case outputType s of + JSONOutput _ -> liftIO $ flushed $ JSON.emit' b + SerializedOutput h -> liftIO $ + outputSerialized h $ JSONObject b + _ -> q diff --git a/Messages/JSON.hs b/Messages/JSON.hs index 4a6419b622..d423f62e9d 100644 --- a/Messages/JSON.hs +++ b/Messages/JSON.hs @@ -11,6 +11,7 @@ module Messages.JSON ( JSONBuilder, JSONChunk(..), emit, + emit', encode, none, start, @@ -52,9 +53,12 @@ emitLock :: MVar () emitLock = unsafePerformIO $ newMVar () emit :: Object -> IO () -emit o = do +emit = emit' . encode + +emit' :: L.ByteString -> IO () +emit' b = do takeMVar emitLock - L.hPut stdout (encode o) + L.hPut stdout b putStr "\n" putMVar emitLock () diff --git a/Messages/Progress.hs b/Messages/Progress.hs index 9dec9a9ca9..193bcb0f7b 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -95,9 +95,9 @@ metered othermeter sizer a = withMessageState $ \st -> updateMeter meter a meter (combinemeter m) | otherwise = nometer - go msize (MessageState { outputType = SerializedOutput }) = do + go msize (MessageState { outputType = SerializedOutput h }) = do meter <- liftIO $ mkMeter msize $ \_ msize' old new -> - outputSerialized $ ProgressMeter msize' old new + outputSerialized h $ ProgressMeter msize' old new m <- liftIO $ rateLimitMeterUpdate minratelimit meter $ updateMeter meter a meter (combinemeter m) diff --git a/Types/Key.hs b/Types/Key.hs index 7fd702046e..b8dd77c59d 100644 --- a/Types/Key.hs +++ b/Types/Key.hs @@ -11,6 +11,7 @@ module Types.Key ( KeyData(..), Key, fromKey, + keyData, mkKey, alterKey, isKeyPrefix, @@ -201,7 +202,7 @@ splitKeyNameExtension' keyname = S8.span (/= '.') keyname {- A filename may be associated with a Key. 
-} newtype AssociatedFile = AssociatedFile (Maybe RawFilePath) - deriving (Show, Eq, Ord) + deriving (Show, Read, Eq, Ord) {- There are several different varieties of keys. -} data KeyVariety diff --git a/Types/Messages.hs b/Types/Messages.hs index b61eec8cc9..d6339749c4 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -19,8 +19,7 @@ data OutputType = NormalOutput | QuietOutput | JSONOutput JSONOptions - | SerializedOutput - deriving (Show) + | SerializedOutput (SerializedOutput -> IO ()) data JSONOptions = JSONOptions { jsonProgress :: Bool diff --git a/doc/git-annex-transferkeys.mdwn b/doc/git-annex-transferkeys.mdwn index f726b9262c..b1f983d5b8 100644 --- a/doc/git-annex-transferkeys.mdwn +++ b/doc/git-annex-transferkeys.mdwn @@ -8,20 +8,15 @@ git annex transferkeys # DESCRIPTION -This plumbing-level command is used by the assistant to transfer data. +This plumbing-level command is used to transfer data. It is a long-running process, which is fed instructions about the keys to transfer using an internal stdio protocol, which is intentionally not documented (as it may change at any time). -It's normal to have a transferkeys process running when the assistant is -running. - # SEE ALSO [[git-annex]](1) -[[git-annex-assistant]](1) - # AUTHOR Joey Hess From 7a9b618d5d554a04b576964bccb1ba0e440e1204 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 4 Dec 2020 12:20:04 -0400 Subject: [PATCH 05/20] fix problem with last commit and assistant liftAnnex blocks all other calls, so avoid using it with a long-duration call to readResponse.
--- Assistant/TransferSlots.hs | 2 +- Assistant/TransferrerPool.hs | 11 +++++++++-- Command/TransferKeys.hs | 14 ++++++-------- Messages.hs | 1 + 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 12abd10b5d..59066ee69c 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -155,7 +155,7 @@ genTransfer t info = case transferRemote info of - usual cleanup. However, first check if something else is - running the transfer, to avoid removing active transfers. -} - go remote transferrer = ifM (liftAnnex $ performTransfer transferrer t info) + go remote transferrer = ifM (performTransfer transferrer t info) ( do case associatedFile info of AssociatedFile Nothing -> noop diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs index 0e3ee71734..da66a2dc20 100644 --- a/Assistant/TransferrerPool.hs +++ b/Assistant/TransferrerPool.hs @@ -55,10 +55,17 @@ checkTransferrerPoolItem program batchmaker i = case i of {- Requests that a Transferrer perform a Transfer, and waits for it to - finish. -} -performTransfer :: Transferrer -> Transfer -> TransferInfo -> Annex Bool +performTransfer :: Transferrer -> Transfer -> TransferInfo -> Assistant Bool performTransfer transferrer t info = catchBoolIO $ do (liftIO $ T.sendRequest t info (transferrerWrite transferrer)) - T.readResponse (transferrerRead transferrer) + readresponse + where + readresponse = + liftIO (T.readResponse (transferrerRead transferrer)) >>= \case + Right r -> return r + Left so -> do + liftAnnex $ emitSerializedOutput so + readresponse {- Starts a new git-annex transferkeys process, setting up handles - that will be used to communicate with it. 
-} diff --git a/Command/TransferKeys.hs b/Command/TransferKeys.hs index 36db8ce18b..f5ccbe9492 100644 --- a/Command/TransferKeys.hs +++ b/Command/TransferKeys.hs @@ -19,7 +19,6 @@ import qualified Database.Keys import Annex.BranchState import Types.Messages import Types.Key -import Messages.Internal import Text.Read (readMaybe) @@ -102,6 +101,7 @@ runRequests readh writeh a = go Nothing Nothing hPutStrLn writeh $ show $ TransferResult b hFlush writeh +-- FIXME this is bad when used with inAnnex sendRequest :: Transfer -> TransferInfo -> Handle -> IO () sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest (transferDirection t) @@ -111,16 +111,14 @@ sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest -- | Read a response from this command. -- --- Each TransferOutput line that is read before the final TransferResult --- will be output. -readResponse :: Handle -> Annex Bool +-- Before the final response, this will return whatever SerializedOutput +-- should be displayed as the transfer is performed. 
+readResponse :: Handle -> IO (Either SerializedOutput Bool) readResponse h = do l <- liftIO $ hGetLine h case readMaybe l of - Just (TransferOutput so) -> do - emitSerializedOutput so - readResponse h - Just (TransferResult r) -> return r + Just (TransferOutput so) -> return (Left so) + Just (TransferResult r) -> return (Right r) Nothing -> protocolError l protocolError :: String -> a diff --git a/Messages.hs b/Messages.hs index 87911376e8..f68b5f3da0 100644 --- a/Messages.hs +++ b/Messages.hs @@ -50,6 +50,7 @@ module Messages ( withMessageState, prompt, mkPrompter, + emitSerializedOutput, ) where import System.Log.Logger From 4efecaebd65eab2e5aa1d4f9704fa9b59fe967e4 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 4 Dec 2020 13:07:30 -0400 Subject: [PATCH 06/20] generalize to allow running in Assistant monad --- Messages/Concurrent.hs | 10 +++++++--- Messages/Progress.hs | 30 ++++++++++++++++++++++++------ 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/Messages/Concurrent.hs b/Messages/Concurrent.hs index 94554aff5d..5e4138416e 100644 --- a/Messages/Concurrent.hs +++ b/Messages/Concurrent.hs @@ -98,10 +98,14 @@ inOwnConsoleRegion s a Regions.closeConsoleRegion r {- The progress region is displayed inline with the current console region. -} -withProgressRegion :: (Regions.ConsoleRegion -> Annex a) -> Annex a -withProgressRegion a = do - parent <- consoleRegion <$> Annex.getState Annex.output +withProgressRegion + :: (MonadIO m, MonadMask m) + => MessageState + -> (Regions.ConsoleRegion -> m a) -> m a +withProgressRegion st a = Regions.withConsoleRegion (maybe Regions.Linear Regions.InLine parent) a + where + parent = consoleRegion st instance Regions.LiftRegion Annex where liftRegion = liftIO . 
atomically diff --git a/Messages/Progress.hs b/Messages/Progress.hs index 193bcb0f7b..5767e2e8fe 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -24,6 +24,7 @@ import Messages.Internal import qualified System.Console.Regions as Regions import qualified System.Console.Concurrent as Console +import Control.Monad.IO.Class (MonadIO) {- Class of things from which a size can be gotten to display a progress - meter. -} @@ -64,13 +65,30 @@ instance MeterSize KeySizer where {- Shows a progress meter while performing an action. - The action is passed the meter and a callback to use to update the meter. --} -metered :: MeterSize sizer => Maybe MeterUpdate -> sizer -> (Meter -> MeterUpdate -> Annex a) -> Annex a -metered othermeter sizer a = withMessageState $ \st -> - flip go st =<< getMeterSize sizer +metered + :: MeterSize sizer + => Maybe MeterUpdate + -> sizer + -> (Meter -> MeterUpdate -> Annex a) + -> Annex a +metered othermeter sizer a = withMessageState $ \st -> do + sz <- getMeterSize sizer + metered' st othermeter sz showOutput a + +metered' + :: (Monad m, MonadIO m, MonadMask m) + => MessageState + -> Maybe MeterUpdate + -> Maybe FileSize + -> m () + -- ^ this should run showOutput + -> (Meter -> MeterUpdate -> m a) + -> m a +metered' st othermeter size showoutput a = go size st where go _ (MessageState { outputType = QuietOutput }) = nometer go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do - showOutput + showoutput meter <- liftIO $ mkMeter msize $ displayMeterHandle stdout bandwidthMeter m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ @@ -79,7 +97,7 @@ metered othermeter sizer a = withMessageState $ \st -> liftIO $ clearMeterHandle meter stdout return r go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) = - withProgressRegion $ \r -> do + withProgressRegion st $ \r -> do meter <- liftIO $ mkMeter msize $ \_ msize' old new -> let s = bandwidthMeter msize' old 
new in Regions.setConsoleRegion r ('\n' : s) @@ -88,7 +106,7 @@ metered othermeter sizer a = withMessageState $ \st -> a meter (combinemeter m) go msize (MessageState { outputType = JSONOutput jsonoptions }) | jsonProgress jsonoptions = do - buf <- withMessageState $ return . jsonBuffer + let buf = jsonBuffer st meter <- liftIO $ mkMeter msize $ \_ msize' _old new -> JSON.progress buf msize' (meterBytesProcessed new) m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $ From 31e417f351039460ab33a53fe03f4b134f0a4b58 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 4 Dec 2020 13:50:03 -0400 Subject: [PATCH 07/20] finish message serialization of progress meters Any given transfer can only display 1 progress meter at a time, or so this code assumes. In some cases, there are progress meters for different stages of a transfer, perhaps, and that is supported by this. This commit was sponsored by Ethan Aubin. --- Assistant/TransferrerPool.hs | 12 ++---- Messages.hs | 1 - Messages/Internal.hs | 15 -------- Messages/Progress.hs | 19 ++++++---- Messages/Serialized.hs | 71 ++++++++++++++++++++++++++++++++++++ Types/Messages.hs | 5 ++- git-annex.cabal | 1 + 7 files changed, 91 insertions(+), 33 deletions(-) create mode 100644 Messages/Serialized.hs diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs index da66a2dc20..8566fea2cb 100644 --- a/Assistant/TransferrerPool.hs +++ b/Assistant/TransferrerPool.hs @@ -11,6 +11,7 @@ import Assistant.Common import Assistant.Types.TransferrerPool import Types.Transfer import Utility.Batch +import Messages.Serialized import qualified Command.TransferKeys as T @@ -58,14 +59,9 @@ checkTransferrerPoolItem program batchmaker i = case i of performTransfer :: Transferrer -> Transfer -> TransferInfo -> Assistant Bool performTransfer transferrer t info = catchBoolIO $ do (liftIO $ T.sendRequest t info (transferrerWrite transferrer)) - readresponse - where - readresponse = - liftIO (T.readResponse (transferrerRead 
transferrer)) >>= \case - Right r -> return r - Left so -> do - liftAnnex $ emitSerializedOutput so - readresponse + relaySerializedOutput + (liftIO (T.readResponse (transferrerRead transferrer))) + liftAnnex {- Starts a new git-annex transferkeys process, setting up handles - that will be used to communicate with it. -} diff --git a/Messages.hs b/Messages.hs index f68b5f3da0..87911376e8 100644 --- a/Messages.hs +++ b/Messages.hs @@ -50,7 +50,6 @@ module Messages ( withMessageState, prompt, mkPrompter, - emitSerializedOutput, ) where import System.Log.Logger diff --git a/Messages/Internal.hs b/Messages/Internal.hs index 3104a10a49..b48df97373 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -96,18 +96,3 @@ flushed a = a >> hFlush stdout outputSerialized :: (SerializedOutput -> IO ()) -> SerializedOutput -> IO () outputSerialized = id - -emitSerializedOutput :: SerializedOutput -> Annex () -emitSerializedOutput (OutputMessage msg) = - outputMessage' nojsonoutputter nojsonbuilder msg - where - nojsonoutputter _ _ = return False - nojsonbuilder = id -emitSerializedOutput (OutputError msg) = outputError msg -emitSerializedOutput (ProgressMeter sz old new) = undefined -- TODO -emitSerializedOutput (JSONObject b) = - withMessageState $ \s -> case outputType s of - JSONOutput _ -> liftIO $ flushed $ JSON.emit' b - SerializedOutput h -> liftIO $ - outputSerialized h $ JSONObject b - _ -> q diff --git a/Messages/Progress.hs b/Messages/Progress.hs index 5767e2e8fe..c5bf0710c3 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -84,10 +84,10 @@ metered' -- ^ this should run showOutput -> (Meter -> MeterUpdate -> m a) -> m a -metered' st othermeter size showoutput a = go size st +metered' st othermeter msize showoutput a = go st where - go _ (MessageState { outputType = QuietOutput }) = nometer - go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do + go (MessageState { outputType = QuietOutput }) = nometer + go 
(MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do showoutput meter <- liftIO $ mkMeter msize $ displayMeterHandle stdout bandwidthMeter @@ -96,7 +96,7 @@ metered' st othermeter size showoutput a = go size st r <- a meter (combinemeter m) liftIO $ clearMeterHandle meter stdout return r - go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) = + go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) = withProgressRegion st $ \r -> do meter <- liftIO $ mkMeter msize $ \_ msize' old new -> let s = bandwidthMeter msize' old new @@ -104,7 +104,7 @@ metered' st othermeter size showoutput a = go size st m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $ updateMeter meter a meter (combinemeter m) - go msize (MessageState { outputType = JSONOutput jsonoptions }) + go (MessageState { outputType = JSONOutput jsonoptions }) | jsonProgress jsonoptions = do let buf = jsonBuffer st meter <- liftIO $ mkMeter msize $ \_ msize' _old new -> @@ -113,12 +113,15 @@ metered' st othermeter size showoutput a = go size st updateMeter meter a meter (combinemeter m) | otherwise = nometer - go msize (MessageState { outputType = SerializedOutput h }) = do - meter <- liftIO $ mkMeter msize $ \_ msize' old new -> - outputSerialized h $ ProgressMeter msize' old new + go (MessageState { outputType = SerializedOutput h }) = do + liftIO $ outputSerialized h $ StartProgressMeter msize + meter <- liftIO $ mkMeter msize $ \_ _ _old new -> + outputSerialized h $ UpdateProgressMeter $ + meterBytesProcessed new m <- liftIO $ rateLimitMeterUpdate minratelimit meter $ updateMeter meter a meter (combinemeter m) + `finally` (liftIO $ outputSerialized h EndProgressMeter) nometer = do dummymeter <- liftIO $ mkMeter Nothing $ \_ _ _ _ -> return () diff --git a/Messages/Serialized.hs b/Messages/Serialized.hs new file mode 100644 index 0000000000..6f14604bca --- /dev/null +++ b/Messages/Serialized.hs @@ -0,0 +1,71 @@ 
+{- serialized output + - + - Copyright 2020 Joey Hess + - + - Licensed under the GNU AGPL version 3 or higher. + -} + +{-# LANGUAGE RankNTypes #-} + +module Messages.Serialized (outputSerialized, relaySerializedOutput) where + +import Common +import Annex +import Types.Messages +import Messages +import Messages.Internal +import Messages.Progress +import qualified Messages.JSON as JSON + +import Control.Monad.IO.Class (MonadIO) + +relaySerializedOutput + :: (Monad m, MonadIO m, MonadMask m) + => m (Either SerializedOutput r) + -- ^ Get next serialized output, or final value to return. + -> (forall a. Annex a -> m a) + -- ^ Run an annex action in the monad. Will not be used with + -- actions that block for a long time. + -> m r +relaySerializedOutput getso runannex = go Nothing + where + go st = loop st >>= \case + Right r -> return r + Left st' -> go st' + + loop st = getso >>= \case + Right r -> return (Right r) + Left (OutputMessage msg) -> do + runannex $ outputMessage' + (\_ _ -> return False) + id + msg + loop st + Left (OutputError msg) -> do + runannex $ outputError msg + loop st + Left (JSONObject b) -> do + runannex $ withMessageState $ \s -> case outputType s of + JSONOutput _ -> liftIO $ flushed $ JSON.emit' b + SerializedOutput h -> liftIO $ + outputSerialized h $ JSONObject b + _ -> q + loop st + Left (StartProgressMeter sz) -> do + ost <- runannex (Annex.getState Annex.output) + -- Display a progress meter while running, until + -- the meter ends or a final value is returned. + metered' ost Nothing sz (runannex showOutput) + (\_meter meterupdate -> loop (Just meterupdate)) + >>= \case + Right r -> return (Right r) + -- Continue processing serialized + -- output after the progress meter + -- is done. 
+ Left _st' -> loop Nothing + Left EndProgressMeter -> return (Left st) + Left (UpdateProgressMeter n) -> do + case st of + Just meterupdate -> liftIO $ meterupdate n + Nothing -> noop + loop st diff --git a/Types/Messages.hs b/Types/Messages.hs index d6339749c4..b1e5d4e8f0 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -9,6 +9,7 @@ module Types.Messages where import qualified Utility.Aeson as Aeson import Utility.Metered +import Utility.FileSize import Control.Concurrent import System.Console.Regions (ConsoleRegion) @@ -66,7 +67,9 @@ newMessageState = do data SerializedOutput = OutputMessage S.ByteString | OutputError String - | ProgressMeter (Maybe Integer) MeterState MeterState + | StartProgressMeter (Maybe FileSize) + | UpdateProgressMeter BytesProcessed + | EndProgressMeter | JSONObject L.ByteString -- ^ This is always sent, it's up to the consumer to decide if it -- wants to display JSON, or human-readable messages. diff --git a/git-annex.cabal b/git-annex.cabal index 8efb59f2ca..ec267cef1f 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -931,6 +931,7 @@ Executable git-annex Messages.Internal Messages.JSON Messages.Progress + Messages.Serialized P2P.Address P2P.Annex P2P.Auth From e5b170aa1c2af419d1234f65c2d7dba6b0e2195c Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 4 Dec 2020 13:54:33 -0400 Subject: [PATCH 08/20] switch back to POSIXTime turned out not to need Read MeterState --- Utility/Metered.hs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Utility/Metered.hs b/Utility/Metered.hs index 338bb0e6e0..b4e4024a67 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -382,8 +382,8 @@ data Meter = Meter (MVar (Maybe Integer)) (MVar MeterState) (MVar String) Displa data MeterState = MeterState { meterBytesProcessed :: BytesProcessed - , meterTimeStamp :: Double - } deriving (Show, Read) + , meterTimeStamp :: POSIXTime + } deriving (Show) type DisplayMeter = MVar String -> Maybe Integer -> MeterState -> 
MeterState -> IO () @@ -392,7 +392,7 @@ type RenderMeter = Maybe Integer -> MeterState -> MeterState -> String -- | Make a meter. Pass the total size, if it's known. mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter mkMeter totalsize displaymeter = do - ts <- realToFrac <$> getPOSIXTime + ts <- getPOSIXTime Meter <$> newMVar totalsize <*> newMVar (MeterState zeroBytesProcessed ts) @@ -405,7 +405,7 @@ setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just -- | Updates the meter, displaying it if necessary. updateMeter :: Meter -> MeterUpdate updateMeter (Meter totalsizev sv bv displaymeter) new = do - now <- realToFrac <$> getPOSIXTime + now <- getPOSIXTime let curms = MeterState new now oldms <- swapMVar sv curms when (meterBytesProcessed oldms /= new) $ do From 438d5be1f77e7c9d7ebe8af56db284ab9329ee01 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Fri, 4 Dec 2020 14:54:09 -0400 Subject: [PATCH 09/20] support prompt in message serialization That seems to be the last thing needed for message serialization. Although it's only used in the assistant currently, so hard to tell if I forgot something. At this point, it should be possible to start using transferkeys when performing transfers, which will allow killing a transferkeys process if a transfer times out or stalls. But that's for another day. This commit was sponsored by Ethan Aubin. 
--- Assistant/TransferrerPool.hs | 3 ++- Command/TransferKeys.hs | 10 ++++--- Messages.hs | 18 ++++++++++--- Messages/Internal.hs | 14 +++++++--- Messages/Progress.hs | 2 +- Messages/Serialized.hs | 27 ++++++++++++++++--- Types/Messages.hs | 10 ++++++- .../serialize_Messages_for_transferkeys.mdwn | 22 +++++++-------- 8 files changed, 77 insertions(+), 29 deletions(-) diff --git a/Assistant/TransferrerPool.hs b/Assistant/TransferrerPool.hs index 8566fea2cb..e29a4bf839 100644 --- a/Assistant/TransferrerPool.hs +++ b/Assistant/TransferrerPool.hs @@ -60,7 +60,8 @@ performTransfer :: Transferrer -> Transfer -> TransferInfo -> Assistant Bool performTransfer transferrer t info = catchBoolIO $ do (liftIO $ T.sendRequest t info (transferrerWrite transferrer)) relaySerializedOutput - (liftIO (T.readResponse (transferrerRead transferrer))) + (liftIO $ T.readResponse (transferrerRead transferrer)) + (liftIO . T.sendSerializedOutputResponse (transferrerWrite transferrer)) liftAnnex {- Starts a new git-annex transferkeys process, setting up handles diff --git a/Command/TransferKeys.hs b/Command/TransferKeys.hs index f5ccbe9492..be7b4be01d 100644 --- a/Command/TransferKeys.hs +++ b/Command/TransferKeys.hs @@ -41,8 +41,9 @@ start :: CommandStart start = do enableInteractiveBranchAccess (readh, writeh) <- liftIO dupIoHandles - Annex.setOutput $ SerializedOutput $ - hPutStrLn writeh . show . TransferOutput + Annex.setOutput $ SerializedOutput + (hPutStrLn writeh . show . TransferOutput) + (readMaybe <$> hGetLine readh) runRequests readh writeh runner stop where @@ -101,7 +102,7 @@ runRequests readh writeh a = go Nothing Nothing hPutStrLn writeh $ show $ TransferResult b hFlush writeh --- FIXME this is bad when used with inAnnex +-- | Send a request to this command to perform a transfer. 
sendRequest :: Transfer -> TransferInfo -> Handle -> IO () sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest (transferDirection t) @@ -109,6 +110,9 @@ sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest (keyData (transferKey t)) (associatedFile tinfo) +sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO () +sendSerializedOutputResponse h sor = hPutStrLn h $ show sor + -- | Read a response from this command. -- -- Before the final response, this will return whatever SerializedOutput diff --git a/Messages.hs b/Messages.hs index 87911376e8..2dcd2a3250 100644 --- a/Messages.hs +++ b/Messages.hs @@ -288,7 +288,7 @@ commandProgressDisabled = withMessageState $ \s -> return $ NormalOutput -> concurrentOutputEnabled s QuietOutput -> True JSONOutput _ -> True - SerializedOutput _ -> True + SerializedOutput _ _ -> True jsonOutputEnabled :: Annex Bool jsonOutputEnabled = withMessageState $ \s -> return $ @@ -314,8 +314,20 @@ mkPrompter = getConcurrency >>= \case where goconcurrent = withMessageState $ \s -> do let l = promptLock s + let (run, cleanup) = case outputType s of + SerializedOutput h hr -> + ( \a -> do + liftIO $ outputSerialized h StartPrompt + liftIO $ waitOutputSerializedResponse hr ReadyPrompt + a + , liftIO $ outputSerialized h EndPrompt + ) + _ -> + ( hideRegionsWhile s + , noop + ) return $ \a -> debugLocks $ bracketIO (takeMVar l) - (putMVar l) - (const $ hideRegionsWhile s a) + (\v -> putMVar l v >> cleanup) + (const $ run a) diff --git a/Messages/Internal.hs b/Messages/Internal.hs index b48df97373..138f7df2bd 100644 --- a/Messages/Internal.hs +++ b/Messages/Internal.hs @@ -29,7 +29,7 @@ outputMessage' jsonoutputter jsonbuilder msg = withMessageState $ \s -> case out | otherwise -> liftIO $ flushed $ S.putStr msg JSONOutput _ -> void $ jsonoutputter jsonbuilder s QuietOutput -> q - SerializedOutput h -> do + SerializedOutput h _ -> do liftIO $ outputSerialized h $ OutputMessage msg void $ jsonoutputter jsonbuilder s 
@@ -37,7 +37,7 @@ outputMessage' jsonoutputter jsonbuilder msg = withMessageState $ \s -> case out bufferJSON :: JSONBuilder -> MessageState -> Annex Bool bufferJSON jsonbuilder s = case outputType s of JSONOutput _ -> go (flushed . JSON.emit) - SerializedOutput h -> go (outputSerialized h . JSONObject . JSON.encode) + SerializedOutput h _ -> go (outputSerialized h . JSONObject . JSON.encode) _ -> return False where go emitter @@ -63,7 +63,7 @@ bufferJSON jsonbuilder s = case outputType s of outputJSON :: JSONBuilder -> MessageState -> Annex Bool outputJSON jsonbuilder s = case outputType s of JSONOutput _ -> go (flushed . JSON.emit) - SerializedOutput h -> go (outputSerialized h . JSONObject . JSON.encode) + SerializedOutput h _ -> go (outputSerialized h . JSONObject . JSON.encode) _ -> return False where go emitter = do @@ -77,7 +77,7 @@ outputError msg = withMessageState $ \s -> case (outputType s, jsonBuffer s) of let jb' = Just (JSON.addErrorMessage (lines msg) jb) in Annex.changeState $ \st -> st { Annex.output = s { jsonBuffer = jb' } } - (SerializedOutput h, _) -> + (SerializedOutput h _, _) -> liftIO $ outputSerialized h $ OutputError msg _ | concurrentOutputEnabled s -> concurrentMessage s True msg go @@ -96,3 +96,9 @@ flushed a = a >> hFlush stdout outputSerialized :: (SerializedOutput -> IO ()) -> SerializedOutput -> IO () outputSerialized = id + +-- | Wait for the specified response. 
+waitOutputSerializedResponse :: (IO (Maybe SerializedOutputResponse)) -> SerializedOutputResponse -> IO () +waitOutputSerializedResponse getr r = tryIO getr >>= \case + Right (Just r') | r' == r -> return () + v -> error $ "serialized output protocol error; expected " ++ show r ++ " got " ++ show v diff --git a/Messages/Progress.hs b/Messages/Progress.hs index c5bf0710c3..8fd056e901 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -113,7 +113,7 @@ metered' st othermeter msize showoutput a = go st updateMeter meter a meter (combinemeter m) | otherwise = nometer - go (MessageState { outputType = SerializedOutput h }) = do + go (MessageState { outputType = SerializedOutput h _ }) = do liftIO $ outputSerialized h $ StartProgressMeter msize meter <- liftIO $ mkMeter msize $ \_ _ _old new -> outputSerialized h $ UpdateProgressMeter $ diff --git a/Messages/Serialized.hs b/Messages/Serialized.hs index 6f14604bca..b58d131ace 100644 --- a/Messages/Serialized.hs +++ b/Messages/Serialized.hs @@ -7,7 +7,11 @@ {-# LANGUAGE RankNTypes #-} -module Messages.Serialized (outputSerialized, relaySerializedOutput) where +module Messages.Serialized ( + relaySerializedOutput, + outputSerialized, + waitOutputSerializedResponse, +) where import Common import Annex @@ -19,15 +23,17 @@ import qualified Messages.JSON as JSON import Control.Monad.IO.Class (MonadIO) +-- | Relay serialized output from a child process to the console. relaySerializedOutput :: (Monad m, MonadIO m, MonadMask m) => m (Either SerializedOutput r) -- ^ Get next serialized output, or final value to return. + -> (SerializedOutputResponse -> m ()) -> (forall a. Annex a -> m a) -- ^ Run an annex action in the monad. Will not be used with -- actions that block for a long time. 
-> m r -relaySerializedOutput getso runannex = go Nothing +relaySerializedOutput getso sendsor runannex = go Nothing where go st = loop st >>= \case Right r -> return r @@ -47,7 +53,7 @@ relaySerializedOutput getso runannex = go Nothing Left (JSONObject b) -> do runannex $ withMessageState $ \s -> case outputType s of JSONOutput _ -> liftIO $ flushed $ JSON.emit' b - SerializedOutput h -> liftIO $ + SerializedOutput h _ -> liftIO $ outputSerialized h $ JSONObject b _ -> q loop st @@ -69,3 +75,18 @@ relaySerializedOutput getso runannex = go Nothing Just meterupdate -> liftIO $ meterupdate n Nothing -> noop loop st + Left StartPrompt -> do + prompter <- runannex mkPrompter + v <- prompter $ do + sendsor ReadyPrompt + -- Continue processing serialized output + -- until EndPrompt or a final value is + -- returned. (EndPrompt is all that + -- ought to be sent while in a prompt + -- really, but if something else did get + -- sent, display it just in case.) + loop st + case v of + Right r -> return (Right r) + Left st' -> loop st' + Left EndPrompt -> return (Left st) diff --git a/Types/Messages.hs b/Types/Messages.hs index b1e5d4e8f0..80fc176c45 100644 --- a/Types/Messages.hs +++ b/Types/Messages.hs @@ -20,7 +20,9 @@ data OutputType = NormalOutput | QuietOutput | JSONOutput JSONOptions - | SerializedOutput (SerializedOutput -> IO ()) + | SerializedOutput + (SerializedOutput -> IO ()) + (IO (Maybe SerializedOutputResponse)) data JSONOptions = JSONOptions { jsonProgress :: Bool @@ -70,7 +72,13 @@ data SerializedOutput | StartProgressMeter (Maybe FileSize) | UpdateProgressMeter BytesProcessed | EndProgressMeter + | StartPrompt + | EndPrompt | JSONObject L.ByteString -- ^ This is always sent, it's up to the consumer to decide if it -- wants to display JSON, or human-readable messages. 
deriving (Show, Read) + +data SerializedOutputResponse + = ReadyPrompt + deriving (Eq, Show, Read) diff --git a/doc/todo/serialize_Messages_for_transferkeys.mdwn b/doc/todo/serialize_Messages_for_transferkeys.mdwn index e9f7205734..18a1aafcaa 100644 --- a/doc/todo/serialize_Messages_for_transferkeys.mdwn +++ b/doc/todo/serialize_Messages_for_transferkeys.mdwn @@ -25,9 +25,16 @@ A few notes on implementing that: outputs to stderr directly no matter the output type currently. It would need to be changed to support the new output type. (And probably should for concurrent output mode too actually!) + + > It's true, this is not concurrent output safe. However, that's already + > the case, and output to stderr doesn't affect the piping of serialized + > messages on stdout. So, punted on this. + * So does warningIO, though it's only used in a couple of remotes and rarely. It would be good to find a way to eliminate it. + > Eliminated except for one call in a non-relevant code path. + * Messages.prompt. Which is used by remotes, and would need to communicate over the pipe to the parent git-annex bidirectionally. Eg, send a message saying the parent needs to prepare for prompt, @@ -35,17 +42,6 @@ A few notes on implementing that: prompting is done. (Note that the parent would need to detect if the child process crashed to avoid being locked waiting for the prompt.) -* Messages.Internal.outputMessage is used by several things, and - includes some special parameters used in json mode. Since the parent - git-annex might itself have json mode enabled, those parameters will need - to be included in the serialization. But those parameters are currently - actually functions that manipulate the json object that will be outputted - later. So cannot be serialized. Uuuuh. + > Done. - Maybe the thing to do is, pass along the --json options to transferkeys, - and have a message type for json objects, which it uses to send them - to git-annex, which can then output them. 
outputMessage can handle the - new output type by sending the message through the pipe, and also - building any json object, and sending it through the pipe once it's done. - -> Started work on this in the message-serialization branch. --[[Joey]] +[[done]] From 72e5764a87a088e21e12bb51c52650ca965d2596 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 7 Dec 2020 12:50:48 -0400 Subject: [PATCH 10/20] move TransferrerPool from assistant This old code will now be useful for git-annex beyond the assistant. git-annex won't use the CheckTransferrer part, and won't run transferkeys as a batch process, and will want withTransferrer to not shut down transferkeys processes. Still, the rest of this is a good fit for what I need now. Also removed some dead code, and simplified a little bit. This commit was sponsored by Mark Reidenbach on Patreon. --- {Assistant => Annex}/TransferrerPool.hs | 57 ++++++++++++++----------- Assistant/TransferSlots.hs | 6 +-- Assistant/Types/TransferrerPool.hs | 53 +++-------------------- Types/TransferrerPool.hs | 56 ++++++++++++++++++++++++ Utility/Batch.hs | 4 ++ git-annex.cabal | 3 +- 6 files changed, 103 insertions(+), 76 deletions(-) rename {Assistant => Annex}/TransferrerPool.hs (66%) create mode 100644 Types/TransferrerPool.hs diff --git a/Assistant/TransferrerPool.hs b/Annex/TransferrerPool.hs similarity index 66% rename from Assistant/TransferrerPool.hs rename to Annex/TransferrerPool.hs index e29a4bf839..7edfecd49e 100644 --- a/Assistant/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -1,42 +1,46 @@ {- A pool of "git-annex transferkeys" processes - - - Copyright 2013 Joey Hess + - Copyright 2013-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. 
-} -module Assistant.TransferrerPool where +{-# LANGUAGE RankNTypes #-} -import Assistant.Common -import Assistant.Types.TransferrerPool +module Annex.TransferrerPool where + +import Annex.Common +import Types.TransferrerPool import Types.Transfer import Utility.Batch import Messages.Serialized - import qualified Command.TransferKeys as T import Control.Concurrent.STM hiding (check) -import Control.Exception (throw) import Control.Concurrent +import Control.Monad.IO.Class (MonadIO) {- Runs an action with a Transferrer from the pool. - - - Only one Transferrer is left running in the pool at a time. - - So if this needed to start a new Transferrer, it's stopped when done. + - When minimizeprocesses is True, only one Transferrer is left running + - in the pool at a time. So if this needed to start a new Transferrer, + - it's stopped when done. Otherwise, idle processes are left in the pool + - for use later. -} -withTransferrer :: FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a -withTransferrer program batchmaker pool a = do +withTransferrer :: Bool -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a +withTransferrer minimizeprocesses program batchmaker pool a = do (mi, leftinpool) <- atomically (popTransferrerPool pool) i@(TransferrerPoolItem (Just t) check) <- case mi of Nothing -> mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker Just i -> checkTransferrerPoolItem program batchmaker i - v <- tryNonAsync $ a t - if leftinpool == 0 - then atomically $ pushTransferrerPool pool i - else do + a t `finally` returntopool leftinpool check t i + where + returntopool leftinpool check t i + | not minimizeprocesses || leftinpool == 0 = + atomically $ pushTransferrerPool pool i + | otherwise = do void $ forkIO $ stopTransferrer t atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check - either throw return v {- Check if a Transferrer from the pool is still ok to be used. 
- If not, stop it and start a new one. -} @@ -56,13 +60,21 @@ checkTransferrerPoolItem program batchmaker i = case i of {- Requests that a Transferrer perform a Transfer, and waits for it to - finish. -} -performTransfer :: Transferrer -> Transfer -> TransferInfo -> Assistant Bool -performTransfer transferrer t info = catchBoolIO $ do +performTransfer + :: (Monad m, MonadIO m, MonadMask m) + => Transferrer + -> Transfer + -> TransferInfo + -> (forall a. Annex a -> m a) + -- ^ Run an annex action in the monad. Will not be used with + -- actions that block for a long time. + -> m Bool +performTransfer transferrer t info runannex = catchBoolIO $ do (liftIO $ T.sendRequest t info (transferrerWrite transferrer)) relaySerializedOutput (liftIO $ T.readResponse (transferrerRead transferrer)) (liftIO . T.sendSerializedOutputResponse (transferrerWrite transferrer)) - liftAnnex + runannex {- Starts a new git-annex transferkeys process, setting up handles - that will be used to communicate with it. -} @@ -84,13 +96,8 @@ mkTransferrer program batchmaker = do , transferrerHandle = pid } -{- Checks if a Transferrer is still running. If not, makes a new one. -} -checkTransferrer :: FilePath -> BatchCommandMaker -> Transferrer -> IO Transferrer -checkTransferrer program batchmaker t = - maybe (return t) (const $ mkTransferrer program batchmaker) - =<< getProcessExitCode (transferrerHandle t) - -{- Closing the fds will stop the transferrer. -} +{- Closing the fds will stop the transferrer, but only when it's in between + - transfers. 
-} stopTransferrer :: Transferrer -> IO () stopTransferrer t = do hClose $ transferrerRead t diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 59066ee69c..80aeaaca6b 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -13,7 +13,7 @@ import Assistant.Common import Utility.ThreadScheduler import Assistant.Types.TransferSlots import Assistant.DaemonStatus -import Assistant.TransferrerPool +import Annex.TransferrerPool import Assistant.Types.TransferrerPool import Assistant.Types.TransferQueue import Assistant.TransferQueue @@ -83,7 +83,7 @@ runTransferThread' :: FilePath -> BatchCommandMaker -> AssistantData -> (Transfe runTransferThread' program batchmaker d run = go where go = catchPauseResume $ - withTransferrer program batchmaker (transferrerPool d) + withTransferrer True program batchmaker (transferrerPool d) run pause = catchPauseResume $ runEvery (Seconds 86400) noop @@ -155,7 +155,7 @@ genTransfer t info = case transferRemote info of - usual cleanup. However, first check if something else is - running the transfer, to avoid removing active transfers. -} - go remote transferrer = ifM (performTransfer transferrer t info) + go remote transferrer = ifM (performTransfer transferrer t info liftAnnex) ( do case associatedFile info of AssociatedFile Nothing -> noop diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs index 20ea2ce7c1..65c49d5140 100644 --- a/Assistant/Types/TransferrerPool.hs +++ b/Assistant/Types/TransferrerPool.hs @@ -5,57 +5,16 @@ - Licensed under the GNU AGPL version 3 or higher. 
-} -module Assistant.Types.TransferrerPool where +module Assistant.Types.TransferrerPool ( + module Types.TransferrerPool, + checkNetworkConnections, +) where -import Annex.Common +import Types.TransferrerPool import Utility.NotificationBroadcaster import Assistant.Types.DaemonStatus -import Control.Concurrent.STM hiding (check) - -type TransferrerPool = TVar (MkCheckTransferrer, [TransferrerPoolItem]) - -type CheckTransferrer = IO Bool -type MkCheckTransferrer = IO (IO Bool) - -{- Each item in the pool may have a transferrer running, and has an - - IO action that can be used to check if it's still ok to use the - - transferrer. -} -data TransferrerPoolItem = TransferrerPoolItem (Maybe Transferrer) CheckTransferrer - -data Transferrer = Transferrer - { transferrerRead :: Handle - , transferrerWrite :: Handle - , transferrerHandle :: ProcessHandle - } - -newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool -newTransferrerPool c = newTVarIO (c, []) - -popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int) -popTransferrerPool p = do - (c, l) <- readTVar p - case l of - [] -> return (Nothing, 0) - (i:is) -> do - writeTVar p (c, is) - return $ (Just i, length is) - -pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM () -pushTransferrerPool p i = do - (c, l) <- readTVar p - let l' = i:l - writeTVar p (c, l') - -{- Note that making a CheckTransferrer may allocate resources, - - such as a NotificationHandle, so it's important that the returned - - TransferrerPoolItem is pushed into the pool, and not left to be - - garbage collected. 
-} -mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem -mkTransferrerPoolItem p t = do - mkcheck <- atomically $ fst <$> readTVar p - check <- mkcheck - return $ TransferrerPoolItem (Just t) check +import Control.Concurrent.STM checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer checkNetworkConnections dstatushandle = do diff --git a/Types/TransferrerPool.hs b/Types/TransferrerPool.hs new file mode 100644 index 0000000000..e6019f27f0 --- /dev/null +++ b/Types/TransferrerPool.hs @@ -0,0 +1,56 @@ +{- A pool of "git-annex transferkeys" processes available for use + - + - Copyright 2013-2020 Joey Hess + - + - Licensed under the GNU AGPL version 3 or higher. + -} + +module Types.TransferrerPool where + +import Annex.Common + +import Control.Concurrent.STM hiding (check) + +type TransferrerPool = TVar (MkCheckTransferrer, [TransferrerPoolItem]) + +type CheckTransferrer = IO Bool +type MkCheckTransferrer = IO (IO Bool) + +{- Each item in the pool may have a transferrer running, and has an + - IO action that can be used to check if it's still ok to use the + - transferrer. 
-} +data TransferrerPoolItem = TransferrerPoolItem (Maybe Transferrer) CheckTransferrer + +data Transferrer = Transferrer + { transferrerRead :: Handle + , transferrerWrite :: Handle + , transferrerHandle :: ProcessHandle + } + +newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool +newTransferrerPool c = newTVarIO (c, []) + +popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int) +popTransferrerPool p = do + (c, l) <- readTVar p + case l of + [] -> return (Nothing, 0) + (i:is) -> do + writeTVar p (c, is) + return $ (Just i, length is) + +pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM () +pushTransferrerPool p i = do + (c, l) <- readTVar p + let l' = i:l + writeTVar p (c, l') + +{- Note that making a CheckTransferrer may allocate resources, + - such as a NotificationHandle, so it's important that the returned + - TransferrerPoolItem is pushed into the pool, and not left to be + - garbage collected. -} +mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem +mkTransferrerPoolItem p t = do + mkcheck <- atomically $ fst <$> readTVar p + check <- mkcheck + return $ TransferrerPoolItem (Just t) check diff --git a/Utility/Batch.hs b/Utility/Batch.hs index de66913ee1..58e326efae 100644 --- a/Utility/Batch.hs +++ b/Utility/Batch.hs @@ -10,6 +10,7 @@ module Utility.Batch ( batch, BatchCommandMaker, + nonBatchCommandMaker, getBatchCommandMaker, toBatchCommand, batchCommand, @@ -50,6 +51,9 @@ batch a = a - are available in the path. 
-} type BatchCommandMaker = (String, [CommandParam]) -> (String, [CommandParam]) +nonBatchCommandMaker :: BatchCommandMaker +nonBatchCommandMaker = id + getBatchCommandMaker :: IO BatchCommandMaker getBatchCommandMaker = do #ifndef mingw32_HOST_OS diff --git a/git-annex.cabal b/git-annex.cabal index ec267cef1f..b527b6b15c 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -479,7 +479,6 @@ Executable git-annex Assistant.Threads.Watcher Assistant.TransferQueue Assistant.TransferSlots - Assistant.TransferrerPool Assistant.Types.Alert Assistant.Types.BranchChange Assistant.Types.Changes @@ -666,6 +665,7 @@ Executable git-annex Annex.TaggedPush Annex.Tmp Annex.Transfer + Annex.TransferrerPool Annex.UntrustedFilePath Annex.UpdateInstead Annex.UUID @@ -1027,6 +1027,7 @@ Executable git-annex Types.StoreRetrieve Types.Test Types.Transfer + Types.TransferrerPool Types.TrustLevel Types.UUID Types.UrlContents From 47016fc6569fe5f27067f60302b42ef7cbc13371 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 7 Dec 2020 13:08:59 -0400 Subject: [PATCH 11/20] move TransferrerPool from Assistant state to Annex state This commit was sponsored by Graham Spencer on Patreon. 
--- Annex.hs | 4 ++++ Annex/TransferrerPool.hs | 6 +++--- Assistant/Monad.hs | 3 --- Assistant/TransferSlots.hs | 28 +++++++++++++++++++++------- Assistant/Types/TransferrerPool.hs | 23 ----------------------- Types/TransferrerPool.hs | 21 ++++++++++----------- git-annex.cabal | 1 - 7 files changed, 38 insertions(+), 48 deletions(-) delete mode 100644 Assistant/Types/TransferrerPool.hs diff --git a/Annex.hs b/Annex.hs index 748ad96009..ed2aaf011c 100644 --- a/Annex.hs +++ b/Annex.hs @@ -70,6 +70,7 @@ import Types.WorkerPool import Types.IndexFiles import Types.CatFileHandles import Types.RemoteConfig +import Types.TransferrerPool import qualified Database.Keys.Handle as Keys import Utility.InodeCache import Utility.Url @@ -156,6 +157,7 @@ data AnnexState = AnnexState , cachedgitenv :: Maybe (AltIndexFile, FilePath, [(String, String)]) , urloptions :: Maybe UrlOptions , insmudgecleanfilter :: Bool + , transferrerpool :: TransferrerPool } newState :: GitConfig -> Git.Repo -> IO AnnexState @@ -165,6 +167,7 @@ newState c r = do o <- newMessageState sc <- newTMVarIO False kh <- Keys.newDbHandle + tp <- newTransferrerPool return $ AnnexState { repo = r , repoadjustment = return @@ -217,6 +220,7 @@ newState c r = do , cachedgitenv = Nothing , urloptions = Nothing , insmudgecleanfilter = False + , transferrerpool = tp } {- Makes an Annex state object for the specified git repo. diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index 7edfecd49e..286c6f9be2 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -27,11 +27,11 @@ import Control.Monad.IO.Class (MonadIO) - it's stopped when done. Otherwise, idle processes are left in the pool - for use later. 
-} -withTransferrer :: Bool -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a -withTransferrer minimizeprocesses program batchmaker pool a = do +withTransferrer :: Bool -> MkCheckTransferrer -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a +withTransferrer minimizeprocesses mkcheck program batchmaker pool a = do (mi, leftinpool) <- atomically (popTransferrerPool pool) i@(TransferrerPoolItem (Just t) check) <- case mi of - Nothing -> mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker + Nothing -> mkTransferrerPoolItem mkcheck =<< mkTransferrer program batchmaker Just i -> checkTransferrerPoolItem program batchmaker i a t `finally` returntopool leftinpool check t i where diff --git a/Assistant/Monad.hs b/Assistant/Monad.hs index fc45bd7991..ff79f5f173 100644 --- a/Assistant/Monad.hs +++ b/Assistant/Monad.hs @@ -35,7 +35,6 @@ import Assistant.Types.DaemonStatus import Assistant.Types.ScanRemotes import Assistant.Types.TransferQueue import Assistant.Types.TransferSlots -import Assistant.Types.TransferrerPool import Assistant.Types.Pushes import Assistant.Types.BranchChange import Assistant.Types.Commits @@ -65,7 +64,6 @@ data AssistantData = AssistantData , scanRemoteMap :: ScanRemoteMap , transferQueue :: TransferQueue , transferSlots :: TransferSlots - , transferrerPool :: TransferrerPool , failedPushMap :: FailedPushMap , failedExportMap :: FailedPushMap , commitChan :: CommitChan @@ -85,7 +83,6 @@ newAssistantData st dstatus = AssistantData <*> newScanRemoteMap <*> newTransferQueue <*> newTransferSlots - <*> newTransferrerPool (checkNetworkConnections dstatus) <*> newFailedPushMap <*> newFailedPushMap <*> newCommitChan diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 80aeaaca6b..89788ca9f0 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -1,6 +1,6 @@ {- git-annex assistant transfer slots - - - Copyright 2012 Joey Hess + - 
Copyright 2012-2020 Joey Hess - - Licensed under the GNU AGPL version 3 or higher. -} @@ -9,12 +9,15 @@ module Assistant.TransferSlots where +import Control.Concurrent.STM + import Assistant.Common import Utility.ThreadScheduler +import Utility.NotificationBroadcaster import Assistant.Types.TransferSlots import Assistant.DaemonStatus import Annex.TransferrerPool -import Assistant.Types.TransferrerPool +import Types.TransferrerPool import Assistant.Types.TransferQueue import Assistant.TransferQueue import Assistant.Alert @@ -25,6 +28,7 @@ import Types.Transfer import Logs.Transfer import Logs.Location import qualified Git +import qualified Annex import qualified Remote import qualified Types.Remote as Remote import Annex.Content @@ -75,15 +79,19 @@ runTransferThread :: FilePath -> BatchCommandMaker -> Maybe (Transfer, TransferI runTransferThread _ _ Nothing = flip MSemN.signal 1 <<~ transferSlots runTransferThread program batchmaker (Just (t, info, a)) = do d <- getAssistant id + mkcheck <- checkNetworkConnections + <$> getAssistant daemonStatusHandle aio <- asIO1 a - tid <- liftIO $ forkIO $ runTransferThread' program batchmaker d aio + tid <- liftIO $ forkIO $ runTransferThread' mkcheck program batchmaker d aio updateTransferInfo t $ info { transferTid = Just tid } -runTransferThread' :: FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO () -runTransferThread' program batchmaker d run = go +runTransferThread' :: MkCheckTransferrer -> FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO () +runTransferThread' mkcheck program batchmaker d run = go where - go = catchPauseResume $ - withTransferrer True program batchmaker (transferrerPool d) + go = catchPauseResume $ do + p <- runAssistant d $ liftAnnex $ + Annex.getState Annex.transferrerpool + withTransferrer True mkcheck program batchmaker p run pause = catchPauseResume $ runEvery (Seconds 86400) noop @@ -298,3 +306,9 @@ startTransfer t = do getCurrentTransfers 
:: Assistant TransferMap getCurrentTransfers = currentTransfers <$> getDaemonStatus + +checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer +checkNetworkConnections dstatushandle = do + dstatus <- atomically $ readTVar dstatushandle + h <- newNotificationHandle False (networkConnectedNotifier dstatus) + return $ not <$> checkNotification h diff --git a/Assistant/Types/TransferrerPool.hs b/Assistant/Types/TransferrerPool.hs deleted file mode 100644 index 65c49d5140..0000000000 --- a/Assistant/Types/TransferrerPool.hs +++ /dev/null @@ -1,23 +0,0 @@ -{- A pool of "git-annex transferkeys" processes available for use - - - - Copyright 2013 Joey Hess - - - - Licensed under the GNU AGPL version 3 or higher. - -} - -module Assistant.Types.TransferrerPool ( - module Types.TransferrerPool, - checkNetworkConnections, -) where - -import Types.TransferrerPool -import Utility.NotificationBroadcaster -import Assistant.Types.DaemonStatus - -import Control.Concurrent.STM - -checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer -checkNetworkConnections dstatushandle = do - dstatus <- atomically $ readTVar dstatushandle - h <- newNotificationHandle False (networkConnectedNotifier dstatus) - return $ not <$> checkNotification h diff --git a/Types/TransferrerPool.hs b/Types/TransferrerPool.hs index e6019f27f0..72d61a3b9a 100644 --- a/Types/TransferrerPool.hs +++ b/Types/TransferrerPool.hs @@ -7,11 +7,11 @@ module Types.TransferrerPool where -import Annex.Common +import Common import Control.Concurrent.STM hiding (check) -type TransferrerPool = TVar (MkCheckTransferrer, [TransferrerPoolItem]) +type TransferrerPool = TVar [TransferrerPoolItem] type CheckTransferrer = IO Bool type MkCheckTransferrer = IO (IO Bool) @@ -27,30 +27,29 @@ data Transferrer = Transferrer , transferrerHandle :: ProcessHandle } -newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool -newTransferrerPool c = newTVarIO (c, []) +newTransferrerPool :: IO TransferrerPool 
+newTransferrerPool = newTVarIO [] popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int) popTransferrerPool p = do - (c, l) <- readTVar p + l <- readTVar p case l of [] -> return (Nothing, 0) (i:is) -> do - writeTVar p (c, is) + writeTVar p is return $ (Just i, length is) pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM () pushTransferrerPool p i = do - (c, l) <- readTVar p + l <- readTVar p let l' = i:l - writeTVar p (c, l') + writeTVar p l' {- Note that making a CheckTransferrer may allocate resources, - such as a NotificationHandle, so it's important that the returned - TransferrerPoolItem is pushed into the pool, and not left to be - garbage collected. -} -mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem -mkTransferrerPoolItem p t = do - mkcheck <- atomically $ fst <$> readTVar p +mkTransferrerPoolItem :: MkCheckTransferrer -> Transferrer -> IO TransferrerPoolItem +mkTransferrerPoolItem mkcheck t = do check <- mkcheck return $ TransferrerPoolItem (Just t) check diff --git a/git-annex.cabal b/git-annex.cabal index b527b6b15c..43e3249347 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -494,7 +494,6 @@ Executable git-annex Assistant.Types.ThreadedMonad Assistant.Types.TransferQueue Assistant.Types.TransferSlots - Assistant.Types.TransferrerPool Assistant.Types.UrlRenderer Assistant.Unused Assistant.Upgrade From 4c475688763e459bb357d9ec8d519afda8366a37 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 7 Dec 2020 14:44:21 -0400 Subject: [PATCH 12/20] refactoring This is groundwork for using git-annex transferkeys to run transfers, in order to allow stalled transfers to be interrupted and retried. The new upload and download are closer to what git-annex transferkeys does, so the plan is to make them use it. Then things that were left using upload' and download' won't recover from stalls. Notably, that includes import and export. But at least get/move/copy will be able to. 
(Also the assistant hopefully, but not yet.) This commit was sponsored by Jake Vosloo on Patreon. --- Annex/Action.hs | 18 ++++++++++++++++++ Annex/Import.hs | 4 ++-- Annex/Transfer.hs | 30 +++++++++++++++++++++++------- Command/AddUrl.hs | 4 ++-- Command/Export.hs | 2 +- Command/Get.hs | 11 +++-------- Command/Move.hs | 9 +++------ Command/TransferKey.hs | 4 ++-- Command/TransferKeys.hs | 4 ++-- P2P/Annex.hs | 2 +- Remote.hs | 16 +--------------- 11 files changed, 58 insertions(+), 46 deletions(-) diff --git a/Annex/Action.hs b/Annex/Action.hs index 1902b0d89c..fca7e14958 100644 --- a/Annex/Action.hs +++ b/Annex/Action.hs @@ -6,6 +6,8 @@ -} module Annex.Action ( + action, + verifiedAction, startup, shutdown, stopCoProcesses, @@ -21,6 +23,22 @@ import Annex.CheckAttr import Annex.HashObject import Annex.CheckIgnore +{- Runs an action that may throw exceptions, catching and displaying them. -} +action :: Annex () -> Annex Bool +action a = tryNonAsync a >>= \case + Right () -> return True + Left e -> do + warning (show e) + return False + +verifiedAction :: Annex Verification -> Annex (Bool, Verification) +verifiedAction a = tryNonAsync a >>= \case + Right v -> return (True, v) + Left e -> do + warning (show e) + return (False, UnVerified) + + {- Actions to perform each time ran. 
-} startup :: Annex () startup = return () diff --git a/Annex/Import.hs b/Annex/Import.hs index 57d7b5b2c1..9a5eda2968 100644 --- a/Annex/Import.hs +++ b/Annex/Import.hs @@ -466,7 +466,7 @@ importKeys remote importtreeconfig importcontent importablecontents = do return (Just (k', ok)) checkDiskSpaceToGet k Nothing $ notifyTransfer Download af $ - download (Remote.uuid remote) k af stdRetry $ \p' -> + download' (Remote.uuid remote) k af stdRetry $ \p' -> withTmp k $ downloader p' -- The file is small, so is added to git, so while importing @@ -520,7 +520,7 @@ importKeys remote importtreeconfig importcontent importablecontents = do return Nothing checkDiskSpaceToGet tmpkey Nothing $ notifyTransfer Download af $ - download (Remote.uuid remote) tmpkey af stdRetry $ \p -> + download' (Remote.uuid remote) tmpkey af stdRetry $ \p -> withTmp tmpkey $ \tmpfile -> metered (Just p) tmpkey $ const (rundownload tmpfile) diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index cf190058e2..20358c6d8d 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -10,8 +10,10 @@ module Annex.Transfer ( module X, upload, + upload', alwaysUpload, download, + download', runTransfer, alwaysRunTransfer, noRetry, @@ -24,7 +26,9 @@ import qualified Annex import Logs.Transfer as X import Types.Transfer as X import Annex.Notification as X +import Annex.Content import Annex.Perms +import Annex.Action import Utility.Metered import Utility.ThreadScheduler import Annex.LockPool @@ -42,16 +46,28 @@ import qualified Data.Map.Strict as M import qualified System.FilePath.ByteString as P import Data.Ord -upload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v -upload u key f d a _witness = guardHaveUUID u $ +upload :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool +upload r key f d = upload' (Remote.uuid r) key f d $ + action . 
Remote.storeKey r key f + +upload' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v +upload' u key f d a _witness = guardHaveUUID u $ runTransfer (Transfer Upload u (fromKey id key)) f d a alwaysUpload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v alwaysUpload u key f d a _witness = guardHaveUUID u $ alwaysRunTransfer (Transfer Upload u (fromKey id key)) f d a -download :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v -download u key f d a _witness = guardHaveUUID u $ +download :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool +download r key f d witness = + getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest -> + download' (Remote.uuid r) key f d (go dest) witness + where + go dest p = verifiedAction $ + Remote.retrieveKeyFile r key f (fromRawFilePath dest) p + +download' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v +download' u key f d a _witness = guardHaveUUID u $ runTransfer (Transfer Download u (fromKey id key)) f d a guardHaveUUID :: Observable v => UUID -> Annex v -> Annex v @@ -81,7 +97,7 @@ alwaysRunTransfer :: Observable v => Transfer -> AssociatedFile -> RetryDecider alwaysRunTransfer = runTransfer' True runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v -runTransfer' ignorelock t afile retrydecider transferaction = enteringStage TransferStage $ debugLocks $ checkSecureHashes t $ do +runTransfer' ignorelock t afile retrydecider transferaction = enteringStage TransferStage $ debugLocks $ preCheckSecureHashes t $ do info <- liftIO $ startTransferInfo afile (meter, tfile, createtfile, metervar) <- mkProgressUpdater t info mode <- 
annexFileMode @@ -180,8 +196,8 @@ runTransfer' ignorelock t afile retrydecider transferaction = enteringStage Tran - still contains content using an insecure hash, remotes will likewise - tend to be configured to reject it, so Upload is also prevented. -} -checkSecureHashes :: Observable v => Transfer -> Annex v -> Annex v -checkSecureHashes t a = ifM (isCryptographicallySecure (transferKey t)) +preCheckSecureHashes :: Observable v => Transfer -> Annex v -> Annex v +preCheckSecureHashes t a = ifM (isCryptographicallySecure (transferKey t)) ( a , ifM (annexSecureHashesOnly <$> Annex.getGitConfig) ( do diff --git a/Command/AddUrl.hs b/Command/AddUrl.hs index 6217d3fc75..c54e2ecf15 100644 --- a/Command/AddUrl.hs +++ b/Command/AddUrl.hs @@ -332,7 +332,7 @@ downloadWeb addunlockedmatcher o url urlinfo file = let cleanuptmp = pruneTmpWorkDirBefore tmp (liftIO . removeWhenExistsWith R.removeLink) showNote "using youtube-dl" Transfer.notifyTransfer Transfer.Download url $ - Transfer.download webUUID mediakey (AssociatedFile Nothing) Transfer.noRetry $ \p -> + Transfer.download' webUUID mediakey (AssociatedFile Nothing) Transfer.noRetry $ \p -> youtubeDl url (fromRawFilePath workdir) p >>= \case Right (Just mediafile) -> do cleanuptmp @@ -396,7 +396,7 @@ downloadWith' downloader dummykey u url afile = checkDiskSpaceToGet dummykey Nothing $ do tmp <- fromRepo $ gitAnnexTmpObjectLocation dummykey ok <- Transfer.notifyTransfer Transfer.Download url $ - Transfer.download u dummykey afile Transfer.stdRetry $ \p -> do + Transfer.download' u dummykey afile Transfer.stdRetry $ \p -> do createAnnexDirectory (parentDir tmp) downloader (fromRawFilePath tmp) p if ok diff --git a/Command/Export.hs b/Command/Export.hs index 8635b93da7..0ad856f0fc 100644 --- a/Command/Export.hs +++ b/Command/Export.hs @@ -283,7 +283,7 @@ performExport r db ek af contentsha loc allfilledvar = do sent <- tryNonAsync $ case ek of AnnexKey k -> ifM (inAnnex k) ( notifyTransfer Upload af $ - upload (uuid r) k 
af stdRetry $ \pm -> do + upload' (uuid r) k af stdRetry $ \pm -> do let rollback = void $ performUnexport r db [ek] loc sendAnnex k rollback $ \f -> diff --git a/Command/Get.hs b/Command/Get.hs index c31b2c0bd7..433889f448 100644 --- a/Command/Get.hs +++ b/Command/Get.hs @@ -9,7 +9,6 @@ module Command.Get where import Command import qualified Remote -import Annex.Content import Annex.Transfer import Annex.NumCopies import Annex.Wanted @@ -114,10 +113,6 @@ getKey' key afile = dispatch | Remote.hasKeyCheap r = either (const False) id <$> Remote.hasKey r key | otherwise = return True - docopy r witness = getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key afile $ \dest -> - download (Remote.uuid r) key afile stdRetry - (\p -> do - showAction $ "from " ++ Remote.name r - Remote.verifiedAction $ - Remote.retrieveKeyFile r key afile (fromRawFilePath dest) p - ) witness + docopy r witness = do + showAction $ "from " ++ Remote.name r + download r key afile stdRetry witness diff --git a/Command/Move.hs b/Command/Move.hs index 114f2507af..71e2951700 100644 --- a/Command/Move.hs +++ b/Command/Move.hs @@ -142,8 +142,7 @@ toPerform dest removewhen key afile fastcheck isthere = do Right False -> logMove srcuuid destuuid False key $ \deststartedwithcopy -> do showAction $ "to " ++ Remote.name dest ok <- notifyTransfer Upload afile $ - upload (Remote.uuid dest) key afile stdRetry $ - Remote.action . 
Remote.storeKey dest key afile + upload dest key afile stdRetry if ok then finish deststartedwithcopy $ Remote.logStatus dest key InfoPresent @@ -223,10 +222,8 @@ fromPerform src removewhen key afile = do then dispatch removewhen deststartedwithcopy True else dispatch removewhen deststartedwithcopy =<< get where - get = notifyTransfer Download afile $ - download (Remote.uuid src) key afile stdRetry $ \p -> - getViaTmp (Remote.retrievalSecurityPolicy src) (RemoteVerify src) key afile $ \t -> - Remote.verifiedAction $ Remote.retrieveKeyFile src key afile (fromRawFilePath t) p + get = notifyTransfer Download afile $ + download src key afile stdRetry dispatch _ _ False = stop -- failed dispatch RemoveNever _ True = next $ return True -- copy complete diff --git a/Command/TransferKey.hs b/Command/TransferKey.hs index b7f3cc9177..d6d660a39c 100644 --- a/Command/TransferKey.hs +++ b/Command/TransferKey.hs @@ -51,7 +51,7 @@ start o (_, key) = startingCustomOutput key $ case fromToOptions o of toPerform :: Key -> AssociatedFile -> Remote -> CommandPerform toPerform key file remote = go Upload file $ - upload (uuid remote) key file stdRetry $ \p -> do + upload' (uuid remote) key file stdRetry $ \p -> do tryNonAsync (Remote.storeKey remote key file p) >>= \case Right () -> do Remote.logStatus remote key InfoPresent @@ -62,7 +62,7 @@ toPerform key file remote = go Upload file $ fromPerform :: Key -> AssociatedFile -> Remote -> CommandPerform fromPerform key file remote = go Upload file $ - download (uuid remote) key file stdRetry $ \p -> + download' (uuid remote) key file stdRetry $ \p -> getViaTmp (retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t -> tryNonAsync (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p) >>= \case Right v -> return (True, v) diff --git a/Command/TransferKeys.hs b/Command/TransferKeys.hs index be7b4be01d..6e2112b5f8 100644 --- a/Command/TransferKeys.hs +++ b/Command/TransferKeys.hs @@ -49,7 +49,7 @@ start = do where runner 
(TransferRequest direction _ keydata file) remote | direction == Upload = notifyTransfer direction file $ - upload (Remote.uuid remote) key file stdRetry $ \p -> do + upload' (Remote.uuid remote) key file stdRetry $ \p -> do tryNonAsync (Remote.storeKey remote key file p) >>= \case Left e -> do warning (show e) @@ -58,7 +58,7 @@ start = do Remote.logStatus remote key InfoPresent return True | otherwise = notifyTransfer direction file $ - download (Remote.uuid remote) key file stdRetry $ \p -> + download' (Remote.uuid remote) key file stdRetry $ \p -> getViaTmp (Remote.retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t -> do r <- tryNonAsync (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p) >>= \case Left e -> do diff --git a/P2P/Annex.hs b/P2P/Annex.hs index d107f6ef3b..8cf858fead 100644 --- a/P2P/Annex.hs +++ b/P2P/Annex.hs @@ -75,7 +75,7 @@ runLocal runst runner a = case a of let rsp = RetrievalAllKeysSecure v <- tryNonAsync $ do let runtransfer ti = - Right <$> transfer download k af (\p -> + Right <$> transfer download' k af (\p -> getViaTmp rsp DefaultVerify k af $ \tmp -> storefile (fromRawFilePath tmp) o l getb validitycheck p ti) let fallback = return $ Left $ diff --git a/Remote.hs b/Remote.hs index 1989f9382c..1d6250f9e2 100644 --- a/Remote.hs +++ b/Remote.hs @@ -70,6 +70,7 @@ import Annex.Common import Types.Remote import qualified Annex import Annex.UUID +import Annex.Action import Logs.UUID import Logs.Trust import Logs.Location hiding (logStatus) @@ -82,21 +83,6 @@ import Config.DynamicConfig import Git.Types (RemoteName, ConfigKey(..), fromConfigValue) import Utility.Aeson -{- Runs an action that may throw exceptions, catching and displaying them. 
-} -action :: Annex () -> Annex Bool -action a = tryNonAsync a >>= \case - Right () -> return True - Left e -> do - warning (show e) - return False - -verifiedAction :: Annex Verification -> Annex (Bool, Verification) -verifiedAction a = tryNonAsync a >>= \case - Right v -> return (True, v) - Left e -> do - warning (show e) - return (False, UnVerified) - {- Map from UUIDs of Remotes to a calculated value. -} remoteMap :: (Remote -> v) -> Annex (M.Map UUID v) remoteMap mkv = remoteMap' mkv (pure . mkk) From fcc9e015560534b7d0ae7a4c5067a8b6dbb89679 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 7 Dec 2020 16:11:29 -0400 Subject: [PATCH 13/20] finally using transferkeys Seems to work! Even progress bars. Have not tested prompting or various error message displays yet. transferkeys had to be made to operate in different modes for the Assistant and Annex monads. A bit ugly, but it did relegate that really ugly Database.Keys.closeDb in transferkeys to only the assistant code path. This commit was sponsored by Noam Kremen. --- Annex/Transfer.hs | 26 ++++++++-- Annex/TransferrerPool.hs | 104 ++++++++++++++++++++++++++++++------- Assistant/TransferSlots.hs | 5 +- Command/TransferKeys.hs | 65 +++++++++-------------- 4 files changed, 132 insertions(+), 68 deletions(-) diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index 20358c6d8d..99436d093b 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -38,6 +38,7 @@ import Types.Concurrency import Annex.Concurrent.Utility import Types.WorkerPool import Annex.WorkerPool +import Annex.TransferrerPool import Backend (isCryptographicallySecure) import qualified Utility.RawFilePath as R @@ -47,8 +48,17 @@ import qualified System.FilePath.ByteString as P import Data.Ord upload :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool -upload r key f d = upload' (Remote.uuid r) key f d $ - action . 
Remote.storeKey r key f +upload r key f d _witness = + -- TODO: use this when not handling timeouts + --upload' (Remote.uuid r) key f d $ + -- action . Remote.storeKey r key f + + -- TODO: RetryDecider + -- TODO: Handle timeouts + withTransferrer $ \transferrer -> + performTransfer transferrer AnnexLevel + (Transfer Upload (Remote.uuid r) (fromKey id key)) + (Just r) f id upload' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v upload' u key f d a _witness = guardHaveUUID u $ @@ -60,8 +70,16 @@ alwaysUpload u key f d a _witness = guardHaveUUID u $ download :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool download r key f d witness = - getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest -> - download' (Remote.uuid r) key f d (go dest) witness + -- TODO: use this when not handling timeouts + --getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest -> + -- download' (Remote.uuid r) key f d (go dest) witness + + -- TODO: RetryDecider + -- TODO: Handle timeouts + withTransferrer $ \transferrer -> + performTransfer transferrer AnnexLevel + (Transfer Download (Remote.uuid r) (fromKey id key)) + (Just r) f id where go dest p = verifiedAction $ Remote.retrieveKeyFile r key f (fromRawFilePath dest) p diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index 286c6f9be2..9aec95035e 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -10,35 +10,66 @@ module Annex.TransferrerPool where import Annex.Common +import qualified Annex import Types.TransferrerPool import Types.Transfer -import Utility.Batch +import Types.Key +import qualified Types.Remote as Remote +import Git.Types (RemoteName) +import Types.Messages import Messages.Serialized -import qualified Command.TransferKeys as T +import Annex.Path +import Utility.Batch import Control.Concurrent.STM hiding (check) import Control.Concurrent import 
Control.Monad.IO.Class (MonadIO) +import Text.Read (readMaybe) +import System.Log.Logger (debugM) -{- Runs an action with a Transferrer from the pool. - - - - When minimizeprocesses is True, only one Transferrer is left running - - in the pool at a time. So if this needed to start a new Transferrer, - - it's stopped when done. Otherwise, idle processes are left in the pool - - for use later. - -} -withTransferrer :: Bool -> MkCheckTransferrer -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a -withTransferrer minimizeprocesses mkcheck program batchmaker pool a = do - (mi, leftinpool) <- atomically (popTransferrerPool pool) - i@(TransferrerPoolItem (Just t) check) <- case mi of +data TransferRequest = TransferRequest TransferRequestLevel Direction (Either UUID RemoteName) KeyData AssociatedFile + deriving (Show, Read) + +data TransferRequestLevel = AnnexLevel | AssistantLevel + deriving (Show, Read) + +data TransferResponse + = TransferOutput SerializedOutput + | TransferResult Bool + deriving (Show, Read) + +{- Runs an action with a Transferrer from the pool. -} +withTransferrer :: (Transferrer -> Annex a) -> Annex a +withTransferrer a = do + program <- liftIO programPath + pool <- Annex.getState Annex.transferrerpool + let nocheck = pure (pure True) + withTransferrer' False nocheck program nonBatchCommandMaker pool a + +withTransferrer' + :: (MonadIO m, MonadFail m, MonadMask m) + => Bool + -- ^ When minimizeprocesses is True, only one Transferrer is left + -- running in the pool at a time. So if this needed to start a + -- new Transferrer, it's stopped when done. Otherwise, idle + -- processes are left in the pool for use later. 
+ -> MkCheckTransferrer + -> FilePath + -> BatchCommandMaker + -> TransferrerPool + -> (Transferrer -> m a) + -> m a +withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do + (mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool) + i@(TransferrerPoolItem (Just t) check) <- liftIO $ case mi of Nothing -> mkTransferrerPoolItem mkcheck =<< mkTransferrer program batchmaker Just i -> checkTransferrerPoolItem program batchmaker i a t `finally` returntopool leftinpool check t i where returntopool leftinpool check t i | not minimizeprocesses || leftinpool == 0 = - atomically $ pushTransferrerPool pool i - | otherwise = do + liftIO $ atomically $ pushTransferrerPool pool i + | otherwise = liftIO $ do void $ forkIO $ stopTransferrer t atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check @@ -63,17 +94,19 @@ checkTransferrerPoolItem program batchmaker i = case i of performTransfer :: (Monad m, MonadIO m, MonadMask m) => Transferrer + -> TransferRequestLevel -> Transfer - -> TransferInfo + -> Maybe Remote + -> AssociatedFile -> (forall a. Annex a -> m a) -- ^ Run an annex action in the monad. Will not be used with -- actions that block for a long time. -> m Bool -performTransfer transferrer t info runannex = catchBoolIO $ do - (liftIO $ T.sendRequest t info (transferrerWrite transferrer)) +performTransfer transferrer level t mremote afile runannex = catchBoolIO $ do + (liftIO $ sendRequest level t mremote afile (transferrerWrite transferrer)) relaySerializedOutput - (liftIO $ T.readResponse (transferrerRead transferrer)) - (liftIO . T.sendSerializedOutputResponse (transferrerWrite transferrer)) + (liftIO $ readResponse (transferrerRead transferrer)) + (liftIO . 
sendSerializedOutputResponse (transferrerWrite transferrer)) runannex {- Starts a new git-annex transferkeys process, setting up handles @@ -103,3 +136,34 @@ stopTransferrer t = do hClose $ transferrerRead t hClose $ transferrerWrite t void $ waitForProcess $ transferrerHandle t + +-- | Send a request to perform a transfer. +sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO () +sendRequest level t mremote afile h = do + let l = show $ TransferRequest level + (transferDirection t) + (maybe (Left (transferUUID t)) (Right . Remote.name) mremote) + (keyData (transferKey t)) + afile + debugM "transfer" ("> " ++ l) + hPutStrLn h l + hFlush h + +sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO () +sendSerializedOutputResponse h sor = hPutStrLn h $ show sor + +-- | Read a response to a transfer request. +-- +-- Before the final response, this will return whatever SerializedOutput +-- should be displayed as the transfer is performed. 
+readResponse :: Handle -> IO (Either SerializedOutput Bool) +readResponse h = do + l <- liftIO $ hGetLine h + debugM "transfer" ("< " ++ l) + case readMaybe l of + Just (TransferOutput so) -> return (Left so) + Just (TransferResult r) -> return (Right r) + Nothing -> transferKeysProtocolError l + +transferKeysProtocolError :: String -> a +transferKeysProtocolError l = error $ "transferkeys protocol error: " ++ show l diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 89788ca9f0..7504fe2f59 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -91,8 +91,7 @@ runTransferThread' mkcheck program batchmaker d run = go go = catchPauseResume $ do p <- runAssistant d $ liftAnnex $ Annex.getState Annex.transferrerpool - withTransferrer True mkcheck program batchmaker p - run + withTransferrer' True mkcheck program batchmaker p run pause = catchPauseResume $ runEvery (Seconds 86400) noop {- Note: This must use E.try, rather than E.catch. @@ -163,7 +162,7 @@ genTransfer t info = case transferRemote info of - usual cleanup. However, first check if something else is - running the transfer, to avoid removing active transfers. 
-} - go remote transferrer = ifM (performTransfer transferrer t info liftAnnex) + go remote transferrer = ifM (performTransfer transferrer AssistantLevel t (transferRemote info) (associatedFile info) liftAnnex) ( do case associatedFile info of AssociatedFile Nothing -> noop diff --git a/Command/TransferKeys.hs b/Command/TransferKeys.hs index 6e2112b5f8..573530e98f 100644 --- a/Command/TransferKeys.hs +++ b/Command/TransferKeys.hs @@ -14,22 +14,13 @@ import Logs.Location import Annex.Transfer import qualified Remote import Utility.SimpleProtocol (dupIoHandles) -import Git.Types (RemoteName) import qualified Database.Keys import Annex.BranchState import Types.Messages -import Types.Key +import Annex.TransferrerPool import Text.Read (readMaybe) -data TransferRequest = TransferRequest Direction (Either UUID RemoteName) KeyData AssociatedFile - deriving (Show, Read) - -data TransferResponse - = TransferOutput SerializedOutput - | TransferResult Bool - deriving (Show, Read) - cmd :: Command cmd = command "transferkeys" SectionPlumbing "transfers keys" paramNothing (withParams seek) @@ -42,12 +33,30 @@ start = do enableInteractiveBranchAccess (readh, writeh) <- liftIO dupIoHandles Annex.setOutput $ SerializedOutput - (hPutStrLn writeh . show . TransferOutput) + (\v -> hPutStrLn writeh (show (TransferOutput v)) >> hFlush writeh) (readMaybe <$> hGetLine readh) runRequests readh writeh runner stop where - runner (TransferRequest direction _ keydata file) remote + runner (TransferRequest AnnexLevel direction _ keydata file) remote + | direction == Upload = + -- This is called by eg, Annex.Transfer.upload, + -- so caller is responsible for doing notification, + -- and for retrying. + upload' (Remote.uuid remote) key file noRetry + (Remote.action . Remote.storeKey remote key file) + noNotification + | otherwise = + -- This is called by eg, Annex.Transfer.download + -- so caller is responsible for doing notification + -- and for retrying. 
+ let go p = getViaTmp (Remote.retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t -> do + Remote.verifiedAction (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p) + in download' (Remote.uuid remote) key file noRetry go + noNotification + where + key = mkKey (const keydata) + runner (TransferRequest AssistantLevel direction _ keydata file) remote | direction == Upload = notifyTransfer direction file $ upload' (Remote.uuid remote) key file stdRetry $ \p -> do tryNonAsync (Remote.storeKey remote key file p) >>= \case @@ -83,7 +92,7 @@ runRequests readh writeh a = go Nothing Nothing go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do l <- liftIO $ hGetLine readh case readMaybe l of - Just tr@(TransferRequest _ remoteoruuid _ _) -> do + Just tr@(TransferRequest _ _ remoteoruuid _ _) -> do -- Often the same remote will be used -- repeatedly, so cache the last one to -- avoid looking up repeatedly. @@ -95,35 +104,9 @@ runRequests readh writeh a = go Nothing Nothing Just remote -> do sendresult =<< a tr remote go (Just remoteoruuid) mremote - Nothing -> protocolError l - Nothing -> protocolError l + Nothing -> transferKeysProtocolError l + Nothing -> transferKeysProtocolError l sendresult b = liftIO $ do hPutStrLn writeh $ show $ TransferResult b hFlush writeh - --- | Send a request to this command to perform a transfer. -sendRequest :: Transfer -> TransferInfo -> Handle -> IO () -sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest - (transferDirection t) - (maybe (Left (transferUUID t)) (Right . Remote.name) (transferRemote tinfo)) - (keyData (transferKey t)) - (associatedFile tinfo) - -sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO () -sendSerializedOutputResponse h sor = hPutStrLn h $ show sor - --- | Read a response from this command. --- --- Before the final response, this will return whatever SerializedOutput --- should be displayed as the transfer is performed. 
-readResponse :: Handle -> IO (Either SerializedOutput Bool) -readResponse h = do - l <- liftIO $ hGetLine h - case readMaybe l of - Just (TransferOutput so) -> return (Left so) - Just (TransferResult r) -> return (Right r) - Nothing -> protocolError l - -protocolError :: String -> a -protocolError l = error $ "transferkeys protocol error: " ++ show l From 2cb3f3a99dc5672c88b457c3d4549081459fff99 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 7 Dec 2020 16:55:07 -0400 Subject: [PATCH 14/20] annex.stalldetection docs Not implemented yet. This commit was sponsored by Svenne Krap on Patreon. --- doc/git-annex.mdwn | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn index d19f333c09..2ee80c369f 100644 --- a/doc/git-annex.mdwn +++ b/doc/git-annex.mdwn @@ -1392,6 +1392,31 @@ Remotes are configured using these settings in `.git/config`. When making multiple retries of the same transfer, the delay doubles after each retry. (default 1) +* `remote..annex-stalldetection`, `annex.stalldetection` + + This lets stalled or too-slow transfers be detected, and dealt with, so + rather than getting stuck, git-annex will cancel the stalled operation. + When this happens, the transfer will be considered to have failed, so + settings like annex.retry will control what it does next. + + This is not enabled by default, because it can make git-annex use + more resources. In order to cancel stalls, git-annex has to run + transfers in separate processes (one per concurrent job). So it + may need to open more connections to a remote than usual, or + the communication with those processes may make it a bit slower. + + The value specifies how much data git-annex should expect to see + flowing, minimum, when it's not stalled, over a given period of time. + The format is "$amount/$timeperiod". 
+ + For example, to detect outright stalls where no data has been transferred + after 30 seconds: `git config annex.stalldetection "0KiB/60s"` + + Or, if you have a remote on a USB drive that is normally capable of + several megabytes per second, but has bad sectors where it gets + stuck for a long time, you could use: + `git config remote.usbdrive.annex-stalldetection "1MB/1m"` + * `remote..annex-checkuuid` This only affects remotes that have their url pointing to a directory on From 822a8eadf89ca1c6dac7a2c4b999b22fda792966 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 8 Dec 2020 10:53:07 -0400 Subject: [PATCH 15/20] rename --- Annex/TransferrerPool.hs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index 9aec95035e..f5ab42e0db 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -70,7 +70,7 @@ withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do | not minimizeprocesses || leftinpool == 0 = liftIO $ atomically $ pushTransferrerPool pool i | otherwise = liftIO $ do - void $ forkIO $ stopTransferrer t + void $ forkIO $ shutdownTransferrer t atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check {- Check if a Transferrer from the pool is still ok to be used. @@ -80,7 +80,7 @@ checkTransferrerPoolItem program batchmaker i = case i of TransferrerPoolItem (Just t) check -> ifM check ( return i , do - stopTransferrer t + shutdownTransferrer t new check ) TransferrerPoolItem Nothing check -> new check @@ -129,10 +129,10 @@ mkTransferrer program batchmaker = do , transferrerHandle = pid } -{- Closing the fds will stop the transferrer, but only when it's in between - - transfers. -} -stopTransferrer :: Transferrer -> IO () -stopTransferrer t = do +{- Closing the fds will shut down the transferrer, but only when it's + - in between transfers. 
-} +shutdownTransferrer :: Transferrer -> IO () +shutdownTransferrer t = do hClose $ transferrerRead t hClose $ transferrerWrite t void $ waitForProcess $ transferrerHandle t From b9cfd15e906fc0fab6204ddf23aa8bf39c4c158e Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 8 Dec 2020 11:43:06 -0400 Subject: [PATCH 16/20] add killTransferrer There is redundant code in the assistant that does the same thing, but that code uses a PID, not a ProcessHandle, and gets the PID from, apparently, the TransferInfo transferPid (although I can't seem to find where that gets set on non-windows). --- Annex/TransferrerPool.hs | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index f5ab42e0db..cdfe45dad1 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -129,14 +129,6 @@ mkTransferrer program batchmaker = do , transferrerHandle = pid } -{- Closing the fds will shut down the transferrer, but only when it's - - in between transfers. -} -shutdownTransferrer :: Transferrer -> IO () -shutdownTransferrer t = do - hClose $ transferrerRead t - hClose $ transferrerWrite t - void $ waitForProcess $ transferrerHandle t - -- | Send a request to perform a transfer. sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO () sendRequest level t mremote afile h = do @@ -167,3 +159,20 @@ readResponse h = do transferKeysProtocolError :: String -> a transferKeysProtocolError l = error $ "transferkeys protocol error: " ++ show l + +{- Closing the fds will shut down the transferrer, but only when it's + - in between transfers. -} +shutdownTransferrer :: Transferrer -> IO () +shutdownTransferrer t = do + hClose $ transferrerRead t + hClose $ transferrerWrite t + void $ waitForProcess $ transferrerHandle t + +{- Kill the transferrer, and all its child processes. 
-} +killTransferrer :: Transferrer -> IO () +killTransferrer t = do + hClose $ transferrerRead t + hClose $ transferrerWrite t + interruptProcessGroupOf $ transferrerHandle t + threadDelay 50000 -- 0.05 second grace period + terminateProcess $ transferrerHandle t From 794fc72afbaab7b49faafc158fe5a064bdb2eb49 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 8 Dec 2020 12:51:56 -0400 Subject: [PATCH 17/20] avoid parseDuration succeeding on empty string --- Utility/HumanTime.hs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Utility/HumanTime.hs b/Utility/HumanTime.hs index 7db1008015..5178531cf8 100644 --- a/Utility/HumanTime.hs +++ b/Utility/HumanTime.hs @@ -45,7 +45,9 @@ daysToDuration i = Duration $ i * dsecs {- Parses a human-input time duration, of the form "5h", "1m", "5h1m", etc -} parseDuration :: String -> Either String Duration -parseDuration d = maybe parsefail (Right . Duration) $ go 0 d +parseDuration d + | null d = parsefail + | otherwise = maybe parsefail (Right . Duration) $ go 0 d where go n [] = return n go n s = do From c4d489f7d484023ac75a9e48c5fdeda28996025a Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 8 Dec 2020 15:17:35 -0400 Subject: [PATCH 18/20] add todo Not going to do this yet, so remember for later. --- doc/todo/stalldetection_for_import_and_export.mdwn | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 doc/todo/stalldetection_for_import_and_export.mdwn diff --git a/doc/todo/stalldetection_for_import_and_export.mdwn b/doc/todo/stalldetection_for_import_and_export.mdwn new file mode 100644 index 0000000000..a4698f461a --- /dev/null +++ b/doc/todo/stalldetection_for_import_and_export.mdwn @@ -0,0 +1,5 @@ +The new annex.stalldetection is used for transfers from remotes, but not +import and export from remotes. + +This should be doable, but it will need the transferkeys protocol to be +extended to cover the additional actions. 
--[[Joey]] From 09ed9f7d1fbc4305f7c7e901c2b09f6aa3d5818a Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 8 Dec 2020 15:20:05 -0400 Subject: [PATCH 19/20] reminder for later --- doc/todo/transferkeys_optimisation.mdwn | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 doc/todo/transferkeys_optimisation.mdwn diff --git a/doc/todo/transferkeys_optimisation.mdwn b/doc/todo/transferkeys_optimisation.mdwn new file mode 100644 index 0000000000..dbb741cd73 --- /dev/null +++ b/doc/todo/transferkeys_optimisation.mdwn @@ -0,0 +1,9 @@ +Some of the things git-annex transferkeys does are suboptimal, especially +when -J has many of them running. + +In particular, it writes location logs when downloading (but not +uploading), and so it flushes the journal etc. + +It may also do some queries of data from git that could be avoided with +some refactoring of what code runs in it, which could avoid it needing to +start up git helper processes like catkey. --[[Joey]] From 41f2c308ffcfe092b169575864e971f6b8ea557f Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 8 Dec 2020 15:22:18 -0400 Subject: [PATCH 20/20] stall detection is working New config annex.stalldetection, remote.name.annex-stalldetection, which can be used to deal with remotes that stall during transfers, or are sometimes too slow to want to use. This commit was sponsored by Luke Shumaker on Patreon. 
--- Annex/Transfer.hs | 76 ++++++++---- Annex/TransferrerPool.hs | 112 ++++++++++++++++-- Assistant/TransferSlots.hs | 7 +- CHANGELOG | 3 + Messages/Serialized.hs | 16 ++- Types/GitConfig.hs | 9 ++ Types/StallDetection.hs | 29 +++++ Utility/Process.hs | 2 +- doc/git-annex.mdwn | 2 +- ...ve_retries_to_mask_transient_failures.mdwn | 2 + git-annex.cabal | 1 + 11 files changed, 213 insertions(+), 46 deletions(-) create mode 100644 Types/StallDetection.hs diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index 99436d093b..ba2044e2fb 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -19,6 +19,7 @@ module Annex.Transfer ( noRetry, stdRetry, pickRemote, + stallDetection, ) where import Annex.Common @@ -40,6 +41,7 @@ import Types.WorkerPool import Annex.WorkerPool import Annex.TransferrerPool import Backend (isCryptographicallySecure) +import Types.StallDetection import qualified Utility.RawFilePath as R import Control.Concurrent @@ -47,19 +49,15 @@ import qualified Data.Map.Strict as M import qualified System.FilePath.ByteString as P import Data.Ord +-- Upload, supporting stall detection. upload :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool -upload r key f d _witness = - -- TODO: use this when not handling timeouts - --upload' (Remote.uuid r) key f d $ - -- action . Remote.storeKey r key f - - -- TODO: RetryDecider - -- TODO: Handle timeouts - withTransferrer $ \transferrer -> - performTransfer transferrer AnnexLevel - (Transfer Upload (Remote.uuid r) (fromKey id key)) - (Just r) f id +upload r key f d witness = stallDetection r >>= \case + Nothing -> upload' (Remote.uuid r) key f d go witness + Just sd -> runTransferrer sd r key f d Upload witness + where + go = action . Remote.storeKey r key f +-- Upload, not supporting stall detection. 
upload' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v upload' u key f d a _witness = guardHaveUUID u $ runTransfer (Transfer Upload u (fromKey id key)) f d a @@ -68,22 +66,17 @@ alwaysUpload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> alwaysUpload u key f d a _witness = guardHaveUUID u $ alwaysRunTransfer (Transfer Upload u (fromKey id key)) f d a +-- Download, supporting stall detection. download :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool -download r key f d witness = - -- TODO: use this when not handling timeouts - --getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest -> - -- download' (Remote.uuid r) key f d (go dest) witness - - -- TODO: RetryDecider - -- TODO: Handle timeouts - withTransferrer $ \transferrer -> - performTransfer transferrer AnnexLevel - (Transfer Download (Remote.uuid r) (fromKey id key)) - (Just r) f id +download r key f d witness = stallDetection r >>= \case + Nothing -> getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest -> + download' (Remote.uuid r) key f d (go dest) witness + Just sd -> runTransferrer sd r key f d Download witness where go dest p = verifiedAction $ Remote.retrieveKeyFile r key f (fromRawFilePath dest) p +-- Download, not supporting stall detection. 
download' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v download' u key f d a _witness = guardHaveUUID u $ runTransfer (Transfer Download u (fromKey id key)) f d a @@ -115,7 +108,7 @@ alwaysRunTransfer :: Observable v => Transfer -> AssociatedFile -> RetryDecider alwaysRunTransfer = runTransfer' True runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v -runTransfer' ignorelock t afile retrydecider transferaction = enteringStage TransferStage $ debugLocks $ preCheckSecureHashes t $ do +runTransfer' ignorelock t afile retrydecider transferaction = enteringStage TransferStage $ debugLocks $ preCheckSecureHashes (transferKey t) $ do info <- liftIO $ startTransferInfo afile (meter, tfile, createtfile, metervar) <- mkProgressUpdater t info mode <- annexFileMode @@ -202,6 +195,31 @@ runTransfer' ignorelock t afile retrydecider transferaction = enteringStage Tran f <- fromRepo $ gitAnnexTmpObjectLocation (transferKey t) liftIO $ catchDefaultIO 0 $ getFileSize f +runTransferrer + :: StallDetection + -> Remote + -> Key + -> AssociatedFile + -> RetryDecider + -> Direction + -> NotifyWitness + -> Annex Bool +runTransferrer sd r k afile retrydecider direction _witness = + enteringStage TransferStage $ preCheckSecureHashes k $ do + info <- liftIO $ startTransferInfo afile + go 0 info + where + go numretries info = + withTransferrer (performTransfer (Just sd) AnnexLevel id (Just r) t info) >>= \case + Right () -> return True + Left newinfo -> do + let !numretries' = succ numretries + ifM (retrydecider numretries' info newinfo) + ( go numretries' newinfo + , return False + ) + t = Transfer direction (Remote.uuid r) (fromKey id k) + {- Avoid download and upload of keys with insecure content when - annex.securehashesonly is configured. 
- @@ -214,8 +232,8 @@ runTransfer' ignorelock t afile retrydecider transferaction = enteringStage Tran - still contains content using an insecure hash, remotes will likewise - tend to be configured to reject it, so Upload is also prevented. -} -preCheckSecureHashes :: Observable v => Transfer -> Annex v -> Annex v -preCheckSecureHashes t a = ifM (isCryptographicallySecure (transferKey t)) +preCheckSecureHashes :: Observable v => Key -> Annex v -> Annex v +preCheckSecureHashes k a = ifM (isCryptographicallySecure k) ( a , ifM (annexSecureHashesOnly <$> Annex.getGitConfig) ( do @@ -225,7 +243,7 @@ preCheckSecureHashes t a = ifM (isCryptographicallySecure (transferKey t)) ) ) where - variety = fromKey keyVariety (transferKey t) + variety = fromKey keyVariety k type NumRetries = Integer @@ -348,3 +366,9 @@ lessActiveFirst :: M.Map Remote Integer -> Remote -> Remote -> Ordering lessActiveFirst active a b | Remote.cost a == Remote.cost b = comparing (`M.lookup` active) a b | otherwise = comparing Remote.cost a b + +stallDetection :: Remote -> Annex (Maybe StallDetection) +stallDetection r = maybe globalcfg (pure . 
Just) remotecfg + where + globalcfg = annexStallDetection <$> Annex.getGitConfig + remotecfg = remoteAnnexStallDetection $ Remote.gitconfig r diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index cdfe45dad1..973f756298 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -16,15 +16,21 @@ import Types.Transfer import Types.Key import qualified Types.Remote as Remote import Git.Types (RemoteName) +import Types.StallDetection import Types.Messages import Messages.Serialized import Annex.Path import Utility.Batch +import Utility.Metered +import Utility.HumanTime +import Utility.ThreadScheduler -import Control.Concurrent.STM hiding (check) import Control.Concurrent +import Control.Concurrent.Async +import Control.Concurrent.STM hiding (check) import Control.Monad.IO.Class (MonadIO) import Text.Read (readMaybe) +import Data.Time.Clock.POSIX import System.Log.Logger (debugM) data TransferRequest = TransferRequest TransferRequestLevel Direction (Either UUID RemoteName) KeyData AssociatedFile @@ -68,7 +74,11 @@ withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do where returntopool leftinpool check t i | not minimizeprocesses || leftinpool == 0 = - liftIO $ atomically $ pushTransferrerPool pool i + -- If the transferrer got killed, the handles will + -- be closed, so it should not be returned to the + -- pool. + liftIO $ whenM (hIsOpen (transferrerWrite t)) $ + liftIO $ atomically $ pushTransferrerPool pool i | otherwise = liftIO $ do void $ forkIO $ shutdownTransferrer t atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check @@ -90,24 +100,102 @@ checkTransferrerPoolItem program batchmaker i = case i of return $ TransferrerPoolItem (Just t) check {- Requests that a Transferrer perform a Transfer, and waits for it to - - finish. -} + - finish. + - + - When a stall is detected, kills the Transferrer. 
+ - + - If the transfer failed or stalled, returns TransferInfo with an + - updated bytesComplete reflecting how much data has been transferred. + -} performTransfer :: (Monad m, MonadIO m, MonadMask m) - => Transferrer + => Maybe StallDetection -> TransferRequestLevel - -> Transfer - -> Maybe Remote - -> AssociatedFile -> (forall a. Annex a -> m a) -- ^ Run an annex action in the monad. Will not be used with -- actions that block for a long time. - -> m Bool -performTransfer transferrer level t mremote afile runannex = catchBoolIO $ do - (liftIO $ sendRequest level t mremote afile (transferrerWrite transferrer)) - relaySerializedOutput + -> Maybe Remote + -> Transfer + -> TransferInfo + -> Transferrer + -> m (Either TransferInfo ()) +performTransfer stalldetection level runannex r t info transferrer = do + bpv <- liftIO $ newTVarIO zeroBytesProcessed + ifM (catchBoolIO $ bracket setup cleanup (go bpv)) + ( return (Right ()) + , do + n <- case transferDirection t of + Upload -> liftIO $ atomically $ + fromBytesProcessed <$> readTVar bpv + Download -> do + f <- runannex $ fromRepo $ gitAnnexTmpObjectLocation (transferKey t) + liftIO $ catchDefaultIO 0 $ getFileSize f + return $ Left $ info { bytesComplete = Just n } + ) + where + setup = do + liftIO $ sendRequest level t r + (associatedFile info) + (transferrerWrite transferrer) + metervar <- liftIO $ newEmptyTMVarIO + stalledvar <- liftIO $ newTVarIO False + tid <- liftIO $ async $ + detectStalls stalldetection metervar $ do + atomically $ writeTVar stalledvar True + killTransferrer transferrer + return (metervar, tid, stalledvar) + + cleanup (_, tid, stalledvar) = do + liftIO $ uninterruptibleCancel tid + whenM (liftIO $ atomically $ readTVar stalledvar) $ do + runannex $ showLongNote "Transfer stalled" + -- Close handles, to prevent the transferrer being + -- reused since the process was killed. 
+ liftIO $ hClose $ transferrerRead transferrer + liftIO $ hClose $ transferrerWrite transferrer + + go bpv (metervar, _, _) = relaySerializedOutput (liftIO $ readResponse (transferrerRead transferrer)) (liftIO . sendSerializedOutputResponse (transferrerWrite transferrer)) + (updatemeter bpv metervar) runannex + + updatemeter bpv metervar (Just n) = liftIO $ do + atomically $ do + void $ tryTakeTMVar metervar + putTMVar metervar n + atomically $ writeTVar bpv n + updatemeter _bpv metervar Nothing = liftIO $ + atomically $ void $ tryTakeTMVar metervar + +detectStalls :: Maybe StallDetection -> TMVar BytesProcessed -> IO () -> IO () +detectStalls Nothing _ _ = noop +detectStalls (Just (StallDetection minsz duration)) metervar onstall = go Nothing + where + go st = do + starttm <- getPOSIXTime + threadDelaySeconds (Seconds (fromIntegral (durationSeconds duration))) + -- Get whatever progress value was reported most recently, or + -- if none were reported since last time, wait until one is + -- reported. + sofar <- atomically $ fromBytesProcessed <$> takeTMVar metervar + case st of + Nothing -> go (Just sofar) + Just prev + -- Just in case a progress meter somehow runs + -- backwards, or a second progress meter was + -- started and is at a smaller value than + -- the previous one. + | prev > sofar -> go (Just sofar) + | otherwise -> do + endtm <- getPOSIXTime + let actualduration = endtm - starttm + let sz = sofar - prev + let expectedsz = (minsz * durationSeconds duration) + `div` max 1 (ceiling actualduration) + if sz < expectedsz + then onstall + else go (Just sofar) {- Starts a new git-annex transferkeys process, setting up handles - that will be used to communicate with it. -} @@ -171,8 +259,6 @@ shutdownTransferrer t = do {- Kill the transferrer, and all its child processes. 
-} killTransferrer :: Transferrer -> IO () killTransferrer t = do - hClose $ transferrerRead t - hClose $ transferrerWrite t interruptProcessGroupOf $ transferrerHandle t threadDelay 50000 -- 0.05 second grace period terminateProcess $ transferrerHandle t diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 7504fe2f59..ee3e17e280 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -24,6 +24,7 @@ import Assistant.Alert import Assistant.Alert.Utility import Assistant.Commits import Assistant.Drop +import Annex.Transfer (stallDetection) import Types.Transfer import Logs.Transfer import Logs.Location @@ -37,6 +38,7 @@ import Annex.Path import Utility.Batch import Types.NumCopies +import Data.Either import qualified Data.Map as M import qualified Control.Exception as E import Control.Concurrent @@ -123,7 +125,8 @@ genTransfer t info = case transferRemote info of ( do debug [ "Transferring:" , describeTransfer t info ] notifyTransfer - return $ Just (t, info, go remote) + sd <- liftAnnex $ stallDetection remote + return $ Just (t, info, go remote sd) , do debug [ "Skipping unnecessary transfer:", describeTransfer t info ] @@ -162,7 +165,7 @@ genTransfer t info = case transferRemote info of - usual cleanup. However, first check if something else is - running the transfer, to avoid removing active transfers. 
-} - go remote transferrer = ifM (performTransfer transferrer AssistantLevel t (transferRemote info) (associatedFile info) liftAnnex) + go remote sd transferrer = ifM (isRight <$> performTransfer sd AssistantLevel liftAnnex (transferRemote info) t info transferrer) ( do case associatedFile info of AssociatedFile Nothing -> noop diff --git a/CHANGELOG b/CHANGELOG index 85a60e66d3..e994057304 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,8 @@ git-annex (8.20201128) UNRELEASED; urgency=medium + * New config annex.stalldetection, remote.name.annex-stalldetection, + which can be used to deal with remotes that stall during transfers, + or are sometimes too slow to want to use. * Fix hang on shutdown of external special remote using ASYNC protocol extension. (Reversion introduced in version 8.20201007.) * Fix bug that made the next download after an empty file from a ssh diff --git a/Messages/Serialized.hs b/Messages/Serialized.hs index b58d131ace..3dcc26edc0 100644 --- a/Messages/Serialized.hs +++ b/Messages/Serialized.hs @@ -20,6 +20,7 @@ import Messages import Messages.Internal import Messages.Progress import qualified Messages.JSON as JSON +import Utility.Metered (BytesProcessed) import Control.Monad.IO.Class (MonadIO) @@ -29,11 +30,16 @@ relaySerializedOutput => m (Either SerializedOutput r) -- ^ Get next serialized output, or final value to return. -> (SerializedOutputResponse -> m ()) + -- ^ Send response to child process. + -> (Maybe BytesProcessed -> m ()) + -- ^ When a progress meter is running, is updated with + -- progress meter values sent by the process. + -- When a progress meter is stopped, Nothing is sent. -> (forall a. Annex a -> m a) -- ^ Run an annex action in the monad. Will not be used with -- actions that block for a long time. 
-> m r -relaySerializedOutput getso sendsor runannex = go Nothing +relaySerializedOutput getso sendsor meterreport runannex = go Nothing where go st = loop st >>= \case Right r -> return r @@ -69,10 +75,14 @@ relaySerializedOutput getso sendsor runannex = go Nothing -- output after the progress meter -- is done. Left _st' -> loop Nothing - Left EndProgressMeter -> return (Left st) + Left EndProgressMeter -> do + meterreport Nothing + return (Left st) Left (UpdateProgressMeter n) -> do case st of - Just meterupdate -> liftIO $ meterupdate n + Just meterupdate -> do + meterreport (Just n) + liftIO $ meterupdate n Nothing -> noop loop st Left StartPrompt -> do diff --git a/Types/GitConfig.hs b/Types/GitConfig.hs index 7de6b62e46..cb50c79666 100644 --- a/Types/GitConfig.hs +++ b/Types/GitConfig.hs @@ -41,6 +41,7 @@ import Types.NumCopies import Types.Difference import Types.RefSpec import Types.RepoVersion +import Types.StallDetection import Config.DynamicConfig import Utility.HumanTime import Utility.Gpg (GpgCmd, mkGpgCmd) @@ -116,6 +117,7 @@ data GitConfig = GitConfig , annexRetry :: Maybe Integer , annexForwardRetry :: Maybe Integer , annexRetryDelay :: Maybe Seconds + , annexStallDetection :: Maybe StallDetection , annexAllowedUrlSchemes :: S.Set Scheme , annexAllowedIPAddresses :: String , annexAllowUnverifiedDownloads :: Bool @@ -202,6 +204,9 @@ extractGitConfig configsource r = GitConfig , annexForwardRetry = getmayberead (annexConfig "forward-retry") , annexRetryDelay = Seconds <$> getmayberead (annexConfig "retrydelay") + , annexStallDetection = + either (const Nothing) Just . 
parseStallDetection + =<< getmaybe (annexConfig "stalldetection") , annexAllowedUrlSchemes = S.fromList $ map mkScheme $ maybe ["http", "https", "ftp"] words $ getmaybe (annexConfig "security.allowed-url-schemes") @@ -306,6 +311,7 @@ data RemoteGitConfig = RemoteGitConfig , remoteAnnexRetry :: Maybe Integer , remoteAnnexForwardRetry :: Maybe Integer , remoteAnnexRetryDelay :: Maybe Seconds + , remoteAnnexStallDetection :: Maybe StallDetection , remoteAnnexAllowUnverifiedDownloads :: Bool , remoteAnnexConfigUUID :: Maybe UUID @@ -369,6 +375,9 @@ extractRemoteGitConfig r remotename = do , remoteAnnexForwardRetry = getmayberead "forward-retry" , remoteAnnexRetryDelay = Seconds <$> getmayberead "retrydelay" + , remoteAnnexStallDetection = + either (const Nothing) Just . parseStallDetection + =<< getmaybe "stalldetection" , remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $ getmaybe ("security-allow-unverified-downloads") , remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid" diff --git a/Types/StallDetection.hs b/Types/StallDetection.hs new file mode 100644 index 0000000000..4b8223c8c6 --- /dev/null +++ b/Types/StallDetection.hs @@ -0,0 +1,29 @@ +{- types for stall detection + - + - Copyright 2020 Joey Hess + - + - Licensed under the GNU AGPL version 3 or higher. + -} + +module Types.StallDetection where + +import Utility.DataUnits +import Utility.HumanTime +import Utility.Misc + +-- Unless the given number of bytes have been sent over the given +-- amount of time, there's a stall. 
+data StallDetection = StallDetection ByteSize Duration + deriving (Show) + +-- Parse eg, "0KiB/60s" +parseStallDetection :: String -> Either String StallDetection +parseStallDetection s = + let (bs, ds) = separate (== '/') s + in do + b <- maybe + (Left $ "Unable to parse stall detection amount " ++ bs) + Right + (readSize dataUnits bs) + d <- parseDuration ds + return (StallDetection b d) diff --git a/Utility/Process.hs b/Utility/Process.hs index 94321b2dfc..4a725c858a 100644 --- a/Utility/Process.hs +++ b/Utility/Process.hs @@ -34,7 +34,7 @@ module Utility.Process ( ) where import qualified Utility.Process.Shim -import Utility.Process.Shim as X (CreateProcess(..), ProcessHandle, StdStream(..), CmdSpec(..), proc, getPid, getProcessExitCode, shell, terminateProcess) +import Utility.Process.Shim as X (CreateProcess(..), ProcessHandle, StdStream(..), CmdSpec(..), proc, getPid, getProcessExitCode, shell, terminateProcess, interruptProcessGroupOf) import Utility.Misc import Utility.Exception import Utility.Monad diff --git a/doc/git-annex.mdwn b/doc/git-annex.mdwn index 2ee80c369f..b739cdafd2 100644 --- a/doc/git-annex.mdwn +++ b/doc/git-annex.mdwn @@ -1410,7 +1410,7 @@ Remotes are configured using these settings in `.git/config`. The format is "$amount/$timeperiod". For example, to detect outright stalls where no data has been transferred - after 30 seconds: `git config annex.stalldetection "0KiB/60s"` + after 30 seconds: `git config annex.stalldetection "0/30s"` Or, if you have a remote on a USB drive that is normally capable of several megabytes per second, but has bad sectors where it gets diff --git a/doc/todo/more_extensive_retries_to_mask_transient_failures.mdwn b/doc/todo/more_extensive_retries_to_mask_transient_failures.mdwn index ebe3348ee8..c030f0da70 100644 --- a/doc/todo/more_extensive_retries_to_mask_transient_failures.mdwn +++ b/doc/todo/more_extensive_retries_to_mask_transient_failures.mdwn @@ -1,3 +1,5 @@ In a number of scenarios (e.g. 
[[bugs/still_seeing_errors_with_parallel_git-annex-add]], [[bugs/parallel_copy_fails]], [[git-annex-sync/#comment-aceb18109c0a536e04bcdd3aa04bda29]]), `git-annex` operations may fail or hang due to transient conditions. It would help a lot if `git-annex` could be configured to fail timed-out operations, and to retry failed operations after a delay. This would especially help when using `git-annex` in a script or a higher-level tool. I've tried wrapping some retry logic around `git-annex` calls, but it seems `git-annex` itself is in the best position to do that sensibly (e.g. only retrying idempotent operations, or capping retries per remote). This would be a catch-all fix for unusual conditions that are hard to test for. `git-annex` already has config options `annex.retry` and `annex.retry-delay`, but it seems that they don't cover all failure types. + +> Added annex.stalldetection, [[done]] --[[Joey]] diff --git a/git-annex.cabal b/git-annex.cabal index 43e3249347..6f7a9ab2c8 100644 --- a/git-annex.cabal +++ b/git-annex.cabal @@ -1023,6 +1023,7 @@ Executable git-annex Types.RepoVersion Types.ScheduledActivity Types.StandardGroups + Types.StallDetection Types.StoreRetrieve Types.Test Types.Transfer