diff --git a/apps/emqx/src/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl similarity index 75% rename from apps/emqx/src/emqx_cm.hrl rename to apps/emqx/include/emqx_cm.hrl index 3896ea30a..ae70f131f 100644 --- a/apps/emqx/src/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -13,9 +13,19 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + -ifndef(EMQX_CM_HRL). -define(EMQX_CM_HRL, true). +%% Tables for channel management. +-define(CHAN_TAB, emqx_channel). +-define(CHAN_CONN_TAB, emqx_channel_conn). +-define(CHAN_INFO_TAB, emqx_channel_info). +-define(CHAN_LIVE_TAB, emqx_channel_live). + +%% Mria/Mnesia Tables for channel management. +-define(CHAN_REG_TAB, emqx_channel_registry). + -define(T_KICK, 5_000). -define(T_GET_INFO, 5_000). -define(T_TAKEOVER, 15_000). diff --git a/apps/emqx/include/emqx_router.hrl b/apps/emqx/include/emqx_router.hrl new file mode 100644 index 000000000..035ff5455 --- /dev/null +++ b/apps/emqx/include/emqx_router.hrl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_ROUTER_HRL). +-define(EMQX_ROUTER_HRL, true). + +%% ETS table for message routing +-define(ROUTE_TAB, emqx_route). + +%% Mnesia table for message routing +-define(ROUTING_NODE, emqx_routing_node). + +%% ETS tables for PubSub +-define(SUBOPTION, emqx_suboption). +-define(SUBSCRIBER, emqx_subscriber). +-define(SUBSCRIPTION, emqx_subscription). + +-endif. diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 5f7c4aaf5..390b732b9 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -19,6 +19,8 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_router.hrl"). + -include("logger.hrl"). -include("types.hrl"). -include("emqx_mqtt.hrl"). @@ -80,11 +82,6 @@ -define(BROKER, ?MODULE). -%% ETS tables for PubSub --define(SUBOPTION, emqx_suboption). --define(SUBSCRIBER, emqx_subscriber). --define(SUBSCRIPTION, emqx_subscription). - %% Guards -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 5637bb171..75be6b905 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1866,6 +1866,9 @@ check_pub_caps( %%-------------------------------------------------------------------- %% Check Sub Authorization +%% TODO: not only check topic filter. Qos chould be checked too. +%% Not implemented yet: +%% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7] check_sub_authzs(TopicFilters, Channel) -> check_sub_authzs(TopicFilters, Channel, []). diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index db3d48f5d..c193cea44 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -20,6 +20,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -118,14 +119,6 @@ _Stats :: emqx_types:stats() }. --include("emqx_cm.hrl"). - -%% Tables for channel management. --define(CHAN_TAB, emqx_channel). --define(CHAN_CONN_TAB, emqx_channel_conn). --define(CHAN_INFO_TAB, emqx_channel_info). --define(CHAN_LIVE_TAB, emqx_channel_live). - -define(CHAN_STATS, [ {?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, @@ -669,12 +662,12 @@ lookup_client({username, Username}) -> MatchSpec = [ {{'_', #{clientinfo => #{username => '$1'}}, '_'}, [{'=:=', '$1', Username}], ['$_']} ], - ets:select(emqx_channel_info, MatchSpec); + ets:select(?CHAN_INFO_TAB, MatchSpec); lookup_client({clientid, ClientId}) -> [ Rec - || Key <- ets:lookup(emqx_channel, ClientId), - Rec <- ets:lookup(emqx_channel_info, Key) + || Key <- ets:lookup(?CHAN_TAB, ClientId), + Rec <- ets:lookup(?CHAN_INFO_TAB, Key) ]. %% @private diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 8a5639486..058bb53ec 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -20,6 +20,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -50,7 +51,6 @@ ]). -define(REGISTRY, ?MODULE). --define(TAB, emqx_channel_registry). -define(LOCK, {?MODULE, cleanup_down}). -record(channel, {chid, pid}). @@ -78,7 +78,7 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, self()}); register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_write(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. @@ -91,14 +91,14 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, self()}); unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. %% @doc Lookup the global channels. -spec lookup_channels(emqx_types:clientid()) -> list(pid()). lookup_channels(ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)]. + [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)]. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. @@ -109,7 +109,7 @@ record(ClientId, ChanPid) -> init([]) -> mria_config:set_dirty_shard(?CM_SHARD, true), - ok = mria:create_table(?TAB, [ + ok = mria:create_table(?CHAN_REG_TAB, [ {type, bag}, {rlog_shard, ?CM_SHARD}, {storage, ram_copies}, @@ -166,7 +166,7 @@ cleanup_channels(Node) -> do_cleanup_channels(Node) -> Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], - lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)). + lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)). delete_channel(Chan) -> - mnesia:delete_object(?TAB, Chan, write). + mnesia:delete_object(?CHAN_REG_TAB, Chan, write). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 42430af5d..95b6136a7 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -22,6 +22,7 @@ -include("logger.hrl"). -include("types.hrl"). -include_lib("mria/include/mria.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). %% Mnesia bootstrap -export([mnesia/1]). @@ -69,8 +70,6 @@ -type dest() :: node() | {group(), node()}. --define(ROUTE_TAB, emqx_route). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 4bff98072..78cf62d6c 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_router.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -54,8 +55,6 @@ -record(routing_node, {name, const = unused}). --define(ROUTE, emqx_route). --define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). -dialyzer({nowarn_function, [cleanup_routes/1]}). @@ -185,7 +184,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- stats_fun() -> - case ets:info(?ROUTE, size) of + case ets:info(?ROUTE_TAB, size) of undefined -> ok; Size -> @@ -198,6 +197,6 @@ cleanup_routes(Node) -> #route{_ = '_', dest = {'_', Node}} ], [ - mnesia:delete_object(?ROUTE, Route, write) - || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write) + mnesia:delete_object(?ROUTE_TAB, Route, write) + || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) ]. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 00fe545eb..6cc86e5f4 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -18,6 +18,7 @@ -module(emqx_ws_connection). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -1034,7 +1035,7 @@ check_max_connection(Type, Listener) -> allow; Max -> MatchSpec = [{{'_', emqx_ws_connection}, [], [true]}], - Curr = ets:select_count(emqx_channel_conn, MatchSpec), + Curr = ets:select_count(?CHAN_CONN_TAB, MatchSpec), case Curr >= Max of false -> allow; diff --git a/apps/emqx/src/proto/emqx_cm_proto_v1.erl b/apps/emqx/src/proto/emqx_cm_proto_v1.erl index d81469df4..3c176d586 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v1.erl @@ -33,7 +33,7 @@ ]). -include("bpapi.hrl"). --include("src/emqx_cm.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). introduced_in() -> "5.0.0". diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl index 4208df97f..9458ba1be 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v2.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -34,7 +34,7 @@ ]). -include("bpapi.hrl"). --include("src/emqx_cm.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). introduced_in() -> "5.0.0". diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 5589f5bd8..6cb58be46 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -200,10 +201,10 @@ t_open_session_race_condition(_) -> end, Winner = WaitForDowns(Pids), - ?assertMatch([_], ets:lookup(emqx_channel, ClientId)), + ?assertMatch([_], ets:lookup(?CHAN_TAB, ClientId)), ?assertEqual([Winner], emqx_cm:lookup_channels(ClientId)), - ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Winner})), - ?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)), + ?assertMatch([_], ets:lookup(?CHAN_CONN_TAB, {ClientId, Winner})), + ?assertMatch([_], ets:lookup(?CHAN_REG_TAB, ClientId)), exit(Winner, kill), receive diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index 1f4ccab88..1b45cb669 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("quicer/include/quicer.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -1465,7 +1466,7 @@ t_multi_streams_emqx_ctrl_kill(Config) -> ), ClientId = proplists:get_value(clientid, emqtt:info(C)), - [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), + [{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId), exit(TransPid, kill), %% Client should be closed @@ -1518,7 +1519,7 @@ t_multi_streams_emqx_ctrl_exit_normal(Config) -> ), ClientId = proplists:get_value(clientid, emqtt:info(C)), - [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), + [{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId), emqx_connection:stop(TransPid), %% Client exit normal. diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 2db0acf82..067f11634 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -127,5 +128,5 @@ t_unexpected(_) -> clear_tables() -> lists:foreach( fun mnesia:clear_table/1, - [emqx_route, emqx_trie, emqx_trie_node] + [?ROUTE_TAB, ?TRIE, emqx_trie_node] ). diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index 9fc3bd97b..c0796288e 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -20,11 +20,11 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(ROUTER_HELPER, emqx_router_helper). --define(ROUTE_TAB, emqx_route). all() -> emqx_common_test_helpers:all(?MODULE). @@ -82,9 +82,9 @@ t_monitor(_) -> emqx_router_helper:monitor(undefined). t_mnesia(_) -> - ?ROUTER_HELPER ! {mnesia_table_event, {delete, {emqx_routing_node, node()}, undefined}}, + ?ROUTER_HELPER ! {mnesia_table_event, {delete, {?ROUTING_NODE, node()}, undefined}}, ?ROUTER_HELPER ! {mnesia_table_event, testing}, - ?ROUTER_HELPER ! {mnesia_table_event, {write, {emqx_routing_node, node()}, undefined}}, + ?ROUTER_HELPER ! {mnesia_table_event, {write, {?ROUTING_NODE, node()}, undefined}}, ?ROUTER_HELPER ! {membership, testing}, ?ROUTER_HELPER ! {membership, {mnesia, down, node()}}, ct:sleep(200). diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index d5b36c2c3..0b0ab7121 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -117,7 +118,7 @@ load_meck(ClientId) -> [ChanPid] = emqx_cm:lookup_channels(ClientId), ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId), NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}}, - true = ets:update_element(emqx_channel_info, {ClientId, ChanPid}, {2, NChanInfo}). + true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}). unload_meck(_ClientId) -> meck:unload(fake_conn_mod). diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl index 883407a94..65df26387 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -import( emqx_eviction_agent_test_helpers, @@ -295,7 +296,7 @@ t_session_serialization(_Config) -> ?assertMatch( #{data := [#{clientid := <<"client_with_session">>}]}, emqx_mgmt_api:cluster_query( - emqx_channel_info, + ?CHAN_INFO_TAB, #{}, [], fun emqx_mgmt_api_clients:qs2ms/2, diff --git a/apps/emqx_gateway/test/test_mqtt_broker.erl b/apps/emqx_gateway/test/test_mqtt_broker.erl index 497f81f76..74717dc1f 100644 --- a/apps/emqx_gateway/test/test_mqtt_broker.erl +++ b/apps/emqx_gateway/test/test_mqtt_broker.erl @@ -24,6 +24,7 @@ -record(state, {subscriber}). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -50,7 +51,7 @@ unsubscribe(Topic) -> gen_server:call(?MODULE, {unsubscribe, Topic}). get_subscrbied_topics() -> - [Topic || {_Client, Topic} <- ets:tab2list(emqx_subscription)]. + [Topic || {_Client, Topic} <- ets:tab2list(?SUBSCRIPTION)]. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 24a4c3fe4..8f40190bd 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -17,6 +17,8 @@ -module(emqx_mgmt). -include("emqx_mgmt.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). + -elvis([{elvis_style, invalid_dynamic_call, disable}]). -elvis([{elvis_style, god_modules, disable}]). @@ -139,7 +141,7 @@ node_info() -> max_fds => proplists:get_value( max_fds, lists:usort(lists:flatten(erlang:system_info(check_io))) ), - connections => ets:info(emqx_channel, size), + connections => ets:info(?CHAN_TAB, size), node_status => 'running', uptime => proplists:get_value(uptime, BrokerInfo), version => iolist_to_binary(proplists:get_value(version, BrokerInfo)), @@ -487,7 +489,7 @@ subscribe([], _ClientId, _TopicTables) -> -spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) -> {subscribe, _} | {error, atom()}. do_subscribe(ClientId, TopicTables) -> - case ets:lookup(emqx_channel, ClientId) of + case ets:lookup(?CHAN_TAB, ClientId) of [] -> {error, channel_not_found}; [{_, Pid}] -> Pid ! {subscribe, TopicTables} end. @@ -514,7 +516,7 @@ unsubscribe([], _ClientId, _Topic) -> -spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, _}. do_unsubscribe(ClientId, Topic) -> - case ets:lookup(emqx_channel, ClientId) of + case ets:lookup(?CHAN_TAB, ClientId) of [] -> {error, channel_not_found}; [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]} end. @@ -537,7 +539,7 @@ unsubscribe_batch([], _ClientId, _Topics) -> -spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, _}. do_unsubscribe_batch(ClientId, Topics) -> - case ets:lookup(emqx_channel, ClientId) of + case ets:lookup(?CHAN_TAB, ClientId) of [] -> {error, channel_not_found}; [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 5f2446aad..ec236c06c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -57,7 +58,6 @@ %% for batch operation -export([do_subscribe/3]). --define(CLIENT_QTAB, emqx_channel_info). -define(TAGS, [<<"Clients">>]). -define(CLIENT_QSCHEMA, [ @@ -666,7 +666,7 @@ list_clients(QString) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - ?CLIENT_QTAB, + ?CHAN_INFO_TAB, QString, ?CLIENT_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -678,7 +678,7 @@ list_clients(QString) -> QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( Node1, - ?CLIENT_QTAB, + ?CHAN_INFO_TAB, QStringWithoutNode, ?CLIENT_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -753,7 +753,7 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) -> %% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2), %% as the emqx_channel_info table will only be populated after the hook `client.connected` %% has returned. So if one want to subscribe topics in this hook, it will fail. - case ets:lookup(emqx_channel, ClientID) of + case ets:lookup(?CHAN_TAB, ClientID) of [] -> {404, ?CLIENTID_NOT_FOUND}; _ -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index f6e790bf4..b6294ecbf 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -17,6 +17,7 @@ -module(emqx_mgmt_api_topics). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -111,7 +112,7 @@ do_list(Params) -> case emqx_mgmt_api:node_query( node(), - emqx_route, + ?ROUTE_TAB, Params, ?TOPICS_QUERY_SCHEMA, fun ?MODULE:qs2ms/2, diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 27de11d0f..3ebf8e314 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -17,6 +17,8 @@ -module(emqx_mgmt_cli). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -168,7 +170,7 @@ sort_map_list_field(Field, Map) -> %% @doc Query clients clients(["list"]) -> - dump(emqx_channel, client); + dump(?CHAN_TAB, client); clients(["show", ClientId]) -> if_client(ClientId, fun print/1); clients(["kick", ClientId]) -> @@ -182,7 +184,7 @@ clients(_) -> ]). if_client(ClientId, Fun) -> - case ets:lookup(emqx_channel, (bin(ClientId))) of + case ets:lookup(?CHAN_TAB, (bin(ClientId))) of [] -> emqx_ctl:print("Not Found.~n"); [Channel] -> Fun({client, Channel}) end. @@ -191,9 +193,9 @@ if_client(ClientId, Fun) -> %% @doc Topics Command topics(["list"]) -> - dump(emqx_route, emqx_topic); + dump(?ROUTE_TAB, emqx_topic); topics(["show", Topic]) -> - Routes = ets:lookup(emqx_route, bin(Topic)), + Routes = ets:lookup(?ROUTE_TAB, bin(Topic)), [print({emqx_topic, Route}) || Route <- Routes]; topics(_) -> emqx_ctl:usage([ @@ -204,23 +206,23 @@ topics(_) -> subscriptions(["list"]) -> lists:foreach( fun(Suboption) -> - print({emqx_suboption, Suboption}) + print({?SUBOPTION, Suboption}) end, - ets:tab2list(emqx_suboption) + ets:tab2list(?SUBOPTION) ); subscriptions(["show", ClientId]) -> case ets:lookup(emqx_subid, bin(ClientId)) of [] -> emqx_ctl:print("Not Found.~n"); [{_, Pid}] -> - case ets:match_object(emqx_suboption, {{'_', Pid}, '_'}) of + case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of [] -> emqx_ctl:print("Not Found.~n"); - Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption] + Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption] end end; subscriptions(["add", ClientId, Topic, QoS]) -> if_valid_qos(QoS, fun(IntQos) -> - case ets:lookup(emqx_channel, bin(ClientId)) of + case ets:lookup(?CHAN_TAB, bin(ClientId)) of [] -> emqx_ctl:print("Error: Channel not found!"); [{_, Pid}] -> @@ -230,7 +232,7 @@ subscriptions(["add", ClientId, Topic, QoS]) -> end end); subscriptions(["del", ClientId, Topic]) -> - case ets:lookup(emqx_channel, bin(ClientId)) of + case ets:lookup(?CHAN_TAB, bin(ClientId)) of [] -> emqx_ctl:print("Error: Channel not found!"); [{_, Pid}] -> @@ -841,7 +843,7 @@ print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); print({emqx_topic, #route{topic = Topic, dest = Node}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); -print({emqx_suboption, {{Topic, Pid}, Options}}) when is_pid(Pid) -> +print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) -> SubId = maps:get(subid, Options), QoS = maps:get(qos, Options, 0), NL = maps:get(nl, Options, 0), diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index e617c6dcb..a2f546267 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -18,11 +18,10 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(ROUTE_TAB, emqx_route). - all() -> emqx_common_test_helpers:all(?MODULE).