diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 2179fba90..b4b1393d6 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -870,7 +870,7 @@ zones.default { ## received. ## ## @doc zones..mqtt.idle_timeout - ## ValueType: Duration | infinity + ## ValueType: Duration ## Default: 15s idle_timeout: 15s diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 1ef885ed5..a2a11bd15 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -32,14 +32,9 @@ %%-------------------------------------------------------------------- -spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}). -authenticate(ClientInfo = #{zone := Zone}) -> - AuthResult = default_auth_result(Zone), - case emqx_zone:get_env(Zone, bypass_auth_plugins, false) of - true -> - return_auth_result(AuthResult); - false -> - return_auth_result(run_hooks('client.authenticate', [ClientInfo], AuthResult)) - end. +authenticate(ClientInfo = #{zone := Zone, listener := Listener}) -> + AuthResult = default_auth_result(Zone, Listener), + return_auth_result(run_hooks('client.authenticate', [ClientInfo], AuthResult)). %% @doc Check ACL -spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()) @@ -59,17 +54,16 @@ check_acl_cache(ClientInfo, PubSub, Topic) -> AclResult -> AclResult end. -do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> - Default = emqx_zone:get_env(Zone, acl_nomatch, deny), - case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of +do_check_acl(ClientInfo, PubSub, Topic) -> + case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], allow) of allow -> allow; _Other -> deny end. -default_auth_result(Zone) -> - case emqx_zone:get_env(Zone, allow_anonymous, false) of - true -> #{auth_result => success, anonymous => true}; - false -> #{auth_result => not_authorized, anonymous => false} +default_auth_result(Zone, Listener) -> + case emqx_config:get_listener_conf(Zone, Listener, [auth, enable]) of + false -> #{auth_result => success, anonymous => true}; + true -> #{auth_result => not_authorized, anonymous => false} end. -compile({inline, [run_hooks/3]}). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index e3cbff692..bbc48e709 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -31,6 +31,8 @@ -export([ info/1 , info/2 + , get_mqtt_conf/3 + , get_mqtt_conf/4 , set_conn_state/2 , get_session/1 , set_session/2 @@ -63,7 +65,7 @@ , maybe_apply/2 ]). --export_type([channel/0]). +-export_type([channel/0, opts/0]). -record(channel, { %% MQTT ConnInfo @@ -98,6 +100,8 @@ -type(channel() :: #channel{}). +-type(opts() :: #{zone := atom(), listener := atom(), atom() => term()}). + -type(conn_state() :: idle | connecting | connected | disconnected). -type(reply() :: {outgoing, emqx_types:packet()} @@ -151,7 +155,9 @@ info(connected_at, #channel{conninfo = ConnInfo}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(zone, #channel{clientinfo = ClientInfo}) -> - maps:get(zone, ClientInfo, undefined); + maps:get(zone, ClientInfo); +info(listener, #channel{clientinfo = ClientInfo}) -> + maps:get(listener, ClientInfo); info(clientid, #channel{clientinfo = ClientInfo}) -> maps:get(clientid, ClientInfo, undefined); info(username, #channel{clientinfo = ClientInfo}) -> @@ -195,17 +201,20 @@ caps(#channel{clientinfo = #{zone := Zone}}) -> %% Init the channel %%-------------------------------------------------------------------- --spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). +-spec(init(emqx_types:conninfo(), opts()) -> channel()). init(ConnInfo = #{peername := {PeerHost, _Port}, - sockname := {_Host, SockPort}}, Options) -> - Zone = proplists:get_value(zone, Options), + sockname := {_Host, SockPort}}, #{zone := Zone, listener := Listener}) -> Peercert = maps:get(peercert, ConnInfo, undefined), Protocol = maps:get(protocol, ConnInfo, mqtt), - MountPoint = emqx_zone:mountpoint(Zone), - QuotaPolicy = emqx_zone:quota_policy(Zone), - ClientInfo = setting_peercert_infos( + MountPoint = case get_mqtt_conf(Zone, Listener, mountpoint) of + "" -> undefined; + MP -> MP + end, + QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener, [rate_limit, quota]), + ClientInfo = set_peercert_infos( Peercert, #{zone => Zone, + listener => Listener, protocol => Protocol, peerhost => PeerHost, sockport => SockPort, @@ -214,7 +223,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, mountpoint => MountPoint, is_bridge => false, is_superuser => false - }, Options), + }, Zone, Listener), {NClientInfo, NConnInfo} = take_ws_cookie(ClientInfo, ConnInfo), #channel{conninfo = NConnInfo, clientinfo = NClientInfo, @@ -222,7 +231,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, outbound => #{} }, auth_cache = #{}, - quota = emqx_limiter:init(Zone, QuotaPolicy), + quota = emqx_limiter:init(Zone, quota_policy(QuotaPolicy)), timers = #{}, conn_state = idle, takeover = false, @@ -230,30 +239,34 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, pendings = [] }. -setting_peercert_infos(NoSSL, ClientInfo, _Options) +quota_policy(RawPolicy) -> + [{Name, {list_to_integer(StrCount), + erlang:trunc(hocon_postprocess:duration(StrWind) / 1000)}} + || {Name, [StrCount, StrWind]} <- maps:to_list(RawPolicy)]. + +set_peercert_infos(NoSSL, ClientInfo, _, _) when NoSSL =:= nossl; NoSSL =:= undefined -> ClientInfo#{username => undefined}; -setting_peercert_infos(Peercert, ClientInfo, Options) -> +set_peercert_infos(Peercert, ClientInfo, Zone, Listener) -> {DN, CN} = {esockd_peercert:subject(Peercert), esockd_peercert:common_name(Peercert)}, - Username = peer_cert_as(peer_cert_as_username, Options, Peercert, DN, CN), - ClientId = peer_cert_as(peer_cert_as_clientid, Options, Peercert, DN, CN), - ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}. - --dialyzer([{nowarn_function, [peer_cert_as/5]}]). -% esockd_peercert:peercert is opaque -% https://github.com/emqx/esockd/blob/master/src/esockd_peercert.erl -peer_cert_as(Key, Options, Peercert, DN, CN) -> - case proplists:get_value(Key, Options) of + PeercetAs = fun(Key) -> + % esockd_peercert:peercert is opaque + % https://github.com/emqx/esockd/blob/master/src/esockd_peercert.erl + case get_mqtt_conf(Zone, Listener, Key) of cn -> CN; dn -> DN; crt -> Peercert; pem -> base64:encode(Peercert); md5 -> emqx_passwd:hash(md5, Peercert); _ -> undefined - end. + end + end, + Username = PeercetAs(peer_cert_as_username), + ClientId = PeercetAs(peer_cert_as_clientid), + ClientInfo#{username => Username, clientid => ClientId, dn => DN, cn => CN}. take_ws_cookie(ClientInfo, ConnInfo) -> case maps:take(ws_cookie, ConnInfo) of @@ -403,16 +416,17 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S end; handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> + Channel = #channel{clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) -> case emqx_packet:check(Packet) of ok -> TopicFilters0 = parse_topic_filters(TopicFilters), TopicFilters1 = put_subid_in_subopts(Properties, TopicFilters0), TupleTopicFilters0 = check_sub_acls(TopicFilters1, Channel), - case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso - lists:any(fun({_TopicFilter, ReasonCode}) -> - ReasonCode =:= ?RC_NOT_AUTHORIZED - end, TupleTopicFilters0) of + HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) -> + ReasonCode =:= ?RC_NOT_AUTHORIZED + end, TupleTopicFilters0), + DenyAction = emqx_config:get_listener_conf(Zone, Listener, [acl, deny_action]), + case DenyAction =:= disconnect andalso HasAclDeny of true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); false -> Replace = fun @@ -512,7 +526,7 @@ process_connect(AckProps, Channel = #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), - Channel = #channel{clientinfo = #{zone := Zone}}) -> + Channel = #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> case pipeline([fun check_quota_exceeded/2, fun process_alias/2, fun check_pub_alias/2, @@ -525,7 +539,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> ?LOG(warning, "Cannot publish message to ~s due to ~s.", [Topic, emqx_reason_codes:text(Rc)]), - case emqx_zone:get_env(Zone, acl_deny_action, ignore) of + case emqx_config:get_listener_conf(Zone, Listener, [acl_deny_action]) of ignore -> case QoS of ?QOS_0 -> {ok, NChannel}; @@ -968,8 +982,8 @@ handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connected, - clientinfo = ClientInfo = #{zone := Zone}}) -> - emqx_zone:enable_flapping_detect(Zone) + clientinfo = ClientInfo = #{zone := Zone, listener := Listener}}) -> + emqx_config:get_listener_conf(Zone, Listener, [flapping_detect, enable]) andalso emqx_flapping:detect(ClientInfo), Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of @@ -1130,9 +1144,9 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{ username = Username }, Channel = #channel{conninfo = ConnInfo, - clientinfo = #{zone := Zone} + clientinfo = #{zone := Zone, listener := Listener} }) -> - ExpiryInterval = expiry_interval(Zone, ConnPkt), + ExpiryInterval = expiry_interval(Zone, Listener, ConnPkt), NConnInfo = ConnInfo#{proto_name => ProtoName, proto_ver => ProtoVer, clean_start => CleanStart, @@ -1141,22 +1155,21 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{ username => Username, conn_props => ConnProps, expiry_interval => ExpiryInterval, - receive_maximum => receive_maximum(Zone, ConnProps) + receive_maximum => receive_maximum(Zone, Listener, ConnProps) }, {ok, Channel#channel{conninfo = NConnInfo}}. %% If the Session Expiry Interval is absent the value 0 is used. --compile({inline, [expiry_interval/2]}). -expiry_interval(_Zone, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, +expiry_interval(_, _, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, properties = ConnProps}) -> emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0); -expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) -> - emqx_zone:session_expiry_interval(Zone); -expiry_interval(_Zone, #mqtt_packet_connect{clean_start = true}) -> +expiry_interval(Zone, Listener, #mqtt_packet_connect{clean_start = false}) -> + get_mqtt_conf(Zone, Listener, session_expiry_interval); +expiry_interval(_, _, #mqtt_packet_connect{clean_start = true}) -> 0. -receive_maximum(Zone, ConnProps) -> - MaxInflightConfig = case emqx_zone:max_inflight(Zone) of +receive_maximum(Zone, Listener, ConnProps) -> + MaxInflightConfig = case get_mqtt_conf(Zone, Listener, max_inflight) of 0 -> ?RECEIVE_MAXIMUM_LIMIT; N -> N end, @@ -1205,8 +1218,9 @@ set_bridge_mode(_ConnPkt, _ClientInfo) -> ok. maybe_username_as_clientid(_ConnPkt, ClientInfo = #{username := undefined}) -> {ok, ClientInfo}; -maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, username := Username}) -> - case emqx_zone:use_username_as_clientid(Zone) of +maybe_username_as_clientid(_ConnPkt, ClientInfo = #{zone := Zone, listener := Listener, + username := Username}) -> + case get_mqtt_conf(Zone, Listener, use_username_as_clientid) of true -> {ok, ClientInfo#{clientid => Username}}; false -> ok end. @@ -1234,8 +1248,8 @@ set_log_meta(_ConnPkt, #channel{clientinfo = #{clientid := ClientId}}) -> %%-------------------------------------------------------------------- %% Check banned -check_banned(_ConnPkt, #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> - case emqx_zone:enable_ban(Zone) andalso emqx_banned:check(ClientInfo) of +check_banned(_ConnPkt, #channel{clientinfo = ClientInfo}) -> + case emqx_banned:check(ClientInfo) of true -> {error, ?RC_BANNED}; false -> ok end. @@ -1463,8 +1477,9 @@ put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters. enrich_subopts(SubOpts, _Channel = ?IS_MQTT_V5) -> SubOpts; -enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge}}) -> - NL = flag(emqx_zone:ignore_loop_deliver(Zone)), +enrich_subopts(SubOpts, #channel{clientinfo = #{zone := Zone, is_bridge := IsBridge, + listener := Listener}}) -> + NL = flag(get_mqtt_conf(Zone, Listener, ignore_loop_deliver)), SubOpts#{rap => flag(IsBridge), nl => NL}. %%-------------------------------------------------------------------- @@ -1499,8 +1514,8 @@ enrich_connack_caps(AckProps, _Channel) -> AckProps. %%-------------------------------------------------------------------- %% Enrich server keepalive -enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) -> - case emqx_zone:server_keepalive(Zone) of +enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone, listener := Listener}}) -> + case get_mqtt_conf(Zone, Listener, server_keepalive) of undefined -> AckProps; Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} end. @@ -1509,10 +1524,14 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) -> %% Enrich response information enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps}, - clientinfo = #{zone := Zone}}) -> + clientinfo = #{zone := Zone, listener := Listener}}) -> case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of 0 -> AckProps; - 1 -> AckProps#{'Response-Information' => emqx_zone:response_information(Zone)} + 1 -> AckProps#{'Response-Information' => + case get_mqtt_conf(Zone, Listener, response_information, "") of + "" -> undefined; + RspInfo -> RspInfo + end} end. %%-------------------------------------------------------------------- @@ -1559,9 +1578,10 @@ ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel = #channel{conninfo ensure_keepalive(_AckProps, Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(maps:get(keepalive, ConnInfo), Channel). -ensure_keepalive_timer(0, Channel) -> Channel; -ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> - Backoff = emqx_zone:keepalive_backoff(Zone), +ensure_keepalive_timer(disabled, Channel) -> Channel; +ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone, + listener := Listener}}) -> + Backoff = get_mqtt_conf(Zone, Listener, keepalive_backoff), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). @@ -1604,8 +1624,8 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> %% Is ACL enabled? -compile({inline, [is_acl_enabled/1]}). -is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> - (not IsSuperuser) andalso emqx_zone:enable_acl(Zone). +is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> + (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]). %%-------------------------------------------------------------------- %% Parse Topic Filters @@ -1715,6 +1735,12 @@ sp(false) -> 0. flag(true) -> 1; flag(false) -> 0. +get_mqtt_conf(Zone, Listener, Key) -> + emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key]). + +get_mqtt_conf(Zone, Listener, Key, Default) -> + emqx_config:get_listener_conf(Zone, Listener, [mqtt, Key], Default). + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- @@ -1722,4 +1748,3 @@ flag(false) -> 0. set_field(Name, Value, Channel) -> Pos = emqx_misc:index_of(Name, record_info(fields, channel)), setelement(Pos+1, Channel, Value). - diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 1cbc36e05..fb8fe241b 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -108,7 +108,6 @@ }). -type(state() :: #state{}). --type(opts() :: #{zone := atom(), listener := atom(), atom() => term()}). -define(ACTIVE_N, 100). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). @@ -137,7 +136,7 @@ , system_code_change/4 ]}). --spec(start_link(esockd:transport(), esockd:socket(), opts()) +-spec(start_link(esockd:transport(), esockd:socket(), emqx_channel:opts()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> Args = [self(), Transport, Socket, Options], @@ -256,18 +255,23 @@ init_state(Transport, Socket, Options) -> }, Zone = maps:get(zone, Options), Listener = maps:get(listener, Options), - - PubLimit = emqx_zone:publish_limit(Zone), - BytesIn = proplists:get_value(rate_limit, Options), - RateLimit = emqx_zone:ratelimit(Zone), - Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), - FrameOpts = emqx_zone:mqtt_frame_options(Zone), + Limiter = emqx_limiter:init(Zone, undefined, undefined, []), + FrameOpts = #{ + strict_mode => emqx_config:get_listener_conf(Zone, Listener, [mqtt, strict_mode]), + max_size => emqx_config:get_listener_conf(Zone, Listener, [mqtt, max_packet_size]) + }, ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Options), - GcState = emqx_zone:init_gc_state(Zone), - StatsTimer = emqx_zone:stats_timer(Zone), - IdleTimeout = emqx_zone:idle_timeout(Zone), + GcState = case emqx_config:get_listener_conf(Zone, Listener, [force_gc]) of + #{enable := false} -> undefined; + GcPolicy -> emqx_gc:init(GcPolicy) + end, + StatsTimer = case emqx_config:get_listener_conf(Zone, Listener, [stats, enable]) of + true -> undefined; + false -> disabled + end, + IdleTimeout = emqx_channel:get_mqtt_conf(Zone, Listener, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout), #state{transport = Transport, socket = Socket, @@ -291,8 +295,11 @@ run_loop(Parent, State = #state{transport = Transport, peername = Peername, channel = Channel}) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), - emqx_misc:tune_heap_size(emqx_zone:oom_policy( - emqx_channel:info(zone, Channel))), + case emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), + emqx_channel:info(listener, Channel), [force_shutdown]) of + #{enable := false} -> ok; + ShutdownPolicy -> emqx_misc:tune_heap_size(ShutdownPolicy) + end, case activate_socket(State) of {ok, NState} -> hibernate(Parent, NState); {error, Reason} -> @@ -783,15 +790,18 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> end. check_oom(State = #state{channel = Channel}) -> - Zone = emqx_channel:info(zone, Channel), - OomPolicy = emqx_zone:oom_policy(Zone), - ?tp(debug, check_oom, #{policy => OomPolicy}), - case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of - {shutdown, Reason} -> - %% triggers terminate/2 callback immediately - erlang:exit({shutdown, Reason}); - _Other -> - ok + ShutdownPolicy = emqx_config:get_listener_conf(emqx_channel:info(zone, Channel), + emqx_channel:info(listener, Channel), [force_shutdown]), + ?tp(debug, check_oom, #{policy => ShutdownPolicy}), + case ShutdownPolicy of + #{enable := false} -> ok; + ShutdownPolicy -> + case emqx_misc:check_oom(ShutdownPolicy) of + {shutdown, Reason} -> + %% triggers terminate/2 callback immediately + erlang:exit({shutdown, Reason}); + _ -> ok + end end, State. diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 37063c65f..1737e8791 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -34,6 +34,9 @@ , serialize/2 ]). +-export([ set_opts/2 + ]). + -export_type([ options/0 , parse_state/0 , parse_result/0 @@ -81,11 +84,11 @@ initial_parse_state() -> -spec(initial_parse_state(options()) -> {none, options()}). initial_parse_state(Options) when is_map(Options) -> - ?none(merge_opts(Options)). + ?none(maps:merge(?DEFAULT_OPTIONS, Options)). -%% @pivate -merge_opts(Options) -> - maps:merge(?DEFAULT_OPTIONS, Options). +-spec set_opts(parse_state(), options()) -> parse_state(). +set_opts({_, OldOpts}, Opts) -> + maps:merge(OldOpts, Opts). %%-------------------------------------------------------------------- %% Parse MQTT Frame diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index b9830578a..9daf6c79e 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -68,7 +68,8 @@ console_print(_Fmt, _Args) -> ok. -> {ok, pid()} | {error, term()}). do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) -> esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)), - {emqx_connection, start_link, [{ZoneName, ListenerName}]}); + {emqx_connection, start_link, + [#{zone => ZoneName, listener => ListenerName}]}); %% Start MQTT/WS listener do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 616f65bfd..98488edf8 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -59,7 +59,7 @@ -export([includes/0]). structs() -> ["cluster", "node", "rpc", "log", "lager", - "acl", "mqtt", "zones", "listeners", "module", "broker", + "zones", "listeners", "module", "broker", "plugins", "sysmon", "alarm", "telemetry"] ++ includes(). @@ -244,7 +244,7 @@ fields("acl_cache") -> ]; fields("mqtt") -> - [ {"mountpoint", t(binary(), undefined, <<"">>)} + [ {"mountpoint", t(binary(), undefined, <<>>)} , {"idle_timeout", maybe_infinity(duration(), "15s")} , {"max_packet_size", maybe_infinity(bytesize(), "1MB")} , {"max_clientid_len", t(integer(), undefined, 65535)} @@ -256,7 +256,7 @@ fields("mqtt") -> , {"shared_subscription", t(boolean(), undefined, true)} , {"ignore_loop_deliver", t(boolean())} , {"strict_mode", t(boolean(), undefined, false)} - , {"response_information", t(string(), undefined, undefined)} + , {"response_information", t(string(), undefined, "")} , {"server_keepalive", maybe_disabled(integer())} , {"keepalive_backoff", t(float(), undefined, 0.75)} , {"max_subscriptions", maybe_infinity(integer())} @@ -365,7 +365,7 @@ fields("ws_opts") -> [ {"mqtt_path", t(string(), undefined, "/mqtt")} , {"mqtt_piggyback", t(union(single, multiple), undefined, multiple)} , {"compress", t(boolean())} - , {"idle_timeout", maybe_infinity(duration())} + , {"idle_timeout", t(duration(), undefined, "15s")} , {"max_frame_size", maybe_infinity(integer())} , {"fail_if_no_subprotocol", t(boolean(), undefined, true)} , {"supported_subprotocols", t(string(), undefined,