diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 9b43b3bf3..971b4d5d4 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -207,7 +207,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, Peercert = maps:get(peercert, ConnInfo, undefined), Protocol = maps:get(protocol, ConnInfo, mqtt), MountPoint = case get_mqtt_conf(Zone, Listener, mountpoint) of - "" -> undefined; + <<>> -> undefined; MP -> MP end, QuotaPolicy = emqx_config:get_listener_conf(Zone, Listener, [rate_limit, quota]), diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 09ac7a683..2b2b628ad 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -24,7 +24,149 @@ -include_lib("eunit/include/eunit.hrl"). -all() -> emqx_ct:all(?MODULE). +all() -> + emqx_ct:all(?MODULE). + +mqtt_conf() -> + #{await_rel_timeout => 300, + idle_timeout => 15000, + ignore_loop_deliver => false, + keepalive_backoff => 0.75, + max_awaiting_rel => 100, + max_clientid_len => 65535, + max_inflight => 32, + max_mqueue_len => 1000, + max_packet_size => 1048576, + max_qos_allowed => 2, + max_subscriptions => infinity, + max_topic_alias => 65535, + max_topic_levels => 65535, + mountpoint => <<>>, + mqueue_default_priority => highest, + mqueue_priorities => [], + mqueue_store_qos0 => true, + peer_cert_as_clientid => disabled, + peer_cert_as_username => disabled, + response_information => [], + retain_available => true, + retry_interval => 30, + server_keepalive => disabled, + session_expiry_interval => 7200, + shared_subscription => true, + strict_mode => false, + upgrade_qos => false, + use_username_as_clientid => false, + wildcard_subscription => true}. + +listener_mqtt_tcp_conf() -> + #{acceptors => 16, + access_rules => ["allow all"], + bind => {{0,0,0,0},1883}, + max_connections => 1024000, + proxy_protocol => false, + proxy_protocol_timeout => 3000, + rate_limit => + #{conn_bytes_in => + ["100KB","10s"], + conn_messages_in => + ["100","10s"], + max_conn_rate => 1000, + quota => + #{conn_messages_routing => infinity, + overall_messages_routing => infinity}}, + tcp => + #{active_n => 100, + backlog => 1024, + buffer => 4096, + high_watermark => 1048576, + send_timeout => 15000, + send_timeout_close => + true}, + type => tcp}. + +listener_mqtt_ws_conf() -> + #{acceptors => 16, + access_rules => ["allow all"], + bind => {{0,0,0,0},8083}, + max_connections => 1024000, + proxy_protocol => false, + proxy_protocol_timeout => 3000, + rate_limit => + #{conn_bytes_in => + ["100KB","10s"], + conn_messages_in => + ["100","10s"], + max_conn_rate => 1000, + quota => + #{conn_messages_routing => infinity, + overall_messages_routing => infinity}}, + tcp => + #{active_n => 100, + backlog => 1024, + buffer => 4096, + high_watermark => 1048576, + send_timeout => 15000, + send_timeout_close => + true}, + type => ws, + websocket => + #{allow_origin_absence => + true, + check_origin_enable => + false, + check_origins => [], + compress => false, + deflate_opts => + #{client_max_window_bits => + 15, + mem_level => 8, + server_max_window_bits => + 15}, + fail_if_no_subprotocol => + true, + idle_timeout => 86400000, + max_frame_size => infinity, + mqtt_path => "/mqtt", + mqtt_piggyback => multiple, + proxy_address_header => + "x-forwarded-for", + proxy_port_header => + "x-forwarded-port", + supported_subprotocols => + ["mqtt","mqtt-v3", + "mqtt-v3.1.1", + "mqtt-v5"]}}. + +default_zone_conf() -> + #{zones => + #{default => + #{ acl => #{ + cache => #{enable => true,max_size => 32, ttl => 60000}, + deny_action => ignore, + enable => false + }, + auth => #{enable => false}, + overall_max_connections => infinity, + stats => #{enable => true}, + conn_congestion => + #{enable_alarm => true, min_alarm_sustain_duration => 60000}, + flapping_detect => + #{ban_time => 300000,enable => true, + max_count => 15,window_time => 60000}, + force_gc => + #{bytes => 16777216,count => 16000, + enable => true}, + force_shutdown => + #{enable => true, + max_heap_size => 4194304, + max_message_queue_len => 1000}, + mqtt => mqtt_conf(), + listeners => + #{mqtt_tcp => listener_mqtt_tcp_conf(), + mqtt_ws => listener_mqtt_ws_conf()} + } + } + }. %%-------------------------------------------------------------------- %% CT Callbacks @@ -50,6 +192,9 @@ init_per_suite(Config) -> ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), + %% Ban + meck:new(emqx_banned, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_banned, check, fun(_ConnInfo) -> false end), Config. end_per_suite(_Config) -> @@ -62,11 +207,10 @@ end_per_suite(_Config) -> ]). init_per_testcase(_TestCase, Config) -> - meck:new(emqx_zone, [passthrough, no_history, no_link]), + emqx_config:put(default_zone_conf()), Config. end_per_testcase(_TestCase, Config) -> - meck:unload([emqx_zone]), Config. %%-------------------------------------------------------------------- @@ -241,7 +385,7 @@ t_bad_receive_maximum(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), C1 = channel(#{conn_state => idle}), {shutdown, protocol_error, _, _} = emqx_channel:handle_in( @@ -254,8 +398,8 @@ t_override_client_receive_maximum(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), - ok = meck:expect(emqx_zone, max_inflight, fun(_) -> 0 end), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_inflight], 0), C1 = channel(#{conn_state => idle}), ClientCapacity = 2, {ok, [{event, connected}, _ConnAck], C2} = @@ -506,7 +650,7 @@ t_handle_out_connack_response_information(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], @@ -520,7 +664,7 @@ t_handle_out_connack_not_response_information(_) -> fun(true, _ClientInfo, _ConnInfo) -> {ok, #{session => session(), present => false}} end), - ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, response_information], test), IdleChannel = channel(#{conn_state => idle}), {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = emqx_channel:handle_in( @@ -660,9 +804,6 @@ t_enrich_conninfo(_) -> t_enrich_client(_) -> {ok, _ConnPkt, _Chan} = emqx_channel:enrich_client(connpkt(), channel()). -t_check_banned(_) -> - ok = emqx_channel:check_banned(connpkt(), channel()). - t_auth_connect(_) -> {ok, _Chan} = emqx_channel:auth_connect(connpkt(), channel()). @@ -709,7 +850,7 @@ t_packing_alias(_) -> channel())). t_check_pub_acl(_) -> - ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end), + emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), ok = emqx_channel:check_pub_acl(Publish, channel()). @@ -719,7 +860,7 @@ t_check_pub_alias(_) -> ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel). t_check_sub_acls(_) -> - ok = meck:expect(emqx_zone, enable_acl, fun(_) -> true end), + emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, [{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()). @@ -763,7 +904,7 @@ t_ws_cookie_init(_) -> conn_mod => emqx_ws_connection, ws_cookie => WsCookie }, - Channel = emqx_channel:init(ConnInfo, [{zone, zone}]), + Channel = emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). %%-------------------------------------------------------------------- @@ -788,7 +929,7 @@ channel(InitFields) -> maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, [{zone, zone}]), + emqx_channel:init(ConnInfo, #{zone => default, listener => mqtt_tcp}), maps:merge(#{clientinfo => clientinfo(), session => session(), conn_state => connected @@ -796,7 +937,8 @@ channel(InitFields) -> clientinfo() -> clientinfo(#{}). clientinfo(InitProps) -> - maps:merge(#{zone => zone, + maps:merge(#{zone => default, + listener => mqtt_tcp, protocol => mqtt, peerhost => {127,0,0,1}, clientid => <<"clientid">>,