refactor(config): don't allow inheritable config entries

This commit is contained in:
Shawn 2021-07-19 14:57:47 +08:00
parent 32b571091b
commit e6424d63d8
16 changed files with 103 additions and 89 deletions

View File

@ -52,15 +52,15 @@ drain_k() -> {?MODULE, drain_timestamp}.
-spec(is_enabled(atom(), atom()) -> boolean()). -spec(is_enabled(atom(), atom()) -> boolean()).
is_enabled(Zone, Listener) -> is_enabled(Zone, Listener) ->
emqx_config:get_listener_conf(Zone, Listener, [acl, cache, enable]). emqx_config:get_zone_conf(Zone, [acl, cache, enable]).
-spec(get_cache_max_size(atom(), atom()) -> integer()). -spec(get_cache_max_size(atom(), atom()) -> integer()).
get_cache_max_size(Zone, Listener) -> get_cache_max_size(Zone, Listener) ->
emqx_config:get_listener_conf(Zone, Listener, [acl, cache, max_size]). emqx_config:get_zone_conf(Zone, [acl, cache, max_size]).
-spec(get_cache_ttl(atom(), atom()) -> integer()). -spec(get_cache_ttl(atom(), atom()) -> integer()).
get_cache_ttl(Zone, Listener) -> get_cache_ttl(Zone, Listener) ->
emqx_config:get_listener_conf(Zone, Listener, [acl, cache, ttl]). emqx_config:get_zone_conf(Zone, [acl, cache, ttl]).
-spec(list_acl_cache(atom(), atom()) -> [acl_cache_entry()]). -spec(list_acl_cache(atom(), atom()) -> [acl_cache_entry()]).
list_acl_cache(Zone, Listener) -> list_acl_cache(Zone, Listener) ->

View File

@ -210,7 +210,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
<<>> -> undefined; <<>> -> undefined;
MP -> MP MP -> MP
end, end,
QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener, [rate_limit, quota]), QuotaPolicy = emqx_config:get_zone_conf(Zone, [rate_limit, quota]),
ClientInfo = set_peercert_infos( ClientInfo = set_peercert_infos(
Peercert, Peercert,
#{zone => Zone, #{zone => Zone,
@ -435,7 +435,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) -> HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) ->
ReasonCode =:= ?RC_NOT_AUTHORIZED ReasonCode =:= ?RC_NOT_AUTHORIZED
end, TupleTopicFilters0), end, TupleTopicFilters0),
DenyAction = emqx_config:get_listener_conf(Zone, Listener, [acl, deny_action]), DenyAction = emqx_config:get_zone_conf(Zone, [acl, deny_action]),
case DenyAction =:= disconnect andalso HasAclDeny of case DenyAction =:= disconnect andalso HasAclDeny of
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
false -> false ->
@ -551,7 +551,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
?LOG(warning, "Cannot publish message to ~s due to ~s.", ?LOG(warning, "Cannot publish message to ~s due to ~s.",
[Topic, emqx_reason_codes:text(Rc)]), [Topic, emqx_reason_codes:text(Rc)]),
case emqx_config:get_listener_conf(Zone, Listener, [acl_deny_action]) of case emqx_config:get_zone_conf(Zone, [acl_deny_action]) of
ignore -> ignore ->
case QoS of case QoS of
?QOS_0 -> {ok, NChannel}; ?QOS_0 -> {ok, NChannel};
@ -997,7 +997,7 @@ handle_info({sock_closed, Reason}, Channel =
#channel{conn_state = ConnState, #channel{conn_state = ConnState,
clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) clientinfo = ClientInfo = #{zone := Zone, listener := Listener}})
when ConnState =:= connected orelse ConnState =:= reauthenticating -> when ConnState =:= connected orelse ConnState =:= reauthenticating ->
emqx_config:get_listener_conf(Zone, Listener, [flapping_detect, enable]) emqx_config:get_zone_conf(Zone, [flapping_detect, enable])
andalso emqx_flapping:detect(ClientInfo), andalso emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of case maybe_shutdown(Reason, Channel1) of
@ -1625,7 +1625,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Is ACL enabled? %% Is ACL enabled?
is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) ->
(not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]). (not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [acl, enable]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Parse Topic Filters %% Parse Topic Filters
@ -1737,10 +1737,10 @@ flag(true) -> 1;
flag(false) -> 0. flag(false) -> 0.
get_mqtt_conf(Zone, Listener, Key) -> get_mqtt_conf(Zone, Listener, Key) ->
emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]). emqx_config:get_zone_conf(Zone, [mqtt, Key]).
get_mqtt_conf(Zone, Listener, Key, Default) -> get_mqtt_conf(Zone, Listener, Key, Default) ->
emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key], Default). emqx_config:get_zone_conf(Zone, [mqtt, Key], Default).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% For CT tests %% For CT tests

