finally using transferkeys

Seems to work! Even progress bars. Have not tested prompting or various
error message displays yet.

transferkeys had to be made to operate in different modes for the
Assistant and Annex monads. A bit ugly, but it did relegate that
really ugly Database.Keys.closeDb in transferkeys to only the assistant
code path.

This commit was sponsored by Noam Kremen.
This commit is contained in:
Joey Hess 2020-12-07 16:11:29 -04:00
parent 4c47568876
commit fcc9e01556
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 132 additions and 68 deletions

View file

@ -38,6 +38,7 @@ import Types.Concurrency
import Annex.Concurrent.Utility
import Types.WorkerPool
import Annex.WorkerPool
import Annex.TransferrerPool
import Backend (isCryptographicallySecure)
import qualified Utility.RawFilePath as R
@ -47,8 +48,17 @@ import qualified System.FilePath.ByteString as P
import Data.Ord
upload :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool
upload r key f d = upload' (Remote.uuid r) key f d $
action . Remote.storeKey r key f
upload r key f d _witness =
-- TODO: use this when not handling timeouts
--upload' (Remote.uuid r) key f d $
-- action . Remote.storeKey r key f
-- TODO: RetryDecider
-- TODO: Handle timeouts
withTransferrer $ \transferrer ->
performTransfer transferrer AnnexLevel
(Transfer Upload (Remote.uuid r) (fromKey id key))
(Just r) f id
upload' :: Observable v => UUID -> Key -> AssociatedFile -> RetryDecider -> (MeterUpdate -> Annex v) -> NotifyWitness -> Annex v
upload' u key f d a _witness = guardHaveUUID u $
@ -60,8 +70,16 @@ alwaysUpload u key f d a _witness = guardHaveUUID u $
download :: Remote -> Key -> AssociatedFile -> RetryDecider -> NotifyWitness -> Annex Bool
download r key f d witness =
getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest ->
download' (Remote.uuid r) key f d (go dest) witness
-- TODO: use this when not handling timeouts
--getViaTmp (Remote.retrievalSecurityPolicy r) (RemoteVerify r) key f $ \dest ->
-- download' (Remote.uuid r) key f d (go dest) witness
-- TODO: RetryDecider
-- TODO: Handle timeouts
withTransferrer $ \transferrer ->
performTransfer transferrer AnnexLevel
(Transfer Download (Remote.uuid r) (fromKey id key))
(Just r) f id
where
go dest p = verifiedAction $
Remote.retrieveKeyFile r key f (fromRawFilePath dest) p

View file

