Merge remote-tracking branch 'andrew/ft/EMQX-10713/unified-route-tab' into 0831-make-use-of-new-routing-table

This commit is contained in:
Zaiming (Stone) Shi 2023-08-31 15:46:33 +02:00
commit 876d539336
20 changed files with 601 additions and 158 deletions

View File

@ -17,8 +17,9 @@
-ifndef(EMQX_ROUTER_HRL).
-define(EMQX_ROUTER_HRL, true).
%% ETS table for message routing
%% ETS tables for message routing
-define(ROUTE_TAB, emqx_route).
-define(ROUTE_TAB_FILTERS, emqx_route_filters).
%% Mnesia table for message routing
-define(ROUTING_NODE, emqx_routing_node).

View File

@ -55,7 +55,9 @@ prep_stop(_State) ->
emqx_boot:is_enabled(listeners) andalso
emqx_listeners:stop().
stop(_State) -> ok.
stop(_State) ->
ok = emqx_router:deinit_schema(),
ok.
-define(CONFIG_LOADER, config_loader).
-define(DEFAULT_LOADER, emqx).

View File

@ -21,7 +21,6 @@
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("mria/include/mria.hrl").
-include_lib("emqx/include/emqx_router.hrl").
%% Mnesia bootstrap
@ -46,14 +45,21 @@
do_delete_route/2
]).
-export([cleanup_routes/1]).
-export([
match_routes/1,
lookup_routes/1,
has_routes/1
has_route/2
]).
-export([print_routes/1]).
-export([
foldl_routes/2,
foldr_routes/2
]).
-export([topics/0]).
%% gen_server callbacks
@ -66,10 +72,21 @@
code_change/3
]).
-export([
get_schema_vsn/0,
init_schema/0,
deinit_schema/0
]).
-type group() :: binary().
-type dest() :: node() | {group(), node()}.
-record(routeidx, {
entry :: emqx_topic_index:key(dest()),
unused = [] :: nil()
}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@ -88,6 +105,19 @@ mnesia(boot) ->
{write_concurrency, true}
]}
]}
]),
ok = mria:create_table(?ROUTE_TAB_FILTERS, [
{type, ordered_set},
{rlog_shard, ?ROUTE_SHARD},
{storage, ram_copies},
{record_name, routeidx},
{attributes, record_info(fields, routeidx)},
{storage_properties, [
{ets, [
{read_concurrency, true},
{write_concurrency, auto}
]}
]}
]).
%%--------------------------------------------------------------------
@ -121,43 +151,49 @@ do_add_route(Topic) when is_binary(Topic) ->
-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_add_route(Topic, Dest) when is_binary(Topic) ->
Route = #route{topic = Topic, dest = Dest},
case lists:member(Route, lookup_routes(Topic)) of
case has_route(Topic, Dest) of
true ->
ok;
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:insert_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
false ->
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
end
mria_insert_route(get_schema_vsn(), Topic, Dest)
end.
%% @doc Match routes
mria_insert_route(v2, Topic, Dest) ->
mria_insert_route_v2(Topic, Dest);
mria_insert_route(v1, Topic, Dest) ->
mria_insert_route_v1(Topic, Dest).
%% @doc Take a real topic (not filter) as input, return the matching topics and topic
%% filters associated with route destination.
-spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
match_routes(Topic) when is_binary(Topic) ->
case match_trie(Topic) of
[] -> lookup_routes(Topic);
Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]])
end.
match_routes(get_schema_vsn(), Topic).
%% Optimize: routing table will be replicated to all router nodes.
match_trie(Topic) ->
case emqx_trie:empty() of
true -> [];
false -> emqx_trie:match(Topic)
end.
match_routes(v2, Topic) ->
match_routes_v2(Topic);
match_routes(v1, Topic) ->
match_routes_v1(Topic).
%% @doc Take a topic or filter as input, and return the existing routes with exactly
%% this topic or filter.
-spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
lookup_routes(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
lookup_routes(get_schema_vsn(), Topic).
-spec has_routes(emqx_types:topic()) -> boolean().
has_routes(Topic) when is_binary(Topic) ->
ets:member(?ROUTE_TAB, Topic).
lookup_routes(v2, Topic) ->
lookup_routes_v2(Topic);
lookup_routes(v1, Topic) ->
lookup_routes_v1(Topic).
-spec has_route(emqx_types:topic(), dest()) -> boolean().
has_route(Topic, Dest) ->
has_route(get_schema_vsn(), Topic, Dest).
has_route(v2, Topic, Dest) ->
has_route_v2(Topic, Dest);
has_route(v1, Topic, Dest) ->
has_route_v1(Topic, Dest).
-spec delete_route(emqx_types:topic()) -> ok | {error, term()}.
delete_route(Topic) when is_binary(Topic) ->
@ -173,18 +209,21 @@ do_delete_route(Topic) when is_binary(Topic) ->
-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_delete_route(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
Fun = fun emqx_router_utils:delete_trie_route/2,
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
false ->
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
end.
mria_delete_route(get_schema_vsn(), Topic, Dest).
mria_delete_route(v2, Topic, Dest) ->
mria_delete_route_v2(Topic, Dest);
mria_delete_route(v1, Topic, Dest) ->
mria_delete_route_v1(Topic, Dest).
-spec topics() -> list(emqx_types:topic()).
topics() ->
mnesia:dirty_all_keys(?ROUTE_TAB).
topics(get_schema_vsn()).
topics(v2) ->
list_topics_v2();
topics(v1) ->
list_topics_v1().
%% @doc Print routes to a topic
-spec print_routes(emqx_types:topic()) -> ok.
@ -196,12 +235,276 @@ print_routes(Topic) ->
match_routes(Topic)
).
-spec cleanup_routes(node()) -> ok.
cleanup_routes(Node) ->
cleanup_routes(get_schema_vsn(), Node).
cleanup_routes(v2, Node) ->
cleanup_routes_v2(Node);
cleanup_routes(v1, Node) ->
cleanup_routes_v1(Node).
-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldl_routes(FoldFun, AccIn) ->
fold_routes(get_schema_vsn(), foldl, FoldFun, AccIn).
-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
foldr_routes(FoldFun, AccIn) ->
fold_routes(get_schema_vsn(), foldr, FoldFun, AccIn).
fold_routes(v2, FunName, FoldFun, AccIn) ->
fold_routes_v2(FunName, FoldFun, AccIn);
fold_routes(v1, FunName, FoldFun, AccIn) ->
fold_routes_v1(FunName, FoldFun, AccIn).
call(Router, Msg) ->
gen_server:call(Router, Msg, infinity).
pick(Topic) ->
gproc_pool:pick_worker(router_pool, Topic).
%%--------------------------------------------------------------------
%% Schema v1
%% --------------------------------------------------------------------
-dialyzer({nowarn_function, [cleanup_routes_v1/1]}).
mria_insert_route_v1(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
mria_route_tab_insert_update_trie(Route);
false ->
mria_route_tab_insert(Route)
end.
mria_route_tab_insert_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:insert_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
mria_route_tab_insert(Route) ->
mria:dirty_write(?ROUTE_TAB, Route).
mria_delete_route_v1(Topic, Dest) ->
Route = #route{topic = Topic, dest = Dest},
case emqx_topic:wildcard(Topic) of
true ->
mria_route_tab_delete_update_trie(Route);
false ->
mria_route_tab_delete(Route)
end.
mria_route_tab_delete_update_trie(Route) ->
emqx_router_utils:maybe_trans(
fun emqx_router_utils:delete_trie_route/2,
[?ROUTE_TAB, Route],
?ROUTE_SHARD
).
mria_route_tab_delete(Route) ->
mria:dirty_delete_object(?ROUTE_TAB, Route).
match_routes_v1(Topic) ->
lookup_route_tab(Topic) ++
lists:flatmap(fun lookup_route_tab/1, match_global_trie(Topic)).
match_global_trie(Topic) ->
case emqx_trie:empty() of
true -> [];
false -> emqx_trie:match(Topic)
end.
lookup_routes_v1(Topic) ->
lookup_route_tab(Topic).
lookup_route_tab(Topic) ->
ets:lookup(?ROUTE_TAB, Topic).
has_route_v1(Topic, Dest) ->
has_route_tab_entry(Topic, Dest).
has_route_tab_entry(Topic, Dest) ->
[] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}).
cleanup_routes_v1(Node) ->
Patterns = [
#route{_ = '_', dest = Node},
#route{_ = '_', dest = {'_', Node}}
],
mria:transaction(?ROUTE_SHARD, fun() ->
[
mnesia:delete_object(?ROUTE_TAB, Route, write)
|| Pat <- Patterns,
Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
]
end).
list_topics_v1() ->
list_route_tab_topics().
list_route_tab_topics() ->
mnesia:dirty_all_keys(?ROUTE_TAB).
fold_routes_v1(FunName, FoldFun, AccIn) ->
ets:FunName(FoldFun, AccIn, ?ROUTE_TAB).
%%--------------------------------------------------------------------
%% Schema v2
%% One bag table exclusively for regular, non-filter subscription
%% topics, and one `emqx_topic_index` table exclusively for wildcard
%% topics. Writes go to only one of the two tables at a time.
%% --------------------------------------------------------------------
mria_insert_route_v2(Topic, Dest) ->
case emqx_trie_search:filter(Topic) of
Words when is_list(Words) ->
K = emqx_topic_index:make_key(Words, Dest),
mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
false ->
mria_route_tab_insert(#route{topic = Topic, dest = Dest})
end.
mria_delete_route_v2(Topic, Dest) ->
case emqx_trie_search:filter(Topic) of
Words when is_list(Words) ->
K = emqx_topic_index:make_key(Words, Dest),
mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
false ->
mria_route_tab_delete(#route{topic = Topic, dest = Dest})
end.
match_routes_v2(Topic) ->
lookup_route_tab(Topic) ++
[match_to_route(M) || M <- match_filters(Topic)].
match_filters(Topic) ->
emqx_topic_index:matches(Topic, ?ROUTE_TAB_FILTERS, []).
lookup_routes_v2(Topic) ->
case emqx_topic:wildcard(Topic) of
true ->
Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
[Dest || [Dest] <- ets:match(?ROUTE_TAB_FILTERS, Pat)];
false ->
lookup_route_tab(Topic)
end.
has_route_v2(Topic, Dest) ->
case emqx_topic:wildcard(Topic) of
true ->
ets:member(?ROUTE_TAB_FILTERS, emqx_topic_index:make_key(Topic, Dest));
false ->
has_route_tab_entry(Topic, Dest)
end.
cleanup_routes_v2(Node) ->
% NOTE
% No point in transaction here because all the operations on filters table are dirty.
ok = ets:foldl(
fun(#routeidx{entry = K}, ok) ->
case get_dest_node(emqx_topic_index:get_id(K)) of
Node ->
mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
_ ->
ok
end
end,
ok,
?ROUTE_TAB_FILTERS
),
ok = ets:foldl(
fun(#route{dest = Dest} = Route, ok) ->
case get_dest_node(Dest) of
Node ->
mria:dirty_delete_object(?ROUTE_TAB, Route);
_ ->
ok
end
end,
ok,
?ROUTE_TAB
).
get_dest_node({_, Node}) ->
Node;
get_dest_node(Node) ->
Node.
list_topics_v2() ->
Pat = #routeidx{entry = '$1'},
Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)],
list_route_tab_topics() ++ Filters.
fold_routes_v2(FunName, FoldFun, AccIn) ->
FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB),
ets:FunName(FilterFoldFun, Acc, ?ROUTE_TAB_FILTERS).
mk_filtertab_fold_fun(FoldFun) ->
fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
match_to_route(M) ->
#route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
%%--------------------------------------------------------------------
%% Routing table type
%% --------------------------------------------------------------------
-define(PT_SCHEMA_VSN, {?MODULE, schemavsn}).
-type schemavsn() :: v1 | v2.
-spec get_schema_vsn() -> schemavsn().
get_schema_vsn() ->
persistent_term:get(?PT_SCHEMA_VSN).
-spec init_schema() -> ok.
init_schema() ->
ConfSchema = emqx_config:get([broker, routing, storage_schema]),
Schema = choose_schema_vsn(ConfSchema),
ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
case Schema of
ConfSchema ->
?SLOG(info, #{
msg => "routing_schema_used",
schema => Schema
});
_ ->
?SLOG(notice, #{
msg => "configured_routing_schema_ignored",
schema_in_use => Schema,
configured => ConfSchema,
reason =>
"Could not use configured routing storage schema because "
"there are already non-empty routing tables pertaining to "
"another schema."
})
end.
-spec deinit_schema() -> ok.
deinit_schema() ->
_ = persistent_term:erase(?PT_SCHEMA_VSN),
ok.
-spec choose_schema_vsn(schemavsn()) -> schemavsn().
choose_schema_vsn(ConfType) ->
IsEmptyIndex = emqx_trie:empty(),
IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS),
case {IsEmptyIndex, IsEmptyFilters} of
{true, true} ->
ConfType;
{false, true} ->
v1;
{true, false} ->
v2
end.
is_empty(Tab) ->
ets:first(Tab) =:= '$end_of_table'.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

