diff --git a/Makefile b/Makefile index dc8564c38..f322a0a80 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,8 @@ endif # Dashbord version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.2.5-1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.0.8-beta.1 +export EMQX_DASHBOARD_VERSION ?= v1.2.6-beta.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.1.0-beta.2 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used # In make 4.4+, for backward-compatibility the value from the original environment is used. diff --git a/apps/emqx/src/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl similarity index 75% rename from apps/emqx/src/emqx_cm.hrl rename to apps/emqx/include/emqx_cm.hrl index 3896ea30a..ae70f131f 100644 --- a/apps/emqx/src/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -13,9 +13,19 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + -ifndef(EMQX_CM_HRL). -define(EMQX_CM_HRL, true). +%% Tables for channel management. +-define(CHAN_TAB, emqx_channel). +-define(CHAN_CONN_TAB, emqx_channel_conn). +-define(CHAN_INFO_TAB, emqx_channel_info). +-define(CHAN_LIVE_TAB, emqx_channel_live). + +%% Mria/Mnesia Tables for channel management. +-define(CHAN_REG_TAB, emqx_channel_registry). + -define(T_KICK, 5_000). -define(T_GET_INFO, 5_000). -define(T_TAKEOVER, 15_000). diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index e4f9111da..2bd27339c 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -48,6 +48,13 @@ {?MQTT_PROTO_V5, <<"MQTT">>} ]). +%%-------------------------------------------------------------------- +%% MQTT Topic and TopitFilter byte length +%%-------------------------------------------------------------------- + +%% MQTT-3.1.1 and MQTT-5.0 [MQTT-4.7.3-3] +-define(MAX_TOPIC_LEN, 65535). + %%-------------------------------------------------------------------- %% MQTT QoS Levels %%-------------------------------------------------------------------- @@ -662,6 +669,11 @@ end). end ). +-define(SHARE_EMPTY_FILTER, share_subscription_topic_cannot_be_empty). +-define(SHARE_EMPTY_GROUP, share_subscription_group_name_cannot_be_empty). +-define(SHARE_RECURSIVELY, '$share_cannot_be_used_as_real_topic_filter'). +-define(SHARE_NAME_INVALID_CHAR, share_subscription_group_name_cannot_include_wildcard). + -define(FRAME_PARSE_ERROR, frame_parse_error). -define(FRAME_SERIALIZE_ERROR, frame_serialize_error). -define(THROW_FRAME_ERROR(Reason), erlang:throw({?FRAME_PARSE_ERROR, Reason})). diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index b128db61d..6731ea285 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -31,11 +31,11 @@ %% NOTE: ALso make sure to follow the instructions in end of %% `apps/emqx/src/bpapi/README.md' -%% Community edition +%% Opensource edition -define(EMQX_RELEASE_CE, "5.1.0-alpha.3"). %% Enterprise edition -define(EMQX_RELEASE_EE, "5.1.0-alpha.3"). -%% the HTTP API version +%% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/include/emqx_router.hrl b/apps/emqx/include/emqx_router.hrl new file mode 100644 index 000000000..035ff5455 --- /dev/null +++ b/apps/emqx/include/emqx_router.hrl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_ROUTER_HRL). +-define(EMQX_ROUTER_HRL, true). + +%% ETS table for message routing +-define(ROUTE_TAB, emqx_route). + +%% Mnesia table for message routing +-define(ROUTING_NODE, emqx_routing_node). + +%% ETS tables for PubSub +-define(SUBOPTION, emqx_suboption). +-define(SUBSCRIBER, emqx_subscriber). +-define(SUBSCRIPTION, emqx_subscription). + +-endif. diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 5f7c4aaf5..1bbe4dc82 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -19,6 +19,8 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_router.hrl"). + -include("logger.hrl"). -include("types.hrl"). -include("emqx_mqtt.hrl"). @@ -80,11 +82,6 @@ -define(BROKER, ?MODULE). -%% ETS tables for PubSub --define(SUBOPTION, emqx_suboption). --define(SUBSCRIBER, emqx_subscriber). --define(SUBSCRIPTION, emqx_subscription). - %% Guards -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))). @@ -106,10 +103,10 @@ start_link(Pool, Id) -> create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubOption: {Topic, SubPid} -> SubOption + %% SubOption: {TopicFilter, SubPid} -> SubOption ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]), - %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... + %% Subscription: SubPid -> TopicFilter1, TopicFilter2, TopicFilter3, ... %% duplicate_bag: o(1) insert ok = emqx_utils_ets:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 5637bb171..79c9ba528 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -484,13 +484,13 @@ handle_in( {ok, Channel} end; handle_in( - Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + SubPkt = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo} ) -> - case emqx_packet:check(Packet) of + case emqx_packet:check(SubPkt) of ok -> TopicFilters0 = parse_topic_filters(TopicFilters), - TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0), + TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0), TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel), HasAuthzDeny = lists:any( fun({_TopicFilter, ReasonCode}) -> @@ -503,7 +503,10 @@ handle_in( true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); false -> - TopicFilters2 = [TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0], + TopicFilters2 = [ + TopicFilter + || {TopicFilter, ?RC_SUCCESS} <- TupleTopicFilters0 + ], TopicFilters3 = run_hooks( 'client.subscribe', [ClientInfo, Properties], @@ -1866,6 +1869,9 @@ check_pub_caps( %%-------------------------------------------------------------------- %% Check Sub Authorization +%% TODO: not only check topic filter. Qos chould be checked too. +%% Not implemented yet: +%% MQTT-3.1.1 [MQTT-3.8.4-6] and MQTT-5.0 [MQTT-3.8.4-7] check_sub_authzs(TopicFilters, Channel) -> check_sub_authzs(TopicFilters, Channel, []). @@ -1876,7 +1882,7 @@ check_sub_authzs( ) -> case emqx_access_control:authorize(ClientInfo, subscribe, Topic) of allow -> - check_sub_authzs(More, Channel, [{TopicFilter, 0} | Acc]); + check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]); deny -> check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc]) end; @@ -1892,9 +1898,9 @@ check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Enrich SubId -put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) -> +enrich_subopts_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; -put_subid_in_subopts(_Properties, TopicFilters) -> +enrich_subopts_subid(_Properties, TopicFilters) -> TopicFilters. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index db3d48f5d..c193cea44 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -20,6 +20,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -118,14 +119,6 @@ _Stats :: emqx_types:stats() }. --include("emqx_cm.hrl"). - -%% Tables for channel management. --define(CHAN_TAB, emqx_channel). --define(CHAN_CONN_TAB, emqx_channel_conn). --define(CHAN_INFO_TAB, emqx_channel_info). --define(CHAN_LIVE_TAB, emqx_channel_live). - -define(CHAN_STATS, [ {?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, @@ -669,12 +662,12 @@ lookup_client({username, Username}) -> MatchSpec = [ {{'_', #{clientinfo => #{username => '$1'}}, '_'}, [{'=:=', '$1', Username}], ['$_']} ], - ets:select(emqx_channel_info, MatchSpec); + ets:select(?CHAN_INFO_TAB, MatchSpec); lookup_client({clientid, ClientId}) -> [ Rec - || Key <- ets:lookup(emqx_channel, ClientId), - Rec <- ets:lookup(emqx_channel_info, Key) + || Key <- ets:lookup(?CHAN_TAB, ClientId), + Rec <- ets:lookup(?CHAN_INFO_TAB, Key) ]. %% @private diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 8a5639486..058bb53ec 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -20,6 +20,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_cm.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -50,7 +51,6 @@ ]). -define(REGISTRY, ?MODULE). --define(TAB, emqx_channel_registry). -define(LOCK, {?MODULE, cleanup_down}). -record(channel, {chid, pid}). @@ -78,7 +78,7 @@ register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, self()}); register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_write(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. @@ -91,14 +91,14 @@ unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, self()}); unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> case is_enabled() of - true -> mria:dirty_delete_object(?TAB, record(ClientId, ChanPid)); + true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. %% @doc Lookup the global channels. -spec lookup_channels(emqx_types:clientid()) -> list(pid()). lookup_channels(ClientId) -> - [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)]. + [ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?CHAN_REG_TAB, ClientId)]. record(ClientId, ChanPid) -> #channel{chid = ClientId, pid = ChanPid}. @@ -109,7 +109,7 @@ record(ClientId, ChanPid) -> init([]) -> mria_config:set_dirty_shard(?CM_SHARD, true), - ok = mria:create_table(?TAB, [ + ok = mria:create_table(?CHAN_REG_TAB, [ {type, bag}, {rlog_shard, ?CM_SHARD}, {storage, ram_copies}, @@ -166,7 +166,7 @@ cleanup_channels(Node) -> do_cleanup_channels(Node) -> Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], - lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)). + lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)). delete_channel(Chan) -> - mnesia:delete_object(?TAB, Chan, write). + mnesia:delete_object(?CHAN_REG_TAB, Chan, write). diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 080172c7b..fe3d6f6ef 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -91,7 +91,7 @@ -export([ensure_atom_conf_path/2]). -ifdef(TEST). --export([erase_all/0]). +-export([erase_all/0, backup_and_write/2]). -endif. -include("logger.hrl"). @@ -105,6 +105,7 @@ -define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]). -define(CONFIG_NOT_FOUND_MAGIC, '$0tFound'). +-define(MAX_KEEP_BACKUP_CONFIGS, 10). -export_type([ update_request/0, @@ -597,43 +598,94 @@ save_to_config_map(Conf, RawConf) -> -spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}. save_to_override_conf(_, undefined, _) -> ok; -%% TODO: Remove deprecated override conf file when 5.1 save_to_override_conf(true, RawConf, Opts) -> case deprecated_conf_file(Opts) of undefined -> ok; FileName -> - ok = filelib:ensure_dir(FileName), - case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of - ok -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "failed_to_write_override_file", - filename => FileName, - reason => Reason - }), - {error, Reason} - end + backup_and_write(FileName, hocon_pp:do(RawConf, #{})) end; save_to_override_conf(false, RawConf, _Opts) -> case cluster_hocon_file() of undefined -> ok; FileName -> - ok = filelib:ensure_dir(FileName), - case file:write_file(FileName, hocon_pp:do(RawConf, #{})) of + backup_and_write(FileName, hocon_pp:do(RawConf, #{})) + end. + +%% @private This is the same human-readable timestamp format as +%% hocon-cli generated app.