fix(router): generalize config option and make effects visible

This commit is contained in:
Andrew Mayorov 2023-08-25 20:01:48 +04:00
parent e85789306b
commit 8d2ebdea7e
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
7 changed files with 111 additions and 54 deletions

View File

@ -55,7 +55,9 @@ prep_stop(_State) ->
emqx_boot:is_enabled(listeners) andalso emqx_boot:is_enabled(listeners) andalso
emqx_listeners:stop(). emqx_listeners:stop().
stop(_State) -> ok. stop(_State) ->
ok = emqx_router:deinit_table_type(),
ok.
-define(CONFIG_LOADER, config_loader). -define(CONFIG_LOADER, config_loader).
-define(DEFAULT_LOADER, emqx). -define(DEFAULT_LOADER, emqx).

View File

@ -72,8 +72,11 @@
code_change/3 code_change/3
]). ]).
%% test / debugging purposes -export([
-export([is_unified_table_active/0]). get_table_type/0,
init_table_type/0,
deinit_table_type/0
]).
-type group() :: binary(). -type group() :: binary().
@ -155,12 +158,12 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
ok; ok;
false -> false ->
ok = emqx_router_helper:monitor(Dest), ok = emqx_router_helper:monitor(Dest),
mria_insert_route(is_unified_table_active(), Topic, Dest) mria_insert_route(get_table_type(), Topic, Dest)
end. end.
mria_insert_route(_Unified = true, Topic, Dest) -> mria_insert_route(unified, Topic, Dest) ->
mria_insert_route_unified(Topic, Dest); mria_insert_route_unified(Topic, Dest);
mria_insert_route(_Unified = false, Topic, Dest) -> mria_insert_route(regular, Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest}, Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
true -> true ->
@ -186,11 +189,11 @@ mria_insert_route(Route) ->
%% @doc Match routes %% @doc Match routes
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. -spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
match_routes(Topic) when is_binary(Topic) -> match_routes(Topic) when is_binary(Topic) ->
match_routes(is_unified_table_active(), Topic). match_routes(get_table_type(), Topic).
match_routes(_Unified = true, Topic) -> match_routes(unified, Topic) ->
[match_to_route(M) || M <- match_unified(Topic)]; [match_to_route(M) || M <- match_unified(Topic)];
match_routes(_Unified = false, Topic) -> match_routes(regular, Topic) ->
lookup_routes_regular(Topic) ++ lookup_routes_regular(Topic) ++
lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)). lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)).
@ -205,10 +208,10 @@ match_global_trie(Topic) ->
-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
lookup_routes(Topic) -> lookup_routes(Topic) ->
case is_unified_table_active() of case get_table_type() of
true -> unified ->
lookup_routes_unified(Topic); lookup_routes_unified(Topic);
false -> regular ->
lookup_routes_regular(Topic) lookup_routes_regular(Topic)
end. end.
@ -224,10 +227,10 @@ match_to_route(M) ->
-spec has_route(emqx_types:topic(), dest()) -> boolean(). -spec has_route(emqx_types:topic(), dest()) -> boolean().
has_route(Topic, Dest) -> has_route(Topic, Dest) ->
case is_unified_table_active() of case get_table_type() of
true -> unified ->
has_route_unified(Topic, Dest); has_route_unified(Topic, Dest);
false -> regular ->
has_route_regular(Topic, Dest) has_route_regular(Topic, Dest)
end. end.
@ -251,11 +254,11 @@ do_delete_route(Topic) when is_binary(Topic) ->
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_delete_route(Topic, Dest) -> do_delete_route(Topic, Dest) ->
mria_delete_route(is_unified_table_active(), Topic, Dest). mria_delete_route(get_table_type(), Topic, Dest).
mria_delete_route(_Unified = true, Topic, Dest) -> mria_delete_route(unified, Topic, Dest) ->
mria_delete_route_unified(Topic, Dest); mria_delete_route_unified(Topic, Dest);
mria_delete_route(_Unified = false, Topic, Dest) -> mria_delete_route(regular, Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest}, Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of case emqx_topic:wildcard(Topic) of
true -> true ->
@ -278,27 +281,14 @@ mria_delete_route_update_trie(Route) ->
mria_delete_route(Route) -> mria_delete_route(Route) ->
mria:dirty_delete_object(?ROUTE_TAB, Route). mria:dirty_delete_object(?ROUTE_TAB, Route).
-spec is_unified_table_active() -> boolean().
is_unified_table_active() ->
is_empty(?ROUTE_TAB) andalso
((not is_empty(?ROUTE_TAB_UNIFIED)) orelse
emqx_config:get([broker, unified_routing_table])).
is_empty(Tab) ->
% NOTE
% Supposedly, should be better than `ets:info(Tab, size)` because the latter suffers
% from `{decentralized_counters, true}` which is default when `write_concurrency` is
% either `auto` or `true`.
ets:first(Tab) =:= '$end_of_table'.
-spec topics() -> list(emqx_types:topic()). -spec topics() -> list(emqx_types:topic()).
topics() -> topics() ->
topics(is_unified_table_active()). topics(get_table_type()).
topics(_Unified = true) -> topics(unified) ->
Pat = #routeidx{entry = '$1'}, Pat = #routeidx{entry = '$1'},
[emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]; [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)];
topics(_Unified = false) -> topics(regular) ->
mnesia:dirty_all_keys(?ROUTE_TAB). mnesia:dirty_all_keys(?ROUTE_TAB).
%% @doc Print routes to a topic %% @doc Print routes to a topic
@ -313,10 +303,10 @@ print_routes(Topic) ->
-spec cleanup_routes(node()) -> ok. -spec cleanup_routes(node()) -> ok.
cleanup_routes(Node) -> cleanup_routes(Node) ->
case is_unified_table_active() of case get_table_type() of
true -> unified ->
cleanup_routes_unified(Node); cleanup_routes_unified(Node);
false -> regular ->
cleanup_routes_regular(Node) cleanup_routes_regular(Node)
end. end.
@ -352,19 +342,19 @@ cleanup_routes_regular(Node) ->
-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldl_routes(FoldFun, AccIn) -> foldl_routes(FoldFun, AccIn) ->
case is_unified_table_active() of case get_table_type() of
true -> unified ->
ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
false -> regular ->
ets:foldl(FoldFun, AccIn, ?ROUTE_TAB) ets:foldl(FoldFun, AccIn, ?ROUTE_TAB)
end. end.
-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. -spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldr_routes(FoldFun, AccIn) -> foldr_routes(FoldFun, AccIn) ->
case is_unified_table_active() of case get_table_type() of
true -> unified ->
ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
false -> regular ->
ets:foldr(FoldFun, AccIn, ?ROUTE_TAB) ets:foldr(FoldFun, AccIn, ?ROUTE_TAB)
end. end.
@ -377,6 +367,61 @@ call(Router, Msg) ->
pick(Topic) -> pick(Topic) ->
gproc_pool:pick_worker(router_pool, Topic). gproc_pool:pick_worker(router_pool, Topic).
%%--------------------------------------------------------------------
%% Routing table type
%% --------------------------------------------------------------------
-define(PT_TABLE_TYPE, {?MODULE, tabtype}).
-type tabtype() :: regular | unified.
-spec get_table_type() -> tabtype().
get_table_type() ->
persistent_term:get(?PT_TABLE_TYPE).
-spec init_table_type() -> ok.
init_table_type() ->
ConfType = emqx_config:get([broker, routing_table_type]),
Type = choose_table_type(ConfType),
ok = persistent_term:put(?PT_TABLE_TYPE, Type),
case Type of
ConfType ->
?SLOG(info, #{
msg => "routing_table_type_used",
type => Type
});
_ ->
?SLOG(notice, #{
msg => "configured_routing_table_type_unacceptable",
type => Type,
configured => ConfType,
reason =>
"Could not use configured routing table type because "
"there's already non-empty routing table of another type."
})
end.
-spec deinit_table_type() -> ok.
deinit_table_type() ->
_ = persistent_term:erase(?PT_TABLE_TYPE),
ok.
-spec choose_table_type(tabtype()) -> tabtype().
choose_table_type(ConfType) ->
IsEmptyRegular = is_empty(?ROUTE_TAB),
IsEmptyUnified = is_empty(?ROUTE_TAB_UNIFIED),
case {IsEmptyRegular, IsEmptyUnified} of
{true, true} ->
ConfType;
{false, true} ->
regular;
{true, false} ->
unified
end.
is_empty(Tab) ->
ets:first(Tab) =:= '$end_of_table'.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -23,6 +23,8 @@
-export([init/1]). -export([init/1]).
start_link() -> start_link() ->
%% Init and log routing table type
ok = emqx_router:init_table_type(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->

View File

@ -1358,13 +1358,13 @@ fields("broker") ->
ref("broker_perf"), ref("broker_perf"),
#{importance => ?IMPORTANCE_HIDDEN} #{importance => ?IMPORTANCE_HIDDEN}
)}, )},
{"unified_routing_table", {"routing_table_type",
sc( sc(
boolean(), hoconsc:enum([regular, unified]),
#{ #{
default => false, default => regular,
importance => ?IMPORTANCE_HIDDEN, importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(broker_unified_routing_table) desc => ?DESC(broker_routing_table_type)
} }
)}, )},
%% FIXME: Need new design for shared subscription group %% FIXME: Need new design for shared subscription group

