diff --git a/priv/emqx.schema b/priv/emqx.schema index 3692035de..a6cfa3fc6 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2254,6 +2254,16 @@ end}. {datatype, flag} ]}. +%% @doc performance toggle for subscribe/unsubscribe wildcard topic +%% change this toggle only if you have many wildcard topics. +%% key: mnesia translational updates with per-key locks. recommended for single node setup. +%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup. +%% global: global lock protected updates. recommended for larger cluster. +{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [ + {default, key}, + {datatype, {enum, [key, tab, global]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index f54f622aa..d3ad128bb 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) -> -compile({inline, [call/2, cast/2, pick/1]}). call(Broker, Req) -> - gen_server:call(Broker, Req). + gen_server:call(Broker, Req, infinity). cast(Broker, Msg) -> gen_server:cast(Broker, Msg). diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 645b67152..afec1b288 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> false -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of - true -> trans(fun insert_trie_route/1, [Route]); + true -> + maybe_trans(fun insert_trie_route/1, [Route]); false -> insert_direct_route(Route) end end. @@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) -> do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of - true -> trans(fun delete_trie_route/1, [Route]); + true -> + maybe_trans(fun delete_trie_route/1, [Route]); false -> delete_direct_route(Route) end. @@ -247,6 +249,24 @@ delete_trie_route(Route = #route{topic = Topic}) -> end. %% @private +-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). +maybe_trans(Fun, Args) -> + case persistent_term:get(emqx_route_lock_type) of + key -> + trans(Fun, Args); + global -> + lock_router(), + try mnesia:sync_dirty(Fun, Args) + after + unlock_router() + end; + tab -> + trans(fun() -> + emqx_trie:lock_tables(), + apply(Fun, Args) + end, []) + end. + -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> case mnesia:transaction(Fun, Args) of @@ -254,3 +274,17 @@ trans(Fun, Args) -> {aborted, Reason} -> {error, Reason} end. +lock_router() -> + %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. + %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. + case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of + false -> + %% Force to sleep 1ms instead. + timer:sleep(1), + lock_router(); + true -> + ok + end. + +unlock_router() -> + global:del_lock({?MODULE, self()}). diff --git a/src/emqx_router_sup.erl b/src/emqx_router_sup.erl index 974ee4a91..1105a476b 100644 --- a/src/emqx_router_sup.erl +++ b/src/emqx_router_sup.erl @@ -34,6 +34,10 @@ init([]) -> type => worker, modules => [emqx_router_helper]}, + ok = persistent_term:put(emqx_route_lock_type, + application:get_env(emqx, route_lock_type, key) + ), + %% Router pool RouterPool = emqx_pool_sup:spec([router_pool, hash, {emqx_router, start_link, []}]), diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 3a1034080..9eb42a56f 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -31,7 +31,9 @@ , delete/1 ]). --export([empty/0]). +-export([ empty/0 + , lock_tables/0 + ]). -ifdef(TEST). -compile(export_all). @@ -122,6 +124,11 @@ delete(Topic) when is_binary(Topic) -> empty() -> ets:info(?TRIE_TAB, size) == 0. +-spec lock_tables() -> ok. +lock_tables() -> + mnesia:write_lock_table(?TRIE_TAB), + mnesia:write_lock_table(?TRIE_NODE_TAB). + %%-------------------------------------------------------------------- %% Internal functions %%--------------------------------------------------------------------