@ -10,35 +10,66 @@
module Annex.TransferrerPool where
import Annex.Common
import qualified Annex
import Types.TransferrerPool
import Types.Transfer
import Utility.Batch
import Types.Key
import qualified Types.Remote as Remote
import Git.Types (RemoteName)
import Types.Messages
import Messages.Serialized
import qualified Command.TransferKeys as T
import Annex.Path
import Utility.Batch
import Control.Concurrent.STM hiding (check)
import Control.Concurrent
import Control.Monad.IO.Class (MonadIO)
import Text.Read (readMaybe)
import System.Log.Logger (debugM)
{- Runs an action with a Transferrer from the pool.
-
- When minimizeprocesses is True, only one Transferrer is left running
- in the pool at a time. So if this needed to start a new Transferrer,
- it's stopped when done. Otherwise, idle processes are left in the pool
- for use later.
-}
withTransferrer :: Bool -> MkCheckTransferrer -> FilePath -> BatchCommandMaker -> TransferrerPool -> (Transferrer -> IO a) -> IO a
withTransferrer minimizeprocesses mkcheck program batchmaker pool a = do
(mi, leftinpool) <- atomically (popTransferrerPool pool)
i@(TransferrerPoolItem (Just t) check) <- case mi of
data TransferRequest = TransferRequest TransferRequestLevel Direction (Either UUID RemoteName) KeyData AssociatedFile
deriving (Show, Read)
data TransferRequestLevel = AnnexLevel | AssistantLevel
deriving (Show, Read)
data TransferResponse
= TransferOutput SerializedOutput
| TransferResult Bool
deriving (Show, Read)
{- Runs an action with a Transferrer from the pool. -}
withTransferrer :: (Transferrer -> Annex a) -> Annex a
withTransferrer a = do
program <- liftIO programPath
pool <- Annex.getState Annex.transferrerpool
let nocheck = pure (pure True)
withTransferrer' False nocheck program nonBatchCommandMaker pool a
withTransferrer'
:: (MonadIO m, MonadFail m, MonadMask m)
=> Bool
-- ^ When minimizeprocesses is True, only one Transferrer is left
-- running in the pool at a time. So if this needed to start a
-- new Transferrer, it's stopped when done. Otherwise, idle
-- processes are left in the pool for use later.
-> MkCheckTransferrer
-> FilePath
-> BatchCommandMaker
-> TransferrerPool
-> (Transferrer -> m a)
-> m a
withTransferrer' minimizeprocesses mkcheck program batchmaker pool a = do
(mi, leftinpool) <- liftIO $ atomically (popTransferrerPool pool)
i@(TransferrerPoolItem (Just t) check) <- liftIO $ case mi of
Nothing -> mkTransferrerPoolItem mkcheck =<< mkTransferrer program batchmaker
Just i -> checkTransferrerPoolItem program batchmaker i
a t `finally` returntopool leftinpool check t i
where
returntopool leftinpool check t i
| not minimizeprocesses || leftinpool == 0 =
atomically $ pushTransferrerPool pool i
| otherwise = do
liftIO $ atomically $ pushTransferrerPool pool i
| otherwise = liftIO $ do
void $ forkIO $ stopTransferrer t
atomically $ pushTransferrerPool pool $ TransferrerPoolItem Nothing check
@ -63,17 +94,19 @@ checkTransferrerPoolItem program batchmaker i = case i of
performTransfer
:: (Monad m, MonadIO m, MonadMask m)
=> Transferrer
-> TransferRequestLevel
-> Transfer
-> TransferInfo
-> Maybe Remote
-> AssociatedFile
-> (forall a. Annex a -> m a)
-- ^ Run an annex action in the monad. Will not be used with
-- actions that block for a long time.
-> m Bool
performTransfer transferrer t info runannex = catchBoolIO $ do
(liftIO $ T.sendRequest t info (transferrerWrite transferrer))
performTransfer transferrer level t mremote afile runannex = catchBoolIO $ do
(liftIO $ sendRequest level t mremote afile (transferrerWrite transferrer))
relaySerializedOutput
(liftIO $ T.readResponse (transferrerRead transferrer))
(liftIO . T.sendSerializedOutputResponse (transferrerWrite transferrer))
(liftIO $ readResponse (transferrerRead transferrer))
(liftIO . sendSerializedOutputResponse (transferrerWrite transferrer))
runannex
{- Starts a new git-annex transferkeys process, setting up handles
@ -103,3 +136,34 @@ stopTransferrer t = do
hClose $ transferrerRead t
hClose $ transferrerWrite t
void $ waitForProcess $ transferrerHandle t
-- | Send a request to perform a transfer.
sendRequest :: TransferRequestLevel -> Transfer -> Maybe Remote -> AssociatedFile -> Handle -> IO ()
sendRequest level t mremote afile h = do
let l = show $ TransferRequest level
(transferDirection t)
(maybe (Left (transferUUID t)) (Right . Remote.name) mremote)
(keyData (transferKey t))
afile
debugM "transfer" ("> " ++ l)
hPutStrLn h l
hFlush h
sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO ()
sendSerializedOutputResponse h sor = hPutStrLn h $ show sor
-- | Read a response to a transfer requests.
--
-- Before the final response, this will return whatever SerializedOutput
-- should be displayed as the transfer is performed.
readResponse :: Handle -> IO (Either SerializedOutput Bool)
readResponse h = do
l <- liftIO $ hGetLine h
debugM "transfer" ("< " ++ l)
case readMaybe l of
Just (TransferOutput so) -> return (Left so)
Just (TransferResult r) -> return (Right r)
Nothing -> transferKeysProtocolError l
transferKeysProtocolError :: String -> a
transferKeysProtocolError l = error $ "transferkeys protocol error: " ++ show l

View file

@ -91,8 +91,7 @@ runTransferThread' mkcheck program batchmaker d run = go
go = catchPauseResume $ do
p <- runAssistant d $ liftAnnex $
Annex.getState Annex.transferrerpool
withTransferrer True mkcheck program batchmaker p
run
withTransferrer' True mkcheck program batchmaker p run
pause = catchPauseResume $
runEvery (Seconds 86400) noop
{- Note: This must use E.try, rather than E.catch.
@ -163,7 +162,7 @@ genTransfer t info = case transferRemote info of
- usual cleanup. However, first check if something else is
- running the transfer, to avoid removing active transfers.
-}
go remote transferrer = ifM (performTransfer transferrer t info liftAnnex)
go remote transferrer = ifM (performTransfer transferrer AssistantLevel t (transferRemote info) (associatedFile info) liftAnnex)
( do
case associatedFile info of
AssociatedFile Nothing -> noop

View file

@ -14,22 +14,13 @@ import Logs.Location
import Annex.Transfer
import qualified Remote
import Utility.SimpleProtocol (dupIoHandles)
import Git.Types (RemoteName)
import qualified Database.Keys
import Annex.BranchState
import Types.Messages
import Types.Key
import Annex.TransferrerPool
import Text.Read (readMaybe)
data TransferRequest = TransferRequest Direction (Either UUID RemoteName) KeyData AssociatedFile
deriving (Show, Read)
data TransferResponse
= TransferOutput SerializedOutput
| TransferResult Bool
deriving (Show, Read)
cmd :: Command
cmd = command "transferkeys" SectionPlumbing "transfers keys"
paramNothing (withParams seek)
@ -42,12 +33,30 @@ start = do
enableInteractiveBranchAccess
(readh, writeh) <- liftIO dupIoHandles
Annex.setOutput $ SerializedOutput
(hPutStrLn writeh . show . TransferOutput)
(\v -> hPutStrLn writeh (show (TransferOutput v)) >> hFlush writeh)
(readMaybe <$> hGetLine readh)
runRequests readh writeh runner
stop
where
runner (TransferRequest direction _ keydata file) remote
runner (TransferRequest AnnexLevel direction _ keydata file) remote
| direction == Upload =
-- This is called by eg, Annex.Transfer.upload,
-- so caller is responsible for doing notification,
-- and for retrying.
upload' (Remote.uuid remote) key file noRetry
(Remote.action . Remote.storeKey remote key file)
noNotification
| otherwise =
-- This is called by eg, Annex.Transfer.download
-- so caller is responsible for doing notification
-- and for retrying.
let go p = getViaTmp (Remote.retrievalSecurityPolicy remote) (RemoteVerify remote) key file $ \t -> do
Remote.verifiedAction (Remote.retrieveKeyFile remote key file (fromRawFilePath t) p)
in download' (Remote.uuid remote) key file noRetry go
noNotification
where
key = mkKey (const keydata)
runner (TransferRequest AssistantLevel direction _ keydata file) remote
| direction == Upload = notifyTransfer direction file $
upload' (Remote.uuid remote) key file stdRetry $ \p -> do
tryNonAsync (Remote.storeKey remote key file p) >>= \case
@ -83,7 +92,7 @@ runRequests readh writeh a = go Nothing Nothing
go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do
l <- liftIO $ hGetLine readh
case readMaybe l of
Just tr@(TransferRequest _ remoteoruuid _ _) -> do
Just tr@(TransferRequest _ _ remoteoruuid _ _) -> do
-- Often the same remote will be used
-- repeatedly, so cache the last one to
-- avoid looking up repeatedly.
@ -95,35 +104,9 @@ runRequests readh writeh a = go Nothing Nothing
Just remote -> do
sendresult =<< a tr remote
go (Just remoteoruuid) mremote
Nothing -> protocolError l
Nothing -> protocolError l
Nothing -> transferKeysProtocolError l
Nothing -> transferKeysProtocolError l
sendresult b = liftIO $ do
hPutStrLn writeh $ show $ TransferResult b
hFlush writeh
-- | Send a request to this command to perform a transfer.
sendRequest :: Transfer -> TransferInfo -> Handle -> IO ()
sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest
(transferDirection t)
(maybe (Left (transferUUID t)) (Right . Remote.name) (transferRemote tinfo))
(keyData (transferKey t))
(associatedFile tinfo)
sendSerializedOutputResponse :: Handle -> SerializedOutputResponse -> IO ()
sendSerializedOutputResponse h sor = hPutStrLn h $ show sor
-- | Read a response from this command.
--
-- Before the final response, this will return whatever SerializedOutput
-- should be displayed as the transfer is performed.
readResponse :: Handle -> IO (Either SerializedOutput Bool)
readResponse h = do
l <- liftIO $ hGetLine h
case readMaybe l of
Just (TransferOutput so) -> return (Left so)
Just (TransferResult r) -> return (Right r)
Nothing -> protocolError l
protocolError :: String -> a
protocolError l = error $ "transferkeys protocol error: " ++ show l