From 31f016fa225b532260f0cbbfd747de7802f84836 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 19 Jul 2021 09:55:59 +0800 Subject: [PATCH] refactor(config): remove emqx_zone --- apps/emqx/src/emqx_congestion.erl | 10 +- apps/emqx/src/emqx_kernel_sup.erl | 1 - apps/emqx/src/emqx_limiter.erl | 8 +- apps/emqx/src/emqx_types.erl | 2 +- apps/emqx/src/emqx_zone.erl | 298 ------------------ apps/emqx/test/emqx_zone_SUITE.erl | 104 ------ apps/emqx_authn/test/emqx_authn_SUITE.erl | 4 +- .../test/emqx_authn_mnesia_SUITE.erl | 2 - .../src/exproto/emqx_exproto_channel.erl | 5 +- 9 files changed, 15 insertions(+), 419 deletions(-) delete mode 100644 apps/emqx/src/emqx_zone.erl delete mode 100644 apps/emqx/test/emqx_zone_SUITE.erl diff --git a/apps/emqx/src/emqx_congestion.erl b/apps/emqx/src/emqx_congestion.erl index 4ec20034d..f7e30c1da 100644 --- a/apps/emqx/src/emqx_congestion.erl +++ b/apps/emqx/src/emqx_congestion.erl @@ -55,8 +55,9 @@ cancel_alarms(Socket, Transport, Channel) -> end, ?ALL_ALARM_REASONS). is_alarm_enabled(Channel) -> - emqx_zone:get_env(emqx_channel:info(zone, Channel), - conn_congestion_alarm_enabled, false). + Zone = emqx_channel:info(zone, Channel), + Listener = emqx_channel:info(listener, Channel), + emqx_config:get_listener_conf(Zone, Listener, [conn_congestion, enable_alarm]). alarm_congestion(Socket, Transport, Channel, Reason) -> case has_alarm_sent(Reason) of @@ -68,8 +69,9 @@ alarm_congestion(Socket, Transport, Channel, Reason) -> cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> Zone = emqx_channel:info(zone, Channel), - WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration, - ?WONT_CLEAR_IN), + Listener = emqx_channel:info(listener, Channel), + WontClearIn = emqx_config:get_listener_conf(Zone, Listener, [conn_congestion, + min_alarm_sustain_duration]), case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); false -> ok diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 1c6b0617e..5ca283481 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -35,7 +35,6 @@ init([]) -> , child_spec(emqx_stats, worker) , child_spec(emqx_metrics, worker) , child_spec(emqx_ctl, worker) - , child_spec(emqx_zone, worker) ]}}. child_spec(M, Type) -> diff --git a/apps/emqx/src/emqx_limiter.erl b/apps/emqx/src/emqx_limiter.erl index 181e5c6bf..b4cf745ff 100644 --- a/apps/emqx/src/emqx_limiter.erl +++ b/apps/emqx/src/emqx_limiter.erl @@ -27,7 +27,7 @@ -record(limiter, { %% Zone - zone :: emqx_zone:zone(), + zone :: atom(), %% Checkers checkers :: [checker()] }). @@ -35,7 +35,7 @@ -type(checker() :: #{ name := name() , capacity := non_neg_integer() , interval := non_neg_integer() - , consumer := esockd_rate_limit:bucket() | emqx_zone:zone() + , consumer := esockd_rate_limit:bucket() | atom() }). -type(name() :: conn_bytes_in @@ -59,7 +59,7 @@ %% APIs %%-------------------------------------------------------------------- --spec(init(emqx_zone:zone(), +-spec(init(atom(), maybe(esockd_rate_limit:config()), maybe(esockd_rate_limit:config()), policy()) -> maybe(limiter())). @@ -69,7 +69,7 @@ init(Zone, PubLimit, BytesIn, Specs) -> Filtered = maps:filter(fun(_, V) -> V /= undefined end, Merged), init(Zone, maps:to_list(Filtered)). --spec(init(emqx_zone:zone(), policy()) -> maybe(limiter())). +-spec(init(atom(), policy()) -> maybe(limiter())). init(_Zone, []) -> undefined; init(Zone, Specs) -> diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 09ec54b9d..0b99b40eb 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -101,7 +101,7 @@ qos1 | at_least_once | qos2 | exactly_once). --type(zone() :: emqx_zone:zone()). +-type(zone() :: atom()). -type(pubsub() :: publish | subscribe). -type(topic() :: emqx_topic:topic()). -type(subid() :: binary() | atom()). diff --git a/apps/emqx/src/emqx_zone.erl b/apps/emqx/src/emqx_zone.erl deleted file mode 100644 index 459c36764..000000000 --- a/apps/emqx/src/emqx_zone.erl +++ /dev/null @@ -1,298 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_zone). - --behaviour(gen_server). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). --include("logger.hrl"). --include("types.hrl"). - --logger_header("[Zone]"). - --compile({inline, - [ idle_timeout/1 - , publish_limit/1 - , ratelimit/1 - , mqtt_frame_options/1 - , mqtt_strict_mode/1 - , max_packet_size/1 - , mountpoint/1 - , use_username_as_clientid/1 - , stats_timer/1 - , enable_stats/1 - , enable_acl/1 - , enable_ban/1 - , enable_flapping_detect/1 - , ignore_loop_deliver/1 - , server_keepalive/1 - , keepalive_backoff/1 - , max_inflight/1 - , session_expiry_interval/1 - , force_gc_policy/1 - , force_shutdown_policy/1 - , response_information/1 - , quota_policy/1 - , get_env/2 - , get_env/3 - ]}). - -%% APIs --export([start_link/0, stop/0]). - -%% Zone Option API --export([ idle_timeout/1 - %% XXX: Dedeprecated at v4.2 - , publish_limit/1 - , ratelimit/1 - , mqtt_frame_options/1 - , mqtt_strict_mode/1 - , max_packet_size/1 - , mountpoint/1 - , use_username_as_clientid/1 - , stats_timer/1 - , enable_stats/1 - , enable_acl/1 - , enable_ban/1 - , enable_flapping_detect/1 - , ignore_loop_deliver/1 - , server_keepalive/1 - , keepalive_backoff/1 - , max_inflight/1 - , session_expiry_interval/1 - , force_gc_policy/1 - , force_shutdown_policy/1 - , response_information/1 - , quota_policy/1 - ]). - --export([ init_gc_state/1 - , oom_policy/1 - ]). - -%% Zone API --export([ get_env/2 - , get_env/3 - , set_env/3 - , unset_env/2 - , unset_all_env/0 - ]). - --export([force_reload/0]). - -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --import(emqx_misc, [maybe_apply/2]). - --export_type([zone/0]). - --type(zone() :: atom()). - --define(TAB, ?MODULE). --define(SERVER, ?MODULE). --define(DEFAULT_IDLE_TIMEOUT, 30000). --define(KEY(Zone, Key), {?MODULE, Zone, Key}). - --spec(start_link() -> startlink_ret()). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - --spec(stop() -> ok). -stop() -> - gen_server:stop(?SERVER). - --spec(init_gc_state(zone()) -> maybe(emqx_gc:gc_state())). -init_gc_state(Zone) -> - maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)). - --spec(oom_policy(zone()) -> emqx_types:oom_policy()). -oom_policy(Zone) -> force_shutdown_policy(Zone). - -%%-------------------------------------------------------------------- -%% Zone Options API -%%-------------------------------------------------------------------- - --spec(idle_timeout(zone()) -> pos_integer()). -idle_timeout(Zone) -> - get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT). - --spec(publish_limit(zone()) -> maybe(esockd_rate_limit:config())). -publish_limit(Zone) -> - get_env(Zone, publish_limit). - --spec(ratelimit(zone()) -> [emqx_limiter:specs()]). -ratelimit(Zone) -> - get_env(Zone, ratelimit, []). - --spec(mqtt_frame_options(zone()) -> emqx_frame:options()). -mqtt_frame_options(Zone) -> - #{strict_mode => mqtt_strict_mode(Zone), - max_size => max_packet_size(Zone) - }. - --spec(mqtt_strict_mode(zone()) -> boolean()). -mqtt_strict_mode(Zone) -> - get_env(Zone, strict_mode, false). - --spec(max_packet_size(zone()) -> integer()). -max_packet_size(Zone) -> - get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE). - --spec(mountpoint(zone()) -> maybe(emqx_mountpoint:mountpoint())). -mountpoint(Zone) -> get_env(Zone, mountpoint). - --spec(use_username_as_clientid(zone()) -> boolean()). -use_username_as_clientid(Zone) -> - get_env(Zone, use_username_as_clientid, false). - --spec(stats_timer(zone()) -> undefined | disabled). -stats_timer(Zone) -> - case enable_stats(Zone) of true -> undefined; false -> disabled end. - --spec(enable_stats(zone()) -> boolean()). -enable_stats(Zone) -> - get_env(Zone, enable_stats, true). - --spec(enable_acl(zone()) -> boolean()). -enable_acl(Zone) -> - get_env(Zone, enable_acl, true). - --spec(enable_ban(zone()) -> boolean()). -enable_ban(Zone) -> - get_env(Zone, enable_ban, false). - --spec(enable_flapping_detect(zone()) -> boolean()). -enable_flapping_detect(Zone) -> - get_env(Zone, enable_flapping_detect, false). - --spec(ignore_loop_deliver(zone()) -> boolean()). -ignore_loop_deliver(Zone) -> - get_env(Zone, ignore_loop_deliver, false). - --spec(server_keepalive(zone()) -> maybe(pos_integer())). -server_keepalive(Zone) -> - get_env(Zone, server_keepalive). - --spec(keepalive_backoff(zone()) -> float()). -keepalive_backoff(Zone) -> - get_env(Zone, keepalive_backoff, 0.75). - --spec(max_inflight(zone()) -> 0..65535). -max_inflight(Zone) -> - get_env(Zone, max_inflight, 65535). - --spec(session_expiry_interval(zone()) -> non_neg_integer()). -session_expiry_interval(Zone) -> - get_env(Zone, session_expiry_interval, 0). - --spec(force_gc_policy(zone()) -> maybe(emqx_gc:opts())). -force_gc_policy(Zone) -> - get_env(Zone, force_gc_policy). - --spec(force_shutdown_policy(zone()) -> maybe(emqx_oom:opts())). -force_shutdown_policy(Zone) -> - get_env(Zone, force_shutdown_policy). - --spec(response_information(zone()) -> string()). -response_information(Zone) -> - get_env(Zone, response_information). - --spec(quota_policy(zone()) -> emqx_quota:policy()). -quota_policy(Zone) -> - get_env(Zone, quota, []). - -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - --spec(get_env(maybe(zone()), atom()) -> maybe(term())). -get_env(undefined, Key) -> emqx:get_env(Key); -get_env(Zone, Key) -> - get_env(Zone, Key, undefined). - --spec(get_env(maybe(zone()), atom(), term()) -> maybe(term())). -get_env(undefined, Key, Def) -> - emqx:get_env(Key, Def); -get_env(Zone, Key, Def) -> - try persistent_term:get(?KEY(Zone, Key)) - catch error:badarg -> - emqx:get_env(Key, Def) - end. - --spec(set_env(zone(), atom(), term()) -> ok). -set_env(Zone, Key, Val) -> - persistent_term:put(?KEY(Zone, Key), Val). - --spec(unset_env(zone(), atom()) -> boolean()). -unset_env(Zone, Key) -> - persistent_term:erase(?KEY(Zone, Key)). - --spec(unset_all_env() -> ok). -unset_all_env() -> - [unset_env(Zone, Key) || {?KEY(Zone, Key), _Val} <- persistent_term:get()], - ok. - --spec(force_reload() -> ok). -force_reload() -> - gen_server:call(?SERVER, force_reload). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([]) -> - _ = do_reload(), - {ok, #{}}. - -handle_call(force_reload, _From, State) -> - _ = do_reload(), - {reply, ok, State}; - -handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -do_reload() -> - [persistent_term:put(?KEY(Zone, Key), Val) - || {Zone, Opts} <- emqx:get_env(zones, []), {Key, Val} <- Opts]. - diff --git a/apps/emqx/test/emqx_zone_SUITE.erl b/apps/emqx/test/emqx_zone_SUITE.erl deleted file mode 100644 index 8294ac0da..000000000 --- a/apps/emqx/test/emqx_zone_SUITE.erl +++ /dev/null @@ -1,104 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_zone_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - --define(ENVS, [{use_username_as_clientid, false}, - {server_keepalive, 60}, - {upgrade_qos, false}, - {session_expiry_interval, 7200}, - {retry_interval, 20}, - {mqueue_store_qos0, true}, - {mqueue_priorities, none}, - {mqueue_default_priority, highest}, - {max_subscriptions, 0}, - {max_mqueue_len, 1000}, - {max_inflight, 32}, - {max_awaiting_rel, 100}, - {keepalive_backoff, 0.75}, - {ignore_loop_deliver, false}, - {idle_timeout, 15000}, - {force_shutdown_policy, #{max_heap_size => 838860800, - message_queue_len => 8000}}, - {force_gc_policy, #{bytes => 1048576, count => 1000}}, - {enable_stats, true}, - {enable_flapping_detect, false}, - {enable_ban, true}, - {enable_acl, true}, - {await_rel_timeout, 300}, - {acl_deny_action, ignore} - ]). - -all() -> emqx_ct:all(?MODULE). - -init_per_suite(Config) -> - _ = application:load(emqx), - application:set_env(emqx, zone_env, val), - application:set_env(emqx, zones, [{zone, ?ENVS}]), - Config. - -end_per_suite(_Config) -> - emqx_zone:unset_all_env(), - application:unset_env(emqx, zone_env), - application:unset_env(emqx, zones). - -t_zone_env_func(_) -> - lists:foreach(fun({Env, Val}) -> - case erlang:function_exported(emqx_zone, Env, 1) of - true -> - ?assertEqual(Val, erlang:apply(emqx_zone, Env, [zone])); - false -> ok - end - end, ?ENVS). - -t_get_env(_) -> - ?assertEqual(val, emqx_zone:get_env(undefined, zone_env)), - ?assertEqual(val, emqx_zone:get_env(undefined, zone_env, def)), - ?assert(emqx_zone:get_env(zone, enable_acl)), - ?assert(emqx_zone:get_env(zone, enable_ban)), - ?assertEqual(defval, emqx_zone:get_env(extenal, key, defval)), - ?assertEqual(undefined, emqx_zone:get_env(external, key)), - ?assertEqual(undefined, emqx_zone:get_env(internal, key)), - ?assertEqual(def, emqx_zone:get_env(internal, key, def)). - -t_get_set_env(_) -> - ok = emqx_zone:set_env(zone, key, val), - ?assertEqual(val, emqx_zone:get_env(zone, key)), - true = emqx_zone:unset_env(zone, key), - ?assertEqual(undefined, emqx_zone:get_env(zone, key)). - -t_force_reload(_) -> - {ok, _} = emqx_zone:start_link(), - ?assertEqual(undefined, emqx_zone:get_env(xzone, key)), - application:set_env(emqx, zones, [{xzone, [{key, val}]}]), - ok = emqx_zone:force_reload(), - ?assertEqual(val, emqx_zone:get_env(xzone, key)), - emqx_zone:stop(). - -t_uncovered_func(_) -> - {ok, Pid} = emqx_zone:start_link(), - ignored = gen_server:call(Pid, unexpected_call), - ok = gen_server:cast(Pid, unexpected_cast), - ok = Pid ! ok, - emqx_zone:stop(). - -t_frame_options(_) -> - ?assertMatch(#{strict_mode := _, max_size := _ }, emqx_zone:mqtt_frame_options(zone)). diff --git a/apps/emqx_authn/test/emqx_authn_SUITE.erl b/apps/emqx_authn/test/emqx_authn_SUITE.erl index 1cf607bf2..d8bffdf59 100644 --- a/apps/emqx_authn/test/emqx_authn_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_SUITE.erl @@ -96,8 +96,8 @@ t_authenticator(_) -> ok. t_authenticate(_) -> - ?assertEqual(false, emqx_zone:get_env(external, bypass_auth_plugins, false)), - ClientInfo = #{zone => external, + ClientInfo = #{zone => default, + listener => mqtt_tcp, username => <<"myuser">>, password => <<"mypass">>}, ?assertEqual(ok, emqx_access_control:authenticate(ClientInfo)), diff --git a/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl index 5a0fe06b3..899d2c24c 100644 --- a/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mnesia_SUITE.erl @@ -64,8 +64,6 @@ t_mnesia_authenticator(_) -> ?assertEqual({ok, #{user_id => <<"myuser">>}}, ?AUTH:add_user(?CHAIN, AuthenticatorName, UserInfo)), ?assertEqual({ok, #{user_id => <<"myuser">>}}, ?AUTH:lookup_user(?CHAIN, AuthenticatorName, <<"myuser">>)), - ?assertEqual(false, emqx_zone:get_env(external, bypass_auth_plugins, false)), - ClientInfo = #{zone => external, username => <<"myuser">>, password => <<"mypass">>}, diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 9f2e9c364..f3b7c9b8a 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -446,9 +446,8 @@ do_unsubscribe(TopicFilter, UnSubOpts, Channel = parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). --compile({inline, [is_acl_enabled/1]}). -is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> - (not IsSuperuser) andalso emqx_zone:enable_acl(Zone). +is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> + (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]). %%-------------------------------------------------------------------- %% Ensure & Hooks