View File

@ -25,6 +25,12 @@
, put/2 , put/2
]). ]).
-export([ get_zone_conf/2
, get_zone_conf/3
, put_zone_conf/3
, find_zone_conf/2
]).
-export([ get_listener_conf/3 -export([ get_listener_conf/3
, get_listener_conf/4 , get_listener_conf/4
, put_listener_conf/4 , put_listener_conf/4
@ -44,6 +50,8 @@
-define(CONF, ?MODULE). -define(CONF, ?MODULE).
-define(RAW_CONF, {?MODULE, raw}). -define(RAW_CONF, {?MODULE, raw}).
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
-define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]).
-export_type([update_request/0, raw_config/0, config/0]). -export_type([update_request/0, raw_config/0, config/0]).
-type update_request() :: term(). -type update_request() :: term().
@ -67,32 +75,39 @@ get(KeyPath, Default) ->
find(KeyPath) -> find(KeyPath) ->
emqx_map_lib:deep_find(KeyPath, get()). emqx_map_lib:deep_find(KeyPath, get()).
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term().
get_zone_conf(Zone, KeyPath) ->
?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath)).
-spec get_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> term().
get_zone_conf(Zone, KeyPath, Default) ->
?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath), Default).
-spec put_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> ok.
put_zone_conf(Zone, KeyPath, Conf) ->
?MODULE:put(?ZONE_CONF_PATH(Zone, KeyPath), Conf).
-spec find_zone_conf(atom(), emqx_map_lib:config_key_path()) ->
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
find_zone_conf(Zone, KeyPath) ->
find(?ZONE_CONF_PATH(Zone, KeyPath)).
-spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> term(). -spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> term().
get_listener_conf(Zone, Listener, KeyPath) -> get_listener_conf(Zone, Listener, KeyPath) ->
case find_listener_conf(Zone, Listener, KeyPath) of ?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)).
{not_found, SubKeyPath, Data} -> error({not_found, SubKeyPath, Data});
{ok, Data} -> Data
end.
-spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> term(). -spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> term().
get_listener_conf(Zone, Listener, KeyPath, Default) -> get_listener_conf(Zone, Listener, KeyPath, Default) ->
case find_listener_conf(Zone, Listener, KeyPath) of ?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Default).
{not_found, _, _} -> Default;
{ok, Data} -> Data
end.
-spec put_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> ok. -spec put_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> ok.
put_listener_conf(Zone, Listener, KeyPath, Conf) -> put_listener_conf(Zone, Listener, KeyPath, Conf) ->
?MODULE:put([zones, Zone, listeners, Listener | KeyPath], Conf). ?MODULE:put(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Conf).
-spec find_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> -spec find_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) ->
{ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}.
find_listener_conf(Zone, Listener, KeyPath) -> find_listener_conf(Zone, Listener, KeyPath) ->
%% the configs in listener is prior to the ones in the zone find(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)).
case find([zones, Zone, listeners, Listener | KeyPath]) of
{not_found, _, _} -> find([zones, Zone | KeyPath]);
{ok, Data} -> {ok, Data}
end.
-spec put(map()) -> ok. -spec put(map()) -> ok.
put(Config) -> put(Config) ->

View File

