perf(broker): speedup trans when broker has a big mqueue

This commit is contained in:
William Yang 2021-04-28 23:26:11 +02:00 committed by turtleDeng
parent e02704e05a
commit ca9b90be9a
3 changed files with 35 additions and 26 deletions

View File

@ -296,6 +296,16 @@ broker.shared_dispatch_ack_enabled = false
## Value: Flag
broker.route_batch_clean = off
## Performance toggle for subscribe/unsubscribe wildcard topic.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia translational updates with per-key locks. recommended for single node setup.
## - tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
## - global: global lock protected updates. recommended for larger cluster.
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
## to be stopped before the change.
broker.perf.route_lock_type = key
##-------------------------------------------------------------------
## Plugins
##-------------------------------------------------------------------

View File

@ -2026,16 +2026,16 @@ end}.
{datatype, flag}
]}.
%% @doc performance toggle for subscribe/unsubscribe wildcard topic
%% change this toggle only if you have many wildcard topics.
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
%% global: global lock protected updates. recommended for larger cluster.
%% spawn: same as `key', but transaction is done in another proc, ideal for handling bursty traffic.
%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
%%
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global, spawn]}}
{datatype, {enum, [key, tab, global]}}
]}.
%%--------------------------------------------------------------------

View File

@ -264,32 +264,31 @@ maybe_trans(Fun, Args) ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, []);
spawn ->
%% trigger selective receive optimization of compiler,
%% ideal for handling busty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(fun() ->
Res = trans(Fun, Args),
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, _Info} ->
{error, trans_crash}
end
end, [])
end.
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, Info} ->
{error, {trans_crash, Info}}
end.
lock_router() ->