async proto fully tested and working
Including with a concurrent capable remote program. However, this is not quite ready to merge, there's a TODO in the code.
This commit is contained in:
parent
7546e686a2
commit
59cbb42ee2
1 changed files with 14 additions and 15 deletions
29
Remote/External/AsyncExtension.hs
vendored
29
Remote/External/AsyncExtension.hs
vendored
|
@ -85,26 +85,21 @@ receiveloop external st newconns jidmap mapjid sendq = externalReceive st >>= \c
|
||||||
Just (_n, c) -> do
|
Just (_n, c) -> do
|
||||||
relayto c msg
|
relayto c msg
|
||||||
loop
|
loop
|
||||||
Nothing -> abort "unexpected RESULT-ASYNC"
|
Nothing -> protoerr "unexpected RESULT-ASYNC"
|
||||||
Just (START_ASYNC jid) -> getnext newconns >>= \case
|
Just (START_ASYNC jid) -> getnext newconns >>= \case
|
||||||
Just v@(n, _c) -> do
|
Just v@(n, _c) -> do
|
||||||
atomically $ do
|
atomically $ do
|
||||||
modifyTVar' jidmap $ M.insert jid v
|
modifyTVar' jidmap $ M.insert jid v
|
||||||
modifyTVar' mapjid $ M.insert n jid
|
modifyTVar' mapjid $ M.insert n jid
|
||||||
loop
|
loop
|
||||||
Nothing -> abort "unexpected START-ASYNC"
|
Nothing -> protoerr "unexpected START-ASYNC"
|
||||||
Just (ASYNC jid msg) -> getjid jid >>= \case
|
Just (ASYNC jid msg) -> getjid jid >>= \case
|
||||||
Just (_n, c) -> do
|
Just (_n, c) -> do
|
||||||
relayto c msg
|
relayto c msg
|
||||||
loop
|
loop
|
||||||
Nothing -> abort "ASYNC with unknown jobid"
|
Nothing -> protoerr "ASYNC with unknown jobid"
|
||||||
_ -> abort "unexpected non-async message"
|
_ -> protoerr "unexpected non-async message"
|
||||||
Nothing -> do
|
Nothing -> closeandshutdown
|
||||||
-- Unable to receive anything more from the
|
|
||||||
-- process, so it's not usable any longer.
|
|
||||||
m <- readTVarIO jidmap
|
|
||||||
forM_ (M.elems m) (closerelayto . snd)
|
|
||||||
shutdown external st sendq True
|
|
||||||
where
|
where
|
||||||
loop = receiveloop external st newconns jidmap mapjid sendq
|
loop = receiveloop external st newconns jidmap mapjid sendq
|
||||||
|
|
||||||
|
@ -120,9 +115,16 @@ receiveloop external st newconns jidmap mapjid sendq = externalReceive st >>= \c
|
||||||
|
|
||||||
getjid jid = M.lookup jid <$> readTVarIO jidmap
|
getjid jid = M.lookup jid <$> readTVarIO jidmap
|
||||||
|
|
||||||
abort s = do
|
protoerr s = do
|
||||||
warningIO (protoerr s)
|
warningIO $ "async external special remote protocol error: " ++ s
|
||||||
|
closeandshutdown
|
||||||
|
|
||||||
|
closeandshutdown = do
|
||||||
shutdown external st sendq True
|
shutdown external st sendq True
|
||||||
|
(m, l) <- atomically $ (,)
|
||||||
|
<$> readTVar jidmap
|
||||||
|
<*> readTVar newconns
|
||||||
|
forM_ (M.elems m ++ l) (closerelayto . snd)
|
||||||
|
|
||||||
sendloop :: ExternalState -> NewConns -> MapJid -> JidMap -> SendQueue -> IO ()
|
sendloop :: ExternalState -> NewConns -> MapJid -> JidMap -> SendQueue -> IO ()
|
||||||
sendloop st newconns mapjid jidmap sendq = atomically (readTBMChan sendq) >>= \case
|
sendloop st newconns mapjid jidmap sendq = atomically (readTBMChan sendq) >>= \case
|
||||||
|
@ -168,6 +170,3 @@ shutdown external st sendq b = do
|
||||||
Just (ExternalAsync _) -> externalShutdown st b
|
Just (ExternalAsync _) -> externalShutdown st b
|
||||||
_ -> noop
|
_ -> noop
|
||||||
atomically $ closeTBMChan sendq
|
atomically $ closeTBMChan sendq
|
||||||
|
|
||||||
protoerr :: String -> String
|
|
||||||
protoerr s = "async external special remote protocol error: " ++ s
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue