refactor: use unified table define

This commit is contained in:
JimMoen 2023-05-31 11:12:53 +08:00
parent 63d42091e2
commit 65483e972e
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
23 changed files with 110 additions and 67 deletions

View File

@ -13,9 +13,19 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_CM_HRL).
-define(EMQX_CM_HRL, true).
%% Tables for channel management.
-define(CHAN_TAB, emqx_channel).
-define(CHAN_CONN_TAB, emqx_channel_conn).
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).
%% Mria/Mnesia Tables for channel management.
-define(CHAN_REG_TAB, emqx_channel_registry).
-define(T_KICK, 5_000).
-define(T_GET_INFO, 5_000).
-define(T_TAKEOVER, 15_000).

View File

@ -0,0 +1,31 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_ROUTER_HRL).
-define(EMQX_ROUTER_HRL, true).
%% ETS table for message routing
-define(ROUTE_TAB, emqx_route).
%% Mnesia table for message routing
-define(ROUTING_NODE, emqx_routing_node).
%% ETS tables for PubSub
-define(SUBOPTION, emqx_suboption).
-define(SUBSCRIBER, emqx_subscriber).
-define(SUBSCRIPTION, emqx_subscription).
-endif.

View File

@ -19,6 +19,8 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_router.hrl").
-include("logger.hrl").
-include("types.hrl").
-include("emqx_mqtt.hrl").
@ -80,11 +82,6 @@
-define(BROKER, ?MODULE).
%% ETS tables for PubSub
-define(SUBOPTION, emqx_suboption).
-define(SUBSCRIBER, emqx_subscriber).
-define(SUBSCRIPTION, emqx_subscription).
%% Guards
-define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).

View File

@ -1866,6 +1866,9 @@ check_pub_caps(
%%--------------------------------------------------------------------
%% Check Sub Authorization
%% TODO: not only check topic filter. Qos chould be checked too.
%% Not implemented yet:
%% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7]
check_sub_authzs(TopicFilters, Channel) ->
check_sub_authzs(TopicFilters, Channel, []).

View File

@ -20,6 +20,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -118,14 +119,6 @@
_Stats :: emqx_types:stats()
}.
-include("emqx_cm.hrl").
%% Tables for channel management.
-define(CHAN_TAB, emqx_channel).
-define(CHAN_CONN_TAB, emqx_channel_conn).
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live).
-define(CHAN_STATS, [
{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
@ -669,12 +662,12 @@ lookup_client({username, Username}) ->
MatchSpec = [
{{'_', #{clientinfo => #{username => '$1'}}, '_'}, [{'=:=', '$1', Username}], ['$_']}
],
ets:select(emqx_channel_info, MatchSpec);
ets:select(?CHAN_INFO_TAB, MatchSpec);
lookup_client({clientid, ClientId}) ->
[
Rec
|| Key <- ets:lookup(emqx_channel, ClientId),
Rec <- ets:lookup(emqx_channel_info, Key)
|| Key <- ets:lookup(?CHAN_TAB, ClientId),
Rec <- ets:lookup(?CHAN_INFO_TAB, Key)
].
%% @private

View File

@ -20,6 +20,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("logger.hrl").
-include("types.hrl").
@ -50,7 +51,6 @@
]).
-define(REGISTRY, ?MODULE).
-define(TAB, emqx_channel_registry).
-define(LOCK, {?MODULE, cleanup_down}).
-record(channel, {chid, pid}).
@ -78,7 +78,7 @@ register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, self()});
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mria:dirty_write(?TAB, record(ClientId, ChanPid));
true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
end.
@ -91,14 +91,14 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, self()});
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid));
true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> ok
end.
%% @doc Lookup the global channels.
-spec lookup_channels(emqx_types:clientid()) -> list(pid()).
lookup_channels(ClientId) ->
[ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)].
[ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)].
record(ClientId, ChanPid) ->
#channel{chid = ClientId, pid = ChanPid}.
@ -109,7 +109,7 @@ record(ClientId, ChanPid) ->
init([]) ->
mria_config:set_dirty_shard(?CM_SHARD, true),
ok = mria:create_table(?TAB, [
ok = mria:create_table(?CHAN_REG_TAB, [
{type, bag},
{rlog_shard, ?CM_SHARD},
{storage, ram_copies},
@ -166,7 +166,7 @@ cleanup_channels(Node) ->
do_cleanup_channels(Node) ->
Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)).
lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)).
delete_channel(Chan) ->
mnesia:delete_object(?TAB, Chan, write).
mnesia:delete_object(?CHAN_REG_TAB, Chan, write).