@ -255,17 +255,17 @@ init_state(Transport, Socket, #{zone := Zone, listener := Listener} = Opts) ->
}, },
Limiter = emqx_limiter:init(Zone, undefined, undefined, []), Limiter = emqx_limiter:init(Zone, undefined, undefined, []),
FrameOpts = #{ FrameOpts = #{
strict_mode => emqx_config:get_listener_conf(Zone, Listener, [mqtt, strict_mode]), strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
max_size => emqx_config:get_listener_conf(Zone, Listener, [mqtt, max_packet_size]) max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
}, },
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_opts(), Serialize = emqx_frame:serialize_opts(),
Channel = emqx_channel:init(ConnInfo, Opts), Channel = emqx_channel:init(ConnInfo, Opts),
GcState = case emqx_config:get_listener_conf(Zone, Listener, [force_gc]) of GcState = case emqx_config:get_zone_conf(Zone, [force_gc]) of
#{enable := false} -> undefined; #{enable := false} -> undefined;
GcPolicy -> emqx_gc:init(GcPolicy) GcPolicy -> emqx_gc:init(GcPolicy)
end, end,
StatsTimer = case emqx_config:get_listener_conf(Zone, Listener, [stats, enable]) of StatsTimer = case emqx_config:get_zone_conf(Zone, [stats, enable]) of
true -> undefined; true -> undefined;
false -> disabled false -> disabled
end, end,
@ -293,8 +293,8 @@ run_loop(Parent, State = #state{transport = Transport,
peername = Peername, peername = Peername,
channel = Channel}) -> channel = Channel}) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)), emqx_logger:set_metadata_peername(esockd:format(Peername)),
ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), ShutdownPolicy = emqx_config:get_zone_conf(emqx_channel:info(zone, Channel),
emqx_channel:info(listener, Channel), [force_shutdown]), [force_shutdown]),
emqx_misc:tune_heap_size(ShutdownPolicy), emqx_misc:tune_heap_size(ShutdownPolicy),
case activate_socket(State) of case activate_socket(State) of
{ok, NState} -> hibernate(Parent, NState); {ok, NState} -> hibernate(Parent, NState);
@ -801,8 +801,8 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
end. end.
check_oom(State = #state{channel = Channel}) -> check_oom(State = #state{channel = Channel}) ->
ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), ShutdownPolicy = emqx_config:get_zone_conf(
emqx_channel:info(listener, Channel), [force_shutdown]), emqx_channel:info(zone, Channel), [force_shutdown]),
?tp(debug, check_oom, #{policy => ShutdownPolicy}), ?tp(debug, check_oom, #{policy => ShutdownPolicy}),
case emqx_misc:check_oom(ShutdownPolicy) of case emqx_misc:check_oom(ShutdownPolicy) of
{shutdown, Reason} -> {shutdown, Reason} ->
@ -908,5 +908,5 @@ get_state(Pid) ->
get_active_n(Zone, Listener) -> get_active_n(Zone, Listener) ->
case emqx_config:get([zones, Zone, listeners, Listener, type]) of case emqx_config:get([zones, Zone, listeners, Listener, type]) of
quic -> 100; quic -> 100;
_ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) _ -> emqx_config:get_zone_conf(Zone, [tcp, active_n])
end. end.

View File

@ -90,7 +90,7 @@ detect(#{clientid := ClientId, peerhost := PeerHost, zone := Zone, listener := L
end. end.
get_policy(Zone, Listener) -> get_policy(Zone, Listener) ->
emqx_config:get_listener_conf(Zone, Listener, [flapping_detect]). emqx_config:get_zone_conf(Zone, [flapping_detect]).
now_diff(TS) -> erlang:system_time(millisecond) - TS. now_diff(TS) -> erlang:system_time(millisecond) - TS.

View File

@ -91,8 +91,7 @@ do_start_listener(ZoneName, ListenerName, #{type := quic, bind := ListenOn} = Op
, {key, maps:get(keyfile, Opts)} , {key, maps:get(keyfile, Opts)}
, {alpn, ["mqtt"]} , {alpn, ["mqtt"]}
, {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)} , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)}
, {idle_timeout_ms, emqx_config:get_listener_conf(ZoneName, ListenerName, , {idle_timeout_ms, emqx_config:get_zone_conf(ZoneName, [mqtt, idle_timeout])}
[mqtt, idle_timeout])}
], ],
ConnectionOpts = #{conn_callback => emqx_quic_connection ConnectionOpts = #{conn_callback => emqx_quic_connection
, peer_unidi_stream_count => 1 , peer_unidi_stream_count => 1

