diff --git a/priv/emqx.schema b/priv/emqx.schema index 112469e2b..7b924b4e3 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2026,6 +2026,16 @@ end}. {datatype, flag} ]}. +%% @doc performance toggle for subscribe/unsubscribe wildcard topic +%% change this toggle only if you have many wildcard topics. +%% key: mnesia translational updates with per-key locks. recommended for single node setup. +%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup. +%% global: global lock protected updates. recommended for larger cluster. +{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [ + {default, key}, + {datatype, {enum, [key, tab, global]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 26b5ea444..b77311e62 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -119,11 +119,7 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of true -> - lock_router(), - try trans(fun insert_trie_route/1, [Route]) - after - unlock_router() - end; + maybe_trans(fun insert_trie_route/1, [Route]); false -> insert_direct_route(Route) end end. @@ -170,11 +166,7 @@ do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - lock_router(), - try trans(fun delete_trie_route/1, [Route]) - after - unlock_router() - end; + maybe_trans(fun delete_trie_route/1, [Route]); false -> delete_direct_route(Route) end. @@ -257,9 +249,30 @@ delete_trie_route(Route = #route{topic = Topic}) -> end. %% @private +-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). +maybe_trans(Fun, Args) -> + case persistent_term:get(emqx_route_lock_type) of + key -> + trans(Fun, Args); + global -> + lock_router(), + try mnesia:sync_dirty(Fun, Args) + after + unlock_router() + end; + tab -> + trans(fun() -> + emqx_trie:lock_tables(), + apply(Fun, Args) + end, []) + end. + -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> - mnesia:async_dirty(Fun, Args). + case mnesia:transaction(Fun, Args) of + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} + end. lock_router() -> %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. diff --git a/src/emqx_router_sup.erl b/src/emqx_router_sup.erl index 9060e58bf..9b0b397d4 100644 --- a/src/emqx_router_sup.erl +++ b/src/emqx_router_sup.erl @@ -34,6 +34,10 @@ init([]) -> type => worker, modules => [emqx_router_helper]}, + ok = persistent_term:put(emqx_route_lock_type, + application:get_env(emqx, route_lock_type, key) + ), + %% Router pool RouterPool = emqx_pool_sup:spec([router_pool, hash, {emqx_router, start_link, []}]), diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 4189c0270..a5c3aa773 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -31,7 +31,9 @@ , delete/1 ]). --export([empty/0]). +-export([ empty/0 + , lock_tables/0 + ]). -ifdef(TEST). -compile(export_all). @@ -120,6 +122,11 @@ delete(Topic) when is_binary(Topic) -> empty() -> ets:info(?TRIE_TAB, size) == 0. +-spec lock_tables() -> ok. +lock_tables() -> + mnesia:write_lock_table(?TRIE_TAB), + mnesia:write_lock_table(?TRIE_NODE_TAB). + %%-------------------------------------------------------------------- %% Internal functions %%--------------------------------------------------------------------