View File

@ -22,6 +22,7 @@
-include("logger.hrl").
-include("types.hrl").
-include_lib("mria/include/mria.hrl").
-include_lib("emqx/include/emqx_router.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -69,8 +70,6 @@
-type dest() :: node() | {group(), node()}.
-define(ROUTE_TAB, emqx_route).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------

View File

@ -19,6 +19,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_router.hrl").
-include("logger.hrl").
-include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -54,8 +55,6 @@
-record(routing_node, {name, const = unused}).
-define(ROUTE, emqx_route).
-define(ROUTING_NODE, emqx_routing_node).
-define(LOCK, {?MODULE, cleanup_routes}).
-dialyzer({nowarn_function, [cleanup_routes/1]}).
@ -185,7 +184,7 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
stats_fun() ->
case ets:info(?ROUTE, size) of
case ets:info(?ROUTE_TAB, size) of
undefined ->
ok;
Size ->
@ -198,6 +197,6 @@ cleanup_routes(Node) ->
#route{_ = '_', dest = {'_', Node}}
],
[
mnesia:delete_object(?ROUTE, Route, write)
|| Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write)
mnesia:delete_object(?ROUTE_TAB, Route, write)
|| Pat <- Patterns, Route <- mnesia:match_object(?ROUTE_TAB, Pat, write)
].

View File

@ -18,6 +18,7 @@
-module(emqx_ws_connection).
-include("emqx.hrl").
-include("emqx_cm.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-include("types.hrl").
@ -1034,7 +1035,7 @@ check_max_connection(Type, Listener) ->
allow;
Max ->
MatchSpec = [{{'_', emqx_ws_connection}, [], [true]}],
Curr = ets:select_count(emqx_channel_conn, MatchSpec),
Curr = ets:select_count(?CHAN_CONN_TAB, MatchSpec),
case Curr >= Max of
false ->
allow;

View File

@ -33,7 +33,7 @@
]).
-include("bpapi.hrl").
-include("src/emqx_cm.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
introduced_in() ->
"5.0.0".

View File

@ -34,7 +34,7 @@
]).
-include("bpapi.hrl").
-include("src/emqx_cm.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
introduced_in() ->
"5.0.0".

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -200,10 +201,10 @@ t_open_session_race_condition(_) ->
end,
Winner = WaitForDowns(Pids),
?assertMatch([_], ets:lookup(emqx_channel, ClientId)),
?assertMatch([_], ets:lookup(?CHAN_TAB, ClientId)),
?assertEqual([Winner], emqx_cm:lookup_channels(ClientId)),
?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Winner})),
?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)),
?assertMatch([_], ets:lookup(?CHAN_CONN_TAB, {ClientId, Winner})),
?assertMatch([_], ets:lookup(?CHAN_REG_TAB, ClientId)),
exit(Winner, kill),
receive

View File

@ -23,6 +23,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("quicer/include/quicer.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -1465,7 +1466,7 @@ t_multi_streams_emqx_ctrl_kill(Config) ->
),
ClientId = proplists:get_value(clientid, emqtt:info(C)),
[{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
[{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId),
exit(TransPid, kill),
%% Client should be closed
@ -1518,7 +1519,7 @@ t_multi_streams_emqx_ctrl_exit_normal(Config) ->
),
ClientId = proplists:get_value(clientid, emqtt:info(C)),
[{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
[{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId),
emqx_connection:stop(TransPid),
%% Client exit normal.

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -127,5 +128,5 @@ t_unexpected(_) ->
clear_tables() ->
lists:foreach(
fun mnesia:clear_table/1,
[emqx_route, emqx_trie, emqx_trie_node]
[?ROUTE_TAB, ?TRIE, emqx_trie_node]
).

View File

@ -20,11 +20,11 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(ROUTER_HELPER, emqx_router_helper).
-define(ROUTE_TAB, emqx_route).
all() -> emqx_common_test_helpers:all(?MODULE).
@ -82,9 +82,9 @@ t_monitor(_) ->
emqx_router_helper:monitor(undefined).
t_mnesia(_) ->
?ROUTER_HELPER ! {mnesia_table_event, {delete, {emqx_routing_node, node()}, undefined}},
?ROUTER_HELPER ! {mnesia_table_event, {delete, {?ROUTING_NODE, node()}, undefined}},
?ROUTER_HELPER ! {mnesia_table_event, testing},
?ROUTER_HELPER ! {mnesia_table_event, {write, {emqx_routing_node, node()}, undefined}},
?ROUTER_HELPER ! {mnesia_table_event, {write, {?ROUTING_NODE, node()}, undefined}},
?ROUTER_HELPER ! {membership, testing},
?ROUTER_HELPER ! {membership, {mnesia, down, node()}},
ct:sleep(200).

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -117,7 +118,7 @@ load_meck(ClientId) ->
[ChanPid] = emqx_cm:lookup_channels(ClientId),
ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId),
NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}},
true = ets:update_element(emqx_channel_info, {ClientId, ChanPid}, {2, NChanInfo}).
true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}).
unload_meck(_ClientId) ->
meck:unload(fake_conn_mod).