View File

@ -115,5 +115,5 @@ do_check_sub(_Flags, _Caps) -> ok.
get_caps(Zone, Listener) -> get_caps(Zone, Listener) ->
lists:foldl(fun({K, V}, Acc) -> lists:foldl(fun({K, V}, Acc) ->
Acc#{K => emqx_config:get_listener_conf(Zone, Listener, [mqtt, K], V)} Acc#{K => emqx_config:get_zone_conf(Zone, [mqtt, K], V)}
end, #{}, maps:to_list(?DEFAULT_CAPS)). end, #{}, maps:to_list(?DEFAULT_CAPS)).

View File

@ -697,4 +697,4 @@ set_field(Name, Value, Session) ->
setelement(Pos+1, Session, Value). setelement(Pos+1, Session, Value).
get_conf(Zone, Listener, Key) -> get_conf(Zone, Listener, Key) ->
emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]). emqx_config:get_zone_conf(Zone, [mqtt, Key]).

View File

@ -244,7 +244,7 @@ check_origin_header(Req, #{zone := Zone, listener := Listener} = Opts) ->
websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) ->
{Peername, Peercert} = {Peername, Peercert} =
case emqx_config:get_listener_conf(Zone, Listener, [proxy_protocol]) andalso case emqx_config:get_zone_conf(Zone, [proxy_protocol]) andalso
maps:get(proxy_header, Req) of maps:get(proxy_header, Req) of
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} -> #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
SourceName = {SrcAddr, SrcPort}, SourceName = {SrcAddr, SrcPort},
@ -281,25 +281,25 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) ->
Limiter = emqx_limiter:init(Zone, undefined, undefined, []), Limiter = emqx_limiter:init(Zone, undefined, undefined, []),
MQTTPiggyback = get_ws_opts(Zone, Listener, mqtt_piggyback), MQTTPiggyback = get_ws_opts(Zone, Listener, mqtt_piggyback),
FrameOpts = #{ FrameOpts = #{
strict_mode => emqx_config:get_listener_conf(Zone, Listener, [mqtt, strict_mode]), strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
max_size => emqx_config:get_listener_conf(Zone, Listener, [mqtt, max_packet_size]) max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
}, },
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_opts(), Serialize = emqx_frame:serialize_opts(),
Channel = emqx_channel:init(ConnInfo, Opts), Channel = emqx_channel:init(ConnInfo, Opts),
GcState = case emqx_config:get_listener_conf(Zone, Listener, [force_gc]) of GcState = case emqx_config:get_zone_conf(Zone, [force_gc]) of
#{enable := false} -> undefined; #{enable := false} -> undefined;
GcPolicy -> emqx_gc:init(GcPolicy) GcPolicy -> emqx_gc:init(GcPolicy)
end, end,
StatsTimer = case emqx_config:get_listener_conf(Zone, Listener, [stats, enable]) of StatsTimer = case emqx_config:get_zone_conf(Zone, [stats, enable]) of
true -> undefined; true -> undefined;
false -> disabled false -> disabled
end, end,
%% MQTT Idle Timeout %% MQTT Idle Timeout
IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout), IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout),
IdleTimer = start_timer(IdleTimeout, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout),
case emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), case emqx_config:get_zone_conf(emqx_channel:info(zone, Channel),
emqx_channel:info(listener, Channel), [force_shutdown]) of [force_shutdown]) of
#{enable := false} -> ok; #{enable := false} -> ok;
ShutdownPolicy -> emqx_misc:tune_heap_size(ShutdownPolicy) ShutdownPolicy -> emqx_misc:tune_heap_size(ShutdownPolicy)
end, end,
@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) ->
websocket_info(Deliver = {deliver, _Topic, _Msg}, websocket_info(Deliver = {deliver, _Topic, _Msg},
State = #state{zone = Zone, listener = Listener}) -> State = #state{zone = Zone, listener = Listener}) ->
ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), ActiveN = emqx_config:get_zone_conf(Zone, [tcp, active_n]),
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State); with_channel(handle_deliver, [Delivers], State);
@ -521,8 +521,8 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
end. end.
check_oom(State = #state{channel = Channel}) -> check_oom(State = #state{channel = Channel}) ->
ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), ShutdownPolicy = emqx_config:get_zone_conf(
emqx_channel:info(listener, Channel), [force_shutdown]), emqx_channel:info(zone, Channel), [force_shutdown]),
case ShutdownPolicy of case ShutdownPolicy of
#{enable := false} -> State; #{enable := false} -> State;
#{enable := true} -> #{enable := true} ->
@ -564,7 +564,7 @@ handle_incoming(Packet, State = #state{zone = Zone, listener = Listener})
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
ok = inc_incoming_stats(Packet), ok = inc_incoming_stats(Packet),
NState = case emqx_pd:get_counter(incoming_pubs) > NState = case emqx_pd:get_counter(incoming_pubs) >
emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of emqx_config:get_zone_conf(Zone, [tcp, active_n]) of
true -> postpone({cast, rate_limit}, State); true -> postpone({cast, rate_limit}, State);
false -> State false -> State
end, end,
@ -601,7 +601,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
Oct = iolist_size(IoData), Oct = iolist_size(IoData),
ok = inc_sent_stats(length(Packets), Oct), ok = inc_sent_stats(length(Packets), Oct),
NState = case emqx_pd:get_counter(outgoing_pubs) > NState = case emqx_pd:get_counter(outgoing_pubs) >
emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of emqx_config:get_zone_conf(Zone, [tcp, active_n]) of
true -> true ->
Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
oct => emqx_pd:reset_counter(outgoing_bytes) oct => emqx_pd:reset_counter(outgoing_bytes)
@ -789,4 +789,4 @@ set_field(Name, Value, State) ->
setelement(Pos+1, State, Value). setelement(Pos+1, State, Value).
get_ws_opts(Zone, Listener, Key) -> get_ws_opts(Zone, Listener, Key) ->
emqx_config:get_listener_conf(Zone, Listener, [websocket, Key]). emqx_config:get_zone_conf(Zone, [websocket, Key]).

View File

@ -58,4 +58,4 @@ clientinfo(InitProps) ->
}, InitProps). }, InitProps).
toggle_auth(Bool) when is_boolean(Bool) -> toggle_auth(Bool) when is_boolean(Bool) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [auth, enable], Bool). emqx_config:put_zone_conf(default, [auth, enable], Bool).

