diff --git a/apps/emqx/include/emqx_router.hrl b/apps/emqx/include/emqx_router.hrl
index 99ca3e185..35a267aa7 100644
--- a/apps/emqx/include/emqx_router.hrl
+++ b/apps/emqx/include/emqx_router.hrl
@@ -19,7 +19,7 @@
%% ETS tables for message routing
-define(ROUTE_TAB, emqx_route).
--define(ROUTE_TAB_UNIFIED, emqx_route_unified).
+-define(ROUTE_TAB_FILTERS, emqx_route_filters).
%% Mnesia table for message routing
-define(ROUTING_NODE, emqx_routing_node).
diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl
index 4fae556ca..0f4987085 100644
--- a/apps/emqx/src/emqx_app.erl
+++ b/apps/emqx/src/emqx_app.erl
@@ -56,7 +56,7 @@ prep_stop(_State) ->
emqx_listeners:stop().
stop(_State) ->
- ok = emqx_router:deinit_table_type(),
+ ok = emqx_router:deinit_schema(),
ok.
-define(CONFIG_LOADER, config_loader).
diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl
index 7a8d88a55..9f6913dc4 100644
--- a/apps/emqx/src/emqx_router.erl
+++ b/apps/emqx/src/emqx_router.erl
@@ -73,9 +73,9 @@
]).
-export([
- get_table_type/0,
- init_table_type/0,
- deinit_table_type/0
+ get_schema_vsn/0,
+ init_schema/0,
+ deinit_schema/0
]).
-type group() :: binary().
@@ -87,8 +87,6 @@
unused = [] :: nil()
}).
--dialyzer({nowarn_function, [cleanup_routes_regular/1]}).
-
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@@ -108,7 +106,7 @@ mnesia(boot) ->
]}
]}
]),
- ok = mria:create_table(?ROUTE_TAB_UNIFIED, [
+ ok = mria:create_table(?ROUTE_TAB_FILTERS, [
{type, ordered_set},
{rlog_shard, ?ROUTE_SHARD},
{storage, ram_copies},
@@ -158,87 +156,41 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
ok;
false ->
ok = emqx_router_helper:monitor(Dest),
- mria_insert_route(get_table_type(), Topic, Dest)
+ mria_insert_route(get_schema_vsn(), Topic, Dest)
end.
-mria_insert_route(unified, Topic, Dest) ->
- mria_insert_route_unified(Topic, Dest);
-mria_insert_route(regular, Topic, Dest) ->
- Route = #route{topic = Topic, dest = Dest},
- case emqx_topic:wildcard(Topic) of
- true ->
- mria_insert_route_update_trie(Route);
- false ->
- mria_insert_route(Route)
- end.
-
-mria_insert_route_unified(Topic, Dest) ->
- K = emqx_topic_index:make_key(Topic, Dest),
- mria:dirty_write(?ROUTE_TAB_UNIFIED, #routeidx{entry = K}).
-
-mria_insert_route_update_trie(Route) ->
- emqx_router_utils:maybe_trans(
- fun emqx_router_utils:insert_trie_route/2,
- [?ROUTE_TAB, Route],
- ?ROUTE_SHARD
- ).
-
-mria_insert_route(Route) ->
- mria:dirty_write(?ROUTE_TAB, Route).
+mria_insert_route(v2, Topic, Dest) ->
+ mria_insert_route_v2(Topic, Dest);
+mria_insert_route(v1, Topic, Dest) ->
+ mria_insert_route_v1(Topic, Dest).
%% @doc Match routes
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
match_routes(Topic) when is_binary(Topic) ->
- match_routes(get_table_type(), Topic).
+ match_routes(get_schema_vsn(), Topic).
-match_routes(unified, Topic) ->
- [match_to_route(M) || M <- match_unified(Topic)];
-match_routes(regular, Topic) ->
- lookup_routes_regular(Topic) ++
- lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)).
-
-match_unified(Topic) ->
- emqx_topic_index:matches(Topic, ?ROUTE_TAB_UNIFIED, []).
-
-match_global_trie(Topic) ->
- case emqx_trie:empty() of
- true -> [];
- false -> emqx_trie:match(Topic)
- end.
+match_routes(v2, Topic) ->
+ match_routes_v2(Topic);
+match_routes(v1, Topic) ->
+ match_routes_v1(Topic).
-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
lookup_routes(Topic) ->
- case get_table_type() of
- unified ->
- lookup_routes_unified(Topic);
- regular ->
- lookup_routes_regular(Topic)
- end.
+ lookup_routes(get_schema_vsn(), Topic).
-lookup_routes_unified(Topic) ->
- Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
- [Dest || [Dest] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)].
-
-lookup_routes_regular(Topic) ->
- ets:lookup(?ROUTE_TAB, Topic).
-
-match_to_route(M) ->
- #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
+lookup_routes(v2, Topic) ->
+ lookup_routes_v2(Topic);
+lookup_routes(v1, Topic) ->
+ lookup_routes_v1(Topic).
-spec has_route(emqx_types:topic(), dest()) -> boolean().
has_route(Topic, Dest) ->
- case get_table_type() of
- unified ->
- has_route_unified(Topic, Dest);
- regular ->
- has_route_regular(Topic, Dest)
- end.
+ has_route(get_schema_vsn(), Topic, Dest).
-has_route_unified(Topic, Dest) ->
- ets:member(?ROUTE_TAB_UNIFIED, emqx_topic_index:make_key(Topic, Dest)).
-
-has_route_regular(Topic, Dest) ->
- lists:any(fun(Route) -> Route#route.dest =:= Dest end, ets:lookup(?ROUTE_TAB, Topic)).
+has_route(v2, Topic, Dest) ->
+ has_route_v2(Topic, Dest);
+has_route(v1, Topic, Dest) ->
+ has_route_v1(Topic, Dest).
-spec delete_route(emqx_types:topic()) -> ok | {error, term()}.
delete_route(Topic) when is_binary(Topic) ->
@@ -254,42 +206,21 @@ do_delete_route(Topic) when is_binary(Topic) ->
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_delete_route(Topic, Dest) ->
- mria_delete_route(get_table_type(), Topic, Dest).
+ mria_delete_route(get_schema_vsn(), Topic, Dest).
-mria_delete_route(unified, Topic, Dest) ->
- mria_delete_route_unified(Topic, Dest);
-mria_delete_route(regular, Topic, Dest) ->
- Route = #route{topic = Topic, dest = Dest},
- case emqx_topic:wildcard(Topic) of
- true ->
- mria_delete_route_update_trie(Route);
- false ->
- mria_delete_route(Route)
- end.
-
-mria_delete_route_unified(Topic, Dest) ->
- K = emqx_topic_index:make_key(Topic, Dest),
- mria:dirty_delete(?ROUTE_TAB_UNIFIED, K).
-
-mria_delete_route_update_trie(Route) ->
- emqx_router_utils:maybe_trans(
- fun emqx_router_utils:delete_trie_route/2,
- [?ROUTE_TAB, Route],
- ?ROUTE_SHARD
- ).
-
-mria_delete_route(Route) ->
- mria:dirty_delete_object(?ROUTE_TAB, Route).
+mria_delete_route(v2, Topic, Dest) ->
+ mria_delete_route_v2(Topic, Dest);
+mria_delete_route(v1, Topic, Dest) ->
+ mria_delete_route_v1(Topic, Dest).
-spec topics() -> list(emqx_types:topic()).
topics() ->
- topics(get_table_type()).
+ topics(get_schema_vsn()).
-topics(unified) ->
- Pat = #routeidx{entry = '$1'},
- [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)];
-topics(regular) ->
- mnesia:dirty_all_keys(?ROUTE_TAB).
+topics(v2) ->
+ list_topics_v2();
+topics(v1) ->
+ list_topics_v1().
%% @doc Print routes to a topic
-spec print_routes(emqx_types:topic()) -> ok.
@@ -303,31 +234,99 @@ print_routes(Topic) ->
-spec cleanup_routes(node()) -> ok.
cleanup_routes(Node) ->
- case get_table_type() of
- unified ->
- cleanup_routes_unified(Node);
- regular ->
- cleanup_routes_regular(Node)
+ cleanup_routes(get_schema_vsn(), Node).
+
+cleanup_routes(v2, Node) ->
+ cleanup_routes_v2(Node);
+cleanup_routes(v1, Node) ->
+ cleanup_routes_v1(Node).
+
+-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
+foldl_routes(FoldFun, AccIn) ->
+ fold_routes(get_schema_vsn(), foldl, FoldFun, AccIn).
+
+-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
+foldr_routes(FoldFun, AccIn) ->
+ fold_routes(get_schema_vsn(), foldr, FoldFun, AccIn).
+
+fold_routes(v2, FunName, FoldFun, AccIn) ->
+ fold_routes_v2(FunName, FoldFun, AccIn);
+fold_routes(v1, FunName, FoldFun, AccIn) ->
+ fold_routes_v1(FunName, FoldFun, AccIn).
+
+call(Router, Msg) ->
+ gen_server:call(Router, Msg, infinity).
+
+pick(Topic) ->
+ gproc_pool:pick_worker(router_pool, Topic).
+
+%%--------------------------------------------------------------------
+%% Schema v1
+%% --------------------------------------------------------------------
+
+-dialyzer({nowarn_function, [cleanup_routes_v1/1]}).
+
+mria_insert_route_v1(Topic, Dest) ->
+ Route = #route{topic = Topic, dest = Dest},
+ case emqx_topic:wildcard(Topic) of
+ true ->
+ mria_route_tab_insert_update_trie(Route);
+ false ->
+ mria_route_tab_insert(Route)
end.
-cleanup_routes_unified(Node) ->
- % NOTE
- % No point in transaction here because all the operations on unified routing table
- % are dirty.
- ets:foldl(
- fun(#routeidx{entry = K}, ok) ->
- case emqx_topic_index:get_id(K) of
- Node ->
- mria:dirty_delete(?ROUTE_TAB_UNIFIED, K);
- _ ->
- ok
- end
- end,
- ok,
- ?ROUTE_TAB_UNIFIED
+mria_route_tab_insert_update_trie(Route) ->
+ emqx_router_utils:maybe_trans(
+ fun emqx_router_utils:insert_trie_route/2,
+ [?ROUTE_TAB, Route],
+ ?ROUTE_SHARD
).
-cleanup_routes_regular(Node) ->
+mria_route_tab_insert(Route) ->
+ mria:dirty_write(?ROUTE_TAB, Route).
+
+mria_delete_route_v1(Topic, Dest) ->
+ Route = #route{topic = Topic, dest = Dest},
+ case emqx_topic:wildcard(Topic) of
+ true ->
+ mria_route_tab_delete_update_trie(Route);
+ false ->
+ mria_route_tab_delete(Route)
+ end.
+
+mria_route_tab_delete_update_trie(Route) ->
+ emqx_router_utils:maybe_trans(
+ fun emqx_router_utils:delete_trie_route/2,
+ [?ROUTE_TAB, Route],
+ ?ROUTE_SHARD
+ ).
+
+mria_route_tab_delete(Route) ->
+ mria:dirty_delete_object(?ROUTE_TAB, Route).
+
+match_routes_v1(Topic) ->
+ lookup_route_tab(Topic) ++
+ lists:flatmap(fun lookup_route_tab/1, match_global_trie(Topic)).
+
+match_global_trie(Topic) ->
+ case emqx_trie:empty() of
+ true -> [];
+ false -> emqx_trie:match(Topic)
+ end.
+
+lookup_routes_v1(Topic) ->
+ lookup_route_tab(Topic).
+
+lookup_route_tab(Topic) ->
+ ets:lookup(?ROUTE_TAB, Topic).
+
+has_route_v1(Topic, Dest) ->
+ has_route_tab_entry(Topic, Dest).
+
+has_route_tab_entry(Topic, Dest) ->
+ [] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}).
+
+cleanup_routes_v1(Node) ->
Patterns = [
#route{_ = '_', dest = Node},
#route{_ = '_', dest = {'_', Node}}
@@ -340,83 +339,165 @@ cleanup_routes_regular(Node) ->
]
end).
--spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
-foldl_routes(FoldFun, AccIn) ->
- case get_table_type() of
- unified ->
- ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
- regular ->
- ets:foldl(FoldFun, AccIn, ?ROUTE_TAB)
+list_topics_v1() ->
+ list_route_tab_topics().
+
+list_route_tab_topics() ->
+ mnesia:dirty_all_keys(?ROUTE_TAB).
+
+fold_routes_v1(FunName, FoldFun, AccIn) ->
+ ets:FunName(FoldFun, AccIn, ?ROUTE_TAB).
+
+%%--------------------------------------------------------------------
+%% Schema v2
+%% One bag table exclusively for regular, non-filter subscription
+%% topics, and one `emqx_topic_index` table exclusively for wildcard
+%% topics. Writes go to only one of the two tables at a time.
+%% --------------------------------------------------------------------
+
+mria_insert_route_v2(Topic, Dest) ->
+ case emqx_trie_search:filter(Topic) of
+ Words when is_list(Words) ->
+ K = emqx_topic_index:make_key(Words, Dest),
+ mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+ false ->
+ mria_route_tab_insert(#route{topic = Topic, dest = Dest})
end.
--spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
-foldr_routes(FoldFun, AccIn) ->
- case get_table_type() of
- unified ->
- ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
- regular ->
- ets:foldr(FoldFun, AccIn, ?ROUTE_TAB)
+mria_delete_route_v2(Topic, Dest) ->
+ case emqx_trie_search:filter(Topic) of
+ Words when is_list(Words) ->
+ K = emqx_topic_index:make_key(Words, Dest),
+ mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+ false ->
+ mria_route_tab_delete(#route{topic = Topic, dest = Dest})
end.
-mk_fold_fun_unified(FoldFun) ->
+match_routes_v2(Topic) ->
+ lookup_route_tab(Topic) ++
+ [match_to_route(M) || M <- match_filters(Topic)].
+
+match_filters(Topic) ->
+ emqx_topic_index:matches(Topic, ?ROUTE_TAB_FILTERS, []).
+
+lookup_routes_v2(Topic) ->
+ case emqx_topic:wildcard(Topic) of
+ true ->
+ Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
+ [Dest || [Dest] <- ets:match(?ROUTE_TAB_FILTERS, Pat)];
+ false ->
+ lookup_route_tab(Topic)
+ end.
+
+has_route_v2(Topic, Dest) ->
+ case emqx_topic:wildcard(Topic) of
+ true ->
+ ets:member(?ROUTE_TAB_FILTERS, emqx_topic_index:make_key(Topic, Dest));
+ false ->
+ has_route_tab_entry(Topic, Dest)
+ end.
+
+cleanup_routes_v2(Node) ->
+ % NOTE
+ % No point in transaction here because all the operations on unified routing table
+ % are dirty.
+ ok = ets:foldl(
+ fun(#routeidx{entry = K}, ok) ->
+ case get_dest_node(emqx_topic_index:get_id(K)) of
+ Node ->
+ mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+ _ ->
+ ok
+ end
+ end,
+ ok,
+ ?ROUTE_TAB_FILTERS
+ ),
+ ok = ets:foldl(
+ fun(#route{dest = Dest} = Route, ok) ->
+ case get_dest_node(Dest) of
+ Node ->
+ mria:dirty_delete_object(?ROUTE_TAB, Route);
+ _ ->
+ ok
+ end
+ end,
+ ok,
+ ?ROUTE_TAB
+ ).
+
+get_dest_node({_, Node}) ->
+ Node;
+get_dest_node(Node) ->
+ Node.
+
+list_topics_v2() ->
+ Pat = #routeidx{entry = '$1'},
+ Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)],
+ list_route_tab_topics() ++ Filters.
+
+fold_routes_v2(FunName, FoldFun, AccIn) ->
+ FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
+ Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB),
+ ets:FunName(FilterFoldFun, Acc, ?ROUTE_TAB_FILTERS).
+
+mk_filtertab_fold_fun(FoldFun) ->
fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
-call(Router, Msg) ->
- gen_server:call(Router, Msg, infinity).
-
-pick(Topic) ->
- gproc_pool:pick_worker(router_pool, Topic).
+match_to_route(M) ->
+ #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
%%--------------------------------------------------------------------
%% Routing table type
%% --------------------------------------------------------------------
--define(PT_TABLE_TYPE, {?MODULE, tabtype}).
+-define(PT_SCHEMA_VSN, {?MODULE, schemavsn}).
--type tabtype() :: regular | unified.
+-type schemavsn() :: v1 | v2.
--spec get_table_type() -> tabtype().
-get_table_type() ->
- persistent_term:get(?PT_TABLE_TYPE).
+-spec get_schema_vsn() -> schemavsn().
+get_schema_vsn() ->
+ persistent_term:get(?PT_SCHEMA_VSN).
--spec init_table_type() -> ok.
-init_table_type() ->
- ConfType = emqx_config:get([broker, routing_table_type]),
- Type = choose_table_type(ConfType),
- ok = persistent_term:put(?PT_TABLE_TYPE, Type),
- case Type of
- ConfType ->
+-spec init_schema() -> ok.
+init_schema() ->
+ ConfSchema = emqx_config:get([broker, routing, storage_schema]),
+ Schema = choose_schema_vsn(ConfSchema),
+ ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
+ case Schema of
+ ConfSchema ->
?SLOG(info, #{
- msg => "routing_table_type_used",
- type => Type
+ msg => "routing_schema_used",
+ schema => Schema
});
_ ->
?SLOG(notice, #{
- msg => "configured_routing_table_type_unacceptable",
- type => Type,
- configured => ConfType,
+ msg => "configured_routing_schema_unacceptable",
+ schema => Schema,
+ configured => ConfSchema,
reason =>
- "Could not use configured routing table type because "
- "there's already non-empty routing table of another type."
+ "Could not use configured routing storage schema because "
+ "there are already non-empty routing tables pertaining to "
+ "another schema."
})
end.
--spec deinit_table_type() -> ok.
-deinit_table_type() ->
- _ = persistent_term:erase(?PT_TABLE_TYPE),
+-spec deinit_schema() -> ok.
+deinit_schema() ->
+ _ = persistent_term:erase(?PT_SCHEMA_VSN),
ok.
--spec choose_table_type(tabtype()) -> tabtype().
-choose_table_type(ConfType) ->
- IsEmptyRegular = is_empty(?ROUTE_TAB),
- IsEmptyUnified = is_empty(?ROUTE_TAB_UNIFIED),
- case {IsEmptyRegular, IsEmptyUnified} of
+-spec choose_schema_vsn(schemavsn()) -> schemavsn().
+choose_schema_vsn(ConfType) ->
+ IsEmptyIndex = emqx_trie:empty(),
+ IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS),
+ case {IsEmptyIndex, IsEmptyFilters} of
{true, true} ->
ConfType;
{false, true} ->
- regular;
+ v1;
{true, false} ->
- unified
+ v2
end.
is_empty(Tab) ->
diff --git a/apps/emqx/src/emqx_router_sup.erl b/apps/emqx/src/emqx_router_sup.erl
index 398edc321..588b0de8e 100644
--- a/apps/emqx/src/emqx_router_sup.erl
+++ b/apps/emqx/src/emqx_router_sup.erl
@@ -24,7 +24,7 @@
start_link() ->
%% Init and log routing table type
- ok = emqx_router:init_table_type(),
+ ok = emqx_router:init_schema(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 818688d86..85bdd7b50 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -1358,14 +1358,10 @@ fields("broker") ->
ref("broker_perf"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
- {"routing_table_type",
+ {"routing",
sc(
- hoconsc:enum([regular, unified]),
- #{
- default => regular,
- importance => ?IMPORTANCE_HIDDEN,
- desc => ?DESC(broker_routing_table_type)
- }
+ ref("broker_routing"),
+ #{importance => ?IMPORTANCE_HIDDEN}
)},
%% FIXME: Need new design for shared subscription group
{"shared_subscription_group",
@@ -1378,6 +1374,17 @@ fields("broker") ->
}
)}
];
+fields("broker_routing") ->
+ [
+ {"storage_schema",
+ sc(
+ hoconsc:enum([v1, v2]),
+ #{
+ default => v1,
+ desc => ?DESC(broker_routing_storage_schema)
+ }
+ )}
+ ];
fields("shared_subscription_group") ->
[
{"strategy",
diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl
index f9e1c6998..adc898365 100644
--- a/apps/emqx/test/emqx_router_SUITE.erl
+++ b/apps/emqx/test/emqx_router_SUITE.erl
@@ -28,15 +28,15 @@
all() ->
[
- {group, routing_table_regular},
- {group, routing_table_unified}
+ {group, routing_schema_v1},
+ {group, routing_schema_v2}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
- {routing_table_regular, [], TCs},
- {routing_table_unified, [], TCs}
+ {routing_schema_v1, [], TCs},
+ {routing_schema_v2, [], TCs}
].
init_per_group(GroupName, Config) ->
@@ -53,10 +53,10 @@ init_per_group(GroupName, Config) ->
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
-mk_config(routing_table_regular) ->
- "broker.routing_table_type = regular";
-mk_config(routing_table_unified) ->
- "broker.routing_table_type = unified".
+mk_config(routing_schema_v1) ->
+ "broker.routing.storage_schema = v1";
+mk_config(routing_schema_v2) ->
+ "broker.routing.storage_schema = v2".
init_per_testcase(_TestCase, Config) ->
clear_tables(),
@@ -85,10 +85,10 @@ end_per_testcase(_TestCase, _Config) ->
t_verify_type(Config) ->
case ?config(group_name, Config) of
- routing_table_regular ->
- ?assertEqual(regular, ?R:get_table_type());
- routing_table_unified ->
- ?assertEqual(unified, ?R:get_table_type())
+ routing_schema_v1 ->
+ ?assertEqual(v1, ?R:get_schema_vsn());
+ routing_schema_v2 ->
+ ?assertEqual(v2, ?R:get_schema_vsn())
end.
t_add_delete(_) ->
@@ -198,5 +198,5 @@ t_unexpected(_) ->
clear_tables() ->
lists:foreach(
fun mnesia:clear_table/1,
- [?ROUTE_TAB, ?ROUTE_TAB_UNIFIED, ?TRIE]
+ [?ROUTE_TAB, ?ROUTE_TAB_FILTERS, ?TRIE]
).
diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl
index 26b9d8ddd..f4d3cc43a 100644
--- a/apps/emqx/test/emqx_router_helper_SUITE.erl
+++ b/apps/emqx/test/emqx_router_helper_SUITE.erl
@@ -28,15 +28,15 @@
all() ->
[
- {group, routing_table_regular},
- {group, routing_table_unified}
+ {group, routing_schema_v1},
+ {group, routing_schema_v2}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
- {routing_table_regular, [], TCs},
- {routing_table_unified, [], TCs}
+ {routing_schema_v1, [], TCs},
+ {routing_schema_v2, [], TCs}
].
init_per_group(GroupName, Config) ->
@@ -48,14 +48,14 @@ init_per_group(GroupName, Config) ->
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
-mk_config(routing_table_regular) ->
+mk_config(routing_schema_v1) ->
#{
- config => "broker.routing_table_type = regular",
+ config => "broker.routing.storage_schema = v1",
override_env => [{boot_modules, [router]}]
};
-mk_config(routing_table_unified) ->
+mk_config(routing_schema_v2) ->
#{
- config => "broker.routing_table_type = unified",
+ config => "broker.routing.storage_schema = v2",
override_env => [{boot_modules, [router]}]
}.
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index d90fab47b..4a8440e83 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -1549,11 +1549,12 @@ fields_ws_opts_max_frame_size.label:
sys_event_messages.desc:
"""Client events messages."""
-broker_routing_table_type.desc:
-"""Routing table type.
-Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription.
-NOTE: This is an experimental feature.
-NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect."""
+broker_routing_storage_schema.desc:
+"""Routing storage schema.
+Set v1
to leave the default.
+Set v2
to enable routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription.
+NOTE: Schema v2
is still experimental.
+NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
broker_perf_trie_compaction.desc:
"""Enable trie path compaction.