diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 251f2d0a5..8dba8b0f8 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -76,7 +76,7 @@ listeners.tcp.default { ## is delivered to the subscriber. The mountpoint is a way that users can use ## to implement isolation of message routing between different listeners. ## - ## For example if a clientA subscribes to "t" with `listeners.tcp..mqtt.mountpoint` + ## For example if a clientA subscribes to "t" with `listeners.tcp..mountpoint` ## set to "some_tenant", then the client accually subscribes to the topic ## "some_tenant/t". Similarly if another clientB (connected to the same listener ## with the clientA) send a message to topic "t", the message is accually route @@ -89,7 +89,7 @@ listeners.tcp.default { ## - %c: clientid ## - %u: username ## - ## @doc listeners.tcp..mqtt.mountpoint + ## @doc listeners.tcp..mountpoint ## ValueType: String ## Default: "" mountpoint = "" @@ -175,7 +175,7 @@ listeners.ssl.default { ## is delivered to the subscriber. The mountpoint is a way that users can use ## to implement isolation of message routing between different listeners. ## - ## For example if a clientA subscribes to "t" with `listeners.ssl..mqtt.mountpoint` + ## For example if a clientA subscribes to "t" with `listeners.ssl..mountpoint` ## set to "some_tenant", then the client accually subscribes to the topic ## "some_tenant/t". Similarly if another clientB (connected to the same listener ## with the clientA) send a message to topic "t", the message is accually route @@ -188,7 +188,7 @@ listeners.ssl.default { ## - %c: clientid ## - %u: username ## - ## @doc listeners.ssl..mqtt.mountpoint + ## @doc listeners.ssl..mountpoint ## ValueType: String ## Default: "" mountpoint = "" @@ -261,7 +261,7 @@ listeners.quic.default { ## is delivered to the subscriber. The mountpoint is a way that users can use ## to implement isolation of message routing between different listeners. ## - ## For example if a clientA subscribes to "t" with `listeners.quic..mqtt.mountpoint` + ## For example if a clientA subscribes to "t" with `listeners.quic..mountpoint` ## set to "some_tenant", then the client accually subscribes to the topic ## "some_tenant/t". Similarly if another clientB (connected to the same listener ## with the clientA) send a message to topic "t", the message is accually route @@ -274,7 +274,7 @@ listeners.quic.default { ## - %c: clientid ## - %u: username ## - ## @doc listeners.quic..mqtt.mountpoint + ## @doc listeners.quic..mountpoint ## ValueType: String ## Default: "" mountpoint = "" @@ -355,7 +355,7 @@ listeners.ws.default { ## is delivered to the subscriber. The mountpoint is a way that users can use ## to implement isolation of message routing between different listeners. ## - ## For example if a clientA subscribes to "t" with `listeners.ws..mqtt.mountpoint` + ## For example if a clientA subscribes to "t" with `listeners.ws..mountpoint` ## set to "some_tenant", then the client accually subscribes to the topic ## "some_tenant/t". Similarly if another clientB (connected to the same listener ## with the clientA) send a message to topic "t", the message is accually route @@ -368,7 +368,7 @@ listeners.ws.default { ## - %c: clientid ## - %u: username ## - ## @doc listeners.ws..mqtt.mountpoint + ## @doc listeners.ws..mountpoint ## ValueType: String ## Default: "" mountpoint = "" @@ -458,7 +458,7 @@ listeners.wss.default { ## is delivered to the subscriber. The mountpoint is a way that users can use ## to implement isolation of message routing between different listeners. ## - ## For example if a clientA subscribes to "t" with `listeners.wss..mqtt.mountpoint` + ## For example if a clientA subscribes to "t" with `listeners.wss..mountpoint` ## set to "some_tenant", then the client accually subscribes to the topic ## "some_tenant/t". Similarly if another clientB (connected to the same listener ## with the clientA) send a message to topic "t", the message is accually route @@ -471,7 +471,7 @@ listeners.wss.default { ## - %c: clientid ## - %u: username ## - ## @doc listeners.wss..mqtt.mountpoint + ## @doc listeners.wss..mountpoint ## ValueType: String ## Default: "" mountpoint = "" diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 59c6447ab..553b038f6 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -202,14 +202,15 @@ caps(#channel{clientinfo = #{zone := Zone}}) -> -spec(init(emqx_types:conninfo(), opts()) -> channel()). init(ConnInfo = #{peername := {PeerHost, _Port}, - sockname := {_Host, SockPort}}, #{zone := Zone, listener := Listener}) -> + sockname := {_Host, SockPort}}, + #{zone := Zone, listener := {Type, Listener}}) -> Peercert = maps:get(peercert, ConnInfo, undefined), Protocol = maps:get(protocol, ConnInfo, mqtt), - MountPoint = case get_mqtt_conf(Zone, mountpoint) of + MountPoint = case emqx_config:get_listener_conf(Type, Listener, [mountpoint]) of <<>> -> undefined; MP -> MP end, - QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener,[rate_limit, quota], []), + QuotaPolicy = emqx_config:get_zone_conf(Zone, [quota], #{}), ClientInfo = set_peercert_infos( Peercert, #{zone => Zone, diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index dcb50fa4e..7d7f215c2 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -102,8 +102,8 @@ idle_timer :: maybe(reference()), %% Zone name zone :: atom(), - %% Listener Name - listener :: atom() + %% Listener Type and Name + listener :: {Type::atom(), Name::atom()} }). -type(state() :: #state{}). @@ -463,15 +463,15 @@ handle_msg({Passive, _Sock}, State) NState1 = check_oom(run_gc(InStats, NState)), handle_info(activate_socket, NState1); -handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{zone = Zone, - listener = Listener} = State) -> - ActiveN = get_active_n(Zone, Listener), +handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{ + listener = {Type, Listener}} = State) -> + ActiveN = get_active_n(Type, Listener), Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent -handle_msg({inet_reply, _Sock, ok}, State = #state{zone = Zone, listener = Listener}) -> - case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Zone, Listener) of +handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) -> + case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of true -> Pubs = emqx_pd:reset_counter(outgoing_pubs), Bytes = emqx_pd:reset_counter(outgoing_bytes), @@ -820,8 +820,8 @@ activate_socket(State = #state{sockstate = closed}) -> activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; activate_socket(State = #state{transport = Transport, socket = Socket, - zone = Zone, listener = Listener}) -> - ActiveN = get_active_n(Zone, Listener), + listener = {Type, Listener}}) -> + ActiveN = get_active_n(Type, Listener), case Transport:setopts(Socket, [{active, ActiveN}]) of ok -> {ok, State#state{sockstate = running}}; Error -> Error @@ -904,8 +904,6 @@ get_state(Pid) -> maps:from_list(lists:zip(record_info(fields, state), tl(tuple_to_list(State)))). -get_active_n(Zone, Listener) -> - case emqx:get_config([zones, Zone, listeners, Listener, type]) of - quic -> 100; - _ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) - end. +get_active_n(quic, _Listener) -> 100; +get_active_n(Type, Listener) -> + emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 375a5c990..d5c6c9173 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -134,12 +134,12 @@ stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). -spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}). -stop_listener(Type, ListenerName, #{type := tcp, bind := ListenOn}) -> +stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl -> esockd:close(listener_id(Type, ListenerName), ListenOn); -stop_listener(Type, ListenerName, #{type := ws}) -> +stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss -> cowboy:stop_listener(listener_id(Type, ListenerName)); -stop_listener(Type, ListenerName, #{type := quic}) -> - quicer:stop_listener(listener_id(Type, ListenerName)). +stop_listener(quic, ListenerName, _Conf) -> + quicer:stop_listener(listener_id(quic, ListenerName)). -ifndef(TEST). console_print(Fmt, Args) -> ?ULOG(Fmt, Args). @@ -156,7 +156,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when Type == tcp; Type == ssl -> esockd:open(listener_id(Type, ListenerName), ListenOn, merge_default(esockd_opts(Type, Opts)), {emqx_connection, start_link, - [#{type => Type, listener => ListenerName, + [#{listener => {Type, ListenerName}, zone => zone(Opts)}]}); %% Start MQTT/WS listener @@ -189,8 +189,7 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> , peer_unidi_stream_count => 1 , peer_bidi_stream_count => 10 , zone => zone(Opts) - , type => quic - , listener => ListenerName + , listener => {quic, ListenerName} }, StreamOpts = [], quicer:start_listener(listener_id(quic, ListenerName), @@ -201,7 +200,7 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> esockd_opts(Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), - Opts2 = case emqx_map_lib:deep_get([rate_limit, max_conn_rate], Opts0) of + Opts2 = case emqx_config:get_zone_conf(zone(Opts0), [rate_limit, max_conn_rate]) of infinity -> Opts1; Rate -> Opts1#{max_conn_rate => Rate} end, @@ -213,7 +212,7 @@ esockd_opts(Type, Opts0) -> ws_opts(Type, ListenerName, Opts) -> WsPaths = [{maps:get(mqtt_path, Opts, "/mqtt"), emqx_ws_connection, - #{zone => zone(Opts), type => Type, listener => ListenerName}}], + #{zone => zone(Opts), listener => {Type, ListenerName}}}], Dispatch = cowboy_router:compile([{'_', WsPaths}]), ProxyProto = maps:get(proxy_protocol, Opts, false), #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 866e14d48..347f9068b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -72,7 +72,7 @@ structs() -> ["zones", "mqtt", "flapping_detect", "force_shutdown", "force_gc", "conn_congestion", "rate_limit", "quota", "listeners", "broker", "plugins", - "sysmon", "alarm", "authorization"]. + "stats", "sysmon", "alarm", "authorization"]. fields("stats") -> [ {"enable", t(boolean(), undefined, true)} @@ -132,7 +132,6 @@ fields("zone_settings") -> , {"force_shutdown", ref("force_shutdown")} , {"conn_congestion", ref("conn_congestion")} , {"force_gc", ref("force_gc")} - , {"listeners", t("listeners")} ]; fields("rate_limit") -> @@ -348,9 +347,8 @@ base_listener() -> [ {"bind", t(union(ip_port(), integer()))} , {"acceptors", t(integer(), undefined, 16)} , {"max_connections", maybe_infinity(integer(), infinity)} - , {"rate_limit", ref("rate_limit")} , {"mountpoint", t(binary(), undefined, <<>>)} - , {"zone", t(binary(), undefined, undefined)} + , {"zone", t(atom(), undefined, default)} ]. %% utils diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index b76567f6c..32a81c26a 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -85,8 +85,8 @@ idle_timer :: maybe(reference()), %% Zone name zone :: atom(), - %% Listener Name - listener :: atom() + %% Listener Type and Name + listener :: {Type::atom(), Name::atom()} }). -type(state() :: #state{}). @@ -173,12 +173,12 @@ call(WsPid, Req, Timeout) when is_pid(WsPid) -> %% WebSocket callbacks %%-------------------------------------------------------------------- -init(Req, #{zone := Zone, listener := Listener} = Opts) -> +init(Req, #{listener := {Type, Listener}} = Opts) -> %% WS Transport Idle Timeout - WsOpts = #{compress => get_ws_opts(Zone, Listener, compress), - deflate_opts => get_ws_opts(Zone, Listener, deflate_opts), - max_frame_size => get_ws_opts(Zone, Listener, max_frame_size), - idle_timeout => get_ws_opts(Zone, Listener, idle_timeout) + WsOpts = #{compress => get_ws_opts(Type, Listener, compress), + deflate_opts => get_ws_opts(Type, Listener, deflate_opts), + max_frame_size => get_ws_opts(Type, Listener, max_frame_size), + idle_timeout => get_ws_opts(Type, Listener, idle_timeout) }, case check_origin_header(Req, Opts) of {error, Message} -> @@ -187,17 +187,17 @@ init(Req, #{zone := Zone, listener := Listener} = Opts) -> ok -> parse_sec_websocket_protocol(Req, Opts, WsOpts) end. -parse_sec_websocket_protocol(Req, #{zone := Zone, listener := Listener} = Opts, WsOpts) -> +parse_sec_websocket_protocol(Req, #{listener := {Type, Listener}} = Opts, WsOpts) -> case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> - case get_ws_opts(Zone, Listener, fail_if_no_subprotocol) of + case get_ws_opts(Type, Listener, fail_if_no_subprotocol) of true -> {ok, cowboy_req:reply(400, Req), WsOpts}; false -> {cowboy_websocket, Req, [Req, Opts], WsOpts} end; Subprotocols -> - SupportedSubprotocols = get_ws_opts(Zone, Listener, supported_subprotocols), + SupportedSubprotocols = get_ws_opts(Type, Listener, supported_subprotocols), NSupportedSubprotocols = [list_to_binary(Subprotocol) || Subprotocol <- SupportedSubprotocols], case pick_subprotocol(Subprotocols, NSupportedSubprotocols) of @@ -221,29 +221,29 @@ pick_subprotocol([Subprotocol | Rest], SupportedSubprotocols) -> pick_subprotocol(Rest, SupportedSubprotocols) end. -parse_header_fun_origin(Req, #{zone := Zone, listener := Listener}) -> +parse_header_fun_origin(Req, #{listener := {Type, Listener}}) -> case cowboy_req:header(<<"origin">>, Req) of undefined -> - case get_ws_opts(Zone, Listener, allow_origin_absence) of + case get_ws_opts(Type, Listener, allow_origin_absence) of true -> ok; false -> {error, origin_header_cannot_be_absent} end; Value -> - case lists:member(Value, get_ws_opts(Zone, Listener, check_origins)) of + case lists:member(Value, get_ws_opts(Type, Listener, check_origins)) of true -> ok; false -> {origin_not_allowed, Value} end end. -check_origin_header(Req, #{zone := Zone, listener := Listener} = Opts) -> - case get_ws_opts(Zone, Listener, check_origin_enable) of +check_origin_header(Req, #{listener := {Type, Listener}} = Opts) -> + case get_ws_opts(Type, Listener, check_origin_enable) of true -> parse_header_fun_origin(Req, Opts); false -> ok end. -websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> +websocket_init([Req, #{zone := Zone, listener := {Type, Listener}} = Opts]) -> {Peername, Peercert} = - case emqx_config:get_listener_conf(Zone, Listener, [proxy_protocol]) andalso + case emqx_config:get_listener_conf(Type, Listener, [proxy_protocol]) andalso maps:get(proxy_header, Req) of #{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} -> SourceName = {SrcAddr, SrcPort}, @@ -278,7 +278,7 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> conn_mod => ?MODULE }, Limiter = emqx_limiter:init(Zone, undefined, undefined, []), - MQTTPiggyback = get_ws_opts(Zone, Listener, mqtt_piggyback), + MQTTPiggyback = get_ws_opts(Type, Listener, mqtt_piggyback), FrameOpts = #{ strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]), max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]) @@ -317,7 +317,7 @@ websocket_init([Req, #{zone := Zone, listener := Listener} = Opts]) -> idle_timeout = IdleTimeout, idle_timer = IdleTimer, zone = Zone, - listener = Listener + listener = {Type, Listener} }, hibernate}. websocket_handle({binary, Data}, State) when is_list(Data) -> @@ -370,8 +370,8 @@ websocket_info({check_gc, Stats}, State) -> return(check_oom(run_gc(Stats, State))); websocket_info(Deliver = {deliver, _Topic, _Msg}, - State = #state{zone = Zone, listener = Listener}) -> - ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), + State = #state{listener = {Type, Listener}}) -> + ActiveN = get_active_n(Type, Listener), Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); @@ -558,12 +558,12 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> %% Handle incoming packet %%-------------------------------------------------------------------- -handle_incoming(Packet, State = #state{zone = Zone, listener = Listener}) +handle_incoming(Packet, State = #state{listener = {Type, Listener}}) when is_record(Packet, mqtt_packet) -> ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ok = inc_incoming_stats(Packet), NState = case emqx_pd:get_counter(incoming_pubs) > - emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of + get_active_n(Type, Listener) of true -> postpone({cast, rate_limit}, State); false -> State end, @@ -595,12 +595,12 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> %%-------------------------------------------------------------------- handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, - zone = Zone, listener = Listener}) -> + listener = {Type, Listener}}) -> IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), NState = case emqx_pd:get_counter(outgoing_pubs) > - emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of + get_active_n(Type, Listener) of true -> Stats = #{cnt => emqx_pd:reset_counter(outgoing_pubs), oct => emqx_pd:reset_counter(outgoing_bytes) @@ -749,10 +749,10 @@ classify([Event|More], Packets, Cmds, Events) -> trigger(Event) -> erlang:send(self(), Event). -get_peer(Req, #{zone := Zone, listener := Listener}) -> +get_peer(Req, #{listener := {Type, Listener}}) -> {PeerAddr, PeerPort} = cowboy_req:peer(Req), AddrHeader = cowboy_req:header( - get_ws_opts(Zone, Listener, proxy_address_header), Req, <<>>), + get_ws_opts(Type, Listener, proxy_address_header), Req, <<>>), ClientAddr = case string:tokens(binary_to_list(AddrHeader), ", ") of [] -> undefined; @@ -766,7 +766,7 @@ get_peer(Req, #{zone := Zone, listener := Listener}) -> PeerAddr end, PortHeader = cowboy_req:header( - get_ws_opts(Zone, Listener, proxy_port_header), Req, <<>>), + get_ws_opts(Type, Listener, proxy_port_header), Req, <<>>), ClientPort = case string:tokens(binary_to_list(PortHeader), ", ") of [] -> undefined; @@ -787,5 +787,8 @@ set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). -get_ws_opts(Zone, Listener, Key) -> - emqx_config:get_listener_conf(Zone, Listener, [websocket, Key]). +get_ws_opts(Type, Listener, Key) -> + emqx_config:get_listener_conf(Type, Listener, [websocket, Key]). + +get_active_n(Type, Listener) -> + emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]). \ No newline at end of file diff --git a/apps/emqx/test/emqx_access_control_SUITE.erl b/apps/emqx/test/emqx_access_control_SUITE.erl index 8cfa17523..00a1f9fbe 100644 --- a/apps/emqx/test/emqx_access_control_SUITE.erl +++ b/apps/emqx/test/emqx_access_control_SUITE.erl @@ -46,7 +46,7 @@ t_authorize(_) -> clientinfo() -> clientinfo(#{}). clientinfo(InitProps) -> maps:merge(#{zone => default, - listener => mqtt_tcp, + listener => {tcp, default}, protocol => mqtt, peerhost => {127,0,0,1}, clientid => <<"clientid">>, diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index dfbe56916..031f89612 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -27,149 +27,112 @@ all() -> emqx_ct:all(?MODULE). +force_gc_conf() -> + #{bytes => 16777216,count => 16000,enable => true}. + +force_shutdown_conf() -> + #{enable => true,max_heap_size => 4194304, max_message_queue_len => 1000}. + +rate_limit_conf() -> + #{conn_bytes_in => ["100KB","10s"], + conn_messages_in => ["100","10s"], + max_conn_rate => 1000, + quota => + #{conn_messages_routing => infinity, + overall_messages_routing => infinity}}. + +rpc_conf() -> + #{async_batch_size => 256,authentication_timeout => 5000, + call_receive_timeout => 15000,connect_timeout => 5000, + mode => async,port_discovery => stateless, + send_timeout => 5000,socket_buffer => 1048576, + socket_keepalive_count => 9,socket_keepalive_idle => 900, + socket_keepalive_interval => 75,socket_recbuf => 1048576, + socket_sndbuf => 1048576,tcp_client_num => 1, + tcp_server_port => 5369}. + mqtt_conf() -> - #{await_rel_timeout => 300000, - idle_timeout => 15000, - ignore_loop_deliver => false, - keepalive_backoff => 0.75, - max_awaiting_rel => 100, - max_clientid_len => 65535, - max_inflight => 32, - max_mqueue_len => 1000, - max_packet_size => 1048576, - max_qos_allowed => 2, - max_subscriptions => infinity, - max_topic_alias => 65535, - max_topic_levels => 65535, - mountpoint => <<>>, - mqueue_default_priority => lowest, - mqueue_priorities => #{}, - mqueue_store_qos0 => true, - peer_cert_as_clientid => disabled, - peer_cert_as_username => disabled, - response_information => [], - retain_available => true, - retry_interval => 30000, - server_keepalive => disabled, - session_expiry_interval => 7200000, - shared_subscription => true, - strict_mode => false, - upgrade_qos => false, - use_username_as_clientid => false, - wildcard_subscription => true}. + #{await_rel_timeout => 300000,idle_timeout => 15000, + ignore_loop_deliver => false,keepalive_backoff => 0.75, + max_awaiting_rel => 100,max_clientid_len => 65535, + max_inflight => 32,max_mqueue_len => 1000, + max_packet_size => 1048576,max_qos_allowed => 2, + max_subscriptions => infinity,max_topic_alias => 65535, + max_topic_levels => 65535,mqueue_default_priority => lowest, + mqueue_priorities => disabled,mqueue_store_qos0 => true, + peer_cert_as_clientid => disabled, + peer_cert_as_username => disabled, + response_information => [],retain_available => true, + retry_interval => 30000,server_keepalive => disabled, + session_expiry_interval => 7200000, + shared_subscription => true,strict_mode => false, + upgrade_qos => false,use_username_as_clientid => false, + wildcard_subscription => true}. + listener_mqtt_tcp_conf() -> #{acceptors => 16, - access_rules => ["allow all"], - bind => {{0,0,0,0},1883}, - max_connections => 1024000, - proxy_protocol => false, - proxy_protocol_timeout => 3000, - rate_limit => - #{conn_bytes_in => - ["100KB","10s"], - conn_messages_in => - ["100","10s"], - max_conn_rate => 1000, - quota => - #{conn_messages_routing => infinity, - overall_messages_routing => infinity}}, - tcp => - #{active_n => 100, - backlog => 1024, - buffer => 4096, - high_watermark => 1048576, - send_timeout => 15000, - send_timeout_close => - true}, - type => tcp}. + zone => default, + access_rules => ["allow all"], + bind => {{0,0,0,0},1883}, + max_connections => 1024000,mountpoint => <<>>, + proxy_protocol => false,proxy_protocol_timeout => 3000, + tcp => #{ + active_n => 100,backlog => 1024,buffer => 4096, + high_watermark => 1048576,nodelay => false, + reuseaddr => true,send_timeout => 15000, + send_timeout_close => true}}. listener_mqtt_ws_conf() -> #{acceptors => 16, - access_rules => ["allow all"], - bind => {{0,0,0,0},8083}, - max_connections => 1024000, - proxy_protocol => false, - proxy_protocol_timeout => 3000, - rate_limit => - #{conn_bytes_in => - ["100KB","10s"], - conn_messages_in => - ["100","10s"], - max_conn_rate => 1000, - quota => - #{conn_messages_routing => infinity, - overall_messages_routing => infinity}}, - tcp => - #{active_n => 100, - backlog => 1024, - buffer => 4096, - high_watermark => 1048576, - send_timeout => 15000, - send_timeout_close => - true}, - type => ws, - websocket => - #{allow_origin_absence => - true, - check_origin_enable => - false, - check_origins => [], - compress => false, - deflate_opts => - #{client_max_window_bits => - 15, - mem_level => 8, - server_max_window_bits => - 15}, - fail_if_no_subprotocol => - true, - idle_timeout => 86400000, - max_frame_size => infinity, - mqtt_path => "/mqtt", - mqtt_piggyback => multiple, - proxy_address_header => - "x-forwarded-for", - proxy_port_header => - "x-forwarded-port", - supported_subprotocols => - ["mqtt","mqtt-v3", - "mqtt-v3.1.1", - "mqtt-v5"]}}. + zone => default, + access_rules => ["allow all"], + bind => {{0,0,0,0},8083}, + max_connections => 1024000,mountpoint => <<>>, + proxy_protocol => false,proxy_protocol_timeout => 3000, + tcp => + #{active_n => 100,backlog => 1024,buffer => 4096, + high_watermark => 1048576,nodelay => false, + reuseaddr => true,send_timeout => 15000, + send_timeout_close => true}, + websocket => + #{allow_origin_absence => true,check_origin_enable => false, + check_origins => [],compress => false, + deflate_opts => + #{client_max_window_bits => 15,mem_level => 8, + server_max_window_bits => 15}, + fail_if_no_subprotocol => true,idle_timeout => 86400000, + max_frame_size => infinity,mqtt_path => "/mqtt", + mqtt_piggyback => multiple, + proxy_address_header => "x-forwarded-for", + proxy_port_header => "x-forwarded-port", + supported_subprotocols => + ["mqtt","mqtt-v3","mqtt-v3.1.1","mqtt-v5"]}}. -default_zone_conf() -> - #{zones => - #{default => - #{ authorization => #{ - cache => #{enable => true,max_size => 32, ttl => 60000}, - deny_action => ignore, - enable => false - }, - auth => #{enable => false}, - overall_max_connections => infinity, - stats => #{enable => true}, - conn_congestion => - #{enable_alarm => true, min_alarm_sustain_duration => 60000}, - flapping_detect => - #{ban_time => 300000,enable => false, - max_count => 15,window_time => 60000}, - force_gc => - #{bytes => 16777216,count => 16000, - enable => true}, - force_shutdown => - #{enable => true, - max_heap_size => 4194304, - max_message_queue_len => 1000}, - mqtt => mqtt_conf(), - listeners => - #{mqtt_tcp => listener_mqtt_tcp_conf(), - mqtt_ws => listener_mqtt_ws_conf()} - } - } +listeners_conf() -> + #{tcp => #{default => listener_mqtt_tcp_conf()}, + ws => #{default => listener_mqtt_ws_conf()} }. -set_default_zone_conf() -> - emqx_config:put(default_zone_conf()). +stats_conf() -> + #{enable => true}. + +zone_conf() -> + #{}. + +basic_conf() -> + #{rate_limit => rate_limit_conf(), + force_gc => force_gc_conf(), + force_shutdown => force_shutdown_conf(), + mqtt => mqtt_conf(), + rpc => rpc_conf(), + stats => stats_conf(), + listeners => listeners_conf(), + zones => zone_conf() + }. + +set_test_listenser_confs() -> + emqx_config:put(basic_conf()). %%-------------------------------------------------------------------- %% CT Callbacks @@ -211,7 +174,7 @@ end_per_suite(_Config) -> ]). init_per_testcase(_TestCase, Config) -> - set_default_zone_conf(), + set_test_listenser_confs(), Config. end_per_testcase(_TestCase, Config) -> @@ -917,7 +880,7 @@ t_ws_cookie_init(_) -> conn_mod => emqx_ws_connection, ws_cookie => WsCookie }, - Channel = emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}), + Channel = emqx_channel:init(ConnInfo, #{zone => default, listener => {tcp, default}}), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). %%-------------------------------------------------------------------- @@ -942,7 +905,7 @@ channel(InitFields) -> maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}), + emqx_channel:init(ConnInfo, #{zone => default, listener => {tcp, default}}), maps:merge(#{clientinfo => clientinfo(), session => session(), conn_state => connected @@ -951,7 +914,7 @@ channel(InitFields) -> clientinfo() -> clientinfo(#{}). clientinfo(InitProps) -> maps:merge(#{zone => default, - listener => mqtt_tcp, + listener => {tcp, default}, protocol => mqtt, peerhost => {127,0,0,1}, clientid => <<"clientid">>, diff --git a/apps/emqx/test/emqx_client_SUITE.erl b/apps/emqx/test/emqx_client_SUITE.erl index c6a450471..117a0f5b9 100644 --- a/apps/emqx/test/emqx_client_SUITE.erl +++ b/apps/emqx/test/emqx_client_SUITE.erl @@ -79,8 +79,8 @@ groups() -> init_per_suite(Config) -> emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), - emqx_config:put_listener_conf(default, mqtt_ssl, [ssl, verify], verify_peer), - emqx_listeners:restart_listener('default:mqtt_ssl'), + emqx_config:put_listener_conf(ssl, default, [ssl, verify], verify_peer), + emqx_listeners:restart_listener('ssl:default'), Config. end_per_suite(_Config) -> diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index 75d0a899c..d492edd0e 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -89,7 +89,7 @@ t_open_session(_) -> ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), - ClientInfo = #{zone => default, listener => mqtt_tcp, + ClientInfo = #{zone => default, listener => {tcp, default}, clientid => <<"clientid">>, username => <<"username">>, peerhost => {127,0,0,1}}, @@ -114,7 +114,7 @@ rand_client_id() -> t_open_session_race_condition(_) -> ClientId = rand_client_id(), - ClientInfo = #{zone => default, listener => mqtt_tcp, + ClientInfo = #{zone => default, listener => {tcp, default}, clientid => ClientId, username => <<"username">>, peerhost => {127,0,0,1}}, diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 0d5114325..5784ad6aa 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -57,7 +57,7 @@ init_per_suite(Config) -> ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end), - emqx_channel_SUITE:set_default_zone_conf(), + emqx_channel_SUITE:set_test_listenser_confs(), Config. end_per_suite(_Config) -> @@ -219,9 +219,9 @@ t_handle_msg_deliver(_) -> t_handle_msg_inet_reply(_) -> ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - emqx_config:put_listener_conf(default, mqtt_tcp, [tcp, active_n], 0), + emqx_config:put_listener_conf(tcp, default, [tcp, active_n], 0), ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())), - emqx_config:put_listener_conf(default, mqtt_tcp, [tcp, active_n], 100), + emqx_config:put_listener_conf(tcp, default, [tcp, active_n], 100), ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())), ?assertMatch({stop, {shutdown, for_testing}, _St}, handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). @@ -456,7 +456,7 @@ with_conn(TestFun, Opts) when is_map(Opts) -> TrapExit = maps:get(trap_exit, Opts, false), process_flag(trap_exit, TrapExit), {ok, CPid} = emqx_connection:start_link(emqx_transport, sock, - maps:merge(Opts, #{zone => default, listener => mqtt_tcp})), + maps:merge(Opts, #{zone => default, listener => {tcp, default}})), TestFun(CPid), TrapExit orelse emqx_connection:stop(CPid), ok. @@ -479,7 +479,7 @@ st(InitFields) when is_map(InitFields) -> st(InitFields, #{}). st(InitFields, ChannelFields) when is_map(InitFields) -> St = emqx_connection:init_state(emqx_transport, sock, #{zone => default, - listener => mqtt_tcp}), + listener => {tcp, default}}), maps:fold(fun(N, V, S) -> emqx_connection:set_field(N, V, S) end, emqx_connection:set_field(channel, channel(ChannelFields), St), InitFields @@ -500,7 +500,7 @@ channel(InitFields) -> expiry_interval => 0 }, ClientInfo = #{zone => default, - listener => mqtt_tcp, + listener => {tcp, default}, protocol => mqtt, peerhost => {127,0,0,1}, clientid => <<"clientid">>, @@ -513,7 +513,7 @@ channel(InitFields) -> maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}), + emqx_channel:init(ConnInfo, #{zone => default, listener => {tcp, default}}), maps:merge(#{clientinfo => ClientInfo, session => Session, conn_state => connected diff --git a/apps/emqx/test/emqx_flapping_SUITE.erl b/apps/emqx/test/emqx_flapping_SUITE.erl index eca276b84..5ac6b9cdf 100644 --- a/apps/emqx/test/emqx_flapping_SUITE.erl +++ b/apps/emqx/test/emqx_flapping_SUITE.erl @@ -40,7 +40,7 @@ end_per_suite(_Config) -> t_detect_check(_) -> ClientInfo = #{zone => default, - listener => mqtt_tcp, + listener => {tcp, default}, clientid => <<"client007">>, peerhost => {127,0,0,1} }, @@ -64,7 +64,7 @@ t_detect_check(_) -> t_expired_detecting(_) -> ClientInfo = #{zone => default, - listener => mqtt_tcp, + listener => {tcp, default}, clientid => <<"client008">>, peerhost => {127,0,0,1}}, false = emqx_flapping:detect(ClientInfo), diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 1aa5b0196..f52dacc14 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -29,7 +29,7 @@ all() -> emqx_ct:all(?MODULE). %%-------------------------------------------------------------------- init_per_suite(Config) -> - emqx_channel_SUITE:set_default_zone_conf(), + emqx_channel_SUITE:set_test_listenser_confs(), ok = meck:new([emqx_hooks, emqx_metrics, emqx_broker], [passthrough, no_history, no_link]), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index b25f051eb..767a7994e 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -48,7 +48,7 @@ init_per_testcase(TestCase, Config) when TestCase =/= t_ws_pingreq_before_connected, TestCase =/= t_ws_non_check_origin -> - emqx_channel_SUITE:set_default_zone_conf(), + emqx_channel_SUITE:set_test_listenser_confs(), %% Mock cowboy_req ok = meck:new(cowboy_req, [passthrough, no_history, no_link]), ok = meck:expect(cowboy_req, header, fun(_, _, _) -> <<>> end), @@ -119,7 +119,7 @@ t_info(_) -> } = SockInfo. set_ws_opts(Key, Val) -> - emqx_config:put_listener_conf(default, mqtt_ws, [websocket, Key], Val). + emqx_config:put_listener_conf(ws, default, [websocket, Key], Val). t_header(_) -> ok = meck:expect(cowboy_req, header, @@ -127,7 +127,7 @@ t_header(_) -> (<<"x-forwarded-port">>, _, _) -> <<"1000">> end), set_ws_opts(proxy_address_header, <<"x-forwarded-for">>), set_ws_opts(proxy_port_header, <<"x-forwarded-port">>), - {ok, St, _} = ?ws_conn:websocket_init([req, #{zone => default, listener => mqtt_ws}]), + {ok, St, _} = ?ws_conn:websocket_init([req, #{zone => default, listener => {ws, default}}]), WsPid = spawn(fun() -> receive {call, From, info} -> gen_server:reply(From, ?ws_conn:info(St)) @@ -222,8 +222,8 @@ t_ws_sub_protocols_mqtt_equivalents(_) -> start_ws_client(#{protocols => [<<"not-mqtt">>]})). t_ws_check_origin(_) -> - emqx_config:put_listener_conf(default, mqtt_ws, [websocket, check_origin_enable], true), - emqx_config:put_listener_conf(default, mqtt_ws, [websocket, check_origins], + emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], true), + emqx_config:put_listener_conf(ws, default, [websocket, check_origins], [<<"http://localhost:18083">>]), {ok, _} = application:ensure_all_started(gun), ?assertMatch({gun_upgrade, _}, @@ -234,8 +234,8 @@ t_ws_check_origin(_) -> headers => [{<<"origin">>, <<"http://localhost:18080">>}]})). t_ws_non_check_origin(_) -> - emqx_config:put_listener_conf(default, mqtt_ws, [websocket, check_origin_enable], false), - emqx_config:put_listener_conf(default, mqtt_ws, [websocket, check_origins], []), + emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], false), + emqx_config:put_listener_conf(ws, default, [websocket, check_origins], []), {ok, _} = application:ensure_all_started(gun), ?assertMatch({gun_upgrade, _}, start_ws_client(#{protocols => [<<"mqtt">>], @@ -245,7 +245,7 @@ t_ws_non_check_origin(_) -> headers => [{<<"origin">>, <<"http://localhost:18080">>}]})). t_init(_) -> - Opts = #{listener => mqtt_ws, zone => default}, + Opts = #{listener => {ws, default}, zone => default}, ok = meck:expect(cowboy_req, parse_header, fun(_, req) -> undefined end), ok = meck:expect(cowboy_req, reply, fun(_, Req) -> Req end), {ok, req, _} = ?ws_conn:init(req, Opts), @@ -438,7 +438,7 @@ t_shutdown(_) -> st() -> st(#{}). st(InitFields) when is_map(InitFields) -> - {ok, St, _} = ?ws_conn:websocket_init([req, #{zone => default, listener => mqtt_ws}]), + {ok, St, _} = ?ws_conn:websocket_init([req, #{zone => default, listener => {ws, default}}]), maps:fold(fun(N, V, S) -> ?ws_conn:set_field(N, V, S) end, ?ws_conn:set_field(channel, channel(), St), InitFields @@ -459,7 +459,7 @@ channel(InitFields) -> expiry_interval => 0 }, ClientInfo = #{zone => default, - listener => mqtt_ws, + listener => {ws, default}, protocol => mqtt, peerhost => {127,0,0,1}, clientid => <<"clientid">>, @@ -472,7 +472,7 @@ channel(InitFields) -> maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_ws}), + emqx_channel:init(ConnInfo, #{zone => default, listener => {ws, default}}), maps:merge(#{clientinfo => ClientInfo, session => Session, conn_state => connected diff --git a/apps/emqx_authn/test/emqx_authn_SUITE.erl b/apps/emqx_authn/test/emqx_authn_SUITE.erl index 0be04d6cf..eb7f0291a 100644 --- a/apps/emqx_authn/test/emqx_authn_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_SUITE.erl @@ -105,7 +105,7 @@ t_authenticator(_) -> t_authenticate(_) -> ClientInfo = #{zone => default, - listener => mqtt_tcp, + listener => {tcp, default}, username => <<"myuser">>, password => <<"mypass">>}, ?assertEqual({ok, #{superuser => false}}, emqx_access_control:authenticate(ClientInfo)), diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index bbd4232dd..a455d0ab8 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -67,7 +67,7 @@ t_authz(_) -> protocol => mqtt, mountpoint => <<"fake">>, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, meck:expect(emqx_resource, query, fun(_, _) -> {ok, 204, fake_headers} end), diff --git a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl index dac106b37..1ad7b4f7a 100644 --- a/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mongo_SUITE.erl @@ -80,19 +80,19 @@ t_authz(_) -> username => <<"test">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo2 = #{clientid => <<"test_clientid">>, username => <<"test_username">>, peerhost => {192,168,0,10}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo3 = #{clientid => <<"test_clientid">>, username => <<"fake_username">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, meck:expect(emqx_resource, query, fun(_, _) -> [] end), diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index 0fba033a6..3c0320a42 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -77,19 +77,19 @@ t_authz(_) -> username => <<"test">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo2 = #{clientid => <<"test_clientid">>, username => <<"test_username">>, peerhost => {192,168,0,10}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo3 = #{clientid => <<"test_clientid">>, username => <<"fake_username">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, []} end), diff --git a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl index d21caa223..43ea271a6 100644 --- a/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_pgsql_SUITE.erl @@ -77,19 +77,19 @@ t_authz(_) -> username => <<"test">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo2 = #{clientid => <<"test_clientid">>, username => <<"test_username">>, peerhost => {192,168,0,10}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo3 = #{clientid => <<"test_clientid">>, username => <<"fake_username">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, []} end), diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 0da931cc7..46ed9579e 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -70,7 +70,7 @@ t_authz(_) -> username => <<"username">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, meck:expect(emqx_resource, query, fun(_, _) -> {ok, []} end), diff --git a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl index e6e450a63..ff215354a 100644 --- a/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_rule_SUITE.erl @@ -70,25 +70,25 @@ t_match(_) -> username => <<"test">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo2 = #{clientid => <<"test">>, username => <<"test">>, peerhost => {192,168,1,10}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo3 = #{clientid => <<"test">>, username => <<"fake">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ClientInfo4 = #{clientid => <<"fake">>, username => <<"test">>, peerhost => {127,0,0,1}, zone => default, - listener => mqtt_tcp + listener => {tcp, default} }, ?assertEqual({matched, deny}, diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index b5de6cb9a..8022c3797 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -69,7 +69,7 @@ authenticate(_Ctx = #{auth := undefined}, ClientInfo) -> authenticate(_Ctx = #{auth := ChainId}, ClientInfo0) -> ClientInfo = ClientInfo0#{ zone => default, - listener => mqtt_tcp, + listener => {tcp, default}, chain_id => ChainId }, case emqx_access_control:authenticate(ClientInfo) of diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl index f96fe714c..1c8b581a4 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl @@ -455,7 +455,7 @@ clientinfo(#lwm2m_state{peername = {PeerHost, _}, endpoint_name = EndpointName, mountpoint = Mountpoint}) -> #{zone => default, - listener => mqtt_tcp, %% FIXME: this won't work + listener => {tcp, default}, %% FIXME: this won't work protocol => lwm2m, peerhost => PeerHost, sockport => 5683, %% FIXME: diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index b1a74375d..250f43988 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -113,7 +113,7 @@ init(ConnInfo = #{peername := {PeerHost, _}, ClientInfo = setting_peercert_infos( Peercert, #{ zone => default - , listener => mqtt_tcp + , listener => {tcp, default} , protocol => stomp , peerhost => PeerHost , sockport => SockPort diff --git a/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl index 07697243e..51ac403b2 100644 --- a/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl @@ -61,7 +61,7 @@ t_get_node_listeners(_) -> get_api(Path). t_manage_listener(_) -> - ID = "default:mqtt_tcp", + ID = "tcp:default", manage_listener(ID, "stop", false), manage_listener(ID, "start", true), manage_listener(ID, "restart", true).