View File

@ -48,15 +48,15 @@ init_per_group(GroupName, Config) ->
}} }}
], ],
Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
[{group_apps, Apps} | Config]. [{group_apps, Apps}, {group_name, GroupName} | Config].
end_per_group(_GroupName, Config) -> end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)). ok = emqx_cth_suite:stop(?config(group_apps, Config)).
mk_config(routing_table_regular) -> mk_config(routing_table_regular) ->
"broker.unified_routing_table = false"; "broker.routing_table_type = regular";
mk_config(routing_table_unified) -> mk_config(routing_table_unified) ->
"broker.unified_routing_table = true". "broker.routing_table_type = unified".
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
clear_tables(), clear_tables(),
@ -83,6 +83,14 @@ end_per_testcase(_TestCase, _Config) ->
% t_topics(_) -> % t_topics(_) ->
% error('TODO'). % error('TODO').
t_verify_type(Config) ->
case ?config(group_name, Config) of
routing_table_regular ->
?assertEqual(regular, ?R:get_table_type());
routing_table_unified ->
?assertEqual(unified, ?R:get_table_type())
end.
t_add_delete(_) -> t_add_delete(_) ->
?R:add_route(<<"a/b/c">>), ?R:add_route(<<"a/b/c">>),
?R:add_route(<<"a/b/c">>, node()), ?R:add_route(<<"a/b/c">>, node()),

View File

@ -50,12 +50,12 @@ end_per_group(_GroupName, Config) ->
mk_config(routing_table_regular) -> mk_config(routing_table_regular) ->
#{ #{
config => "broker.unified_routing_table = false", config => "broker.routing_table_type = regular",
override_env => [{boot_modules, [router]}] override_env => [{boot_modules, [router]}]
}; };
mk_config(routing_table_unified) -> mk_config(routing_table_unified) ->
#{ #{
config => "broker.unified_routing_table = true", config => "broker.routing_table_type = unified",
override_env => [{boot_modules, [router]}] override_env => [{boot_modules, [router]}]
}. }.

View File

@ -1549,8 +1549,8 @@ fields_ws_opts_max_frame_size.label:
sys_event_messages.desc: sys_event_messages.desc:
"""Client events messages.""" """Client events messages."""
broker_unified_routing_table.desc: broker_routing_table_type.desc:
"""Enable unified routing table. """Routing table type.
Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription.
NOTE: This is an experimental feature. NOTE: This is an experimental feature.
NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect.""" NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect."""