Merge branch 'asyncexternal'
This commit is contained in:
commit
95d9a3cf8a
9 changed files with 394 additions and 160 deletions
13
CHANGELOG
13
CHANGELOG
|
@ -1,4 +1,15 @@
|
|||
git-annex (8.20200810) upstream; urgency=medium
|
||||
git-annex (8.20200815) UNRELEASED; urgency=medium
|
||||
|
||||
* The external special remote protocol got an ASYNC extension.
|
||||
This can be used by an external special remote to let a single process
|
||||
perform concurrent actions, rather than multiple processes being
|
||||
started, when that is more efficient.
|
||||
* Display warning when external special remote does not start up
|
||||
properly, or is not usable.
|
||||
|
||||
-- Joey Hess <id@joeyh.name> Fri, 14 Aug 2020 14:57:45 -0400
|
||||
|
||||
git-annex (8.20200814) upstream; urgency=medium
|
||||
|
||||
* Added support for external backend programs. So if you want a hash
|
||||
that git-annex doesn't support, or something stranger, you can write a
|
||||
|
|
|
@ -6,10 +6,12 @@
|
|||
-}
|
||||
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
module Remote.External (remote) where
|
||||
|
||||
import Remote.External.Types
|
||||
import Remote.External.AsyncExtension
|
||||
import qualified Annex
|
||||
import Annex.Common
|
||||
import qualified Annex.ExternalAddonProcess as AddonProcess
|
||||
|
@ -194,7 +196,7 @@ externalSetup _ mu _ c gc = do
|
|||
-- responding to INITREMOTE need to be applied to
|
||||
-- the RemoteConfig.
|
||||
changes <- withExternalState external $
|
||||
liftIO . atomically . readTVar . externalConfigChanges
|
||||
liftIO . atomically . readTMVar . externalConfigChanges
|
||||
return (changes c')
|
||||
|
||||
gitConfigSpecialRemote u c'' [("externaltype", externaltype)]
|
||||
|
@ -406,28 +408,28 @@ handleRequest' st external req mp responsehandler
|
|||
send $ VALUE $ fromRawFilePath $ hashDirLower def k
|
||||
handleRemoteRequest (SETCONFIG setting value) =
|
||||
liftIO $ atomically $ do
|
||||
modifyTVar' (externalConfig st) $ \(ParsedRemoteConfig m c) ->
|
||||
let m' = M.insert
|
||||
ParsedRemoteConfig m c <- takeTMVar (externalConfig st)
|
||||
let !m' = M.insert
|
||||
(Accepted setting)
|
||||
(RemoteConfigValue (PassedThrough value))
|
||||
m
|
||||
c' = M.insert
|
||||
let !c' = M.insert
|
||||
(Accepted setting)
|
||||
(Accepted value)
|
||||
c
|
||||
in ParsedRemoteConfig m' c'
|
||||
modifyTVar' (externalConfigChanges st) $ \f ->
|
||||
M.insert (Accepted setting) (Accepted value) . f
|
||||
putTMVar (externalConfig st) (ParsedRemoteConfig m' c')
|
||||
f <- takeTMVar (externalConfigChanges st)
|
||||
let !f' = M.insert (Accepted setting) (Accepted value) . f
|
||||
putTMVar (externalConfigChanges st) f'
|
||||
handleRemoteRequest (GETCONFIG setting) = do
|
||||
value <- maybe "" fromProposedAccepted
|
||||
. (M.lookup (Accepted setting))
|
||||
. unparsedRemoteConfig
|
||||
<$> liftIO (atomically $ readTVar $ externalConfig st)
|
||||
<$> liftIO (atomically $ readTMVar $ externalConfig st)
|
||||
send $ VALUE value
|
||||
handleRemoteRequest (SETCREDS setting login password) = case (externalUUID external, externalGitConfig external) of
|
||||
(Just u, Just gc) -> do
|
||||
let v = externalConfig st
|
||||
pc <- liftIO $ atomically $ readTVar v
|
||||
pc <- liftIO $ atomically $ takeTMVar (externalConfig st)
|
||||
pc' <- setRemoteCredPair' pc encryptionAlreadySetup gc
|
||||
(credstorage setting u)
|
||||
(Just (login, password))
|
||||
|
@ -436,13 +438,14 @@ handleRequest' st external req mp responsehandler
|
|||
(unparsedRemoteConfig pc')
|
||||
(unparsedRemoteConfig pc)
|
||||
void $ liftIO $ atomically $ do
|
||||
_ <- swapTVar v pc'
|
||||
modifyTVar' (externalConfigChanges st) $ \f ->
|
||||
M.union configchanges . f
|
||||
putTMVar (externalConfig st) pc'
|
||||
f <- takeTMVar (externalConfigChanges st)
|
||||
let !f' = M.union configchanges . f
|
||||
putTMVar (externalConfigChanges st) f'
|
||||
_ -> senderror "cannot send SETCREDS here"
|
||||
handleRemoteRequest (GETCREDS setting) = case (externalUUID external, externalGitConfig external) of
|
||||
(Just u, Just gc) -> do
|
||||
c <- liftIO $ atomically $ readTVar $ externalConfig st
|
||||
c <- liftIO $ atomically $ readTMVar $ externalConfig st
|
||||
creds <- fromMaybe ("", "") <$>
|
||||
getRemoteCredPair c gc (credstorage setting u)
|
||||
send $ CREDS (fst creds) (snd creds)
|
||||
|
@ -503,18 +506,17 @@ handleRequest' st external req mp responsehandler
|
|||
withurl mk uri = handleRemoteRequest $ mk $
|
||||
setDownloader (show uri) OtherDownloader
|
||||
|
||||
sendMessage :: Sendable m => ExternalState -> m -> Annex ()
|
||||
sendMessage st m = liftIO $ externalSend st line
|
||||
where
|
||||
line = unwords $ formatMessage m
|
||||
sendMessage :: (Sendable m, ToAsyncWrapped m) => ExternalState -> m -> Annex ()
|
||||
sendMessage st m = liftIO $ externalSend st m
|
||||
|
||||
sendMessageAddonProcess :: AddonProcess.ExternalAddonProcess -> String -> IO ()
|
||||
sendMessageAddonProcess p line = do
|
||||
sendMessageAddonProcess :: Sendable m => AddonProcess.ExternalAddonProcess -> m -> IO ()
|
||||
sendMessageAddonProcess p m = do
|
||||
AddonProcess.protocolDebug p True line
|
||||
hPutStrLn h line
|
||||
hFlush h
|
||||
where
|
||||
h = AddonProcess.externalSend p
|
||||
line = unwords $ formatMessage m
|
||||
|
||||
receiveMessageAddonProcess :: AddonProcess.ExternalAddonProcess -> IO (Maybe String)
|
||||
receiveMessageAddonProcess p = do
|
||||
|
@ -550,7 +552,7 @@ receiveMessage
|
|||
receiveMessage st external handleresponse handlerequest handleexceptional =
|
||||
go =<< liftIO (externalReceive st)
|
||||
where
|
||||
go Nothing = protocolError False ""
|
||||
go Nothing = protocolError False "<EOF>"
|
||||
go (Just s) = case parseMessage s :: Maybe Response of
|
||||
Just resp -> case handleresponse resp of
|
||||
Nothing -> protocolError True s
|
||||
|
@ -563,10 +565,12 @@ receiveMessage st external handleresponse handlerequest handleexceptional =
|
|||
Nothing -> case parseMessage s :: Maybe ExceptionalMessage of
|
||||
Just msg -> maybe (protocolError True s) id (handleexceptional msg)
|
||||
Nothing -> protocolError False s
|
||||
protocolError parsed s = giveup $ "external special remote protocol error, unexpectedly received \"" ++ s ++ "\" " ++
|
||||
protocolError parsed s = do
|
||||
warning $ "external special remote protocol error, unexpectedly received \"" ++ s ++ "\" " ++
|
||||
if parsed
|
||||
then "(command not allowed at this time)"
|
||||
else "(unable to parse command)"
|
||||
giveup "unable to use special remote due to protocol error"
|
||||
|
||||
{- While the action is running, the ExternalState provided to it will not
|
||||
- be available to any other calls.
|
||||
|
@ -601,29 +605,58 @@ withExternalState external a = do
|
|||
put st = liftIO $ atomically $ modifyTVar' v (st:)
|
||||
|
||||
{- Starts an external remote process running, and checks VERSION and
|
||||
- exchanges EXTENSIONS. -}
|
||||
- exchanges EXTENSIONS.
|
||||
-
|
||||
- When the ASYNC extension is negotiated, a single process is used,
|
||||
- and this constructs a external state that communicates with a thread
|
||||
- that relays to it.
|
||||
-}
|
||||
startExternal :: External -> Annex ExternalState
|
||||
startExternal external = do
|
||||
startExternal external =
|
||||
liftIO (atomically $ takeTMVar (externalAsync external)) >>= \case
|
||||
UncheckedExternalAsync -> do
|
||||
(st, extensions) <- startExternal' external
|
||||
if asyncExtensionEnabled extensions
|
||||
then do
|
||||
relay <- liftIO $ runRelayToExternalAsync external st
|
||||
st' <- liftIO $ asyncRelayExternalState relay
|
||||
store (ExternalAsync relay)
|
||||
return st'
|
||||
else do
|
||||
store NoExternalAsync
|
||||
return st
|
||||
v@NoExternalAsync -> do
|
||||
store v
|
||||
fst <$> startExternal' external
|
||||
v@(ExternalAsync relay) -> do
|
||||
store v
|
||||
liftIO $ asyncRelayExternalState relay
|
||||
where
|
||||
store = liftIO . atomically . putTMVar (externalAsync external)
|
||||
|
||||
startExternal' :: External -> Annex (ExternalState, ExtensionList)
|
||||
startExternal' external = do
|
||||
pid <- liftIO $ atomically $ do
|
||||
n <- succ <$> readTVar (externalLastPid external)
|
||||
writeTVar (externalLastPid external) n
|
||||
return n
|
||||
AddonProcess.startExternalAddonProcess basecmd pid >>= \case
|
||||
Left (AddonProcess.ProgramFailure err) -> giveup err
|
||||
Left (AddonProcess.ProgramFailure err) -> do
|
||||
unusable err
|
||||
Left (AddonProcess.ProgramNotInstalled err) ->
|
||||
case (lookupName (unparsedRemoteConfig (externalDefaultConfig external)), remoteAnnexReadOnly <$> externalGitConfig external) of
|
||||
(Just rname, Just True) -> giveup $ unlines
|
||||
(Just rname, Just True) -> unusable $ unlines
|
||||
[ err
|
||||
, "This remote has annex-readonly=true, and previous versions of"
|
||||
, "git-annex would tried to download from it without"
|
||||
, "installing " ++ basecmd ++ ". If you want that, you need to set:"
|
||||
, "git config remote." ++ rname ++ ".annex-externaltype readonly"
|
||||
]
|
||||
_ -> giveup err
|
||||
_ -> unusable err
|
||||
Right p -> do
|
||||
cv <- liftIO $ newTVarIO $ externalDefaultConfig external
|
||||
ccv <- liftIO $ newTVarIO id
|
||||
pv <- liftIO $ newTVarIO Unprepared
|
||||
cv <- liftIO $ newTMVarIO $ externalDefaultConfig external
|
||||
ccv <- liftIO $ newTMVarIO id
|
||||
pv <- liftIO $ newTMVarIO Unprepared
|
||||
let st = ExternalState
|
||||
{ externalSend = sendMessageAddonProcess p
|
||||
, externalReceive = receiveMessageAddonProcess p
|
||||
|
@ -632,8 +665,8 @@ startExternal external = do
|
|||
, externalConfig = cv
|
||||
, externalConfigChanges = ccv
|
||||
}
|
||||
startproto st
|
||||
return st
|
||||
extensions <- startproto st
|
||||
return (st, extensions)
|
||||
where
|
||||
basecmd = "git-annex-remote-" ++ externalType external
|
||||
startproto st = do
|
||||
|
@ -645,14 +678,24 @@ startExternal external = do
|
|||
-- It responds with a EXTENSIONS_RESPONSE; that extensions
|
||||
-- list is reserved for future expansion. UNSUPPORTED_REQUEST
|
||||
-- is also accepted.
|
||||
receiveMessage st external
|
||||
exwanted <- receiveMessage st external
|
||||
(\resp -> case resp of
|
||||
EXTENSIONS_RESPONSE _ -> result ()
|
||||
UNSUPPORTED_REQUEST -> result ()
|
||||
EXTENSIONS_RESPONSE l -> result l
|
||||
UNSUPPORTED_REQUEST -> result mempty
|
||||
_ -> Nothing
|
||||
)
|
||||
(const Nothing)
|
||||
(const Nothing)
|
||||
case filter (`notElem` fromExtensionList supportedExtensionList) (fromExtensionList exwanted) of
|
||||
[] -> return exwanted
|
||||
exrest -> unusable $ unwords $
|
||||
[ basecmd
|
||||
, "requested extensions that this version of git-annex does not support:"
|
||||
] ++ exrest
|
||||
|
||||
unusable msg = do
|
||||
warning msg
|
||||
giveup ("unable to use external special remote " ++ basecmd)
|
||||
|
||||
stopExternal :: External -> Annex ()
|
||||
stopExternal external = liftIO $ do
|
||||
|
@ -672,10 +715,12 @@ checkVersion _ _ = Nothing
|
|||
- the error message. -}
|
||||
checkPrepared :: ExternalState -> External -> Annex ()
|
||||
checkPrepared st external = do
|
||||
v <- liftIO $ atomically $ readTVar $ externalPrepared st
|
||||
v <- liftIO $ atomically $ takeTMVar $ externalPrepared st
|
||||
case v of
|
||||
Prepared -> noop
|
||||
FailedPrepare errmsg -> giveup errmsg
|
||||
Prepared -> setprepared Prepared
|
||||
FailedPrepare errmsg -> do
|
||||
setprepared (FailedPrepare errmsg)
|
||||
giveup errmsg
|
||||
Unprepared ->
|
||||
handleRequest' st external PREPARE Nothing $ \resp ->
|
||||
case resp of
|
||||
|
@ -688,8 +733,8 @@ checkPrepared st external = do
|
|||
giveup errmsg'
|
||||
_ -> Nothing
|
||||
where
|
||||
setprepared status = liftIO $ atomically $ void $
|
||||
swapTVar (externalPrepared st) status
|
||||
setprepared status = liftIO $ atomically $
|
||||
putTMVar (externalPrepared st) status
|
||||
|
||||
respErrorMessage :: String -> String -> String
|
||||
respErrorMessage req err
|
||||
|
|
123
Remote/External/AsyncExtension.hs
vendored
Normal file
123
Remote/External/AsyncExtension.hs
vendored
Normal file
|
@ -0,0 +1,123 @@
|
|||
{- External remote protocol async extension.
|
||||
-
|
||||
- Copyright 2020 Joey Hess <id@joeyh.name>
|
||||
-
|
||||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
|
||||
module Remote.External.AsyncExtension (runRelayToExternalAsync) where
|
||||
|
||||
import Common
|
||||
import Messages
|
||||
import Remote.External.Types
|
||||
import Utility.SimpleProtocol as Proto
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TBMChan
|
||||
import qualified Data.Map.Strict as M
|
||||
|
||||
-- | Starts a thread that will handle all communication with the external
|
||||
-- process. The input ExternalState communicates directly with the external
|
||||
-- process.
|
||||
runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay
|
||||
runRelayToExternalAsync external st = do
|
||||
jidmap <- newTVarIO M.empty
|
||||
sendq <- newSendQueue
|
||||
nextjid <- newTVarIO (JobId 1)
|
||||
void $ async $ sendloop st sendq
|
||||
void $ async $ receiveloop external st jidmap sendq
|
||||
return $ ExternalAsyncRelay $ do
|
||||
receiveq <- newReceiveQueue
|
||||
jid <- atomically $ do
|
||||
jid@(JobId n) <- readTVar nextjid
|
||||
let !jid' = JobId (succ n)
|
||||
writeTVar nextjid jid'
|
||||
modifyTVar' jidmap $ M.insert jid receiveq
|
||||
return jid
|
||||
return $ ExternalState
|
||||
{ externalSend = \msg ->
|
||||
atomically $ writeTBMChan sendq
|
||||
(toAsyncWrapped msg, jid)
|
||||
, externalReceive = atomically (readTBMChan receiveq)
|
||||
-- This shuts down the whole relay.
|
||||
, externalShutdown = shutdown external st sendq
|
||||
-- These three TMVars are shared amoung all
|
||||
-- ExternalStates that use this relay; they're
|
||||
-- common state about the external process.
|
||||
, externalPrepared = externalPrepared st
|
||||
, externalConfig = externalConfig st
|
||||
, externalConfigChanges = externalConfigChanges st
|
||||
}
|
||||
|
||||
type ReceiveQueue = TBMChan String
|
||||
|
||||
type SendQueue = TBMChan (AsyncWrapped, JobId)
|
||||
|
||||
type JidMap = TVar (M.Map JobId ReceiveQueue)
|
||||
|
||||
newReceiveQueue :: IO ReceiveQueue
|
||||
newReceiveQueue = newTBMChanIO 10
|
||||
|
||||
newSendQueue :: IO SendQueue
|
||||
newSendQueue = newTBMChanIO 10
|
||||
|
||||
receiveloop :: External -> ExternalState -> JidMap -> SendQueue -> IO ()
|
||||
receiveloop external st jidmap sendq = externalReceive st >>= \case
|
||||
Just l -> case parseMessage l :: Maybe AsyncMessage of
|
||||
Just (AsyncMessage jid msg) ->
|
||||
M.lookup jid <$> readTVarIO jidmap >>= \case
|
||||
Just c -> do
|
||||
atomically $ writeTBMChan c msg
|
||||
receiveloop external st jidmap sendq
|
||||
Nothing -> protoerr "unknown job number"
|
||||
Nothing -> case parseMessage l :: Maybe ExceptionalMessage of
|
||||
Just msg -> do
|
||||
-- ERROR is relayed to all listeners
|
||||
m <- readTVarIO jidmap
|
||||
forM (M.elems m) $ \c ->
|
||||
atomically $ writeTBMChan c l
|
||||
receiveloop external st jidmap sendq
|
||||
Nothing -> protoerr "unexpected non-async message"
|
||||
Nothing -> closeandshutdown
|
||||
where
|
||||
protoerr s = do
|
||||
warningIO $ "async external special remote protocol error: " ++ s
|
||||
closeandshutdown
|
||||
|
||||
closeandshutdown = do
|
||||
shutdown external st sendq True
|
||||
m <- atomically $ readTVar jidmap
|
||||
forM_ (M.elems m) (atomically . closeTBMChan)
|
||||
|
||||
sendloop :: ExternalState -> SendQueue -> IO ()
|
||||
sendloop st sendq = atomically (readTBMChan sendq) >>= \case
|
||||
Just (wrappedmsg, jid) -> do
|
||||
case wrappedmsg of
|
||||
AsyncWrappedRemoteResponse msg ->
|
||||
externalSend st $ wrapjid msg jid
|
||||
AsyncWrappedRequest msg ->
|
||||
externalSend st $ wrapjid msg jid
|
||||
AsyncWrappedExceptionalMessage msg ->
|
||||
externalSend st msg
|
||||
AsyncWrappedAsyncMessage msg ->
|
||||
externalSend st msg
|
||||
sendloop st sendq
|
||||
Nothing -> return ()
|
||||
where
|
||||
wrapjid msg jid = AsyncMessage jid $ unwords $ Proto.formatMessage msg
|
||||
|
||||
shutdown :: External -> ExternalState -> SendQueue -> Bool -> IO ()
|
||||
shutdown external st sendq b = do
|
||||
r <- atomically $ do
|
||||
r <- tryTakeTMVar (externalAsync external)
|
||||
putTMVar (externalAsync external)
|
||||
UncheckedExternalAsync
|
||||
return r
|
||||
case r of
|
||||
Just (ExternalAsync _) -> externalShutdown st b
|
||||
_ -> noop
|
||||
atomically $ closeTBMChan sendq
|
90
Remote/External/Types.hs
vendored
90
Remote/External/Types.hs
vendored
|
@ -5,7 +5,8 @@
|
|||
- Licensed under the GNU AGPL version 3 or higher.
|
||||
-}
|
||||
|
||||
{-# LANGUAGE FlexibleInstances, TypeSynonymInstances #-}
|
||||
{-# LANGUAGE FlexibleInstances, TypeSynonymInstances, RankNTypes #-}
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
||||
|
||||
module Remote.External.Types (
|
||||
|
@ -14,7 +15,11 @@ module Remote.External.Types (
|
|||
ExternalType,
|
||||
ExternalState(..),
|
||||
PrepareStatus(..),
|
||||
ExtensionList(..),
|
||||
supportedExtensionList,
|
||||
asyncExtensionEnabled,
|
||||
ExternalAsync(..),
|
||||
ExternalAsyncRelay(..),
|
||||
Proto.parseMessage,
|
||||
Proto.Sendable(..),
|
||||
Proto.Receivable(..),
|
||||
|
@ -26,6 +31,10 @@ module Remote.External.Types (
|
|||
RemoteRequest(..),
|
||||
RemoteResponse(..),
|
||||
ExceptionalMessage(..),
|
||||
AsyncMessage(..),
|
||||
AsyncWrapped(..),
|
||||
ToAsyncWrapped(..),
|
||||
JobId(..),
|
||||
ErrorMsg,
|
||||
Setting,
|
||||
Description,
|
||||
|
@ -49,6 +58,7 @@ import qualified Utility.SimpleProtocol as Proto
|
|||
import Control.Concurrent.STM
|
||||
import Network.URI
|
||||
import Data.Char
|
||||
import Text.Read
|
||||
|
||||
data External = External
|
||||
{ externalType :: ExternalType
|
||||
|
@ -60,6 +70,7 @@ data External = External
|
|||
, externalDefaultConfig :: ParsedRemoteConfig
|
||||
, externalGitConfig :: Maybe RemoteGitConfig
|
||||
, externalRemoteStateHandle :: Maybe RemoteStateHandle
|
||||
, externalAsync :: TMVar ExternalAsync
|
||||
}
|
||||
|
||||
newExternal :: ExternalType -> Maybe UUID -> ParsedRemoteConfig -> Maybe RemoteGitConfig -> Maybe RemoteStateHandle -> Annex External
|
||||
|
@ -71,27 +82,44 @@ newExternal externaltype u c gc rs = liftIO $ External
|
|||
<*> pure c
|
||||
<*> pure gc
|
||||
<*> pure rs
|
||||
<*> atomically (newTMVar UncheckedExternalAsync)
|
||||
|
||||
type ExternalType = String
|
||||
|
||||
data ExternalState
|
||||
= ExternalState
|
||||
{ externalSend :: String -> IO ()
|
||||
data ExternalState = ExternalState
|
||||
{ externalSend :: forall t. (Proto.Sendable t, ToAsyncWrapped t) => t -> IO ()
|
||||
, externalReceive :: IO (Maybe String)
|
||||
, externalShutdown :: Bool -> IO ()
|
||||
, externalPrepared :: TVar PrepareStatus
|
||||
, externalConfig :: TVar ParsedRemoteConfig
|
||||
, externalConfigChanges :: TVar (RemoteConfig -> RemoteConfig)
|
||||
, externalPrepared :: TMVar PrepareStatus
|
||||
, externalConfig :: TMVar ParsedRemoteConfig
|
||||
, externalConfigChanges :: TMVar (RemoteConfig -> RemoteConfig)
|
||||
}
|
||||
|
||||
type PID = Int
|
||||
|
||||
-- List of extensions to the protocol.
|
||||
newtype ExtensionList = ExtensionList [String]
|
||||
deriving (Show)
|
||||
newtype ExtensionList = ExtensionList { fromExtensionList :: [String] }
|
||||
deriving (Show, Monoid, Semigroup)
|
||||
|
||||
supportedExtensionList :: ExtensionList
|
||||
supportedExtensionList = ExtensionList ["INFO", "ASYNC"]
|
||||
supportedExtensionList = ExtensionList ["INFO", asyncExtension]
|
||||
|
||||
asyncExtension :: String
|
||||
asyncExtension = "ASYNC"
|
||||
|
||||
asyncExtensionEnabled :: ExtensionList -> Bool
|
||||
asyncExtensionEnabled l = asyncExtension `elem` fromExtensionList l
|
||||
|
||||
-- When the async extension is in use, a single external process
|
||||
-- is started and used for all requests.
|
||||
data ExternalAsync
|
||||
= ExternalAsync ExternalAsyncRelay
|
||||
| NoExternalAsync
|
||||
| UncheckedExternalAsync
|
||||
|
||||
data ExternalAsyncRelay = ExternalAsyncRelay
|
||||
{ asyncRelayExternalState :: IO ExternalState
|
||||
}
|
||||
|
||||
data PrepareStatus = Unprepared | Prepared | FailedPrepare ErrorMsg
|
||||
|
||||
|
@ -335,18 +363,36 @@ instance Proto.Receivable ExceptionalMessage where
|
|||
parseCommand "ERROR" = Proto.parse1 ERROR
|
||||
parseCommand _ = Proto.parseFail
|
||||
|
||||
-- Messages used by the async protocol extension.
|
||||
data AsyncMessage
|
||||
= START_ASYNC JobId
|
||||
| END_ASYNC JobId
|
||||
| UPDATE_ASYNC JobId
|
||||
data AsyncMessage = AsyncMessage JobId WrappedMsg
|
||||
|
||||
instance Proto.Receivable AsyncMessage where
|
||||
parseCommand "START-ASYNC" = Proto.parse1 START_ASYNC
|
||||
parseCommand "END-ASYNC" = Proto.parse1 END_ASYNC
|
||||
parseCommand "UPDATE-ASYNC" = Proto.parse1 UPDATE_ASYNC
|
||||
parseCommand "J" = Proto.parse2 AsyncMessage
|
||||
parseCommand _ = Proto.parseFail
|
||||
|
||||
instance Proto.Sendable AsyncMessage where
|
||||
formatMessage (AsyncMessage jid msg) = ["J", Proto.serialize jid, msg]
|
||||
|
||||
data AsyncWrapped
|
||||
= AsyncWrappedRemoteResponse RemoteResponse
|
||||
| AsyncWrappedRequest Request
|
||||
| AsyncWrappedExceptionalMessage ExceptionalMessage
|
||||
| AsyncWrappedAsyncMessage AsyncMessage
|
||||
|
||||
class ToAsyncWrapped t where
|
||||
toAsyncWrapped :: t -> AsyncWrapped
|
||||
|
||||
instance ToAsyncWrapped RemoteResponse where
|
||||
toAsyncWrapped = AsyncWrappedRemoteResponse
|
||||
|
||||
instance ToAsyncWrapped Request where
|
||||
toAsyncWrapped = AsyncWrappedRequest
|
||||
|
||||
instance ToAsyncWrapped ExceptionalMessage where
|
||||
toAsyncWrapped = AsyncWrappedExceptionalMessage
|
||||
|
||||
instance ToAsyncWrapped AsyncMessage where
|
||||
toAsyncWrapped = AsyncWrappedAsyncMessage
|
||||
|
||||
-- Data types used for parameters when communicating with the remote.
|
||||
-- All are serializable.
|
||||
type ErrorMsg = String
|
||||
|
@ -354,11 +400,17 @@ type Setting = String
|
|||
type Description = String
|
||||
type ProtocolVersion = Int
|
||||
type Size = Maybe Integer
|
||||
type JobId = String
|
||||
type WrappedMsg = String
|
||||
newtype JobId = JobId Integer
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
supportedProtocolVersions :: [ProtocolVersion]
|
||||
supportedProtocolVersions = [1]
|
||||
|
||||
instance Proto.Serializable JobId where
|
||||
serialize (JobId n) = show n
|
||||
deserialize = JobId <$$> readMaybe
|
||||
|
||||
instance Proto.Serializable Direction where
|
||||
serialize Upload = "STORE"
|
||||
serialize Download = "RETRIEVE"
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
(This is a draft and not implemented yet.)
|
||||
|
||||
This is an appendix to the [[external_special_remote_protocol]].
|
||||
|
||||
[[!toc]]
|
||||
|
@ -7,11 +5,11 @@ This is an appendix to the [[external_special_remote_protocol]].
|
|||
## introduction
|
||||
|
||||
Normally, an external special remote can only be used to do one thing at a
|
||||
time. When git-annex has concurrency enabled, it will start up multiple
|
||||
time, and when git-annex has concurrency enabled, it will start up multiple
|
||||
processes for the same external special remote.
|
||||
|
||||
This extension lets a single external special remote process handle
|
||||
multiple concurrent requests, which can be useful if multiple processes
|
||||
The `ASYNC` extension lets a single external special remote process handle
|
||||
multiple concurrent jobs, which can be useful if multiple processes
|
||||
would use too many resources, or if it can be better coordinated using a
|
||||
single process.
|
||||
|
||||
|
@ -28,105 +26,73 @@ that includes `ASYNC`, and the external special remote responding in kind.
|
|||
EXTENSIONS INFO ASYNC
|
||||
EXTENSIONS ASYNC
|
||||
|
||||
From this point forward, *everything* that the external special remote
|
||||
has to be wrapped in the async protocol. Messages git-annex sends are
|
||||
unchanged.
|
||||
(Older versions of git-annex will not include `ASYNC` in their extensions
|
||||
list. To support them, it's a good idea for the external special remote to
|
||||
fall back to using the regular protocol.)
|
||||
|
||||
Generally the first message git-annex sends will be PREPARE.
|
||||
Once the extension is negotiated, messages in the protocol are
|
||||
tagged with a job number, by prefixing them with "J n".
|
||||
|
||||
PREPARE
|
||||
As usual, the first message git-annex sends is generally PREPARE,
|
||||
which gets tagged with a job number:
|
||||
|
||||
Rather than just responding PREPARE-SUCCESS, it has to be wrapped
|
||||
in the async protocol:
|
||||
J 1 PREPARE
|
||||
|
||||
RESULT-ASYNC PREPARE-SUCCESS
|
||||
Rather than just responding PREPARE-SUCCESS, the job number has to be
|
||||
included in the reply:
|
||||
|
||||
Suppose git-annex wants to make some transfers. So it sends:
|
||||
J 1 PREPARE-SUCCESS
|
||||
|
||||
TRANSFER RETRIEVE Key1 file1
|
||||
Suppose git-annex wants to make some transfers. It can request several
|
||||
at the same time, using different job numbers:
|
||||
|
||||
The special remote should respond with an unique identifier for this
|
||||
async job that it's going to start. The identifier can
|
||||
be anything you want to use, but an incrementing number is a
|
||||
reasonable choice. (The Key itself is not a good choice, because git-annex
|
||||
could make different requests involving the same Key.)
|
||||
J 1 TRANSFER RETRIEVE Key1 file1
|
||||
J 2 TRANSFER RETRIEVE Key2 file2
|
||||
|
||||
START-ASYNC 1
|
||||
The special remote can now perform both transfers at the same time.
|
||||
If it sends PROGRESS messages for these transfers, they have to be tagged
|
||||
with the job numbers:
|
||||
|
||||
Once that's sent, git-annex can send its next request immediately,
|
||||
while that transfer is still running. For example, it might request a
|
||||
second transfer, and the special remote can reply when it's started that
|
||||
transfer too:
|
||||
J 1 PROGRESS 10
|
||||
J 2 PROGRESS 500
|
||||
J 1 PROGRESS 20
|
||||
|
||||
TRANSFER RETRIEVE 2 file2
|
||||
START-ASYNC 2
|
||||
The special remote can also send messages that query git-annex for some
|
||||
information. These messages and the reply will also be tagged with a job
|
||||
number.
|
||||
|
||||
If it needs to query git-annex for some information, the special remote
|
||||
can use `ASYNC` to send a message, and wait for git-annex to reply
|
||||
in a `REPLY-ASYNC` message:
|
||||
J 1 GETCONFIG url
|
||||
J 3 RETRIEVE Key3 file3
|
||||
J 1 VALUE http://example.com/
|
||||
|
||||
ASYNC 1 GETCONFIG url
|
||||
REPLY-ASYNC 1 VALUE http://example.com/
|
||||
One transfers are done, the special remote sends `TRANSFER-SUCCESS` tagged
|
||||
with the job number.
|
||||
|
||||
To indicate progress of transfers, the special remote can send
|
||||
`ASYNC` messages, wrapping the usual PROGRESS messages:
|
||||
J 2 TRANSFER-SUCCESS RETRIEVE Key2
|
||||
J 1 PROGRESS 100
|
||||
J 1 TRANSFER-SUCCESS RETRIEVE Key1
|
||||
|
||||
ASYNC 1 PROGRESS 10
|
||||
ASYNC 2 PROGRESS 500
|
||||
ASYNC 1 PROGRESS 20
|
||||
Lots of different jobs can be requested at the same time.
|
||||
|
||||
Once a transfer is done, the special remote indicates this with an
|
||||
`END-ASYNC` message, wrapping the usual `TRANSFER-SUCCESS` or
|
||||
`TRANSFER-FAILURE` message:
|
||||
J 4 CHECKPRESENT Key3
|
||||
J 5 CHECKPRESENT Key4
|
||||
J 6 REMOVE Key5
|
||||
J 4 CHECKPRESENT-SUCCESS Key3
|
||||
J 6 REMOVE-SUCCESS Key5
|
||||
J 5 CHECKPRESENT-FAILURE Key4
|
||||
|
||||
END-ASYNC 2 TRANSFER-SUCCESS RETRIEVE Key2
|
||||
ASYNC Key1 PROGRESS 100
|
||||
END-ASYNC 1 TRANSFER-SUCCESS RETRIEVE Key1
|
||||
## notes
|
||||
|
||||
Not only transfers, but everything the special remote sends to git-annex
|
||||
has to be wrapped in the async protocol.
|
||||
There will be one job number for each thread that git-annex runs
|
||||
concurrently, so around the same number as the -J value, although in some
|
||||
cases git-annex does more concurrent operations than the -J value.
|
||||
|
||||
CHECKPRESENT Key3
|
||||
START-ASYNC 3
|
||||
CHECKPRESENT Key4
|
||||
START-ASYNC 4
|
||||
END-ASYNC 3 CHECKPRESENT-SUCCESS Key3
|
||||
REMOVE Key3
|
||||
END-ASYNC 4 CHECKPRESENT-FAILURE Key4
|
||||
START_ASYNC 5
|
||||
END-ASYNC 5 REMOVE-SUCCESS Key3
|
||||
`PREPARE` is sent only once per run of a special remote
|
||||
program, and despite being tagged with a job number, it should prepare the
|
||||
special remote to run that and any other jobs.
|
||||
|
||||
## added messages
|
||||
|
||||
Here's the details about the additions to the protocol.
|
||||
|
||||
* `START-ASYNC JobId`
|
||||
This (or `RESULT-ASYNC` must be sent in response to all requests
|
||||
git-annex sends after `EXTENSIONS` has been used to negotiate the
|
||||
async protocol.
|
||||
The JobId is a unique value, typically an incrementing number.
|
||||
This does not need to be sent immediately after git-annex sends a request;
|
||||
other messages can be sent in between. But the next START-ASYNC git-annex sees
|
||||
after sending a request tells it the JobId that will be used for that request.
|
||||
* `END-ASYNC JobId ReplyMsg`
|
||||
Indicates that an async job is complete. The ReplyMsg indicates the result
|
||||
of the job, and is anything that would be sent as a protocol reply in the
|
||||
non-async protocol.
|
||||
* `RESULT-ASYNC ReplyMsg`
|
||||
This is the same as sending `START-ASYNC` immediately followed by
|
||||
`END-ASYNC`. This is often used to respond to `PREPARE`, `LISTCONFIGS`,
|
||||
and other things that are trivial or just don't need to be handled async.
|
||||
* `ASYNC JobId InfoMsg`
|
||||
Used to send any of the [special remote messages](https://git-annex.branchable.com/design/external_special_remote_protocol/#index5h2)
|
||||
to git-annex.
|
||||
Often used to send `PROGRESS`, but can also be used for other messages,
|
||||
including ones that git-annex sends a reply to. When git-annex does send
|
||||
a reply,
|
||||
it will be wrapped in `REPLY-ASYNC`.
|
||||
Can be sent at any time aftwr `START-ASYNC` and before `END-ASYNC` for
|
||||
the JobId in question.
|
||||
* `REPLY-ASYNC JobId Reply`
|
||||
Sent by git-annex when `ASYNC` has been sent and the message generated
|
||||
a reply. Note that this may not be the next message received from
|
||||
git-annex immediately after sending an `ASYNC` request.
|
||||
`ERROR` should not be tagged with a job number if either git-annex
|
||||
or the special remote needs to send it.
|
||||
|
||||
`VERSION`, `EXTENSIONS` and `ERROR` are the only protocol messages
|
||||
that do not get tagged with a job number.
|
||||
|
|
25
doc/devblog/day_629__async_external_special_remotes.mdwn
Normal file
25
doc/devblog/day_629__async_external_special_remotes.mdwn
Normal file
|
@ -0,0 +1,25 @@
|
|||
After a release on Monday, I've spent the week working on
|
||||
[[async extension to external special remote protocol|design/external_special_remote_protocol/async_appendix]].
|
||||
This is lets a single external special remote process handle multiple
|
||||
requests at the same time, when it's more efficient to use one process
|
||||
than for git-annex to run several processes.
|
||||
|
||||
It's a good thing I added support for extensions a couple of years back.
|
||||
I never imagined at the time using it for something like this, that
|
||||
radically changes the whole protocol! It could have just been protocol
|
||||
version 2, but then special remotes would be pushed towards using this by
|
||||
default, which I don't want. It's probably overkill for most of them.
|
||||
|
||||
J 4 CHECKPRESENT Key3
|
||||
J 5 CHECKPRESENT Key4
|
||||
J 6 REMOVE Key5
|
||||
J 4 CHECKPRESENT-SUCCESS Key3
|
||||
J 6 REMOVE-SUCCESS Key5
|
||||
J 5 CHECKPRESENT-FAILURE Key4
|
||||
|
||||
The protocol extension went through a bunch of iterations, ending up with
|
||||
probably the simplest possible way to do it, a simple framing layer around
|
||||
the main protocol. I started with rather a lot of rather hairy code and it
|
||||
kind of all melted away as I refined the protocol down to that, which was
|
||||
nice, although I also kind of wish I had been able to jump right to
|
||||
the clean and simple end result.
|
|
@ -8,3 +8,5 @@ Just an idea ;)
|
|||
|
||||
[[!meta author=yoh]]
|
||||
[[!tag projects/dandi]]
|
||||
|
||||
> [[done]] --[[Joey]]
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
[[!comment format=mdwn
|
||||
username="joey"
|
||||
subject="""comment 7"""
|
||||
date="2020-08-14T18:59:37Z"
|
||||
content="""
|
||||
ASYNC extension is implemented. The protocol went through several
|
||||
iterations and ended up at about the simplest and cleanest possible way to
|
||||
do it.
|
||||
"""]]
|
|
@ -961,6 +961,7 @@ Executable git-annex
|
|||
Remote.Directory
|
||||
Remote.Directory.LegacyChunked
|
||||
Remote.External
|
||||
Remote.External.AsyncExtension
|
||||
Remote.External.Types
|
||||
Remote.GCrypt
|
||||
Remote.Git
|
||||
|
|
Loading…
Reference in a new issue