Merge pull request #11524 from keynslug/ft/EMQX-10713/unified-route-tab

feat(router): add routing storage schema with 2 tables

commit d26995a2aa
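Note (not part of the diff): this change set introduces an optional second routing storage schema ("v2") alongside the existing one ("v1"), selected through the new broker.routing.storage_schema setting (enum v1 | v2, default v1) and pinned per node at boot. A minimal, illustrative shell sketch of inspecting the active schema on a running node (the shell session itself is an assumption; the config path and default come from this diff):

    %% default is v1 unless "broker.routing.storage_schema = v2" is configured
    1> emqx_config:get([broker, routing, storage_schema]).
    v1
    2> emqx_router:get_schema_vsn().
    v1
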
@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.1.5-build.3").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.2.0-alpha.3").
+-define(EMQX_RELEASE_EE, "5.2.0-alpha.4").
 
 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").
@@ -17,8 +17,9 @@
 -ifndef(EMQX_ROUTER_HRL).
 -define(EMQX_ROUTER_HRL, true).
 
-%% ETS table for message routing
+%% ETS tables for message routing
 -define(ROUTE_TAB, emqx_route).
+-define(ROUTE_TAB_FILTERS, emqx_route_filters).
 
 %% Mnesia table for message routing
 -define(ROUTING_NODE, emqx_routing_node).
@@ -55,7 +55,9 @@ prep_stop(_State) ->
     emqx_boot:is_enabled(listeners) andalso
         emqx_listeners:stop().
 
-stop(_State) -> ok.
+stop(_State) ->
+    ok = emqx_router:deinit_schema(),
+    ok.
 
 -define(CONFIG_LOADER, config_loader).
 -define(DEFAULT_LOADER, emqx).
@@ -21,7 +21,6 @@
 -include("emqx.hrl").
 -include("logger.hrl").
 -include("types.hrl").
--include_lib("mria/include/mria.hrl").
 -include_lib("emqx/include/emqx_router.hrl").
 
 %% Mnesia bootstrap
@@ -46,16 +45,25 @@
     do_delete_route/2
 ]).
 
+-export([cleanup_routes/1]).
+
 -export([
     match_routes/1,
-    lookup_routes/1,
-    has_routes/1
+    lookup_routes/1
 ]).
 
 -export([print_routes/1]).
 
+-export([
+    foldl_routes/2,
+    foldr_routes/2
+]).
+
 -export([topics/0]).
 
+%% Exported for tests
+-export([has_route/2]).
+
 %% gen_server callbacks
 -export([
     init/1,
@@ -66,10 +74,21 @@
     code_change/3
 ]).
 
+-export([
+    get_schema_vsn/0,
+    init_schema/0,
+    deinit_schema/0
+]).
+
 -type group() :: binary().
 
 -type dest() :: node() | {group(), node()}.
 
+-record(routeidx, {
+    entry :: emqx_topic_index:key(dest()),
+    unused = [] :: nil()
+}).
+
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
@@ -88,6 +107,19 @@ mnesia(boot) ->
                 {write_concurrency, true}
             ]}
         ]}
+    ]),
+    ok = mria:create_table(?ROUTE_TAB_FILTERS, [
+        {type, ordered_set},
+        {rlog_shard, ?ROUTE_SHARD},
+        {storage, ram_copies},
+        {record_name, routeidx},
+        {attributes, record_info(fields, routeidx)},
+        {storage_properties, [
+            {ets, [
+                {read_concurrency, true},
+                {write_concurrency, auto}
+            ]}
+        ]}
     ]).
 
 %%--------------------------------------------------------------------
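Note (condensed restatement, not part of the diff): under schema v2 the router keeps two tables — the existing emqx_route bag table for plain topics and the new emqx_route_filters ordered_set table that stores only topic-index keys for wildcard filters. A sketch of the v2 write path, mirroring mria_insert_route_v2/2 further down in this diff, with the table macros spelled out for readability:

    %% sketch only; the real code uses ?ROUTE_TAB / ?ROUTE_TAB_FILTERS macros
    insert_v2_sketch(Topic, Dest) ->
        case emqx_trie_search:filter(Topic) of
            Words when is_list(Words) ->
                %% wildcard filter: store just an index key in the ordered_set table
                Key = emqx_topic_index:make_key(Words, Dest),
                mria:dirty_write(emqx_route_filters, #routeidx{entry = Key});
            false ->
                %% plain topic: store a #route{} record in the bag table
                mria:dirty_write(emqx_route, #route{topic = Topic, dest = Dest})
        end.
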
@@ -121,43 +153,49 @@ do_add_route(Topic) when is_binary(Topic) ->
 
 -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_add_route(Topic, Dest) when is_binary(Topic) ->
-    Route = #route{topic = Topic, dest = Dest},
-    case lists:member(Route, lookup_routes(Topic)) of
+    case has_route(Topic, Dest) of
         true ->
            ok;
         false ->
             ok = emqx_router_helper:monitor(Dest),
-            case emqx_topic:wildcard(Topic) of
-                true ->
-                    Fun = fun emqx_router_utils:insert_trie_route/2,
-                    emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
-                false ->
-                    emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
-            end
+            mria_insert_route(get_schema_vsn(), Topic, Dest)
     end.
 
-%% @doc Match routes
+mria_insert_route(v2, Topic, Dest) ->
+    mria_insert_route_v2(Topic, Dest);
+mria_insert_route(v1, Topic, Dest) ->
+    mria_insert_route_v1(Topic, Dest).
+
+%% @doc Take a real topic (not filter) as input, return the matching topics and topic
+%% filters associated with route destination.
 -spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
 match_routes(Topic) when is_binary(Topic) ->
-    case match_trie(Topic) of
-        [] -> lookup_routes(Topic);
-        Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]])
-    end.
+    match_routes(get_schema_vsn(), Topic).
 
-%% Optimize: routing table will be replicated to all router nodes.
-match_trie(Topic) ->
-    case emqx_trie:empty() of
-        true -> [];
-        false -> emqx_trie:match(Topic)
-    end.
+match_routes(v2, Topic) ->
+    match_routes_v2(Topic);
+match_routes(v1, Topic) ->
+    match_routes_v1(Topic).
 
+%% @doc Take a topic or filter as input, and return the existing routes with exactly
+%% this topic or filter.
 -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
 lookup_routes(Topic) ->
-    ets:lookup(?ROUTE_TAB, Topic).
+    lookup_routes(get_schema_vsn(), Topic).
 
--spec has_routes(emqx_types:topic()) -> boolean().
-has_routes(Topic) when is_binary(Topic) ->
-    ets:member(?ROUTE_TAB, Topic).
+lookup_routes(v2, Topic) ->
+    lookup_routes_v2(Topic);
+lookup_routes(v1, Topic) ->
+    lookup_routes_v1(Topic).
+
+-spec has_route(emqx_types:topic(), dest()) -> boolean().
+has_route(Topic, Dest) ->
+    has_route(get_schema_vsn(), Topic, Dest).
+
+has_route(v2, Topic, Dest) ->
+    has_route_v2(Topic, Dest);
+has_route(v1, Topic, Dest) ->
+    has_route_v1(Topic, Dest).
 
 -spec delete_route(emqx_types:topic()) -> ok | {error, term()}.
 delete_route(Topic) when is_binary(Topic) ->
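Note (usage sketch, mirroring the updated t_has_route test later in this diff): the topic-only has_routes/1 is gone; the replacement has_route/2 checks a specific (topic, destination) pair:

    emqx_router:add_route(<<"devices/+/messages">>, node()),
    true = emqx_router:has_route(<<"devices/+/messages">>, node()),
    emqx_router:delete_route(<<"devices/+/messages">>).
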
@@ -173,18 +211,21 @@ do_delete_route(Topic) when is_binary(Topic) ->
 
 -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
 do_delete_route(Topic, Dest) ->
-    Route = #route{topic = Topic, dest = Dest},
-    case emqx_topic:wildcard(Topic) of
-        true ->
-            Fun = fun emqx_router_utils:delete_trie_route/2,
-            emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
-        false ->
-            emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
-    end.
+    mria_delete_route(get_schema_vsn(), Topic, Dest).
+
+mria_delete_route(v2, Topic, Dest) ->
+    mria_delete_route_v2(Topic, Dest);
+mria_delete_route(v1, Topic, Dest) ->
+    mria_delete_route_v1(Topic, Dest).
 
 -spec topics() -> list(emqx_types:topic()).
 topics() ->
-    mnesia:dirty_all_keys(?ROUTE_TAB).
+    topics(get_schema_vsn()).
+
+topics(v2) ->
+    list_topics_v2();
+topics(v1) ->
+    list_topics_v1().
 
 %% @doc Print routes to a topic
 -spec print_routes(emqx_types:topic()) -> ok.
@@ -196,12 +237,290 @@ print_routes(Topic) ->
         match_routes(Topic)
     ).
 
+-spec cleanup_routes(node()) -> ok.
+cleanup_routes(Node) ->
+    cleanup_routes(get_schema_vsn(), Node).
+
+cleanup_routes(v2, Node) ->
+    cleanup_routes_v2(Node);
+cleanup_routes(v1, Node) ->
+    cleanup_routes_v1(Node).
+
+-spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
+foldl_routes(FoldFun, AccIn) ->
+    fold_routes(get_schema_vsn(), foldl, FoldFun, AccIn).
+
+-spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
+foldr_routes(FoldFun, AccIn) ->
+    fold_routes(get_schema_vsn(), foldr, FoldFun, AccIn).
+
+fold_routes(v2, FunName, FoldFun, AccIn) ->
+    fold_routes_v2(FunName, FoldFun, AccIn);
+fold_routes(v1, FunName, FoldFun, AccIn) ->
+    fold_routes_v1(FunName, FoldFun, AccIn).
+
 call(Router, Msg) ->
     gen_server:call(Router, Msg, infinity).
 
 pick(Topic) ->
     gproc_pool:pick_worker(router_pool, Topic).
 
+%%--------------------------------------------------------------------
+%% Schema v1
+%% --------------------------------------------------------------------
+
+-dialyzer({nowarn_function, [cleanup_routes_v1/1]}).
+
+mria_insert_route_v1(Topic, Dest) ->
+    Route = #route{topic = Topic, dest = Dest},
+    case emqx_topic:wildcard(Topic) of
+        true ->
+            mria_route_tab_insert_update_trie(Route);
+        false ->
+            mria_route_tab_insert(Route)
+    end.
+
+mria_route_tab_insert_update_trie(Route) ->
+    emqx_router_utils:maybe_trans(
+        fun emqx_router_utils:insert_trie_route/2,
+        [?ROUTE_TAB, Route],
+        ?ROUTE_SHARD
+    ).
+
+mria_route_tab_insert(Route) ->
+    mria:dirty_write(?ROUTE_TAB, Route).
+
+mria_delete_route_v1(Topic, Dest) ->
+    Route = #route{topic = Topic, dest = Dest},
+    case emqx_topic:wildcard(Topic) of
+        true ->
+            mria_route_tab_delete_update_trie(Route);
+        false ->
+            mria_route_tab_delete(Route)
+    end.
+
+mria_route_tab_delete_update_trie(Route) ->
+    emqx_router_utils:maybe_trans(
+        fun emqx_router_utils:delete_trie_route/2,
+        [?ROUTE_TAB, Route],
+        ?ROUTE_SHARD
+    ).
+
+mria_route_tab_delete(Route) ->
+    mria:dirty_delete_object(?ROUTE_TAB, Route).
+
+match_routes_v1(Topic) ->
+    lookup_route_tab(Topic) ++
+        lists:flatmap(fun lookup_route_tab/1, match_global_trie(Topic)).
+
+match_global_trie(Topic) ->
+    case emqx_trie:empty() of
+        true -> [];
+        false -> emqx_trie:match(Topic)
+    end.
+
+lookup_routes_v1(Topic) ->
+    lookup_route_tab(Topic).
+
+lookup_route_tab(Topic) ->
+    ets:lookup(?ROUTE_TAB, Topic).
+
+has_route_v1(Topic, Dest) ->
+    has_route_tab_entry(Topic, Dest).
+
+has_route_tab_entry(Topic, Dest) ->
+    [] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}).
+
+cleanup_routes_v1(Node) ->
+    Patterns = [
+        #route{_ = '_', dest = Node},
+        #route{_ = '_', dest = {'_', Node}}
+    ],
+    mria:transaction(?ROUTE_SHARD, fun() ->
+        [
+            mnesia:delete_object(?ROUTE_TAB, Route, write)
+         || Pat <- Patterns,
+            Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
+        ]
+    end).
+
+list_topics_v1() ->
+    list_route_tab_topics().
+
+list_route_tab_topics() ->
+    mnesia:dirty_all_keys(?ROUTE_TAB).
+
+fold_routes_v1(FunName, FoldFun, AccIn) ->
+    ets:FunName(FoldFun, AccIn, ?ROUTE_TAB).
+
+%%--------------------------------------------------------------------
+%% Schema v2
+%% One bag table exclusively for regular, non-filter subscription
+%% topics, and one `emqx_topic_index` table exclusively for wildcard
+%% topics. Writes go to only one of the two tables at a time.
+%% --------------------------------------------------------------------
+
+mria_insert_route_v2(Topic, Dest) ->
+    case emqx_trie_search:filter(Topic) of
+        Words when is_list(Words) ->
+            K = emqx_topic_index:make_key(Words, Dest),
+            mria:dirty_write(?ROUTE_TAB_FILTERS, #routeidx{entry = K});
+        false ->
+            mria_route_tab_insert(#route{topic = Topic, dest = Dest})
+    end.
+
+mria_delete_route_v2(Topic, Dest) ->
+    case emqx_trie_search:filter(Topic) of
+        Words when is_list(Words) ->
+            K = emqx_topic_index:make_key(Words, Dest),
+            mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+        false ->
+            mria_route_tab_delete(#route{topic = Topic, dest = Dest})
+    end.
+
+match_routes_v2(Topic) ->
+    lookup_route_tab(Topic) ++
+        [match_to_route(M) || M <- match_filters(Topic)].
+
+match_filters(Topic) ->
+    emqx_topic_index:matches(Topic, ?ROUTE_TAB_FILTERS, []).
+
+lookup_routes_v2(Topic) ->
+    case emqx_topic:wildcard(Topic) of
+        true ->
+            Pat = #routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
+            [Dest || [Dest] <- ets:match(?ROUTE_TAB_FILTERS, Pat)];
+        false ->
+            lookup_route_tab(Topic)
+    end.
+
+has_route_v2(Topic, Dest) ->
+    case emqx_topic:wildcard(Topic) of
+        true ->
+            ets:member(?ROUTE_TAB_FILTERS, emqx_topic_index:make_key(Topic, Dest));
+        false ->
+            has_route_tab_entry(Topic, Dest)
+    end.
+
+cleanup_routes_v2(Node) ->
+    % NOTE
+    % No point in transaction here because all the operations on filters table are dirty.
+    ok = ets:foldl(
+        fun(#routeidx{entry = K}, ok) ->
+            case get_dest_node(emqx_topic_index:get_id(K)) of
+                Node ->
+                    mria:dirty_delete(?ROUTE_TAB_FILTERS, K);
+                _ ->
+                    ok
+            end
+        end,
+        ok,
+        ?ROUTE_TAB_FILTERS
+    ),
+    ok = ets:foldl(
+        fun(#route{dest = Dest} = Route, ok) ->
+            case get_dest_node(Dest) of
+                Node ->
+                    mria:dirty_delete_object(?ROUTE_TAB, Route);
+                _ ->
+                    ok
+            end
+        end,
+        ok,
+        ?ROUTE_TAB
+    ).
+
+get_dest_node({_, Node}) ->
+    Node;
+get_dest_node(Node) ->
+    Node.
+
+list_topics_v2() ->
+    Pat = #routeidx{entry = '$1'},
+    Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?ROUTE_TAB_FILTERS, Pat)],
+    list_route_tab_topics() ++ Filters.
+
+fold_routes_v2(FunName, FoldFun, AccIn) ->
+    FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
+    Acc = ets:FunName(FoldFun, AccIn, ?ROUTE_TAB),
+    ets:FunName(FilterFoldFun, Acc, ?ROUTE_TAB_FILTERS).
+
+mk_filtertab_fold_fun(FoldFun) ->
+    fun(#routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
+
+match_to_route(M) ->
+    #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
+
+%%--------------------------------------------------------------------
+%% Routing table type
+%% --------------------------------------------------------------------
+
+-define(PT_SCHEMA_VSN, {?MODULE, schemavsn}).
+
+-type schemavsn() :: v1 | v2.
+
+-spec get_schema_vsn() -> schemavsn().
+get_schema_vsn() ->
+    persistent_term:get(?PT_SCHEMA_VSN).
+
+-spec init_schema() -> ok.
+init_schema() ->
+    ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
+    ok = emqx_trie:wait_for_tables(),
+    ConfSchema = emqx_config:get([broker, routing, storage_schema]),
+    Schema = choose_schema_vsn(ConfSchema),
+    ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
+    case Schema of
+        ConfSchema ->
+            ?SLOG(info, #{
+                msg => "routing_schema_used",
+                schema => Schema
+            });
+        _ ->
+            ?SLOG(notice, #{
+                msg => "configured_routing_schema_ignored",
+                schema_in_use => Schema,
+                configured => ConfSchema,
+                reason =>
+                    "Could not use configured routing storage schema because "
+                    "there are already non-empty routing tables pertaining to "
+                    "another schema."
+            })
+    end.
+
+-spec deinit_schema() -> ok.
+deinit_schema() ->
+    _ = persistent_term:erase(?PT_SCHEMA_VSN),
+    ok.
+
+-spec choose_schema_vsn(schemavsn()) -> schemavsn().
+choose_schema_vsn(ConfType) ->
+    IsEmptyIndex = emqx_trie:empty(),
+    IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS),
+    case {IsEmptyIndex, IsEmptyFilters} of
+        {true, true} ->
+            ConfType;
+        {false, true} ->
+            v1;
+        {true, false} ->
+            v2;
+        {false, false} ->
+            ?SLOG(critical, #{
+                msg => "conflicting_routing_schemas_detected_in_cluster",
+                configured => ConfType,
+                reason =>
+                    "There are records in the routing tables related to both v1 "
+                    "and v2 storage schemas. This probably means that some nodes "
+                    "in the cluster use v1 schema and some use v2, independently "
+                    "of each other. The routing is likely broken. Manual intervention "
+                    "and full cluster restart is required. This node will shut down."
+            }),
+            error(conflicting_routing_schemas_detected_in_cluster)
+    end.
+
+is_empty(Tab) ->
+    ets:first(Tab) =:= '$end_of_table'.
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
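Note (condensed restatement of choose_schema_vsn/1 above, not additional behaviour): the configured schema only wins when both storage flavours are empty; otherwise the node follows whatever data already exists, and mixed data is a fatal error.

    %% choose(Configured, IsTrieEmpty, IsFiltersTabEmpty) -> schema to use
    choose(Configured, true, true) -> Configured;
    choose(_Configured, false, true) -> v1;
    choose(_Configured, true, false) -> v2;
    choose(_Configured, false, false) ->
        error(conflicting_routing_schemas_detected_in_cluster).
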
@@ -148,11 +148,12 @@ handle_info({mnesia_table_event, Event}, State) ->
 handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
     case mria_rlog:role() of
         core ->
+            % TODO
+            % Node may flap, do we need to wait for any pending cleanups in `init/1`
+            % on the flapping node?
             global:trans(
                 {?LOCK, self()},
-                fun() ->
-                    mria:transaction(?ROUTE_SHARD, fun ?MODULE:cleanup_routes/1, [Node])
-                end
+                fun() -> cleanup_routes(Node) end
             ),
             ok = mria:dirty_delete(?ROUTING_NODE, Node);
         replicant ->
@@ -197,11 +198,4 @@ stats_fun() ->
     end.
 
 cleanup_routes(Node) ->
-    Patterns = [
-        #route{_ = '_', dest = Node},
-        #route{_ = '_', dest = {'_', Node}}
-    ],
-    [
-        mnesia:delete_object(?ROUTE_TAB, Route, write)
-     || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
-    ].
+    emqx_router:cleanup_routes(Node).
@@ -23,6 +23,8 @@
 -export([init/1]).
 
 start_link() ->
+    %% Init and log routing table type
+    ok = emqx_router:init_schema(),
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
@@ -1358,6 +1358,11 @@ fields("broker") ->
                 ref("broker_perf"),
                 #{importance => ?IMPORTANCE_HIDDEN}
             )},
+        {"routing",
+            sc(
+                ref("broker_routing"),
+                #{importance => ?IMPORTANCE_HIDDEN}
+            )},
         %% FIXME: Need new design for shared subscription group
         {"shared_subscription_group",
             sc(
@@ -1369,6 +1374,18 @@ fields("broker") ->
             }
         )}
     ];
+fields("broker_routing") ->
+    [
+        {"storage_schema",
+            sc(
+                hoconsc:enum([v1, v2]),
+                #{
+                    default => v1,
+                    'readOnly' => true,
+                    desc => ?DESC(broker_routing_storage_schema)
+                }
+            )}
+    ];
 fields("shared_subscription_group") ->
     [
         {"strategy",
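Note (usage sketch, not part of the diff): the new field lands under the hidden broker.routing group, and emqx_router:init_schema/0 earlier in this diff reads it directly; with no explicit configuration the enum default applies.

    %% on a booted node with default configuration
    v1 = emqx_config:get([broker, routing, storage_schema]).
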
@@ -29,8 +29,8 @@
 -export([get_topic/1]).
 -export([get_record/2]).
 
--type word() :: binary() | '+' | '#'.
--type key(ID) :: {[word()], {ID}}.
+-type key(ID) :: emqx_trie_search:key(ID).
+-type words() :: emqx_trie_search:words().
 -type match(ID) :: key(ID).
 -type name() :: any().
 
@@ -50,7 +50,7 @@ new(Name) ->
 %% @doc Insert a new entry into the index that associates given topic filter to given
 %% record ID, and attaches arbitrary record to the entry. This allows users to choose
 %% between regular and "materialized" indexes, for example.
--spec insert(emqx_types:topic(), _ID, _Record, name()) -> true.
+-spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true.
 insert(Filter, ID, Record, Name) ->
     Tree = gbt(Name),
     Key = key(Filter, ID),
@@ -59,7 +59,7 @@ insert(Filter, ID, Record, Name) ->
 
 %% @doc Delete an entry from the index that associates given topic filter to given
 %% record ID. Deleting non-existing entry is not an error.
--spec delete(emqx_types:topic(), _ID, name()) -> true.
+-spec delete(emqx_types:topic() | words(), _ID, name()) -> true.
 delete(Filter, ID, Name) ->
     Tree = gbt(Name),
     Key = key(Filter, ID),
@@ -24,12 +24,15 @@
 -export([match/2]).
 -export([matches/3]).
 
+-export([make_key/2]).
+
 -export([get_id/1]).
 -export([get_topic/1]).
 -export([get_record/2]).
 
 -type key(ID) :: emqx_trie_search:key(ID).
 -type match(ID) :: key(ID).
+-type words() :: emqx_trie_search:words().
 
 %% @doc Create a new ETS table suitable for topic index.
 %% Usable mostly for testing purposes.
@@ -40,16 +43,20 @@ new() ->
 %% @doc Insert a new entry into the index that associates given topic filter to given
 %% record ID, and attaches arbitrary record to the entry. This allows users to choose
 %% between regular and "materialized" indexes, for example.
--spec insert(emqx_types:topic(), _ID, _Record, ets:table()) -> true.
+-spec insert(emqx_types:topic() | words(), _ID, _Record, ets:table()) -> true.
 insert(Filter, ID, Record, Tab) ->
-    Key = key(Filter, ID),
+    Key = make_key(Filter, ID),
     true = ets:insert(Tab, {Key, Record}).
 
 %% @doc Delete an entry from the index that associates given topic filter to given
 %% record ID. Deleting non-existing entry is not an error.
--spec delete(emqx_types:topic(), _ID, ets:table()) -> true.
+-spec delete(emqx_types:topic() | words(), _ID, ets:table()) -> true.
 delete(Filter, ID, Tab) ->
-    true = ets:delete(Tab, key(Filter, ID)).
+    ets:delete(Tab, make_key(Filter, ID)).
+
+-spec make_key(emqx_types:topic() | words(), ID) -> key(ID).
+make_key(TopicOrFilter, ID) ->
+    emqx_trie_search:make_key(TopicOrFilter, ID).
 
 %% @doc Match given topic against the index and return the first match, or `false` if
 %% no match is found.
@@ -73,13 +80,16 @@ get_topic(Key) ->
     emqx_trie_search:get_topic(Key).
 
 %% @doc Fetch the record associated with the match.
-%% NOTE: Only really useful for ETS tables where the record ID is the first element.
--spec get_record(match(_ID), ets:table()) -> _Record.
+%% May return empty list if the index entry was deleted in the meantime.
+%% NOTE: Only really useful for ETS tables where the record data is the last element.
+-spec get_record(match(_ID), ets:table()) -> [_Record].
 get_record(K, Tab) ->
-    ets:lookup_element(Tab, K, 2).
-
-key(TopicOrFilter, ID) ->
-    emqx_trie_search:make_key(TopicOrFilter, ID).
+    case ets:lookup(Tab, K) of
+        [Entry] ->
+            [erlang:element(tuple_size(Entry), Entry)];
+        [] ->
+            []
+    end.
 
 make_nextf(Tab) ->
     fun(Key) -> ets:next(Tab, Key) end.
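Note (usage sketch under the revised contract; Match, Tab and the result handling are placeholders, not code from the diff): get_record/2 now returns a list, empty when the index entry was removed concurrently, so callers should match on both shapes.

    case emqx_topic_index:get_record(Match, Tab) of
        [Record] -> {ok, Record};
        [] -> not_found
    end
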
@@ -21,6 +21,7 @@
 %% Mnesia bootstrap
 -export([
     mnesia/1,
+    wait_for_tables/0,
     create_session_trie/1
 ]).
 
@@ -105,6 +106,10 @@ create_session_trie(Type) ->
         ]
     ).
 
+-spec wait_for_tables() -> ok | {error, _Reason}.
+wait_for_tables() ->
+    mria:wait_for_tables([?TRIE]).
+
 %%--------------------------------------------------------------------
 %% Topics APIs
 %%--------------------------------------------------------------------
@@ -98,24 +98,24 @@
 
 -module(emqx_trie_search).
 
--export([make_key/2]).
+-export([make_key/2, filter/1]).
 -export([match/2, matches/3, get_id/1, get_topic/1]).
--export_type([key/1, word/0, nextf/0, opts/0]).
+-export_type([key/1, word/0, words/0, nextf/0, opts/0]).
 
 -define(END, '$end_of_table').
 
 -type word() :: binary() | '+' | '#'.
+-type words() :: [word()].
 -type base_key() :: {binary() | [word()], {}}.
 -type key(ID) :: {binary() | [word()], {ID}}.
 -type nextf() :: fun((key(_) | base_key()) -> ?END | key(_)).
 -type opts() :: [unique | return_first].
 
 %% @doc Make a search-key for the given topic.
--spec make_key(emqx_types:topic(), ID) -> key(ID).
+-spec make_key(emqx_types:topic() | words(), ID) -> key(ID).
 make_key(Topic, ID) when is_binary(Topic) ->
-    Words = filter_words(Topic),
-    case emqx_topic:wildcard(Words) of
-        true ->
+    case filter(Topic) of
+        Words when is_list(Words) ->
             %% it's a wildcard
             {Words, {ID}};
         false ->
@@ -123,7 +123,15 @@ make_key(Topic, ID) when is_binary(Topic) ->
             %% because they can be found with direct lookups.
             %% it is also more compact in memory.
             {Topic, {ID}}
-    end.
+    end;
+make_key(Words, ID) when is_list(Words) ->
+    {Words, {ID}}.
+
+%% @doc Parse a topic filter into a list of words. Returns `false` if it's not a filter.
+-spec filter(emqx_types:topic()) -> words() | false.
+filter(Topic) ->
+    Words = filter_words(Topic),
+    emqx_topic:wildcard(Words) andalso Words.
 
 %% @doc Extract record ID from the match.
 -spec get_id(key(ID)) -> ID.
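Note (illustrative sketch; the concrete word list shown is an assumption derived from the word() type and filter_words/1 above, not an output quoted from the diff): filter/1 parses a wildcard filter into words and yields false for a plain topic, and make_key/2 now accepts either form.

    [<<"a">>, '+', <<"c">>] = emqx_trie_search:filter(<<"a/+/c">>),
    false = emqx_trie_search:filter(<<"a/b/c">>),
    %% a pre-parsed word list can be keyed directly, skipping re-tokenization
    _Key = emqx_trie_search:make_key([<<"a">>, '+', <<"c">>], node()).
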
@@ -325,6 +333,7 @@ filter_words(Topic) when is_binary(Topic) ->
     % `match_filter/3` expects.
     [word(W, filter) || W <- emqx_topic:tokens(Topic)].
 
+-spec topic_words(emqx_types:topic()) -> [binary()].
 topic_words(Topic) when is_binary(Topic) ->
     [word(W, topic) || W <- emqx_topic:tokens(Topic)].
 
@@ -19,9 +19,13 @@
 -export([start/2]).
 -export([stop/1]).
 
+-export([start_bare_node/2]).
+
 -export([share_load_module/2]).
 -export([node_name/1]).
 
+-export([node_name/1]).
+
 -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]).
 
 -define(TIMEOUT_NODE_START_MS, 15000).
@@ -259,9 +263,6 @@ allocate_listener_ports(Types, Spec) ->
 
 start_node_init(Spec = #{name := Node}) ->
     Node = start_bare_node(Node, Spec),
-    pong = net_adm:ping(Node),
-    % Preserve node spec right on the remote node
-    ok = set_node_opts(Node, Spec),
     % Make it possible to call `ct:pal` and friends (if running under rebar3)
     _ = share_load_module(Node, cthr),
     % Enable snabbkaffe trace forwarding
@@ -366,7 +367,8 @@ listener_port(BasePort, wss) ->
 
 %%
 
-start_bare_node(Name, #{driver := ct_slave}) ->
+-spec start_bare_node(atom(), map()) -> node().
+start_bare_node(Name, Spec = #{driver := ct_slave}) ->
     {ok, Node} = ct_slave:start(
         node_name(Name),
         [
@@ -378,9 +380,15 @@ start_bare_node(Name, #{driver := ct_slave}) ->
             {env, []}
         ]
     ),
-    Node;
-start_bare_node(Name, #{driver := slave}) ->
+    init_bare_node(Node, Spec);
+start_bare_node(Name, Spec = #{driver := slave}) ->
     {ok, Node} = slave:start_link(host(), Name, ebin_path()),
+    init_bare_node(Node, Spec).
+
+init_bare_node(Node, Spec) ->
+    pong = net_adm:ping(Node),
+    % Preserve node spec right on the remote node
+    ok = set_node_opts(Node, Spec),
     Node.
 
 erl_flags() ->
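Note (usage sketch; the node name is hypothetical): with start_bare_node/2 now exported, a test can start and stop a bare peer node directly, as the router helper suite below does.

    Node = emqx_cth_cluster:start_bare_node(my_test_node, #{driver => ct_slave}),
    _ = emqx_cth_cluster:stop([Node]).
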
@@ -403,6 +411,7 @@ share_load_module(Node, Module) ->
             error
     end.
 
+-spec node_name(atom()) -> node().
 node_name(Name) ->
     case string:tokens(atom_to_list(Name), "@") of
         [_Name, _Host] ->
@@ -261,7 +261,7 @@ merge_envs(false, E2) ->
 merge_envs(_E, false) ->
     [];
 merge_envs(E1, E2) ->
-    E1 ++ E2.
+    lists:foldl(fun({K, _} = Opt, EAcc) -> lists:keystore(K, 1, EAcc, Opt) end, E1, E2).
 
 merge_config(false, C2) ->
     C2;
@@ -1094,7 +1094,7 @@ t_multi_streams_unsub(Config) ->
     ?retry(
         _Sleep2 = 100,
         _Attempts2 = 50,
-        false = emqx_router:has_routes(Topic)
+        [] = emqx_router:lookup_routes(Topic)
     ),
 
     case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of
@@ -26,24 +26,37 @@
 
 -define(R, emqx_router).
 
-all() -> emqx_common_test_helpers:all(?MODULE).
-
-init_per_suite(Config) ->
-    PrevBootModules = application:get_env(emqx, boot_modules),
-    emqx_common_test_helpers:boot_modules([router]),
-    emqx_common_test_helpers:start_apps([]),
+all() ->
     [
-        {prev_boot_modules, PrevBootModules}
-        | Config
+        {group, routing_schema_v1},
+        {group, routing_schema_v2}
     ].
 
-end_per_suite(Config) ->
-    PrevBootModules = ?config(prev_boot_modules, Config),
-    case PrevBootModules of
-        undefined -> ok;
-        {ok, Mods} -> emqx_common_test_helpers:boot_modules(Mods)
-    end,
-    emqx_common_test_helpers:stop_apps([]).
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {routing_schema_v1, [], TCs},
+        {routing_schema_v2, [], TCs}
+    ].
+
+init_per_group(GroupName, Config) ->
+    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
+    AppSpecs = [
+        {emqx, #{
+            config => mk_config(GroupName),
+            override_env => [{boot_modules, [router]}]
+        }}
+    ],
+    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
+    [{group_apps, Apps}, {group_name, GroupName} | Config].
+
+end_per_group(_GroupName, Config) ->
+    ok = emqx_cth_suite:stop(?config(group_apps, Config)).
+
+mk_config(routing_schema_v1) ->
+    "broker.routing.storage_schema = v1";
+mk_config(routing_schema_v2) ->
+    "broker.routing.storage_schema = v2".
 
 init_per_testcase(_TestCase, Config) ->
     clear_tables(),
@@ -52,23 +65,16 @@ init_per_testcase(_TestCase, Config) ->
 end_per_testcase(_TestCase, _Config) ->
     clear_tables().
 
-% t_add_route(_) ->
-%     error('TODO').
-
-% t_do_add_route(_) ->
-%     error('TODO').
-
 % t_lookup_routes(_) ->
 %     error('TODO').
 
-% t_delete_route(_) ->
-%     error('TODO').
-
-% t_do_delete_route(_) ->
-%     error('TODO').
-
-% t_topics(_) ->
-%     error('TODO').
+t_verify_type(Config) ->
+    case ?config(group_name, Config) of
+        routing_schema_v1 ->
+            ?assertEqual(v1, ?R:get_schema_vsn());
+        routing_schema_v2 ->
+            ?assertEqual(v2, ?R:get_schema_vsn())
+    end.
 
 t_add_delete(_) ->
     ?R:add_route(<<"a/b/c">>),
@@ -79,6 +85,55 @@ t_add_delete(_) ->
     ?R:delete_route(<<"a/+/b">>, node()),
     ?assertEqual([], ?R:topics()).
 
+t_add_delete_incremental(_) ->
+    ?R:add_route(<<"a/b/c">>),
+    ?R:add_route(<<"a/+/c">>, node()),
+    ?R:add_route(<<"a/+/+">>, node()),
+    ?R:add_route(<<"a/b/#">>, node()),
+    ?R:add_route(<<"#">>, node()),
+    ?assertEqual(
+        [
+            #route{topic = <<"#">>, dest = node()},
+            #route{topic = <<"a/+/+">>, dest = node()},
+            #route{topic = <<"a/+/c">>, dest = node()},
+            #route{topic = <<"a/b/#">>, dest = node()},
+            #route{topic = <<"a/b/c">>, dest = node()}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    ?R:delete_route(<<"a/+/c">>, node()),
+    ?assertEqual(
+        [
+            #route{topic = <<"#">>, dest = node()},
+            #route{topic = <<"a/+/+">>, dest = node()},
+            #route{topic = <<"a/b/#">>, dest = node()},
+            #route{topic = <<"a/b/c">>, dest = node()}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    ?R:delete_route(<<"a/+/+">>, node()),
+    ?assertEqual(
+        [
+            #route{topic = <<"#">>, dest = node()},
+            #route{topic = <<"a/b/#">>, dest = node()},
+            #route{topic = <<"a/b/c">>, dest = node()}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    ?R:delete_route(<<"a/b/#">>, node()),
+    ?assertEqual(
+        [
+            #route{topic = <<"#">>, dest = node()},
+            #route{topic = <<"a/b/c">>, dest = node()}
+        ],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ),
+    ?R:delete_route(<<"a/b/c">>, node()),
+    ?assertEqual(
+        [#route{topic = <<"#">>, dest = node()}],
+        lists:sort(?R:match_routes(<<"a/b/c">>))
+    ).
+
 t_do_add_delete(_) ->
     ?R:do_add_route(<<"a/b/c">>),
     ?R:do_add_route(<<"a/b/c">>, node()),
@@ -114,9 +169,9 @@ t_print_routes(_) ->
     ?R:add_route(<<"+/+">>),
     ?R:print_routes(<<"a/b">>).
 
-t_has_routes(_) ->
+t_has_route(_) ->
     ?R:add_route(<<"devices/+/messages">>, node()),
-    ?assert(?R:has_routes(<<"devices/+/messages">>)),
+    ?assert(?R:has_route(<<"devices/+/messages">>, node())),
     ?R:delete_route(<<"devices/+/messages">>).
 
 t_unexpected(_) ->
@@ -128,5 +183,5 @@ t_unexpected(_) ->
 clear_tables() ->
     lists:foreach(
         fun mnesia:clear_table/1,
-        [?ROUTE_TAB, ?TRIE, emqx_trie_node]
+        [?ROUTE_TAB, ?ROUTE_TAB_FILTERS, ?TRIE]
     ).
@@ -26,55 +26,45 @@
 
 -define(ROUTER_HELPER, emqx_router_helper).
 
-all() -> emqx_common_test_helpers:all(?MODULE).
+all() ->
+    [
+        {group, routing_schema_v1},
+        {group, routing_schema_v2}
+    ].
 
-init_per_suite(Config) ->
-    DistPid =
-        case net_kernel:nodename() of
-            ignored ->
-                %% calling `net_kernel:start' without `epmd'
-                %% running will result in a failure.
-                emqx_common_test_helpers:start_epmd(),
-                {ok, Pid} = net_kernel:start(['test@127.0.0.1', longnames]),
-                Pid;
-            _ ->
-                undefined
-        end,
-    emqx_common_test_helpers:start_apps([]),
-    [{dist_pid, DistPid} | Config].
+groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {routing_schema_v1, [], TCs},
+        {routing_schema_v2, [], TCs}
+    ].
 
-end_per_suite(Config) ->
-    DistPid = ?config(dist_pid, Config),
-    case DistPid of
-        Pid when is_pid(Pid) ->
-            net_kernel:stop();
-        _ ->
-            ok
-    end,
-    emqx_common_test_helpers:stop_apps([]).
+init_per_group(GroupName, Config) ->
+    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
+    AppSpecs = [{emqx, mk_config(GroupName)}],
+    Apps = emqx_cth_suite:start(AppSpecs, #{work_dir => WorkDir}),
+    [{group_name, GroupName}, {group_apps, Apps} | Config].
 
-init_per_testcase(TestCase, Config) when
-    TestCase =:= t_cleanup_membership_mnesia_down;
-    TestCase =:= t_cleanup_membership_node_down;
-    TestCase =:= t_cleanup_monitor_node_down
-->
-    ok = snabbkaffe:start_trace(),
-    Slave = emqx_common_test_helpers:start_slave(some_node, []),
-    [{slave, Slave} | Config];
+end_per_group(_GroupName, Config) ->
+    ok = emqx_cth_suite:stop(?config(group_apps, Config)).
+
+mk_config(routing_schema_v1) ->
+    #{
+        config => "broker.routing.storage_schema = v1",
+        override_env => [{boot_modules, [router]}]
+    };
+mk_config(routing_schema_v2) ->
+    #{
+        config => "broker.routing.storage_schema = v2",
+        override_env => [{boot_modules, [router]}]
+    }.
+
 init_per_testcase(_TestCase, Config) ->
+    ok = snabbkaffe:start_trace(),
     Config.
 
-end_per_testcase(TestCase, Config) when
-    TestCase =:= t_cleanup_membership_mnesia_down;
-    TestCase =:= t_cleanup_membership_node_down;
-    TestCase =:= t_cleanup_monitor_node_down
-->
-    Slave = ?config(slave, Config),
-    emqx_common_test_helpers:stop_slave(Slave),
-    mria:clear_table(?ROUTE_TAB),
-    snabbkaffe:stop(),
-    ok;
 end_per_testcase(_TestCase, _Config) ->
+    ok = snabbkaffe:stop(),
     ok.
 
 t_monitor(_) ->
@@ -89,8 +79,8 @@ t_mnesia(_) ->
     ?ROUTER_HELPER ! {membership, {mnesia, down, node()}},
     ct:sleep(200).
 
-t_cleanup_membership_mnesia_down(Config) ->
-    Slave = ?config(slave, Config),
+t_cleanup_membership_mnesia_down(_Config) ->
+    Slave = emqx_cth_cluster:node_name(?FUNCTION_NAME),
     emqx_router:add_route(<<"a/b/c">>, Slave),
     emqx_router:add_route(<<"d/e/f">>, node()),
     ?assertMatch([_, _], emqx_router:topics()),
|
||||||
),
|
),
|
||||||
?assertEqual([<<"d/e/f">>], emqx_router:topics()).
|
?assertEqual([<<"d/e/f">>], emqx_router:topics()).
|
||||||
|
|
||||||
t_cleanup_membership_node_down(Config) ->
|
t_cleanup_membership_node_down(_Config) ->
|
||||||
Slave = ?config(slave, Config),
|
Slave = emqx_cth_cluster:node_name(?FUNCTION_NAME),
|
||||||
emqx_router:add_route(<<"a/b/c">>, Slave),
|
emqx_router:add_route(<<"a/b/c">>, Slave),
|
||||||
emqx_router:add_route(<<"d/e/f">>, node()),
|
emqx_router:add_route(<<"d/e/f">>, node()),
|
||||||
?assertMatch([_, _], emqx_router:topics()),
|
?assertMatch([_, _], emqx_router:topics()),
|
||||||
|
@ -113,13 +103,13 @@ t_cleanup_membership_node_down(Config) ->
|
||||||
),
|
),
|
||||||
?assertEqual([<<"d/e/f">>], emqx_router:topics()).
|
?assertEqual([<<"d/e/f">>], emqx_router:topics()).
|
||||||
|
|
||||||
t_cleanup_monitor_node_down(Config) ->
|
t_cleanup_monitor_node_down(_Config) ->
|
||||||
Slave = ?config(slave, Config),
|
Slave = emqx_cth_cluster:start_bare_node(?FUNCTION_NAME, #{driver => ct_slave}),
|
||||||
emqx_router:add_route(<<"a/b/c">>, Slave),
|
emqx_router:add_route(<<"a/b/c">>, Slave),
|
||||||
emqx_router:add_route(<<"d/e/f">>, node()),
|
emqx_router:add_route(<<"d/e/f">>, node()),
|
||||||
?assertMatch([_, _], emqx_router:topics()),
|
?assertMatch([_, _], emqx_router:topics()),
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
emqx_common_test_helpers:stop_slave(Slave),
|
emqx_cth_cluster:stop([Slave]),
|
||||||
#{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
|
#{?snk_kind := emqx_router_helper_cleanup_done, node := Slave},
|
||||||
1_000
|
1_000
|
||||||
),
|
),
|
||||||
|
|
|
@@ -0,0 +1,258 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_routing_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+all() ->
+    [
+        {group, routing_schema_v1},
+        {group, routing_schema_v2},
+        t_routing_schema_switch_v1,
+        t_routing_schema_switch_v2
+    ].
+
+groups() ->
+    TCs = [
+        t_cluster_routing
+    ],
+    [
+        {routing_schema_v1, [], TCs},
+        {routing_schema_v2, [], TCs}
+    ].
+
+init_per_group(GroupName, Config) ->
+    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, GroupName]),
+    NodeSpecs = [
+        {emqx_routing_SUITE1, #{apps => [mk_emqx_appspec(GroupName, 1)], role => core}},
+        {emqx_routing_SUITE2, #{apps => [mk_emqx_appspec(GroupName, 2)], role => core}},
+        {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(GroupName, 3)], role => replicant}}
+    ],
+    Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}),
+    [{cluster, Nodes} | Config].
+
+end_per_group(_GroupName, Config) ->
+    emqx_cth_cluster:stop(?config(cluster, Config)).
+
+init_per_testcase(TC, Config) ->
+    WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]),
+    [{work_dir, WorkDir} | Config].
+
+end_per_testcase(_TC, _Config) ->
+    ok.
+
+mk_emqx_appspec(GroupName, N) ->
+    {emqx, #{
+        config => mk_config(GroupName, N),
+        after_start => fun() ->
+            % NOTE
+            % This one is actually defined on `emqx_conf_schema` level, but used
+            % in `emqx_broker`. Thus we have to resort to this ugly hack.
+            emqx_config:force_put([rpc, mode], async)
+        end
+    }}.
+
+mk_genrpc_appspec() ->
+    {gen_rpc, #{
+        override_env => [{port_discovery, stateless}]
+    }}.
+
+mk_config(GroupName, N) ->
+    #{
+        broker => mk_config_broker(GroupName),
+        listeners => mk_config_listeners(N)
+    }.
+
+mk_config_broker(Vsn) when Vsn == routing_schema_v1; Vsn == v1 ->
+    #{routing => #{storage_schema => v1}};
+mk_config_broker(Vsn) when Vsn == routing_schema_v2; Vsn == v2 ->
+    #{routing => #{storage_schema => v2}}.
+
+mk_config_listeners(N) ->
+    Port = 1883 + N,
+    #{
+        tcp => #{default => #{bind => "127.0.0.1:" ++ integer_to_list(Port)}},
+        ssl => #{default => #{enable => false}},
+        ws => #{default => #{enable => false}},
+        wss => #{default => #{enable => false}}
+    }.
+
+%%
+
+t_cluster_routing(Config) ->
+    Cluster = ?config(cluster, Config),
+    Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster],
+    Commands = [
+        {fun publish/3, [C1, <<"a/b/c">>, <<"wontsee">>]},
+        {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]},
+        {fun subscribe/2, [C3, <<"a/+/c/#">>]},
+        {fun publish/3, [C1, <<"a/b/c">>, <<"01">>]},
+        {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]},
+        {fun subscribe/2, [C1, <<"a/b/c">>]},
+        {fun subscribe/2, [C2, <<"a/b/+">>]},
+        {fun publish/3, [C3, <<"a/b/c">>, <<"02">>]},
+        {fun publish/3, [C2, <<"a/b/d">>, <<"03">>]},
+        {fun publish/3, [C2, <<"a/b/c/d">>, <<"04">>]},
+        {fun subscribe/2, [C3, <<"a/b/d">>]},
+        {fun publish/3, [C1, <<"a/b/d">>, <<"05">>]},
+        {fun unsubscribe/2, [C3, <<"a/+/c/#">>]},
+        {fun publish/3, [C1, <<"a/b/c">>, <<"06">>]},
+        {fun publish/3, [C2, <<"a/b/d">>, <<"07">>]},
+        {fun publish/3, [C2, <<"a/b/c/d">>, <<"08">>]},
+        {fun unsubscribe/2, [C2, <<"a/b/+">>]},
+        {fun publish/3, [C1, <<"a/b/c">>, <<"09">>]},
+        {fun publish/3, [C2, <<"a/b/d">>, <<"10">>]},
+        {fun publish/3, [C2, <<"a/b/c/d">>, <<"11">>]},
+        {fun unsubscribe/2, [C3, <<"a/b/d">>]},
+        {fun unsubscribe/2, [C1, <<"a/b/c">>]},
+        {fun publish/3, [C1, <<"a/b/c">>, <<"wontsee">>]},
+        {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]}
+    ],
+    ok = lists:foreach(fun({F, Args}) -> erlang:apply(F, Args) end, Commands),
+    _ = [emqtt:stop(C) || C <- Clients],
+    Deliveries = ?drainMailbox(),
+    ?assertMatch(
+        [
+            {pub, C1, #{topic := <<"a/b/c">>, payload := <<"02">>}},
+            {pub, C1, #{topic := <<"a/b/c">>, payload := <<"06">>}},
+            {pub, C1, #{topic := <<"a/b/c">>, payload := <<"09">>}},
+            {pub, C2, #{topic := <<"a/b/c">>, payload := <<"02">>}},
+            {pub, C2, #{topic := <<"a/b/d">>, payload := <<"03">>}},
+            {pub, C2, #{topic := <<"a/b/d">>, payload := <<"05">>}},
+            {pub, C2, #{topic := <<"a/b/c">>, payload := <<"06">>}},
+            {pub, C2, #{topic := <<"a/b/d">>, payload := <<"07">>}},
+            {pub, C3, #{topic := <<"a/b/c">>, payload := <<"01">>}},
+            {pub, C3, #{topic := <<"a/b/c">>, payload := <<"02">>}},
+            {pub, C3, #{topic := <<"a/b/c/d">>, payload := <<"04">>}},
+            {pub, C3, #{topic := <<"a/b/d">>, payload := <<"05">>}},
+            {pub, C3, #{topic := <<"a/b/d">>, payload := <<"07">>}},
+            {pub, C3, #{topic := <<"a/b/d">>, payload := <<"10">>}}
+        ],
+        lists:sort(
+            fun({pub, CL, #{payload := PL}}, {pub, CR, #{payload := PR}}) ->
+                {CL, PL} < {CR, PR}
+            end,
+            Deliveries
+        )
+    ).
+
+start_client(Node) ->
+    Self = self(),
+    {ok, C} = emqtt:start_link(#{
+        port => get_mqtt_tcp_port(Node),
+        msg_handler => #{
+            publish => fun(Msg) -> Self ! {pub, self(), Msg} end
+        }
+    }),
+    {ok, _Props} = emqtt:connect(C),
+    C.
+
+publish(C, Topic, Payload) ->
+    {ok, #{reason_code := 0}} = emqtt:publish(C, Topic, Payload, 1).
+
+subscribe(C, Topic) ->
+    % NOTE: sleeping here as lazy way to wait for subscribe to replicate
+    {ok, _Props, [0]} = emqtt:subscribe(C, Topic),
+    ok = timer:sleep(200).
+
+unsubscribe(C, Topic) ->
+    % NOTE: sleeping here as lazy way to wait for unsubscribe to replicate
+    {ok, _Props, undefined} = emqtt:unsubscribe(C, Topic),
+    ok = timer:sleep(200).
+
+%%
+
+t_routing_schema_switch_v1(Config) ->
+    t_routing_schema_switch(_From = v2, _To = v1, Config).
+
+t_routing_schema_switch_v2(Config) ->
+    t_routing_schema_switch(_From = v1, _To = v2, Config).
+
+t_routing_schema_switch(VFrom, VTo, Config) ->
+    % Start first node with routing schema VTo (e.g. v1)
+    WorkDir = ?config(work_dir, Config),
+    [Node1] = emqx_cth_cluster:start(
+        [
+            {routing_schema_switch1, #{
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VTo, 1)]
+            }}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    % Ensure there's at least 1 route on Node1
+    C1 = start_client(Node1),
+    ok = subscribe(C1, <<"a/+/c">>),
+    ok = subscribe(C1, <<"d/e/f/#">>),
+    % Start rest of nodes with routing schema VFrom (e.g. v2)
+    [Node2, Node3] = emqx_cth_cluster:start(
+        [
+            {routing_schema_switch2, #{
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 2)],
+                base_port => 20000,
+                join_to => Node1
+            }},
+            {routing_schema_switch3, #{
+                apps => [mk_genrpc_appspec(), mk_emqx_appspec(VFrom, 3)],
+                base_port => 20100,
+                join_to => Node1
+            }}
+        ],
+        #{work_dir => WorkDir}
+    ),
+    % Verify that new nodes switched to schema v1/v2 in presence of v1/v2 routes respectively
+    Nodes = [Node1, Node2, Node3],
+    ?assertEqual(
+        [{ok, VTo}, {ok, VTo}, {ok, VTo}],
+        erpc:multicall(Nodes, emqx_router, get_schema_vsn, [])
+    ),
+    % Wait for all nodes to agree on cluster state
+    ?retry(
+        500,
+        10,
+        ?assertMatch(
+            [{ok, [Node1, Node2, Node3]}],
+            lists:usort(erpc:multicall(Nodes, emqx, running_nodes, []))
+        )
+    ),
+    % Verify that routing works as expected
+    C2 = start_client(Node2),
+    ok = subscribe(C2, <<"a/+/d">>),
+    C3 = start_client(Node3),
+    ok = subscribe(C3, <<"d/e/f/#">>),
+    {ok, _} = publish(C1, <<"a/b/d">>, <<"hey-newbies">>),
+    {ok, _} = publish(C2, <<"a/b/c">>, <<"hi">>),
+    {ok, _} = publish(C3, <<"d/e/f/42">>, <<"hello">>),
+    ?assertReceive({pub, C2, #{topic := <<"a/b/d">>, payload := <<"hey-newbies">>}}),
+    ?assertReceive({pub, C1, #{topic := <<"a/b/c">>, payload := <<"hi">>}}),
+    ?assertReceive({pub, C1, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
+    ?assertReceive({pub, C3, #{topic := <<"d/e/f/42">>, payload := <<"hello">>}}),
+    ?assertNotReceive(_),
+    ok = emqtt:stop(C1),
+    ok = emqtt:stop(C2),
+    ok = emqtt:stop(C3),
+    ok = emqx_cth_cluster:stop(Nodes).
+
+%%
+
+get_mqtt_tcp_port(Node) ->
+    {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]),
+    Port.
@@ -1054,7 +1054,7 @@ t_queue_subscription(Config) when is_list(Config) ->
         begin
             ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
             %% FIXME: should ensure we have 2 subscriptions
-            true = emqx_router:has_routes(Topic)
+            [_] = emqx_router:lookup_routes(Topic)
         end
     ),
@@ -1081,7 +1081,7 @@ t_queue_subscription(Config) when is_list(Config) ->
     %%   _Attempts0 = 50,
     %%   begin
     %%     ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
-    %%     false = emqx_router:has_routes(Topic)
+    %%     [] = emqx_router:lookup_routes(Topic)
     %%   end
     %% ),
     ct:sleep(500),
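The two hunks above swap the removed emqx_router:has_routes/1 for lookup_routes/1. As a hedged sketch, assuming only that lookup_routes/1 returns the (possibly empty) list of matching route records as the assertions imply, an equivalent existence check could be written as:

    %% Minimal sketch, not part of the changeset: express the old has_routes/1
    %% style check in terms of lookup_routes/1.
    has_route_to(Topic) ->
        emqx_router:lookup_routes(Topic) =/= [].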
@@ -57,6 +57,17 @@ t_insert(Config) ->
     ?assertEqual(<<"sensor/#">>, topic(match(M, <<"sensor">>, Tab))),
     ?assertEqual(t_insert_3, id(match(M, <<"sensor">>, Tab))).
 
+t_insert_filter(Config) ->
+    M = get_module(Config),
+    Tab = M:new(),
+    Topic = <<"sensor/+/metric//#">>,
+    true = M:insert(Topic, 1, <<>>, Tab),
+    true = M:insert(emqx_trie_search:filter(Topic), 2, <<>>, Tab),
+    ?assertEqual(
+        [Topic, Topic],
+        [topic(X) || X <- matches(M, <<"sensor/1/metric//2">>, Tab)]
+    ).
+
 t_match(Config) ->
     M = get_module(Config),
     Tab = M:new(),
@@ -18,15 +18,30 @@
 -include_lib("eunit/include/eunit.hrl").
 
-topic_validation_test() ->
+-import(emqx_trie_search, [filter/1]).
+
+filter_test_() ->
+    [
+        ?_assertEqual(
+            [<<"sensor">>, '+', <<"metric">>, <<>>, '#'],
+            filter(<<"sensor/+/metric//#">>)
+        ),
+        ?_assertEqual(
+            false,
+            filter(<<"sensor/1/metric//42">>)
+        )
+    ].
+
+topic_validation_test_() ->
     NextF = fun(_) -> '$end_of_table' end,
     Call = fun(Topic) ->
         emqx_trie_search:match(Topic, NextF)
     end,
-    ?assertError(badarg, Call(<<"+">>)),
-    ?assertError(badarg, Call(<<"#">>)),
-    ?assertError(badarg, Call(<<"a/+/b">>)),
-    ?assertError(badarg, Call(<<"a/b/#">>)),
-    ?assertEqual(false, Call(<<"a/b/b+">>)),
-    ?assertEqual(false, Call(<<"a/b/c#">>)),
-    ok.
+    [
+        ?_assertError(badarg, Call(<<"+">>)),
+        ?_assertError(badarg, Call(<<"#">>)),
+        ?_assertError(badarg, Call(<<"a/+/b">>)),
+        ?_assertError(badarg, Call(<<"a/b/#">>)),
+        ?_assertEqual(false, Call(<<"a/b/b+">>)),
+        ?_assertEqual(false, Call(<<"a/b/c#">>))
+    ].
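Judging from the new filter_test_ above, emqx_trie_search:filter/1 appears to return the tokenized word list for a wildcard filter and false for a concrete topic. A hedged usage sketch follows; route_kind/1 is an illustrative name, not part of the changeset:

    %% Illustrative only: use filter/1 as a "filter or exact topic?" switch.
    route_kind(Topic) ->
        case emqx_trie_search:filter(Topic) of
            false -> exact;
            Words when is_list(Words) -> {filter, Words}
        end.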
@@ -1929,7 +1929,7 @@ t_node_joins_existing_cluster(Config) ->
     ?retry(
         _Sleep2 = 100,
         _Attempts2 = 50,
-        true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic])
+        [] =/= erpc:call(N2, emqx_router, lookup_routes, [MQTTTopic])
     ),
     {ok, SRef1} =
         snabbkaffe:subscribe(
@@ -85,7 +85,7 @@ start_cluster(NamesWithPorts, Apps, Env) ->
         NamesWithPorts
     ),
     Opts0 = [
-        {env, [{emqx, boot_modules, [broker, listeners]}] ++ Env},
+        {env, Env},
         {apps, Apps},
         {conf,
             [{[listeners, Proto, default, enable], false} || Proto <- [ssl, ws, wss]] ++
@@ -22,9 +22,6 @@
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/logger.hrl").
 
--include("emqx_mgmt.hrl").
-
--define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])).
 -define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
 
 -export([load/0]).
@@ -49,20 +46,6 @@
     data/1
 ]).
 
--define(PROC_INFOKEYS, [
-    status,
-    memory,
-    message_queue_len,
-    total_heap_size,
-    heap_size,
-    stack_size,
-    reductions
-]).
-
--define(MAX_LIMIT, 10000).
-
--define(APP, emqx).
-
 -spec load() -> ok.
 load() ->
     Cmds = [Fun || {Fun, _} <- ?MODULE:module_info(exports), is_cmd(Fun)],
@@ -197,9 +180,12 @@ if_client(ClientId, Fun) ->
 %% @doc Topics Command
 
 topics(["list"]) ->
-    dump(?ROUTE_TAB, emqx_topic);
+    emqx_router:foldr_routes(
+        fun(Route, Acc) -> [print({emqx_topic, Route}) | Acc] end,
+        []
+    );
 topics(["show", Topic]) ->
-    Routes = ets:lookup(?ROUTE_TAB, bin(Topic)),
+    Routes = emqx_router:lookup_routes(Topic),
     [print({emqx_topic, Route}) || Route <- Routes];
 topics(_) ->
     emqx_ctl:usage([
@@ -225,8 +225,9 @@ get_rules_ordered_by_ts() ->
 -spec get_rules_for_topic(Topic :: binary()) -> [rule()].
 get_rules_for_topic(Topic) ->
     [
-        emqx_topic_index:get_record(M, ?RULE_TOPIC_INDEX)
-     || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique])
+        Rule
+     || M <- emqx_topic_index:matches(Topic, ?RULE_TOPIC_INDEX, [unique]),
+        Rule <- lookup_rule(emqx_topic_index:get_id(M))
     ].
 
 -spec get_rules_with_same_event(Topic :: binary()) -> [rule()].
@@ -284,11 +285,14 @@ is_of_event_name(EventName, Topic) ->
 
 -spec get_rule(Id :: rule_id()) -> {ok, rule()} | not_found.
 get_rule(Id) ->
-    case ets:lookup(?RULE_TAB, Id) of
-        [{Id, Rule}] -> {ok, Rule#{id => Id}};
+    case lookup_rule(Id) of
+        [Rule] -> {ok, Rule};
         [] -> not_found
     end.
 
+lookup_rule(Id) ->
+    [Rule || {_Id, Rule} <- ets:lookup(?RULE_TAB, Id)].
+
 load_hooks_for_rule(#{from := Topics}) ->
     lists:foreach(fun emqx_rule_events:load/1, Topics).
@@ -483,7 +487,7 @@ with_parsed_rule(Params = #{id := RuleId, sql := Sql, actions := Actions}, Creat
 do_insert_rule(#{id := Id} = Rule) ->
     ok = load_hooks_for_rule(Rule),
     ok = maybe_add_metrics_for_rule(Id),
-    true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
+    true = ets:insert(?RULE_TAB, {Id, Rule}),
     ok.
 
 do_delete_rule(#{id := Id} = Rule) ->
@@ -492,10 +496,10 @@ do_delete_rule(#{id := Id} = Rule) ->
     true = ets:delete(?RULE_TAB, Id),
     ok.
 
-do_update_rule_index(#{id := Id, from := From} = Rule) ->
+do_update_rule_index(#{id := Id, from := From}) ->
     ok = lists:foreach(
         fun(Topic) ->
-            true = emqx_topic_index:insert(Topic, Id, Rule, ?RULE_TOPIC_INDEX)
+            true = emqx_topic_index:insert(Topic, Id, [], ?RULE_TOPIC_INDEX)
         end,
         From
     ).
@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.2.0-alpha.3
+version: 5.2.0-alpha.4
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.2.0-alpha.3
+appVersion: 5.2.0-alpha.4
@@ -1549,6 +1549,13 @@ fields_ws_opts_max_frame_size.label:
 sys_event_messages.desc:
 """Client events messages."""
 
+broker_routing_storage_schema.desc:
+"""Routing storage schema.
+Set <code>v1</code> to leave the default.
+<code>v2</code> is introduced in 5.2. It enables routing through 2 separate tables, one for topic filter and one for regular topic subscriptions. This schema should increase both subscription and routing performance at the cost of slight increase in memory consumption per subscription.
+NOTE: Schema <code>v2</code> is still experimental.
+NOTE: Full non-rolling cluster restart is needed after altering this option for it to take any effect."""
+
 broker_perf_trie_compaction.desc:
 """Enable trie path compaction.
 Enabling it significantly improves wildcard topic subscribe rate, if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', where ID is unique per subscriber.
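For readers skimming the desc text above, here is a minimal, hedged sketch of the two-table idea it describes: exact-topic routes in one ETS table, wildcard filters in another, so a publish does one exact lookup plus a match over the usually smaller filters table. The module and table names and the linear scan are illustrative assumptions only; the real v2 schema matches filters through an indexed trie search, not a full scan, and this sketch merely assumes the long-standing emqx_topic:wildcard/1 and emqx_topic:match/2 helpers.

    -module(routing_v2_sketch).

    -export([init/0, add_route/2, match_routes/1]).

    %% Two tables: exact topics keyed directly, wildcard filters kept apart.
    init() ->
        _ = ets:new(routes_exact, [bag, named_table, public]),
        _ = ets:new(routes_filters, [bag, named_table, public]),
        ok.

    add_route(Topic, Dest) ->
        case emqx_topic:wildcard(Topic) of
            false -> true = ets:insert(routes_exact, {Topic, Dest});
            true -> true = ets:insert(routes_filters, {Topic, Dest})
        end,
        ok.

    %% One exact lookup plus a (naive, for illustration) scan of the filters.
    match_routes(Topic) ->
        Exact = [Dest || {_, Dest} <- ets:lookup(routes_exact, Topic)],
        Wild = [
            Dest
         || {Filter, Dest} <- ets:tab2list(routes_filters),
            emqx_topic:match(Topic, Filter)
        ],
        Exact ++ Wild.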