wip external remote async protocol extension

This commit is contained in:
Joey Hess 2020-08-12 15:17:53 -04:00
parent 7a21492f49
commit 06a4ab39fa
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 187 additions and 15 deletions

View file

@ -10,6 +10,7 @@
module Remote.External (remote) where
import Remote.External.Types
import Remote.External.AsyncExtension
import qualified Annex
import Annex.Common
import qualified Annex.ExternalAddonProcess as AddonProcess
@ -601,9 +602,37 @@ withExternalState external a = do
put st = liftIO $ atomically $ modifyTVar' v (st:)
{- Starts an external remote process running, and checks VERSION and
- exchanges EXTENSIONS. -}
- exchanges EXTENSIONS.
-
- When the ASYNC extension is negotiated, a single process is used,
- and this constructs a external state that communicates with a thread
- that relays to it.
-}
startExternal :: External -> Annex ExternalState
startExternal external = do
startExternal external =
liftIO (atomically $ takeTMVar (externalAsync external)) >>= \case
UncheckedExternalAsync -> do
(st, extensions) <- startExternal' external
if asyncExtensionEnabled extensions
then do
v <- liftIO $ runRelayToExternalAsync st
st' <- liftIO $ relayToExternalAsync v
store (ExternalAsync v)
return st'
else do
store NoExternalAsync
return st
v@NoExternalAsync -> do
store v
fst <$> startExternal' external
v@(ExternalAsync ExternalAsyncRelay) -> do
store v
liftIO $ relayToExternalAsync v
where
store = liftIO . atomically . putTMVar (externalAsync external)
startExternal' :: External -> Annex (ExternalState, ExtensionList)
startExternal' external = do
pid <- liftIO $ atomically $ do
n <- succ <$> readTVar (externalLastPid external)
writeTVar (externalLastPid external) n
@ -632,8 +661,8 @@ startExternal external = do
, externalConfig = cv
, externalConfigChanges = ccv
}
startproto st
return st
extensions <- startproto st
return (st, extensions)
where
basecmd = "git-annex-remote-" ++ externalType external
startproto st = do
@ -645,14 +674,20 @@ startExternal external = do
-- It responds with a EXTENSIONS_RESPONSE; that extensions
-- list is reserved for future expansion. UNSUPPORTED_REQUEST
-- is also accepted.
receiveMessage st external
exwanted <- receiveMessage st external
(\resp -> case resp of
EXTENSIONS_RESPONSE _ -> result ()
UNSUPPORTED_REQUEST -> result ()
EXTENSIONS_RESPONSE l -> result l
UNSUPPORTED_REQUEST -> result []
_ -> Nothing
)
(const Nothing)
(const Nothing)
case filter (`notElem` fromExtensionList supportedExtensionList) (fromExtensionList exwanted) of
[] -> return exwanted
exrest -> giveup $ unwords $
[ basecmd
, "requested extensions that this version of git-annex does not support:"
] ++ exrest
stopExternal :: External -> Annex ()
stopExternal external = liftIO $ do

110
Remote/External/AsyncExtension.hs vendored Normal file
View file

@ -0,0 +1,110 @@
{- External remote protocol async extension.
-
- Copyright 2013-2020 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE BangPatterns #-}
module Remote.External.AsyncExtension where
import Common.Annex
import Remote.External.Types
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMChan
import qualified Data.Map.Strict as M
-- | Constructs an ExternalState that can be used to communicate with
-- the external process via the relay.
relayToExternalAsync :: ExternalAsyncRelay -> IO ExternalState
relayToExternalAsync relay = do
n <- liftIO $ atomically $ do
v <- readTVar (asyncRelayLast relay)
let !n = succ v
writeTVar (asyncRelayLast relay) n
return n
return $ asyncRelayExternalState n
-- | Starts a thread that will handle all communication with the external
-- process. The input ExternalState communicates directly with the external
-- process.
runRelayToExternalAsync :: ExternalState -> IO ExternalAsyncRelay
runRelayToExternalAsync st = do
startcomm <- runRelayToExternalAsync' st
pv <- atomically $ newTVar 1
return $ ExternalAsyncRelay
{ asyncRelayLastId = pv
, asyncRelayExternalState = relaystate startcomm
}
where
relaystate startcomm n = do
(sendh, receiveh, shutdownh) <- startcomm n
ExternalState
{ externalSend = atomically . writeTBMChan sendh
, externalReceive = atomically . readTBMChan receiveh
, externalShutdown = atomically . writeTBMChan shutdownh
-- These three TVars are shared amoung all
-- ExternalStates that use this relay; they're
-- common state about the external process.
-- TODO: ALL code using these in Remote.External
-- has to be made async-safe.
, externalPrepared = externalPrepared st
, externalConfig = externalConfig st
, externalConfigChanges = externalConfigChanges st
}
newtype ClientId = ClientId Int
deriving (Show, Eq, Ord)
runRelayToExternalAsync'
:: ExternalState
-> IO (ClientId -> IO (TBMChan String, TBMChan (Maybe String), TBMChan Bool))
runRelayToExternalAsync' st = do
let startcomm n =
sendt <- async sendloop
void $ async (receiveloop [] Nothing sendt)
return startcomm
where
receiveloop newreqs currjid sendt = externalReceive st >>= \case
Just l -> case parseMessage l :: Maybe AsyncMessage of
Just (START_ASYNC jid) -> case newreqs of
[] -> giveup "async special remote protocol error: unexpected START-ASYNC"
(c:newreqs') -> do
let !receiverjids' = M.insert jid c receiverjids
receiveloop newreqs' Nothing receiverjids' sendt
Just (END_ASYNC jid) -> do
let !receiverjids' = M.delete jid receiverjids
receiveloop newreqs (Just jid) receiverjids' sendt
Just (UPDATE_ASYNC jid) ->
receiveloop newreqs (Just jid) receiverjids sendt
Nothing -> case currjid of
Just jid ->
--
Nothing -> case newreqs of
[] -> giveup "async special remote protocol error: unexpected non-async message"
(c:_) -> do
case M.lookup c receivers of
Just c -> atomically $ writeTBMChan c l
Nothing -> return ()
receiveloop newreqs Nothing sendt
Nothing -> do
-- Unable to receive anything more from the
-- process, so it's not usable any longer.
-- So close all chans, stop the process,
-- and avoid any new ExternalStates from being
-- created using it.
atomically $ do
void $ tryTakeTMVar (externalAsync external)
putTMVar (externalAsync external)
UncheckedExternalAsync
forM_ (M.toList receivers) $
atomically . closeTBMChan
forM_ (M.toList senders) $
atomically . closeTBMChan
externalShutdown st True
cancel sendt
sendloop = do

View file

@ -15,6 +15,7 @@ module Remote.External.Types (
ExternalState(..),
PrepareStatus(..),
supportedExtensionList,
asyncExtensionEnabled,
Proto.parseMessage,
Proto.Sendable(..),
Proto.Receivable(..),
@ -60,6 +61,7 @@ data External = External
, externalDefaultConfig :: ParsedRemoteConfig
, externalGitConfig :: Maybe RemoteGitConfig
, externalRemoteStateHandle :: Maybe RemoteStateHandle
, externalAsync :: TMVar ExternalAsync
}
newExternal :: ExternalType -> Maybe UUID -> ParsedRemoteConfig -> Maybe RemoteGitConfig -> Maybe RemoteStateHandle -> Annex External
@ -71,6 +73,7 @@ newExternal externaltype u c gc rs = liftIO $ External
<*> pure c
<*> pure gc
<*> pure rs
<*> atomically (newTMVar UncheckedExternalAsync)
type ExternalType = String
@ -87,11 +90,29 @@ data ExternalState
type PID = Int
-- List of extensions to the protocol.
newtype ExtensionList = ExtensionList [String]
newtype ExtensionList = ExtensionList { fromExtensionList :: [String] }
deriving (Show)
supportedExtensionList :: ExtensionList
supportedExtensionList = ExtensionList ["INFO", "ASYNC"]
supportedExtensionList = ExtensionList ["INFO", asyncExtension]
asyncExtension :: String
asyncExtension = "ASYNC"
asyncExtensionEnabled :: ExtensionList -> Bool
asyncExtensionEnabled l = asyncExtension `elem` fromExtensionList l
-- When the async extension is in use, a single external process
-- is started and used for all requests.
data ExternalAsync
= ExternalAsync ExternalAsyncRelay
| NoExternalAsync
| UncheckedExternalAsync
data ExternalAsyncRelay = ExternalAsyncRelay
{ asyncRelayLastId :: TVar Int
, asyncRelayExternalState :: Int -> IO ExternalState
}
data PrepareStatus = Unprepared | Prepared | FailedPrepare ErrorMsg
@ -337,14 +358,18 @@ instance Proto.Receivable ExceptionalMessage where
-- Messages used by the async protocol extension.
data AsyncMessage
= START_ASYNC JobId
| END_ASYNC JobId
| UPDATE_ASYNC JobId
= START_ASYNC JobId WrappedMsg
| END_ASYNC JobId WrappedMsg
| RESULT_ASYNC WrappedMsg
| ASYNC JobId WrappedMsg
| REPLY_ASYNC JobId WrappedMsg
instance Proto.Receivable AsyncMessage where
parseCommand "START-ASYNC" = Proto.parse1 START_ASYNC
parseCommand "END-ASYNC" = Proto.parse1 END_ASYNC
parseCommand "UPDATE-ASYNC" = Proto.parse1 UPDATE_ASYNC
parseCommand "START-ASYNC" = Proto.parse2 START_ASYNC
parseCommand "END-ASYNC" = Proto.parse2 END_ASYNC
parseCommand "RESULT-ASYNC" = Proto.parse1 RESULT_ASYNC
parseCommand "ASYNC" = Proto.parse2 ASYNC
parseCommand "REPLY-ASYNC" = Proto.parse2 REPLY_ASYNC
parseCommand _ = Proto.parseFail
-- Data types used for parameters when communicating with the remote.
@ -355,6 +380,7 @@ type Description = String
type ProtocolVersion = Int
type Size = Maybe Integer
type JobId = String
type WrappedMsg = String
supportedProtocolVersions :: [ProtocolVersion]
supportedProtocolVersions = [1]

View file

@ -961,6 +961,7 @@ Executable git-annex
Remote.Directory
Remote.Directory.LegacyChunked
Remote.External
Remote.External.AsyncExtension
Remote.External.Types
Remote.GCrypt
Remote.Git