further work on external async relay

This commit is contained in:
Joey Hess 2020-08-12 16:25:53 -04:00
parent 15706e6991
commit c9e8cafb98
No known key found for this signature in database
GPG key ID: DB12DB0FF05F8F38
3 changed files with 44 additions and 50 deletions

View file

@ -615,9 +615,9 @@ startExternal external =
(st, extensions) <- startExternal' external
if asyncExtensionEnabled extensions
then do
v <- liftIO $ runRelayToExternalAsync external st
st' <- liftIO $ relayToExternalAsync v
store (ExternalAsync v)
relay <- liftIO $ runRelayToExternalAsync external st
st' <- liftIO $ asyncRelayExternalState relay
store (ExternalAsync relay)
return st'
else do
store NoExternalAsync
@ -627,7 +627,7 @@ startExternal external =
fst <$> startExternal' external
v@(ExternalAsync relay) -> do
store v
liftIO $ relayToExternalAsync relay
liftIO $ asyncRelayExternalState relay
where
store = liftIO . atomically . putTMVar (externalAsync external)

View file

@ -17,35 +17,19 @@ import Control.Concurrent.STM
import Control.Concurrent.STM.TBMChan
import qualified Data.Map.Strict as M
-- | Constructs an ExternalState that can be used to communicate with
-- the external process via the relay.
relayToExternalAsync :: ExternalAsyncRelay -> IO ExternalState
relayToExternalAsync relay = do
n <- liftIO $ atomically $ do
v <- readTVar (asyncRelayLastId relay)
let !n = succ v
writeTVar (asyncRelayLastId relay) n
return n
asyncRelayExternalState relay n
-- | Starts a thread that will handle all communication with the external
-- process. The input ExternalState communicates directly with the external
-- process.
runRelayToExternalAsync :: External -> ExternalState -> IO ExternalAsyncRelay
runRelayToExternalAsync external st = do
startcomm <- runRelayToExternalAsync' external st
pv <- atomically $ newTVar 1
return $ ExternalAsyncRelay
{ asyncRelayLastId = pv
, asyncRelayExternalState = relaystate startcomm
}
where
relaystate startcomm n = do
(sendh, receiveh, shutdownh) <- startcomm (ClientId n)
return $ ExternalAsyncRelay $ do
(sendh, receiveh, shutdown) <- startcomm
return $ ExternalState
{ externalSend = atomically . writeTBMChan sendh
, externalReceive = fmap join $ atomically $ readTBMChan receiveh
, externalShutdown = atomically . writeTBMChan shutdownh
-- This shuts down the whole relay.
, externalShutdown = shutdown
-- These three TVars are shared amoung all
-- ExternalStates that use this relay; they're
-- common state about the external process.
@ -56,65 +40,67 @@ runRelayToExternalAsync external st = do
, externalConfigChanges = externalConfigChanges st
}
newtype ClientId = ClientId Int
deriving (Show, Eq, Ord)
runRelayToExternalAsync'
:: External
-> ExternalState
-> IO (ClientId -> IO (TBMChan String, TBMChan (Maybe String), TBMChan Bool))
-> IO (IO (TBMChan String, TBMChan (Maybe String), Bool -> IO ()))
runRelayToExternalAsync' external st = do
let startcomm n = error "TODO"
sendt <- async sendloop
newreqs <- newTVarIO []
void $ async (receiveloop newreqs M.empty sendt)
startedcomms <- newTVarIO []
let startcomm = do
toq <- newTBMChanIO 10
fromq <- newTBMChanIO 10
let c = (toq, fromq, shutdown)
atomically $ do
l <- readTVar startedcomms
-- This append is ok because the maximum size
-- is the number of jobs that git-annex is
-- configured to use, which is a relatively
-- small number.
writeTVar startedcomms (l ++ [c])
return c
void $ async $ sendloop newreqs startedcomms
void $ async $ receiveloop newreqs M.empty
return startcomm
where
receiveloop newreqs jidmap sendt = externalReceive st >>= \case
receiveloop newreqs jidmap = externalReceive st >>= \case
Just l -> case parseMessage l :: Maybe AsyncMessage of
Just (RESULT_ASYNC msg) -> getnext newreqs >>= \case
Just c -> do
relayto c msg
receiveloop newreqs jidmap sendt
receiveloop newreqs jidmap
Nothing -> protoerr "unexpected RESULT-ASYNC"
Just (START_ASYNC jid msg) -> getnext newreqs >>= \case
Just c -> do
relayto c msg
let !jidmap' = M.insert jid c jidmap
receiveloop newreqs jidmap' sendt
receiveloop newreqs jidmap'
Nothing -> protoerr "unexpected START-ASYNC"
Just (END_ASYNC jid msg) -> case M.lookup jid jidmap of
Just c -> do
relayto c msg
closerelayto c
let !jidmap' = M.delete jid jidmap
receiveloop newreqs jidmap' sendt
receiveloop newreqs jidmap'
Nothing -> protoerr "END-ASYNC with unknown jobid"
Just (ASYNC jid msg) -> case M.lookup jid jidmap of
Just c -> do
relayto c msg
let !jidmap' = M.delete jid jidmap
receiveloop newreqs jidmap' sendt
receiveloop newreqs jidmap'
Nothing -> protoerr "ASYNC with unknown jobid"
_ -> protoerr "unexpected non-async message"
Nothing -> do
-- Unable to receive anything more from the
-- process, so it's not usable any longer.
-- So close all chans, stop the process,
-- and avoid any new ExternalStates from being
-- created using it.
cancel sendt
atomically $ do
void $ tryTakeTMVar (externalAsync external)
putTMVar (externalAsync external)
UncheckedExternalAsync
forM_ (M.elems jidmap) closerelayto
externalShutdown st True
shutdown True
sendloop = do
sendloop newreqs startedcomms = do
error "TODO"
relayto (toq, _fromq) msg = atomically $ writeTBMChan toq msg
relayto (toq, _fromq) msg =
atomically $ writeTBMChan toq msg
closerelayto (toq, fromq) = do
atomically $ closeTBMChan toq
@ -126,5 +112,14 @@ runRelayToExternalAsync' external st = do
writeTVar l rest
return (Just c)
shutdown b = do
r <- atomically $ do
r <- tryTakeTMVar (externalAsync external)
putTMVar (externalAsync external)
UncheckedExternalAsync
return r
case r of
Just (ExternalAsync _) -> externalShutdown st b
_ -> noop
protoerr s = giveup ("async special remote protocol error: " ++ s)

View file

@ -115,8 +115,7 @@ data ExternalAsync
| UncheckedExternalAsync
data ExternalAsyncRelay = ExternalAsyncRelay
{ asyncRelayLastId :: TVar Int
, asyncRelayExternalState :: Int -> IO ExternalState
{ asyncRelayExternalState :: IO ExternalState
}
data PrepareStatus = Unprepared | Prepared | FailedPrepare ErrorMsg