View File

@ -148,11 +148,12 @@ handle_info({mnesia_table_event, Event}, State) ->
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
case mria_rlog:role() of
core ->
% TODO
% Node may flap, do we need to wait for any pending cleanups in `init/1`
% on the flapping node?
global:trans(
{?LOCK, self()},
fun() ->
mria:transaction(?ROUTE_SHARD, fun ?MODULE:cleanup_routes/1, [Node])
end
fun() -> cleanup_routes(Node) end
),
ok = mria:dirty_delete(?ROUTING_NODE, Node);
replicant ->
@ -197,11 +198,4 @@ stats_fun() ->
end.
cleanup_routes(Node) ->
Patterns = [
#route{_ = '_', dest = Node},
#route{_ = '_', dest = {'_', Node}}
],
[
mnesia:delete_object(?ROUTE_TAB, Route, write)
|| Pat <- Patterns, Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
].
emqx_router:cleanup_routes(Node).

View File

@ -23,6 +23,8 @@
-export([init/1]).
start_link() ->
%% Init and log routing table type
ok = emqx_router:init_schema(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->

View File

@ -1358,6 +1358,11 @@ fields("broker") ->
ref("broker_perf"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
{"routing",
sc(
ref("broker_routing"),
#{importance => ?IMPORTANCE_HIDDEN}
)},
%% FIXME: Need new design for shared subscription group
{"shared_subscription_group",
sc(
@ -1369,6 +1374,17 @@ fields("broker") ->
}
)}
];
fields("broker_routing") ->
[
{"storage_schema",
sc(
hoconsc:enum([v1, v2]),
#{
default => v1,
desc => ?DESC(broker_routing_storage_schema)
}
)}
];
fields("shared_subscription_group") ->
[
{"strategy",

View File

@ -29,8 +29,8 @@
-export([get_topic/1]).
-export([get_record/2]).
-type word() :: binary() | '+' | '#'.
-type key(ID) :: {[word()], {ID}}.
-type key(ID) :: emqx_trie_search:key(ID).
-type words() :: emqx_trie_search:words().
-type match(ID) :: key(ID).
-type name() :: any().
@ -50,7 +50,7 @@ new(Name) ->
%% @doc Insert a new entry into the index that associates given topic filter to given
%% record ID, and attaches arbitrary record to the entry. This allows users to choose
%% between regular and "materialized" indexes, for example.
-spec insert(emqx_types:topic(), _ID, _Record, name()) -> true.
-spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true.
insert(Filter, ID, Record, Name) ->
Tree = gbt(Name),
Key = key(Filter, ID),
@ -59,7 +59,7 @@ insert(Filter, ID, Record, Name) ->
%% @doc Delete an entry from the index that associates given topic filter to given
%% record ID. Deleting non-existing entry is not an error.
-spec delete(emqx_types:topic(), _ID, name()) -> true.
-spec delete(emqx_types:topic() | words(), _ID, name()) -> true.
delete(Filter, ID, Name) ->
Tree = gbt(Name),
Key = key(Filter, ID),

View File

@ -24,12 +24,15 @@
-export([match/2]).
-export([matches/3]).
-export([make_key/2]).
-export([get_id/1]).
-export([get_topic/1]).
-export([get_record/2]).
-type key(ID) :: emqx_trie_search:key(ID).
-type match(ID) :: key(ID).
-type words() :: emqx_trie_search:words().
%% @doc Create a new ETS table suitable for topic index.
%% Usable mostly for testing purposes.
@ -40,16 +43,20 @@ new() ->
%% @doc Insert a new entry into the index that associates given topic filter to given
%% record ID, and attaches arbitrary record to the entry. This allows users to choose
%% between regular and "materialized" indexes, for example.
-spec insert(emqx_types:topic(), _ID, _Record, ets:table()) -> true.
-spec insert(emqx_types:topic() | words(), _ID, _Record, ets:table()) -> true.
insert(Filter, ID, Record, Tab) ->
Key = key(Filter, ID),
Key = make_key(Filter, ID),
true = ets:insert(Tab, {Key, Record}).
%% @doc Delete an entry from the index that associates given topic filter to given
%% record ID. Deleting non-existing entry is not an error.
-spec delete(emqx_types:topic(), _ID, ets:table()) -> true.
-spec delete(emqx_types:topic() | words(), _ID, ets:table()) -> true.
delete(Filter, ID, Tab) ->
true = ets:delete(Tab, key(Filter, ID)).
ets:delete(Tab, make_key(Filter, ID)).
-spec make_key(emqx_types:topic() | words(), ID) -> key(ID).
make_key(TopicOrFilter, ID) ->
emqx_trie_search:make_key(TopicOrFilter, ID).
%% @doc Match given topic against the index and return the first match, or `false` if
%% no match is found.
@ -73,13 +80,16 @@ get_topic(Key) ->
emqx_trie_search:get_topic(Key).
%% @doc Fetch the record associated with the match.
%% NOTE: Only really useful for ETS tables where the record ID is the first element.
-spec get_record(match(_ID), ets:table()) -> _Record.
%% May return empty list if the index entry was deleted in the meantime.
%% NOTE: Only really useful for ETS tables where the record data is the last element.
-spec get_record(match(_ID), ets:table()) -> [_Record].
get_record(K, Tab) ->
ets:lookup_element(Tab, K, 2).
key(TopicOrFilter, ID) ->
emqx_trie_search:make_key(TopicOrFilter, ID).
case ets:lookup(Tab, K) of
[Entry] ->
[erlang:element(tuple_size(Entry), Entry)];
[] ->
[]
end.
make_nextf(Tab) ->
fun(Key) -> ets:next(Tab, Key) end.

View File

@ -98,24 +98,24 @@
-module(emqx_trie_search).
-export([make_key/2]).
-export([make_key/2, filter/1]).
-export([match/2, matches/3, get_id/1, get_topic/1]).
-export_type([key/1, word/0, nextf/0, opts/0]).
-export_type([key/1, word/0, words/0, nextf/0, opts/0]).
-define(END, '$end_of_table').
-type word() :: binary() | '+' | '#'.
-type words() :: [word()].
-type base_key() :: {binary() | [word()], {}}.
-type key(ID) :: {binary() | [word()], {ID}}.
-type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)).
-type opts() :: [unique | return_first].
%% @doc Make a search-key for the given topic.
-spec make_key(emqx_types:topic(), ID) -> key(ID).
-spec make_key(emqx_types:topic() | words(), ID) -> key(ID).
make_key(Topic, ID) when is_binary(Topic) ->
Words = filter_words(Topic),
case emqx_topic:wildcard(Words) of
true ->
case filter(Topic) of
Words when is_list(Words) ->
%% it's a wildcard
{Words, {ID}};
false ->
@ -123,7 +123,15 @@ make_key(Topic, ID) when is_binary(Topic) ->
%% because they can be found with direct lookups.
%% it is also more compact in memory.
{Topic, {ID}}
end.
end;
make_key(Words, ID) when is_list(Words) ->
{Words, {ID}}.
%% @doc Parse a topic filter into a list of words. Returns `false` if it's not a filter.
-spec filter(emqx_types:topic()) -> words() | false.
filter(Topic) ->
Words = filter_words(Topic),
emqx_topic:wildcard(Words) andalso Words.
%% @doc Extract record ID from the match.
-spec get_id(key(ID)) -> ID.
@ -325,6 +333,7 @@ filter_words(Topic) when is_binary(Topic) ->
% `match_filter/3` expects.
[word(W, filter) || W <- emqx_topic:tokens(Topic)].
-spec topic_words(emqx_types:topic()) -> [binary()].
topic_words(Topic) when is_binary(Topic) ->
[word(W, topic) || W <- emqx_topic:tokens(Topic)].