View File

@ -80,4 +80,4 @@ t_drain_acl_cache(_) ->
emqtt:stop(Client). emqtt:stop(Client).
toggle_acl(Bool) when is_boolean(Bool) -> toggle_acl(Bool) when is_boolean(Bool) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], Bool). emqx_config:put_zone_conf(default, [acl, enable], Bool).

View File

@ -398,7 +398,7 @@ t_bad_receive_maximum(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
C1 = channel(#{conn_state => idle}), C1 = channel(#{conn_state => idle}),
{shutdown, protocol_error, _, _} = {shutdown, protocol_error, _, _} =
emqx_channel:handle_in( emqx_channel:handle_in(
@ -411,8 +411,8 @@ t_override_client_receive_maximum(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_inflight], 0), emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0),
C1 = channel(#{conn_state => idle}), C1 = channel(#{conn_state => idle}),
ClientCapacity = 2, ClientCapacity = 2,
{ok, [{event, connected}, _ConnAck], C2} = {ok, [{event, connected}, _ConnAck], C2} =
@ -663,7 +663,7 @@ t_handle_out_connack_response_information(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
IdleChannel = channel(#{conn_state => idle}), IdleChannel = channel(#{conn_state => idle}),
{ok, [{event, connected}, {ok, [{event, connected},
{connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}],
@ -677,7 +677,7 @@ t_handle_out_connack_not_response_information(_) ->
fun(true, _ClientInfo, _ConnInfo) -> fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}} {ok, #{session => session(), present => false}}
end), end),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), emqx_config:put_zone_conf(default, [mqtt, response_information], test),
IdleChannel = channel(#{conn_state => idle}), IdleChannel = channel(#{conn_state => idle}),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} =
emqx_channel:handle_in( emqx_channel:handle_in(
@ -863,7 +863,7 @@ t_packing_alias(_) ->
channel())). channel())).
t_check_pub_acl(_) -> t_check_pub_acl(_) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), emqx_config:put_zone_conf(default, [acl, enable], true),
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
ok = emqx_channel:check_pub_acl(Publish, channel()). ok = emqx_channel:check_pub_acl(Publish, channel()).
@ -873,7 +873,7 @@ t_check_pub_alias(_) ->
ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel). ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel).
t_check_sub_acls(_) -> t_check_sub_acls(_) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), emqx_config:put_zone_conf(default, [acl, enable], true),
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
[{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()). [{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()).

View File

@ -101,7 +101,7 @@ t_basic_v4(_Config) ->
t_basic([{proto_ver, v4}]). t_basic([{proto_ver, v4}]).
t_cm(_) -> t_cm(_) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 1000), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 1000),
ClientId = <<"myclient">>, ClientId = <<"myclient">>,
{ok, C} = emqtt:start_link([{clientid, ClientId}]), {ok, C} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
@ -111,7 +111,7 @@ t_cm(_) ->
ct:sleep(1200), ct:sleep(1200),
Stats = emqx_cm:get_chan_stats(ClientId), Stats = emqx_cm:get_chan_stats(ClientId),
?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)), ?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 15000). emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000).
t_cm_registry(_) -> t_cm_registry(_) ->
Info = supervisor:which_children(emqx_cm_sup), Info = supervisor:which_children(emqx_cm_sup),
@ -269,7 +269,7 @@ t_basic(_Opts) ->
ok = emqtt:disconnect(C). ok = emqtt:disconnect(C).
t_username_as_clientid(_) -> t_username_as_clientid(_) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, use_username_as_clientid], true), emqx_config:put_zone_conf(default, [mqtt, use_username_as_clientid], true),
Username = <<"usera">>, Username = <<"usera">>,
{ok, C} = emqtt:start_link([{username, Username}]), {ok, C} = emqtt:start_link([{username, Username}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
@ -323,7 +323,7 @@ tls_certcn_as_clientid(TLSVsn) ->
tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) -> tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) ->
CN = <<"Client">>, CN = <<"Client">>,
emqx_config:put_listener_conf(default, mqtt_ssl, [mqtt, peer_cert_as_clientid], cn), emqx_config:put_zone_conf(default, [mqtt, peer_cert_as_clientid], cn),
SslConf = emqx_ct_helpers:client_ssl_twoway(TLSVsn), SslConf = emqx_ct_helpers:client_ssl_twoway(TLSVsn),
{ok, Client} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]), {ok, Client} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),

