perf(broker): new perf toggle broker.perf.route_lock_type
This commit is contained in:
parent
154bf0d446
commit
e9c14df7a3
|
@ -2105,6 +2105,16 @@ end}.
|
|||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
%% @doc performance toggle for subscribe/unsubscribe wildcard topic
|
||||
%% change this toggle only if you have many wildcard topics.
|
||||
%% key: mnesia translational updates with per-key locks. recommended for single node setup.
|
||||
%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup.
|
||||
%% global: global lock protected updates. recommended for larger cluster.
|
||||
{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
|
||||
{default, key},
|
||||
{datatype, {enum, [key, tab, global]}}
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% System Monitor
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -119,11 +119,7 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
|
|||
ok = emqx_router_helper:monitor(Dest),
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true ->
|
||||
lock_router(),
|
||||
try trans(fun insert_trie_route/1, [Route])
|
||||
after
|
||||
unlock_router()
|
||||
end;
|
||||
maybe_trans(fun insert_trie_route/1, [Route]);
|
||||
false -> insert_direct_route(Route)
|
||||
end
|
||||
end.
|
||||
|
@ -170,11 +166,7 @@ do_delete_route(Topic, Dest) ->
|
|||
Route = #route{topic = Topic, dest = Dest},
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true ->
|
||||
lock_router(),
|
||||
try trans(fun delete_trie_route/1, [Route])
|
||||
after
|
||||
unlock_router()
|
||||
end;
|
||||
maybe_trans(fun delete_trie_route/1, [Route]);
|
||||
false -> delete_direct_route(Route)
|
||||
end.
|
||||
|
||||
|
@ -257,9 +249,30 @@ delete_trie_route(Route = #route{topic = Topic}) ->
|
|||
end.
|
||||
|
||||
%% @private
|
||||
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
|
||||
maybe_trans(Fun, Args) ->
|
||||
case persistent_term:get(emqx_route_lock_type) of
|
||||
key ->
|
||||
trans(Fun, Args);
|
||||
global ->
|
||||
lock_router(),
|
||||
try mnesia:sync_dirty(Fun, Args)
|
||||
after
|
||||
unlock_router()
|
||||
end;
|
||||
tab ->
|
||||
trans(fun() ->
|
||||
emqx_trie:lock_tables(),
|
||||
apply(Fun, Args)
|
||||
end, [])
|
||||
end.
|
||||
|
||||
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
||||
trans(Fun, Args) ->
|
||||
mnesia:async_dirty(Fun, Args).
|
||||
case mnesia:transaction(Fun, Args) of
|
||||
{atomic, Ok} -> Ok;
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
lock_router() ->
|
||||
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
||||
|
|
|
@ -34,6 +34,10 @@ init([]) ->
|
|||
type => worker,
|
||||
modules => [emqx_router_helper]},
|
||||
|
||||
ok = persistent_term:put(emqx_route_lock_type,
|
||||
application:get_env(emqx, route_lock_type, key)
|
||||
),
|
||||
|
||||
%% Router pool
|
||||
RouterPool = emqx_pool_sup:spec([router_pool, hash,
|
||||
{emqx_router, start_link, []}]),
|
||||
|
|
|
@ -31,7 +31,9 @@
|
|||
, delete/1
|
||||
]).
|
||||
|
||||
-export([empty/0]).
|
||||
-export([ empty/0
|
||||
, lock_tables/0
|
||||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-compile(export_all).
|
||||
|
@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) ->
|
|||
empty() ->
|
||||
ets:info(?TRIE_TAB, size) == 0.
|
||||
|
||||
-spec lock_tables() -> ok.
|
||||
lock_tables() ->
|
||||
mnesia:write_lock_table(?TRIE_TAB),
|
||||
mnesia:write_lock_table(?TRIE_NODE_TAB).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue