diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index ee615b944..8c59f10c9 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -31,8 +31,8 @@ -export([ info/1 , info/2 + , get_mqtt_conf/2 , get_mqtt_conf/3 - , get_mqtt_conf/4 , set_conn_state/2 , get_session/1 , set_session/2 @@ -206,11 +206,11 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, sockname := {_Host, SockPort}}, #{zone := Zone, listener := Listener}) -> Peercert = maps:get(peercert, ConnInfo, undefined), Protocol = maps:get(protocol, ConnInfo, mqtt), - MountPoint = case get_mqtt_conf(Zone, Listener, mountpoint) of + MountPoint = case get_mqtt_conf(Zone, mountpoint) of <<>> -> undefined; MP -> MP end, - QuotaPolicy = emqx_config:get_zone_conf(Zone, [rate_limit, quota]), + QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener,[rate_limit, quota], []), ClientInfo = set_peercert_infos( Peercert, #{zone => Zone, @@ -249,11 +249,11 @@ set_peercert_infos(NoSSL, ClientInfo, _, _) NoSSL =:= undefined -> ClientInfo#{username => undefined}; -set_peercert_infos(Peercert, ClientInfo, Zone, Listener) -> +set_peercert_infos(Peercert, ClientInfo, Zone, _Listener) -> {DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)}, PeercetAs = fun(Key) -> - case get_mqtt_conf(Zone, Listener, Key) of + case get_mqtt_conf(Zone, Key) of cn -> CN; dn -> DN; crt -> Peercert; @@ -426,7 +426,7 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) -> + Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> case emqx_packet:check(Packet) of ok -> TopicFilters0 = parse_topic_filters(TopicFilters), @@ -538,7 +538,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), - Channel = #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> + Channel = #channel{clientinfo = #{zone := Zone}}) -> case pipeline([fun check_quota_exceeded/2, fun process_alias/2, fun check_pub_alias/2, @@ -995,7 +995,7 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) handle_info({sock_closed, Reason}, Channel = #channel{conn_state = ConnState, - clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) + clientinfo = ClientInfo = #{zone := Zone}}) when ConnState =:= connected orelse ConnState =:= reauthenticating -> emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso emqx_flapping:detect(ClientInfo), @@ -1158,9 +1158,9 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{ username = Username }, Channel = #channel{conninfo = ConnInfo, - clientinfo = #{zone := Zone, listener := Listener} + clientinfo = #{zone := Zone} }) -> - ExpiryInterval = expiry_interval(Zone, Listener, ConnPkt), + ExpiryInterval = expiry_interval(Zone, ConnPkt), NConnInfo = ConnInfo#{proto_name => ProtoName, proto_ver => ProtoVer, clean_start => CleanStart, @@ -1169,21 +1169,21 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{ username => Username, conn_props => ConnProps, expiry_interval => ExpiryInterval, - receive_maximum => receive_maximum(Zone, Listener, ConnProps) + receive_maximum => receive_maximum(Zone, ConnProps) }, {ok, Channel#channel{conninfo = NConnInfo}}. %% If the Session Expiry Interval is absent the value 0 is used. -expiry_interval(_, _, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, +expiry_interval(_, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, properties = ConnProps}) -> emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0); -expiry_interval(Zone, Listener, #mqtt_packet_connect{clean_start = false}) -> - get_mqtt_conf(Zone, Listener, session_expiry_interval); -expiry_interval(_, _, #mqtt_packet_connect{clean_start = true}) -> +expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) -> + get_mqtt_conf(Zone, session_expiry_interval); +expiry_interval(_, #mqtt_packet_connect{clean_start = true}) -> 0. -receive_maximum(Zone, Listener, ConnProps) -> - MaxInflightConfig = case get_mqtt_conf(Zone, Listener, max_inflight) of +receive_maximum(Zone, ConnProps) -> + MaxInflightConfig = case get_mqtt_conf(Zone, max_inflight) of 0 -> ?RECEIVE_MAXIMUM_LIMIT; N -> N end, @@ -1232,9 +1232,9 @@ set_bridge_mode(_ConnPkt, _ClientInfo) -> ok. maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) -> {ok, ClientInfo}; -maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, listener := Listener, +maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Username}) -> - case get_mqtt_conf(Zone, Listener, use_username_as_clientid) of + case get_mqtt_conf(Zone, use_username_as_clientid) of true -> {ok, ClientInfo#{clientid => Username}}; false -> ok end. @@ -1477,9 +1477,8 @@ put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters. enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) -> SubOpts; -enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge, - listener := Listener}}) -> - NL = flag(get_mqtt_conf(Zone, Listener, ignore_loop_deliver)), +enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> + NL = flag(get_mqtt_conf(Zone, ignore_loop_deliver)), SubOpts#{rap => flag(IsBridge), nl => NL}. %%-------------------------------------------------------------------- @@ -1515,8 +1514,8 @@ enrich_connack_caps(AckProps, _Channel) -> AckProps. %%-------------------------------------------------------------------- %% Enrich server keepalive -enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> - case get_mqtt_conf(Zone, Listener, server_keepalive) of +enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) -> + case get_mqtt_conf(Zone, server_keepalive) of disabled -> AckProps; Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} end. @@ -1525,11 +1524,11 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone, listener %% Enrich response information enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps}, - clientinfo = #{zone := Zone, listener := Listener}}) -> + clientinfo = #{zone := Zone}}) -> case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of 0 -> AckProps; 1 -> AckProps#{'Response-Information' => - case get_mqtt_conf(Zone, Listener, response_information, "") of + case get_mqtt_conf(Zone, response_information, "") of "" -> undefined; RspInfo -> RspInfo end} @@ -1581,9 +1580,8 @@ ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(disabled, Channel) -> Channel; -ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone, - listener := Listener}}) -> - Backoff = get_mqtt_conf(Zone, Listener, keepalive_backoff), +ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> + Backoff = get_mqtt_conf(Zone, keepalive_backoff), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). @@ -1624,7 +1622,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> %%-------------------------------------------------------------------- %% Is ACL enabled? -is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> +is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> (not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [acl, enable]). %%-------------------------------------------------------------------- @@ -1736,10 +1734,10 @@ sp(false) -> 0. flag(true) -> 1; flag(false) -> 0. -get_mqtt_conf(Zone, Listener, Key) -> +get_mqtt_conf(Zone, Key) -> emqx_config:get_zone_conf(Zone, [mqtt, Key]). -get_mqtt_conf(Zone, Listener, Key, Default) -> +get_mqtt_conf(Zone, Key, Default) -> emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 5d4e36f25..21ba5231e 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -269,7 +269,7 @@ init_state(Transport, Socket, #{zone := Zone, listener := Listener} = Opts) -> true -> undefined; false -> disabled end, - IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout), + IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout), #state{transport = Transport, socket = Socket, @@ -908,5 +908,5 @@ get_state(Pid) -> get_active_n(Zone, Listener) -> case emqx_config:get([zones, Zone, listeners, Listener, type]) of quic -> 100; - _ -> emqx_config:get_zone_conf(Zone, [tcp, active_n]) + _ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) end. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 723295b79..540076eaf 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -244,7 +244,7 @@ check_origin_header(Req, #{zone := Zone, listener := Listener} = Opts) -> websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> {Peername, Peercert} = - case emqx_config:get_zone_conf(Zone, [proxy_protocol]) andalso + case emqx_config:get_listener_conf(Zone, Listener, [proxy_protocol]) andalso maps:get(proxy_header, Req) of #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} -> SourceName = {SrcAddr, SrcPort}, @@ -296,7 +296,7 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> false -> disabled end, %% MQTT Idle Timeout - IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout), + IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout), case emqx_config:get_zone_conf(emqx_channel:info(zone, Channel), [force_shutdown]) of @@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) -> websocket_info(Deliver = {deliver, _Topic, _Msg}, State = #state{zone = Zone, listener = Listener}) -> - ActiveN = emqx_config:get_zone_conf(Zone, [tcp, active_n]), + ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); @@ -564,7 +564,7 @@ handle_incoming(Packet, State = #state{zone = Zone, listener = Listener}) ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ok = inc_incoming_stats(Packet), NState = case emqx_pd:get_counter(incoming_pubs) > - emqx_config:get_zone_conf(Zone, [tcp, active_n]) of + emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of true -> postpone({cast, rate_limit}, State); false -> State end, @@ -601,7 +601,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), NState = case emqx_pd:get_counter(outgoing_pubs) > - emqx_config:get_zone_conf(Zone, [tcp, active_n]) of + emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of true -> Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), oct => emqx_pd:reset_counter(outgoing_bytes) @@ -789,4 +789,4 @@ set_field(Name, Value, State) -> setelement(Pos+1, State, Value). get_ws_opts(Zone, Listener, Key) -> - emqx_config:get_zone_conf(Zone, [websocket, Key]). + emqx_config:get_listener_conf(Zone, Listener, [websocket, Key]).