View File

@ -11,6 +11,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-import(
emqx_eviction_agent_test_helpers,
@ -295,7 +296,7 @@ t_session_serialization(_Config) ->
?assertMatch(
#{data := [#{clientid := <<"client_with_session">>}]},
emqx_mgmt_api:cluster_query(
emqx_channel_info,
?CHAN_INFO_TAB,
#{},
[],
fun emqx_mgmt_api_clients:qs2ms/2,

View File

@ -24,6 +24,7 @@
-record(state, {subscriber}).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
@ -50,7 +51,7 @@ unsubscribe(Topic) ->
gen_server:call(?MODULE, {unsubscribe, Topic}).
get_subscrbied_topics() ->
[Topic || {_Client, Topic} <- ets:tab2list(emqx_subscription)].
[Topic || {_Client, Topic} <- ets:tab2list(?SUBSCRIPTION)].
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

View File

@ -17,6 +17,8 @@
-module(emqx_mgmt).
-include("emqx_mgmt.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
-elvis([{elvis_style, god_modules, disable}]).
@ -139,7 +141,7 @@ node_info() ->
max_fds => proplists:get_value(
max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))
),
connections => ets:info(emqx_channel, size),
connections => ets:info(?CHAN_TAB, size),
node_status => 'running',
uptime => proplists:get_value(uptime, BrokerInfo),
version => iolist_to_binary(proplists:get_value(version, BrokerInfo)),
@ -487,7 +489,7 @@ subscribe([], _ClientId, _TopicTables) ->
-spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) ->
{subscribe, _} | {error, atom()}.
do_subscribe(ClientId, TopicTables) ->
case ets:lookup(emqx_channel, ClientId) of
case ets:lookup(?CHAN_TAB, ClientId) of
[] -> {error, channel_not_found};
[{_, Pid}] -> Pid ! {subscribe, TopicTables}
end.
@ -514,7 +516,7 @@ unsubscribe([], _ClientId, _Topic) ->
-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
{unsubscribe, _} | {error, _}.
do_unsubscribe(ClientId, Topic) ->
case ets:lookup(emqx_channel, ClientId) of
case ets:lookup(?CHAN_TAB, ClientId) of
[] -> {error, channel_not_found};
[{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
end.
@ -537,7 +539,7 @@ unsubscribe_batch([], _ClientId, _Topics) ->
-spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe_batch, _} | {error, _}.
do_unsubscribe_batch(ClientId, Topics) ->
case ets:lookup(emqx_channel, ClientId) of
case ets:lookup(?CHAN_TAB, ClientId) of
[] -> {error, channel_not_found};
[{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]}
end.

View File

@ -20,6 +20,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
@ -57,7 +58,6 @@
%% for batch operation
-export([do_subscribe/3]).
-define(CLIENT_QTAB, emqx_channel_info).
-define(TAGS, [<<"Clients">>]).
-define(CLIENT_QSCHEMA, [
@ -666,7 +666,7 @@ list_clients(QString) ->
case maps:get(<<"node">>, QString, undefined) of
undefined ->
emqx_mgmt_api:cluster_query(
?CLIENT_QTAB,
?CHAN_INFO_TAB,
QString,
?CLIENT_QSCHEMA,
fun ?MODULE:qs2ms/2,
@ -678,7 +678,7 @@ list_clients(QString) ->
QStringWithoutNode = maps:without([<<"node">>], QString),
emqx_mgmt_api:node_query(
Node1,
?CLIENT_QTAB,
?CHAN_INFO_TAB,
QStringWithoutNode,
?CLIENT_QSCHEMA,
fun ?MODULE:qs2ms/2,
@ -753,7 +753,7 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
%% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2),
%% as the emqx_channel_info table will only be populated after the hook `client.connected`
%% has returned. So if one want to subscribe topics in this hook, it will fail.
case ets:lookup(emqx_channel, ClientID) of
case ets:lookup(?CHAN_TAB, ClientID) of
[] ->
{404, ?CLIENTID_NOT_FOUND};
_ ->

View File

@ -17,6 +17,7 @@
-module(emqx_mgmt_api_topics).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -111,7 +112,7 @@ do_list(Params) ->
case
emqx_mgmt_api:node_query(
node(),
emqx_route,
?ROUTE_TAB,
Params,
?TOPICS_QUERY_SCHEMA,
fun ?MODULE:qs2ms/2,

View File

@ -17,6 +17,8 @@
-module(emqx_mgmt_cli).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_cm.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
@ -168,7 +170,7 @@ sort_map_list_field(Field, Map) ->
%% @doc Query clients
clients(["list"]) ->
dump(emqx_channel, client);
dump(?CHAN_TAB, client);
clients(["show", ClientId]) ->
if_client(ClientId, fun print/1);
clients(["kick", ClientId]) ->
@ -182,7 +184,7 @@ clients(_) ->
]).
if_client(ClientId, Fun) ->
case ets:lookup(emqx_channel, (bin(ClientId))) of
case ets:lookup(?CHAN_TAB, (bin(ClientId))) of
[] -> emqx_ctl:print("Not Found.~n");
[Channel] -> Fun({client, Channel})
end.
@ -191,9 +193,9 @@ if_client(ClientId, Fun) ->
%% @doc Topics Command
topics(["list"]) ->
dump(emqx_route, emqx_topic);
dump(?ROUTE_TAB, emqx_topic);
topics(["show", Topic]) ->
Routes = ets:lookup(emqx_route, bin(Topic)),
Routes = ets:lookup(?ROUTE_TAB, bin(Topic)),
[print({emqx_topic, Route}) || Route <- Routes];
topics(_) ->
emqx_ctl:usage([
@ -204,23 +206,23 @@ topics(_) ->
subscriptions(["list"]) ->
lists:foreach(
fun(Suboption) ->
print({emqx_suboption, Suboption})
print({?SUBOPTION, Suboption})
end,
ets:tab2list(emqx_suboption)
ets:tab2list(?SUBOPTION)
);
subscriptions(["show", ClientId]) ->
case ets:lookup(emqx_subid, bin(ClientId)) of
[] ->
emqx_ctl:print("Not Found.~n");
[{_, Pid}] ->
case ets:match_object(emqx_suboption, {{'_', Pid}, '_'}) of
case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of
[] -> emqx_ctl:print("Not Found.~n");
Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption]
Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption]
end
end;
subscriptions(["add", ClientId, Topic, QoS]) ->
if_valid_qos(QoS, fun(IntQos) ->
case ets:lookup(emqx_channel, bin(ClientId)) of
case ets:lookup(?CHAN_TAB, bin(ClientId)) of
[] ->
emqx_ctl:print("Error: Channel not found!");
[{_, Pid}] ->
@ -230,7 +232,7 @@ subscriptions(["add", ClientId, Topic, QoS]) ->
end
end);
subscriptions(["del", ClientId, Topic]) ->
case ets:lookup(emqx_channel, bin(ClientId)) of
case ets:lookup(?CHAN_TAB, bin(ClientId)) of
[] ->
emqx_ctl:print("Error: Channel not found!");
[{_, Pid}] ->
@ -841,7 +843,7 @@ print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) ->
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
print({emqx_topic, #route{topic = Topic, dest = Node}}) ->
emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]);
print({emqx_suboption, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) ->
SubId = maps:get(subid, Options),
QoS = maps:get(qos, Options, 0),
NL = maps:get(nl, Options, 0),

View File

@ -18,11 +18,10 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(ROUTE_TAB, emqx_route).
all() ->
emqx_common_test_helpers:all(?MODULE).