relayer receive loop is done

Receive loop looks right. Still need the send loop.

And, a complication is that some messages git-annex
sends need to be wrapped in REPLY_ASYNC, while others
do not. So will probably need to split externalSend
into two.
This commit is contained in:
Joey Hess 2020-08-12 15:54:30 -04:00
parent 06a4ab39fa
commit 15706e6991
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
4 changed files with 73 additions and 46 deletions

View file

@ -615,7 +615,7 @@ startExternal external =
(st, extensions) <- startExternal' external (st, extensions) <- startExternal' external
if asyncExtensionEnabled extensions if asyncExtensionEnabled extensions
then do then do
v <- liftIO $ runRelayToExternalAsync st v <- liftIO $ runRelayToExternalAsync external st
st' <- liftIO $ relayToExternalAsync v st' <- liftIO $ relayToExternalAsync v
store (ExternalAsync v) store (ExternalAsync v)
return st' return st'
@ -625,9 +625,9 @@ startExternal external =
v@NoExternalAsync -> do v@NoExternalAsync -> do
store v store v
fst <$> startExternal' external fst <$> startExternal' external
v@(ExternalAsync ExternalAsyncRelay) -> do v@(ExternalAsync relay) -> do
store v store v
liftIO $ relayToExternalAsync v liftIO $ relayToExternalAsync relay
where where
store = liftIO . atomically . putTMVar (externalAsync external) store = liftIO . atomically . putTMVar (externalAsync external)
@ -677,7 +677,7 @@ startExternal' external = do
exwanted <- receiveMessage st external exwanted <- receiveMessage st external
(\resp -> case resp of (\resp -> case resp of
EXTENSIONS_RESPONSE l -> result l EXTENSIONS_RESPONSE l -> result l
UNSUPPORTED_REQUEST -> result [] UNSUPPORTED_REQUEST -> result mempty
_ -> Nothing _ -> Nothing
) )
(const Nothing) (const Nothing)

View file

