From 65483e972ed80b04f8b6d6a659ab1b2203a002d9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 31 May 2023 11:12:53 +0800 Subject: [PATCH 1/6] refactor: use unified table define --- apps/emqx/{src => include}/emqx_cm.hrl | 10 ++++++ apps/emqx/include/emqx_router.hrl | 31 +++++++++++++++++++ apps/emqx/src/emqx_broker.erl | 7 ++--- apps/emqx/src/emqx_channel.erl | 3 ++ apps/emqx/src/emqx_cm.erl | 15 +++------ apps/emqx/src/emqx_cm_registry.erl | 14 ++++----- apps/emqx/src/emqx_router.erl | 3 +- apps/emqx/src/emqx_router_helper.erl | 9 +++--- apps/emqx/src/emqx_ws_connection.erl | 3 +- apps/emqx/src/proto/emqx_cm_proto_v1.erl | 2 +- apps/emqx/src/proto/emqx_cm_proto_v2.erl | 2 +- apps/emqx/test/emqx_cm_SUITE.erl | 7 +++-- .../test/emqx_quic_multistreams_SUITE.erl | 5 +-- apps/emqx/test/emqx_router_SUITE.erl | 3 +- apps/emqx/test/emqx_router_helper_SUITE.erl | 6 ++-- apps/emqx/test/emqx_takeover_SUITE.erl | 3 +- .../test/emqx_eviction_agent_SUITE.erl | 3 +- apps/emqx_gateway/test/test_mqtt_broker.erl | 3 +- apps/emqx_management/src/emqx_mgmt.erl | 10 +++--- .../src/emqx_mgmt_api_clients.erl | 8 ++--- .../src/emqx_mgmt_api_topics.erl | 3 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 24 +++++++------- .../test/emqx_mgmt_api_topics_SUITE.erl | 3 +- 23 files changed, 110 insertions(+), 67 deletions(-) rename apps/emqx/{src => include}/emqx_cm.hrl (75%) create mode 100644 apps/emqx/include/emqx_router.hrl diff --git a/apps/emqx/src/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl similarity index 75% rename from apps/emqx/src/emqx_cm.hrl rename to apps/emqx/include/emqx_cm.hrl index 3896ea30a..ae70f131f 100644 --- a/apps/emqx/src/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -13,9 +13,19 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + -ifndef(EMQX_CM_HRL). -define(EMQX_CM_HRL, true). +%% Tables for channel management. +-define(CHAN_TAB, emqx_channel). +-define(CHAN_CONN_TAB, emqx_channel_conn). +-define(CHAN_INFO_TAB, emqx_channel_info). +-define(CHAN_LIVE_TAB, emqx_channel_live). + +%% Mria/Mnesia Tables for channel management. +-define(CHAN_REG_TAB, emqx_channel_registry). + -define(T_KICK, 5_000). -define(T_GET_INFO, 5_000). -define(T_TAKEOVER, 15_000). diff --git a/apps/emqx/include/emqx_router.hrl b/apps/emqx/include/emqx_router.hrl new file mode 100644 index 000000000..035ff5455 --- /dev/null +++ b/apps/emqx/include/emqx_router.hrl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_ROUTER_HRL). +-define(EMQX_ROUTER_HRL, true). + +%% ETS table for message routing +-define(ROUTE_TAB, emqx_route). + +%% Mnesia table for message routing +-define(ROUTING_NODE, emqx_routing_node). + +%% ETS tables for PubSub +-define(SUBOPTION, emqx_suboption). +-define(SUBSCRIBER, emqx_subscriber). +-define(SUBSCRIPTION, emqx_subscription). + +-endif. diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 5f7c4aaf5..390b732b9 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -19,6 +19,8 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_router.hrl"). + -include("logger.hrl"). -include("types.hrl"). -include("emqx_mqtt.hrl"). @@ -80,11 +82,6 @@ -define(BROKER, ?MODULE). -%% ETS tables for PubSub --define(SUBOPTION, emqx_suboption). --define(SUBSCRIBER, emqx_subscriber). --define(SUBSCRIPTION, emqx_subscription). - %% Guards -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 5637bb171..75be6b905 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1866,6 +1866,9 @@ check_pub_caps( %%-------------------------------------------------------------------- %% Check Sub Authorization +%% TODO: not only check topic filter. Qos chould be checked too. +%% Not implemented yet: +%% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7] check_sub_authzs(TopicFilters, Channel) -> check_sub_authzs(TopicFilters, Channel, []). diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index db3d48f5d..c193cea44 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -20,6 +20,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -118,14 +119,6 @@ _Stats :: emqx_types:stats() }. --include("emqx_cm.hrl"). - -%% Tables for channel management. --define(CHAN_TAB, emqx_channel). --define(CHAN_CONN_TAB, emqx_channel_conn). --define(CHAN_INFO_TAB, emqx_channel_info). --define(CHAN_LIVE_TAB, emqx_channel_live). - -define(CHAN_STATS, [ {?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, @@ -669,12 +662,12 @@ lookup_client({username, Username}) -> MatchSpec = [ {{'_', #{clientinfo => #{username => '$1'}}, '_'}, [{'=:=', '$1', Username}], ['$_']} ], - ets:select(emqx_channel_info, MatchSpec); + ets:select(?CHAN_INFO_TAB, MatchSpec); lookup_client({clientid, ClientId}) -> [ Rec - || Key <- ets:lookup(emqx_channel, ClientId), - Rec <- ets:lookup(emqx_channel_info, Key) + || Key <- ets:lookup(?CHAN_TAB, ClientId), + Rec <- ets:lookup(?CHAN_INFO_TAB, Key) ]. %% @private diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 8a5639486..058bb53ec 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -20,6 +20,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -50,7 +51,6 @@ ]). -define(REGISTRY, ?MODULE). --define(TAB, emqx_channel_registry). -define(LOCK, {?MODULE, cleanup_down}). -record(channel, {chid, pid}). @@ -78,7 +78,7 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, self()}); register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_write(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. @@ -91,14 +91,14 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, self()}); unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. %% @doc Lookup the global channels. -spec lookup_channels(emqx_types:clientid()) -> list(pid()). lookup_channels(ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)]. + [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)]. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. @@ -109,7 +109,7 @@ record(ClientId, ChanPid) -> init([]) -> mria_config:set_dirty_shard(?CM_SHARD, true), - ok = mria:create_table(?TAB, [ + ok = mria:create_table(?CHAN_REG_TAB, [ {type, bag}, {rlog_shard, ?CM_SHARD}, {storage, ram_copies}, @@ -166,7 +166,7 @@ cleanup_channels(Node) -> do_cleanup_channels(Node) -> Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], - lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)). + lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)). delete_channel(Chan) -> - mnesia:delete_object(?TAB, Chan, write). + mnesia:delete_object(?CHAN_REG_TAB, Chan, write). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 42430af5d..95b6136a7 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -22,6 +22,7 @@ -include("logger.hrl"). -include("types.hrl"). -include_lib("mria/include/mria.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). %% Mnesia bootstrap -export([mnesia/1]). @@ -69,8 +70,6 @@ -type dest() :: node() | {group(), node()}. --define(ROUTE_TAB, emqx_route). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 4bff98072..78cf62d6c 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_router.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -54,8 +55,6 @@ -record(routing_node, {name, const = unused}). --define(ROUTE, emqx_route). --define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). -dialyzer({nowarn_function, [cleanup_routes/1]}). @@ -185,7 +184,7 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- stats_fun() -> - case ets:info(?ROUTE, size) of + case ets:info(?ROUTE_TAB, size) of undefined -> ok; Size -> @@ -198,6 +197,6 @@ cleanup_routes(Node) -> #route{_ = '_', dest = {'_', Node}} ], [ - mnesia:delete_object(?ROUTE, Route, write) - || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE, Pat, write) + mnesia:delete_object(?ROUTE_TAB, Route, write) + || Pat <- Patterns, Route <- mnesia:match_object(?ROUTE_TAB, Pat, write) ]. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 00fe545eb..6cc86e5f4 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -18,6 +18,7 @@ -module(emqx_ws_connection). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -1034,7 +1035,7 @@ check_max_connection(Type, Listener) -> allow; Max -> MatchSpec = [{{'_', emqx_ws_connection}, [], [true]}], - Curr = ets:select_count(emqx_channel_conn, MatchSpec), + Curr = ets:select_count(?CHAN_CONN_TAB, MatchSpec), case Curr >= Max of false -> allow; diff --git a/apps/emqx/src/proto/emqx_cm_proto_v1.erl b/apps/emqx/src/proto/emqx_cm_proto_v1.erl index d81469df4..3c176d586 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v1.erl @@ -33,7 +33,7 @@ ]). -include("bpapi.hrl"). --include("src/emqx_cm.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). introduced_in() -> "5.0.0". diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl index 4208df97f..9458ba1be 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v2.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -34,7 +34,7 @@ ]). -include("bpapi.hrl"). --include("src/emqx_cm.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). introduced_in() -> "5.0.0". diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 5589f5bd8..6cb58be46 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -200,10 +201,10 @@ t_open_session_race_condition(_) -> end, Winner = WaitForDowns(Pids), - ?assertMatch([_], ets:lookup(emqx_channel, ClientId)), + ?assertMatch([_], ets:lookup(?CHAN_TAB, ClientId)), ?assertEqual([Winner], emqx_cm:lookup_channels(ClientId)), - ?assertMatch([_], ets:lookup(emqx_channel_conn, {ClientId, Winner})), - ?assertMatch([_], ets:lookup(emqx_channel_registry, ClientId)), + ?assertMatch([_], ets:lookup(?CHAN_CONN_TAB, {ClientId, Winner})), + ?assertMatch([_], ets:lookup(?CHAN_REG_TAB, ClientId)), exit(Winner, kill), receive diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index 1f4ccab88..1b45cb669 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("quicer/include/quicer.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -1465,7 +1466,7 @@ t_multi_streams_emqx_ctrl_kill(Config) -> ), ClientId = proplists:get_value(clientid, emqtt:info(C)), - [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), + [{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId), exit(TransPid, kill), %% Client should be closed @@ -1518,7 +1519,7 @@ t_multi_streams_emqx_ctrl_exit_normal(Config) -> ), ClientId = proplists:get_value(clientid, emqtt:info(C)), - [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), + [{ClientId, TransPid}] = ets:lookup(?CHAN_TAB, ClientId), emqx_connection:stop(TransPid), %% Client exit normal. diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 2db0acf82..067f11634 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -127,5 +128,5 @@ t_unexpected(_) -> clear_tables() -> lists:foreach( fun mnesia:clear_table/1, - [emqx_route, emqx_trie, emqx_trie_node] + [?ROUTE_TAB, ?TRIE, emqx_trie_node] ). diff --git a/apps/emqx/test/emqx_router_helper_SUITE.erl b/apps/emqx/test/emqx_router_helper_SUITE.erl index 9fc3bd97b..c0796288e 100644 --- a/apps/emqx/test/emqx_router_helper_SUITE.erl +++ b/apps/emqx/test/emqx_router_helper_SUITE.erl @@ -20,11 +20,11 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(ROUTER_HELPER, emqx_router_helper). --define(ROUTE_TAB, emqx_route). all() -> emqx_common_test_helpers:all(?MODULE). @@ -82,9 +82,9 @@ t_monitor(_) -> emqx_router_helper:monitor(undefined). t_mnesia(_) -> - ?ROUTER_HELPER ! {mnesia_table_event, {delete, {emqx_routing_node, node()}, undefined}}, + ?ROUTER_HELPER ! {mnesia_table_event, {delete, {?ROUTING_NODE, node()}, undefined}}, ?ROUTER_HELPER ! {mnesia_table_event, testing}, - ?ROUTER_HELPER ! {mnesia_table_event, {write, {emqx_routing_node, node()}, undefined}}, + ?ROUTER_HELPER ! {mnesia_table_event, {write, {?ROUTING_NODE, node()}, undefined}}, ?ROUTER_HELPER ! {membership, testing}, ?ROUTER_HELPER ! {membership, {mnesia, down, node()}}, ct:sleep(200). diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index d5b36c2c3..0b0ab7121 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -117,7 +118,7 @@ load_meck(ClientId) -> [ChanPid] = emqx_cm:lookup_channels(ClientId), ChanInfo = #{conninfo := ConnInfo} = emqx_cm:get_chan_info(ClientId), NChanInfo = ChanInfo#{conninfo := ConnInfo#{conn_mod := fake_conn_mod}}, - true = ets:update_element(emqx_channel_info, {ClientId, ChanPid}, {2, NChanInfo}). + true = ets:update_element(?CHAN_INFO_TAB, {ClientId, ChanPid}, {2, NChanInfo}). unload_meck(_ClientId) -> meck:unload(fake_conn_mod). diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl index 883407a94..65df26387 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -import( emqx_eviction_agent_test_helpers, @@ -295,7 +296,7 @@ t_session_serialization(_Config) -> ?assertMatch( #{data := [#{clientid := <<"client_with_session">>}]}, emqx_mgmt_api:cluster_query( - emqx_channel_info, + ?CHAN_INFO_TAB, #{}, [], fun emqx_mgmt_api_clients:qs2ms/2, diff --git a/apps/emqx_gateway/test/test_mqtt_broker.erl b/apps/emqx_gateway/test/test_mqtt_broker.erl index 497f81f76..74717dc1f 100644 --- a/apps/emqx_gateway/test/test_mqtt_broker.erl +++ b/apps/emqx_gateway/test/test_mqtt_broker.erl @@ -24,6 +24,7 @@ -record(state, {subscriber}). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -50,7 +51,7 @@ unsubscribe(Topic) -> gen_server:call(?MODULE, {unsubscribe, Topic}). get_subscrbied_topics() -> - [Topic || {_Client, Topic} <- ets:tab2list(emqx_subscription)]. + [Topic || {_Client, Topic} <- ets:tab2list(?SUBSCRIPTION)]. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 24a4c3fe4..8f40190bd 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -17,6 +17,8 @@ -module(emqx_mgmt). -include("emqx_mgmt.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). + -elvis([{elvis_style, invalid_dynamic_call, disable}]). -elvis([{elvis_style, god_modules, disable}]). @@ -139,7 +141,7 @@ node_info() -> max_fds => proplists:get_value( max_fds, lists:usort(lists:flatten(erlang:system_info(check_io))) ), - connections => ets:info(emqx_channel, size), + connections => ets:info(?CHAN_TAB, size), node_status => 'running', uptime => proplists:get_value(uptime, BrokerInfo), version => iolist_to_binary(proplists:get_value(version, BrokerInfo)), @@ -487,7 +489,7 @@ subscribe([], _ClientId, _TopicTables) -> -spec do_subscribe(emqx_types:clientid(), emqx_types:topic_filters()) -> {subscribe, _} | {error, atom()}. do_subscribe(ClientId, TopicTables) -> - case ets:lookup(emqx_channel, ClientId) of + case ets:lookup(?CHAN_TAB, ClientId) of [] -> {error, channel_not_found}; [{_, Pid}] -> Pid ! {subscribe, TopicTables} end. @@ -514,7 +516,7 @@ unsubscribe([], _ClientId, _Topic) -> -spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) -> {unsubscribe, _} | {error, _}. do_unsubscribe(ClientId, Topic) -> - case ets:lookup(emqx_channel, ClientId) of + case ets:lookup(?CHAN_TAB, ClientId) of [] -> {error, channel_not_found}; [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]} end. @@ -537,7 +539,7 @@ unsubscribe_batch([], _ClientId, _Topics) -> -spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) -> {unsubscribe_batch, _} | {error, _}. do_unsubscribe_batch(ClientId, Topics) -> - case ets:lookup(emqx_channel, ClientId) of + case ets:lookup(?CHAN_TAB, ClientId) of [] -> {error, channel_not_found}; [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 5f2446aad..ec236c06c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -57,7 +58,6 @@ %% for batch operation -export([do_subscribe/3]). --define(CLIENT_QTAB, emqx_channel_info). -define(TAGS, [<<"Clients">>]). -define(CLIENT_QSCHEMA, [ @@ -666,7 +666,7 @@ list_clients(QString) -> case maps:get(<<"node">>, QString, undefined) of undefined -> emqx_mgmt_api:cluster_query( - ?CLIENT_QTAB, + ?CHAN_INFO_TAB, QString, ?CLIENT_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -678,7 +678,7 @@ list_clients(QString) -> QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( Node1, - ?CLIENT_QTAB, + ?CHAN_INFO_TAB, QStringWithoutNode, ?CLIENT_QSCHEMA, fun ?MODULE:qs2ms/2, @@ -753,7 +753,7 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) -> %% We use emqx_channel instead of emqx_channel_info (used by the emqx_mgmt:lookup_client/2), %% as the emqx_channel_info table will only be populated after the hook `client.connected` %% has returned. So if one want to subscribe topics in this hook, it will fail. - case ets:lookup(emqx_channel, ClientID) of + case ets:lookup(?CHAN_TAB, ClientID) of [] -> {404, ?CLIENTID_NOT_FOUND}; _ -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index f6e790bf4..b6294ecbf 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -17,6 +17,7 @@ -module(emqx_mgmt_api_topics). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -111,7 +112,7 @@ do_list(Params) -> case emqx_mgmt_api:node_query( node(), - emqx_route, + ?ROUTE_TAB, Params, ?TOPICS_QUERY_SCHEMA, fun ?MODULE:qs2ms/2, diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 27de11d0f..3ebf8e314 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -17,6 +17,8 @@ -module(emqx_mgmt_cli). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_cm.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -168,7 +170,7 @@ sort_map_list_field(Field, Map) -> %% @doc Query clients clients(["list"]) -> - dump(emqx_channel, client); + dump(?CHAN_TAB, client); clients(["show", ClientId]) -> if_client(ClientId, fun print/1); clients(["kick", ClientId]) -> @@ -182,7 +184,7 @@ clients(_) -> ]). if_client(ClientId, Fun) -> - case ets:lookup(emqx_channel, (bin(ClientId))) of + case ets:lookup(?CHAN_TAB, (bin(ClientId))) of [] -> emqx_ctl:print("Not Found.~n"); [Channel] -> Fun({client, Channel}) end. @@ -191,9 +193,9 @@ if_client(ClientId, Fun) -> %% @doc Topics Command topics(["list"]) -> - dump(emqx_route, emqx_topic); + dump(?ROUTE_TAB, emqx_topic); topics(["show", Topic]) -> - Routes = ets:lookup(emqx_route, bin(Topic)), + Routes = ets:lookup(?ROUTE_TAB, bin(Topic)), [print({emqx_topic, Route}) || Route <- Routes]; topics(_) -> emqx_ctl:usage([ @@ -204,23 +206,23 @@ topics(_) -> subscriptions(["list"]) -> lists:foreach( fun(Suboption) -> - print({emqx_suboption, Suboption}) + print({?SUBOPTION, Suboption}) end, - ets:tab2list(emqx_suboption) + ets:tab2list(?SUBOPTION) ); subscriptions(["show", ClientId]) -> case ets:lookup(emqx_subid, bin(ClientId)) of [] -> emqx_ctl:print("Not Found.~n"); [{_, Pid}] -> - case ets:match_object(emqx_suboption, {{'_', Pid}, '_'}) of + case ets:match_object(?SUBOPTION, {{'_', Pid}, '_'}) of [] -> emqx_ctl:print("Not Found.~n"); - Suboption -> [print({emqx_suboption, Sub}) || Sub <- Suboption] + Suboption -> [print({?SUBOPTION, Sub}) || Sub <- Suboption] end end; subscriptions(["add", ClientId, Topic, QoS]) -> if_valid_qos(QoS, fun(IntQos) -> - case ets:lookup(emqx_channel, bin(ClientId)) of + case ets:lookup(?CHAN_TAB, bin(ClientId)) of [] -> emqx_ctl:print("Error: Channel not found!"); [{_, Pid}] -> @@ -230,7 +232,7 @@ subscriptions(["add", ClientId, Topic, QoS]) -> end end); subscriptions(["del", ClientId, Topic]) -> - case ets:lookup(emqx_channel, bin(ClientId)) of + case ets:lookup(?CHAN_TAB, bin(ClientId)) of [] -> emqx_ctl:print("Error: Channel not found!"); [{_, Pid}] -> @@ -841,7 +843,7 @@ print({emqx_topic, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); print({emqx_topic, #route{topic = Topic, dest = Node}}) -> emqx_ctl:print("~ts -> ~ts~n", [Topic, Node]); -print({emqx_suboption, {{Topic, Pid}, Options}}) when is_pid(Pid) -> +print({?SUBOPTION, {{Topic, Pid}, Options}}) when is_pid(Pid) -> SubId = maps:get(subid, Options), QoS = maps:get(qos, Options, 0), NL = maps:get(nl, Options, 0), diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index e617c6dcb..a2f546267 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -18,11 +18,10 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx_router.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(ROUTE_TAB, emqx_route). - all() -> emqx_common_test_helpers:all(?MODULE). From ea81a924f1702ab8da58a51aeb31396dea8c6238 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 1 Jun 2023 14:34:25 +0800 Subject: [PATCH 2/6] refactor: move topic() types def in `emqx_types.erl` --- apps/emqx/include/emqx_mqtt.hrl | 7 +++++++ apps/emqx/src/emqx_broker.erl | 4 ++-- apps/emqx/src/emqx_frame.erl | 2 ++ apps/emqx/src/emqx_mqueue.erl | 3 +-- apps/emqx/src/emqx_session_router.erl | 12 +++++------ apps/emqx/src/emqx_shared_sub.erl | 4 ++-- apps/emqx/src/emqx_topic.erl | 21 +++++++------------ apps/emqx/src/emqx_trie.erl | 6 +++--- apps/emqx/src/emqx_types.erl | 15 ++++++++++--- apps/emqx_authz/src/emqx_authz_mnesia.erl | 2 +- .../src/emqx_bridge_mqtt_egress.erl | 2 +- .../src/emqx_bridge_mqtt_ingress.erl | 2 +- .../emqx_retainer/src/emqx_retainer_index.erl | 14 ++++++------- 13 files changed, 52 insertions(+), 42 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index e4f9111da..8a40f614f 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -48,6 +48,13 @@ {?MQTT_PROTO_V5, <<"MQTT">>} ]). +%%-------------------------------------------------------------------- +%% MQTT Topic and TopitFilter byte length +%%-------------------------------------------------------------------- + +%% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3] +-define(MAX_TOPIC_LEN, 65535). + %%-------------------------------------------------------------------- %% MQTT QoS Levels %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 390b732b9..1bbe4dc82 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -103,10 +103,10 @@ start_link(Pool, Id) -> create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubOption: {Topic, SubPid} -> SubOption + %% SubOption: {TopicFilter, SubPid} -> SubOption ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]), - %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... + %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ... %% duplicate_bag: o(1) insert ok = emqx_utils_ets:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 5999e93f3..8620f834f 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -300,6 +300,7 @@ parse_connect2( ConnPacket = #mqtt_packet_connect{ proto_name = ProtoName, proto_ver = ProtoVer, + %% For bridge mode, non-standard implementation is_bridge = (BridgeTag =:= 8), clean_start = bool(CleanStart), will_flag = bool(WillFlag), @@ -762,6 +763,7 @@ serialize_variable( #mqtt_packet_connect{ proto_name = ProtoName, proto_ver = ProtoVer, + %% For bridge mode, non-standard implementation is_bridge = IsBridge, clean_start = CleanStart, will_flag = WillFlag, diff --git a/apps/emqx/src/emqx_mqueue.erl b/apps/emqx/src/emqx_mqueue.erl index fb43941ee..0ef7d56e5 100644 --- a/apps/emqx/src/emqx_mqueue.erl +++ b/apps/emqx/src/emqx_mqueue.erl @@ -75,11 +75,10 @@ -export_type([mqueue/0, options/0]). --type topic() :: emqx_types:topic(). -type priority() :: infinity | integer(). -type pq() :: emqx_pqueue:q(). -type count() :: non_neg_integer(). --type p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}. +-type p_table() :: ?NO_PRIORITY_TABLE | #{emqx_types:topic() := priority()}. -type options() :: #{ max_len := count(), priorities => p_table(), diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 0435ddca3..47e4bdf74 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -57,9 +57,7 @@ code_change/3 ]). --type group() :: binary(). - --type dest() :: node() | {group(), node()}. +-type dest() :: node() | {emqx_types:group(), node()}. -define(ROUTE_RAM_TAB, emqx_session_route_ram). -define(ROUTE_DISC_TAB, emqx_session_route_disc). @@ -114,7 +112,7 @@ start_link(Pool, Id) -> %% Route APIs %%-------------------------------------------------------------------- --spec do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}. +-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_add_route(Topic, SessionID) when is_binary(Topic) -> Route = #route{topic = Topic, dest = SessionID}, case lists:member(Route, lookup_routes(Topic)) of @@ -135,7 +133,7 @@ do_add_route(Topic, SessionID) when is_binary(Topic) -> end. %% @doc Match routes --spec match_routes(emqx_topic:topic()) -> [emqx_types:route()]. +-spec match_routes(emqx_types:topic()) -> [emqx_types:route()]. match_routes(Topic) when is_binary(Topic) -> case match_trie(Topic) of [] -> lookup_routes(Topic); @@ -153,7 +151,7 @@ match_trie(Topic) -> delete_routes(SessionID, Subscriptions) -> cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}). --spec do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}. +-spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_delete_route(Topic, SessionID) -> Route = #route{topic = Topic, dest = SessionID}, case emqx_topic:wildcard(Topic) of @@ -165,7 +163,7 @@ do_delete_route(Topic, SessionID) -> end. %% @doc Print routes to a topic --spec print_routes(emqx_topic:topic()) -> ok. +-spec print_routes(emqx_types:topic()) -> ok. print_routes(Topic) -> lists:foreach( fun(#route{topic = To, dest = SessionID}) -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index bc13a06c6..3a370ddba 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -97,7 +97,7 @@ -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). -define(SUBSCRIBER_DOWN, noproc). --type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()). +-type redispatch_to() :: ?REDISPATCH_TO(emqx_types:group(), emqx_types:topic()). -record(state, {pmon}). @@ -156,7 +156,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> end end. --spec strategy(emqx_topic:group()) -> strategy(). +-spec strategy(emqx_types:group()) -> strategy(). strategy(Group) -> try emqx:get_config([ diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 6434154ed..f667455b4 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -16,6 +16,8 @@ -module(emqx_topic). +-include("emqx_mqtt.hrl"). + %% APIs -export([ match/2, @@ -33,18 +35,9 @@ parse/2 ]). --export_type([ - group/0, - topic/0, - word/0 -]). - --type group() :: binary(). --type topic() :: binary(). --type word() :: '' | '+' | '#' | binary(). --type words() :: list(word()). - --define(MAX_TOPIC_LEN, 65535). +-type topic() :: emqx_types:topic(). +-type word() :: emqx_types:word(). +-type words() :: emqx_types:words(). %%-------------------------------------------------------------------- %% APIs @@ -142,6 +135,7 @@ prepend(Parent0, W) -> _ -> <> end. +-spec bin(word()) -> binary(). bin('') -> <<>>; bin('+') -> <<"+">>; bin('#') -> <<"#">>; @@ -163,6 +157,7 @@ tokens(Topic) -> words(Topic) when is_binary(Topic) -> [word(W) || W <- tokens(Topic)]. +-spec word(binary()) -> word(). word(<<>>) -> ''; word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; @@ -185,7 +180,7 @@ feed_var(Var, Val, [Var | Words], Acc) -> feed_var(Var, Val, [W | Words], Acc) -> feed_var(Var, Val, Words, [W | Acc]). --spec join(list(binary())) -> binary(). +-spec join(list(word())) -> binary(). join([]) -> <<>>; join([W]) -> diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index b4ff49bb3..229a0e3f4 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -114,7 +114,7 @@ create_session_trie(Type) -> insert(Topic) when is_binary(Topic) -> insert(Topic, ?TRIE). --spec insert_session(emqx_topic:topic()) -> ok. +-spec insert_session(emqx_types:topic()) -> ok. insert_session(Topic) when is_binary(Topic) -> insert(Topic, session_trie()). @@ -132,7 +132,7 @@ delete(Topic) when is_binary(Topic) -> delete(Topic, ?TRIE). %% @doc Delete a topic filter from the trie. --spec delete_session(emqx_topic:topic()) -> ok. +-spec delete_session(emqx_types:topic()) -> ok. delete_session(Topic) when is_binary(Topic) -> delete(Topic, session_trie()). @@ -148,7 +148,7 @@ delete(Topic, Trie) when is_binary(Topic) -> match(Topic) when is_binary(Topic) -> match(Topic, ?TRIE). --spec match_session(emqx_topic:topic()) -> list(emqx_topic:topic()). +-spec match_session(emqx_types:topic()) -> list(emqx_types:topic()). match_session(Topic) when is_binary(Topic) -> match(Topic, session_trie()). diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index a7ec0cec4..c6a567318 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -29,10 +29,16 @@ -export_type([ zone/0, pubsub/0, - topic/0, subid/0 ]). +-export_type([ + group/0, + topic/0, + word/0, + words/0 +]). + -export_type([ socktype/0, sockstate/0, @@ -122,9 +128,13 @@ -type zone() :: atom(). -type pubsub() :: publish | subscribe. --type topic() :: emqx_topic:topic(). -type subid() :: binary() | atom(). +-type group() :: binary() | undefined. +-type topic() :: binary(). +-type word() :: '' | '+' | '#' | binary(). +-type words() :: list(word()). + -type socktype() :: tcp | udp | ssl | proxy | atom(). -type sockstate() :: idle | running | blocked | closed. -type conninfo() :: #{ @@ -230,7 +240,6 @@ | {share, topic(), deliver_result()} ]. -type route() :: #route{}. --type group() :: emqx_topic:group(). -type route_entry() :: {topic(), node()} | {topic, group()}. -type command() :: #command{}. diff --git a/apps/emqx_authz/src/emqx_authz_mnesia.erl b/apps/emqx_authz/src/emqx_authz_mnesia.erl index 6b9c367d1..58df08653 100644 --- a/apps/emqx_authz/src/emqx_authz_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_mnesia.erl @@ -33,7 +33,7 @@ -type clientid() :: {clientid, binary()}. -type who() :: username() | clientid() | all. --type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_topic:topic()}. +-type rule() :: {emqx_authz_rule:permission(), emqx_authz_rule:action(), emqx_types:topic()}. -type rules() :: [rule()]. -record(emqx_acl, { diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index 673a30726..2d50d92ce 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -49,7 +49,7 @@ -type egress() :: #{ local => #{ - topic => emqx_topic:topic() + topic => emqx_types:topic() }, remote := emqx_bridge_mqtt_msg:msgvars() }. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 91ec27e74..c0b2da908 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -43,7 +43,7 @@ -type ingress() :: #{ server := string(), remote := #{ - topic := emqx_topic:topic(), + topic := emqx_types:topic(), qos => emqx_types:qos() }, local := emqx_bridge_mqtt_msg:msgvars(), diff --git a/apps/emqx_retainer/src/emqx_retainer_index.erl b/apps/emqx_retainer/src/emqx_retainer_index.erl index 55fe7ce3f..d1e5425ed 100644 --- a/apps/emqx_retainer/src/emqx_retainer_index.erl +++ b/apps/emqx_retainer/src/emqx_retainer_index.erl @@ -31,7 +31,7 @@ -type index() :: list(pos_integer()). %% @doc Index key is a term that can be effectively searched in the index table. --type index_key() :: {index(), {emqx_topic:words(), emqx_topic:words()}}. +-type index_key() :: {index(), {emqx_types:words(), emqx_types:words()}}. -type match_pattern_part() :: term(). @@ -42,7 +42,7 @@ %% @doc Given words of a concrete topic (`Tokens') and a list of `Indices', %% constructs index keys for the topic and each of the indices. %% `Fun' is called with each of these keys. --spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_topic:words()) -> ok. +-spec foreach_index_key(fun((index_key()) -> any()), list(index()), emqx_types:words()) -> ok. foreach_index_key(_Fun, [], _Tokens) -> ok; foreach_index_key(Fun, [Index | Indices], Tokens) -> @@ -59,7 +59,7 @@ foreach_index_key(Fun, [Index | Indices], Tokens) -> %% returns `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' term. %% %% @see foreach_index_key/3 --spec to_index_key(index(), emqx_topic:words()) -> index_key(). +-spec to_index_key(index(), emqx_types:words()) -> index_key(). to_index_key(Index, Tokens) -> {Index, split_index_tokens(Index, Tokens, 1, [], [])}. @@ -73,7 +73,7 @@ to_index_key(Index, Tokens) -> %% %% @see foreach_index_key/3 %% @see to_index_key/2 --spec index_score(index(), emqx_topic:words()) -> non_neg_integer(). +-spec index_score(index(), emqx_types:words()) -> non_neg_integer(). index_score(Index, Tokens) -> index_score(Index, Tokens, 1, 0). @@ -92,7 +92,7 @@ select_index(Tokens, Indices) -> %% %% E.g. for `[2, 3]' index and ['+', <<"b">>, '+', <<"d">>] wildcard topic %% returns {[2, 3], {[<<"b">>, '_'], ['_', <<"d">>]}} pattern. --spec condition(index(), emqx_topic:words()) -> match_pattern_part(). +-spec condition(index(), emqx_types:words()) -> match_pattern_part(). condition(Index, Tokens) -> {Index, condition(Index, Tokens, 1, [], [])}. @@ -100,7 +100,7 @@ condition(Index, Tokens) -> %% %% E.g. for ['+', <<"b">>, '+', <<"d">>, '#'] wildcard topic %% returns ['_', <<"b">>, '_', <<"d">> | '_'] pattern. --spec condition(emqx_topic:words()) -> match_pattern_part(). +-spec condition(emqx_types:words()) -> match_pattern_part(). condition(Tokens) -> Tokens1 = [ case W =:= '+' of @@ -118,7 +118,7 @@ condition(Tokens) -> %% %% E.g given `{[2, 3], {[<<"b">>, <<"c">>], [<<"a">>, <<"d">>]}}' index key %% returns `[<<"a">>, <<"b">>, <<"c">>, <<"d">>]' topic. --spec restore_topic(index_key()) -> emqx_topic:words(). +-spec restore_topic(index_key()) -> emqx_types:words(). restore_topic({Index, {IndexTokens, OtherTokens}}) -> restore_topic(Index, IndexTokens, OtherTokens, 1, []). From 070e410c69473ea08ecda7905ea46dc87267fa67 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 1 Jun 2023 15:24:31 +0800 Subject: [PATCH 3/6] refactor: variable and func rename --- apps/emqx/src/emqx_channel.erl | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 75be6b905..79c9ba528 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -484,13 +484,13 @@ handle_in( {ok, Channel} end; handle_in( - Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo} ) -> - case emqx_packet:check(Packet) of + case emqx_packet:check(SubPkt) of ok -> TopicFilters0 = parse_topic_filters(TopicFilters), - TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0), + TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0), TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel), HasAuthzDeny = lists:any( fun({_TopicFilter, ReasonCode}) -> @@ -503,7 +503,10 @@ handle_in( true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); false -> - TopicFilters2 = [TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0], + TopicFilters2 = [ + TopicFilter + || {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0 + ], TopicFilters3 = run_hooks( 'client.subscribe', [ClientInfo, Properties], @@ -1879,7 +1882,7 @@ check_sub_authzs( ) -> case emqx_access_control:authorize(ClientInfo, subscribe, Topic) of allow -> - check_sub_authzs(More, Channel, [{TopicFilter, 0} | Acc]); + check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]); deny -> check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc]) end; @@ -1895,9 +1898,9 @@ check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Enrich SubId -put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) -> +enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; -put_subid_in_subopts(_Properties, TopicFilters) -> +enrich_subopts_subid(_Properties, TopicFilters) -> TopicFilters. %%-------------------------------------------------------------------- From b0e18f5e753ab35c4f605e0ee03d65d8e6b74f5b Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 1 Jun 2023 17:31:13 +0800 Subject: [PATCH 4/6] fix(shared-sub): validate share topic error send DISCONNECT --- apps/emqx/include/emqx_mqtt.hrl | 5 +++++ apps/emqx/src/emqx_topic.erl | 34 +++++++++++++++++++++++++++-- apps/emqx/test/emqx_topic_SUITE.erl | 26 ++++++++++++++++++++-- 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 8a40f614f..2bd27339c 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -669,6 +669,11 @@ end). end ). +-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty). +-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty). +-define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter'). +-define(SHARE_NAME_INVALID_CHAR, share_subscription_group_name_cannot_include_wildcard). + -define(FRAME_PARSE_ERROR, frame_parse_error). -define(FRAME_SERIALIZE_ERROR, frame_serialize_error). -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})). diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index f667455b4..c1d91fbf1 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -90,11 +90,15 @@ validate({Type, Topic}) when Type =:= name; Type =:= filter -> -spec validate(name | filter, topic()) -> true. validate(_, <<>>) -> + %% MQTT-5.0 [MQTT-4.7.3-1] error(empty_topic); validate(_, Topic) when is_binary(Topic) andalso (size(Topic) > ?MAX_TOPIC_LEN) -> + %% MQTT-5.0 [MQTT-4.7.3-3] error(topic_too_long); -validate(filter, Topic) when is_binary(Topic) -> - validate2(words(Topic)); +validate(filter, SharedFilter = <<"$share/", _Rest/binary>>) -> + validate_share(SharedFilter); +validate(filter, Filter) when is_binary(Filter) -> + validate2(words(Filter)); validate(name, Topic) when is_binary(Topic) -> Words = words(Topic), validate2(Words) andalso @@ -122,6 +126,32 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). +validate_share(<<"$share/", Rest/binary>>) when + Rest =:= <<>> orelse Rest =:= <<"/">> +-> + %% MQTT-5.0 [MQTT-4.8.2-1] + error(?SHARE_EMPTY_FILTER); +validate_share(<<"$share/", Rest/binary>>) -> + case binary:split(Rest, <<"/">>) of + %% MQTT-5.0 [MQTT-4.8.2-1] + [<<>>, _] -> + error(?SHARE_EMPTY_GROUP); + %% MQTT-5.0 [MQTT-4.7.3-1] + [_, <<>>] -> + error(?SHARE_EMPTY_FILTER); + [ShareName, Filter] -> + validate_share(ShareName, Filter) + end. + +validate_share(_, <<"$share/", _Rest/binary>>) -> + error(?SHARE_RECURSIVELY); +validate_share(ShareName, Filter) -> + case binary:match(ShareName, [<<"+">>, <<"#">>]) of + %% MQTT-5.0 [MQTT-4.8.2-2] + nomatch -> validate2(words(Filter)); + _ -> error(?SHARE_NAME_INVALID_CHAR) + end. + %% @doc Prepend a topic prefix. %% Ensured to have only one / between prefix and suffix. prepend(undefined, W) -> diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index cbd7e5a6d..fcbf053ba 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_placeholder.hrl"). -import( @@ -130,14 +131,35 @@ t_validate(_) -> true = validate({filter, <<"x">>}), true = validate({name, <<"x//y">>}), true = validate({filter, <<"sport/tennis/#">>}), + %% MQTT-5.0 [MQTT-4.7.3-1] ?assertError(empty_topic, validate({name, <<>>})), + ?assertError(empty_topic, validate({filter, <<>>})), ?assertError(topic_name_error, validate({name, <<"abc/#">>})), ?assertError(topic_too_long, validate({name, long_topic()})), - ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})), ?assertError(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})), ?assertError(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})), ?assertError(topic_invalid_char, validate({filter, <<"sport/tennis#">>})), - ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})). + %% MQTT-5.0 [MQTT-4.7.1-1] + ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})), + ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})), + %% MQTT-5.0 [MQTT-4.8.2-1] + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/">>})), + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share//">>})), + ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//t">>})), + ?assertError(?SHARE_EMPTY_GROUP, validate({filter, <<"$share//test">>})), + %% MQTT-5.0 [MQTT-4.7.3-1] for shared-sub + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g/">>})), + ?assertError(?SHARE_EMPTY_FILTER, validate({filter, <<"$share/g2/">>})), + %% MQTT-5.0 [MQTT-4.8.2-2] + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/p+q/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/m+/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/+n/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#y/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/x#/1">>})), + ?assertError(?SHARE_NAME_INVALID_CHAR, validate({filter, <<"$share/#y/1">>})), + %% share recursively + ?assertError(?SHARE_RECURSIVELY, validate({filter, <<"$share/g1/$share/t">>})), + true = validate({filter, <<"$share/g1/topic/$share">>}). t_sigle_level_validate(_) -> true = validate({filter, <<"+">>}), From 4ea348308397aae52990632fcdb8e9c777f37a6d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Jun 2023 16:30:18 +0800 Subject: [PATCH 5/6] perf(topic): use tail recursive to joining a topic --- apps/emqx/src/emqx_topic.erl | 32 +++++++++++++++-------------- apps/emqx/test/emqx_topic_SUITE.erl | 5 ++++- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index c1d91fbf1..62dca99c7 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -39,6 +39,11 @@ -type word() :: emqx_types:word(). -type words() :: emqx_types:words(). +%% Guards +-define(MULTI_LEVEL_WILDCARD_NOT_LAST(C, REST), + ((C =:= '#' orelse C =:= <<"#">>) andalso REST =/= []) +). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -110,7 +115,8 @@ validate2([]) -> % end with '#' validate2(['#']) -> true; -validate2(['#' | Words]) when length(Words) > 0 -> +%% MQTT-5.0 [MQTT-4.7.1-1] +validate2([C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) -> error('topic_invalid_#'); validate2(['' | Words]) -> validate2(Words); @@ -213,20 +219,16 @@ feed_var(Var, Val, [W | Words], Acc) -> -spec join(list(word())) -> binary(). join([]) -> <<>>; -join([W]) -> - bin(W); -join(Words) -> - {_, Bin} = lists:foldr( - fun - (W, {true, Tail}) -> - {false, <>}; - (W, {false, Tail}) -> - {false, <>} - end, - {true, <<>>}, - [bin(W) || W <- Words] - ), - Bin. +join([Word | Words]) -> + do_join(bin(Word), Words). + +do_join(TopicAcc, []) -> + TopicAcc; +%% MQTT-5.0 [MQTT-4.7.1-1] +do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) -> + error('topic_invalid_#'); +do_join(TopicAcc, [Word | Words]) -> + do_join(<>, Words). -spec parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}. parse(TopicFilter) when is_binary(TopicFilter) -> diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index fcbf053ba..521efe751 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -199,7 +199,10 @@ t_join(_) -> ?assertEqual(<<"+//#">>, join(['+', '', '#'])), ?assertEqual(<<"x/y/z/+">>, join([<<"x">>, <<"y">>, <<"z">>, '+'])), ?assertEqual(<<"/ab/cd/ef/">>, join(words(<<"/ab/cd/ef/">>))), - ?assertEqual(<<"ab/+/#">>, join(words(<<"ab/+/#">>))). + ?assertEqual(<<"ab/+/#">>, join(words(<<"ab/+/#">>))), + %% MQTT-5.0 [MQTT-4.7.1-1] + ?assertError('topic_invalid_#', join(['+', <<"a">>, '#', <<"b">>, '', '+'])), + ?assertError('topic_invalid_#', join(['+', <<"c">>, <<"#">>, <<"d">>, '', '+'])). t_systop(_) -> SysTop1 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/xyz"]), From 7d073c0d3def42bb8ab7e8a0a92ebbaa4fd12726 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 7 Jun 2023 23:06:47 +0800 Subject: [PATCH 6/6] style: make static_checks happy --- rel/i18n/emqx_schema.hocon | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index c6901f937..78dd2ecaa 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -976,15 +976,13 @@ fields_tcp_opts_nodelay.label: """TCP_NODELAY""" fields_tcp_opts_keepalive.desc: -""" -Enable TCP keepalive for MQTT connections over TCP or SSL. +"""Enable TCP keepalive for MQTT connections over TCP or SSL. The value is three comma separated numbers in the format of 'Idle,Interval,Probes' - Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200). - Interval: The number of seconds between TCP keep-alive probes (Linux default 75). - Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9). For example "240,30,5" means: EMQX should start sending TCP keepalive probes after the connection is in idle for 240 seconds, and the probes are sent every 30 seconds until a response is received from the MQTT client, if it misses 5 consecutive responses, EMQX should close the connection. -Default: 'none' -""" +Default: 'none'""" fields_tcp_opts_keepalive.label: """TCP keepalive options"""