new protocol for transferkeys, with message serialization
Necessarily threw out the old protocol, so if an old git-annex assistant is running, and starts a transferkeys from the new git-annex, it would fail. But, that seems unlikely; the assistant starts up transferkeys processes and then keeps them running. Still, may need to test that scenario. The new protocol is simple read/show and looks like this: TransferRequest Download (Right "origin") (Key {keyName = "f8f8766a836fb6120abf4d5328ce8761404e437529e997aaa0363bdd4fecd7bb", keyVariety = SHA2Key (HashSize 256) (HasExt True), keySize = Just 30, keyMtime = Nothing, keyChunkSize = Nothing, keyChunkNum = Nothing}) (AssociatedFile (Just "foo")) TransferOutput (ProgressMeter (Just 30) (MeterState {meterBytesProcessed = BytesProcessed 0, meterTimeStamp = 1.6070268727892535e9}) (MeterState {meterBytesProcessed = BytesProcessed 30, meterTimeStamp = 1.6070268728043e9})) TransferOutput (OutputMessage "(checksum...) ") TransferResult True Granted, this is not optimally fast, but it seems good enough, and is probably nearly as fast as the old protocol anyhow. emitSerializedOutput for ProgressMeter is not yet implemented. It needs to somehow start or update a progress meter. There may need to be a new message that allocates a progress meter, and then have ProgressMeter update it. This commit was sponsored by Ethan Aubin
This commit is contained in:
parent
82dbc4387c
commit
cad147cbbf
10 changed files with 98 additions and 96 deletions
|
@ -155,7 +155,7 @@ genTransfer t info = case transferRemote info of
|
|||
- usual cleanup. However, first check if something else is
|
||||
- running the transfer, to avoid removing active transfers.
|
||||
-}
|
||||
go remote transferrer = ifM (liftIO $ performTransfer transferrer t info)
|
||||
go remote transferrer = ifM (liftAnnex $ performTransfer transferrer t info)
|
||||
( do
|
||||
case associatedFile info of
|
||||
AssociatedFile Nothing -> noop
|
||||
|
|
|
@ -55,9 +55,9 @@ checkTransferrerPoolItem program batchmaker i = case i of
|
|||
|
||||
{- Requests that a Transferrer perform a Transfer, and waits for it to
|
||||
- finish. -}
|
||||
performTransfer :: Transferrer -> Transfer -> TransferInfo -> IO Bool
|
||||
performTransfer :: Transferrer -> Transfer -> TransferInfo -> Annex Bool
|
||||
performTransfer transferrer t info = catchBoolIO $ do
|
||||
T.sendRequest t info (transferrerWrite transferrer)
|
||||
(liftIO $ T.sendRequest t info (transferrerWrite transferrer))
|
||||
T.readResponse (transferrerRead transferrer)
|
||||
|
||||
{- Starts a new git-annex transferkeys process, setting up handles
|
||||
|
|
|
@ -1,15 +1,14 @@
|
|||
{- git-annex command, used internally by assistant
|
||||
{- git-annex command
|
||||
-
|
||||
- Copyright 2012, 2013 Joey Hess <id@joeyh.name>
|
||||
- Copyright 2012-2020 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
|
||||
|
||||
module Command.TransferKeys where
|
||||
|
||||
import Command
|
||||
import qualified Annex
|
||||
import Annex.Content
|
||||
import Logs.Location
|
||||
import Annex.Transfer
|
||||
|
@ -18,8 +17,19 @@ import Utility.SimpleProtocol (dupIoHandles)
|
|||
import Git.Types (RemoteName)
|
||||
import qualified Database.Keys
|
||||
import Annex.BranchState
|
||||
import Types.Messages
|
||||
import Types.Key
|
||||
import Messages.Internal
|
||||
|
||||
data TransferRequest = TransferRequest Direction Remote Key AssociatedFile
|
||||
import Text.Read (readMaybe)
|
||||
|
||||
data TransferRequest = TransferRequest Direction (Either UUID RemoteName) KeyData AssociatedFile
|
||||
deriving (Show, Read)
|
||||
|
||||
data TransferResponse
|
||||
= TransferOutput SerializedOutput
|
||||
| TransferResult Bool
|
||||
deriving (Show, Read)
|
||||
|
||||
cmd :: Command
|
||||
cmd = command "transferkeys" SectionPlumbing "transfers keys"
|
||||
|
@ -32,10 +42,12 @@ start :: CommandStart
|
|||
start = do
|
||||
enableInteractiveBranchAccess
|
||||
(readh, writeh) <- liftIO dupIoHandles
|
||||
Annex.setOutput $ SerializedOutput $
|
||||
hPutStrLn writeh . show . TransferOutput
|
||||
runRequests readh writeh runner
|
||||
stop
|
||||
where
|
||||
runner (TransferRequest direction remote key file)
|
||||
runner (TransferRequest direction _ keydata file) remote
|
||||
| direction == Upload = notifyTransfer direction file $
|
||||
upload (Remote.uuid remote) key file stdRetry $ \p -> do
|
||||
tryNonAsync (Remote.storeKey remote key file p) >>= \case
|
||||
|
@ -58,82 +70,58 @@ start = do
|
|||
-- not old cached data.
|
||||
Database.Keys.closeDb
|
||||
return r
|
||||
where
|
||||
key = mkKey (const keydata)
|
||||
|
||||
runRequests
|
||||
:: Handle
|
||||
-> Handle
|
||||
-> (TransferRequest -> Annex Bool)
|
||||
-> (TransferRequest -> Remote -> Annex Bool)
|
||||
-> Annex ()
|
||||
runRequests readh writeh a = do
|
||||
liftIO $ hSetBuffering readh NoBuffering
|
||||
go =<< readrequests
|
||||
runRequests readh writeh a = go Nothing Nothing
|
||||
where
|
||||
go (d:rn:k:f:rest) = do
|
||||
case (deserialize d, deserialize rn, deserialize k, deserialize f) of
|
||||
(Just direction, Just remotename, Just key, Just file) -> do
|
||||
mremote <- Remote.byName' remotename
|
||||
go lastremoteoruuid lastremote = unlessM (liftIO $ hIsEOF readh) $ do
|
||||
l <- liftIO $ hGetLine readh
|
||||
case readMaybe l of
|
||||
Just tr@(TransferRequest _ remoteoruuid _ _) -> do
|
||||
-- Often the same remote will be used
|
||||
-- repeatedly, so cache the last one to
|
||||
-- avoid looking up repeatedly.
|
||||
mremote <- if lastremoteoruuid == Just remoteoruuid
|
||||
then pure lastremote
|
||||
else eitherToMaybe <$> Remote.byName'
|
||||
(either fromUUID id remoteoruuid)
|
||||
case mremote of
|
||||
Left _ -> sendresult False
|
||||
Right remote -> sendresult =<< a
|
||||
(TransferRequest direction remote key file)
|
||||
_ -> sendresult False
|
||||
go rest
|
||||
go [] = noop
|
||||
go [""] = noop
|
||||
go v = error $ "transferkeys protocol error: " ++ show v
|
||||
Just remote -> do
|
||||
sendresult =<< a tr remote
|
||||
go (Just remoteoruuid) mremote
|
||||
Nothing -> protocolError l
|
||||
Nothing -> protocolError l
|
||||
|
||||
readrequests = liftIO $ split fieldSep <$> hGetContents readh
|
||||
sendresult b = liftIO $ do
|
||||
hPutStrLn writeh $ serialize b
|
||||
hPutStrLn writeh $ show $ TransferResult b
|
||||
hFlush writeh
|
||||
|
||||
sendRequest :: Transfer -> TransferInfo -> Handle -> IO ()
|
||||
sendRequest t tinfo h = do
|
||||
hPutStr h $ intercalate fieldSep
|
||||
[ serialize (transferDirection t)
|
||||
, maybe (serialize ((fromUUID (transferUUID t)) :: String))
|
||||
(serialize . Remote.name)
|
||||
(transferRemote tinfo)
|
||||
, serialize (transferKey t)
|
||||
, serialize (associatedFile tinfo)
|
||||
, "" -- adds a trailing null
|
||||
]
|
||||
hFlush h
|
||||
sendRequest t tinfo h = hPutStrLn h $ show $ TransferRequest
|
||||
(transferDirection t)
|
||||
(maybe (Left (transferUUID t)) (Right . Remote.name) (transferRemote tinfo))
|
||||
(keyData (transferKey t))
|
||||
(associatedFile tinfo)
|
||||
|
||||
readResponse :: Handle -> IO Bool
|
||||
readResponse h = fromMaybe False . deserialize <$> hGetLine h
|
||||
-- | Read a response from this command.
|
||||
--
|
||||
-- Each TransferOutput line that is read before the final TransferResult
|
||||
-- will be output.
|
||||
readResponse :: Handle -> Annex Bool
|
||||
readResponse h = do
|
||||
l <- liftIO $ hGetLine h
|
||||
case readMaybe l of
|
||||
Just (TransferOutput so) -> do
|
||||
emitSerializedOutput so
|
||||
readResponse h
|
||||
Just (TransferResult r) -> return r
|
||||
Nothing -> protocolError l
|
||||
|
||||
fieldSep :: String
|
||||
fieldSep = "\0"
|
||||
|
||||
class TCSerialized a where
|
||||
serialize :: a -> String
|
||||
deserialize :: String -> Maybe a
|
||||
|
||||
instance TCSerialized Bool where
|
||||
serialize True = "1"
|
||||
serialize False = "0"
|
||||
deserialize "1" = Just True
|
||||
deserialize "0" = Just False
|
||||
deserialize _ = Nothing
|
||||
|
||||
instance TCSerialized Direction where
|
||||
serialize Upload = "u"
|
||||
serialize Download = "d"
|
||||
deserialize "u" = Just Upload
|
||||
deserialize "d" = Just Download
|
||||
deserialize _ = Nothing
|
||||
|
||||
instance TCSerialized AssociatedFile where
|
||||
serialize (AssociatedFile (Just f)) = fromRawFilePath f
|
||||
serialize (AssociatedFile Nothing) = ""
|
||||
deserialize "" = Just (AssociatedFile Nothing)
|
||||
deserialize f = Just (AssociatedFile (Just (toRawFilePath f)))
|
||||
|
||||
instance TCSerialized RemoteName where
|
||||
serialize n = n
|
||||
deserialize n = Just n
|
||||
|
||||
instance TCSerialized Key where
|
||||
serialize = serializeKey
|
||||
deserialize = deserializeKey
|
||||
protocolError :: String -> a
|
||||
protocolError l = error $ "transferkeys protocol error: " ++ show l
|
||||
|
|
|
@ -288,7 +288,7 @@ commandProgressDisabled = withMessageState $ \s -> return $
|
|||
NormalOutput -> concurrentOutputEnabled s
|
||||
QuietOutput -> True
|
||||
JSONOutput _ -> True
|
||||
SerializedOutput -> True
|
||||
SerializedOutput _ -> True
|
||||
|
||||
jsonOutputEnabled :: Annex Bool
|
||||
jsonOutputEnabled = withMessageState $ \s -> return $
|
||||
|
|
|
@ -29,15 +29,15 @@ outputMessage' jsonoutputter jsonbuilder msg = withMessageState $ \s -> case out
|
|||
| otherwise -> liftIO $ flushed $ S.putStr msg
|
||||
JSONOutput _ -> void $ jsonoutputter jsonbuilder s
|
||||
QuietOutput -> q
|
||||
SerializedOutput -> do
|
||||
liftIO $ outputSerialized $ OutputMessage msg
|
||||
SerializedOutput h -> do
|
||||
liftIO $ outputSerialized h $ OutputMessage msg
|
||||
void $ jsonoutputter jsonbuilder s
|
||||
|
||||
-- Buffer changes to JSON until end is reached and then emit it.
|
||||
bufferJSON :: JSONBuilder -> MessageState -> Annex Bool
|
||||
bufferJSON jsonbuilder s = case outputType s of
|
||||
JSONOutput _ -> go (flushed . JSON.emit)
|
||||
SerializedOutput -> go (outputSerialized . JSONObject . JSON.encode)
|
||||
SerializedOutput h -> go (outputSerialized h . JSONObject . JSON.encode)
|
||||
_ -> return False
|
||||
where
|
||||
go emitter
|
||||
|
@ -63,7 +63,7 @@ bufferJSON jsonbuilder s = case outputType s of
|
|||
outputJSON :: JSONBuilder -> MessageState -> Annex Bool
|
||||
outputJSON jsonbuilder s = case outputType s of
|
||||
JSONOutput _ -> go (flushed . JSON.emit)
|
||||
SerializedOutput -> go (outputSerialized . JSONObject . JSON.encode)
|
||||
SerializedOutput h -> go (outputSerialized h . JSONObject . JSON.encode)
|
||||
_ -> return False
|
||||
where
|
||||
go emitter = do
|
||||
|
@ -77,8 +77,8 @@ outputError msg = withMessageState $ \s -> case (outputType s, jsonBuffer s) of
|
|||
let jb' = Just (JSON.addErrorMessage (lines msg) jb)
|
||||
in Annex.changeState $ \st ->
|
||||
st { Annex.output = s { jsonBuffer = jb' } }
|
||||
(SerializedOutput, _) ->
|
||||
liftIO $ outputSerialized $ OutputError msg
|
||||
(SerializedOutput h, _) ->
|
||||
liftIO $ outputSerialized h $ OutputError msg
|
||||
_
|
||||
| concurrentOutputEnabled s -> concurrentMessage s True msg go
|
||||
| otherwise -> go
|
||||
|
@ -94,5 +94,20 @@ q = noop
|
|||
flushed :: IO () -> IO ()
|
||||
flushed a = a >> hFlush stdout
|
||||
|
||||
outputSerialized :: SerializedOutput -> IO ()
|
||||
outputSerialized = print
|
||||
outputSerialized :: (SerializedOutput -> IO ()) -> SerializedOutput -> IO ()
|
||||
outputSerialized = id
|
||||
|
||||
emitSerializedOutput :: SerializedOutput -> Annex ()
|
||||
emitSerializedOutput (OutputMessage msg) =
|
||||
outputMessage' nojsonoutputter nojsonbuilder msg
|
||||
where
|
||||
nojsonoutputter _ _ = return False
|
||||
nojsonbuilder = id
|
||||
emitSerializedOutput (OutputError msg) = outputError msg
|
||||
emitSerializedOutput (ProgressMeter sz old new) = undefined -- TODO
|
||||
emitSerializedOutput (JSONObject b) =
|
||||
withMessageState $ \s -> case outputType s of
|
||||
JSONOutput _ -> liftIO $ flushed $ JSON.emit' b
|
||||
SerializedOutput h -> liftIO $
|
||||
outputSerialized h $ JSONObject b
|
||||
_ -> q
|
||||
|
|
|
@ -11,6 +11,7 @@ module Messages.JSON (
|
|||
JSONBuilder,
|
||||
JSONChunk(..),
|
||||
emit,
|
||||
emit',
|
||||
encode,
|
||||
none,
|
||||
start,
|
||||
|
@ -52,9 +53,12 @@ emitLock :: MVar ()
|
|||
emitLock = unsafePerformIO $ newMVar ()
|
||||
|
||||
emit :: Object -> IO ()
|
||||
emit o = do
|
||||
emit = emit' . encode
|
||||
|
||||
emit' :: L.ByteString -> IO ()
|
||||
emit' b = do
|
||||
takeMVar emitLock
|
||||
L.hPut stdout (encode o)
|
||||
L.hPut stdout b
|
||||
putStr "\n"
|
||||
putMVar emitLock ()
|
||||
|
||||
|
|
|
@ -95,9 +95,9 @@ metered othermeter sizer a = withMessageState $ \st ->
|
|||
updateMeter meter
|
||||
a meter (combinemeter m)
|
||||
| otherwise = nometer
|
||||
go msize (MessageState { outputType = SerializedOutput }) = do
|
||||
go msize (MessageState { outputType = SerializedOutput h }) = do
|
||||
meter <- liftIO $ mkMeter msize $ \_ msize' old new ->
|
||||
outputSerialized $ ProgressMeter msize' old new
|
||||
outputSerialized h $ ProgressMeter msize' old new
|
||||
m <- liftIO $ rateLimitMeterUpdate minratelimit meter $
|
||||
updateMeter meter
|
||||
a meter (combinemeter m)
|
||||
|
|
|
@ -11,6 +11,7 @@ module Types.Key (
|
|||
KeyData(..),
|
||||
Key,
|
||||
fromKey,
|
||||
keyData,
|
||||
mkKey,
|
||||
alterKey,
|
||||
isKeyPrefix,
|
||||
|
@ -201,7 +202,7 @@ splitKeyNameExtension' keyname = S8.span (/= '.') keyname
|
|||
|
||||
{- A filename may be associated with a Key. -}
|
||||
newtype AssociatedFile = AssociatedFile (Maybe RawFilePath)
|
||||
deriving (Show, Eq, Ord)
|
||||
deriving (Show, Read, Eq, Ord)
|
||||
|
||||
{- There are several different varieties of keys. -}
|
||||
data KeyVariety
|
||||
|
|
|
@ -19,8 +19,7 @@ data OutputType
|
|||
= NormalOutput
|
||||
| QuietOutput
|
||||
| JSONOutput JSONOptions
|
||||
| SerializedOutput
|
||||
deriving (Show)
|
||||
| SerializedOutput (SerializedOutput -> IO ())
|
||||
|
||||
data JSONOptions = JSONOptions
|
||||
{ jsonProgress :: Bool
|
||||
|
|
|
@ -8,20 +8,15 @@ git annex transferkeys
|
|||
|
||||
# DESCRIPTION
|
||||
|
||||
This plumbing-level command is used by the assistant to transfer data.
|
||||
This plumbing-level command is used to transfer data.
|
||||
It is a long-running process, which is fed instructions about the keys
|
||||
to transfer using an internal stdio protocol, which is
|
||||
intentionally not documented (as it may change at any time).
|
||||
|
||||
It's normal to have a transferkeys process running when the assistant is
|
||||
running.
|
||||
|
||||
# SEE ALSO
|
||||
|
||||
[[git-annex]](1)
|
||||
|
||||
[[git-annex-assistant]](1)
|
||||
|
||||
# AUTHOR
|
||||
|
||||
Joey Hess <id@joeyh.name>
|
||||
|
|
Loading…
Reference in a new issue