View File

@ -1094,7 +1094,7 @@ t_multi_streams_unsub(Config) ->
?retry(
_Sleep2 = 100,
_Attempts2 = 50,
false = emqx_router:has_routes(Topic)
[] = emqx_router:lookup_routes(Topic)
),
case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of

View File

@ -26,24 +26,37 @@
-define(R, emqx_router).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
PrevBootModules = application:get_env(emqx, boot_modules),
emqx_common_test_helpers:boot_modules([router]),
emqx_common_test_helpers:start_apps([]),
all() ->
[
{prev_boot_modules, PrevBootModules}
| Config
{group, routing_schema_v1},
{group, routing_schema_v2}
].
end_per_suite(Config) ->
PrevBootModules = ?config(prev_boot_modules, Config),
case PrevBootModules of
undefined -> ok;
{ok, Mods} -> emqx_common_test_helpers:boot_modules(Mods)
end,
emqx_common_test_helpers:stop_apps([]).
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{routing_schema_v1, [], TCs},
{routing_schema_v2, [], TCs}
].
init_per_group(GroupName, Config) ->
WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
AppSpecs = [
{emqx, #{
config => mk_config(GroupName),
override_env => [{boot_modules, [router]}]
}}
],
Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
[{group_apps, Apps}, {group_name, GroupName} | Config].
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
mk_config(routing_schema_v1) ->
"broker.routing.storage_schema = v1";
mk_config(routing_schema_v2) ->
"broker.routing.storage_schema = v2".
init_per_testcase(_TestCase, Config) ->
clear_tables(),
@ -70,6 +83,14 @@ end_per_testcase(_TestCase, _Config) ->
% t_topics(_) ->
% error('TODO').
t_verify_type(Config) ->
case ?config(group_name, Config) of
routing_schema_v1 ->
?assertEqual(v1, ?R:get_schema_vsn());
routing_schema_v2 ->
?assertEqual(v2, ?R:get_schema_vsn())
end.
t_add_delete(_) ->
?R:add_route(<<"a/b/c">>),
?R:add_route(<<"a/b/c">>, node()),
@ -79,6 +100,55 @@ t_add_delete(_) ->
?R:delete_route(<<"a/+/b">>, node()),
?assertEqual([], ?R:topics()).
t_add_delete_incremental(_) ->
?R:add_route(<<"a/b/c">>),
?R:add_route(<<"a/+/c">>, node()),
?R:add_route(<<"a/+/+">>, node()),
?R:add_route(<<"a/b/#">>, node()),
?R:add_route(<<"#">>, node()),
?assertEqual(
[
#route{topic = <<"#">>, dest = node()},
#route{topic = <<"a/+/+">>, dest = node()},
#route{topic = <<"a/+/c">>, dest = node()},
#route{topic = <<"a/b/#">>, dest = node()},
#route{topic = <<"a/b/c">>, dest = node()}
],
lists:sort(?R:match_routes(<<"a/b/c">>))
),
?R:delete_route(<<"a/+/c">>, node()),
?assertEqual(
[
#route{topic = <<"#">>, dest = node()},
#route{topic = <<"a/+/+">>, dest = node()},
#route{topic = <<"a/b/#">>, dest = node()},
#route{topic = <<"a/b/c">>, dest = node()}
],
lists:sort(?R:match_routes(<<"a/b/c">>))
),
?R:delete_route(<<"a/+/+">>, node()),
?assertEqual(
[
#route{topic = <<"#">>, dest = node()},
#route{topic = <<"a/b/#">>, dest = node()},
#route{topic = <<"a/b/c">>, dest = node()}
],
lists:sort(?R:match_routes(<<"a/b/c">>))
),
?R:delete_route(<<"a/b/#">>, node()),
?assertEqual(
[
#route{topic = <<"#">>, dest = node()},
#route{topic = <<"a/b/c">>, dest = node()}
],
lists:sort(?R:match_routes(<<"a/b/c">>))
),
?R:delete_route(<<"a/b/c">>, node()),
?assertEqual(
[#route{topic = <<"#">>, dest = node()}],
lists:sort(?R:match_routes(<<"a/b/c">>))
).
t_do_add_delete(_) ->
?R:do_add_route(<<"a/b/c">>),
?R:do_add_route(<<"a/b/c">>, node()),
@ -114,9 +184,9 @@ t_print_routes(_) ->
?R:add_route(<<"+/+">>),
?R:print_routes(<<"a/b">>).
t_has_routes(_) ->
t_has_route(_) ->
?R:add_route(<<"devices/+/messages">>, node()),
?assert(?R:has_routes(<<"devices/+/messages">>)),
?assert(?R:has_route(<<"devices/+/messages">>, node())),
?R:delete_route(<<"devices/+/messages">>).
t_unexpected(_) ->
@ -128,5 +198,5 @@ t_unexpected(_) ->
clear_tables() ->
lists:foreach(
fun mnesia:clear_table/1,
[?ROUTE_TAB, ?TRIE, emqx_trie_node]
[?ROUTE_TAB, ?ROUTE_TAB_FILTERS, ?TRIE]
).

View File

@ -26,55 +26,68 @@
-define(ROUTER_HELPER, emqx_router_helper).
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
[
{group, routing_schema_v1},
{group, routing_schema_v2}
].
init_per_suite(Config) ->
DistPid =
case net_kernel:nodename() of
ignored ->
%% calling `net_kernel:start' without `epmd'
%% running will result in a failure.
emqx_common_test_helpers:start_epmd(),
{ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]),
Pid;
_ ->
undefined
end,
emqx_common_test_helpers:start_apps([]),
[{dist_pid, DistPid} | Config].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{routing_schema_v1, [], TCs},
{routing_schema_v2, [], TCs}
].
end_per_suite(Config) ->
DistPid = ?config(dist_pid, Config),
case DistPid of
Pid when is_pid(Pid) ->
net_kernel:stop();
_ ->
ok
end,
emqx_common_test_helpers:stop_apps([]).
init_per_group(GroupName, Config) ->
WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
AppSpecs = [{emqx, mk_config(GroupName)}],
Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
[{group_name, GroupName}, {group_apps, Apps} | Config].
end_per_group(_GroupName, Config) ->
ok = emqx_cth_suite:stop(?config(group_apps, Config)).
mk_config(routing_schema_v1) ->
#{
config => "broker.routing.storage_schema = v1",
override_env => [{boot_modules, [router]}]
};
mk_config(routing_schema_v2) ->
#{
config => "broker.routing.storage_schema = v2",
override_env => [{boot_modules, [router]}]
}.
init_per_testcase(TestCase, Config) when
TestCase =:= t_cleanup_membership_mnesia_down;
TestCase =:= t_cleanup_membership_node_down;
TestCase =:= t_cleanup_monitor_node_down
->
ok = snabbkaffe:start_trace(),
Slave = emqx_common_test_helpers:start_slave(some_node, []),
GroupName = ?config(group_name, Config),
WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName, TestCase]),
[Slave] = emqx_cth_cluster:start(
[
{?MODULE, #{
apps => [{emqx, mk_config(GroupName)}],
join_to => node()
}}
],
#{work_dir => WorkDir}
),
[{slave, Slave} | Config];
init_per_testcase(_TestCase, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(TestCase, Config) when
TestCase =:= t_cleanup_membership_mnesia_down;
TestCase =:= t_cleanup_membership_node_down;
TestCase =:= t_cleanup_monitor_node_down
->
Slave = ?config(slave, Config),
emqx_common_test_helpers:stop_slave(Slave),
mria:clear_table(?ROUTE_TAB),
snabbkaffe:stop(),
ok = emqx_cth_cluster:stop([Slave]),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(),
ok.
t_monitor(_) ->

View File

@ -1054,7 +1054,7 @@ t_queue_subscription(Config) when is_list(Config) ->
begin
ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
%% FIXME: should ensure we have 2 subscriptions
true = emqx_router:has_routes(Topic)
[_] = emqx_router:lookup_routes(Topic)
end
),
@ -1081,7 +1081,7 @@ t_queue_subscription(Config) when is_list(Config) ->
%% _Attempts0 = 50,
%% begin
%% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
%% false = emqx_router:has_routes(Topic)
%% [] = emqx_router:lookup_routes(Topic)
%% end
%% ),
ct:sleep(500),