@ -9,7 +9,7 @@
module Remote.External.AsyncExtension where module Remote.External.AsyncExtension where
import Common.Annex import Common
import Remote.External.Types import Remote.External.Types
import Control.Concurrent.Async import Control.Concurrent.Async
@ -22,18 +22,18 @@ import qualified Data.Map.Strict as M
relayToExternalAsync :: ExternalAsyncRelay -> IO ExternalState relayToExternalAsync :: ExternalAsyncRelay -> IO ExternalState
relayToExternalAsync relay = do relayToExternalAsync relay = do
n <- liftIO $ atomically $ do n <- liftIO $ atomically $ do
v <- readTVar (asyncRelayLast relay) v <- readTVar (asyncRelayLastId relay)
let !n = succ v let !n = succ v
writeTVar (asyncRelayLast relay) n writeTVar (asyncRelayLastId relay) n
return n return n
return $ asyncRelayExternalState n asyncRelayExternalState relay n
-- | Starts a thread that will handle all communication with the external -- | Starts a thread that will handle all communication with the external
-- process. The input ExternalState communicates directly with the external -- process. The input ExternalState communicates directly with the external
-- process. -- process.
runRelayToExternalAsync :: ExternalState -> IO ExternalAsyncRelay runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay
runRelayToExternalAsync st = do runRelayToExternalAsync external st = do
startcomm <- runRelayToExternalAsync' st startcomm <- runRelayToExternalAsync' external st
pv <- atomically $ newTVar 1 pv <- atomically $ newTVar 1
return $ ExternalAsyncRelay return $ ExternalAsyncRelay
{ asyncRelayLastId = pv { asyncRelayLastId = pv
@ -41,10 +41,10 @@ runRelayToExternalAsync st = do
} }
where where
relaystate startcomm n = do relaystate startcomm n = do
(sendh, receiveh, shutdownh) <- startcomm n (sendh, receiveh, shutdownh) <- startcomm (ClientId n)
ExternalState return $ ExternalState
{ externalSend = atomically . writeTBMChan sendh { externalSend = atomically . writeTBMChan sendh
, externalReceive = atomically . readTBMChan receiveh , externalReceive = fmap join $ atomically $ readTBMChan receiveh
, externalShutdown = atomically . writeTBMChan shutdownh , externalShutdown = atomically . writeTBMChan shutdownh
-- These three TVars are shared amoung all -- These three TVars are shared amoung all
-- ExternalStates that use this relay; they're -- ExternalStates that use this relay; they're
@ -60,51 +60,71 @@ newtype ClientId = ClientId Int
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
runRelayToExternalAsync' runRelayToExternalAsync'
:: ExternalState :: External
-> ExternalState
-> IO (ClientId -> IO (TBMChan String, TBMChan (Maybe String), TBMChan Bool)) -> IO (ClientId -> IO (TBMChan String, TBMChan (Maybe String), TBMChan Bool))
runRelayToExternalAsync' st = do runRelayToExternalAsync' external st = do
let startcomm n = let startcomm n = error "TODO"
sendt <- async sendloop sendt <- async sendloop
void $ async (receiveloop [] Nothing sendt) newreqs <- newTVarIO []
void $ async (receiveloop newreqs M.empty sendt)
return startcomm return startcomm
where where
receiveloop newreqs currjid sendt = externalReceive st >>= \case receiveloop newreqs jidmap sendt = externalReceive st >>= \case
Just l -> case parseMessage l :: Maybe AsyncMessage of Just l -> case parseMessage l :: Maybe AsyncMessage of
Just (START_ASYNC jid) -> case newreqs of Just (RESULT_ASYNC msg) -> getnext newreqs >>= \case
[] -> giveup "async special remote protocol error: unexpected START-ASYNC" Just c -> do
(c:newreqs') -> do relayto c msg
let !receiverjids' = M.insert jid c receiverjids receiveloop newreqs jidmap sendt
receiveloop newreqs' Nothing receiverjids' sendt Nothing -> protoerr "unexpected RESULT-ASYNC"
Just (END_ASYNC jid) -> do Just (START_ASYNC jid msg) -> getnext newreqs >>= \case
let !receiverjids' = M.delete jid receiverjids Just c -> do
receiveloop newreqs (Just jid) receiverjids' sendt relayto c msg
Just (UPDATE_ASYNC jid) -> let !jidmap' = M.insert jid c jidmap
receiveloop newreqs (Just jid) receiverjids sendt receiveloop newreqs jidmap' sendt
Nothing -> case currjid of Nothing -> protoerr "unexpected START-ASYNC"
Just jid -> Just (END_ASYNC jid msg) -> case M.lookup jid jidmap of
-- Just c -> do
Nothing -> case newreqs of relayto c msg
[] -> giveup "async special remote protocol error: unexpected non-async message" closerelayto c
(c:_) -> do let !jidmap' = M.delete jid jidmap
case M.lookup c receivers of receiveloop newreqs jidmap' sendt
Just c -> atomically $ writeTBMChan c l Nothing -> protoerr "END-ASYNC with unknown jobid"
Nothing -> return () Just (ASYNC jid msg) -> case M.lookup jid jidmap of
receiveloop newreqs Nothing sendt Just c -> do
relayto c msg
let !jidmap' = M.delete jid jidmap
receiveloop newreqs jidmap' sendt
Nothing -> protoerr "ASYNC with unknown jobid"
_ -> protoerr "unexpected non-async message"
Nothing -> do Nothing -> do
-- Unable to receive anything more from the -- Unable to receive anything more from the
-- process, so it's not usable any longer. -- process, so it's not usable any longer.
-- So close all chans, stop the process, -- So close all chans, stop the process,
-- and avoid any new ExternalStates from being -- and avoid any new ExternalStates from being
-- created using it. -- created using it.
cancel sendt
atomically $ do atomically $ do
void $ tryTakeTMVar (externalAsync external) void $ tryTakeTMVar (externalAsync external)
putTMVar (externalAsync external) putTMVar (externalAsync external)
UncheckedExternalAsync UncheckedExternalAsync
forM_ (M.toList receivers) $ forM_ (M.elems jidmap) closerelayto
atomically . closeTBMChan
forM_ (M.toList senders) $
atomically . closeTBMChan
externalShutdown st True externalShutdown st True
cancel sendt
sendloop = do sendloop = do
error "TODO"
relayto (toq, _fromq) msg = atomically $ writeTBMChan toq msg
closerelayto (toq, fromq) = do
atomically $ closeTBMChan toq
atomically $ closeTBMChan fromq
getnext l = atomically $ readTVar l >>= \case
[] -> return Nothing
(c:rest) -> do
writeTVar l rest
return (Just c)
protoerr s = giveup ("async special remote protocol error: " ++ s)

View file

@ -6,6 +6,7 @@
-} -}
{-# LANGUAGE FlexibleInstances, TypeSynonymInstances #-} {-# LANGUAGE FlexibleInstances, TypeSynonymInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# OPTIONS_GHC -fno-warn-orphans #-} {-# OPTIONS_GHC -fno-warn-orphans #-}
module Remote.External.Types ( module Remote.External.Types (
@ -14,8 +15,11 @@ module Remote.External.Types (
ExternalType, ExternalType,
ExternalState(..), ExternalState(..),
PrepareStatus(..), PrepareStatus(..),
ExtensionList(..),
supportedExtensionList, supportedExtensionList,
asyncExtensionEnabled, asyncExtensionEnabled,
ExternalAsync(..),
ExternalAsyncRelay(..),
Proto.parseMessage, Proto.parseMessage,
Proto.Sendable(..), Proto.Sendable(..),
Proto.Receivable(..), Proto.Receivable(..),
@ -27,6 +31,7 @@ module Remote.External.Types (
RemoteRequest(..), RemoteRequest(..),
RemoteResponse(..), RemoteResponse(..),
ExceptionalMessage(..), ExceptionalMessage(..),
AsyncMessage(..),
ErrorMsg, ErrorMsg,
Setting, Setting,
Description, Description,
@ -91,7 +96,7 @@ type PID = Int
-- List of extensions to the protocol. -- List of extensions to the protocol.
newtype ExtensionList = ExtensionList { fromExtensionList :: [String] } newtype ExtensionList = ExtensionList { fromExtensionList :: [String] }
deriving (Show) deriving (Show, Monoid, Semigroup)
supportedExtensionList :: ExtensionList supportedExtensionList :: ExtensionList
supportedExtensionList = ExtensionList ["INFO", asyncExtension] supportedExtensionList = ExtensionList ["INFO", asyncExtension]

View file

@ -111,7 +111,9 @@ Here's the details about the additions to the protocol.
* `END-ASYNC JobId ReplyMsg` * `END-ASYNC JobId ReplyMsg`
Indicates that an async job is complete. The ReplyMsg indicates the result Indicates that an async job is complete. The ReplyMsg indicates the result
of the job, and is anything that would be sent as a protocol reply in the of the job, and is anything that would be sent as a protocol reply in the
non-async protocol. non-async protocol.
After this, the JobId is not in use, an indeed the same value could be
reused by a new `START-ASYNC` if desired.
* `RESULT-ASYNC ReplyMsg` * `RESULT-ASYNC ReplyMsg`
This is the same as sending `START-ASYNC` immediately followed by This is the same as sending `START-ASYNC` immediately followed by
`END-ASYNC`. This is often used to respond to `PREPARE`, `LISTCONFIGS`, `END-ASYNC`. This is often used to respond to `PREPARE`, `LISTCONFIGS`,