perf(broker): speedup trans when broker has a big mqueue

This commit is contained in:
William Yang 2021-04-28 23:26:11 +02:00 committed by Zaiming (Stone) Shi
parent a4d8ef4f93
commit 74bc7c6a07
3 changed files with 36 additions and 26 deletions

View File

@ -724,6 +724,7 @@ zone.external.acl_deny_action = ignore
## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_gc_policy = 16000|16MB
## Max message queue length and total heap size to force shutdown
## connection/session process.
## Message queue here is the Erlang process mailbox, but not the number
@ -736,6 +737,16 @@ zone.external.force_gc_policy = 16000|16MB
## - 1000|32MB on ARCH_32 sytem
#zone.external.force_shutdown_policy = 10000|64MB
## Performance toggle for subscribe/unsubscribe wildcard topic.
## Change this toggle only when there are many wildcard topics.
## Value: Enum
## - key: mnesia translational updates with per-key locks. recommended for single node setup.
## - tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
## - global: global lock protected updates. recommended for larger cluster.
## NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
## to be stopped before the change.
broker.perf.route_lock_type = key
## Maximum MQTT packet size allowed.
##
## Value: Bytes

View File

@ -2105,16 +2105,16 @@ end}.
{datatype, flag}
]}.
%% @doc performance toggle for subscribe/unsubscribe wildcard topic
%% change this toggle only if you have many wildcard topics.
%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
%% Change this toggle only when there are many wildcard topics.
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
%% global: global lock protected updates. recommended for larger cluster.
%% spawn: same as `key', but transaction is done in another proc, ideal for handling bursty traffic.
%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
%%
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
{default, key},
{datatype, {enum, [key, tab, global, spawn]}}
{datatype, {enum, [key, tab, global]}}
]}.
%%--------------------------------------------------------------------

View File

@ -264,32 +264,31 @@ maybe_trans(Fun, Args) ->
trans(fun() ->
emqx_trie:lock_tables(),
apply(Fun, Args)
end, []);
spawn ->
%% trigger selective receive optimization of compiler,
%% ideal for handling busty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(fun() ->
Res = trans(Fun, Args),
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, _Info} ->
{error, trans_crash}
end
end, [])
end.
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
Owner ! {Ref, Res}
end),
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, Info} ->
{error, {trans_crash, Info}}
end.
lock_router() ->