avoid building up a lot of threads all waiting for their chance to run a push
Only 2 threads are needed, one running, and one waiting to push something new. Any more is redundant and wasteful.
This commit is contained in:
parent
6e04194e84
commit
cc68b340ff
2 changed files with 59 additions and 18 deletions
|
@ -70,35 +70,72 @@ checkImportantNetMessages (storedclient, sentclient) = go <<~ netMessager
|
||||||
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
|
return (fromMaybe S.empty stored, fromMaybe S.empty sent)
|
||||||
|
|
||||||
{- Runs an action that runs either the send or receive side of a push.
|
{- Runs an action that runs either the send or receive side of a push.
|
||||||
|
- Only one such action per side can run at a time. Other pushes, for
|
||||||
|
- the same, or other clients, need to wait their turn.
|
||||||
|
-
|
||||||
|
- Only one push is allowed to wait per client.
|
||||||
|
- There is no point in building up more.
|
||||||
-
|
-
|
||||||
- While the push is running, netMessagesPush will get messages put into it
|
- While the push is running, netMessagesPush will get messages put into it
|
||||||
- relating to this push, while any messages relating to other pushes
|
- relating to this push, while any messages relating to other pushes
|
||||||
- on the same side go to netMessagesDeferred. Once the push finishes,
|
- on the same side go to netMessagesDeferred. Once the push finishes,
|
||||||
- those deferred messages will be fed to handledeferred for processing.
|
- those deferred messages will be fed to handledeferred for processing.
|
||||||
-
|
|
||||||
- If this is called when a push of the same side is running, it will
|
|
||||||
- block until that push completes, and then run.
|
|
||||||
-}
|
-}
|
||||||
runPush :: PushSide -> ClientID -> Assistant a -> Assistant a
|
runPush :: PushSide -> ClientID -> Assistant Bool -> Assistant Bool
|
||||||
runPush side clientid a = do
|
runPush side clientid a = do
|
||||||
|
debugmsg "preparing to run"
|
||||||
nm <- getAssistant netMessager
|
nm <- getAssistant netMessager
|
||||||
let v = getSide side $ netMessagerPushRunning nm
|
|
||||||
debugmsg <- asIO1 $ \s -> netMessagerDebug clientid [s, show side]
|
|
||||||
let setup = do
|
let setup = do
|
||||||
debugmsg "preparing to run"
|
(canrun, release) <- atomically $ checkcanrun nm
|
||||||
atomically $ ifM (isNothing <$> tryReadTMVar v)
|
if canrun
|
||||||
( putTMVar v clientid
|
then atomically $ waittorun nm release
|
||||||
|
else return (False, noop)
|
||||||
|
let cleanup (_, release) = atomically release
|
||||||
|
go <- asIO1 $ \(run, _) ->
|
||||||
|
if run
|
||||||
|
then do
|
||||||
|
debugmsg "started running"
|
||||||
|
r <- a
|
||||||
|
debugmsg "finished running"
|
||||||
|
{- Empty the inbox, because stuff may have
|
||||||
|
- been left in it if the push failed. -}
|
||||||
|
emptyInbox clientid side
|
||||||
|
return r
|
||||||
|
else do
|
||||||
|
debugmsg "skipping running"
|
||||||
|
return False
|
||||||
|
r <- liftIO $ E.bracket setup cleanup go
|
||||||
|
return r
|
||||||
|
where
|
||||||
|
debugmsg s = netMessagerDebug clientid [s, show side]
|
||||||
|
-- check that this is one of the two threads allowed
|
||||||
|
-- to run at the same time, pushing to the same client
|
||||||
|
-- on the same side
|
||||||
|
checkcanrun nm = do
|
||||||
|
let v = getSide side $ netMessagerPushThreadCount nm
|
||||||
|
m <- readTVar v
|
||||||
|
case M.lookup clientid m of
|
||||||
|
Just count
|
||||||
|
| count > 2 -> return (False, noop)
|
||||||
|
_ -> do
|
||||||
|
writeTVar v $
|
||||||
|
M.insertWith' (+) clientid 1 m
|
||||||
|
let release = modifyTVar' v $
|
||||||
|
M.insertWith' (-) clientid 1
|
||||||
|
return (True, release)
|
||||||
|
-- block until this is the only thread performing
|
||||||
|
-- a push on this side, to any client
|
||||||
|
waittorun nm release = do
|
||||||
|
let v = getSide side $ netMessagerPushRunning nm
|
||||||
|
ifM (isNothing <$> tryReadTMVar v)
|
||||||
|
( do
|
||||||
|
putTMVar v clientid
|
||||||
|
let release' = do
|
||||||
|
void $ takeTMVar v
|
||||||
|
release
|
||||||
|
return (True, release')
|
||||||
, retry
|
, retry
|
||||||
)
|
)
|
||||||
debugmsg "started running"
|
|
||||||
let cleanup = do
|
|
||||||
debugmsg "finished running"
|
|
||||||
atomically $ takeTMVar v
|
|
||||||
r <- E.bracket_ setup cleanup <~> a
|
|
||||||
{- Empty the inbox, because stuff may have been left in it
|
|
||||||
- if the push failed. -}
|
|
||||||
emptyInbox clientid side
|
|
||||||
return r
|
|
||||||
|
|
||||||
{- Stores messages for a push into the appropriate inbox.
|
{- Stores messages for a push into the appropriate inbox.
|
||||||
-
|
-
|
||||||
|
|
|
@ -133,6 +133,9 @@ data NetMessager = NetMessager
|
||||||
-- only one side of a push can be running at a time
|
-- only one side of a push can be running at a time
|
||||||
-- the TMVars are empty when nothing is running
|
-- the TMVars are empty when nothing is running
|
||||||
, netMessagerPushRunning :: SideMap (TMVar ClientID)
|
, netMessagerPushRunning :: SideMap (TMVar ClientID)
|
||||||
|
-- number of threads trying to push to the same client
|
||||||
|
-- at the same time (either running, or waiting to run)
|
||||||
|
, netMessagerPushThreadCount :: SideMap (TVar (M.Map ClientID Int))
|
||||||
-- incoming messages containing data for a push,
|
-- incoming messages containing data for a push,
|
||||||
-- on a per-client and per-side basis
|
-- on a per-client and per-side basis
|
||||||
, netMessagesInboxes :: SideMap Inboxes
|
, netMessagesInboxes :: SideMap Inboxes
|
||||||
|
@ -146,3 +149,4 @@ newNetMessager = NetMessager
|
||||||
<*> newEmptySV
|
<*> newEmptySV
|
||||||
<*> mkSideMap newEmptyTMVar
|
<*> mkSideMap newEmptyTMVar
|
||||||
<*> mkSideMap (newTVar M.empty)
|
<*> mkSideMap (newTVar M.empty)
|
||||||
|
<*> mkSideMap (newTVar M.empty)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue