From 9eccfa09093dc798229fcaea757dd3308fbcd807 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 7 Aug 2023 22:28:37 +0400 Subject: [PATCH 01/26] refactor(router): isolate cleanup logic in router module --- apps/emqx/src/emqx_router.erl | 16 ++++++++++++++++ apps/emqx/src/emqx_router_helper.erl | 9 +-------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 95b6136a7..fcb9b4e45 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -46,6 +46,8 @@ do_delete_route/2 ]). +-export([cleanup_routes/1]). + -export([ match_routes/1, lookup_routes/1, @@ -70,6 +72,8 @@ -type dest() :: node() | {group(), node()}. +-dialyzer({nowarn_function, [cleanup_routes/1]}). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -196,6 +200,18 @@ print_routes(Topic) -> match_routes(Topic) ). +-spec cleanup_routes(node()) -> ok. +cleanup_routes(Node) -> + Patterns = [ + #route{_ = '_', dest = Node}, + #route{_ = '_', dest = {'_', Node}} + ], + [ + mnesia:delete_object(?ROUTE_TAB, Route, write) + || Pat <- Patterns, + Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) + ]. + call(Router, Msg) -> gen_server:call(Router, Msg, infinity). diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 8d96bf81d..61573fcff 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -197,11 +197,4 @@ stats_fun() -> end. cleanup_routes(Node) -> - Patterns = [ - #route{_ = '_', dest = Node}, - #route{_ = '_', dest = {'_', Node}} - ], - [ - mnesia:delete_object(?ROUTE_TAB, Route, write) - || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) - ]. + emqx_router:cleanup_routes(Node). 
From dcb63440bcdcd42ad5e8a74d1921607ef4f79666 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 8 Aug 2023 15:34:04 +0400 Subject: [PATCH 02/26] refactor(mgmt): avoid dealing with router tab directly Instead, contain all this behind `emqx_router` module interface. --- apps/emqx/src/emqx_router.erl | 13 ++++++++++++ apps/emqx_management/src/emqx_mgmt_cli.erl | 24 +++++----------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index fcb9b4e45..a403efa84 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -56,6 +56,11 @@ -export([print_routes/1]). +-export([ + foldl_routes/2, + foldr_routes/2 +]). + -export([topics/0]). %% gen_server callbacks @@ -212,6 +217,14 @@ cleanup_routes(Node) -> Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) ]. +-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldl_routes(FoldFun, AccIn) -> + ets:foldl(FoldFun, AccIn, ?ROUTE_TAB). + +-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldr_routes(FoldFun, AccIn) -> + ets:foldr(FoldFun, AccIn, ?ROUTE_TAB). + call(Router, Msg) -> gen_server:call(Router, Msg, infinity). diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index aeed5b922..121e5b1a8 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -22,9 +22,6 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). --include("emqx_mgmt.hrl"). - --define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])). -define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}). -export([load/0]). @@ -49,20 +46,6 @@ data/1 ]). --define(PROC_INFOKEYS, [ - status, - memory, - message_queue_len, - total_heap_size, - heap_size, - stack_size, - reductions -]). - --define(MAX_LIMIT, 10000). - --define(APP, emqx). - -spec load() -> ok. 
load() -> Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)], @@ -197,9 +180,12 @@ if_client(ClientId, Fun) -> %% @doc Topics Command topics(["list"]) -> - dump(?ROUTE_TAB, emqx_topic); + emqx_router:foldr_routes( + fun(Route, Acc) -> [print({emqx_topic, Route}) | Acc] end, + [] + ); topics(["show", Topic]) -> - Routes = ets:lookup(?ROUTE_TAB, bin(Topic)), + Routes = emqx_router:lookup_routes(Topic), [print({emqx_topic, Route}) || Route <- Routes]; topics(_) -> emqx_ctl:usage([ From 84e40fb6fef90a1dc1466091e6a417b48c41cedf Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 15 Aug 2023 11:18:22 +0400 Subject: [PATCH 03/26] test(trie): add more involved route add + delete case That shows how the current local trie implementation breaks because of the lack of refcounting. --- apps/emqx/test/emqx_router_SUITE.erl | 49 ++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 067f11634..453f86257 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -79,6 +79,55 @@ t_add_delete(_) -> ?R:delete_route(<<"a/+/b">>, node()), ?assertEqual([], ?R:topics()). 
+t_add_delete_incremental(_) -> + ?R:add_route(<<"a/b/c">>), + ?R:add_route(<<"a/+/c">>, node()), + ?R:add_route(<<"a/+/+">>, node()), + ?R:add_route(<<"a/b/#">>, node()), + ?R:add_route(<<"#">>, node()), + ?assertEqual( + [ + #route{topic = <<"#">>, dest = node()}, + #route{topic = <<"a/+/+">>, dest = node()}, + #route{topic = <<"a/+/c">>, dest = node()}, + #route{topic = <<"a/b/#">>, dest = node()}, + #route{topic = <<"a/b/c">>, dest = node()} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + ?R:delete_route(<<"a/+/c">>, node()), + ?assertEqual( + [ + #route{topic = <<"#">>, dest = node()}, + #route{topic = <<"a/+/+">>, dest = node()}, + #route{topic = <<"a/b/#">>, dest = node()}, + #route{topic = <<"a/b/c">>, dest = node()} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + ?R:delete_route(<<"a/+/+">>, node()), + ?assertEqual( + [ + #route{topic = <<"#">>, dest = node()}, + #route{topic = <<"a/b/#">>, dest = node()}, + #route{topic = <<"a/b/c">>, dest = node()} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + ?R:delete_route(<<"a/b/#">>, node()), + ?assertEqual( + [ + #route{topic = <<"#">>, dest = node()}, + #route{topic = <<"a/b/c">>, dest = node()} + ], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ), + ?R:delete_route(<<"a/b/c">>, node()), + ?assertEqual( + [#route{topic = <<"#">>, dest = node()}], + lists:sort(?R:match_routes(<<"a/b/c">>)) + ). + t_do_add_delete(_) -> ?R:do_add_route(<<"a/b/c">>), ?R:do_add_route(<<"a/b/c">>, node()), From 166375a0003a31d98a3bda9e6a4732bb4401dc5a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 16 Aug 2023 15:01:31 +0400 Subject: [PATCH 04/26] fix(topicidx): make `get_record/2` simpler to use in concurrent env The mechanics of `emqx_topic_index` cannot really guarantee atomicity of reading records associated with index matches, so instead it's probably better to make the user aware of the lack of this guarantee. 
--- apps/emqx/src/emqx_topic_index.erl | 12 +++++++++--- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 5 +++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index acbf36627..0b153ac01 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -73,10 +73,16 @@ get_topic(Key) -> emqx_trie_search:get_topic(Key). %% @doc Fetch the record associated with the match. -%% NOTE: Only really useful for ETS tables where the record ID is the first element. --spec get_record(match(_ID), ets:table()) -> _Record. +%% May return empty list if the index entry was deleted in the meantime. +%% NOTE: Only really useful for ETS tables where the record data is the last element. +-spec get_record(match(_ID), ets:table()) -> [_Record]. get_record(K, Tab) -> - ets:lookup_element(Tab, K, 2). + case ets:lookup(Tab, K) of + [Entry] -> + [erlang:element(tuple_size(Entry), Entry)]; + [] -> + [] + end. key(TopicOrFilter, ID) -> emqx_trie_search:make_key(TopicOrFilter, ID). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 41d1ed433..0060ed819 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -225,8 +225,9 @@ get_rules_ordered_by_ts() -> -spec get_rules_for_topic(Topic :: binary()) -> [rule()]. get_rules_for_topic(Topic) -> [ - emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX) - || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]) + Rule + || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]), + Rule <- emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX) ]. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()]. 
From 33e5e1ba5765d8f3d632ca24471633db71d8b64a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 25 Aug 2023 14:49:01 +0400 Subject: [PATCH 05/26] feat(router): add unified routing table --- apps/emqx/include/emqx_router.hrl | 3 +- apps/emqx/src/emqx_router.erl | 221 +++++++++++++++++--- apps/emqx/src/emqx_router_helper.erl | 8 +- apps/emqx/src/emqx_topic_index.erl | 13 +- apps/emqx/test/emqx_router_SUITE.erl | 45 ++-- apps/emqx/test/emqx_router_helper_SUITE.erl | 70 ++++--- 6 files changed, 280 insertions(+), 80 deletions(-) diff --git a/apps/emqx/include/emqx_router.hrl b/apps/emqx/include/emqx_router.hrl index 035ff5455..99ca3e185 100644 --- a/apps/emqx/include/emqx_router.hrl +++ b/apps/emqx/include/emqx_router.hrl @@ -17,8 +17,9 @@ -ifndef(EMQX_ROUTER_HRL). -define(EMQX_ROUTER_HRL, true). -%% ETS table for message routing +%% ETS tables for message routing -define(ROUTE_TAB, emqx_route). +-define(ROUTE_TAB_UNIFIED, emqx_route_unified). %% Mnesia table for message routing -define(ROUTING_NODE, emqx_routing_node). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index a403efa84..d286af1d7 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -21,7 +21,6 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). --include_lib("mria/include/mria.hrl"). -include_lib("emqx/include/emqx_router.hrl"). %% Mnesia bootstrap @@ -73,11 +72,19 @@ code_change/3 ]). +%% test / debugging purposes +-export([is_unified_table_active/0]). + -type group() :: binary(). -type dest() :: node() | {group(), node()}. --dialyzer({nowarn_function, [cleanup_routes/1]}). +-record(routeidx, { + entry :: emqx_topic_index:key(dest()), + unused = [] :: nil() +}). + +-dialyzer({nowarn_function, [cleanup_routes_regular/1]}). 
%%-------------------------------------------------------------------- %% Mnesia bootstrap @@ -97,6 +104,19 @@ mnesia(boot) -> {write_concurrency, true} ]} ]} + ]), + ok = mria:create_table(?ROUTE_TAB_UNIFIED, [ + {type, ordered_set}, + {rlog_shard, ?ROUTE_SHARD}, + {storage, ram_copies}, + {record_name, routeidx}, + {attributes, record_info(fields, routeidx)}, + {storage_properties, [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, auto} + ]} + ]} ]). %%-------------------------------------------------------------------- @@ -130,31 +150,54 @@ do_add_route(Topic) when is_binary(Topic) -> -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_add_route(Topic, Dest) when is_binary(Topic) -> - Route = #route{topic = Topic, dest = Dest}, - case lists:member(Route, lookup_routes(Topic)) of + case has_route(Topic, Dest) of true -> ok; false -> ok = emqx_router_helper:monitor(Dest), - case emqx_topic:wildcard(Topic) of - true -> - Fun = fun emqx_router_utils:insert_trie_route/2, - emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); - false -> - emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) - end + mria_insert_route(is_unified_table_active(), Topic, Dest) end. +mria_insert_route(_Unified = true, Topic, Dest) -> + mria_insert_route_unified(Topic, Dest); +mria_insert_route(_Unified = false, Topic, Dest) -> + Route = #route{topic = Topic, dest = Dest}, + case emqx_topic:wildcard(Topic) of + true -> + mria_insert_route_update_trie(Route); + false -> + mria_insert_route(Route) + end. + +mria_insert_route_unified(Topic, Dest) -> + K = emqx_topic_index:make_key(Topic, Dest), + mria:dirty_write(?ROUTE_TAB_UNIFIED, #routeidx{entry = K}). + +mria_insert_route_update_trie(Route) -> + emqx_router_utils:maybe_trans( + fun emqx_router_utils:insert_trie_route/2, + [?ROUTE_TAB, Route], + ?ROUTE_SHARD + ). + +mria_insert_route(Route) -> + mria:dirty_write(?ROUTE_TAB, Route). 
+ %% @doc Match routes -spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> - case match_trie(Topic) of - [] -> lookup_routes(Topic); - Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]]) - end. + match_routes(is_unified_table_active(), Topic). -%% Optimize: routing table will be replicated to all router nodes. -match_trie(Topic) -> +match_routes(_Unified = true, Topic) -> + [match_to_route(M) || M <- match_unified(Topic)]; +match_routes(_Unified = false, Topic) -> + lookup_routes_regular(Topic) ++ + lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)). + +match_unified(Topic) -> + emqx_topic_index:matches(Topic, ?ROUTE_TAB_UNIFIED, []). + +match_global_trie(Topic) -> case emqx_trie:empty() of true -> []; false -> emqx_trie:match(Topic) @@ -162,12 +205,59 @@ match_trie(Topic) -> -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. lookup_routes(Topic) -> + case is_unified_table_active() of + true -> + lookup_routes_unified(Topic); + false -> + lookup_routes_regular(Topic) + end. + +lookup_routes_unified(Topic) -> + Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')}, + [Dest || [Dest] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]. + +lookup_routes_regular(Topic) -> ets:lookup(?ROUTE_TAB, Topic). +match_to_route(M) -> + #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. + -spec has_routes(emqx_types:topic()) -> boolean(). has_routes(Topic) when is_binary(Topic) -> + case is_unified_table_active() of + true -> + has_routes_unified(Topic); + false -> + has_routes_regular(Topic) + end. + +has_routes_unified(Topic) -> + Pat = #routeidx{entry = emqx_topic_index:mk_key(Topic, '$1'), _ = '_'}, + case ets:match(?ROUTE_TAB_UNIFIED, Pat, 1) of + {[_], _} -> + true; + _ -> + false + end. + +has_routes_regular(Topic) -> ets:member(?ROUTE_TAB, Topic). +-spec has_route(emqx_types:topic(), dest()) -> boolean(). 
+has_route(Topic, Dest) -> + case is_unified_table_active() of + true -> + has_route_unified(Topic, Dest); + false -> + has_route_regular(Topic, Dest) + end. + +has_route_unified(Topic, Dest) -> + ets:member(?ROUTE_TAB_UNIFIED, emqx_topic_index:make_key(Topic, Dest)). + +has_route_regular(Topic, Dest) -> + lists:any(fun(Route) -> Route#route.dest =:= Dest end, ets:lookup(?ROUTE_TAB, Topic)). + -spec delete_route(emqx_types:topic()) -> ok | {error, term()}. delete_route(Topic) when is_binary(Topic) -> delete_route(Topic, node()). @@ -182,17 +272,54 @@ do_delete_route(Topic) when is_binary(Topic) -> -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, Dest) -> + mria_delete_route(is_unified_table_active(), Topic, Dest). + +mria_delete_route(_Unified = true, Topic, Dest) -> + mria_delete_route_unified(Topic, Dest); +mria_delete_route(_Unified = false, Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - Fun = fun emqx_router_utils:delete_trie_route/2, - emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + mria_delete_route_update_trie(Route); false -> - emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) + mria_delete_route(Route) end. +mria_delete_route_unified(Topic, Dest) -> + K = emqx_topic_index:make_key(Topic, Dest), + mria:dirty_delete(?ROUTE_TAB_UNIFIED, K). + +mria_delete_route_update_trie(Route) -> + emqx_router_utils:maybe_trans( + fun emqx_router_utils:delete_trie_route/2, + [?ROUTE_TAB, Route], + ?ROUTE_SHARD + ). + +mria_delete_route(Route) -> + mria:dirty_delete_object(?ROUTE_TAB, Route). + +-spec is_unified_table_active() -> boolean(). +is_unified_table_active() -> + is_empty(?ROUTE_TAB) andalso + ((not is_empty(?ROUTE_TAB_UNIFIED)) orelse + emqx_config:get([broker, unified_routing_table])). 
+ +is_empty(Tab) -> + % NOTE + % Supposedly, should be better than `ets:info(Tab, size)` because the latter suffers + % from `{decentralized_counters, true}` which is default when `write_concurrency` is + % either `auto` or `true`. + ets:first(Tab) =:= '$end_of_table'. + -spec topics() -> list(emqx_types:topic()). topics() -> + topics(is_unified_table_active()). + +topics(_Unified = true) -> + Pat = #routeidx{entry = '$1'}, + [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]; +topics(_Unified = false) -> mnesia:dirty_all_keys(?ROUTE_TAB). %% @doc Print routes to a topic @@ -207,23 +334,63 @@ print_routes(Topic) -> -spec cleanup_routes(node()) -> ok. cleanup_routes(Node) -> + case is_unified_table_active() of + true -> + cleanup_routes_unified(Node); + false -> + cleanup_routes_regular(Node) + end. + +cleanup_routes_unified(Node) -> + % NOTE + % No point in transaction here because all the operations on unified routing table + % are dirty. + ets:foldl( + fun(#routeidx{entry = K}, ok) -> + case emqx_topic_index:get_id(K) of + Node -> + mria:dirty_delete(?ROUTE_TAB_UNIFIED, K); + _ -> + ok + end + end, + ok, + ?ROUTE_TAB_UNIFIED + ). + +cleanup_routes_regular(Node) -> Patterns = [ #route{_ = '_', dest = Node}, #route{_ = '_', dest = {'_', Node}} ], - [ - mnesia:delete_object(?ROUTE_TAB, Route, write) - || Pat <- Patterns, - Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) - ]. + mria:transaction(?ROUTE_SHARD, fun() -> + [ + mnesia:delete_object(?ROUTE_TAB, Route, write) + || Pat <- Patterns, + Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) + ] + end). -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldl_routes(FoldFun, AccIn) -> - ets:foldl(FoldFun, AccIn, ?ROUTE_TAB). + case is_unified_table_active() of + true -> + ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); + false -> + ets:foldl(FoldFun, AccIn, ?ROUTE_TAB) + end. 
-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldr_routes(FoldFun, AccIn) -> - ets:foldr(FoldFun, AccIn, ?ROUTE_TAB). + case is_unified_table_active() of + true -> + ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); + false -> + ets:foldr(FoldFun, AccIn, ?ROUTE_TAB) + end. + +mk_fold_fun_unified(FoldFun) -> + fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end. call(Router, Msg) -> gen_server:call(Router, Msg, infinity). diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 61573fcff..77f1cd11b 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -148,11 +148,13 @@ handle_info({mnesia_table_event, Event}, State) -> handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> case mria_rlog:role() of core -> + % TODO + % Node may flap, do we need to wait for any pending cleanups in `init/1` + % on the flapping node? + % This also implies changing lock id to `{?LOCK, Node}`. global:trans( {?LOCK, self()}, - fun() -> - mria:transaction(?ROUTE_SHARD, fun ?MODULE:cleanup_routes/1, [Node]) - end + fun() -> cleanup_routes(Node) end ), ok = mria:dirty_delete(?ROUTING_NODE, Node); replicant -> diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index 0b153ac01..5db8ce7cc 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -24,6 +24,8 @@ -export([match/2]). -export([matches/3]). +-export([make_key/2]). + -export([get_id/1]). -export([get_topic/1]). -export([get_record/2]). @@ -42,14 +44,18 @@ new() -> %% between regular and "materialized" indexes, for example. -spec insert(emqx_types:topic(), _ID, _Record, ets:table()) -> true. insert(Filter, ID, Record, Tab) -> - Key = key(Filter, ID), + Key = make_key(Filter, ID), true = ets:insert(Tab, {Key, Record}). %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. 
Deleting non-existing entry is not an error. -spec delete(emqx_types:topic(), _ID, ets:table()) -> true. delete(Filter, ID, Tab) -> - true = ets:delete(Tab, key(Filter, ID)). + ets:delete(Tab, make_key(Filter, ID)). + +-spec make_key(emqx_types:topic(), ID) -> key(ID). +make_key(TopicOrFilter, ID) -> + emqx_trie_search:make_key(TopicOrFilter, ID). %% @doc Match given topic against the index and return the first match, or `false` if %% no match is found. @@ -84,8 +90,5 @@ get_record(K, Tab) -> [] end. -key(TopicOrFilter, ID) -> - emqx_trie_search:make_key(TopicOrFilter, ID). - make_nextf(Tab) -> fun(Key) -> ets:next(Tab, Key) end. diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 453f86257..04029e822 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -26,24 +26,37 @@ -define(R, emqx_router). -all() -> emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - PrevBootModules = application:get_env(emqx, boot_modules), - emqx_common_test_helpers:boot_modules([router]), - emqx_common_test_helpers:start_apps([]), +all() -> [ - {prev_boot_modules, PrevBootModules} - | Config + {group, routing_table_regular}, + {group, routing_table_unified} ]. -end_per_suite(Config) -> - PrevBootModules = ?config(prev_boot_modules, Config), - case PrevBootModules of - undefined -> ok; - {ok, Mods} -> emqx_common_test_helpers:boot_modules(Mods) - end, - emqx_common_test_helpers:stop_apps([]). +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {routing_table_regular, [], TCs}, + {routing_table_unified, [], TCs} + ]. + +init_per_group(GroupName, Config) -> + WorkDir = filename:join([?config(priv_dir, Config), GroupName]), + AppSpecs = [ + {emqx, #{ + config => mk_config(GroupName), + override_env => [{boot_modules, [router]}] + }} + ], + Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), + [{group_apps, Apps} | Config]. 
+ +end_per_group(_GroupName, Config) -> + ok = emqx_cth_suite:stop(?config(group_apps, Config)). + +mk_config(routing_table_regular) -> + "broker.unified_routing_table = false"; +mk_config(routing_table_unified) -> + "broker.unified_routing_table = true". init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -177,5 +190,5 @@ t_unexpected(_) -> clear_tables() -> lists:foreach( fun mnesia:clear_table/1, - [?ROUTE_TAB, ?TRIE, emqx_trie_node] + [?ROUTE_TAB, ?ROUTE_TAB_UNIFIED, ?TRIE] ). diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index c0796288e..12a3a34dd 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -26,32 +26,38 @@ -define(ROUTER_HELPER, emqx_router_helper). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, routing_table_regular}, + {group, routing_table_unified} + ]. -init_per_suite(Config) -> - DistPid = - case net_kernel:nodename() of - ignored -> - %% calling `net_kernel:start' without `epmd' - %% running will result in a failure. - emqx_common_test_helpers:start_epmd(), - {ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]), - Pid; - _ -> - undefined - end, - emqx_common_test_helpers:start_apps([]), - [{dist_pid, DistPid} | Config]. +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {routing_table_regular, [], TCs}, + {routing_table_unified, [], TCs} + ]. -end_per_suite(Config) -> - DistPid = ?config(dist_pid, Config), - case DistPid of - Pid when is_pid(Pid) -> - net_kernel:stop(); - _ -> - ok - end, - emqx_common_test_helpers:stop_apps([]). +init_per_group(GroupName, Config) -> + WorkDir = filename:join([?config(priv_dir, Config), GroupName]), + AppSpecs = [{emqx, mk_config(GroupName)}], + Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), + [{group_name, GroupName}, {group_apps, Apps} | Config]. 
+ +end_per_group(_GroupName, Config) -> + ok = emqx_cth_suite:stop(?config(group_apps, Config)). + +mk_config(routing_table_regular) -> + #{ + config => "broker.unified_routing_table = false", + override_env => [{boot_modules, [router]}] + }; +mk_config(routing_table_unified) -> + #{ + config => "broker.unified_routing_table = true", + override_env => [{boot_modules, [router]}] + }. init_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_membership_mnesia_down; @@ -59,7 +65,16 @@ init_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_monitor_node_down -> ok = snabbkaffe:start_trace(), - Slave = emqx_common_test_helpers:start_slave(some_node, []), + WorkDir = filename:join([?config(priv_dir, Config), ?config(group_name, Config), TestCase]), + [Slave] = emqx_cth_cluster:start( + [ + {?MODULE, #{ + apps => [{emqx, mk_config(?config(group_name, Config))}], + join_to => node() + }} + ], + #{work_dir => WorkDir} + ), [{slave, Slave} | Config]; init_per_testcase(_TestCase, Config) -> Config. @@ -70,9 +85,8 @@ end_per_testcase(TestCase, Config) when TestCase =:= t_cleanup_monitor_node_down -> Slave = ?config(slave, Config), - emqx_common_test_helpers:stop_slave(Slave), - mria:clear_table(?ROUTE_TAB), - snabbkaffe:stop(), + ok = emqx_cth_cluster:stop([Slave]), + ok = snabbkaffe:stop(), ok; end_per_testcase(_TestCase, _Config) -> ok. 
From 5d51687dbf3e554ac3125c3de5280d29ca4591ca Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 25 Aug 2023 14:49:50 +0400 Subject: [PATCH 06/26] feat(router): add unified routing table config option --- apps/emqx/src/emqx_schema.erl | 9 +++++++++ rel/i18n/emqx_schema.hocon | 6 ++++++ 2 files changed, 15 insertions(+) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ff1446b6d..7268a7e59 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1358,6 +1358,15 @@ fields("broker") -> ref("broker_perf"), #{importance => ?IMPORTANCE_HIDDEN} )}, + {"unified_routing_table", + sc( + boolean(), + #{ + default => false, + importance => ?IMPORTANCE_HIDDEN, + desc => ?DESC(broker_unified_routing_table) + } + )}, %% FIXME: Need new design for shared subscription group {"shared_subscription_group", sc( diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 6c21b31ea..c63b248f4 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1549,6 +1549,12 @@ fields_ws_opts_max_frame_size.label: sys_event_messages.desc: """Client events messages.""" +broker_unified_routing_table.desc: +"""Enable unified routing table. +Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. +NOTE: This is an experimental feature. +NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect.""" + broker_perf_trie_compaction.desc: """Enable trie path compaction. Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber. 
From e85789306b6b7677f06ea70e98ce42ec3ca2e37f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 25 Aug 2023 15:43:15 +0400 Subject: [PATCH 07/26] chore(router): drop test-only `has_routes/1` Seems that it's too much support work only for test purposes, where `lookup_routes/1` is nearly as usable. --- apps/emqx/src/emqx_router.erl | 23 +------------------ .../test/emqx_quic_multistreams_SUITE.erl | 2 +- apps/emqx/test/emqx_router_SUITE.erl | 4 ++-- apps/emqx/test/emqx_shared_sub_SUITE.erl | 4 ++-- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 2 +- 5 files changed, 7 insertions(+), 28 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index d286af1d7..6859b98f3 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -50,7 +50,7 @@ -export([ match_routes/1, lookup_routes/1, - has_routes/1 + has_route/2 ]). -export([print_routes/1]). @@ -222,27 +222,6 @@ lookup_routes_regular(Topic) -> match_to_route(M) -> #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. --spec has_routes(emqx_types:topic()) -> boolean(). -has_routes(Topic) when is_binary(Topic) -> - case is_unified_table_active() of - true -> - has_routes_unified(Topic); - false -> - has_routes_regular(Topic) - end. - -has_routes_unified(Topic) -> - Pat = #routeidx{entry = emqx_topic_index:mk_key(Topic, '$1'), _ = '_'}, - case ets:match(?ROUTE_TAB_UNIFIED, Pat, 1) of - {[_], _} -> - true; - _ -> - false - end. - -has_routes_regular(Topic) -> - ets:member(?ROUTE_TAB, Topic). - -spec has_route(emqx_types:topic(), dest()) -> boolean(). 
has_route(Topic, Dest) -> case is_unified_table_active() of diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index 1b45cb669..267782ff9 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -1094,7 +1094,7 @@ t_multi_streams_unsub(Config) -> ?retry( _Sleep2 = 100, _Attempts2 = 50, - false = emqx_router:has_routes(Topic) + [] = emqx_router:lookup_routes(Topic) ), case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 04029e822..7dec039c8 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -176,9 +176,9 @@ t_print_routes(_) -> ?R:add_route(<<"+/+">>), ?R:print_routes(<<"a/b">>). -t_has_routes(_) -> +t_has_route(_) -> ?R:add_route(<<"devices/+/messages">>, node()), - ?assert(?R:has_routes(<<"devices/+/messages">>)), + ?assert(?R:has_route(<<"devices/+/messages">>, node())), ?R:delete_route(<<"devices/+/messages">>). 
t_unexpected(_) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 6439981f6..7a7729878 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -1054,7 +1054,7 @@ t_queue_subscription(Config) when is_list(Config) -> begin ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), %% FIXME: should ensure we have 2 subscriptions - true = emqx_router:has_routes(Topic) + [_] = emqx_router:lookup_routes(Topic) end ), @@ -1081,7 +1081,7 @@ t_queue_subscription(Config) when is_list(Config) -> %% _Attempts0 = 50, %% begin %% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), - %% false = emqx_router:has_routes(Topic) + %% [] = emqx_router:lookup_routes(Topic) %% end %% ), ct:sleep(500), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 756fa8b3b..382e38fb7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1928,7 +1928,7 @@ t_node_joins_existing_cluster(Config) -> ?retry( _Sleep2 = 100, _Attempts2 = 50, - true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic]) + [] =/= erpc:call(N2, emqx_router, lookup_routes, [MQTTTopic]) ), {ok, SRef1} = snabbkaffe:subscribe( From 8d2ebdea7e1810109c76a2f7d89ddd3297c3298a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 25 Aug 2023 20:01:48 +0400 Subject: [PATCH 08/26] fix(router): generalize config option and make effects visible --- apps/emqx/src/emqx_app.erl | 4 +- apps/emqx/src/emqx_router.erl | 129 +++++++++++++------- apps/emqx/src/emqx_router_sup.erl | 2 + apps/emqx/src/emqx_schema.erl | 8 +- apps/emqx/test/emqx_router_SUITE.erl | 14 ++- apps/emqx/test/emqx_router_helper_SUITE.erl | 4 +- rel/i18n/emqx_schema.hocon | 4 +- 7 files changed, 111 insertions(+), 54 deletions(-) diff --git a/apps/emqx/src/emqx_app.erl 
b/apps/emqx/src/emqx_app.erl index 59a397836..4fae556ca 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -55,7 +55,9 @@ prep_stop(_State) -> emqx_boot:is_enabled(listeners) andalso emqx_listeners:stop(). -stop(_State) -> ok. +stop(_State) -> + ok = emqx_router:deinit_table_type(), + ok. -define(CONFIG_LOADER, config_loader). -define(DEFAULT_LOADER, emqx). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 6859b98f3..7a8d88a55 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -72,8 +72,11 @@ code_change/3 ]). -%% test / debugging purposes --export([is_unified_table_active/0]). +-export([ + get_table_type/0, + init_table_type/0, + deinit_table_type/0 +]). -type group() :: binary(). @@ -155,12 +158,12 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok; false -> ok = emqx_router_helper:monitor(Dest), - mria_insert_route(is_unified_table_active(), Topic, Dest) + mria_insert_route(get_table_type(), Topic, Dest) end. -mria_insert_route(_Unified = true, Topic, Dest) -> +mria_insert_route(unified, Topic, Dest) -> mria_insert_route_unified(Topic, Dest); -mria_insert_route(_Unified = false, Topic, Dest) -> +mria_insert_route(regular, Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> @@ -186,11 +189,11 @@ mria_insert_route(Route) -> %% @doc Match routes -spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> - match_routes(is_unified_table_active(), Topic). + match_routes(get_table_type(), Topic). -match_routes(_Unified = true, Topic) -> +match_routes(unified, Topic) -> [match_to_route(M) || M <- match_unified(Topic)]; -match_routes(_Unified = false, Topic) -> +match_routes(regular, Topic) -> lookup_routes_regular(Topic) ++ lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)). 
@@ -205,10 +208,10 @@ match_global_trie(Topic) -> -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. lookup_routes(Topic) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> lookup_routes_unified(Topic); - false -> + regular -> lookup_routes_regular(Topic) end. @@ -224,10 +227,10 @@ match_to_route(M) -> -spec has_route(emqx_types:topic(), dest()) -> boolean(). has_route(Topic, Dest) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> has_route_unified(Topic, Dest); - false -> + regular -> has_route_regular(Topic, Dest) end. @@ -251,11 +254,11 @@ do_delete_route(Topic) when is_binary(Topic) -> -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, Dest) -> - mria_delete_route(is_unified_table_active(), Topic, Dest). + mria_delete_route(get_table_type(), Topic, Dest). -mria_delete_route(_Unified = true, Topic, Dest) -> +mria_delete_route(unified, Topic, Dest) -> mria_delete_route_unified(Topic, Dest); -mria_delete_route(_Unified = false, Topic, Dest) -> +mria_delete_route(regular, Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> @@ -278,27 +281,14 @@ mria_delete_route_update_trie(Route) -> mria_delete_route(Route) -> mria:dirty_delete_object(?ROUTE_TAB, Route). --spec is_unified_table_active() -> boolean(). -is_unified_table_active() -> - is_empty(?ROUTE_TAB) andalso - ((not is_empty(?ROUTE_TAB_UNIFIED)) orelse - emqx_config:get([broker, unified_routing_table])). - -is_empty(Tab) -> - % NOTE - % Supposedly, should be better than `ets:info(Tab, size)` because the latter suffers - % from `{decentralized_counters, true}` which is default when `write_concurrency` is - % either `auto` or `true`. - ets:first(Tab) =:= '$end_of_table'. - -spec topics() -> list(emqx_types:topic()). topics() -> - topics(is_unified_table_active()). + topics(get_table_type()). 
-topics(_Unified = true) -> +topics(unified) -> Pat = #routeidx{entry = '$1'}, [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]; -topics(_Unified = false) -> +topics(regular) -> mnesia:dirty_all_keys(?ROUTE_TAB). %% @doc Print routes to a topic @@ -313,10 +303,10 @@ print_routes(Topic) -> -spec cleanup_routes(node()) -> ok. cleanup_routes(Node) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> cleanup_routes_unified(Node); - false -> + regular -> cleanup_routes_regular(Node) end. @@ -352,19 +342,19 @@ cleanup_routes_regular(Node) -> -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldl_routes(FoldFun, AccIn) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); - false -> + regular -> ets:foldl(FoldFun, AccIn, ?ROUTE_TAB) end. -spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldr_routes(FoldFun, AccIn) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); - false -> + regular -> ets:foldr(FoldFun, AccIn, ?ROUTE_TAB) end. @@ -377,6 +367,61 @@ call(Router, Msg) -> pick(Topic) -> gproc_pool:pick_worker(router_pool, Topic). +%%-------------------------------------------------------------------- +%% Routing table type +%% -------------------------------------------------------------------- + +-define(PT_TABLE_TYPE, {?MODULE, tabtype}). + +-type tabtype() :: regular | unified. + +-spec get_table_type() -> tabtype(). +get_table_type() -> + persistent_term:get(?PT_TABLE_TYPE). + +-spec init_table_type() -> ok. 
+init_table_type() -> + ConfType = emqx_config:get([broker, routing_table_type]), + Type = choose_table_type(ConfType), + ok = persistent_term:put(?PT_TABLE_TYPE, Type), + case Type of + ConfType -> + ?SLOG(info, #{ + msg => "routing_table_type_used", + type => Type + }); + _ -> + ?SLOG(notice, #{ + msg => "configured_routing_table_type_unacceptable", + type => Type, + configured => ConfType, + reason => + "Could not use configured routing table type because " + "there's already non-empty routing table of another type." + }) + end. + +-spec deinit_table_type() -> ok. +deinit_table_type() -> + _ = persistent_term:erase(?PT_TABLE_TYPE), + ok. + +-spec choose_table_type(tabtype()) -> tabtype(). +choose_table_type(ConfType) -> + IsEmptyRegular = is_empty(?ROUTE_TAB), + IsEmptyUnified = is_empty(?ROUTE_TAB_UNIFIED), + case {IsEmptyRegular, IsEmptyUnified} of + {true, true} -> + ConfType; + {false, true} -> + regular; + {true, false} -> + unified + end. + +is_empty(Tab) -> + ets:first(Tab) =:= '$end_of_table'. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_router_sup.erl b/apps/emqx/src/emqx_router_sup.erl index 0fa48d9d2..398edc321 100644 --- a/apps/emqx/src/emqx_router_sup.erl +++ b/apps/emqx/src/emqx_router_sup.erl @@ -23,6 +23,8 @@ -export([init/1]). start_link() -> + %% Init and log routing table type + ok = emqx_router:init_table_type(), supervisor:start_link({local, ?MODULE}, ?MODULE, []). 
init([]) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 7268a7e59..818688d86 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1358,13 +1358,13 @@ fields("broker") -> ref("broker_perf"), #{importance => ?IMPORTANCE_HIDDEN} )}, - {"unified_routing_table", + {"routing_table_type", sc( - boolean(), + hoconsc:enum([regular, unified]), #{ - default => false, + default => regular, importance => ?IMPORTANCE_HIDDEN, - desc => ?DESC(broker_unified_routing_table) + desc => ?DESC(broker_routing_table_type) } )}, %% FIXME: Need new design for shared subscription group diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 7dec039c8..f9e1c6998 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -48,15 +48,15 @@ init_per_group(GroupName, Config) -> }} ], Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), - [{group_apps, Apps} | Config]. + [{group_apps, Apps}, {group_name, GroupName} | Config]. end_per_group(_GroupName, Config) -> ok = emqx_cth_suite:stop(?config(group_apps, Config)). mk_config(routing_table_regular) -> - "broker.unified_routing_table = false"; + "broker.routing_table_type = regular"; mk_config(routing_table_unified) -> - "broker.unified_routing_table = true". + "broker.routing_table_type = unified". init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -83,6 +83,14 @@ end_per_testcase(_TestCase, _Config) -> % t_topics(_) -> % error('TODO'). +t_verify_type(Config) -> + case ?config(group_name, Config) of + routing_table_regular -> + ?assertEqual(regular, ?R:get_table_type()); + routing_table_unified -> + ?assertEqual(unified, ?R:get_table_type()) + end. 
+ t_add_delete(_) -> ?R:add_route(<<"a/b/c">>), ?R:add_route(<<"a/b/c">>, node()), diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index 12a3a34dd..26b9d8ddd 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -50,12 +50,12 @@ end_per_group(_GroupName, Config) -> mk_config(routing_table_regular) -> #{ - config => "broker.unified_routing_table = false", + config => "broker.routing_table_type = regular", override_env => [{boot_modules, [router]}] }; mk_config(routing_table_unified) -> #{ - config => "broker.unified_routing_table = true", + config => "broker.routing_table_type = unified", override_env => [{boot_modules, [router]}] }. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index c63b248f4..d90fab47b 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1549,8 +1549,8 @@ fields_ws_opts_max_frame_size.label: sys_event_messages.desc: """Client events messages.""" -broker_unified_routing_table.desc: -"""Enable unified routing table. +broker_routing_table_type.desc: +"""Routing table type. Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. NOTE: This is an experimental feature. 
NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect.""" From 2d931a05121c9bfd710c2d3832735154f5f4ab55 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 8 Aug 2023 20:06:21 +0400 Subject: [PATCH 09/26] test(evict): do not disable router module in cluster nodes --- .../test/emqx_eviction_agent_test_helpers.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl index 7425cb145..17c291eb6 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl @@ -81,7 +81,7 @@ start_cluster(NamesWithPorts, Apps, Env) -> NamesWithPorts ), Opts0 = [ - {env, [{emqx, boot_modules, [broker, listeners]}] ++ Env}, + {env, Env}, {apps, Apps}, {conf, [{[listeners, Proto, default, enable], false} || Proto <- [ssl, ws, wss]] ++ From 270fd107b296700f38c6600f27a71e661633d8a1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 29 Aug 2023 17:31:26 +0400 Subject: [PATCH 10/26] feat(triesearch): allow reusing filter tokenization result As a kind of micro-optimization. --- apps/emqx/src/emqx_topic_gbt.erl | 8 +++--- apps/emqx/src/emqx_topic_index.erl | 7 ++--- apps/emqx/src/emqx_trie_search.erl | 23 ++++++++++++----- apps/emqx/test/emqx_topic_index_SUITE.erl | 11 ++++++++ apps/emqx/test/emqx_trie_search_tests.erl | 31 +++++++++++++++++------ 5 files changed, 58 insertions(+), 22 deletions(-) diff --git a/apps/emqx/src/emqx_topic_gbt.erl b/apps/emqx/src/emqx_topic_gbt.erl index dade70cee..063cba21d 100644 --- a/apps/emqx/src/emqx_topic_gbt.erl +++ b/apps/emqx/src/emqx_topic_gbt.erl @@ -29,8 +29,8 @@ -export([get_topic/1]). -export([get_record/2]). --type word() :: binary() | '+' | '#'. --type key(ID) :: {[word()], {ID}}. +-type key(ID) :: emqx_trie_search:key(ID). 
+-type words() :: emqx_trie_search:words(). -type match(ID) :: key(ID). -type name() :: any(). @@ -50,7 +50,7 @@ new(Name) -> %% @doc Insert a new entry into the index that associates given topic filter to given %% record ID, and attaches arbitrary record to the entry. This allows users to choose %% between regular and "materialized" indexes, for example. --spec insert(emqx_types:topic(), _ID, _Record, name()) -> true. +-spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true. insert(Filter, ID, Record, Name) -> Tree = gbt(Name), Key = key(Filter, ID), @@ -59,7 +59,7 @@ insert(Filter, ID, Record, Name) -> %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. --spec delete(emqx_types:topic(), _ID, name()) -> true. +-spec delete(emqx_types:topic() | words(), _ID, name()) -> true. delete(Filter, ID, Name) -> Tree = gbt(Name), Key = key(Filter, ID), diff --git a/apps/emqx/src/emqx_topic_index.erl b/apps/emqx/src/emqx_topic_index.erl index 5db8ce7cc..59dfdfeab 100644 --- a/apps/emqx/src/emqx_topic_index.erl +++ b/apps/emqx/src/emqx_topic_index.erl @@ -32,6 +32,7 @@ -type key(ID) :: emqx_trie_search:key(ID). -type match(ID) :: key(ID). +-type words() :: emqx_trie_search:words(). %% @doc Create a new ETS table suitable for topic index. %% Usable mostly for testing purposes. @@ -42,18 +43,18 @@ new() -> %% @doc Insert a new entry into the index that associates given topic filter to given %% record ID, and attaches arbitrary record to the entry. This allows users to choose %% between regular and "materialized" indexes, for example. --spec insert(emqx_types:topic(), _ID, _Record, ets:table()) -> true. +-spec insert(emqx_types:topic() | words(), _ID, _Record, ets:table()) -> true. insert(Filter, ID, Record, Tab) -> Key = make_key(Filter, ID), true = ets:insert(Tab, {Key, Record}). 
%% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. --spec delete(emqx_types:topic(), _ID, ets:table()) -> true. +-spec delete(emqx_types:topic() | words(), _ID, ets:table()) -> true. delete(Filter, ID, Tab) -> ets:delete(Tab, make_key(Filter, ID)). --spec make_key(emqx_types:topic(), ID) -> key(ID). +-spec make_key(emqx_types:topic() | words(), ID) -> key(ID). make_key(TopicOrFilter, ID) -> emqx_trie_search:make_key(TopicOrFilter, ID). diff --git a/apps/emqx/src/emqx_trie_search.erl b/apps/emqx/src/emqx_trie_search.erl index b774e1459..c8c088b58 100644 --- a/apps/emqx/src/emqx_trie_search.erl +++ b/apps/emqx/src/emqx_trie_search.erl @@ -98,24 +98,24 @@ -module(emqx_trie_search). --export([make_key/2]). +-export([make_key/2, filter/1]). -export([match/2, matches/3, get_id/1, get_topic/1]). --export_type([key/1, word/0, nextf/0, opts/0]). +-export_type([key/1, word/0, words/0, nextf/0, opts/0]). -define(END, '$end_of_table'). -type word() :: binary() | '+' | '#'. +-type words() :: [word()]. -type base_key() :: {binary() | [word()], {}}. -type key(ID) :: {binary() | [word()], {ID}}. -type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)). -type opts() :: [unique | return_first]. %% @doc Make a search-key for the given topic. --spec make_key(emqx_types:topic(), ID) -> key(ID). +-spec make_key(emqx_types:topic() | words(), ID) -> key(ID). make_key(Topic, ID) when is_binary(Topic) -> - Words = filter_words(Topic), - case emqx_topic:wildcard(Words) of - true -> + case filter(Topic) of + Words when is_list(Words) -> %% it's a wildcard {Words, {ID}}; false -> @@ -123,7 +123,15 @@ make_key(Topic, ID) when is_binary(Topic) -> %% because they can be found with direct lookups. %% it is also more compact in memory. {Topic, {ID}} - end. + end; +make_key(Words, ID) when is_list(Words) -> + {Words, {ID}}. + +%% @doc Parse a topic filter into a list of words. 
Returns `false` if it's not a filter. +-spec filter(emqx_types:topic()) -> words() | false. +filter(Topic) -> + Words = filter_words(Topic), + emqx_topic:wildcard(Words) andalso Words. %% @doc Extract record ID from the match. -spec get_id(key(ID)) -> ID. @@ -325,6 +333,7 @@ filter_words(Topic) when is_binary(Topic) -> % `match_filter/3` expects. [word(W, filter) || W <- emqx_topic:tokens(Topic)]. +-spec topic_words(emqx_types:topic()) -> [binary()]. topic_words(Topic) when is_binary(Topic) -> [word(W, topic) || W <- emqx_topic:tokens(Topic)]. diff --git a/apps/emqx/test/emqx_topic_index_SUITE.erl b/apps/emqx/test/emqx_topic_index_SUITE.erl index 08056a16f..9df9743f1 100644 --- a/apps/emqx/test/emqx_topic_index_SUITE.erl +++ b/apps/emqx/test/emqx_topic_index_SUITE.erl @@ -57,6 +57,17 @@ t_insert(Config) -> ?assertEqual(<<"sensor/#">>, topic(match(M, <<"sensor">>, Tab))), ?assertEqual(t_insert_3, id(match(M, <<"sensor">>, Tab))). +t_insert_filter(Config) -> + M = get_module(Config), + Tab = M:new(), + Topic = <<"sensor/+/metric//#">>, + true = M:insert(Topic, 1, <<>>, Tab), + true = M:insert(emqx_trie_search:filter(Topic), 2, <<>>, Tab), + ?assertEqual( + [Topic, Topic], + [topic(X) || X <- matches(M, <<"sensor/1/metric//2">>, Tab)] + ). + t_match(Config) -> M = get_module(Config), Tab = M:new(), diff --git a/apps/emqx/test/emqx_trie_search_tests.erl b/apps/emqx/test/emqx_trie_search_tests.erl index 75994131d..d78347de6 100644 --- a/apps/emqx/test/emqx_trie_search_tests.erl +++ b/apps/emqx/test/emqx_trie_search_tests.erl @@ -18,15 +18,30 @@ -include_lib("eunit/include/eunit.hrl"). -topic_validation_test() -> +-import(emqx_trie_search, [filter/1]). + +filter_test_() -> + [ + ?_assertEqual( + [<<"sensor">>, '+', <<"metric">>, <<>>, '#'], + filter(<<"sensor/+/metric//#">>) + ), + ?_assertEqual( + false, + filter(<<"sensor/1/metric//42">>) + ) + ]. 
+ +topic_validation_test_() -> NextF = fun(_) -> '$end_of_table' end, Call = fun(Topic) -> emqx_trie_search:match(Topic, NextF) end, - ?assertError(badarg, Call(<<"+">>)), - ?assertError(badarg, Call(<<"#">>)), - ?assertError(badarg, Call(<<"a/+/b">>)), - ?assertError(badarg, Call(<<"a/b/#">>)), - ?assertEqual(false, Call(<<"a/b/b+">>)), - ?assertEqual(false, Call(<<"a/b/c#">>)), - ok. + [ + ?_assertError(badarg, Call(<<"+">>)), + ?_assertError(badarg, Call(<<"#">>)), + ?_assertError(badarg, Call(<<"a/+/b">>)), + ?_assertError(badarg, Call(<<"a/b/#">>)), + ?_assertEqual(false, Call(<<"a/b/b+">>)), + ?_assertEqual(false, Call(<<"a/b/c#">>)) + ]. From 063d6200c8db7fe3394f050472c6a1a60dc6bf7e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 29 Aug 2023 17:41:59 +0400 Subject: [PATCH 11/26] feat(router): enable using 2 tables to store routes Instead of a single unified table, to reap the benefits of cheap `ets:lookup/2` per regular topic subscription route. Change configuration option naming to reflect the change: user now has an ability to choose _storage schema_. --- apps/emqx/include/emqx_router.hrl | 2 +- apps/emqx/src/emqx_app.erl | 2 +- apps/emqx/src/emqx_router.erl | 427 ++++++++++++-------- apps/emqx/src/emqx_router_sup.erl | 2 +- apps/emqx/src/emqx_schema.erl | 21 +- apps/emqx/test/emqx_router_SUITE.erl | 26 +- apps/emqx/test/emqx_router_helper_SUITE.erl | 16 +- rel/i18n/emqx_schema.hocon | 11 +- 8 files changed, 298 insertions(+), 209 deletions(-) diff --git a/apps/emqx/include/emqx_router.hrl b/apps/emqx/include/emqx_router.hrl index 99ca3e185..35a267aa7 100644 --- a/apps/emqx/include/emqx_router.hrl +++ b/apps/emqx/include/emqx_router.hrl @@ -19,7 +19,7 @@ %% ETS tables for message routing -define(ROUTE_TAB, emqx_route). --define(ROUTE_TAB_UNIFIED, emqx_route_unified). +-define(ROUTE_TAB_FILTERS, emqx_route_filters). %% Mnesia table for message routing -define(ROUTING_NODE, emqx_routing_node). 
diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 4fae556ca..0f4987085 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -56,7 +56,7 @@ prep_stop(_State) -> emqx_listeners:stop(). stop(_State) -> - ok = emqx_router:deinit_table_type(), + ok = emqx_router:deinit_schema(), ok. -define(CONFIG_LOADER, config_loader). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 7a8d88a55..9f6913dc4 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -73,9 +73,9 @@ ]). -export([ - get_table_type/0, - init_table_type/0, - deinit_table_type/0 + get_schema_vsn/0, + init_schema/0, + deinit_schema/0 ]). -type group() :: binary(). @@ -87,8 +87,6 @@ unused = [] :: nil() }). --dialyzer({nowarn_function, [cleanup_routes_regular/1]}). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -108,7 +106,7 @@ mnesia(boot) -> ]} ]} ]), - ok = mria:create_table(?ROUTE_TAB_UNIFIED, [ + ok = mria:create_table(?ROUTE_TAB_FILTERS, [ {type, ordered_set}, {rlog_shard, ?ROUTE_SHARD}, {storage, ram_copies}, @@ -158,87 +156,41 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok; false -> ok = emqx_router_helper:monitor(Dest), - mria_insert_route(get_table_type(), Topic, Dest) + mria_insert_route(get_schema_vsn(), Topic, Dest) end. -mria_insert_route(unified, Topic, Dest) -> - mria_insert_route_unified(Topic, Dest); -mria_insert_route(regular, Topic, Dest) -> - Route = #route{topic = Topic, dest = Dest}, - case emqx_topic:wildcard(Topic) of - true -> - mria_insert_route_update_trie(Route); - false -> - mria_insert_route(Route) - end. - -mria_insert_route_unified(Topic, Dest) -> - K = emqx_topic_index:make_key(Topic, Dest), - mria:dirty_write(?ROUTE_TAB_UNIFIED, #routeidx{entry = K}). 
- -mria_insert_route_update_trie(Route) -> - emqx_router_utils:maybe_trans( - fun emqx_router_utils:insert_trie_route/2, - [?ROUTE_TAB, Route], - ?ROUTE_SHARD - ). - -mria_insert_route(Route) -> - mria:dirty_write(?ROUTE_TAB, Route). +mria_insert_route(v2, Topic, Dest) -> + mria_insert_route_v2(Topic, Dest); +mria_insert_route(v1, Topic, Dest) -> + mria_insert_route_v1(Topic, Dest). %% @doc Match routes -spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> - match_routes(get_table_type(), Topic). + match_routes(get_schema_vsn(), Topic). -match_routes(unified, Topic) -> - [match_to_route(M) || M <- match_unified(Topic)]; -match_routes(regular, Topic) -> - lookup_routes_regular(Topic) ++ - lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)). - -match_unified(Topic) -> - emqx_topic_index:matches(Topic, ?ROUTE_TAB_UNIFIED, []). - -match_global_trie(Topic) -> - case emqx_trie:empty() of - true -> []; - false -> emqx_trie:match(Topic) - end. +match_routes(v2, Topic) -> + match_routes_v2(Topic); +match_routes(v1, Topic) -> + match_routes_v1(Topic). -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. lookup_routes(Topic) -> - case get_table_type() of - unified -> - lookup_routes_unified(Topic); - regular -> - lookup_routes_regular(Topic) - end. + lookup_routes(get_schema_vsn(), Topic). -lookup_routes_unified(Topic) -> - Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')}, - [Dest || [Dest] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]. - -lookup_routes_regular(Topic) -> - ets:lookup(?ROUTE_TAB, Topic). - -match_to_route(M) -> - #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. +lookup_routes(v2, Topic) -> + lookup_routes_v2(Topic); +lookup_routes(v1, Topic) -> + lookup_routes_v1(Topic). -spec has_route(emqx_types:topic(), dest()) -> boolean(). 
has_route(Topic, Dest) -> - case get_table_type() of - unified -> - has_route_unified(Topic, Dest); - regular -> - has_route_regular(Topic, Dest) - end. + has_route(get_schema_vsn(), Topic, Dest). -has_route_unified(Topic, Dest) -> - ets:member(?ROUTE_TAB_UNIFIED, emqx_topic_index:make_key(Topic, Dest)). - -has_route_regular(Topic, Dest) -> - lists:any(fun(Route) -> Route#route.dest =:= Dest end, ets:lookup(?ROUTE_TAB, Topic)). +has_route(v2, Topic, Dest) -> + has_route_v2(Topic, Dest); +has_route(v1, Topic, Dest) -> + has_route_v1(Topic, Dest). -spec delete_route(emqx_types:topic()) -> ok | {error, term()}. delete_route(Topic) when is_binary(Topic) -> @@ -254,42 +206,21 @@ do_delete_route(Topic) when is_binary(Topic) -> -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, Dest) -> - mria_delete_route(get_table_type(), Topic, Dest). + mria_delete_route(get_schema_vsn(), Topic, Dest). -mria_delete_route(unified, Topic, Dest) -> - mria_delete_route_unified(Topic, Dest); -mria_delete_route(regular, Topic, Dest) -> - Route = #route{topic = Topic, dest = Dest}, - case emqx_topic:wildcard(Topic) of - true -> - mria_delete_route_update_trie(Route); - false -> - mria_delete_route(Route) - end. - -mria_delete_route_unified(Topic, Dest) -> - K = emqx_topic_index:make_key(Topic, Dest), - mria:dirty_delete(?ROUTE_TAB_UNIFIED, K). - -mria_delete_route_update_trie(Route) -> - emqx_router_utils:maybe_trans( - fun emqx_router_utils:delete_trie_route/2, - [?ROUTE_TAB, Route], - ?ROUTE_SHARD - ). - -mria_delete_route(Route) -> - mria:dirty_delete_object(?ROUTE_TAB, Route). +mria_delete_route(v2, Topic, Dest) -> + mria_delete_route_v2(Topic, Dest); +mria_delete_route(v1, Topic, Dest) -> + mria_delete_route_v1(Topic, Dest). -spec topics() -> list(emqx_types:topic()). topics() -> - topics(get_table_type()). + topics(get_schema_vsn()). 
-topics(unified) -> - Pat = #routeidx{entry = '$1'}, - [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]; -topics(regular) -> - mnesia:dirty_all_keys(?ROUTE_TAB). +topics(v2) -> + list_topics_v2(); +topics(v1) -> + list_topics_v1(). %% @doc Print routes to a topic -spec print_routes(emqx_types:topic()) -> ok. @@ -303,31 +234,99 @@ print_routes(Topic) -> -spec cleanup_routes(node()) -> ok. cleanup_routes(Node) -> - case get_table_type() of - unified -> - cleanup_routes_unified(Node); - regular -> - cleanup_routes_regular(Node) + cleanup_routes(get_schema_vsn(), Node). + +cleanup_routes(v2, Node) -> + cleanup_routes_v2(Node); +cleanup_routes(v1, Node) -> + cleanup_routes_v1(Node). + +-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldl_routes(FoldFun, AccIn) -> + fold_routes(get_schema_vsn(), foldl, FoldFun, AccIn). + +-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. +foldr_routes(FoldFun, AccIn) -> + fold_routes(get_schema_vsn(), foldr, FoldFun, AccIn). + +fold_routes(v2, FunName, FoldFun, AccIn) -> + fold_routes_v2(FunName, FoldFun, AccIn); +fold_routes(v1, FunName, FoldFun, AccIn) -> + fold_routes_v1(FunName, FoldFun, AccIn). + +call(Router, Msg) -> + gen_server:call(Router, Msg, infinity). + +pick(Topic) -> + gproc_pool:pick_worker(router_pool, Topic). + +%%-------------------------------------------------------------------- +%% Schema v1 +%% -------------------------------------------------------------------- + +-dialyzer({nowarn_function, [cleanup_routes_v1/1]}). + +mria_insert_route_v1(Topic, Dest) -> + Route = #route{topic = Topic, dest = Dest}, + case emqx_topic:wildcard(Topic) of + true -> + mria_route_tab_insert_update_trie(Route); + false -> + mria_route_tab_insert(Route) end. -cleanup_routes_unified(Node) -> - % NOTE - % No point in transaction here because all the operations on unified routing table - % are dirty. 
- ets:foldl( - fun(#routeidx{entry = K}, ok) -> - case emqx_topic_index:get_id(K) of - Node -> - mria:dirty_delete(?ROUTE_TAB_UNIFIED, K); - _ -> - ok - end - end, - ok, - ?ROUTE_TAB_UNIFIED +mria_route_tab_insert_update_trie(Route) -> + emqx_router_utils:maybe_trans( + fun emqx_router_utils:insert_trie_route/2, + [?ROUTE_TAB, Route], + ?ROUTE_SHARD ). -cleanup_routes_regular(Node) -> +mria_route_tab_insert(Route) -> + mria:dirty_write(?ROUTE_TAB, Route). + +mria_delete_route_v1(Topic, Dest) -> + Route = #route{topic = Topic, dest = Dest}, + case emqx_topic:wildcard(Topic) of + true -> + mria_route_tab_delete_update_trie(Route); + false -> + mria_route_tab_delete(Route) + end. + +mria_route_tab_delete_update_trie(Route) -> + emqx_router_utils:maybe_trans( + fun emqx_router_utils:delete_trie_route/2, + [?ROUTE_TAB, Route], + ?ROUTE_SHARD + ). + +mria_route_tab_delete(Route) -> + mria:dirty_delete_object(?ROUTE_TAB, Route). + +match_routes_v1(Topic) -> + lookup_route_tab(Topic) ++ + lists:flatmap(fun lookup_route_tab/1, match_global_trie(Topic)). + +match_global_trie(Topic) -> + case emqx_trie:empty() of + true -> []; + false -> emqx_trie:match(Topic) + end. + +lookup_routes_v1(Topic) -> + lookup_route_tab(Topic). + +lookup_route_tab(Topic) -> + ets:lookup(?ROUTE_TAB, Topic). + +has_route_v1(Topic, Dest) -> + has_route_tab_entry(Topic, Dest). + +has_route_tab_entry(Topic, Dest) -> + [] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}). + +cleanup_routes_v1(Node) -> Patterns = [ #route{_ = '_', dest = Node}, #route{_ = '_', dest = {'_', Node}} @@ -340,83 +339,165 @@ cleanup_routes_regular(Node) -> ] end). --spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. -foldl_routes(FoldFun, AccIn) -> - case get_table_type() of - unified -> - ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); - regular -> - ets:foldl(FoldFun, AccIn, ?ROUTE_TAB) +list_topics_v1() -> + list_route_tab_topics(). 
+ +list_route_tab_topics() -> + mnesia:dirty_all_keys(?ROUTE_TAB). + +fold_routes_v1(FunName, FoldFun, AccIn) -> + ets:FunName(FoldFun, AccIn, ?ROUTE_TAB). + +%%-------------------------------------------------------------------- +%% Schema v2 +%% One bag table exclusively for regular, non-filter subscription +%% topics, and one `emqx_topic_index` table exclusively for wildcard +%% topics. Writes go to only one of the two tables at a time. +%% -------------------------------------------------------------------- + +mria_insert_route_v2(Topic, Dest) -> + case emqx_trie_search:filter(Topic) of + Words when is_list(Words) -> + K = emqx_topic_index:make_key(Words, Dest), + mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K}); + false -> + mria_route_tab_insert(#route{topic = Topic, dest = Dest}) end. --spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. -foldr_routes(FoldFun, AccIn) -> - case get_table_type() of - unified -> - ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); - regular -> - ets:foldr(FoldFun, AccIn, ?ROUTE_TAB) +mria_delete_route_v2(Topic, Dest) -> + case emqx_trie_search:filter(Topic) of + Words when is_list(Words) -> + K = emqx_topic_index:make_key(Words, Dest), + mria:dirty_delete(?ROUTE_TAB_FILTERS, K); + false -> + mria_route_tab_delete(#route{topic = Topic, dest = Dest}) end. -mk_fold_fun_unified(FoldFun) -> +match_routes_v2(Topic) -> + lookup_route_tab(Topic) ++ + [match_to_route(M) || M <- match_filters(Topic)]. + +match_filters(Topic) -> + emqx_topic_index:matches(Topic, ?ROUTE_TAB_FILTERS, []). + +lookup_routes_v2(Topic) -> + case emqx_topic:wildcard(Topic) of + true -> + Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')}, + [Dest || [Dest] <- ets:match(?ROUTE_TAB_FILTERS, Pat)]; + false -> + lookup_route_tab(Topic) + end. 
+ +has_route_v2(Topic, Dest) -> + case emqx_topic:wildcard(Topic) of + true -> + ets:member(?ROUTE_TAB_FILTERS, emqx_topic_index:make_key(Topic, Dest)); + false -> + has_route_tab_entry(Topic, Dest) + end. + +cleanup_routes_v2(Node) -> + % NOTE + % No point in transaction here because all schema v2 operations on both tables + % are dirty. + ok = ets:foldl( + fun(#routeidx{entry = K}, ok) -> + case get_dest_node(emqx_topic_index:get_id(K)) of + Node -> + mria:dirty_delete(?ROUTE_TAB_FILTERS, K); + _ -> + ok + end + end, + ok, + ?ROUTE_TAB_FILTERS + ), + ok = ets:foldl( + fun(#route{dest = Dest} = Route, ok) -> + case get_dest_node(Dest) of + Node -> + mria:dirty_delete_object(?ROUTE_TAB, Route); + _ -> + ok + end + end, + ok, + ?ROUTE_TAB + ). + +get_dest_node({_, Node}) -> + Node; +get_dest_node(Node) -> + Node. + +list_topics_v2() -> + Pat = #routeidx{entry = '$1'}, + Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)], + list_route_tab_topics() ++ Filters. + +fold_routes_v2(FunName, FoldFun, AccIn) -> + FilterFoldFun = mk_filtertab_fold_fun(FoldFun), + Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB), + ets:FunName(FilterFoldFun, Acc, ?ROUTE_TAB_FILTERS). + +mk_filtertab_fold_fun(FoldFun) -> fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end. -call(Router, Msg) -> - gen_server:call(Router, Msg, infinity). - -pick(Topic) -> - gproc_pool:pick_worker(router_pool, Topic). +match_to_route(M) -> + #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. %%-------------------------------------------------------------------- %% Routing table type %% -------------------------------------------------------------------- --define(PT_TABLE_TYPE, {?MODULE, tabtype}). +-define(PT_SCHEMA_VSN, {?MODULE, schemavsn}). --type tabtype() :: regular | unified. +-type schemavsn() :: v1 | v2. --spec get_table_type() -> tabtype(). -get_table_type() -> - persistent_term:get(?PT_TABLE_TYPE). 
+-spec get_schema_vsn() -> schemavsn(). +get_schema_vsn() -> + persistent_term:get(?PT_SCHEMA_VSN). --spec init_table_type() -> ok. -init_table_type() -> - ConfType = emqx_config:get([broker, routing_table_type]), - Type = choose_table_type(ConfType), - ok = persistent_term:put(?PT_TABLE_TYPE, Type), - case Type of - ConfType -> +-spec init_schema() -> ok. +init_schema() -> + ConfSchema = emqx_config:get([broker, routing, storage_schema]), + Schema = choose_schema_vsn(ConfSchema), + ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), + case Schema of + ConfSchema -> ?SLOG(info, #{ - msg => "routing_table_type_used", - type => Type + msg => "routing_schema_used", + schema => Schema }); _ -> ?SLOG(notice, #{ - msg => "configured_routing_table_type_unacceptable", - type => Type, - configured => ConfType, + msg => "configured_routing_schema_unacceptable", + schema => Schema, + configured => ConfSchema, reason => - "Could not use configured routing table type because " - "there's already non-empty routing table of another type." + "Could not use configured routing storage schema because " + "there are already non-empty routing tables pertaining to " + "another schema." }) end. --spec deinit_table_type() -> ok. -deinit_table_type() -> - _ = persistent_term:erase(?PT_TABLE_TYPE), +-spec deinit_schema() -> ok. +deinit_schema() -> + _ = persistent_term:erase(?PT_SCHEMA_VSN), ok. --spec choose_table_type(tabtype()) -> tabtype(). -choose_table_type(ConfType) -> - IsEmptyRegular = is_empty(?ROUTE_TAB), - IsEmptyUnified = is_empty(?ROUTE_TAB_UNIFIED), - case {IsEmptyRegular, IsEmptyUnified} of +-spec choose_schema_vsn(schemavsn()) -> schemavsn(). +choose_schema_vsn(ConfType) -> + IsEmptyIndex = emqx_trie:empty(), + IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS), + case {IsEmptyIndex, IsEmptyFilters} of {true, true} -> ConfType; {false, true} -> - regular; + v1; {true, false} -> - unified + v2 end. 
is_empty(Tab) -> diff --git a/apps/emqx/src/emqx_router_sup.erl b/apps/emqx/src/emqx_router_sup.erl index 398edc321..588b0de8e 100644 --- a/apps/emqx/src/emqx_router_sup.erl +++ b/apps/emqx/src/emqx_router_sup.erl @@ -24,7 +24,7 @@ start_link() -> %% Init and log routing table type - ok = emqx_router:init_table_type(), + ok = emqx_router:init_schema(), supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 818688d86..85bdd7b50 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1358,14 +1358,10 @@ fields("broker") -> ref("broker_perf"), #{importance => ?IMPORTANCE_HIDDEN} )}, - {"routing_table_type", + {"routing", sc( - hoconsc:enum([regular, unified]), - #{ - default => regular, - importance => ?IMPORTANCE_HIDDEN, - desc => ?DESC(broker_routing_table_type) - } + ref("broker_routing"), + #{importance => ?IMPORTANCE_HIDDEN} )}, %% FIXME: Need new design for shared subscription group {"shared_subscription_group", @@ -1378,6 +1374,17 @@ fields("broker") -> } )} ]; +fields("broker_routing") -> + [ + {"storage_schema", + sc( + hoconsc:enum([v1, v2]), + #{ + default => v1, + desc => ?DESC(broker_routing_storage_schema) + } + )} + ]; fields("shared_subscription_group") -> [ {"strategy", diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index f9e1c6998..adc898365 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -28,15 +28,15 @@ all() -> [ - {group, routing_table_regular}, - {group, routing_table_unified} + {group, routing_schema_v1}, + {group, routing_schema_v2} ]. groups() -> TCs = emqx_common_test_helpers:all(?MODULE), [ - {routing_table_regular, [], TCs}, - {routing_table_unified, [], TCs} + {routing_schema_v1, [], TCs}, + {routing_schema_v2, [], TCs} ]. 
init_per_group(GroupName, Config) -> @@ -53,10 +53,10 @@ init_per_group(GroupName, Config) -> end_per_group(_GroupName, Config) -> ok = emqx_cth_suite:stop(?config(group_apps, Config)). -mk_config(routing_table_regular) -> - "broker.routing_table_type = regular"; -mk_config(routing_table_unified) -> - "broker.routing_table_type = unified". +mk_config(routing_schema_v1) -> + "broker.routing.storage_schema = v1"; +mk_config(routing_schema_v2) -> + "broker.routing.storage_schema = v2". init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -85,10 +85,10 @@ end_per_testcase(_TestCase, _Config) -> t_verify_type(Config) -> case ?config(group_name, Config) of - routing_table_regular -> - ?assertEqual(regular, ?R:get_table_type()); - routing_table_unified -> - ?assertEqual(unified, ?R:get_table_type()) + routing_schema_v1 -> + ?assertEqual(v1, ?R:get_schema_vsn()); + routing_schema_v2 -> + ?assertEqual(v2, ?R:get_schema_vsn()) end. t_add_delete(_) -> @@ -198,5 +198,5 @@ t_unexpected(_) -> clear_tables() -> lists:foreach( fun mnesia:clear_table/1, - [?ROUTE_TAB, ?ROUTE_TAB_UNIFIED, ?TRIE] + [?ROUTE_TAB, ?ROUTE_TAB_FILTERS, ?TRIE] ). diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index 26b9d8ddd..f4d3cc43a 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -28,15 +28,15 @@ all() -> [ - {group, routing_table_regular}, - {group, routing_table_unified} + {group, routing_schema_v1}, + {group, routing_schema_v2} ]. groups() -> TCs = emqx_common_test_helpers:all(?MODULE), [ - {routing_table_regular, [], TCs}, - {routing_table_unified, [], TCs} + {routing_schema_v1, [], TCs}, + {routing_schema_v2, [], TCs} ]. init_per_group(GroupName, Config) -> @@ -48,14 +48,14 @@ init_per_group(GroupName, Config) -> end_per_group(_GroupName, Config) -> ok = emqx_cth_suite:stop(?config(group_apps, Config)). 
-mk_config(routing_table_regular) -> +mk_config(routing_schema_v1) -> #{ - config => "broker.routing_table_type = regular", + config => "broker.routing.storage_schema = v1", override_env => [{boot_modules, [router]}] }; -mk_config(routing_table_unified) -> +mk_config(routing_schema_v2) -> #{ - config => "broker.routing_table_type = unified", + config => "broker.routing.storage_schema = v2", override_env => [{boot_modules, [router]}] }. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index d90fab47b..4a8440e83 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1549,11 +1549,12 @@ fields_ws_opts_max_frame_size.label: sys_event_messages.desc: """Client events messages.""" -broker_routing_table_type.desc: -"""Routing table type. -Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. -NOTE: This is an experimental feature. -NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect.""" +broker_routing_storage_schema.desc: +"""Routing storage schema. +Set v1 to leave the default. +Set v2 to enable routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. +NOTE: Schema v2 is still experimental. +NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect.""" broker_perf_trie_compaction.desc: """Enable trie path compaction. 
From 6b9cb063348f4225627731c47ac82431c3aa5594 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 30 Aug 2023 19:51:00 +0400 Subject: [PATCH 12/26] fix(router): add / refine some comments and log messages Co-authored-by: Zaiming (Stone) Shi --- apps/emqx/src/emqx_router.erl | 12 +++++++----- apps/emqx/src/emqx_router_helper.erl | 1 - 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 9f6913dc4..8f2947ba2 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -164,7 +164,8 @@ mria_insert_route(v2, Topic, Dest) -> mria_insert_route(v1, Topic, Dest) -> mria_insert_route_v1(Topic, Dest). -%% @doc Match routes +%% @doc Take a real topic (not filter) as input, return the matching topics and topic +%% filters associated with route destination. -spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> match_routes(get_schema_vsn(), Topic). @@ -174,6 +175,8 @@ match_routes(v2, Topic) -> match_routes(v1, Topic) -> match_routes_v1(Topic). +%% @doc Take a topic or filter as input, and return the existing routes with exactly +%% this topic or filter. -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. lookup_routes(Topic) -> lookup_routes(get_schema_vsn(), Topic). @@ -399,8 +402,7 @@ has_route_v2(Topic, Dest) -> cleanup_routes_v2(Node) -> % NOTE - % No point in transaction here because all the operations on unified routing table - % are dirty. + % No point in transaction here because all the operations on filters table are dirty. 
ok = ets:foldl( fun(#routeidx{entry = K}, ok) -> case get_dest_node(emqx_topic_index:get_id(K)) of @@ -472,8 +474,8 @@ init_schema() -> }); _ -> ?SLOG(notice, #{ - msg => "configured_routing_schema_unacceptable", - schema => Schema, + msg => "configured_routing_schema_ignored", + schema_in_use => Schema, configured => ConfSchema, reason => "Could not use configured routing storage schema because " diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 77f1cd11b..b9cdbae4b 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -151,7 +151,6 @@ handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> % TODO % Node may flap, do we need to wait for any pending cleanups in `init/1` % on the flapping node? - % This also implies changing lock id to `{?LOCK, Node}`. global:trans( {?LOCK, self()}, fun() -> cleanup_routes(Node) end From b1defa29d7597c3e81701c85961b74d5bff904b3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 30 Aug 2023 20:07:56 +0400 Subject: [PATCH 13/26] feat(ruleeng): avoid storing whole rules in topic index Because it doesn't really give any benefit, but wastes memory by duplication. --- apps/emqx_rule_engine/src/emqx_rule_engine.erl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 0060ed819..01b9b76d0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -227,7 +227,7 @@ get_rules_for_topic(Topic) -> [ Rule || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]), - Rule <- emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX) + Rule <- lookup_rule(emqx_topic_index:get_id(M)) ]. -spec get_rules_with_same_event(Topic :: binary()) -> [rule()]. @@ -285,11 +285,14 @@ is_of_event_name(EventName, Topic) -> -spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found. 
get_rule(Id) -> - case ets:lookup(?RULE_TAB, Id) of - [{Id, Rule}] -> {ok, Rule#{id => Id}}; + case lookup_rule(Id) of + [Rule] -> {ok, Rule}; [] -> not_found end. +lookup_rule(Id) -> + [Rule || {_Id, Rule} <- ets:lookup(?RULE_TAB, Id)]. + load_hooks_for_rule(#{from := Topics}) -> lists:foreach(fun emqx_rule_events:load/1, Topics). @@ -484,7 +487,7 @@ with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat do_insert_rule(#{id := Id} = Rule) -> ok = load_hooks_for_rule(Rule), ok = maybe_add_metrics_for_rule(Id), - true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}), + true = ets:insert(?RULE_TAB, {Id, Rule}), ok. do_delete_rule(#{id := Id} = Rule) -> @@ -493,10 +496,10 @@ do_delete_rule(#{id := Id} = Rule) -> true = ets:delete(?RULE_TAB, Id), ok. -do_update_rule_index(#{id := Id, from := From} = Rule) -> +do_update_rule_index(#{id := Id, from := From}) -> ok = lists:foreach( fun(Topic) -> - true = emqx_topic_index:insert(Topic, Id, Rule, ?RULE_TOPIC_INDEX) + true = emqx_topic_index:insert(Topic, Id, [], ?RULE_TOPIC_INDEX) end, From ). From fb094e1d471b6ff51a2b005c1299c16a5cfebb8c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 30 Aug 2023 20:11:43 +0400 Subject: [PATCH 14/26] test(router): avoid testsuite collisions --- apps/emqx/test/emqx_router_SUITE.erl | 2 +- apps/emqx/test/emqx_router_helper_SUITE.erl | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index adc898365..c19363f93 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -40,7 +40,7 @@ groups() -> ]. 
init_per_group(GroupName, Config) -> - WorkDir = filename:join([?config(priv_dir, Config), GroupName]), + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), AppSpecs = [ {emqx, #{ config => mk_config(GroupName), diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index f4d3cc43a..d65a6f666 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -40,7 +40,7 @@ groups() -> ]. init_per_group(GroupName, Config) -> - WorkDir = filename:join([?config(priv_dir, Config), GroupName]), + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), AppSpecs = [{emqx, mk_config(GroupName)}], Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), [{group_name, GroupName}, {group_apps, Apps} | Config]. @@ -60,16 +60,15 @@ mk_config(routing_schema_v2) -> }. init_per_testcase(TestCase, Config) when - TestCase =:= t_cleanup_membership_mnesia_down; - TestCase =:= t_cleanup_membership_node_down; TestCase =:= t_cleanup_monitor_node_down -> ok = snabbkaffe:start_trace(), - WorkDir = filename:join([?config(priv_dir, Config), ?config(group_name, Config), TestCase]), + GroupName = ?config(group_name, Config), + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName, TestCase]), [Slave] = emqx_cth_cluster:start( [ {?MODULE, #{ - apps => [{emqx, mk_config(?config(group_name, Config))}], + apps => [{emqx, mk_config(GroupName)}], join_to => node() }} ], @@ -77,11 +76,10 @@ init_per_testcase(TestCase, Config) when ), [{slave, Slave} | Config]; init_per_testcase(_TestCase, Config) -> + ok = snabbkaffe:start_trace(), Config. 
end_per_testcase(TestCase, Config) when - TestCase =:= t_cleanup_membership_mnesia_down; - TestCase =:= t_cleanup_membership_node_down; TestCase =:= t_cleanup_monitor_node_down -> Slave = ?config(slave, Config), @@ -89,6 +87,7 @@ end_per_testcase(TestCase, Config) when ok = snabbkaffe:stop(), ok; end_per_testcase(_TestCase, _Config) -> + ok = snabbkaffe:stop(), ok. t_monitor(_) -> From c20ba0572a1efc838007b9db619fa2c3b932bb1e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 31 Aug 2023 15:47:39 +0200 Subject: [PATCH 15/26] chore: bump to 5.2.0-alpha.4 --- apps/emqx/include/emqx_release.hrl | 2 +- deploy/charts/emqx-enterprise/Chart.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index c7fa97be7..7b0fcad1a 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.1.5-build.3"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.2.0-alpha.3"). +-define(EMQX_RELEASE_EE, "5.2.0-alpha.4"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index 575c6b354..b72b67e81 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.2.0-alpha.3 +version: 5.2.0-alpha.4 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. 
-appVersion: 5.2.0-alpha.3 +appVersion: 5.2.0-alpha.4 From eb0385a28f8625f5c6aab3ee6ec49f70ae287b9d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Sep 2023 11:57:30 +0400 Subject: [PATCH 16/26] chore(router): separate test-only function exports --- apps/emqx/src/emqx_router.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 8f2947ba2..092af06da 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -49,8 +49,7 @@ -export([ match_routes/1, - lookup_routes/1, - has_route/2 + lookup_routes/1 ]). -export([print_routes/1]). @@ -62,6 +61,9 @@ -export([topics/0]). +%% Exported for tests +-export([has_route/2]). + %% gen_server callbacks -export([ init/1, From 494626629700ff42613c7a772f1a6b403c6a22c9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Sep 2023 11:58:07 +0400 Subject: [PATCH 17/26] chore(schema): mention when routing schema v2 is introduced --- rel/i18n/emqx_schema.hocon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 4a8440e83..9e33ffb57 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1552,7 +1552,7 @@ sys_event_messages.desc: broker_routing_storage_schema.desc: """Routing storage schema. Set v1 to leave the default. -Set v2 to enable routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. +v2 is introduced in 5.2. It enables routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. NOTE: Schema v2 is still experimental. 
NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect.""" From 0a879bffd193ad76602b32c4fea0e60b91636bd6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Sep 2023 13:47:56 +0400 Subject: [PATCH 18/26] test(router): add e2e testcase for cluster-wide routing Which verifies that both router storage schemas work correctly. --- apps/emqx/test/emqx_cth_cluster.erl | 2 + apps/emqx/test/emqx_routing_SUITE.erl | 148 ++++++++++++++++++++++++++ 2 files changed, 150 insertions(+) create mode 100644 apps/emqx/test/emqx_routing_SUITE.erl diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 5e8bd4103..19795d588 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -237,6 +237,8 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) -> listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec) } }; +default_appspec(emqx, Spec, _NodeSpecs) -> + #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}}; default_appspec(_App, _, _) -> #{}. diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl new file mode 100644 index 000000000..809158db0 --- /dev/null +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -0,0 +1,148 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_routing_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/asserts.hrl"). + +all() -> + [ + {group, routing_schema_v1}, + {group, routing_schema_v2} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {routing_schema_v1, [], TCs}, + {routing_schema_v2, [], TCs} + ]. + +init_per_group(GroupName, Config) -> + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), + NodeSpec = #{ + apps => [ + {emqx, #{ + config => mk_config(GroupName), + after_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([rpc, mode], async) + end + }} + ] + }, + NodeSpecs = [ + {emqx_routing_SUITE1, NodeSpec#{role => core}}, + {emqx_routing_SUITE2, NodeSpec#{role => core}}, + {emqx_routing_SUITE3, NodeSpec#{role => replicant}} + ], + Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), + [{cluster, Nodes}, Config]. + +end_per_group(_GroupName, Config) -> + emqx_cth_cluster:stop(?config(cluster, Config)). + +mk_config(routing_schema_v1) -> + "broker.routing.storage_schema = v1"; +mk_config(routing_schema_v2) -> + "broker.routing.storage_schema = v2". 
+ +t_cluster_routing(Config) -> + Cluster = ?config(cluster, Config), + Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster], + Commands = [ + {fun publish/3, [C1, <<"a/b/c">>, <<"wontsee">>]}, + {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]}, + {fun subscribe/2, [C3, <<"a/+/c/#">>]}, + {fun publish/3, [C1, <<"a/b/c">>, <<"01">>]}, + {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]}, + {fun subscribe/2, [C1, <<"a/b/c">>]}, + {fun subscribe/2, [C2, <<"a/b/+">>]}, + {fun publish/3, [C3, <<"a/b/c">>, <<"02">>]}, + {fun publish/3, [C2, <<"a/b/d">>, <<"03">>]}, + {fun publish/3, [C2, <<"a/b/c/d">>, <<"04">>]}, + {fun subscribe/2, [C3, <<"a/b/d">>]}, + {fun publish/3, [C1, <<"a/b/d">>, <<"05">>]}, + {fun unsubscribe/2, [C3, <<"a/+/c/#">>]}, + {fun publish/3, [C1, <<"a/b/c">>, <<"06">>]}, + {fun publish/3, [C2, <<"a/b/d">>, <<"07">>]}, + {fun publish/3, [C2, <<"a/b/c/d">>, <<"08">>]}, + {fun unsubscribe/2, [C2, <<"a/b/+">>]}, + {fun publish/3, [C1, <<"a/b/c">>, <<"09">>]}, + {fun publish/3, [C2, <<"a/b/d">>, <<"10">>]}, + {fun publish/3, [C2, <<"a/b/c/d">>, <<"11">>]}, + {fun unsubscribe/2, [C3, <<"a/b/d">>]}, + {fun unsubscribe/2, [C1, <<"a/b/c">>]}, + {fun publish/3, [C1, <<"a/b/c">>, <<"wontsee">>]}, + {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]} + ], + ok = lists:foreach(fun({F, Args}) -> erlang:apply(F, Args) end, Commands), + _ = [emqtt:stop(C) || C <- Clients], + Deliveries = ?drainMailbox(), + ?assertMatch( + [ + {pub, C1, #{topic := <<"a/b/c">>, payload := <<"02">>}}, + {pub, C1, #{topic := <<"a/b/c">>, payload := <<"06">>}}, + {pub, C1, #{topic := <<"a/b/c">>, payload := <<"09">>}}, + {pub, C2, #{topic := <<"a/b/c">>, payload := <<"02">>}}, + {pub, C2, #{topic := <<"a/b/d">>, payload := <<"03">>}}, + {pub, C2, #{topic := <<"a/b/d">>, payload := <<"05">>}}, + {pub, C2, #{topic := <<"a/b/c">>, payload := <<"06">>}}, + {pub, C2, #{topic := <<"a/b/d">>, payload := <<"07">>}}, + {pub, C3, #{topic := <<"a/b/c">>, payload := <<"01">>}}, + {pub, C3, 
#{topic := <<"a/b/c">>, payload := <<"02">>}}, + {pub, C3, #{topic := <<"a/b/c/d">>, payload := <<"04">>}}, + {pub, C3, #{topic := <<"a/b/d">>, payload := <<"05">>}}, + {pub, C3, #{topic := <<"a/b/d">>, payload := <<"07">>}}, + {pub, C3, #{topic := <<"a/b/d">>, payload := <<"10">>}} + ], + lists:sort(Deliveries) + ). + +start_client(Node) -> + Self = self(), + {ok, C} = emqtt:start_link(#{ + port => get_mqtt_tcp_port(Node), + msg_handler => #{ + publish => fun(Msg) -> Self ! {pub, self(), Msg} end + } + }), + {ok, _Props} = emqtt:connect(C), + C. + +publish(C, Topic, Payload) -> + {ok, #{reason_code := 0}} = emqtt:publish(C, Topic, Payload, 1). + +subscribe(C, Topic) -> + % NOTE: sleeping here as lazy way to wait for subscribe to replicate + {ok, _Props, [0]} = emqtt:subscribe(C, Topic), + ok = timer:sleep(200). + +unsubscribe(C, Topic) -> + % NOTE: sleeping here as lazy way to wait for unsubscribe to replicate + {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic), + ok = timer:sleep(200). + +get_mqtt_tcp_port(Node) -> + {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), + Port. From 545f1c84a69843a766c10298160e101e88f614d8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Sep 2023 17:23:34 +0400 Subject: [PATCH 19/26] fix(cth): do not allocate ports for `emqx` app by default This causes tricky and impressively hard to track side effects down the line. Namely, loading `emqx_schema` _after_ `emqx_conf_schema` (as part of cluster node startup sequence) leads to a couple of schema root rewrites, because `emqx_schema` defines similar config roots yet slightly differently (e.g. `authorization`). 
--- apps/emqx/test/emqx_cth_cluster.erl | 2 - apps/emqx/test/emqx_routing_SUITE.erl | 55 +++++++++++++++++---------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 19795d588..5e8bd4103 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -237,8 +237,6 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) -> listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec) } }; -default_appspec(emqx, Spec, _NodeSpecs) -> - #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}}; default_appspec(_App, _, _) -> #{}. diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 809158db0..945e84248 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -38,23 +38,10 @@ groups() -> init_per_group(GroupName, Config) -> WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), - NodeSpec = #{ - apps => [ - {emqx, #{ - config => mk_config(GroupName), - after_start => fun() -> - % NOTE - % This one is actually defined on `emqx_conf_schema` level, but used - % in `emqx_broker`. Thus we have to resort to this ugly hack. - emqx_config:force_put([rpc, mode], async) - end - }} - ] - }, NodeSpecs = [ - {emqx_routing_SUITE1, NodeSpec#{role => core}}, - {emqx_routing_SUITE2, NodeSpec#{role => core}}, - {emqx_routing_SUITE3, NodeSpec#{role => replicant}} + {emqx_routing_SUITE1, #{apps => mk_appspecs(GroupName, 1), role => core}}, + {emqx_routing_SUITE2, #{apps => mk_appspecs(GroupName, 2), role => core}}, + {emqx_routing_SUITE3, #{apps => mk_appspecs(GroupName, 3), role => replicant}} ], Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), [{cluster, Nodes}, Config]. @@ -62,10 +49,38 @@ init_per_group(GroupName, Config) -> end_per_group(_GroupName, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)). 
-mk_config(routing_schema_v1) -> - "broker.routing.storage_schema = v1"; -mk_config(routing_schema_v2) -> - "broker.routing.storage_schema = v2". +mk_appspecs(GroupName, N) -> + [ + {emqx, #{ + config => mk_config(GroupName, N), + after_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([rpc, mode], async) + end + }} + ]. + +mk_config(GroupName, N) -> + #{ + broker => mk_config_broker(GroupName), + listeners => mk_config_listeners(N) + }. + +mk_config_broker(routing_schema_v1) -> + #{routing => #{storage_schema => v1}}; +mk_config_broker(routing_schema_v2) -> + #{routing => #{storage_schema => v2}}. + +mk_config_listeners(N) -> + Port = 1883 + N, + #{ + tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}}, + ssl => #{default => #{enable => false}}, + ws => #{default => #{enable => false}}, + wss => #{default => #{enable => false}} + }. t_cluster_routing(Config) -> Cluster = ?config(cluster, Config), From ebf13c90eac4d024c4918f6ac225f4e115370983 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Sep 2023 19:48:56 +0400 Subject: [PATCH 20/26] test(router): make sort stable in e2e routing testcase --- apps/emqx/test/emqx_routing_SUITE.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 945e84248..dbc4dc193 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -131,7 +131,12 @@ t_cluster_routing(Config) -> {pub, C3, #{topic := <<"a/b/d">>, payload := <<"07">>}}, {pub, C3, #{topic := <<"a/b/d">>, payload := <<"10">>}} ], - lists:sort(Deliveries) + lists:sort( + fun({pub, CL, #{payload := PL}}, {pub, CR, #{payload := PR}}) -> + {CL, PL} < {CR, PR} + end, + Deliveries + ) ). 
start_client(Node) -> From 4ab5f8374b88056cb21ad23df5d870e4faaae65b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Sep 2023 19:58:31 +0400 Subject: [PATCH 21/26] chore(schema): mark routing storage schema readOnly Co-authored-by: Thales Macedo Garitezi --- apps/emqx/src/emqx_schema.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index e5f46ffc4..04bd397ec 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1381,6 +1381,7 @@ fields("broker_routing") -> hoconsc:enum([v1, v2]), #{ default => v1, + 'readOnly' => true, desc => ?DESC(broker_routing_storage_schema) } )} From 5024304bf9dc83e02567c2cb4114e5486011bfc6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Sep 2023 14:37:46 +0400 Subject: [PATCH 22/26] fix(router): wait for tables replicate before choosing schema vsn --- apps/emqx/src/emqx_router.erl | 2 + apps/emqx/src/emqx_trie.erl | 5 + apps/emqx/test/emqx_routing_SUITE.erl | 130 ++++++++++++++++++++++---- 3 files changed, 117 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 092af06da..2cd5ffb96 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -465,6 +465,8 @@ get_schema_vsn() -> -spec init_schema() -> ok. init_schema() -> + ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]), + ok = emqx_trie:wait_for_tables(), ConfSchema = emqx_config:get([broker, routing, storage_schema]), Schema = choose_schema_vsn(ConfSchema), ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 229a0e3f4..76be97d3e 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -21,6 +21,7 @@ %% Mnesia bootstrap -export([ mnesia/1, + wait_for_tables/0, create_session_trie/1 ]). @@ -105,6 +106,10 @@ create_session_trie(Type) -> ] ). +-spec wait_for_tables() -> ok | {error, _Reason}. 
+wait_for_tables() -> + mria:wait_for_tables([?TRIE]). + %%-------------------------------------------------------------------- %% Topics APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index dbc4dc193..6966ac56a 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -26,11 +26,15 @@ all() -> [ {group, routing_schema_v1}, - {group, routing_schema_v2} + {group, routing_schema_v2}, + t_routing_schema_switch_v1, + t_routing_schema_switch_v2 ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), + TCs = [ + t_cluster_routing + ], [ {routing_schema_v1, [], TCs}, {routing_schema_v2, [], TCs} @@ -39,28 +43,38 @@ groups() -> init_per_group(GroupName, Config) -> WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]), NodeSpecs = [ - {emqx_routing_SUITE1, #{apps => mk_appspecs(GroupName, 1), role => core}}, - {emqx_routing_SUITE2, #{apps => mk_appspecs(GroupName, 2), role => core}}, - {emqx_routing_SUITE3, #{apps => mk_appspecs(GroupName, 3), role => replicant}} + {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}}, + {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}}, + {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}} ], Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), - [{cluster, Nodes}, Config]. + [{cluster, Nodes} | Config]. end_per_group(_GroupName, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)). -mk_appspecs(GroupName, N) -> - [ - {emqx, #{ - config => mk_config(GroupName, N), - after_start => fun() -> - % NOTE - % This one is actually defined on `emqx_conf_schema` level, but used - % in `emqx_broker`. Thus we have to resort to this ugly hack. - emqx_config:force_put([rpc, mode], async) - end - }} - ]. 
+init_per_testcase(TC, Config) -> + WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]), + [{work_dir, WorkDir} | Config]. + +end_per_testcase(_TC, _Config) -> + ok. + +mk_emqx_appspec(GroupName, N) -> + {emqx, #{ + config => mk_config(GroupName, N), + after_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([rpc, mode], async) + end + }}. + +mk_genrpc_appspec() -> + {gen_rpc, #{ + override_env => [{port_discovery, stateless}] + }}. mk_config(GroupName, N) -> #{ @@ -68,9 +82,9 @@ mk_config(GroupName, N) -> listeners => mk_config_listeners(N) }. -mk_config_broker(routing_schema_v1) -> +mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 -> #{routing => #{storage_schema => v1}}; -mk_config_broker(routing_schema_v2) -> +mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 -> #{routing => #{storage_schema => v2}}. mk_config_listeners(N) -> @@ -82,6 +96,8 @@ mk_config_listeners(N) -> wss => #{default => #{enable => false}} }. +%% + t_cluster_routing(Config) -> Cluster = ?config(cluster, Config), Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster], @@ -163,6 +179,80 @@ unsubscribe(C, Topic) -> {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic), ok = timer:sleep(200). +%% + +t_routing_schema_switch_v1(Config) -> + t_routing_schema_switch(_From = v2, _To = v1, Config). + +t_routing_schema_switch_v2(Config) -> + t_routing_schema_switch(_From = v1, _To = v2, Config). + +t_routing_schema_switch(VFrom, VTo, Config) -> + % Start first node with routing schema VTo (e.g. 
v1) + WorkDir = ?config(work_dir, Config), + [Node1] = emqx_cth_cluster:start( + [ + {routing_schema_switch1, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)] + }} + ], + #{work_dir => WorkDir} + ), + % Ensure there's at least 1 route on Node1 + C1 = start_client(Node1), + ok = subscribe(C1, <<"a/+/c">>), + ok = subscribe(C1, <<"d/e/f/#">>), + % Start rest of nodes with routing schema VFrom (e.g. v2) + [Node2, Node3] = emqx_cth_cluster:start( + [ + {routing_schema_switch2, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)], + base_port => 20000, + join_to => Node1 + }}, + {routing_schema_switch3, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)], + base_port => 20100, + join_to => Node1 + }} + ], + #{work_dir => WorkDir} + ), + % Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively + Nodes = [Node1, Node2, Node3], + ?assertEqual( + [{ok, VTo}, {ok, VTo}, {ok, VTo}], + erpc:multicall(Nodes, emqx_router, get_schema_vsn, []) + ), + % Wait for all nodes to agree on cluster state + ?retry( + 500, + 10, + ?assertMatch( + [{ok, [Node1, Node2, Node3]}], + lists:usort(erpc:multicall(Nodes, emqx, running_nodes, [])) + ) + ), + % Verify that routing works as expected + C2 = start_client(Node2), + ok = subscribe(C2, <<"a/+/d">>), + C3 = start_client(Node3), + ok = subscribe(C3, <<"d/e/f/#">>), + {ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>), + {ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>), + {ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>), + ?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}), + ?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}), + ?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), + ?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}), + ?assertNotReceive(_), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3), + ok = emqx_cth_cluster:stop(Nodes). 
+ +%% + get_mqtt_tcp_port(Node) -> {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), Port. From f0a0c7d4b1b654d5531b5b91236be29a56394b10 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Sep 2023 14:38:52 +0400 Subject: [PATCH 23/26] fix(cthsuite): avoid duplicate application env entries Because `application:set_env/1` complains loudly if there are duplicates. --- apps/emqx/test/emqx_cth_suite.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 307e8d98b..e5668b77a 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -261,7 +261,7 @@ merge_envs(false, E2) -> merge_envs(_E, false) -> []; merge_envs(E1, E2) -> - E1 ++ E2. + lists:foldl(fun({K, _} = Opt, EAcc) -> lists:keystore(K, 1, EAcc, Opt) end, E1, E2). merge_config(false, C2) -> C2; From 893f69617a25418f91a0f7e693edec461bdc2cd4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Sep 2023 14:40:20 +0400 Subject: [PATCH 24/26] chore(test): drop obsolete TODOs --- apps/emqx/test/emqx_router_SUITE.erl | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index c19363f93..1128112ff 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -65,24 +65,9 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -% t_add_route(_) -> -% error('TODO'). - -% t_do_add_route(_) -> -% error('TODO'). - % t_lookup_routes(_) -> % error('TODO'). -% t_delete_route(_) -> -% error('TODO'). - -% t_do_delete_route(_) -> -% error('TODO'). - -% t_topics(_) -> -% error('TODO'). 
- t_verify_type(Config) -> case ?config(group_name, Config) of routing_schema_v1 -> From 7d26b7bc1a89e6bbd3cca526606e90ba67616763 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Sep 2023 16:37:38 +0400 Subject: [PATCH 25/26] fix(router): emit clearer error when conflicting schemas in use --- apps/emqx/src/emqx_router.erl | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 2cd5ffb96..464852ceb 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -503,7 +503,19 @@ choose_schema_vsn(ConfType) -> {false, true} -> v1; {true, false} -> - v2 + v2; + {false, false} -> + ?SLOG(critical, #{ + msg => "conflicting_routing_schemas_detected_in_cluster", + configured => ConfType, + reason => + "There are records in the routing tables related to both v1 " + "and v2 storage schemas. This probably means that some nodes " + "in the cluster use v1 schema and some use v2, independently " + "of each other. The routing is likely broken. Manual intervention " + "and full cluster restart is required. This node will shut down." + }), + error(conflicting_routing_schemas_detected_in_cluster) end. is_empty(Tab) -> From 9f0f2183871366ca63ed98850428575a2987a424 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 4 Sep 2023 18:05:59 +0400 Subject: [PATCH 26/26] test(router): simplify cluster-related testcase in helper testsuite --- apps/emqx/test/emqx_cth_cluster.erl | 21 ++++++++---- apps/emqx/test/emqx_router_helper_SUITE.erl | 37 ++++----------------- 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 5e8bd4103..6d1fddb42 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -19,8 +19,12 @@ -export([start/2]). -export([stop/1]). +-export([start_bare_node/2]). + -export([share_load_module/2]). +-export([node_name/1]). 
+ -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). -define(TIMEOUT_NODE_START_MS, 15000). @@ -256,9 +260,6 @@ allocate_listener_ports(Types, Spec) -> start_node_init(Spec = #{name := Node}) -> Node = start_bare_node(Node, Spec), - pong = net_adm:ping(Node), - % Preserve node spec right on the remote node - ok = set_node_opts(Node, Spec), % Make it possible to call `ct:pal` and friends (if running under rebar3) _ = share_load_module(Node, cthr), % Enable snabbkaffe trace forwarding @@ -363,7 +364,8 @@ listener_port(BasePort, wss) -> %% -start_bare_node(Name, #{driver := ct_slave}) -> +-spec start_bare_node(atom(), map()) -> node(). +start_bare_node(Name, Spec = #{driver := ct_slave}) -> {ok, Node} = ct_slave:start( node_name(Name), [ @@ -375,9 +377,15 @@ start_bare_node(Name, #{driver := ct_slave}) -> {env, []} ] ), - Node; -start_bare_node(Name, #{driver := slave}) -> + init_bare_node(Node, Spec); +start_bare_node(Name, Spec = #{driver := slave}) -> {ok, Node} = slave:start_link(host(), Name, ebin_path()), + init_bare_node(Node, Spec). + +init_bare_node(Node, Spec) -> + pong = net_adm:ping(Node), + % Preserve node spec right on the remote node + ok = set_node_opts(Node, Spec), Node. erl_flags() -> @@ -400,6 +408,7 @@ share_load_module(Node, Module) -> error end. +-spec node_name(atom()) -> node(). node_name(Name) -> case string:tokens(atom_to_list(Name), "@") of [_Name, _Host] -> diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index d65a6f666..889c8293c 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -59,33 +59,10 @@ mk_config(routing_schema_v2) -> override_env => [{boot_modules, [router]}] }. 
-init_per_testcase(TestCase, Config) when - TestCase =:= t_cleanup_monitor_node_down --> - ok = snabbkaffe:start_trace(), - GroupName = ?config(group_name, Config), - WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName, TestCase]), - [Slave] = emqx_cth_cluster:start( - [ - {?MODULE, #{ - apps => [{emqx, mk_config(GroupName)}], - join_to => node() - }} - ], - #{work_dir => WorkDir} - ), - [{slave, Slave} | Config]; init_per_testcase(_TestCase, Config) -> ok = snabbkaffe:start_trace(), Config. -end_per_testcase(TestCase, Config) when - TestCase =:= t_cleanup_monitor_node_down --> - Slave = ?config(slave, Config), - ok = emqx_cth_cluster:stop([Slave]), - ok = snabbkaffe:stop(), - ok; end_per_testcase(_TestCase, _Config) -> ok = snabbkaffe:stop(), ok. @@ -102,8 +79,8 @@ t_mnesia(_) -> ?ROUTER_HELPER ! {membership, {mnesia, down, node()}}, ct:sleep(200). -t_cleanup_membership_mnesia_down(Config) -> - Slave = ?config(slave, Config), +t_cleanup_membership_mnesia_down(_Config) -> + Slave = emqx_cth_cluster:node_name(?FUNCTION_NAME), emqx_router:add_route(<<"a/b/c">>, Slave), emqx_router:add_route(<<"d/e/f">>, node()), ?assertMatch([_, _], emqx_router:topics()), @@ -114,8 +91,8 @@ t_cleanup_membership_mnesia_down(Config) -> ), ?assertEqual([<<"d/e/f">>], emqx_router:topics()). -t_cleanup_membership_node_down(Config) -> - Slave = ?config(slave, Config), +t_cleanup_membership_node_down(_Config) -> + Slave = emqx_cth_cluster:node_name(?FUNCTION_NAME), emqx_router:add_route(<<"a/b/c">>, Slave), emqx_router:add_route(<<"d/e/f">>, node()), ?assertMatch([_, _], emqx_router:topics()), @@ -126,13 +103,13 @@ t_cleanup_membership_node_down(Config) -> ), ?assertEqual([<<"d/e/f">>], emqx_router:topics()). 
-t_cleanup_monitor_node_down(Config) -> - Slave = ?config(slave, Config), +t_cleanup_monitor_node_down(_Config) -> + Slave = emqx_cth_cluster:start_bare_node(?FUNCTION_NAME, #{driver => ct_slave}), emqx_router:add_route(<<"a/b/c">>, Slave), emqx_router:add_route(<<"d/e/f">>, node()), ?assertMatch([_, _], emqx_router:topics()), ?wait_async_action( - emqx_common_test_helpers:stop_slave(Slave), + emqx_cth_cluster:stop([Slave]), #{?snk_kind := emqx_router_helper_cleanup_done, node := Slave}, 1_000 ),