Merge branch 'message-serialization'

Joey Hess 2020-12-08 15:23:43 -04:00
commit d81bf4e018
GPG key ID: DB12DB0FF05F8F38
39 changed files with 832 additions and 335 deletions

@@ -70,6 +70,7 @@ import Types.WorkerPool
 import Types.IndexFiles
 import Types.CatFileHandles
 import Types.RemoteConfig
+import Types.TransferrerPool
 import qualified Database.Keys.Handle as Keys
 import Utility.InodeCache
 import Utility.Url
@@ -156,6 +157,7 @@ data AnnexState = AnnexState
 	, cachedgitenv :: Maybe (AltIndexFile, FilePath, [(String, String)])
 	, urloptions :: Maybe UrlOptions
 	, insmudgecleanfilter :: Bool
+	, transferrerpool :: TransferrerPool
 	}

 newState :: GitConfig -> Git.Repo -> IO AnnexState
@@ -165,6 +167,7 @@ newState c r = do
 	o <- newMessageState
 	sc <- newTMVarIO False
 	kh <- Keys.newDbHandle
+	tp <- newTransferrerPool
 	return $ AnnexState
 		{ repo = r
 		, repoadjustment = return
@@ -217,6 +220,7 @@ newState c r = do
 		, cachedgitenv = Nothing
 		, urloptions = Nothing
 		, insmudgecleanfilter = False
+		, transferrerpool = tp
 		}

 {- Makes an Annex state object for the specified git repo.

@@ -6,6 +6,8 @@
  -}

 module Annex.Action (
+	action,
+	verifiedAction,
 	startup,
 	shutdown,
 	stopCoProcesses,
@@ -21,6 +23,22 @@ import Annex.CheckAttr
 import Annex.HashObject
 import Annex.CheckIgnore

+{- Runs an action that may throw exceptions, catching and displaying them. -}
+action :: Annex () -> Annex Bool
+action a = tryNonAsync a >>= \case
+	Right () -> return True
+	Left e -> do
+		warning (show e)
+		return False
+
+verifiedAction :: Annex Verification -> Annex (Bool, Verification)
+verifiedAction a = tryNonAsync a >>= \case
+	Right v -> return (True, v)
+	Left e -> do
+		warning (show e)
+		return (False, UnVerified)
+
 {- Actions to perform each time ran. -}
 startup :: Annex ()
 startup = return ()
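
The action and verifiedAction helpers (moved here from Remote.hs; see the end of this diff) turn an exception-throwing Annex action into a plain success value, logging the exception as a warning. The same shape in plain IO, as a minimal self-contained sketch, with Control.Exception.try standing in for git-annex's tryNonAsync (which additionally lets async exceptions propagate):

{-# LANGUAGE LambdaCase, ScopedTypeVariables #-}
import Control.Exception (ErrorCall (..), SomeException, throwIO, try)

-- Run an action that may throw; report the exception and return
-- False rather than propagating it, like Annex.Action.action.
ioAction :: IO () -> IO Bool
ioAction a = try a >>= \case
	Right () -> return True
	Left (e :: SomeException) -> do
		putStrLn ("warning: " ++ show e)
		return False

main :: IO ()
main = do
	ok <- ioAction (throwIO (ErrorCall "transfer failed"))
	print ok -- prints the warning, then False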

@@ -466,7 +466,7 @@ importKeys remote importtreeconfig importcontent importablecontents = do
 				return (Just (k', ok))
 		checkDiskSpaceToGet k Nothing $
 			notifyTransfer Download af $
-				download (Remote.uuid remote) k af stdRetry $ \p' ->
+				download' (Remote.uuid remote) k af stdRetry $ \p' ->
 					withTmp k $ downloader p'

 	-- The file is small, so is added to git, so while importing
@@ -520,7 +520,7 @@ importKeys remote importtreeconfig importcontent importablecontents = do
 				return Nothing
 		checkDiskSpaceToGet tmpkey Nothing $
 			notifyTransfer Download af $
-				download (Remote.uuid remote) tmpkey af stdRetry $ \p ->
+				download' (Remote.uuid remote) tmpkey af stdRetry $ \p ->
 					withTmp tmpkey $ \tmpfile ->
 						metered (Just p) tmpkey $
 							const (rundownload tmpfile)

@@ -10,13 +10,16 @@
 module Annex.Transfer (
 	module X,
 	upload,
+	upload',
 	alwaysUpload,
 	download,
+	download',
 	runTransfer,
 	alwaysRunTransfer,
 	noRetry,
 	stdRetry,
 	pickRemote,
+	stallDetection,
 ) where

 import Annex.Common
@@ -24,7 +27,9 @@ import qualified Annex
 import Logs.Transfer as X
 import Types.Transfer as X
 import Annex.Notification as X
+import Annex.Content
 import Annex.Perms
+import Annex.Action
 import Utility.Metered
 import Utility.ThreadScheduler
 import Annex.LockPool
@@ -34,7 +39,9 @@ import Types.Concurrency
 import Annex.Concurrent.Utility
 import Types.WorkerPool
 import Annex.WorkerPool
+import Annex.TransferrerPool
 import Backend (isCryptographicallySecure)
+import Types.StallDetection
 import qualified Utility.RawFilePath as R

 import Control.Concurrent
@@ -42,16 +49,36 @@ import qualified Data.Map.Strict as M
 import qualified System.FilePath.ByteString as P
 import Data.Ord

-upload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
-upload u key f d a _witness = guardHaveUUID u $
+-- Upload, supporting stall detection.
+upload :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool
+upload r key f d witness = stallDetection r >>= \case
+	Nothing -> upload' (Remote.uuid r) key f d go witness
+	Just sd -> runTransferrer sd r key f d Upload witness
+  where
+	go = action . Remote.storeKey r key f
+
+-- Upload, not supporting stall detection.
+upload' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
+upload' u key f d a _witness = guardHaveUUID u $
 	runTransfer (Transfer Upload u (fromKey id key)) f d a

 alwaysUpload :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
 alwaysUpload u key f d a _witness = guardHaveUUID u $
 	alwaysRunTransfer (Transfer Upload u (fromKey id key)) f d a

-download :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
-download u key f d a _witness = guardHaveUUID u $
+-- Download, supporting stall detection.
+download :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool
+download r key f d witness = stallDetection r >>= \case
+	Nothing -> getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest ->
+		download' (Remote.uuid r) key f d (go dest) witness
+	Just sd -> runTransferrer sd r key f d Download witness
+  where
+	go dest p = verifiedAction $
+		Remote.retrieveKeyFile r key f (fromRawFilePath dest) p
+
+-- Download, not supporting stall detection.
+download' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
+download' u key f d a _witness = guardHaveUUID u $
 	runTransfer (Transfer Download u (fromKey id key)) f d a

 guardHaveUUID :: Observable v => UUID -> Annex v -> Annex v
@@ -81,7 +108,7 @@ alwaysRunTransfer :: Observable v => Transfer -> AssociatedFile -> RetryDecider
 alwaysRunTransfer = runTransfer' True

 runTransfer' :: Observable v => Bool -> Transfer -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> Annex v
-runTransfer' ignorelock t afile retrydecider transferaction = enteringStage TransferStage $ debugLocks $ checkSecureHashes t $ do
+runTransfer' ignorelock t afile retrydecider transferaction = enteringStage TransferStage $ debugLocks $ preCheckSecureHashes (transferKey t) $ do
 	info <- liftIO $ startTransferInfo afile
 	(meter, tfile, createtfile, metervar) <- mkProgressUpdater t info
 	mode <- annexFileMode
@@ -168,6 +195,31 @@ runTransfer' ignorelock t afile retrydecider transferaction = enteringStage Tran
 		f <- fromRepo $ gitAnnexTmpObjectLocation (transferKey t)
 		liftIO $ catchDefaultIO 0 $ getFileSize f

+runTransferrer
+	:: StallDetection
+	-> Remote
+	-> Key
+	-> AssociatedFile
+	-> RetryDecider
+	-> Direction
+	-> NotifyWitness
+	-> Annex Bool
+runTransferrer sd r k afile retrydecider direction _witness =
+	enteringStage TransferStage $ preCheckSecureHashes k $ do
+		info <- liftIO $ startTransferInfo afile
+		go 0 info
+  where
+	go numretries info =
+		withTransferrer (performTransfer (Just sd) AnnexLevel id (Just r) t info) >>= \case
+			Right () -> return True
+			Left newinfo -> do
+				let !numretries' = succ numretries
+				ifM (retrydecider numretries' info newinfo)
+					( go numretries' newinfo
+					, return False
+					)
+	t = Transfer direction (Remote.uuid r) (fromKey id k)
+
 {- Avoid download and upload of keys with insecure content when
  - annex.securehashesonly is configured.
  -
@@ -180,8 +232,8 @@ runTransfer' ignorelock t afile retrydecider transferaction = enteringStage Tran
  - still contains content using an insecure hash, remotes will likewise
  - tend to be configured to reject it, so Upload is also prevented.
  -}
-checkSecureHashes :: Observable v => Transfer -> Annex v -> Annex v
-checkSecureHashes t a = ifM (isCryptographicallySecure (transferKey t))
+preCheckSecureHashes :: Observable v => Key -> Annex v -> Annex v
+preCheckSecureHashes k a = ifM (isCryptographicallySecure k)
 	( a
 	, ifM (annexSecureHashesOnly <$> Annex.getGitConfig)
 		( do
@@ -191,7 +243,7 @@ checkSecureHashes t a = ifM (isCryptographicallySecure (transferKey t))
 		)
 	)
   where
-	variety = fromKey keyVariety (transferKey t)
+	variety = fromKey keyVariety k

 type NumRetries = Integer

@@ -314,3 +366,9 @@ lessActiveFirst :: M.Map Remote Integer -> Remote -> Remote -> Ordering
 lessActiveFirst active a b
 	| Remote.cost a == Remote.cost b = comparing (`M.lookup` active) a b
 	| otherwise = comparing Remote.cost a b
+
+stallDetection :: Remote -> Annex (Maybe StallDetection)
+stallDetection r = maybe globalcfg (pure . Just) remotecfg
+  where
+	globalcfg = annexStallDetection <$> Annex.getGitConfig
+	remotecfg = remoteAnnexStallDetection $ Remote.gitconfig r
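
The net effect for callers: upload and download now take the whole Remote, consult stallDetection (the per-remote remote.name.annex-stalldetection overriding the global annex.stalldetection), and only when stall detection is configured hand the transfer to a supervised "git-annex transferkeys" child via runTransferrer; otherwise the old in-process path (upload'/download') is used. A minimal sketch of that dispatch shape, with simplified stand-in types rather than the real git-annex ones:

-- Simplified stand-ins; the real StallDetection is a minimum byte
-- count per time period, configured per remote or globally.
data StallDetection = StallDetection Integer Integer

data Remote = Remote
	{ remoteName :: String
	, remoteStallCfg :: Maybe StallDetection -- remote.<name>.annex-stalldetection
	}

globalStallCfg :: Maybe StallDetection -- annex.stalldetection
globalStallCfg = Nothing

-- Per-remote config wins, otherwise fall back to the global one,
-- mirroring Annex.Transfer.stallDetection.
stallDetection :: Remote -> Maybe StallDetection
stallDetection r = maybe globalStallCfg Just (remoteStallCfg r)

-- Dispatch: supervise via a child process only when stall detection
-- is wanted, otherwise run the transfer in the current process.
upload :: Remote -> IO Bool
upload r = case stallDetection r of
	Nothing -> inProcess r
	Just sd -> supervised sd r
  where
	inProcess _ = return True    -- placeholder for upload'
	supervised _ _ = return True -- placeholder for runTransferrer

main :: IO ()
main = print =<< upload (Remote "origin" (Just (StallDetection 10000 60)))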

Annex/TransferrerPool.hs Normal file
@@ -0,0 +1,264 @@
{- A pool of "git-annex transferkeys" processes
-
- Copyright 2013-2020 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes #-}
module Annex.TransferrerPool where
import Annex.Common
import qualified Annex
import Types.TransferrerPool
import Types.Transfer
import Types.Key
import qualified Types.Remote as Remote
import Git.Types (RemoteName)
import Types.StallDetection
import Types.Messages
import Messages.Serialized
import Annex.Path
import Utility.Batch
import Utility.Metered
import Utility.HumanTime
import Utility.ThreadScheduler
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM hiding (check)
import Control.Monad.IO.Class (MonadIO)
import Text.Read (readMaybe)
import Data.Time.Clock.POSIX
import System.Log.Logger (debugM)
data TransferRequest = TransferRequest TransferRequestLevel Direction (Either UUID RemoteName) KeyData AssociatedFile
deriving (Show, Read)
data TransferRequestLevel = AnnexLevel | AssistantLevel
deriving (Show, Read)
data TransferResponse
= TransferOutput SerializedOutput
| TransferResult Bool
deriving (Show, Read)
{- Runs an action with a Transferrer from the pool. -}
withTransferrer :: (Transferrer -> Annex a) -> Annex a
withTransferrer a = do
program <- liftIO programPath
pool <- Annex.getState Annex.transferrerpool
let nocheck = pure (pure True)
withTransferrer' False nocheck program nonBatchCommandMaker pool a
withTransferrer'
:: (MonadIO m, MonadFail m, MonadMask m)
=> Bool
-- ^ When minimizeprocesses is True, only one Transferrer is left
-- running in the pool at a time. So if this needed to start a
-- new Transferrer, it's stopped when done. Otherwise, idle
-- processes are left in the pool for use later.
-> MkCheckTransferrer
-> FilePath
-> BatchCommandMaker
-> TransferrerPool
-> (Transferrer -> m a)
-> m a
withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do
(mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool)
i@(TransferrerPoolItem (Just t) check) <- liftIO $ case mi of
Nothing -> mkTransferrerPoolItem mkcheck =<< mkTransferrer program batchmaker
Just i -> checkTransferrerPoolItem program batchmaker i
a t `finally` returntopool leftinpool check t i
where
returntopool leftinpool check t i
| not minimizeprocesses || leftinpool == 0 =
-- If the transferrer got killed, the handles will
-- be closed, so it should not be returned to the
-- pool.
liftIO $ whenM (hIsOpen (transferrerWrite t)) $
liftIO $ atomically $ pushTransferrerPool pool i
| otherwise = liftIO $ do
void $ forkIO $ shutdownTransferrer t
atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check
{- Check if a Transferrer from the pool is still ok to be used.
- If not, stop it and start a new one. -}
checkTransferrerPoolItem :: FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO TransferrerPoolItem
checkTransferrerPoolItem program batchmaker i = case i of
TransferrerPoolItem (Just t) check -> ifM check
( return i
, do
shutdownTransferrer t
new check
)
TransferrerPoolItem Nothing check -> new check
where
new check = do
t <- mkTransferrer program batchmaker
return $ TransferrerPoolItem (Just t) check
{- Requests that a Transferrer perform a Transfer, and waits for it to
- finish.
-
- When a stall is detected, kills the Transferrer.
-
- If the transfer failed or stalled, returns TransferInfo with an
- updated bytesComplete reflecting how much data has been transferred.
-}
performTransfer
:: (Monad m, MonadIO m, MonadMask m)
=> Maybe StallDetection
-> TransferRequestLevel
-> (forall a. Annex a -> m a)
-- ^ Run an annex action in the monad. Will not be used with
-- actions that block for a long time.
-> Maybe Remote
-> Transfer
-> TransferInfo
-> Transferrer
-> m (Either TransferInfo ())
performTransfer stalldetection level runannex r t info transferrer = do
bpv <- liftIO $ newTVarIO zeroBytesProcessed
ifM (catchBoolIO $ bracket setup cleanup (go bpv))
( return (Right ())
, do
n <- case transferDirection t of
Upload -> liftIO $ atomically $
fromBytesProcessed <$> readTVar bpv
Download -> do
f <- runannex $ fromRepo $ gitAnnexTmpObjectLocation (transferKey t)
liftIO $ catchDefaultIO 0 $ getFileSize f
return $ Left $ info { bytesComplete = Just n }
)
where
setup = do
liftIO $ sendRequest level t r
(associatedFile info)
(transferrerWrite transferrer)
metervar <- liftIO $ newEmptyTMVarIO
stalledvar <- liftIO $ newTVarIO False
tid <- liftIO $ async $
detectStalls stalldetection metervar $ do
atomically $ writeTVar stalledvar True
killTransferrer transferrer
return (metervar, tid, stalledvar)
cleanup (_, tid, stalledvar) = do
liftIO $ uninterruptibleCancel tid
whenM (liftIO $ atomically $ readTVar stalledvar) $ do
runannex $ showLongNote "Transfer stalled"
-- Close handles, to prevent the transferrer being
-- reused since the process was killed.
liftIO $ hClose $ transferrerRead transferrer
liftIO $ hClose $ transferrerWrite transferrer
go bpv (metervar, _, _) = relaySerializedOutput
(liftIO $ readResponse (transferrerRead transferrer))
(liftIO . sendSerializedOutputResponse (transferrerWrite transferrer))
(updatemeter bpv metervar)
runannex
updatemeter bpv metervar (Just n) = liftIO $ do
atomically $ do
void $ tryTakeTMVar metervar
putTMVar metervar n
atomically $ writeTVar bpv n
updatemeter _bpv metervar Nothing = liftIO $
atomically $ void $ tryTakeTMVar metervar
detectStalls :: Maybe StallDetection -> TMVar BytesProcessed -> IO () -> IO ()
detectStalls Nothing _ _ = noop
detectStalls (Just (StallDetection minsz duration)) metervar onstall = go Nothing
where
go st = do
starttm <- getPOSIXTime
threadDelaySeconds (Seconds (fromIntegral (durationSeconds duration)))
-- Get whatever progress value was reported most recently, or
-- if none were reported since last time, wait until one is
-- reported.
sofar <- atomically $ fromBytesProcessed <$> takeTMVar metervar
case st of
Nothing -> go (Just sofar)
Just prev
-- Just in case a progress meter somehow runs
-- backwards, or a second progress meter was
-- started and is at a smaller value than
-- the previous one.
| prev > sofar -> go (Just sofar)
| otherwise -> do
endtm <- getPOSIXTime
let actualduration = endtm - starttm
let sz = sofar - prev
let expectedsz = (minsz * durationSeconds duration)
`div` max 1 (ceiling actualduration)
if sz < expectedsz
then onstall
else go (Just sofar)
{- Starts a new git-annex transferkeys process, setting up handles
- that will be used to communicate with it. -}
mkTransferrer :: FilePath -> BatchCommandMaker -> IO Transferrer
mkTransferrer program batchmaker = do
{- It runs as a batch job. -}
let (program', params') = batchmaker (program, [Param "transferkeys"])
{- It's put into its own group so that the whole group can be
- killed to stop a transfer. -}
(Just writeh, Just readh, _, pid) <- createProcess
(proc program' $ toCommand params')
{ create_group = True
, std_in = CreatePipe
, std_out = CreatePipe
}
return $ Transferrer
{ transferrerRead = readh
, transferrerWrite = writeh
, transferrerHandle = pid
}
-- | Send a request to perform a transfer.
sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO ()
sendRequest level t mremote afile h = do
let l = show $ TransferRequest level
(transferDirection t)
(maybe (Left (transferUUID t)) (Right . Remote.name) mremote)
(keyData (transferKey t))
afile
debugM "transfer" ("> " ++ l)
hPutStrLn h l
hFlush h
sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO ()
sendSerializedOutputResponse h sor = hPutStrLn h $ show sor
-- | Read a response to a transfer requests.
--
-- Before the final response, this will return whatever SerializedOutput
-- should be displayed as the transfer is performed.
readResponse :: Handle -> IO (Either SerializedOutput Bool)
readResponse h = do
l <- liftIO $ hGetLine h
debugM "transfer" ("< " ++ l)
case readMaybe l of
Just (TransferOutput so) -> return (Left so)
Just (TransferResult r) -> return (Right r)
Nothing -> transferKeysProtocolError l
transferKeysProtocolError :: String -> a
transferKeysProtocolError l = error $ "transferkeys protocol error: " ++ show l
{- Closing the fds will shut down the transferrer, but only when it's
- in between transfers. -}
shutdownTransferrer :: Transferrer -> IO ()
shutdownTransferrer t = do
hClose $ transferrerRead t
hClose $ transferrerWrite t
void $ waitForProcess $ transferrerHandle t
{- Kill the transferrer, and all its child processes. -}
killTransferrer :: Transferrer -> IO ()
killTransferrer t = do
interruptProcessGroupOf $ transferrerHandle t
threadDelay 50000 -- 0.05 second grace period
terminateProcess $ transferrerHandle t
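
detectStalls wakes up once per configured time period, looks at how many bytes were reported since the previous wakeup, and scales the configured minimum ("at least minsz bytes every duration") to the actually elapsed time; if less than that arrived, the stall handler runs. The arithmetic on its own, as a small self-contained sketch with hypothetical numbers:

-- Mirrors the expectedsz computation in detectStalls above: the
-- configured minimum, scaled by duration/elapsed.
expectedBytes :: Integer -> Integer -> Integer -> Integer
expectedBytes minsz durationSeconds elapsedSeconds =
	(minsz * durationSeconds) `div` max 1 elapsedSeconds

-- A stall is detected when fewer bytes than that were transferred
-- in the window.
isStalled :: Integer -> Integer -> Integer -> Integer -> Bool
isStalled minsz durationSeconds elapsedSeconds transferred =
	transferred < expectedBytes minsz durationSeconds elapsedSeconds

main :: IO ()
main = do
	-- Hypothetical config: at least 10000 bytes every 60 seconds.
	print (isStalled 10000 60 60 2000)  -- True: only 2000 bytes arrived
	print (isStalled 10000 60 60 50000) -- False: plenty of progress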

@@ -35,7 +35,6 @@ import Assistant.Types.DaemonStatus
 import Assistant.Types.ScanRemotes
 import Assistant.Types.TransferQueue
 import Assistant.Types.TransferSlots
-import Assistant.Types.TransferrerPool
 import Assistant.Types.Pushes
 import Assistant.Types.BranchChange
 import Assistant.Types.Commits
@@ -65,7 +64,6 @@ data AssistantData = AssistantData
 	, scanRemoteMap :: ScanRemoteMap
 	, transferQueue :: TransferQueue
 	, transferSlots :: TransferSlots
-	, transferrerPool :: TransferrerPool
 	, failedPushMap :: FailedPushMap
 	, failedExportMap :: FailedPushMap
 	, commitChan :: CommitChan
@@ -85,7 +83,6 @@ newAssistantData st dstatus = AssistantData
 	<*> newScanRemoteMap
 	<*> newTransferQueue
 	<*> newTransferSlots
-	<*> newTransferrerPool (checkNetworkConnections dstatus)
 	<*> newFailedPushMap
 	<*> newFailedPushMap
 	<*> newCommitChan

@@ -1,6 +1,6 @@
 {- git-annex assistant transfer slots
  -
- - Copyright 2012 Joey Hess <id@joeyh.name>
+ - Copyright 2012-2020 Joey Hess <id@joeyh.name>
  -
  - Licensed under the GNU AGPL version 3 or higher.
  -}
@@ -9,22 +9,27 @@
 module Assistant.TransferSlots where

+import Control.Concurrent.STM
+
 import Assistant.Common
 import Utility.ThreadScheduler
+import Utility.NotificationBroadcaster
 import Assistant.Types.TransferSlots
 import Assistant.DaemonStatus
-import Assistant.TransferrerPool
-import Assistant.Types.TransferrerPool
+import Annex.TransferrerPool
+import Types.TransferrerPool
 import Assistant.Types.TransferQueue
 import Assistant.TransferQueue
 import Assistant.Alert
 import Assistant.Alert.Utility
 import Assistant.Commits
 import Assistant.Drop
+import Annex.Transfer (stallDetection)
 import Types.Transfer
 import Logs.Transfer
 import Logs.Location
 import qualified Git
+import qualified Annex
 import qualified Remote
 import qualified Types.Remote as Remote
 import Annex.Content
@@ -33,6 +38,7 @@ import Annex.Path
 import Utility.Batch
 import Types.NumCopies

+import Data.Either
 import qualified Data.Map as M
 import qualified Control.Exception as E
 import Control.Concurrent
@@ -75,16 +81,19 @@ runTransferThread :: FilePath -> BatchCommandMaker -> Maybe (Transfer, TransferI
 runTransferThread _ _ Nothing = flip MSemN.signal 1 <<~ transferSlots
 runTransferThread program batchmaker (Just (t, info, a)) = do
 	d <- getAssistant id
+	mkcheck <- checkNetworkConnections
+		<$> getAssistant daemonStatusHandle
 	aio <- asIO1 a
-	tid <- liftIO $ forkIO $ runTransferThread' program batchmaker d aio
+	tid <- liftIO $ forkIO $ runTransferThread' mkcheck program batchmaker d aio
 	updateTransferInfo t $ info { transferTid = Just tid }

-runTransferThread' :: FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO ()
-runTransferThread' program batchmaker d run = go
+runTransferThread' :: MkCheckTransferrer -> FilePath -> BatchCommandMaker -> AssistantData -> (Transferrer -> IO ()) -> IO ()
+runTransferThread' mkcheck program batchmaker d run = go
   where
-	go = catchPauseResume $
-		withTransferrer program batchmaker (transferrerPool d)
-			run
+	go = catchPauseResume $ do
+		p <- runAssistant d $ liftAnnex $
+			Annex.getState Annex.transferrerpool
+		withTransferrer' True mkcheck program batchmaker p run
 	pause = catchPauseResume $
 		runEvery (Seconds 86400) noop
 {- Note: This must use E.try, rather than E.catch.
@@ -116,7 +125,8 @@ genTransfer t info = case transferRemote info of
 			( do
 				debug [ "Transferring:" , describeTransfer t info ]
 				notifyTransfer
-				return $ Just (t, info, go remote)
+				sd <- liftAnnex $ stallDetection remote
+				return $ Just (t, info, go remote sd)
 			, do
 				debug [ "Skipping unnecessary transfer:",
 					describeTransfer t info ]
@@ -155,7 +165,7 @@
 	 - usual cleanup. However, first check if something else is
 	 - running the transfer, to avoid removing active transfers.
 	 -}
-	go remote transferrer = ifM (liftIO $ performTransfer transferrer t info)
+	go remote sd transferrer = ifM (isRight <$> performTransfer sd AssistantLevel liftAnnex (transferRemote info) t info transferrer)
 		( do
 			case associatedFile info of
 				AssociatedFile Nothing -> noop
@@ -298,3 +308,9 @@ startTransfer t = do
 getCurrentTransfers :: Assistant TransferMap
 getCurrentTransfers = currentTransfers <$> getDaemonStatus
+
+checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
+checkNetworkConnections dstatushandle = do
+	dstatus <- atomically $ readTVar dstatushandle
+	h <- newNotificationHandle False (networkConnectedNotifier dstatus)
+	return $ not <$> checkNotification h
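
checkNetworkConnections builds the MkCheckTransferrer used by the assistant: each pooled transferrer gets a check that turns False once a network-connection notification has fired, so a stale process is shut down rather than reused. A minimal sketch of that make-a-check-then-poll-it shape, using a plain counter instead of git-annex's NotificationBroadcaster:

import Data.IORef

-- Build a check for one pooled worker: it stays True until an event
-- (here a bumped counter, standing in for a network-change
-- notification) has been observed since the check was created.
mkCheck :: IORef Int -> IO (IO Bool)
mkCheck events = do
	seen <- readIORef events
	return $ do
		now <- readIORef events
		return (now == seen)

main :: IO ()
main = do
	events <- newIORef (0 :: Int)
	check <- mkCheck events
	print =<< check           -- True: still fine to reuse
	modifyIORef' events (+ 1) -- simulate a network connection change
	print =<< check           -- False: drop the pooled transferrer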

@@ -1,94 +0,0 @@
{- A pool of "git-annex transferkeys" processes
-
- Copyright 2013 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
module Assistant.TransferrerPool where
import Assistant.Common
import Assistant.Types.TransferrerPool
import Types.Transfer
import Utility.Batch
import qualified Command.TransferKeys as T
import Control.Concurrent.STM hiding (check)
import Control.Exception (throw)
import Control.Concurrent
{- Runs an action with a Transferrer from the pool.
-
- Only one Transferrer is left running in the pool at a time.
- So if this needed to start a new Transferrer, it's stopped when done.
-}
withTransferrer :: FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a
withTransferrer program batchmaker pool a = do
(mi, leftinpool) <- atomically (popTransferrerPool pool)
i@(TransferrerPoolItem (Just t) check) <- case mi of
Nothing -> mkTransferrerPoolItem pool =<< mkTransferrer program batchmaker
Just i -> checkTransferrerPoolItem program batchmaker i
v <- tryNonAsync $ a t
if leftinpool == 0
then atomically $ pushTransferrerPool pool i
else do
void $ forkIO $ stopTransferrer t
atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check
either throw return v
{- Check if a Transferrer from the pool is still ok to be used.
- If not, stop it and start a new one. -}
checkTransferrerPoolItem :: FilePath -> BatchCommandMaker -> TransferrerPoolItem -> IO TransferrerPoolItem
checkTransferrerPoolItem program batchmaker i = case i of
TransferrerPoolItem (Just t) check -> ifM check
( return i
, do
stopTransferrer t
new check
)
TransferrerPoolItem Nothing check -> new check
where
new check = do
t <- mkTransferrer program batchmaker
return $ TransferrerPoolItem (Just t) check
{- Requests that a Transferrer perform a Transfer, and waits for it to
- finish. -}
performTransfer :: Transferrer -> Transfer -> TransferInfo -> IO Bool
performTransfer transferrer t info = catchBoolIO $ do
T.sendRequest t info (transferrerWrite transferrer)
T.readResponse (transferrerRead transferrer)
{- Starts a new git-annex transferkeys process, setting up handles
- that will be used to communicate with it. -}
mkTransferrer :: FilePath -> BatchCommandMaker -> IO Transferrer
mkTransferrer program batchmaker = do
{- It runs as a batch job. -}
let (program', params') = batchmaker (program, [Param "transferkeys"])
{- It's put into its own group so that the whole group can be
- killed to stop a transfer. -}
(Just writeh, Just readh, _, pid) <- createProcess
(proc program' $ toCommand params')
{ create_group = True
, std_in = CreatePipe
, std_out = CreatePipe
}
return $ Transferrer
{ transferrerRead = readh
, transferrerWrite = writeh
, transferrerHandle = pid
}
{- Checks if a Transferrer is still running. If not, makes a new one. -}
checkTransferrer :: FilePath -> BatchCommandMaker -> Transferrer -> IO Transferrer
checkTransferrer program batchmaker t =
maybe (return t) (const $ mkTransferrer program batchmaker)
=<< getProcessExitCode (transferrerHandle t)
{- Closing the fds will stop the transferrer. -}
stopTransferrer :: Transferrer -> IO ()
stopTransferrer t = do
hClose $ transferrerRead t
hClose $ transferrerWrite t
void $ waitForProcess $ transferrerHandle t

@@ -1,5 +1,8 @@
 git-annex (8.20201128) UNRELEASED; urgency=medium

+  * New config annex.stalldetection, remote.name.annex-stalldetection,
+    which can be used to deal with remotes that stall during transfers,
+    or are sometimes too slow to want to use.
   * Fix hang on shutdown of external special remote using ASYNC protocol
     extension. (Reversion introduced in version 8.20201007.)
   * Fix bug that made the next download after an empty file from a ssh

@@ -332,7 +332,7 @@ downloadWeb addunlockedmatcher o url urlinfo file =
 	let cleanuptmp = pruneTmpWorkDirBefore tmp (liftIO . removeWhenExistsWith R.removeLink)
 	showNote "using youtube-dl"
 	Transfer.notifyTransfer Transfer.Download url $
-		Transfer.download webUUID mediakey (AssociatedFile Nothing) Transfer.noRetry $ \p ->
+		Transfer.download' webUUID mediakey (AssociatedFile Nothing) Transfer.noRetry $ \p ->
 			youtubeDl url (fromRawFilePath workdir) p >>= \case
 				Right (Just mediafile) -> do
 					cleanuptmp
@@ -396,7 +396,7 @@ downloadWith' downloader dummykey u url afile =
 	checkDiskSpaceToGet dummykey Nothing $ do
 		tmp <- fromRepo $ gitAnnexTmpObjectLocation dummykey
 		ok <- Transfer.notifyTransfer Transfer.Download url $
-			Transfer.download u dummykey afile Transfer.stdRetry $ \p -> do
+			Transfer.download' u dummykey afile Transfer.stdRetry $ \p -> do
 				createAnnexDirectory (parentDir tmp)
 				downloader (fromRawFilePath tmp) p
 		if ok

@@ -283,7 +283,7 @@ performExport r db ek af contentsha loc allfilledvar = do
 	sent <- tryNonAsync $ case ek of
 		AnnexKey k -> ifM (inAnnex k)
 			( notifyTransfer Upload af $
-				upload (uuid r) k af stdRetry $ \pm -> do
+				upload' (uuid r) k af stdRetry $ \pm -> do
 					let rollback = void $
 						performUnexport r db [ek] loc
 					sendAnnex k rollback $ \f ->

@@ -9,7 +9,6 @@ module Command.Get where

 import Command
 import qualified Remote
-import Annex.Content
 import Annex.Transfer
 import Annex.NumCopies
 import Annex.Wanted
@@ -114,10 +113,6 @@ getKey' key afile = dispatch
 		| Remote.hasKeyCheap r =
 			either (const False) id <$> Remote.hasKey r key
 		| otherwise = return True
-	docopy r witness = getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key afile $ \dest ->
-		download (Remote.uuid r) key afile stdRetry
-			(\p -> do
-				showAction $ "from " ++ Remote.name r
-				Remote.verifiedAction $
-					Remote.retrieveKeyFile r key afile (fromRawFilePath dest) p
-			) witness
+	docopy r witness = do
+		showAction $ "from " ++ Remote.name r
+		download r key afile stdRetry witness

@@ -142,8 +142,7 @@ toPerform dest removewhen key afile fastcheck isthere = do
 		Right False -> logMove srcuuid destuuid False key $ \deststartedwithcopy -> do
 			showAction $ "to " ++ Remote.name dest
 			ok <- notifyTransfer Upload afile $
-				upload (Remote.uuid dest) key afile stdRetry $
-					Remote.action . Remote.storeKey dest key afile
+				upload dest key afile stdRetry
 			if ok
 				then finish deststartedwithcopy $
 					Remote.logStatus dest key InfoPresent
@@ -223,10 +222,8 @@ fromPerform src removewhen key afile = do
 		then dispatch removewhen deststartedwithcopy True
 		else dispatch removewhen deststartedwithcopy =<< get
   where
 	get = notifyTransfer Download afile $
-		download (Remote.uuid src) key afile stdRetry $ \p ->
-			getViaTmp (Remote.retrievalSecurityPolicy src) (RemoteVerify src) key afile $ \t ->
-				Remote.verifiedAction $ Remote.retrieveKeyFile src key afile (fromRawFilePath t) p
+		download src key afile stdRetry
 	dispatch _ _ False = stop -- failed
 	dispatch RemoveNever _ True = next $ return True -- copy complete

@@ -51,7 +51,7 @@ start o (_, key) = startingCustomOutput key $ case fromToOptions o of
 toPerform :: Key -> AssociatedFile -> Remote -> CommandPerform
 toPerform key file remote = go Upload file $
-	upload (uuid remote) key file stdRetry $ \p -> do
+	upload' (uuid remote) key file stdRetry $ \p -> do
 		tryNonAsync (Remote.storeKey remote key file p) >>= \case
 			Right () -> do
 				Remote.logStatus remote key InfoPresent
@@ -62,7 +62,7 @@ toPerform key file remote = go Upload file $
 fromPerform :: Key -> AssociatedFile -> Remote -> CommandPerform
 fromPerform key file remote = go Upload file $
-	download (uuid remote) key file stdRetry $ \p ->
+	download' (uuid remote) key file stdRetry $ \p ->
 		getViaTmp (retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t ->
 			tryNonAsync (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p) >>= \case
 				Right v -> return (True, v)

@@ -1,25 +1,25 @@
-{- git-annex command, used internally by assistant
+{- git-annex command
  -
- - Copyright 2012, 2013 Joey Hess <id@joeyh.name>
+ - Copyright 2012-2020 Joey Hess <id@joeyh.name>
  -
  - Licensed under the GNU AGPL version 3 or higher.
  -}

-{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
-
 module Command.TransferKeys where

 import Command
+import qualified Annex
 import Annex.Content
 import Logs.Location
 import Annex.Transfer
 import qualified Remote
 import Utility.SimpleProtocol (dupIoHandles)
-import Git.Types (RemoteName)
 import qualified Database.Keys
 import Annex.BranchState
+import Types.Messages
+import Annex.TransferrerPool
+import Text.Read (readMaybe)

-data TransferRequest = TransferRequest Direction Remote Key AssociatedFile
-
 cmd :: Command
 cmd = command "transferkeys" SectionPlumbing "transfers keys"
@@ -32,12 +32,33 @@ start :: CommandStart
 start = do
 	enableInteractiveBranchAccess
 	(readh, writeh) <- liftIO dupIoHandles
+	Annex.setOutput $ SerializedOutput
+		(\v -> hPutStrLn writeh (show (TransferOutput v)) >> hFlush writeh)
+		(readMaybe <$> hGetLine readh)
 	runRequests readh writeh runner
 	stop
   where
-	runner (TransferRequest direction remote key file)
+	runner (TransferRequest AnnexLevel direction _ keydata file) remote
+		| direction == Upload =
+			-- This is called by eg, Annex.Transfer.upload,
+			-- so caller is responsible for doing notification,
+			-- and for retrying.
+			upload' (Remote.uuid remote) key file noRetry
+				(Remote.action . Remote.storeKey remote key file)
+				noNotification
+		| otherwise =
+			-- This is called by eg, Annex.Transfer.download
+			-- so caller is responsible for doing notification
+			-- and for retrying.
+			let go p = getViaTmp (Remote.retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t -> do
+				Remote.verifiedAction (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p)
+			in download' (Remote.uuid remote) key file noRetry go
+				noNotification
+	  where
+		key = mkKey (const keydata)
+	runner (TransferRequest AssistantLevel direction _ keydata file) remote
 		| direction == Upload = notifyTransfer direction file $
-			upload (Remote.uuid remote) key file stdRetry $ \p -> do
+			upload' (Remote.uuid remote) key file stdRetry $ \p -> do
 				tryNonAsync (Remote.storeKey remote key file p) >>= \case
 					Left e -> do
 						warning (show e)
@@ -46,7 +67,7 @@ start = do
 					Remote.logStatus remote key InfoPresent
 					return True
 		| otherwise = notifyTransfer direction file $
-			download (Remote.uuid remote) key file stdRetry $ \p ->
+			download' (Remote.uuid remote) key file stdRetry $ \p ->
 				getViaTmp (Remote.retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t -> do
 					r <- tryNonAsync (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p) >>= \case
 						Left e -> do
@@ -58,82 +79,34 @@ start = do
 					-- not old cached data.
 					Database.Keys.closeDb
 					return r
+	  where
+		key = mkKey (const keydata)

 runRequests
 	:: Handle
 	-> Handle
-	-> (TransferRequest -> Annex Bool)
+	-> (TransferRequest -> Remote -> Annex Bool)
 	-> Annex ()
-runRequests readh writeh a = do
-	liftIO $ hSetBuffering readh NoBuffering
-	go =<< readrequests
+runRequests readh writeh a = go Nothing Nothing
   where
-	go (d:rn:k:f:rest) = do
-		case (deserialize d, deserialize rn, deserialize k, deserialize f) of
-			(Just direction, Just remotename, Just key, Just file) -> do
-				mremote <- Remote.byName' remotename
-				case mremote of
-					Left _ -> sendresult False
-					Right remote -> sendresult =<< a
-						(TransferRequest direction remote key file)
-			_ -> sendresult False
-		go rest
-	go [] = noop
-	go [""] = noop
-	go v = error $ "transferkeys protocol error: " ++ show v
-	readrequests = liftIO $ split fieldSep <$> hGetContents readh
+	go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do
+		l <- liftIO $ hGetLine readh
+		case readMaybe l of
+			Just tr@(TransferRequest _ _ remoteoruuid _ _) -> do
+				-- Often the same remote will be used
+				-- repeatedly, so cache the last one to
+				-- avoid looking up repeatedly.
+				mremote <- if lastremoteoruuid == Just remoteoruuid
+					then pure lastremote
+					else eitherToMaybe <$> Remote.byName'
+						(either fromUUID id remoteoruuid)
+				case mremote of
+					Just remote -> do
+						sendresult =<< a tr remote
+						go (Just remoteoruuid) mremote
+					Nothing -> transferKeysProtocolError l
+			Nothing -> transferKeysProtocolError l
 	sendresult b = liftIO $ do
-		hPutStrLn writeh $ serialize b
+		hPutStrLn writeh $ show $ TransferResult b
 		hFlush writeh
-
-sendRequest :: Transfer -> TransferInfo -> Handle -> IO ()
-sendRequest t tinfo h = do
-	hPutStr h $ intercalate fieldSep
-		[ serialize (transferDirection t)
-		, maybe (serialize ((fromUUID (transferUUID t)) :: String))
-			(serialize . Remote.name)
-			(transferRemote tinfo)
-		, serialize (transferKey t)
-		, serialize (associatedFile tinfo)
-		, "" -- adds a trailing null
-		]
-	hFlush h
-
-readResponse :: Handle -> IO Bool
-readResponse h = fromMaybe False . deserialize <$> hGetLine h
-
-fieldSep :: String
-fieldSep = "\0"
-
-class TCSerialized a where
-	serialize :: a -> String
-	deserialize :: String -> Maybe a
-
-instance TCSerialized Bool where
-	serialize True = "1"
-	serialize False = "0"
-	deserialize "1" = Just True
-	deserialize "0" = Just False
-	deserialize _ = Nothing
-
-instance TCSerialized Direction where
-	serialize Upload = "u"
-	serialize Download = "d"
-	deserialize "u" = Just Upload
-	deserialize "d" = Just Download
-	deserialize _ = Nothing
-
-instance TCSerialized AssociatedFile where
-	serialize (AssociatedFile (Just f)) = fromRawFilePath f
-	serialize (AssociatedFile Nothing) = ""
-	deserialize "" = Just (AssociatedFile Nothing)
-	deserialize f = Just (AssociatedFile (Just (toRawFilePath f)))
-
-instance TCSerialized RemoteName where
-	serialize n = n
-	deserialize n = Just n
-
-instance TCSerialized Key where
-	serialize = serializeKey
-	deserialize = deserializeKey
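
The removed code above was the old hand-rolled, NUL-separated transferkeys protocol (the TCSerialized class); the replacement simply Shows a TransferRequest or TransferResponse per line and parses it back with readMaybe, which is what Annex.TransferrerPool's sendRequest/readResponse and runRequests here rely on. A minimal self-contained sketch of such a line-oriented Show/Read protocol, with simplified stand-in types rather than the real ones:

import System.IO
import Text.Read (readMaybe)

-- Simplified stand-ins for TransferRequest / TransferResponse.
data Request = Request { reqDirection :: String, reqKey :: String }
	deriving (Show, Read)

data Response = Output String | Result Bool
	deriving (Show, Read)

-- One value per line, serialized with show...
sendValue :: Show a => Handle -> a -> IO ()
sendValue h v = hPutStrLn h (show v) >> hFlush h

-- ...and parsed back with readMaybe; anything unparseable is a
-- protocol error, as in transferKeysProtocolError.
recvValue :: Read a => Handle -> IO a
recvValue h = hGetLine h >>= \l -> case readMaybe l of
	Just v -> return v
	Nothing -> error ("protocol error: " ++ show l)

main :: IO ()
main = do
	-- Round-trip through the textual form, as the pipe between the
	-- parent and the transferkeys child does line by line.
	let req = Request "Download" "example-key"
	print (readMaybe (show req) :: Maybe Request)
	print (readMaybe (show (Result True)) :: Maybe Response)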

@@ -285,9 +285,10 @@ debugEnabled = do
 commandProgressDisabled :: Annex Bool
 commandProgressDisabled = withMessageState $ \s -> return $
 	case outputType s of
+		NormalOutput -> concurrentOutputEnabled s
 		QuietOutput -> True
 		JSONOutput _ -> True
-		NormalOutput -> concurrentOutputEnabled s
+		SerializedOutput _ _ -> True

 jsonOutputEnabled :: Annex Bool
 jsonOutputEnabled = withMessageState $ \s -> return $
@@ -313,8 +314,20 @@ mkPrompter = getConcurrency >>= \case
   where
 	goconcurrent = withMessageState $ \s -> do
 		let l = promptLock s
+		let (run, cleanup) = case outputType s of
+			SerializedOutput h hr ->
+				( \a -> do
+					liftIO $ outputSerialized h StartPrompt
+					liftIO $ waitOutputSerializedResponse hr ReadyPrompt
+					a
+				, liftIO $ outputSerialized h EndPrompt
+				)
+			_ ->
+				( hideRegionsWhile s
+				, noop
+				)
 		return $ \a ->
 			debugLocks $ bracketIO
 				(takeMVar l)
-				(putMVar l)
-				(const $ hideRegionsWhile s a)
+				(\v -> putMVar l v >> cleanup)
+				(const $ run a)
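
With serialized output, an interactive prompt in the child has to be coordinated with the parent: mkPrompter now sends StartPrompt, blocks until the parent answers ReadyPrompt (the parent having quieted its own console output), runs the prompt, and the bracket's cleanup sends EndPrompt. The handshake in isolation, as a small sketch with stand-in message types:

-- Stand-ins for the control messages exchanged around a prompt.
data Ctl = StartPrompt | EndPrompt
	deriving (Show)

data Resp = ReadyPrompt
	deriving (Show, Eq)

-- Announce the prompt, wait for the parent to be ready, run the
-- interactive action, then signal that the prompt is over. (In the
-- real mkPrompter the EndPrompt is sent from bracketIO's cleanup, so
-- it also happens if the action throws.)
promptWith :: (Ctl -> IO ()) -> IO Resp -> IO a -> IO a
promptWith send recv a = do
	send StartPrompt
	ReadyPrompt <- recv
	r <- a
	send EndPrompt
	return r

main :: IO ()
main = do
	r <- promptWith (\c -> putStrLn ("> " ++ show c))
		(return ReadyPrompt)
		(putStrLn "prompting..." >> return True)
	print r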

@@ -98,10 +98,14 @@ inOwnConsoleRegion s a
 		Regions.closeConsoleRegion r

 {- The progress region is displayed inline with the current console region. -}
-withProgressRegion :: (Regions.ConsoleRegion -> Annex a) -> Annex a
-withProgressRegion a = do
-	parent <- consoleRegion <$> Annex.getState Annex.output
+withProgressRegion
+	:: (MonadIO m, MonadMask m)
+	=> MessageState
+	-> (Regions.ConsoleRegion -> m a) -> m a
+withProgressRegion st a =
 	Regions.withConsoleRegion (maybe Regions.Linear Regions.InLine parent) a
+  where
+	parent = consoleRegion st

 instance Regions.LiftRegion Annex where
 	liftRegion = liftIO . atomically

@@ -1,6 +1,6 @@
 {- git-annex output messages, including concurrent output to display regions
  -
- - Copyright 2010-2018 Joey Hess <id@joeyh.name>
+ - Copyright 2010-2020 Joey Hess <id@joeyh.name>
  -
  - Licensed under the GNU AGPL version 3 or higher.
  -}
@@ -29,25 +29,32 @@ outputMessage' jsonoutputter jsonbuilder msg = withMessageState $ \s -> case out
 		| otherwise -> liftIO $ flushed $ S.putStr msg
 	JSONOutput _ -> void $ jsonoutputter jsonbuilder s
 	QuietOutput -> q
+	SerializedOutput h _ -> do
+		liftIO $ outputSerialized h $ OutputMessage msg
+		void $ jsonoutputter jsonbuilder s

 -- Buffer changes to JSON until end is reached and then emit it.
 bufferJSON :: JSONBuilder -> MessageState -> Annex Bool
 bufferJSON jsonbuilder s = case outputType s of
-	JSONOutput jsonoptions
-		| endjson -> do
+	JSONOutput _ -> go (flushed . JSON.emit)
+	SerializedOutput h _ -> go (outputSerialized h . JSONObject . JSON.encode)
+	_ -> return False
+  where
+	go emitter
+		| endjson = do
 			Annex.changeState $ \st ->
 				st { Annex.output = s { jsonBuffer = Nothing } }
-			maybe noop (liftIO . flushed . JSON.emit . JSON.finalize jsonoptions) json
+			maybe noop (liftIO . emitter . JSON.finalize) json
 			return True
-		| otherwise -> do
+		| otherwise = do
 			Annex.changeState $ \st ->
 				st { Annex.output = s { jsonBuffer = json } }
 			return True
-	_ -> return False
-  where
 	(json, endjson) = case jsonbuilder i of
 		Nothing -> (jsonBuffer s, False)
 		(Just (j, e)) -> (Just j, e)
 	i = case jsonBuffer s of
 		Nothing -> Nothing
 		Just b -> Just (b, False)
@@ -55,11 +62,14 @@ bufferJSON jsonbuilder s = case outputType s of
 -- Immediately output JSON.
 outputJSON :: JSONBuilder -> MessageState -> Annex Bool
 outputJSON jsonbuilder s = case outputType s of
-	JSONOutput _ -> do
-		maybe noop (liftIO . flushed . JSON.emit)
+	JSONOutput _ -> go (flushed . JSON.emit)
+	SerializedOutput h _ -> go (outputSerialized h . JSONObject . JSON.encode)
+	_ -> return False
+  where
+	go emitter = do
+		maybe noop (liftIO . emitter)
 			(fst <$> jsonbuilder Nothing)
 		return True
-	_ -> return False

 outputError :: String -> Annex ()
 outputError msg = withMessageState $ \s -> case (outputType s, jsonBuffer s) of
@@ -67,6 +77,8 @@ outputError msg = withMessageState $ \s -> case (outputType s, jsonBuffer s) of
 		let jb' = Just (JSON.addErrorMessage (lines msg) jb)
 		in Annex.changeState $ \st ->
 			st { Annex.output = s { jsonBuffer = jb' } }
+	(SerializedOutput h _, _) ->
+		liftIO $ outputSerialized h $ OutputError msg
 	_
 		| concurrentOutputEnabled s -> concurrentMessage s True msg go
 		| otherwise -> go
@@ -81,3 +93,12 @@ q = noop

 flushed :: IO () -> IO ()
 flushed a = a >> hFlush stdout
+
+outputSerialized :: (SerializedOutput -> IO ()) -> SerializedOutput -> IO ()
+outputSerialized = id
+
+-- | Wait for the specified response.
+waitOutputSerializedResponse :: (IO (Maybe SerializedOutputResponse)) -> SerializedOutputResponse -> IO ()
+waitOutputSerializedResponse getr r = tryIO getr >>= \case
+	Right (Just r') | r' == r -> return ()
+	v -> error $ "serialized output protocol error; expected " ++ show r ++ " got " ++ show v

@@ -11,6 +11,8 @@ module Messages.JSON (
 	JSONBuilder,
 	JSONChunk(..),
 	emit,
+	emit',
+	encode,
 	none,
 	start,
 	end,
@@ -38,7 +40,6 @@ import Data.Maybe
 import Data.Monoid
 import Prelude

-import Types.Messages
 import Types.Command (SeekInput(..))
 import Key
 import Utility.Metered
@@ -52,9 +53,12 @@ emitLock :: MVar ()
 emitLock = unsafePerformIO $ newMVar ()

 emit :: Object -> IO ()
-emit o = do
+emit = emit' . encode
+
+emit' :: L.ByteString -> IO ()
+emit' b = do
 	takeMVar emitLock
-	L.hPut stdout (encode o)
+	L.hPut stdout b
 	putStr "\n"
 	putMVar emitLock ()
@@ -82,12 +86,10 @@ end :: Bool -> JSONBuilder
 end b (Just (o, _)) = Just (HM.insert "success" (toJSON' b) o, True)
 end _ Nothing = Nothing

-finalize :: JSONOptions -> Object -> Object
-finalize jsonoptions o
-	-- Always include error-messages field, even if empty,
-	-- to make the json be self-documenting.
-	| jsonErrorMessages jsonoptions = addErrorMessage [] o
-	| otherwise = o
+-- Always include error-messages field, even if empty,
+-- to make the json be self-documenting.
+finalize :: Object -> Object
+finalize o = addErrorMessage [] o

 addErrorMessage :: [String] -> Object -> Object
 addErrorMessage msg o =
@@ -1,6 +1,6 @@
 {- git-annex progress output
  -
- - Copyright 2010-2019 Joey Hess <id@joeyh.name>
+ - Copyright 2010-2020 Joey Hess <id@joeyh.name>
  -
  - Licensed under the GNU AGPL version 3 or higher.
  -}
@@ -20,9 +20,11 @@ import Types.KeySource
 import Utility.InodeCache
 import qualified Messages.JSON as JSON
 import Messages.Concurrent
+import Messages.Internal
 import qualified System.Console.Regions as Regions
 import qualified System.Console.Concurrent as Console
+import Control.Monad.IO.Class (MonadIO)

 {- Class of things from which a size can be gotten to display a progress
  - meter. -}
@@ -63,38 +65,63 @@ instance MeterSize KeySizer where
 {- Shows a progress meter while performing an action.
  - The action is passed the meter and a callback to use to update the meter.
  -}
-metered :: MeterSize sizer => Maybe MeterUpdate -> sizer -> (Meter -> MeterUpdate -> Annex a) -> Annex a
-metered othermeter sizer a = withMessageState $ \st ->
-	flip go st =<< getMeterSize sizer
+metered
+	:: MeterSize sizer
+	=> Maybe MeterUpdate
+	-> sizer
+	-> (Meter -> MeterUpdate -> Annex a)
+	-> Annex a
+metered othermeter sizer a = withMessageState $ \st -> do
+	sz <- getMeterSize sizer
+	metered' st othermeter sz showOutput a
+
+metered'
+	:: (Monad m, MonadIO m, MonadMask m)
+	=> MessageState
+	-> Maybe MeterUpdate
+	-> Maybe FileSize
+	-> m ()
+	-- ^ this should run showOutput
+	-> (Meter -> MeterUpdate -> m a)
+	-> m a
+metered' st othermeter msize showoutput a = go st
   where
-	go _ (MessageState { outputType = QuietOutput }) = nometer
-	go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
-		showOutput
+	go (MessageState { outputType = QuietOutput }) = nometer
+	go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do
+		showoutput
 		meter <- liftIO $ mkMeter msize $
 			displayMeterHandle stdout bandwidthMeter
-		m <- liftIO $ rateLimitMeterUpdate 0.2 meter $
+		m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
 			updateMeter meter
 		r <- a meter (combinemeter m)
 		liftIO $ clearMeterHandle meter stdout
 		return r
-	go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) =
-		withProgressRegion $ \r -> do
+	go (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) =
+		withProgressRegion st $ \r -> do
 			meter <- liftIO $ mkMeter msize $ \_ msize' old new ->
 				let s = bandwidthMeter msize' old new
 				in Regions.setConsoleRegion r ('\n' : s)
-			m <- liftIO $ rateLimitMeterUpdate 0.2 meter $
+			m <- liftIO $ rateLimitMeterUpdate consoleratelimit meter $
 				updateMeter meter
 			a meter (combinemeter m)
-	go msize (MessageState { outputType = JSONOutput jsonoptions })
+	go (MessageState { outputType = JSONOutput jsonoptions })
 		| jsonProgress jsonoptions = do
-			buf <- withMessageState $ return . jsonBuffer
-			meter <- liftIO $ mkMeter msize $ \_ msize' _old (new, _now) ->
-				JSON.progress buf msize' new
-			m <- liftIO $ rateLimitMeterUpdate 0.1 meter $
+			let buf = jsonBuffer st
+			meter <- liftIO $ mkMeter msize $ \_ msize' _old new ->
+				JSON.progress buf msize' (meterBytesProcessed new)
+			m <- liftIO $ rateLimitMeterUpdate jsonratelimit meter $
 				updateMeter meter
 			a meter (combinemeter m)
 		| otherwise = nometer
+	go (MessageState { outputType = SerializedOutput h _ }) = do
+		liftIO $ outputSerialized h $ StartProgressMeter msize
+		meter <- liftIO $ mkMeter msize $ \_ _ _old new ->
+			outputSerialized h $ UpdateProgressMeter $
+				meterBytesProcessed new
+		m <- liftIO $ rateLimitMeterUpdate minratelimit meter $
+			updateMeter meter
+		a meter (combinemeter m)
+			`finally` (liftIO $ outputSerialized h EndProgressMeter)
 	nometer = do
 		dummymeter <- liftIO $ mkMeter Nothing $
 			\_ _ _ _ -> return ()
@@ -104,6 +131,12 @@ metered othermeter sizer a = withMessageState $ \st ->
 		Nothing -> m
 		Just om -> combineMeterUpdate m om

+	consoleratelimit = 0.2
+
+	jsonratelimit = 0.1
+
+	minratelimit = min consoleratelimit jsonratelimit
+
 {- Poll file size to display meter. -}
 meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a
 meteredFile file combinemeterupdate key a =

Messages/Serialized.hs Normal file
@@ -0,0 +1,102 @@
{- serialized output
-
- Copyright 2020 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE RankNTypes #-}
module Messages.Serialized (
relaySerializedOutput,
outputSerialized,
waitOutputSerializedResponse,
) where
import Common
import Annex
import Types.Messages
import Messages
import Messages.Internal
import Messages.Progress
import qualified Messages.JSON as JSON
import Utility.Metered (BytesProcessed)
import Control.Monad.IO.Class (MonadIO)
-- | Relay serialized output from a child process to the console.
relaySerializedOutput
:: (Monad m, MonadIO m, MonadMask m)
=> m (Either SerializedOutput r)
-- ^ Get next serialized output, or final value to return.
-> (SerializedOutputResponse -> m ())
-- ^ Send response to child process.
-> (Maybe BytesProcessed -> m ())
-- ^ When a progress meter is running, is updated with
-- progress meter values sent by the process.
-- When a progress meter is stopped, Nothing is sent.
-> (forall a. Annex a -> m a)
-- ^ Run an annex action in the monad. Will not be used with
-- actions that block for a long time.
-> m r
relaySerializedOutput getso sendsor meterreport runannex = go Nothing
where
go st = loop st >>= \case
Right r -> return r
Left st' -> go st'
loop st = getso >>= \case
Right r -> return (Right r)
Left (OutputMessage msg) -> do
runannex $ outputMessage'
(\_ _ -> return False)
id
msg
loop st
Left (OutputError msg) -> do
runannex $ outputError msg
loop st
Left (JSONObject b) -> do
runannex $ withMessageState $ \s -> case outputType s of
JSONOutput _ -> liftIO $ flushed $ JSON.emit' b
SerializedOutput h _ -> liftIO $
outputSerialized h $ JSONObject b
_ -> q
loop st
Left (StartProgressMeter sz) -> do
ost <- runannex (Annex.getState Annex.output)
-- Display a progress meter while running, until
-- the meter ends or a final value is returned.
metered' ost Nothing sz (runannex showOutput)
(\_meter meterupdate -> loop (Just meterupdate))
>>= \case
Right r -> return (Right r)
-- Continue processing serialized
-- output after the progress meter
-- is done.
Left _st' -> loop Nothing
Left EndProgressMeter -> do
meterreport Nothing
return (Left st)
Left (UpdateProgressMeter n) -> do
case st of
Just meterupdate -> do
meterreport (Just n)
liftIO $ meterupdate n
Nothing -> noop
loop st
Left StartPrompt -> do
prompter <- runannex mkPrompter
v <- prompter $ do
sendsor ReadyPrompt
-- Continue processing serialized output
-- until EndPrompt or a final value is
-- returned. (EndPrompt is all that
-- ought to be sent while in a prompt
-- really, but if something else did get
-- sent, display it just in case.)
loop st
case v of
Right r -> return (Right r)
Left st' -> loop st'
Left EndPrompt -> return (Left st)
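
As a rough illustration of how a parent process could drive relaySerializedOutput, here is a minimal sketch, not the actual transferkeys wiring: it assumes the child writes one Show-rendered SerializedOutput value per line on its stdout and reads Show-rendered SerializedOutputResponse values on its stdin. The real stdio protocol is intentionally undocumented and may encode things differently.

{- Hypothetical driver for relaySerializedOutput; the handle plumbing and
 - the one-value-per-line Show/Read encoding are assumptions, not the
 - actual transferkeys protocol. -}
module RelaySketch where

import Annex (Annex)
import Messages.Serialized (relaySerializedOutput)
import Types.Messages (SerializedOutput, SerializedOutputResponse)
import Control.Monad.IO.Class (liftIO)
import System.IO
import Text.Read (readMaybe)

relayFromHandles :: Handle -> Handle -> Annex ()
relayFromHandles fromchild tochild =
	relaySerializedOutput getso sendsor (\_ -> return ()) id
  where
	-- Read the next serialized value from the child; treat EOF or an
	-- unparseable line as the end of the conversation.
	getso :: Annex (Either SerializedOutput ())
	getso = liftIO $ hIsEOF fromchild >>= \eof ->
		if eof
			then return (Right ())
			else maybe (Right ()) Left . readMaybe
				<$> hGetLine fromchild
	-- Send a response (eg ReadyPrompt) back to the child.
	sendsor :: SerializedOutputResponse -> Annex ()
	sendsor r = liftIO $ do
		hPutStrLn tochild (show r)
		hFlush tochild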

View file

@ -75,7 +75,7 @@ runLocal runst runner a = case a of
let rsp = RetrievalAllKeysSecure let rsp = RetrievalAllKeysSecure
v <- tryNonAsync $ do v <- tryNonAsync $ do
let runtransfer ti = let runtransfer ti =
Right <$> transfer download k af (\p -> Right <$> transfer download' k af (\p ->
getViaTmp rsp DefaultVerify k af $ \tmp -> getViaTmp rsp DefaultVerify k af $ \tmp ->
storefile (fromRawFilePath tmp) o l getb validitycheck p ti) storefile (fromRawFilePath tmp) o l getb validitycheck p ti)
let fallback = return $ Left $ let fallback = return $ Left $

View file

@ -70,6 +70,7 @@ import Annex.Common
import Types.Remote import Types.Remote
import qualified Annex import qualified Annex
import Annex.UUID import Annex.UUID
import Annex.Action
import Logs.UUID import Logs.UUID
import Logs.Trust import Logs.Trust
import Logs.Location hiding (logStatus) import Logs.Location hiding (logStatus)
@ -82,21 +83,6 @@ import Config.DynamicConfig
import Git.Types (RemoteName, ConfigKey(..), fromConfigValue) import Git.Types (RemoteName, ConfigKey(..), fromConfigValue)
import Utility.Aeson import Utility.Aeson
{- Runs an action that may throw exceptions, catching and displaying them. -}
action :: Annex () -> Annex Bool
action a = tryNonAsync a >>= \case
Right () -> return True
Left e -> do
warning (show e)
return False
verifiedAction :: Annex Verification -> Annex (Bool, Verification)
verifiedAction a = tryNonAsync a >>= \case
Right v -> return (True, v)
Left e -> do
warning (show e)
return (False, UnVerified)
{- Map from UUIDs of Remotes to a calculated value. -} {- Map from UUIDs of Remotes to a calculated value. -}
remoteMap :: (Remote -> v) -> Annex (M.Map UUID v) remoteMap :: (Remote -> v) -> Annex (M.Map UUID v)
remoteMap mkv = remoteMap' mkv (pure . mkk) remoteMap mkv = remoteMap' mkv (pure . mkk)

View file

@ -41,6 +41,7 @@ import Types.NumCopies
import Types.Difference import Types.Difference
import Types.RefSpec import Types.RefSpec
import Types.RepoVersion import Types.RepoVersion
import Types.StallDetection
import Config.DynamicConfig import Config.DynamicConfig
import Utility.HumanTime import Utility.HumanTime
import Utility.Gpg (GpgCmd, mkGpgCmd) import Utility.Gpg (GpgCmd, mkGpgCmd)
@ -116,6 +117,7 @@ data GitConfig = GitConfig
, annexRetry :: Maybe Integer , annexRetry :: Maybe Integer
, annexForwardRetry :: Maybe Integer , annexForwardRetry :: Maybe Integer
, annexRetryDelay :: Maybe Seconds , annexRetryDelay :: Maybe Seconds
, annexStallDetection :: Maybe StallDetection
, annexAllowedUrlSchemes :: S.Set Scheme , annexAllowedUrlSchemes :: S.Set Scheme
, annexAllowedIPAddresses :: String , annexAllowedIPAddresses :: String
, annexAllowUnverifiedDownloads :: Bool , annexAllowUnverifiedDownloads :: Bool
@ -202,6 +204,9 @@ extractGitConfig configsource r = GitConfig
, annexForwardRetry = getmayberead (annexConfig "forward-retry") , annexForwardRetry = getmayberead (annexConfig "forward-retry")
, annexRetryDelay = Seconds , annexRetryDelay = Seconds
<$> getmayberead (annexConfig "retrydelay") <$> getmayberead (annexConfig "retrydelay")
, annexStallDetection =
either (const Nothing) Just . parseStallDetection
=<< getmaybe (annexConfig "stalldetection")
, annexAllowedUrlSchemes = S.fromList $ map mkScheme $ , annexAllowedUrlSchemes = S.fromList $ map mkScheme $
maybe ["http", "https", "ftp"] words $ maybe ["http", "https", "ftp"] words $
getmaybe (annexConfig "security.allowed-url-schemes") getmaybe (annexConfig "security.allowed-url-schemes")
@ -306,6 +311,7 @@ data RemoteGitConfig = RemoteGitConfig
, remoteAnnexRetry :: Maybe Integer , remoteAnnexRetry :: Maybe Integer
, remoteAnnexForwardRetry :: Maybe Integer , remoteAnnexForwardRetry :: Maybe Integer
, remoteAnnexRetryDelay :: Maybe Seconds , remoteAnnexRetryDelay :: Maybe Seconds
, remoteAnnexStallDetection :: Maybe StallDetection
, remoteAnnexAllowUnverifiedDownloads :: Bool , remoteAnnexAllowUnverifiedDownloads :: Bool
, remoteAnnexConfigUUID :: Maybe UUID , remoteAnnexConfigUUID :: Maybe UUID
@ -369,6 +375,9 @@ extractRemoteGitConfig r remotename = do
, remoteAnnexForwardRetry = getmayberead "forward-retry" , remoteAnnexForwardRetry = getmayberead "forward-retry"
, remoteAnnexRetryDelay = Seconds , remoteAnnexRetryDelay = Seconds
<$> getmayberead "retrydelay" <$> getmayberead "retrydelay"
, remoteAnnexStallDetection =
either (const Nothing) Just . parseStallDetection
=<< getmaybe "stalldetection"
, remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $ , remoteAnnexAllowUnverifiedDownloads = (== Just "ACKTHPPT") $
getmaybe ("security-allow-unverified-downloads") getmaybe ("security-allow-unverified-downloads")
, remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid" , remoteAnnexConfigUUID = toUUID <$> getmaybe "config-uuid"

View file

@ -11,6 +11,7 @@ module Types.Key (
KeyData(..), KeyData(..),
Key, Key,
fromKey, fromKey,
keyData,
mkKey, mkKey,
alterKey, alterKey,
isKeyPrefix, isKeyPrefix,
@ -201,7 +202,7 @@ splitKeyNameExtension' keyname = S8.span (/= '.') keyname
{- A filename may be associated with a Key. -} {- A filename may be associated with a Key. -}
newtype AssociatedFile = AssociatedFile (Maybe RawFilePath) newtype AssociatedFile = AssociatedFile (Maybe RawFilePath)
deriving (Show, Eq, Ord) deriving (Show, Read, Eq, Ord)
{- There are several different varieties of keys. -} {- There are several different varieties of keys. -}
data KeyVariety data KeyVariety

View file

@ -1,6 +1,6 @@
{- git-annex Messages data types {- git-annex Messages data types
- -
- Copyright 2012-2018 Joey Hess <id@joeyh.name> - Copyright 2012-2020 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
@ -8,12 +8,21 @@
module Types.Messages where module Types.Messages where
import qualified Utility.Aeson as Aeson import qualified Utility.Aeson as Aeson
import Utility.Metered
import Utility.FileSize
import Control.Concurrent import Control.Concurrent
import System.Console.Regions (ConsoleRegion) import System.Console.Regions (ConsoleRegion)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
data OutputType = NormalOutput | QuietOutput | JSONOutput JSONOptions data OutputType
deriving (Show) = NormalOutput
| QuietOutput
| JSONOutput JSONOptions
| SerializedOutput
(SerializedOutput -> IO ())
(IO (Maybe SerializedOutputResponse))
data JSONOptions = JSONOptions data JSONOptions = JSONOptions
{ jsonProgress :: Bool { jsonProgress :: Bool
@ -53,3 +62,23 @@ newMessageState = do
, jsonBuffer = Nothing , jsonBuffer = Nothing
, promptLock = promptlock , promptLock = promptlock
} }
-- | When communicating with a child process over a pipe while it is
-- performing some action, this is used to pass back output that the child
-- would normally display to the console.
data SerializedOutput
= OutputMessage S.ByteString
| OutputError String
| StartProgressMeter (Maybe FileSize)
| UpdateProgressMeter BytesProcessed
| EndProgressMeter
| StartPrompt
| EndPrompt
| JSONObject L.ByteString
-- ^ This is always sent, it's up to the consumer to decide if it
-- wants to display JSON, or human-readable messages.
deriving (Show, Read)
data SerializedOutputResponse
= ReadyPrompt
deriving (Eq, Show, Read)
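
Since both types derive Show and Read, the values can cross the pipe in a simple textual form. Purely as an illustration (the exact messages a transferkeys child emits are not documented), a download of a 1 MiB key might produce a sequence along these lines:

{-# LANGUAGE OverloadedStrings #-}
-- Hypothetical example values only; real output depends on the remote
-- and on what the child actually prints.
module TraceSketch where

import Types.Messages
import Utility.Metered (BytesProcessed(..))

exampleTrace :: [SerializedOutput]
exampleTrace =
	[ OutputMessage "(checksum) "
	, StartProgressMeter (Just 1048576)
	, UpdateProgressMeter (BytesProcessed 524288)
	, UpdateProgressMeter (BytesProcessed 1048576)
	, EndProgressMeter
	-- Always sent; the parent decides whether to emit it (--json)
	-- or to show human-readable messages instead.
	, JSONObject "{\"example\":true}"
	]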

Types/StallDetection.hs (new file, 29 lines)

@@ -0,0 +1,29 @@
{- types for stall detection
-
- Copyright 2020 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
module Types.StallDetection where
import Utility.DataUnits
import Utility.HumanTime
import Utility.Misc
-- Unless the given number of bytes has been sent over the given
-- amount of time, there's a stall.
data StallDetection = StallDetection ByteSize Duration
deriving (Show)
-- Parse eg, "0KiB/60s"
parseStallDetection :: String -> Either String StallDetection
parseStallDetection s =
let (bs, ds) = separate (== '/') s
in do
b <- maybe
(Left $ "Unable to parse stall detection amount " ++ bs)
Right
(readSize dataUnits bs)
d <- parseDuration ds
return (StallDetection b d)
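
A small usage sketch, exercising the parser on the kinds of values the annex.stalldetection documentation suggests (readSize and parseDuration do the unit handling); the expected results are annotated informally:

module StallDetectionDemo where

import Types.StallDetection

-- A bad value yields a Left with an error message.
demo :: IO ()
demo = do
	print (parseStallDetection "0KiB/60s") -- Right: 0 bytes expected per 60 seconds
	print (parseStallDetection "1MB/1m")   -- Right: 1 megabyte expected per minute
	print (parseStallDetection "nonsense") -- Left: unable to parse the amount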

View file

@ -1,19 +1,17 @@
{- A pool of "git-annex transferkeys" processes available for use {- A pool of "git-annex transferkeys" processes available for use
- -
- Copyright 2013 Joey Hess <id@joeyh.name> - Copyright 2013-2020 Joey Hess <id@joeyh.name>
- -
- Licensed under the GNU AGPL version 3 or higher. - Licensed under the GNU AGPL version 3 or higher.
-} -}
module Assistant.Types.TransferrerPool where module Types.TransferrerPool where
import Annex.Common import Common
import Utility.NotificationBroadcaster
import Assistant.Types.DaemonStatus
import Control.Concurrent.STM hiding (check) import Control.Concurrent.STM hiding (check)
type TransferrerPool = TVar (MkCheckTransferrer, [TransferrerPoolItem]) type TransferrerPool = TVar [TransferrerPoolItem]
type CheckTransferrer = IO Bool type CheckTransferrer = IO Bool
type MkCheckTransferrer = IO (IO Bool) type MkCheckTransferrer = IO (IO Bool)
@ -29,36 +27,29 @@ data Transferrer = Transferrer
, transferrerHandle :: ProcessHandle , transferrerHandle :: ProcessHandle
} }
newTransferrerPool :: MkCheckTransferrer -> IO TransferrerPool newTransferrerPool :: IO TransferrerPool
newTransferrerPool c = newTVarIO (c, []) newTransferrerPool = newTVarIO []
popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int) popTransferrerPool :: TransferrerPool -> STM (Maybe TransferrerPoolItem, Int)
popTransferrerPool p = do popTransferrerPool p = do
(c, l) <- readTVar p l <- readTVar p
case l of case l of
[] -> return (Nothing, 0) [] -> return (Nothing, 0)
(i:is) -> do (i:is) -> do
writeTVar p (c, is) writeTVar p is
return $ (Just i, length is) return $ (Just i, length is)
pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM () pushTransferrerPool :: TransferrerPool -> TransferrerPoolItem -> STM ()
pushTransferrerPool p i = do pushTransferrerPool p i = do
(c, l) <- readTVar p l <- readTVar p
let l' = i:l let l' = i:l
writeTVar p (c, l') writeTVar p l'
{- Note that making a CheckTransferrer may allocate resources, {- Note that making a CheckTransferrer may allocate resources,
- such as a NotificationHandle, so it's important that the returned - such as a NotificationHandle, so it's important that the returned
- TransferrerPoolItem is pushed into the pool, and not left to be - TransferrerPoolItem is pushed into the pool, and not left to be
- garbage collected. -} - garbage collected. -}
mkTransferrerPoolItem :: TransferrerPool -> Transferrer -> IO TransferrerPoolItem mkTransferrerPoolItem :: MkCheckTransferrer -> Transferrer -> IO TransferrerPoolItem
mkTransferrerPoolItem p t = do mkTransferrerPoolItem mkcheck t = do
mkcheck <- atomically $ fst <$> readTVar p
check <- mkcheck check <- mkcheck
return $ TransferrerPoolItem (Just t) check return $ TransferrerPoolItem (Just t) check
checkNetworkConnections :: DaemonStatusHandle -> MkCheckTransferrer
checkNetworkConnections dstatushandle = do
dstatus <- atomically $ readTVar dstatushandle
h <- newNotificationHandle False (networkConnectedNotifier dstatus)
return $ not <$> checkNotification h
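
The pool is now just a TVar list of items, with the check-maker passed in where needed. A minimal sketch of the intended check-out/check-in discipline follows; startTransferrer-style spawning is an assumed stand-in for the code in Annex.TransferrerPool that actually runs a git-annex transferkeys process.

module PoolSketch where

import Types.TransferrerPool
import Control.Concurrent.STM (atomically)
import Control.Exception (finally)

withPooledTransferrer
	:: TransferrerPool
	-> MkCheckTransferrer
	-> IO Transferrer        -- ^ assumed: starts a transferkeys process
	-> (Transferrer -> IO a)
	-> IO a
withPooledTransferrer pool mkcheck start use = do
	(mi, _poolsize) <- atomically $ popTransferrerPool pool
	(t, item) <- case mi of
		-- Reuse a warm transferrer from the pool.
		Just item@(TransferrerPoolItem (Just t) _check) -> return (t, item)
		-- Pool empty (or the popped item held no running process):
		-- start a fresh one and wrap it in a pool item.
		_ -> do
			t <- start
			item <- mkTransferrerPoolItem mkcheck t
			return (t, item)
	-- Always push the item back so its check action (which may hold
	-- resources such as a NotificationHandle) is not lost.
	use t `finally` atomically (pushTransferrerPool pool item)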

View file

@ -10,6 +10,7 @@
module Utility.Batch ( module Utility.Batch (
batch, batch,
BatchCommandMaker, BatchCommandMaker,
nonBatchCommandMaker,
getBatchCommandMaker, getBatchCommandMaker,
toBatchCommand, toBatchCommand,
batchCommand, batchCommand,
@ -50,6 +51,9 @@ batch a = a
- are available in the path. -} - are available in the path. -}
type BatchCommandMaker = (String, [CommandParam]) -> (String, [CommandParam]) type BatchCommandMaker = (String, [CommandParam]) -> (String, [CommandParam])
nonBatchCommandMaker :: BatchCommandMaker
nonBatchCommandMaker = id
getBatchCommandMaker :: IO BatchCommandMaker getBatchCommandMaker :: IO BatchCommandMaker
getBatchCommandMaker = do getBatchCommandMaker = do
#ifndef mingw32_HOST_OS #ifndef mingw32_HOST_OS

View file

@ -45,7 +45,9 @@ daysToDuration i = Duration $ i * dsecs
{- Parses a human-input time duration, of the form "5h", "1m", "5h1m", etc -} {- Parses a human-input time duration, of the form "5h", "1m", "5h1m", etc -}
parseDuration :: String -> Either String Duration parseDuration :: String -> Either String Duration
parseDuration d = maybe parsefail (Right . Duration) $ go 0 d parseDuration d
| null d = parsefail
| otherwise = maybe parsefail (Right . Duration) $ go 0 d
where where
go n [] = return n go n [] = return n
go n s = do go n s = do
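
With this change an empty string becomes a parse failure instead of quietly parsing as a zero-length duration, which matters now that parseDuration is also reached via parseStallDetection. A quick behaviour sketch, assuming fromDuration renders durations back in the same compact form:

module DurationDemo where

import Utility.HumanTime

demo :: IO ()
demo = do
	-- A normal duration still round-trips.
	putStrLn $ either id fromDuration (parseDuration "5h1m")
	-- An empty string is now rejected with a parse error message.
	putStrLn $ either id fromDuration (parseDuration "")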

View file

@ -9,6 +9,7 @@
module Utility.Metered ( module Utility.Metered (
MeterUpdate, MeterUpdate,
MeterState(..),
nullMeterUpdate, nullMeterUpdate,
combineMeterUpdate, combineMeterUpdate,
TotalSize(..), TotalSize(..),
@ -77,7 +78,7 @@ combineMeterUpdate a b = \n -> a n >> b n
{- Total number of bytes processed so far. -} {- Total number of bytes processed so far. -}
newtype BytesProcessed = BytesProcessed Integer newtype BytesProcessed = BytesProcessed Integer
deriving (Eq, Ord, Show) deriving (Eq, Ord, Show, Read)
class AsBytesProcessed a where class AsBytesProcessed a where
toBytesProcessed :: a -> BytesProcessed toBytesProcessed :: a -> BytesProcessed
@ -379,19 +380,24 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
data Meter = Meter (MVar (Maybe Integer)) (MVar MeterState) (MVar String) DisplayMeter data Meter = Meter (MVar (Maybe Integer)) (MVar MeterState) (MVar String) DisplayMeter
type MeterState = (BytesProcessed, POSIXTime) data MeterState = MeterState
{ meterBytesProcessed :: BytesProcessed
, meterTimeStamp :: POSIXTime
} deriving (Show)
type DisplayMeter = MVar String -> Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> IO () type DisplayMeter = MVar String -> Maybe Integer -> MeterState -> MeterState -> IO ()
type RenderMeter = Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> String type RenderMeter = Maybe Integer -> MeterState -> MeterState -> String
-- | Make a meter. Pass the total size, if it's known. -- | Make a meter. Pass the total size, if it's known.
mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter
mkMeter totalsize displaymeter = Meter mkMeter totalsize displaymeter = do
<$> newMVar totalsize ts <- getPOSIXTime
<*> ((\t -> newMVar (zeroBytesProcessed, t)) =<< getPOSIXTime) Meter
<*> newMVar "" <$> newMVar totalsize
<*> pure displaymeter <*> newMVar (MeterState zeroBytesProcessed ts)
<*> newMVar ""
<*> pure displaymeter
setMeterTotalSize :: Meter -> Integer -> IO () setMeterTotalSize :: Meter -> Integer -> IO ()
setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just
@ -400,10 +406,11 @@ setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just
updateMeter :: Meter -> MeterUpdate updateMeter :: Meter -> MeterUpdate
updateMeter (Meter totalsizev sv bv displaymeter) new = do updateMeter (Meter totalsizev sv bv displaymeter) new = do
now <- getPOSIXTime now <- getPOSIXTime
(old, before) <- swapMVar sv (new, now) let curms = MeterState new now
when (old /= new) $ do oldms <- swapMVar sv curms
when (meterBytesProcessed oldms /= new) $ do
totalsize <- readMVar totalsizev totalsize <- readMVar totalsizev
displaymeter bv totalsize (old, before) (new, now) displaymeter bv totalsize oldms curms
-- | Display meter to a Handle. -- | Display meter to a Handle.
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
@ -428,7 +435,7 @@ clearMeterHandle (Meter _ _ v _) h = do
-- or when total size is not known: -- or when total size is not known:
-- 1.3 MiB 300 KiB/s -- 1.3 MiB 300 KiB/s
bandwidthMeter :: RenderMeter bandwidthMeter :: RenderMeter
bandwidthMeter mtotalsize (BytesProcessed old, before) (BytesProcessed new, now) = bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) =
unwords $ catMaybes unwords $ catMaybes
[ Just percentamount [ Just percentamount
-- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s" -- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s"
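
The meter plumbing can also be driven directly, which is roughly what the NormalOutput case of metered above does. A self-contained sketch, simulating a 1 MiB transfer (the sizes and delays are made up):

module MeterDemo where

import Utility.Metered
import Control.Concurrent (threadDelay)
import Control.Monad (forM_)
import System.IO (stdout)

demo :: IO ()
demo = do
	-- Render with bandwidthMeter to stdout, and rate limit redraws.
	meter <- mkMeter (Just 1048576) (displayMeterHandle stdout bandwidthMeter)
	update <- rateLimitMeterUpdate 0.2 meter (updateMeter meter)
	forM_ [131072, 262144 .. 1048576] $ \n -> do
		update (BytesProcessed n)
		threadDelay 100000 -- pretend to transfer a chunk every 0.1s
	clearMeterHandle meter stdout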

View file

@ -34,7 +34,7 @@ module Utility.Process (
) where ) where
import qualified Utility.Process.Shim import qualified Utility.Process.Shim
import Utility.Process.Shim as X (CreateProcess(..), ProcessHandle, StdStream(..), CmdSpec(..), proc, getPid, getProcessExitCode, shell, terminateProcess) import Utility.Process.Shim as X (CreateProcess(..), ProcessHandle, StdStream(..), CmdSpec(..), proc, getPid, getProcessExitCode, shell, terminateProcess, interruptProcessGroupOf)
import Utility.Misc import Utility.Misc
import Utility.Exception import Utility.Exception
import Utility.Monad import Utility.Monad
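
interruptProcessGroupOf is re-exported here presumably so a stalled transfer process, and anything it spawned, can be cancelled as a group; the stall detection added elsewhere in this commit depends on being able to kill a transfer without taking down git-annex itself. A generic sketch of that pattern, not git-annex's actual stall handling:

module DeadlineSketch where

import System.Exit (ExitCode)
import System.Process
import System.Timeout (timeout)

-- Run a command in its own process group; if it has not finished within
-- the given number of microseconds, interrupt the whole group and wait
-- for it to die.
runWithDeadline :: Int -> CreateProcess -> IO (Maybe ExitCode)
runWithDeadline micros p = do
	(_, _, _, pid) <- createProcess p { create_group = True }
	r <- timeout micros (waitForProcess pid)
	case r of
		Just code -> return (Just code)
		Nothing -> do
			interruptProcessGroupOf pid
			_ <- waitForProcess pid
			return Nothing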

View file

@ -8,20 +8,15 @@ git annex transferkeys
# DESCRIPTION # DESCRIPTION
This plumbing-level command is used by the assistant to transfer data. This plumbing-level command is used to transfer data.
It is a long-running process, which is fed instructions about the keys It is a long-running process, which is fed instructions about the keys
to transfer using an internal stdio protocol, which is to transfer using an internal stdio protocol, which is
intentionally not documented (as it may change at any time). intentionally not documented (as it may change at any time).
It's normal to have a transferkeys process running when the assistant is
running.
# SEE ALSO # SEE ALSO
[[git-annex]](1) [[git-annex]](1)
[[git-annex-assistant]](1)
# AUTHOR # AUTHOR
Joey Hess <id@joeyh.name> Joey Hess <id@joeyh.name>

View file

@ -1392,6 +1392,31 @@ Remotes are configured using these settings in `.git/config`.
When making multiple retries of the same transfer, the delay When making multiple retries of the same transfer, the delay
doubles after each retry. (default 1) doubles after each retry. (default 1)
* `remote.<name>.annex-stalldetection`, `annex.stalldetection`
This lets stalled or too-slow transfers be detected and dealt with: rather
than getting stuck, git-annex will cancel the stalled operation.
When this happens, the transfer will be considered to have failed, so
settings like annex.retry will control what it does next.
This is not enabled by default, because it can make git-annex use
more resources. In order to cancel stalls, git-annex has to run
transfers in separate processes (one per concurrent job). So it
may need to open more connections to a remote than usual, or
the communication with those processes may make it a bit slower.
The value specifies the minimum amount of data git-annex should expect to
see flowing over a given period of time when the transfer is not stalled.
The format is "$amount/$timeperiod".
For example, to detect outright stalls where no data has been transferred
after 30 seconds: `git config annex.stalldetection "0/30s"`
Or, if you have a remote on a USB drive that is normally capable of
several megabytes per second, but has bad sectors where it gets
stuck for a long time, you could use:
`git config remote.usbdrive.annex-stalldetection "1MB/1m"`
* `remote.<name>.annex-checkuuid` * `remote.<name>.annex-checkuuid`
This only affects remotes that have their url pointing to a directory on This only affects remotes that have their url pointing to a directory on

View file

@ -1,3 +1,5 @@
In a number of scenarios (e.g. [[bugs/still_seeing_errors_with_parallel_git-annex-add]], [[bugs/parallel_copy_fails]], [[git-annex-sync/#comment-aceb18109c0a536e04bcdd3aa04bda29]]), `git-annex` operations may fail or hang due to transient conditions. It would help a lot if `git-annex` could be configured to fail timed-out operations, and to retry failed operations after a delay. This would especially help when using `git-annex` in a script or a higher-level tool. I've tried wrapping some retry logic around `git-annex` calls, but it seems `git-annex` itself is in the best position to do that sensibly (e.g. only retrying idempotent operations, or capping retries per remote). This would be a catch-all fix for unusual conditions that are hard to test for. In a number of scenarios (e.g. [[bugs/still_seeing_errors_with_parallel_git-annex-add]], [[bugs/parallel_copy_fails]], [[git-annex-sync/#comment-aceb18109c0a536e04bcdd3aa04bda29]]), `git-annex` operations may fail or hang due to transient conditions. It would help a lot if `git-annex` could be configured to fail timed-out operations, and to retry failed operations after a delay. This would especially help when using `git-annex` in a script or a higher-level tool. I've tried wrapping some retry logic around `git-annex` calls, but it seems `git-annex` itself is in the best position to do that sensibly (e.g. only retrying idempotent operations, or capping retries per remote). This would be a catch-all fix for unusual conditions that are hard to test for.
`git-annex` already has config options `annex.retry` and `annex.retry-delay`, but it seems that they don't cover all failure types. `git-annex` already has config options `annex.retry` and `annex.retry-delay`, but it seems that they don't cover all failure types.
> Added annex.stalldetection, [[done]] --[[Joey]]

View file

@ -25,9 +25,16 @@ A few notes on implementing that:
outputs to stderr directly no matter the output type currently. outputs to stderr directly no matter the output type currently.
It would need to be changed to support the new output type. It would need to be changed to support the new output type.
(And probably should for concurrent output mode too actually!) (And probably should for concurrent output mode too actually!)
> It's true, this is not concurrent output safe. However, that's already
> the case, and output to stderr doesn't affect the piping of serialized
> messages on stdout. So, punted on this.
* So does warningIO, though it's only used in a couple of remotes * So does warningIO, though it's only used in a couple of remotes
and rarely. It would be good to find a way to eliminate it. and rarely. It would be good to find a way to eliminate it.
> Eliminated except for one call in a non-relevant code path.
* Messages.prompt. Which is used by remotes, and would need to * Messages.prompt. Which is used by remotes, and would need to
communicate over the pipe to the parent git-annex bidirectionally. communicate over the pipe to the parent git-annex bidirectionally.
Eg, send a message saying the parent needs to prepare for prompt, Eg, send a message saying the parent needs to prepare for prompt,
@ -35,18 +42,6 @@ A few notes on implementing that:
prompting is done. (Note that the parent would need to detect if the child prompting is done. (Note that the parent would need to detect if the child
process crashed to avoid being locked waiting for the prompt.) process crashed to avoid being locked waiting for the prompt.)
* Messages.Internal.outputMessage is used by several things, and > Done.
includes some special parameters used in json mode. Since the parent
git-annex might itself have json mode enabled, those parameters will need
to be included in the serialization. But those parameters are currently
actually functions that manipulate the json object that will be outputted
later. So cannot be serialized. Uuuuh.
Maybe the thing to do is, pass along the --json options to transferkeys, [[done]]
and have a message type for json objects, which it uses to send them
to git-annex, which can then output them. outputMessage can handle the
new output type by sending the message through the pipe, and also
building any json object, and sending it through the pipe once it's done.
> This is implemented in the message-serialization branch. Not merged
> pending actually using it. --[[Joey]]

View file

@ -0,0 +1,5 @@
The new annex.stalldetection is used for transfers from remotes, but not
for import and export from remotes.
This should be doable, but it will need the transferkeys protocol to be
extended to cover the additional actions. --[[Joey]]

View file

@ -0,0 +1,9 @@
Some of the things git-annex transferkeys does are suboptimal, especially
when -J has many of them running.
In particular, it writes location logs when downloading (but not
uploading), and so it flushes the journal etc.
It may also do some queries of data from git that could be avoided with
some refactoring of what code runs in it, which could avoid it needing to
start up git helper processes like catkey. --[[Joey]]

View file

@ -479,7 +479,6 @@ Executable git-annex
Assistant.Threads.Watcher Assistant.Threads.Watcher
Assistant.TransferQueue Assistant.TransferQueue
Assistant.TransferSlots Assistant.TransferSlots
Assistant.TransferrerPool
Assistant.Types.Alert Assistant.Types.Alert
Assistant.Types.BranchChange Assistant.Types.BranchChange
Assistant.Types.Changes Assistant.Types.Changes
@ -495,7 +494,6 @@ Executable git-annex
Assistant.Types.ThreadedMonad Assistant.Types.ThreadedMonad
Assistant.Types.TransferQueue Assistant.Types.TransferQueue
Assistant.Types.TransferSlots Assistant.Types.TransferSlots
Assistant.Types.TransferrerPool
Assistant.Types.UrlRenderer Assistant.Types.UrlRenderer
Assistant.Unused Assistant.Unused
Assistant.Upgrade Assistant.Upgrade
@ -666,6 +664,7 @@ Executable git-annex
Annex.TaggedPush Annex.TaggedPush
Annex.Tmp Annex.Tmp
Annex.Transfer Annex.Transfer
Annex.TransferrerPool
Annex.UntrustedFilePath Annex.UntrustedFilePath
Annex.UpdateInstead Annex.UpdateInstead
Annex.UUID Annex.UUID
@ -931,6 +930,7 @@ Executable git-annex
Messages.Internal Messages.Internal
Messages.JSON Messages.JSON
Messages.Progress Messages.Progress
Messages.Serialized
P2P.Address P2P.Address
P2P.Annex P2P.Annex
P2P.Auth P2P.Auth
@ -1023,9 +1023,11 @@ Executable git-annex
Types.RepoVersion Types.RepoVersion
Types.ScheduledActivity Types.ScheduledActivity
Types.StandardGroups Types.StandardGroups
Types.StallDetection
Types.StoreRetrieve Types.StoreRetrieve
Types.Test Types.Test
Types.Transfer Types.Transfer
Types.TransferrerPool
Types.TrustLevel Types.TrustLevel
Types.UUID Types.UUID
Types.UrlContents Types.UrlContents