View File

@ -57,6 +57,17 @@ t_insert(Config) ->
?assertEqual(<<"sensor/#">>, topic(match(M, <<"sensor">>, Tab))),
?assertEqual(t_insert_3, id(match(M, <<"sensor">>, Tab))).
t_insert_filter(Config) ->
M = get_module(Config),
Tab = M:new(),
Topic = <<"sensor/+/metric//#">>,
true = M:insert(Topic, 1, <<>>, Tab),
true = M:insert(emqx_trie_search:filter(Topic), 2, <<>>, Tab),
?assertEqual(
[Topic, Topic],
[topic(X) || X <- matches(M, <<"sensor/1/metric//2">>, Tab)]
).
t_match(Config) ->
M = get_module(Config),
Tab = M:new(),

View File

@ -18,15 +18,30 @@
-include_lib("eunit/include/eunit.hrl").
topic_validation_test() ->
-import(emqx_trie_search, [filter/1]).
filter_test_() ->
[
?_assertEqual(
[<<"sensor">>, '+', <<"metric">>, <<>>, '#'],
filter(<<"sensor/+/metric//#">>)
),
?_assertEqual(
false,
filter(<<"sensor/1/metric//42">>)
)
].
topic_validation_test_() ->
NextF = fun(_) -> '$end_of_table' end,
Call = fun(Topic) ->
emqx_trie_search:match(Topic, NextF)
end,
?assertError(badarg, Call(<<"+">>)),
?assertError(badarg, Call(<<"#">>)),
?assertError(badarg, Call(<<"a/+/b">>)),
?assertError(badarg, Call(<<"a/b/#">>)),
?assertEqual(false, Call(<<"a/b/b+">>)),
?assertEqual(false, Call(<<"a/b/c#">>)),
ok.
[
?_assertError(badarg, Call(<<"+">>)),
?_assertError(badarg, Call(<<"#">>)),
?_assertError(badarg, Call(<<"a/+/b">>)),
?_assertError(badarg, Call(<<"a/b/#">>)),
?_assertEqual(false, Call(<<"a/b/b+">>)),
?_assertEqual(false, Call(<<"a/b/c#">>))
].

