From 8d2ebdea7e1810109c76a2f7d89ddd3297c3298a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 25 Aug 2023 20:01:48 +0400 Subject: [PATCH] fix(router): generalize config option and make effects visible --- apps/emqx/src/emqx_app.erl | 4 +- apps/emqx/src/emqx_router.erl | 129 +++++++++++++------- apps/emqx/src/emqx_router_sup.erl | 2 + apps/emqx/src/emqx_schema.erl | 8 +- apps/emqx/test/emqx_router_SUITE.erl | 14 ++- apps/emqx/test/emqx_router_helper_SUITE.erl | 4 +- rel/i18n/emqx_schema.hocon | 4 +- 7 files changed, 111 insertions(+), 54 deletions(-) diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 59a397836..4fae556ca 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -55,7 +55,9 @@ prep_stop(_State) -> emqx_boot:is_enabled(listeners) andalso emqx_listeners:stop(). -stop(_State) -> ok. +stop(_State) -> + ok = emqx_router:deinit_table_type(), + ok. -define(CONFIG_LOADER, config_loader). -define(DEFAULT_LOADER, emqx). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 6859b98f3..7a8d88a55 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -72,8 +72,11 @@ code_change/3 ]). -%% test / debugging purposes --export([is_unified_table_active/0]). +-export([ + get_table_type/0, + init_table_type/0, + deinit_table_type/0 +]). -type group() :: binary(). @@ -155,12 +158,12 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok; false -> ok = emqx_router_helper:monitor(Dest), - mria_insert_route(is_unified_table_active(), Topic, Dest) + mria_insert_route(get_table_type(), Topic, Dest) end. -mria_insert_route(_Unified = true, Topic, Dest) -> +mria_insert_route(unified, Topic, Dest) -> mria_insert_route_unified(Topic, Dest); -mria_insert_route(_Unified = false, Topic, Dest) -> +mria_insert_route(regular, Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> @@ -186,11 +189,11 @@ mria_insert_route(Route) -> %% @doc Match routes -spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> - match_routes(is_unified_table_active(), Topic). + match_routes(get_table_type(), Topic). -match_routes(_Unified = true, Topic) -> +match_routes(unified, Topic) -> [match_to_route(M) || M <- match_unified(Topic)]; -match_routes(_Unified = false, Topic) -> +match_routes(regular, Topic) -> lookup_routes_regular(Topic) ++ lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)). @@ -205,10 +208,10 @@ match_global_trie(Topic) -> -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()]. lookup_routes(Topic) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> lookup_routes_unified(Topic); - false -> + regular -> lookup_routes_regular(Topic) end. @@ -224,10 +227,10 @@ match_to_route(M) -> -spec has_route(emqx_types:topic(), dest()) -> boolean(). has_route(Topic, Dest) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> has_route_unified(Topic, Dest); - false -> + regular -> has_route_regular(Topic, Dest) end. @@ -251,11 +254,11 @@ do_delete_route(Topic) when is_binary(Topic) -> -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, Dest) -> - mria_delete_route(is_unified_table_active(), Topic, Dest). + mria_delete_route(get_table_type(), Topic, Dest). -mria_delete_route(_Unified = true, Topic, Dest) -> +mria_delete_route(unified, Topic, Dest) -> mria_delete_route_unified(Topic, Dest); -mria_delete_route(_Unified = false, Topic, Dest) -> +mria_delete_route(regular, Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> @@ -278,27 +281,14 @@ mria_delete_route_update_trie(Route) -> mria_delete_route(Route) -> mria:dirty_delete_object(?ROUTE_TAB, Route). --spec is_unified_table_active() -> boolean(). -is_unified_table_active() -> - is_empty(?ROUTE_TAB) andalso - ((not is_empty(?ROUTE_TAB_UNIFIED)) orelse - emqx_config:get([broker, unified_routing_table])). - -is_empty(Tab) -> - % NOTE - % Supposedly, should be better than `ets:info(Tab, size)` because the latter suffers - % from `{decentralized_counters, true}` which is default when `write_concurrency` is - % either `auto` or `true`. - ets:first(Tab) =:= '$end_of_table'. - -spec topics() -> list(emqx_types:topic()). topics() -> - topics(is_unified_table_active()). + topics(get_table_type()). -topics(_Unified = true) -> +topics(unified) -> Pat = #routeidx{entry = '$1'}, [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)]; -topics(_Unified = false) -> +topics(regular) -> mnesia:dirty_all_keys(?ROUTE_TAB). %% @doc Print routes to a topic @@ -313,10 +303,10 @@ print_routes(Topic) -> -spec cleanup_routes(node()) -> ok. cleanup_routes(Node) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> cleanup_routes_unified(Node); - false -> + regular -> cleanup_routes_regular(Node) end. @@ -352,19 +342,19 @@ cleanup_routes_regular(Node) -> -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldl_routes(FoldFun, AccIn) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); - false -> + regular -> ets:foldl(FoldFun, AccIn, ?ROUTE_TAB) end. -spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc. foldr_routes(FoldFun, AccIn) -> - case is_unified_table_active() of - true -> + case get_table_type() of + unified -> ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED); - false -> + regular -> ets:foldr(FoldFun, AccIn, ?ROUTE_TAB) end. @@ -377,6 +367,61 @@ call(Router, Msg) -> pick(Topic) -> gproc_pool:pick_worker(router_pool, Topic). +%%-------------------------------------------------------------------- +%% Routing table type +%% -------------------------------------------------------------------- + +-define(PT_TABLE_TYPE, {?MODULE, tabtype}). + +-type tabtype() :: regular | unified. + +-spec get_table_type() -> tabtype(). +get_table_type() -> + persistent_term:get(?PT_TABLE_TYPE). + +-spec init_table_type() -> ok. +init_table_type() -> + ConfType = emqx_config:get([broker, routing_table_type]), + Type = choose_table_type(ConfType), + ok = persistent_term:put(?PT_TABLE_TYPE, Type), + case Type of + ConfType -> + ?SLOG(info, #{ + msg => "routing_table_type_used", + type => Type + }); + _ -> + ?SLOG(notice, #{ + msg => "configured_routing_table_type_unacceptable", + type => Type, + configured => ConfType, + reason => + "Could not use configured routing table type because " + "there's already non-empty routing table of another type." + }) + end. + +-spec deinit_table_type() -> ok. +deinit_table_type() -> + _ = persistent_term:erase(?PT_TABLE_TYPE), + ok. + +-spec choose_table_type(tabtype()) -> tabtype(). +choose_table_type(ConfType) -> + IsEmptyRegular = is_empty(?ROUTE_TAB), + IsEmptyUnified = is_empty(?ROUTE_TAB_UNIFIED), + case {IsEmptyRegular, IsEmptyUnified} of + {true, true} -> + ConfType; + {false, true} -> + regular; + {true, false} -> + unified + end. + +is_empty(Tab) -> + ets:first(Tab) =:= '$end_of_table'. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_router_sup.erl b/apps/emqx/src/emqx_router_sup.erl index 0fa48d9d2..398edc321 100644 --- a/apps/emqx/src/emqx_router_sup.erl +++ b/apps/emqx/src/emqx_router_sup.erl @@ -23,6 +23,8 @@ -export([init/1]). start_link() -> + %% Init and log routing table type + ok = emqx_router:init_table_type(), supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 7268a7e59..818688d86 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1358,13 +1358,13 @@ fields("broker") -> ref("broker_perf"), #{importance => ?IMPORTANCE_HIDDEN} )}, - {"unified_routing_table", + {"routing_table_type", sc( - boolean(), + hoconsc:enum([regular, unified]), #{ - default => false, + default => regular, importance => ?IMPORTANCE_HIDDEN, - desc => ?DESC(broker_unified_routing_table) + desc => ?DESC(broker_routing_table_type) } )}, %% FIXME: Need new design for shared subscription group diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 7dec039c8..f9e1c6998 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -48,15 +48,15 @@ init_per_group(GroupName, Config) -> }} ], Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}), - [{group_apps, Apps} | Config]. + [{group_apps, Apps}, {group_name, GroupName} | Config]. end_per_group(_GroupName, Config) -> ok = emqx_cth_suite:stop(?config(group_apps, Config)). mk_config(routing_table_regular) -> - "broker.unified_routing_table = false"; + "broker.routing_table_type = regular"; mk_config(routing_table_unified) -> - "broker.unified_routing_table = true". + "broker.routing_table_type = unified". init_per_testcase(_TestCase, Config) -> clear_tables(), @@ -83,6 +83,14 @@ end_per_testcase(_TestCase, _Config) -> % t_topics(_) -> % error('TODO'). +t_verify_type(Config) -> + case ?config(group_name, Config) of + routing_table_regular -> + ?assertEqual(regular, ?R:get_table_type()); + routing_table_unified -> + ?assertEqual(unified, ?R:get_table_type()) + end. + t_add_delete(_) -> ?R:add_route(<<"a/b/c">>), ?R:add_route(<<"a/b/c">>, node()), diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index 12a3a34dd..26b9d8ddd 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -50,12 +50,12 @@ end_per_group(_GroupName, Config) -> mk_config(routing_table_regular) -> #{ - config => "broker.unified_routing_table = false", + config => "broker.routing_table_type = regular", override_env => [{boot_modules, [router]}] }; mk_config(routing_table_unified) -> #{ - config => "broker.unified_routing_table = true", + config => "broker.routing_table_type = unified", override_env => [{boot_modules, [router]}] }. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index c63b248f4..d90fab47b 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1549,8 +1549,8 @@ fields_ws_opts_max_frame_size.label: sys_event_messages.desc: """Client events messages.""" -broker_unified_routing_table.desc: -"""Enable unified routing table. +broker_routing_table_type.desc: +"""Routing table type. Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription. NOTE: This is an experimental feature. NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect."""