View File

@ -26,7 +26,7 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
emqx_config:put_listener_conf(default, mqtt_tcp, [flapping_detect], emqx_config:put_zone_conf(default, [flapping_detect],
#{max_count => 3, #{max_count => 3,
window_time => 100, % 0.1s window_time => 100, % 0.1s
ban_time => 2000 %% 2s ban_time => 2000 %% 2s

View File

@ -26,8 +26,8 @@ all() -> emqx_ct:all(?MODULE).
t_check_pub(_) -> t_check_pub(_) ->
OldConf = emqx_config:get(), OldConf = emqx_config:get(),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], ?QOS_1), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, retain_available], false), emqx_config:put_zone_conf(default, [mqtt, retain_available], false),
timer:sleep(50), timer:sleep(50),
ok = emqx_mqtt_caps:check_pub(default, mqtt_tcp, #{qos => ?QOS_1, retain => false}), ok = emqx_mqtt_caps:check_pub(default, mqtt_tcp, #{qos => ?QOS_1, retain => false}),
PubFlags1 = #{qos => ?QOS_2, retain => false}, PubFlags1 = #{qos => ?QOS_2, retain => false},
@ -45,10 +45,10 @@ t_check_sub(_) ->
nl => 0, nl => 0,
qos => ?QOS_2 qos => ?QOS_2
}, },
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_topic_levels], 2), emqx_config:put_zone_conf(default, [mqtt, max_topic_levels], 2),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], ?QOS_1), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], ?QOS_1),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, shared_subscription], false), emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, wildcard_subscription], false), emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false),
timer:sleep(50), timer:sleep(50),
ok = emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"topic">>, SubOpts), ok = emqx_mqtt_caps:check_sub(default, mqtt_tcp, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},

