feat(router): enable using 2 tables to store routes

Use two tables instead of a single unified table, to reap the benefits of
a cheap `ets:lookup/2` for each regular (non-wildcard) topic subscription route.

Rename the configuration option to reflect the change: the user can now
choose the _storage schema_.
This commit is contained in:
Andrew Mayorov 2023-08-29 17:41:59 +04:00
parent 270fd107b2
commit 063d6200c8
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
8 changed files with 298 additions and 209 deletions

View File

@ -19,7 +19,7 @@
%% ETS tables for message routing
-define(ROUTE_TAB, emqx_route).
-define(ROUTE_TAB_UNIFIED, emqx_route_unified).
-define(ROUTE_TAB_FILTERS, emqx_route_filters).
%% Mnesia table for message routing
-define(ROUTING_NODE, emqx_routing_node).

View File

@ -56,7 +56,7 @@ prep_stop(_State) ->
emqx_listeners:stop().
stop(_State) ->
ok = emqx_router:deinit_table_type(),
ok = emqx_router:deinit_schema(),
ok.
-define(CONFIG_LOADER, config_loader).

View File

@ -73,9 +73,9 @@
]).
-export([
get_table_type/0,
init_table_type/0,
deinit_table_type/0
get_schema_vsn/0,
init_schema/0,
deinit_schema/0
]).
-type group() :: binary().
@ -87,8 +87,6 @@
unused = [] :: nil()
}).
-dialyzer({nowarn_function, [cleanup_routes_regular/1]}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@ -108,7 +106,7 @@ mnesia(boot) ->
]}
]}
]),
ok = mria:create_table(?ROUTE_TAB_UNIFIED, [
ok = mria:create_table(?ROUTE_TAB_FILTERS, [
{type, ordered_set},
{rlog_shard, ?ROUTE_SHARD},
{storage, ram_copies},
@ -158,87 +156,41 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
ok;
false ->
ok = emqx_router_helper:monitor(Dest),
mria_insert_route(get_table_type(), Topic, Dest)
mria_insert_route(get_schema_vsn(), Topic, Dest)
end.
mria_insert_route(unified, Topic, Dest) ->
mria_insert_route_unified(Topic, Dest);
mria_insert_route(regular, Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
mria_insert_route_update_trie(Route);
false ->
mria_insert_route(Route)
end.
mria_insert_route_unified(Topic, Dest) ->
K = emqx_topic_index:make_key(Topic, Dest),
mria:dirty_write(?ROUTE_TAB_UNIFIED, #routeidx{entry = K}).
mria_insert_route_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:insert_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
mria_insert_route(Route) ->
mria:dirty_write(?ROUTE_TAB, Route).
mria_insert_route(v2, Topic, Dest) ->
mria_insert_route_v2(Topic, Dest);
mria_insert_route(v1, Topic, Dest) ->
mria_insert_route_v1(Topic, Dest).
%% @doc Match routes
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
match_routes(Topic) when is_binary(Topic) ->
match_routes(get_table_type(), Topic).
match_routes(get_schema_vsn(), Topic).
match_routes(unified, Topic) ->
[match_to_route(M) || M <- match_unified(Topic)];
match_routes(regular, Topic) ->
lookup_routes_regular(Topic) ++
lists:flatmap(fun lookup_routes_regular/1, match_global_trie(Topic)).
match_unified(Topic) ->
emqx_topic_index:matches(Topic, ?ROUTE_TAB_UNIFIED, []).
match_global_trie(Topic) ->
case emqx_trie:empty() of
true -> [];
false -> emqx_trie:match(Topic)
end.
match_routes(v2, Topic) ->
match_routes_v2(Topic);
match_routes(v1, Topic) ->
match_routes_v1(Topic).
-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
lookup_routes(Topic) ->
case get_table_type() of
unified ->
lookup_routes_unified(Topic);
regular ->
lookup_routes_regular(Topic)
end.
lookup_routes(get_schema_vsn(), Topic).
lookup_routes_unified(Topic) ->
Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
[Dest || [Dest] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)].
lookup_routes_regular(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
match_to_route(M) ->
#route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
lookup_routes(v2, Topic) ->
lookup_routes_v2(Topic);
lookup_routes(v1, Topic) ->
lookup_routes_v1(Topic).
-spec has_route(emqx_types:topic(), dest()) -> boolean().
has_route(Topic, Dest) ->
case get_table_type() of
unified ->
has_route_unified(Topic, Dest);
regular ->
has_route_regular(Topic, Dest)
end.
has_route(get_schema_vsn(), Topic, Dest).
has_route_unified(Topic, Dest) ->
ets:member(?ROUTE_TAB_UNIFIED, emqx_topic_index:make_key(Topic, Dest)).
has_route_regular(Topic, Dest) ->
lists:any(fun(Route) -> Route#route.dest =:= Dest end, ets:lookup(?ROUTE_TAB, Topic)).
has_route(v2, Topic, Dest) ->
has_route_v2(Topic, Dest);
has_route(v1, Topic, Dest) ->
has_route_v1(Topic, Dest).
-spec delete_route(emqx_types:topic()) -> ok | {error, term()}.
delete_route(Topic) when is_binary(Topic) ->
@ -254,42 +206,21 @@ do_delete_route(Topic) when is_binary(Topic) ->
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_delete_route(Topic, Dest) ->
mria_delete_route(get_table_type(), Topic, Dest).
mria_delete_route(get_schema_vsn(), Topic, Dest).
mria_delete_route(unified, Topic, Dest) ->
mria_delete_route_unified(Topic, Dest);
mria_delete_route(regular, Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
mria_delete_route_update_trie(Route);
false ->
mria_delete_route(Route)
end.
mria_delete_route_unified(Topic, Dest) ->
K = emqx_topic_index:make_key(Topic, Dest),
mria:dirty_delete(?ROUTE_TAB_UNIFIED, K).
mria_delete_route_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:delete_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
mria_delete_route(Route) ->
mria:dirty_delete_object(?ROUTE_TAB, Route).
mria_delete_route(v2, Topic, Dest) ->
mria_delete_route_v2(Topic, Dest);
mria_delete_route(v1, Topic, Dest) ->
mria_delete_route_v1(Topic, Dest).
-spec topics() -> list(emqx_types:topic()).
topics() ->
topics(get_table_type()).
topics(get_schema_vsn()).
topics(unified) ->
Pat = #routeidx{entry = '$1'},
[emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_UNIFIED, Pat)];
topics(regular) ->
mnesia:dirty_all_keys(?ROUTE_TAB).
topics(v2) ->
list_topics_v2();
topics(v1) ->
list_topics_v1().
%% @doc Print routes to a topic
-spec print_routes(emqx_types:topic()) -> ok.
@ -303,31 +234,99 @@ print_routes(Topic) ->
-spec cleanup_routes(node()) -> ok.
cleanup_routes(Node) ->
case get_table_type() of
unified ->
cleanup_routes_unified(Node);
regular ->
cleanup_routes_regular(Node)
cleanup_routes(get_schema_vsn(), Node).
cleanup_routes(v2, Node) ->
cleanup_routes_v2(Node);
cleanup_routes(v1, Node) ->
cleanup_routes_v1(Node).
-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldl_routes(FoldFun, AccIn) ->
fold_routes(get_schema_vsn(), foldl, FoldFun, AccIn).
-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldr_routes(FoldFun, AccIn) ->
fold_routes(get_schema_vsn(), foldr, FoldFun, AccIn).
fold_routes(v2, FunName, FoldFun, AccIn) ->
fold_routes_v2(FunName, FoldFun, AccIn);
fold_routes(v1, FunName, FoldFun, AccIn) ->
fold_routes_v1(FunName, FoldFun, AccIn).
call(Router, Msg) ->
gen_server:call(Router, Msg, infinity).
pick(Topic) ->
gproc_pool:pick_worker(router_pool, Topic).
%%--------------------------------------------------------------------
%% Schema v1
%% --------------------------------------------------------------------
-dialyzer({nowarn_function, [cleanup_routes_v1/1]}).
%% Insert a route for `Topic' -> `Dest' under schema v1.
%% Builds a `#route{}' record and picks the write path depending on whether
%% `emqx_topic:wildcard/1' reports the topic as a wildcard.
mria_insert_route_v1(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
%% Wildcard topic: go through the trie-updating insert path.
mria_route_tab_insert_update_trie(Route);
false ->
%% Non-wildcard topic: plain dirty write into ?ROUTE_TAB.
mria_route_tab_insert(Route)
end.
cleanup_routes_unified(Node) ->
% NOTE
% No point in transaction here because all the operations on unified routing table
% are dirty.
ets:foldl(
fun(#routeidx{entry = K}, ok) ->
case emqx_topic_index:get_id(K) of
Node ->
mria:dirty_delete(?ROUTE_TAB_UNIFIED, K);
_ ->
ok
end
end,
ok,
?ROUTE_TAB_UNIFIED
mria_route_tab_insert_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:insert_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
cleanup_routes_regular(Node) ->
mria_route_tab_insert(Route) ->
mria:dirty_write(?ROUTE_TAB, Route).
%% Delete the route `Topic' -> `Dest' under schema v1.
%% Mirrors the v1 insert: the delete path is chosen by whether
%% `emqx_topic:wildcard/1' reports the topic as a wildcard.
mria_delete_route_v1(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
%% Wildcard topic: go through the trie-updating delete path.
mria_route_tab_delete_update_trie(Route);
false ->
%% Non-wildcard topic: dirty delete of this exact record from ?ROUTE_TAB.
mria_route_tab_delete(Route)
end.
mria_route_tab_delete_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:delete_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
mria_route_tab_delete(Route) ->
mria:dirty_delete_object(?ROUTE_TAB, Route).
%% Collect all routes matching `Topic' under schema v1: first the routes stored
%% under the topic itself, then the routes of every entry returned by
%% `match_global_trie/1'.
match_routes_v1(Topic) ->
    ExactRoutes = lookup_route_tab(Topic),
    TrieMatches = match_global_trie(Topic),
    TrieRoutes = [Route || Matched <- TrieMatches, Route <- lookup_route_tab(Matched)],
    ExactRoutes ++ TrieRoutes.
%% Match `Topic' against the trie via `emqx_trie:match/1'.
%% Returns `[]' without calling the matcher when `emqx_trie:empty/0' reports
%% that the trie is empty.
match_global_trie(Topic) ->
case emqx_trie:empty() of
true -> [];
false -> emqx_trie:match(Topic)
end.
lookup_routes_v1(Topic) ->
lookup_route_tab(Topic).
lookup_route_tab(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
has_route_v1(Topic, Dest) ->
has_route_tab_entry(Topic, Dest).
%% Tell whether ?ROUTE_TAB holds a route record for exactly this topic and
%% destination. Returns a boolean.
has_route_tab_entry(Topic, Dest) ->
    RoutePat = #route{topic = Topic, dest = Dest},
    case ets:match(?ROUTE_TAB, RoutePat) of
        [] -> false;
        [_ | _] -> true
    end.
cleanup_routes_v1(Node) ->
Patterns = [
#route{_ = '_', dest = Node},
#route{_ = '_', dest = {'_', Node}}
@ -340,83 +339,165 @@ cleanup_routes_regular(Node) ->
]
end).
-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldl_routes(FoldFun, AccIn) ->
case get_table_type() of
unified ->
ets:foldl(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
regular ->
ets:foldl(FoldFun, AccIn, ?ROUTE_TAB)
list_topics_v1() ->
list_route_tab_topics().
list_route_tab_topics() ->
mnesia:dirty_all_keys(?ROUTE_TAB).
fold_routes_v1(FunName, FoldFun, AccIn) ->
ets:FunName(FoldFun, AccIn, ?ROUTE_TAB).
%%--------------------------------------------------------------------
%% Schema v2
%% One bag table exclusively for regular, non-filter subscription
%% topics, and one `emqx_topic_index` table exclusively for wildcard
%% topics. Writes go to only one of the two tables at a time.
%% --------------------------------------------------------------------
%% Insert a route for `Topic' -> `Dest' under schema v2.
%% The target table is chosen by the result of `emqx_trie_search:filter/1':
%% a word list sends the route to ?ROUTE_TAB_FILTERS, `false' sends it to
%% ?ROUTE_TAB. Exactly one table is written per call.
%% NOTE(review): presumably a word list is returned only for wildcard topic
%% filters -- confirm against `emqx_trie_search'.
mria_insert_route_v2(Topic, Dest) ->
case emqx_trie_search:filter(Topic) of
Words when is_list(Words) ->
%% The index key combines the topic words with the destination.
K = emqx_topic_index:make_key(Words, Dest),
mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
false ->
mria_route_tab_insert(#route{topic = Topic, dest = Dest})
end.
-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldr_routes(FoldFun, AccIn) ->
case get_table_type() of
unified ->
ets:foldr(mk_fold_fun_unified(FoldFun), AccIn, ?ROUTE_TAB_UNIFIED);
regular ->
ets:foldr(FoldFun, AccIn, ?ROUTE_TAB)
%% Delete the route `Topic' -> `Dest' under schema v2.
%% Uses the same table selection as the v2 insert: a word list from
%% `emqx_trie_search:filter/1' means the entry lives in ?ROUTE_TAB_FILTERS,
%% `false' means it lives in ?ROUTE_TAB.
mria_delete_route_v2(Topic, Dest) ->
case emqx_trie_search:filter(Topic) of
Words when is_list(Words) ->
%% Rebuild the same index key that the insert wrote, then delete by key.
K = emqx_topic_index:make_key(Words, Dest),
mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
false ->
mria_route_tab_delete(#route{topic = Topic, dest = Dest})
end.
mk_fold_fun_unified(FoldFun) ->
%% Collect all routes matching `Topic' under schema v2: routes stored under the
%% topic itself in ?ROUTE_TAB, followed by the matches found in the filters
%% table, converted to `#route{}' records.
match_routes_v2(Topic) ->
    RegularRoutes = lookup_route_tab(Topic),
    FilterRoutes = lists:map(fun match_to_route/1, match_filters(Topic)),
    RegularRoutes ++ FilterRoutes.
match_filters(Topic) ->
emqx_topic_index:matches(Topic, ?ROUTE_TAB_FILTERS, []).
%% Look up the routes registered for exactly `Topic' under schema v2
%% (no wildcard matching is performed).
%%
%% Wildcard topics are looked up in ?ROUTE_TAB_FILTERS, all other topics in
%% ?ROUTE_TAB. Both branches return a list of `#route{}' records, as the
%% `lookup_routes/1' spec promises. The wildcard branch used to return bare
%% destinations, which made the result shape depend on the topic kind.
lookup_routes_v2(Topic) ->
    case emqx_topic:wildcard(Topic) of
        true ->
            %% '$1' captures the destination part of every index key for this topic.
            Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
            [
                #route{topic = Topic, dest = Dest}
             || [Dest] <- ets:match(?ROUTE_TAB_FILTERS, Pat)
            ];
        false ->
            lookup_route_tab(Topic)
    end.
%% Tell whether the route `Topic' -> `Dest' exists under schema v2.
%% Wildcard topics are checked by key membership in ?ROUTE_TAB_FILTERS,
%% all other topics by matching a record in ?ROUTE_TAB.
has_route_v2(Topic, Dest) ->
case emqx_topic:wildcard(Topic) of
true ->
%% The index key is built from both topic and destination, so a key
%% membership test is sufficient here.
ets:member(?ROUTE_TAB_FILTERS, emqx_topic_index:make_key(Topic, Dest));
false ->
has_route_tab_entry(Topic, Dest)
end.
%% Remove every route whose destination resolves to `Node' (see
%% `get_dest_node/1') under schema v2. The filters table is swept first,
%% then the route table; each pass must finish with `ok'.
cleanup_routes_v2(Node) ->
% NOTE
% No point in transaction here because all the operations on the filters table
% and the route table under this schema are dirty.
ok = ets:foldl(
fun(#routeidx{entry = K}, ok) ->
case get_dest_node(emqx_topic_index:get_id(K)) of
% `Node' is already bound, so this clause matches only entries of that node.
Node ->
mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
_ ->
ok
end
end,
ok,
?ROUTE_TAB_FILTERS
),
ok = ets:foldl(
fun(#route{dest = Dest} = Route, ok) ->
case get_dest_node(Dest) of
% Same bound-variable match as above, now on route table records.
Node ->
mria:dirty_delete_object(?ROUTE_TAB, Route);
_ ->
ok
end
end,
ok,
?ROUTE_TAB
).
%% Extract the node from a route destination. A 2-tuple yields its second
%% element; any other term is returned as is.
%% NOTE(review): the first tuple element is presumably a shared subscription
%% group -- confirm against the `dest()' type.
get_dest_node(Dest) ->
    case Dest of
        {_, Node} -> Node;
        Node -> Node
    end.
%% List the topics known under schema v2: the keys of the route table followed
%% by the topic of every entry in the filters table.
%% NOTE(review): filter index keys are built per (topic, destination), so a
%% filter with several destinations presumably appears once per destination
%% here -- confirm whether callers expect unique topics.
list_topics_v2() ->
    KeyMatches = ets:match(?ROUTE_TAB_FILTERS, #routeidx{entry = '$1'}),
    FilterTopics = [emqx_topic_index:get_topic(Key) || [Key] <- KeyMatches],
    list_route_tab_topics() ++ FilterTopics.
%% Fold `FoldFun' over all routes under schema v2.
%% `FunName' names the `ets' fold function to call; the callers in this module
%% pass `foldl' or `foldr'. The route table is always folded first and the
%% filters table second, whichever direction is requested.
fold_routes_v2(FunName, FoldFun, AccIn) ->
%% Filters table entries are `#routeidx{}' records, so they get a wrapped fold fun.
FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB),
ets:FunName(FilterFoldFun, Acc, ?ROUTE_TAB_FILTERS).
mk_filtertab_fold_fun(FoldFun) ->
fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
call(Router, Msg) ->
gen_server:call(Router, Msg, infinity).
pick(Topic) ->
gproc_pool:pick_worker(router_pool, Topic).
%% Convert a topic index match/key into a `#route{}' record, taking the topic
%% and the destination from the accessors of `emqx_topic_index'.
match_to_route(M) ->
    Topic = emqx_topic_index:get_topic(M),
    Dest = emqx_topic_index:get_id(M),
    #route{topic = Topic, dest = Dest}.
%%--------------------------------------------------------------------
%% Routing storage schema
%% --------------------------------------------------------------------
-define(PT_TABLE_TYPE, {?MODULE, tabtype}).
-define(PT_SCHEMA_VSN, {?MODULE, schemavsn}).
-type tabtype() :: regular | unified.
-type schemavsn() :: v1 | v2.
-spec get_table_type() -> tabtype().
get_table_type() ->
persistent_term:get(?PT_TABLE_TYPE).
-spec get_schema_vsn() -> schemavsn().
get_schema_vsn() ->
persistent_term:get(?PT_SCHEMA_VSN).
-spec init_table_type() -> ok.
init_table_type() ->
ConfType = emqx_config:get([broker, routing_table_type]),
Type = choose_table_type(ConfType),
ok = persistent_term:put(?PT_TABLE_TYPE, Type),
case Type of
ConfType ->
-spec init_schema() -> ok.
init_schema() ->
ConfSchema = emqx_config:get([broker, routing, storage_schema]),
Schema = choose_schema_vsn(ConfSchema),
ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
case Schema of
ConfSchema ->
?SLOG(info, #{
msg => "routing_table_type_used",
type => Type
msg => "routing_schema_used",
schema => Schema
});
_ ->
?SLOG(notice, #{
msg => "configured_routing_table_type_unacceptable",
type => Type,
configured => ConfType,
msg => "configured_routing_schema_unacceptable",
schema => Schema,
configured => ConfSchema,
reason =>
"Could not use configured routing table type because "
"there's already non-empty routing table of another type."
"Could not use configured routing storage schema because "
"there are already non-empty routing tables pertaining to "
"another schema."
})
end.
-spec deinit_table_type() -> ok.
deinit_table_type() ->
_ = persistent_term:erase(?PT_TABLE_TYPE),
-spec deinit_schema() -> ok.
deinit_schema() ->
_ = persistent_term:erase(?PT_SCHEMA_VSN),
ok.
-spec choose_table_type(tabtype()) -> tabtype().
choose_table_type(ConfType) ->
IsEmptyRegular = is_empty(?ROUTE_TAB),
IsEmptyUnified = is_empty(?ROUTE_TAB_UNIFIED),
case {IsEmptyRegular, IsEmptyUnified} of
-spec choose_schema_vsn(schemavsn()) -> schemavsn().
choose_schema_vsn(ConfType) ->
IsEmptyIndex = emqx_trie:empty(),
IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS),
case {IsEmptyIndex, IsEmptyFilters} of
{true, true} ->
ConfType;
{false, true} ->
regular;
v1;
{true, false} ->
unified
v2
end.
is_empty(Tab) ->

View File

@ -24,7 +24,7 @@
start_link() ->
%% Init and log routing storage schema
ok = emqx_router:init_table_type(),
ok = emqx_router:init_schema(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->

View File

@ -1358,14 +1358,10 @@ fields("broker") ->
ref("broker_perf"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
{"routing_table_type",
{"routing",
sc(
hoconsc:enum([regular, unified]),
#{
default => regular,
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(broker_routing_table_type)
}
ref("broker_routing"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
%% FIXME: Need new design for shared subscription group
{"shared_subscription_group",
@ -1378,6 +1374,17 @@ fields("broker") ->
}
)}
];
fields("broker_routing") ->
[
{"storage_schema",
sc(
hoconsc:enum([v1, v2]),
#{
default => v1,
desc => ?DESC(broker_routing_storage_schema)
}
)}
];
fields("shared_subscription_group") ->
[
{"strategy",

View File

@ -28,15 +28,15 @@
all() ->
[
{group, routing_table_regular},
{group, routing_table_unified}
{group, routing_schema_v1},
{group, routing_schema_v2}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{routing_table_regular, [], TCs},
{routing_table_unified, [], TCs}
{routing_schema_v1, [], TCs},
{routing_schema_v2, [], TCs}
].
init_per_group(GroupName, Config) ->
@ -53,10 +53,10 @@ init_per_group(GroupName, Config) ->
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
mk_config(routing_table_regular) ->
"broker.routing_table_type = regular";
mk_config(routing_table_unified) ->
"broker.routing_table_type = unified".
mk_config(routing_schema_v1) ->
"broker.routing.storage_schema = v1";
mk_config(routing_schema_v2) ->
"broker.routing.storage_schema = v2".
init_per_testcase(_TestCase, Config) ->
clear_tables(),
@ -85,10 +85,10 @@ end_per_testcase(_TestCase, _Config) ->
t_verify_type(Config) ->
case ?config(group_name, Config) of
routing_table_regular ->
?assertEqual(regular, ?R:get_table_type());
routing_table_unified ->
?assertEqual(unified, ?R:get_table_type())
routing_schema_v1 ->
?assertEqual(v1, ?R:get_schema_vsn());
routing_schema_v2 ->
?assertEqual(v2, ?R:get_schema_vsn())
end.
t_add_delete(_) ->
@ -198,5 +198,5 @@ t_unexpected(_) ->
clear_tables() ->
lists:foreach(
fun mnesia:clear_table/1,
[?ROUTE_TAB, ?ROUTE_TAB_UNIFIED, ?TRIE]
[?ROUTE_TAB, ?ROUTE_TAB_FILTERS, ?TRIE]
).

View File

@ -28,15 +28,15 @@
all() ->
[
{group, routing_table_regular},
{group, routing_table_unified}
{group, routing_schema_v1},
{group, routing_schema_v2}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{routing_table_regular, [], TCs},
{routing_table_unified, [], TCs}
{routing_schema_v1, [], TCs},
{routing_schema_v2, [], TCs}
].
init_per_group(GroupName, Config) ->
@ -48,14 +48,14 @@ init_per_group(GroupName, Config) ->
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
mk_config(routing_table_regular) ->
mk_config(routing_schema_v1) ->
#{
config => "broker.routing_table_type = regular",
config => "broker.routing.storage_schema = v1",
override_env => [{boot_modules, [router]}]
};
mk_config(routing_table_unified) ->
mk_config(routing_schema_v2) ->
#{
config => "broker.routing_table_type = unified",
config => "broker.routing.storage_schema = v2",
override_env => [{boot_modules, [router]}]
}.

View File

@ -1549,11 +1549,12 @@ fields_ws_opts_max_frame_size.label:
sys_event_messages.desc:
"""Client events messages."""
broker_routing_table_type.desc:
"""Routing table type.
Unified routing table should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription.
NOTE: This is an experimental feature.
NOTE: Full non-rolling cluster restart is needed after enabling or disabling this option for it to take any effect."""
broker_routing_storage_schema.desc:
"""Routing storage schema.
Set <code>v1</code> to leave the default.
Set <code>v2</code> to enable routing through 2 separate tables, one for topic filters and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of a slight increase in memory consumption per subscription.
NOTE: Schema <code>v2</code> is still experimental.
NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
broker_perf_trie_compaction.desc:
"""Enable trie path compaction.