From 3972a6b435d66704bc041ce691cdb0c78cbd7ae9 Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 18 Apr 2021 23:24:16 +0200 Subject: [PATCH 1/4] perf(trie): use global lock Use global lock to reduce remote lock overhead. So that emqx route trans can run in dirty *sync* context. At least 10X subscribe/unsubscribe improvments. --- src/emqx_router.erl | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 754124298..1a8dd7786 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -118,7 +118,12 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> false -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of - true -> trans(fun insert_trie_route/1, [Route]); + true -> + lock_router(), + try trans(fun insert_trie_route/1, [Route]) + after + unlock_router() + end; false -> insert_direct_route(Route) end end. @@ -164,7 +169,12 @@ do_delete_route(Topic) when is_binary(Topic) -> do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of - true -> trans(fun delete_trie_route/1, [Route]); + true -> + lock_router(), + try trans(fun delete_trie_route/1, [Route]) + after + unlock_router() + end; false -> delete_direct_route(Route) end. @@ -249,8 +259,19 @@ delete_trie_route(Route = #route{topic = Topic}) -> %% @private -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> - case mnesia:transaction(Fun, Args) of - {atomic, Ok} -> Ok; - {aborted, Reason} -> {error, Reason} + mnesia:sync_dirty(Fun, Args). + +lock_router() -> + %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. + %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. + case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of + false -> + %% Force to sleep 1ms instead. + timer:sleep(1), + lock_router(); + true -> + ok end. +unlock_router() -> + global:del_lock({?MODULE, self()}). From 17870fdb39f1423425e9378eeca0701c10df3d6b Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 19 Apr 2021 14:37:40 +0200 Subject: [PATCH 2/4] perf(router): add route runs in async dirty context --- src/emqx_router.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 1a8dd7786..ed980cee5 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -259,7 +259,7 @@ delete_trie_route(Route = #route{topic = Topic}) -> %% @private -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> - mnesia:sync_dirty(Fun, Args). + mnesia:async_dirty(Fun, Args). lock_router() -> %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. From 0166bb5a876927efebfaac10b2354357c60d1bf5 Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 20 Apr 2021 21:57:47 +0200 Subject: [PATCH 3/4] fix: broker call should not timeout before client timeout So change broker call timeout to infinity. --- src/emqx_broker.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index c8d9c4b58..661262ad2 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) -> -compile({inline, [call/2, cast/2, pick/1]}). call(Broker, Req) -> - gen_server:call(Broker, Req). + gen_server:call(Broker, Req, infinity). cast(Broker, Msg) -> gen_server:cast(Broker, Msg). From 9b13bab2c90cfd60e37c9e08e5b3df585c710ded Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 22 Apr 2021 11:04:56 +0200 Subject: [PATCH 4/4] perf: new perf toggle broker.perf.route_lock_type --- priv/emqx.schema | 10 ++++++++++ src/emqx_router.erl | 35 ++++++++++++++++++++++++----------- src/emqx_router_sup.erl | 4 ++++ src/emqx_trie.erl | 9 ++++++++- 4 files changed, 46 insertions(+), 12 deletions(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index e5176ca56..8898d694c 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2256,6 +2256,16 @@ end}. {datatype, flag} ]}. +%% @doc performance toggle for subscribe/unsubscribe wildcard topic +%% change this toggle only if you have many wildcard topics. +%% key: mnesia translational updates with per-key locks. recommended for single node setup. +%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup. +%% global: global lock protected updates. recommended for larger cluster. +{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [ + {default, key}, + {datatype, {enum, [key, tab, global]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqx_router.erl b/src/emqx_router.erl index ed980cee5..018c2b739 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -119,11 +119,7 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of true -> - lock_router(), - try trans(fun insert_trie_route/1, [Route]) - after - unlock_router() - end; + maybe_trans(fun insert_trie_route/1, [Route]); false -> insert_direct_route(Route) end end. @@ -170,11 +166,7 @@ do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - lock_router(), - try trans(fun delete_trie_route/1, [Route]) - after - unlock_router() - end; + maybe_trans(fun delete_trie_route/1, [Route]); false -> delete_direct_route(Route) end. @@ -257,9 +249,30 @@ delete_trie_route(Route = #route{topic = Topic}) -> end. %% @private +-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). +maybe_trans(Fun, Args) -> + case persistent_term:get(emqx_route_lock_type) of + key -> + trans(Fun, Args); + global -> + lock_router(), + try mnesia:sync_dirty(Fun, Args) + after + unlock_router() + end; + tab -> + trans(fun() -> + emqx_trie:lock_tables(), + apply(Fun, Args) + end, []) + end. + -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> - mnesia:async_dirty(Fun, Args). + case mnesia:transaction(Fun, Args) of + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} + end. lock_router() -> %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. diff --git a/src/emqx_router_sup.erl b/src/emqx_router_sup.erl index 9060e58bf..9b0b397d4 100644 --- a/src/emqx_router_sup.erl +++ b/src/emqx_router_sup.erl @@ -34,6 +34,10 @@ init([]) -> type => worker, modules => [emqx_router_helper]}, + ok = persistent_term:put(emqx_route_lock_type, + application:get_env(emqx, route_lock_type, key) + ), + %% Router pool RouterPool = emqx_pool_sup:spec([router_pool, hash, {emqx_router, start_link, []}]), diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index ae85d92c2..ffc054c5d 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -31,7 +31,9 @@ , delete/1 ]). --export([empty/0]). +-export([ empty/0 + , lock_tables/0 + ]). -ifdef(TEST). -compile(export_all). @@ -122,6 +124,11 @@ delete(Topic) when is_binary(Topic) -> empty() -> ets:info(?TRIE_TAB, size) == 0. +-spec lock_tables() -> ok. +lock_tables() -> + mnesia:write_lock_table(?TRIE_TAB), + mnesia:write_lock_table(?TRIE_NODE_TAB). + %%-------------------------------------------------------------------- %% Internal functions %%--------------------------------------------------------------------