View File

@ -217,14 +217,14 @@ t_connect_will_message(Config) ->
ok = emqtt:disconnect(Client4). ok = emqtt:disconnect(Client4).
t_batch_subscribe(init, Config) -> t_batch_subscribe(init, Config) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), emqx_config:put_zone_conf(default, [acl, enable], true),
emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], true), emqx_config:put_zone_conf(default, [acl, enable], true),
ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end),
Config; Config;
t_batch_subscribe('end', _Config) -> t_batch_subscribe('end', _Config) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], false), emqx_config:put_zone_conf(default, [acl, enable], false),
emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], false), emqx_config:put_zone_conf(default, [acl, enable], false),
meck:unload(emqx_access_control). meck:unload(emqx_access_control).
t_batch_subscribe(Config) -> t_batch_subscribe(Config) ->
@ -288,22 +288,22 @@ t_connect_will_retain(Config) ->
t_connect_idle_timeout(_Config) -> t_connect_idle_timeout(_Config) ->
IdleTimeout = 2000, IdleTimeout = 2000,
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], IdleTimeout), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], IdleTimeout), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
{ok, Sock} = emqtt_sock:connect({127,0,0,1}, 1883, [], 60000), {ok, Sock} = emqtt_sock:connect({127,0,0,1}, 1883, [], 60000),
timer:sleep(IdleTimeout), timer:sleep(IdleTimeout),
?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)). ?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)).
t_connect_emit_stats_timeout(init, Config) -> t_connect_emit_stats_timeout(init, Config) ->
NewIdleTimeout = 1000, NewIdleTimeout = 1000,
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], NewIdleTimeout), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], NewIdleTimeout), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
[{idle_timeout, NewIdleTimeout} | Config]; [{idle_timeout, NewIdleTimeout} | Config];
t_connect_emit_stats_timeout('end', _Config) -> t_connect_emit_stats_timeout('end', _Config) ->
snabbkaffe:stop(), snabbkaffe:stop(),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 15000), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], 15000), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
ok. ok.
t_connect_emit_stats_timeout(Config) -> t_connect_emit_stats_timeout(Config) ->
@ -471,8 +471,8 @@ t_connack_session_present(Config) ->
t_connack_max_qos_allowed(init, Config) -> t_connack_max_qos_allowed(init, Config) ->
Config; Config;
t_connack_max_qos_allowed('end', _Config) -> t_connack_max_qos_allowed('end', _Config) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
ok. ok.
t_connack_max_qos_allowed(Config) -> t_connack_max_qos_allowed(Config) ->
ConnFun = ?config(conn_fun, Config), ConnFun = ?config(conn_fun, Config),
@ -480,8 +480,8 @@ t_connack_max_qos_allowed(Config) ->
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
%% max_qos_allowed = 0 %% max_qos_allowed = 0
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 0), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 0), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0),
{ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack1} = emqtt:ConnFun(Client1), {ok, Connack1} = emqtt:ConnFun(Client1),
@ -506,8 +506,8 @@ t_connack_max_qos_allowed(Config) ->
waiting_client_process_exit(Client2), waiting_client_process_exit(Client2),
%% max_qos_allowed = 1 %% max_qos_allowed = 1
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 1), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 1), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1),
{ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack3} = emqtt:ConnFun(Client3), {ok, Connack3} = emqtt:ConnFun(Client3),
@ -532,8 +532,8 @@ t_connack_max_qos_allowed(Config) ->
waiting_client_process_exit(Client4), waiting_client_process_exit(Client4),
%% max_qos_allowed = 2 %% max_qos_allowed = 2
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2), emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
{ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack5} = emqtt:ConnFun(Client5), {ok, Connack5} = emqtt:ConnFun(Client5),