From fcc9e015560534b7d0ae7a4c5067a8b6dbb89679 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Mon, 7 Dec 2020 16:11:29 -0400 Subject: [PATCH] finally using transferkeys Seems to work! Even progress bars. Have not tested prompting or various error message displays yet. transferkeys had to be made to operate in different modes for the Assistant and Annex monads. A bit ugly, but it did relegate that really ugly Database.Keys.closeDb in transferkeys to only the assistant code path. This commit was sponsored by Noam Kremen. --- Annex/Transfer.hs | 26 ++++++++-- Annex/TransferrerPool.hs | 104 ++++++++++++++++++++++++++++++------- Assistant/TransferSlots.hs | 5 +- Command/TransferKeys.hs | 65 +++++++++-------------- 4 files changed, 132 insertions(+), 68 deletions(-) diff --git a/Annex/Transfer.hs b/Annex/Transfer.hs index 20358c6d8d..99436d093b 100644 --- a/Annex/Transfer.hs +++ b/Annex/Transfer.hs @@ -38,6 +38,7 @@ import Types.Concurrency import Annex.Concurrent.Utility import Types.WorkerPool import Annex.WorkerPool +import Annex.TransferrerPool import Backend (isCryptographicallySecure) import qualified Utility.RawFilePath as R @@ -47,8 +48,17 @@ import qualified System.FilePath.ByteString as P import Data.Ord upload :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool -upload r key f d = upload' (Remote.uuid r) key f d $ - action . Remote.storeKey r key f +upload r key f d _witness = + -- TODO: use this when not handling timeouts + --upload' (Remote.uuid r) key f d $ + -- action . Remote.storeKey r key f + + -- TODO: RetryDecider + -- TODO: Handle timeouts + withTransferrer $ \transferrer -> + performTransfer transferrer AnnexLevel + (Transfer Upload (Remote.uuid r) (fromKey id key)) + (Just r) f id upload' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v upload' u key f d a _witness = guardHaveUUID u $ @@ -60,8 +70,16 @@ alwaysUpload u key f d a _witness = guardHaveUUID u $ download :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool download r key f d witness = - getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest -> - download' (Remote.uuid r) key f d (go dest) witness + -- TODO: use this when not handling timeouts + --getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest -> + -- download' (Remote.uuid r) key f d (go dest) witness + + -- TODO: RetryDecider + -- TODO: Handle timeouts + withTransferrer $ \transferrer -> + performTransfer transferrer AnnexLevel + (Transfer Download (Remote.uuid r) (fromKey id key)) + (Just r) f id where go dest p = verifiedAction $ Remote.retrieveKeyFile r key f (fromRawFilePath dest) p diff --git a/Annex/TransferrerPool.hs b/Annex/TransferrerPool.hs index 286c6f9be2..9aec95035e 100644 --- a/Annex/TransferrerPool.hs +++ b/Annex/TransferrerPool.hs @@ -10,35 +10,66 @@ module Annex.TransferrerPool where import Annex.Common +import qualified Annex import Types.TransferrerPool import Types.Transfer -import Utility.Batch +import Types.Key +import qualified Types.Remote as Remote +import Git.Types (RemoteName) +import Types.Messages import Messages.Serialized -import qualified Command.TransferKeys as T +import Annex.Path +import Utility.Batch import Control.Concurrent.STM hiding (check) import Control.Concurrent import Control.Monad.IO.Class (MonadIO) +import Text.Read (readMaybe) +import System.Log.Logger (debugM) -{- Runs an action with a Transferrer from the pool. - - - - When minimizeprocesses is True, only one Transferrer is left running - - in the pool at a time. So if this needed to start a new Transferrer, - - it's stopped when done. Otherwise, idle processes are left in the pool - - for use later. - -} -withTransferrer :: Bool -> MkCheckTransferrer -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a -withTransferrer minimizeprocesses mkcheck program batchmaker pool a = do - (mi, leftinpool) <- atomically (popTransferrerPool pool) - i@(TransferrerPoolItem (Just t) check) <- case mi of +data TransferRequest = TransferRequest TransferRequestLevel Direction (Either UUID RemoteName) KeyData AssociatedFile + deriving (Show, Read) + +data TransferRequestLevel = AnnexLevel | AssistantLevel + deriving (Show, Read) + +data TransferResponse + = TransferOutput SerializedOutput + | TransferResult Bool + deriving (Show, Read) + +{- Runs an action with a Transferrer from the pool. -} +withTransferrer :: (Transferrer -> Annex a) -> Annex a +withTransferrer a = do + program <- liftIO programPath + pool <- Annex.getState Annex.transferrerpool + let nocheck = pure (pure True) + withTransferrer' False nocheck program nonBatchCommandMaker pool a + +withTransferrer' + :: (MonadIO m, MonadFail m, MonadMask m) + => Bool + -- ^ When minimizeprocesses is True, only one Transferrer is left + -- running in the pool at a time. So if this needed to start a + -- new Transferrer, it's stopped when done. Otherwise, idle + -- processes are left in the pool for use later. + -> MkCheckTransferrer + -> FilePath + -> BatchCommandMaker + -> TransferrerPool + -> (Transferrer -> m a) + -> m a +withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do + (mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool) + i@(TransferrerPoolItem (Just t) check) <- liftIO $ case mi of Nothing -> mkTransferrerPoolItem mkcheck =<< mkTransferrer program batchmaker Just i -> checkTransferrerPoolItem program batchmaker i a t `finally` returntopool leftinpool check t i where returntopool leftinpool check t i | not minimizeprocesses || leftinpool == 0 = - atomically $ pushTransferrerPool pool i - | otherwise = do + liftIO $ atomically $ pushTransferrerPool pool i + | otherwise = liftIO $ do void $ forkIO $ stopTransferrer t atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check @@ -63,17 +94,19 @@ checkTransferrerPoolItem program batchmaker i = case i of performTransfer :: (Monad m, MonadIO m, MonadMask m) => Transferrer + -> TransferRequestLevel -> Transfer - -> TransferInfo + -> Maybe Remote + -> AssociatedFile -> (forall a. Annex a -> m a) -- ^ Run an annex action in the monad. Will not be used with -- actions that block for a long time. -> m Bool -performTransfer transferrer t info runannex = catchBoolIO $ do - (liftIO $ T.sendRequest t info (transferrerWrite transferrer)) +performTransfer transferrer level t mremote afile runannex = catchBoolIO $ do + (liftIO $ sendRequest level t mremote afile (transferrerWrite transferrer)) relaySerializedOutput - (liftIO $ T.readResponse (transferrerRead transferrer)) - (liftIO . T.sendSerializedOutputResponse (transferrerWrite transferrer)) + (liftIO $ readResponse (transferrerRead transferrer)) + (liftIO . sendSerializedOutputResponse (transferrerWrite transferrer)) runannex {- Starts a new git-annex transferkeys process, setting up handles @@ -103,3 +136,34 @@ stopTransferrer t = do hClose $ transferrerRead t hClose $ transferrerWrite t void $ waitForProcess $ transferrerHandle t + +-- | Send a request to perform a transfer. +sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO () +sendRequest level t mremote afile h = do + let l = show $ TransferRequest level + (transferDirection t) + (maybe (Left (transferUUID t)) (Right . Remote.name) mremote) + (keyData (transferKey t)) + afile + debugM "transfer" ("> " ++ l) + hPutStrLn h l + hFlush h + +sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO () +sendSerializedOutputResponse h sor = hPutStrLn h $ show sor + +-- | Read a response to a transfer requests. +-- +-- Before the final response, this will return whatever SerializedOutput +-- should be displayed as the transfer is performed. +readResponse :: Handle -> IO (Either SerializedOutput Bool) +readResponse h = do + l <- liftIO $ hGetLine h + debugM "transfer" ("< " ++ l) + case readMaybe l of + Just (TransferOutput so) -> return (Left so) + Just (TransferResult r) -> return (Right r) + Nothing -> transferKeysProtocolError l + +transferKeysProtocolError :: String -> a +transferKeysProtocolError l = error $ "transferkeys protocol error: " ++ show l diff --git a/Assistant/TransferSlots.hs b/Assistant/TransferSlots.hs index 89788ca9f0..7504fe2f59 100644 --- a/Assistant/TransferSlots.hs +++ b/Assistant/TransferSlots.hs @@ -91,8 +91,7 @@ runTransferThread' mkcheck program batchmaker d run = go go = catchPauseResume $ do p <- runAssistant d $ liftAnnex $ Annex.getState Annex.transferrerpool - withTransferrer True mkcheck program batchmaker p - run + withTransferrer' True mkcheck program batchmaker p run pause = catchPauseResume $ runEvery (Seconds 86400) noop {- Note: This must use E.try, rather than E.catch. @@ -163,7 +162,7 @@ genTransfer t info = case transferRemote info of - usual cleanup. However, first check if something else is - running the transfer, to avoid removing active transfers. -} - go remote transferrer = ifM (performTransfer transferrer t info liftAnnex) + go remote transferrer = ifM (performTransfer transferrer AssistantLevel t (transferRemote info) (associatedFile info) liftAnnex) ( do case associatedFile info of AssociatedFile Nothing -> noop diff --git a/Command/TransferKeys.hs b/Command/TransferKeys.hs index 6e2112b5f8..573530e98f 100644 --- a/Command/TransferKeys.hs +++ b/Command/TransferKeys.hs @@ -14,22 +14,13 @@ import Logs.Location import Annex.Transfer import qualified Remote import Utility.SimpleProtocol (dupIoHandles) -import Git.Types (RemoteName) import qualified Database.Keys import Annex.BranchState import Types.Messages -import Types.Key +import Annex.TransferrerPool import Text.Read (readMaybe) -data TransferRequest = TransferRequest Direction (Either UUID RemoteName) KeyData AssociatedFile - deriving (Show, Read) - -data TransferResponse - = TransferOutput SerializedOutput - | TransferResult Bool - deriving (Show, Read) - cmd :: Command cmd = command "transferkeys" SectionPlumbing "transfers keys" paramNothing (withParams seek) @@ -42,12 +33,30 @@ start = do enableInteractiveBranchAccess (readh, writeh) <- liftIO dupIoHandles Annex.setOutput $ SerializedOutput - (hPutStrLn writeh . show . TransferOutput) + (\v -> hPutStrLn writeh (show (TransferOutput v)) >> hFlush writeh) (readMaybe <$> hGetLine readh) runRequests readh writeh runner stop where - runner (TransferRequest direction _ keydata file) remote + runner (TransferRequest AnnexLevel direction _ keydata file) remote + | direction == Upload = + -- This is called by eg, Annex.Transfer.upload, + -- so caller is responsible for doing notification, + -- and for retrying. + upload' (Remote.uuid remote) key file noRetry + (Remote.action . Remote.storeKey remote key file) + noNotification + | otherwise = + -- This is called by eg, Annex.Transfer.download + -- so caller is responsible for doing notification + -- and for retrying. + let go p = getViaTmp (Remote.retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t -> do + Remote.verifiedAction (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p) + in download' (Remote.uuid remote) key file noRetry go + noNotification + where + key = mkKey (const keydata) + runner (TransferRequest AssistantLevel direction _ keydata file) remote | direction == Upload = notifyTransfer direction file $ upload' (Remote.uuid remote) key file stdRetry $ \p -> do tryNonAsync (Remote.storeKey remote key file p) >>= \case @@ -83,7 +92,7 @@ runRequests readh writeh a = go Nothing Nothing go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do l <- liftIO $ hGetLine readh case readMaybe l of - Just tr@(TransferRequest _ remoteoruuid _ _) -> do + Just tr@(TransferRequest _ _ remoteoruuid _ _) -> do -- Often the same remote will be used -- repeatedly, so cache the last one to -- avoid looking up repeatedly. @@ -95,35 +104,9 @@ runRequests readh writeh a = go Nothing Nothing Just remote -> do sendresult =<< a tr remote go (Just remoteoruuid) mremote - Nothing -> protocolError l - Nothing -> protocolError l + Nothing -> transferKeysProtocolError l + Nothing -> transferKeysProtocolError l sendresult b = liftIO $ do hPutStrLn writeh $ show $ TransferResult b hFlush writeh - --- | Send a request to this command to perform a transfer. -sendRequest :: Transfer -> TransferInfo -> Handle -> IO () -sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest - (transferDirection t) - (maybe (Left (transferUUID t)) (Right . Remote.name) (transferRemote tinfo)) - (keyData (transferKey t)) - (associatedFile tinfo) - -sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO () -sendSerializedOutputResponse h sor = hPutStrLn h $ show sor - --- | Read a response from this command. --- --- Before the final response, this will return whatever SerializedOutput --- should be displayed as the transfer is performed. -readResponse :: Handle -> IO (Either SerializedOutput Bool) -readResponse h = do - l <- liftIO $ hGetLine h - case readMaybe l of - Just (TransferOutput so) -> return (Left so) - Just (TransferResult r) -> return (Right r) - Nothing -> protocolError l - -protocolError :: String -> a -protocolError l = error $ "transferkeys protocol error: " ++ show l