From e6424d63d896b06510ab5b81f4875d6a700c3219 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 19 Jul 2021 14:57:47 +0800 Subject: [PATCH] refactor(config): don't allow inheritable config entries --- apps/emqx/src/emqx_acl_cache.erl | 6 +-- apps/emqx/src/emqx_channel.erl | 14 +++--- apps/emqx/src/emqx_config.erl | 43 +++++++++++++------ apps/emqx/src/emqx_connection.erl | 18 ++++---- apps/emqx/src/emqx_flapping.erl | 2 +- apps/emqx/src/emqx_listeners.erl | 3 +- apps/emqx/src/emqx_mqtt_caps.erl | 2 +- apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/src/emqx_ws_connection.erl | 26 +++++------ apps/emqx/test/emqx_access_control_SUITE.erl | 2 +- apps/emqx/test/emqx_acl_cache_SUITE.erl | 2 +- apps/emqx/test/emqx_channel_SUITE.erl | 14 +++--- apps/emqx/test/emqx_client_SUITE.erl | 8 ++-- apps/emqx/test/emqx_flapping_SUITE.erl | 2 +- apps/emqx/test/emqx_mqtt_caps_SUITE.erl | 12 +++--- .../emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 36 ++++++++-------- 16 files changed, 103 insertions(+), 89 deletions(-) diff --git a/apps/emqx/src/emqx_acl_cache.erl b/apps/emqx/src/emqx_acl_cache.erl index d4c7cfbdb..f10aab711 100644 --- a/apps/emqx/src/emqx_acl_cache.erl +++ b/apps/emqx/src/emqx_acl_cache.erl @@ -52,15 +52,15 @@ drain_k() -> {?MODULE, drain_timestamp}. -spec(is_enabled(atom(), atom()) -> boolean()). is_enabled(Zone, Listener) -> - emqx_config:get_listener_conf(Zone, Listener, [acl, cache, enable]). + emqx_config:get_zone_conf(Zone, [acl, cache, enable]). -spec(get_cache_max_size(atom(), atom()) -> integer()). get_cache_max_size(Zone, Listener) -> - emqx_config:get_listener_conf(Zone, Listener, [acl, cache, max_size]). + emqx_config:get_zone_conf(Zone, [acl, cache, max_size]). -spec(get_cache_ttl(atom(), atom()) -> integer()). get_cache_ttl(Zone, Listener) -> - emqx_config:get_listener_conf(Zone, Listener, [acl, cache, ttl]). + emqx_config:get_zone_conf(Zone, [acl, cache, ttl]). -spec(list_acl_cache(atom(), atom()) -> [acl_cache_entry()]). list_acl_cache(Zone, Listener) -> diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 4d9b8ae12..ee615b944 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -210,7 +210,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, <<>> -> undefined; MP -> MP end, - QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener, [rate_limit, quota]), + QuotaPolicy = emqx_config:get_zone_conf(Zone, [rate_limit, quota]), ClientInfo = set_peercert_infos( Peercert, #{zone => Zone, @@ -435,7 +435,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) -> ReasonCode =:= ?RC_NOT_AUTHORIZED end, TupleTopicFilters0), - DenyAction = emqx_config:get_listener_conf(Zone, Listener, [acl, deny_action]), + DenyAction = emqx_config:get_zone_conf(Zone, [acl, deny_action]), case DenyAction =:= disconnect andalso HasAclDeny of true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); false -> @@ -551,7 +551,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> ?LOG(warning, "Cannot publish message to ~s due to ~s.", [Topic, emqx_reason_codes:text(Rc)]), - case emqx_config:get_listener_conf(Zone, Listener, [acl_deny_action]) of + case emqx_config:get_zone_conf(Zone, [acl_deny_action]) of ignore -> case QoS of ?QOS_0 -> {ok, NChannel}; @@ -997,7 +997,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = ConnState, clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) when ConnState =:= connected orelse ConnState =:= reauthenticating -> - emqx_config:get_listener_conf(Zone, Listener, [flapping_detect, enable]) + emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso emqx_flapping:detect(ClientInfo), Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of @@ -1625,7 +1625,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> %%-------------------------------------------------------------------- %% Is ACL enabled? is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> - (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]). + (not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [acl, enable]). %%-------------------------------------------------------------------- %% Parse Topic Filters @@ -1737,10 +1737,10 @@ flag(true) -> 1; flag(false) -> 0. get_mqtt_conf(Zone, Listener, Key) -> - emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]). + emqx_config:get_zone_conf(Zone, [mqtt, Key]). get_mqtt_conf(Zone, Listener, Key, Default) -> - emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key], Default). + emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). %%-------------------------------------------------------------------- %% For CT tests diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 64895fbc8..b2a3eaf74 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -25,6 +25,12 @@ , put/2 ]). +-export([ get_zone_conf/2 + , get_zone_conf/3 + , put_zone_conf/3 + , find_zone_conf/2 + ]). + -export([ get_listener_conf/3 , get_listener_conf/4 , put_listener_conf/4 @@ -44,6 +50,8 @@ -define(CONF, ?MODULE). -define(RAW_CONF, {?MODULE, raw}). +-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]). +-define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]). -export_type([update_request/0, raw_config/0, config/0]). -type update_request() :: term(). @@ -67,32 +75,39 @@ get(KeyPath, Default) -> find(KeyPath) -> emqx_map_lib:deep_find(KeyPath, get()). +-spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term(). +get_zone_conf(Zone, KeyPath) -> + ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath)). + +-spec get_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> term(). +get_zone_conf(Zone, KeyPath, Default) -> + ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath), Default). + +-spec put_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> ok. +put_zone_conf(Zone, KeyPath, Conf) -> + ?MODULE:put(?ZONE_CONF_PATH(Zone, KeyPath), Conf). + +-spec find_zone_conf(atom(), emqx_map_lib:config_key_path()) -> + {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. +find_zone_conf(Zone, KeyPath) -> + find(?ZONE_CONF_PATH(Zone, KeyPath)). + -spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> term(). get_listener_conf(Zone, Listener, KeyPath) -> - case find_listener_conf(Zone, Listener, KeyPath) of - {not_found, SubKeyPath, Data} -> error({not_found, SubKeyPath, Data}); - {ok, Data} -> Data - end. + ?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)). -spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> term(). get_listener_conf(Zone, Listener, KeyPath, Default) -> - case find_listener_conf(Zone, Listener, KeyPath) of - {not_found, _, _} -> Default; - {ok, Data} -> Data - end. + ?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Default). -spec put_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> ok. put_listener_conf(Zone, Listener, KeyPath, Conf) -> - ?MODULE:put([zones, Zone, listeners, Listener | KeyPath], Conf). + ?MODULE:put(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Conf). -spec find_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. find_listener_conf(Zone, Listener, KeyPath) -> - %% the configs in listener is prior to the ones in the zone - case find([zones, Zone, listeners, Listener | KeyPath]) of - {not_found, _, _} -> find([zones, Zone | KeyPath]); - {ok, Data} -> {ok, Data} - end. + find(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)). -spec put(map()) -> ok. put(Config) -> diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 9e688fc0f..5d4e36f25 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -255,17 +255,17 @@ init_state(Transport, Socket, #{zone := Zone, listener := Listener} = Opts) -> }, Limiter = emqx_limiter:init(Zone, undefined, undefined, []), FrameOpts = #{ - strict_mode => emqx_config:get_listener_conf(Zone, Listener, [mqtt, strict_mode]), - max_size => emqx_config:get_listener_conf(Zone, Listener, [mqtt, max_packet_size]) + strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]), + max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]) }, ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Opts), - GcState = case emqx_config:get_listener_conf(Zone, Listener, [force_gc]) of + GcState = case emqx_config:get_zone_conf(Zone, [force_gc]) of #{enable := false} -> undefined; GcPolicy -> emqx_gc:init(GcPolicy) end, - StatsTimer = case emqx_config:get_listener_conf(Zone, Listener, [stats, enable]) of + StatsTimer = case emqx_config:get_zone_conf(Zone, [stats, enable]) of true -> undefined; false -> disabled end, @@ -293,8 +293,8 @@ run_loop(Parent, State = #state{transport = Transport, peername = Peername, channel = Channel}) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), - ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), - emqx_channel:info(listener, Channel), [force_shutdown]), + ShutdownPolicy = emqx_config:get_zone_conf(emqx_channel:info(zone, Channel), + [force_shutdown]), emqx_misc:tune_heap_size(ShutdownPolicy), case activate_socket(State) of {ok, NState} -> hibernate(Parent, NState); @@ -801,8 +801,8 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> end. check_oom(State = #state{channel = Channel}) -> - ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), - emqx_channel:info(listener, Channel), [force_shutdown]), + ShutdownPolicy = emqx_config:get_zone_conf( + emqx_channel:info(zone, Channel), [force_shutdown]), ?tp(debug, check_oom, #{policy => ShutdownPolicy}), case emqx_misc:check_oom(ShutdownPolicy) of {shutdown, Reason} -> @@ -908,5 +908,5 @@ get_state(Pid) -> get_active_n(Zone, Listener) -> case emqx_config:get([zones, Zone, listeners, Listener, type]) of quic -> 100; - _ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) + _ -> emqx_config:get_zone_conf(Zone, [tcp, active_n]) end. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 64633833b..b4ac7b228 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -90,7 +90,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone, listener := L end. get_policy(Zone, Listener) -> - emqx_config:get_listener_conf(Zone, Listener, [flapping_detect]). + emqx_config:get_zone_conf(Zone, [flapping_detect]). now_diff(TS) -> erlang:system_time(millisecond) - TS. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 8cf3852e2..3ac8fd714 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -91,8 +91,7 @@ do_start_listener(ZoneName, ListenerName, #{type := quic, bind := ListenOn} = Op , {key, maps:get(keyfile, Opts)} , {alpn, ["mqtt"]} , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)} - , {idle_timeout_ms, emqx_config:get_listener_conf(ZoneName, ListenerName, - [mqtt, idle_timeout])} + , {idle_timeout_ms, emqx_config:get_zone_conf(ZoneName, [mqtt, idle_timeout])} ], ConnectionOpts = #{conn_callback => emqx_quic_connection , peer_unidi_stream_count => 1 diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index 883f7354d..978d80471 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -115,5 +115,5 @@ do_check_sub(_Flags, _Caps) -> ok. get_caps(Zone, Listener) -> lists:foldl(fun({K, V}, Acc) -> - Acc#{K => emqx_config:get_listener_conf(Zone, Listener, [mqtt, K], V)} + Acc#{K => emqx_config:get_zone_conf(Zone, [mqtt, K], V)} end, #{}, maps:to_list(?DEFAULT_CAPS)). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index c1f6767b4..1edd5bd88 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -697,4 +697,4 @@ set_field(Name, Value, Session) -> setelement(Pos+1, Session, Value). get_conf(Zone, Listener, Key) -> - emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]). + emqx_config:get_zone_conf(Zone, [mqtt, Key]). diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 947658035..723295b79 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -244,7 +244,7 @@ check_origin_header(Req, #{zone := Zone, listener := Listener} = Opts) -> websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> {Peername, Peercert} = - case emqx_config:get_listener_conf(Zone, Listener, [proxy_protocol]) andalso + case emqx_config:get_zone_conf(Zone, [proxy_protocol]) andalso maps:get(proxy_header, Req) of #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} -> SourceName = {SrcAddr, SrcPort}, @@ -281,25 +281,25 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> Limiter = emqx_limiter:init(Zone, undefined, undefined, []), MQTTPiggyback = get_ws_opts(Zone, Listener, mqtt_piggyback), FrameOpts = #{ - strict_mode => emqx_config:get_listener_conf(Zone, Listener, [mqtt, strict_mode]), - max_size => emqx_config:get_listener_conf(Zone, Listener, [mqtt, max_packet_size]) + strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]), + max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]) }, ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Opts), - GcState = case emqx_config:get_listener_conf(Zone, Listener, [force_gc]) of + GcState = case emqx_config:get_zone_conf(Zone, [force_gc]) of #{enable := false} -> undefined; GcPolicy -> emqx_gc:init(GcPolicy) end, - StatsTimer = case emqx_config:get_listener_conf(Zone, Listener, [stats, enable]) of + StatsTimer = case emqx_config:get_zone_conf(Zone, [stats, enable]) of true -> undefined; false -> disabled end, %% MQTT Idle Timeout IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout), - case emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), - emqx_channel:info(listener, Channel), [force_shutdown]) of + case emqx_config:get_zone_conf(emqx_channel:info(zone, Channel), + [force_shutdown]) of #{enable := false} -> ok; ShutdownPolicy -> emqx_misc:tune_heap_size(ShutdownPolicy) end, @@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) -> websocket_info(Deliver = {deliver, _Topic, _Msg}, State = #state{zone = Zone, listener = Listener}) -> - ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), + ActiveN = emqx_config:get_zone_conf(Zone, [tcp, active_n]), Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); @@ -521,8 +521,8 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> end. check_oom(State = #state{channel = Channel}) -> - ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), - emqx_channel:info(listener, Channel), [force_shutdown]), + ShutdownPolicy = emqx_config:get_zone_conf( + emqx_channel:info(zone, Channel), [force_shutdown]), case ShutdownPolicy of #{enable := false} -> State; #{enable := true} -> @@ -564,7 +564,7 @@ handle_incoming(Packet, State = #state{zone = Zone, listener = Listener}) ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ok = inc_incoming_stats(Packet), NState = case emqx_pd:get_counter(incoming_pubs) > - emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of + emqx_config:get_zone_conf(Zone, [tcp, active_n]) of true -> postpone({cast, rate_limit}, State); false -> State end, @@ -601,7 +601,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), NState = case emqx_pd:get_counter(outgoing_pubs) > - emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of + emqx_config:get_zone_conf(Zone, [tcp, active_n]) of true -> Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), oct => emqx_pd:reset_counter(outgoing_bytes) @@ -789,4 +789,4 @@ set_field(Name, Value, State) -> setelement(Pos+1, State, Value). get_ws_opts(Zone, Listener, Key) -> - emqx_config:get_listener_conf(Zone, Listener, [websocket, Key]). + emqx_config:get_zone_conf(Zone, [websocket, Key]). diff --git a/apps/emqx/test/emqx_access_control_SUITE.erl b/apps/emqx/test/emqx_access_control_SUITE.erl index f5bf35acf..d459d28b2 100644 --- a/apps/emqx/test/emqx_access_control_SUITE.erl +++ b/apps/emqx/test/emqx_access_control_SUITE.erl @@ -58,4 +58,4 @@ clientinfo(InitProps) -> }, InitProps). toggle_auth(Bool) when is_boolean(Bool) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [auth, enable], Bool). + emqx_config:put_zone_conf(default, [auth, enable], Bool). diff --git a/apps/emqx/test/emqx_acl_cache_SUITE.erl b/apps/emqx/test/emqx_acl_cache_SUITE.erl index f631f18cb..3708d0524 100644 --- a/apps/emqx/test/emqx_acl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_acl_cache_SUITE.erl @@ -80,4 +80,4 @@ t_drain_acl_cache(_) -> emqtt:stop(Client). toggle_acl(Bool) when is_boolean(Bool) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], Bool). + emqx_config:put_zone_conf(default, [acl, enable], Bool). diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 87b93a8e7..8461ae8c9 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -398,7 +398,7 @@ t_bad_receive_maximum(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), + emqx_config:put_zone_conf(default, [mqtt, response_information], test), C1 = channel(#{conn_state => idle}), {shutdown, protocol_error, _, _} = emqx_channel:handle_in( @@ -411,8 +411,8 @@ t_override_client_receive_maximum(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_inflight], 0), + emqx_config:put_zone_conf(default, [mqtt, response_information], test), + emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0), C1 = channel(#{conn_state => idle}), ClientCapacity = 2, {ok, [{event, connected}, _ConnAck], C2} = @@ -663,7 +663,7 @@ t_handle_out_connack_response_information(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), + emqx_config:put_zone_conf(default, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], @@ -677,7 +677,7 @@ t_handle_out_connack_not_response_information(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), + emqx_config:put_zone_conf(default, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = emqx_channel:handle_in( @@ -863,7 +863,7 @@ t_packing_alias(_) -> channel())). t_check_pub_acl(_) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), + emqx_config:put_zone_conf(default, [acl, enable], true), Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), ok = emqx_channel:check_pub_acl(Publish, channel()). @@ -873,7 +873,7 @@ t_check_pub_alias(_) -> ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel). t_check_sub_acls(_) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), + emqx_config:put_zone_conf(default, [acl, enable], true), TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, [{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()). diff --git a/apps/emqx/test/emqx_client_SUITE.erl b/apps/emqx/test/emqx_client_SUITE.erl index 552ad307c..c6a450471 100644 --- a/apps/emqx/test/emqx_client_SUITE.erl +++ b/apps/emqx/test/emqx_client_SUITE.erl @@ -101,7 +101,7 @@ t_basic_v4(_Config) -> t_basic([{proto_ver, v4}]). t_cm(_) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 1000), + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 1000), ClientId = <<"myclient">>, {ok, C} = emqtt:start_link([{clientid, ClientId}]), {ok, _} = emqtt:connect(C), @@ -111,7 +111,7 @@ t_cm(_) -> ct:sleep(1200), Stats = emqx_cm:get_chan_stats(ClientId), ?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 15000). + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000). t_cm_registry(_) -> Info = supervisor:which_children(emqx_cm_sup), @@ -269,7 +269,7 @@ t_basic(_Opts) -> ok = emqtt:disconnect(C). t_username_as_clientid(_) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, use_username_as_clientid], true), + emqx_config:put_zone_conf(default, [mqtt, use_username_as_clientid], true), Username = <<"usera">>, {ok, C} = emqtt:start_link([{username, Username}]), {ok, _} = emqtt:connect(C), @@ -323,7 +323,7 @@ tls_certcn_as_clientid(TLSVsn) -> tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) -> CN = <<"Client">>, - emqx_config:put_listener_conf(default, mqtt_ssl, [mqtt, peer_cert_as_clientid], cn), + emqx_config:put_zone_conf(default, [mqtt, peer_cert_as_clientid], cn), SslConf = emqx_ct_helpers:client_ssl_twoway(TLSVsn), {ok, Client} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]), {ok, _} = emqtt:connect(Client), diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index b4318ff64..eca276b84 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -26,7 +26,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), - emqx_config:put_listener_conf(default, mqtt_tcp, [flapping_detect], + emqx_config:put_zone_conf(default, [flapping_detect], #{max_count => 3, window_time => 100, % 0.1s ban_time => 2000 %% 2s diff --git a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl index ac6b71c9f..7cdba6011 100644 --- a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl @@ -26,8 +26,8 @@ all() -> emqx_ct:all(?MODULE). t_check_pub(_) -> OldConf = emqx_config:get(), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], ?QOS_1), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, retain_available], false), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1), + emqx_config:put_zone_conf(default, [mqtt, retain_available], false), timer:sleep(50), ok = emqx_mqtt_caps:check_pub(default, mqtt_tcp, #{qos => ?QOS_1, retain => false}), PubFlags1 = #{qos => ?QOS_2, retain => false}, @@ -45,10 +45,10 @@ t_check_sub(_) -> nl => 0, qos => ?QOS_2 }, - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_topic_levels], 2), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], ?QOS_1), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, shared_subscription], false), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, wildcard_subscription], false), + emqx_config:put_zone_conf(default, [mqtt, max_topic_levels], 2), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1), + emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false), + emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false), timer:sleep(50), ok = emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 607bee44b..8de571ba8 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -217,14 +217,14 @@ t_connect_will_message(Config) -> ok = emqtt:disconnect(Client4). t_batch_subscribe(init, Config) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), - emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], true), + emqx_config:put_zone_conf(default, [acl, enable], true), + emqx_config:put_zone_conf(default, [acl, enable], true), ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), Config; t_batch_subscribe('end', _Config) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], false), - emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], false), + emqx_config:put_zone_conf(default, [acl, enable], false), + emqx_config:put_zone_conf(default, [acl, enable], false), meck:unload(emqx_access_control). t_batch_subscribe(Config) -> @@ -288,22 +288,22 @@ t_connect_will_retain(Config) -> t_connect_idle_timeout(_Config) -> IdleTimeout = 2000, - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], IdleTimeout), - emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], IdleTimeout), + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout), + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout), {ok, Sock} = emqtt_sock:connect({127,0,0,1}, 1883, [], 60000), timer:sleep(IdleTimeout), ?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)). t_connect_emit_stats_timeout(init, Config) -> NewIdleTimeout = 1000, - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], NewIdleTimeout), - emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], NewIdleTimeout), + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout), + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout), ok = snabbkaffe:start_trace(), [{idle_timeout, NewIdleTimeout} | Config]; t_connect_emit_stats_timeout('end', _Config) -> snabbkaffe:stop(), - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 15000), - emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], 15000), + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000), + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000), ok. t_connect_emit_stats_timeout(Config) -> @@ -471,8 +471,8 @@ t_connack_session_present(Config) -> t_connack_max_qos_allowed(init, Config) -> Config; t_connack_max_qos_allowed('end', _Config) -> - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2), - emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2), ok. t_connack_max_qos_allowed(Config) -> ConnFun = ?config(conn_fun, Config), @@ -480,8 +480,8 @@ t_connack_max_qos_allowed(Config) -> Topic = nth(1, ?TOPICS), %% max_qos_allowed = 0 - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 0), - emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 0), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Connack1} = emqtt:ConnFun(Client1), @@ -506,8 +506,8 @@ t_connack_max_qos_allowed(Config) -> waiting_client_process_exit(Client2), %% max_qos_allowed = 1 - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 1), - emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 1), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1), {ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Connack3} = emqtt:ConnFun(Client3), @@ -532,8 +532,8 @@ t_connack_max_qos_allowed(Config) -> waiting_client_process_exit(Client4), %% max_qos_allowed = 2 - emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2), - emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2), + emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2), {ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Connack5} = emqtt:ConnFun(Client5),