View File

@ -1929,7 +1929,7 @@ t_node_joins_existing_cluster(Config) ->
?retry(
_Sleep2 = 100,
_Attempts2 = 50,
true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic])
[] =/= erpc:call(N2, emqx_router, lookup_routes, [MQTTTopic])
),
{ok, SRef1} =
snabbkaffe:subscribe(

View File

@ -81,7 +81,7 @@ start_cluster(NamesWithPorts, Apps, Env) ->
NamesWithPorts
),
Opts0 = [
{env, [{emqx, boot_modules, [broker, listeners]}] ++ Env},
{env, Env},
{apps, Apps},
{conf,
[{[listeners, Proto, default, enable], false} || Proto <- [ssl, ws, wss]] ++

View File

@ -22,9 +22,6 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_mgmt.hrl").
-define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])).
-define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
-export([load/0]).
@ -49,20 +46,6 @@
data/1
]).
-define(PROC_INFOKEYS, [
status,
memory,
message_queue_len,
total_heap_size,
heap_size,
stack_size,
reductions
]).
-define(MAX_LIMIT, 10000).
-define(APP, emqx).
-spec load() -> ok.
load() ->
Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
@ -197,9 +180,12 @@ if_client(ClientId, Fun) ->
%% @doc Topics Command
topics(["list"]) ->
dump(?ROUTE_TAB, emqx_topic);
emqx_router:foldr_routes(
fun(Route, Acc) -> [print({emqx_topic, Route}) | Acc] end,
[]
);
topics(["show", Topic]) ->
Routes = ets:lookup(?ROUTE_TAB, bin(Topic)),
Routes = emqx_router:lookup_routes(Topic),
[print({emqx_topic, Route}) || Route <- Routes];
topics(_) ->
emqx_ctl:usage([

View File

@ -225,8 +225,9 @@ get_rules_ordered_by_ts() ->
-spec get_rules_for_topic(Topic :: binary()) -> [rule()].
get_rules_for_topic(Topic) ->
[
emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX)
|| M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique])
Rule
|| M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]),
Rule <- lookup_rule(emqx_topic_index:get_id(M))
].
-spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
@ -284,11 +285,14 @@ is_of_event_name(EventName, Topic) ->
-spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found.
get_rule(Id) ->
case ets:lookup(?RULE_TAB, Id) of
[{Id, Rule}] -> {ok, Rule#{id => Id}};
case lookup_rule(Id) of
[Rule] -> {ok, Rule};
[] -> not_found
end.
lookup_rule(Id) ->
[Rule || {_Id, Rule} <- ets:lookup(?RULE_TAB, Id)].
load_hooks_for_rule(#{from := Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics).
@ -483,7 +487,7 @@ with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat
do_insert_rule(#{id := Id} = Rule) ->
ok = load_hooks_for_rule(Rule),
ok = maybe_add_metrics_for_rule(Id),
true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
true = ets:insert(?RULE_TAB, {Id, Rule}),
ok.
do_delete_rule(#{id := Id} = Rule) ->
@ -492,10 +496,10 @@ do_delete_rule(#{id := Id} = Rule) ->
true = ets:delete(?RULE_TAB, Id),
ok.
do_update_rule_index(#{id := Id, from := From} = Rule) ->
do_update_rule_index(#{id := Id, from := From}) ->
ok = lists:foreach(
fun(Topic) ->
true = emqx_topic_index:insert(Topic, Id, Rule, ?RULE_TOPIC_INDEX)
true = emqx_topic_index:insert(Topic, Id, [], ?RULE_TOPIC_INDEX)
end,
From
).

View File

@ -1549,6 +1549,13 @@ fields_ws_opts_max_frame_size.label:
sys_event_messages.desc:
"""Client events messages."""
broker_routing_storage_schema.desc:
"""Routing storage schema.
Set <code>v1</code> to leave the default.
Set <code>v2</code> to enable routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription.
NOTE: Schema <code>v2</code> is still experimental.
NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
broker_perf_trie_compaction.desc:
"""Enable trie path compaction.
Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber.