From 868b31d123186cc718ad935fc39d2eea1da526af Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 13 Jul 2021 16:37:18 +0800 Subject: [PATCH] fix(test): update the testcases for emqx_mqtt_protocol_v5_SUITE --- apps/emqx/src/emqx_connection.erl | 13 ++-- apps/emqx/src/emqx_quic_connection.erl | 2 - .../emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 68 ++++++------------- 3 files changed, 29 insertions(+), 54 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 59c735afc..9e688fc0f 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -466,14 +466,13 @@ handle_msg({Passive, _Sock}, State) handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{zone = Zone, listener = Listener} = State) -> - ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), + ActiveN = get_active_n(Zone, Listener), Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent handle_msg({inet_reply, _Sock, ok}, State = #state{zone = Zone, listener = Listener}) -> - case emqx_pd:get_counter(outgoing_pubs) > - emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of + case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Zone, Listener) of true -> Pubs = emqx_pd:reset_counter(outgoing_pubs), Bytes = emqx_pd:reset_counter(outgoing_bytes), @@ -823,7 +822,7 @@ activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; activate_socket(State = #state{transport = Transport, socket = Socket, zone = Zone, listener = Listener}) -> - ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]), + ActiveN = get_active_n(Zone, Listener), case Transport:setopts(Socket, [{active, ActiveN}]) of ok -> {ok, State#state{sockstate = running}}; Error -> Error @@ -905,3 +904,9 @@ get_state(Pid) -> State = sys:get_state(Pid), maps:from_list(lists:zip(record_info(fields, state), tl(tuple_to_list(State)))). + +get_active_n(Zone, Listener) -> + case emqx_config:get([zones, Zone, listeners, Listener, type]) of + quic -> 100; + _ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) + end. diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index b83522c6e..cd41e74a7 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -21,6 +21,4 @@ ]). new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) -> - new_conn(Conn, maps:to_list(COpts)); -new_conn(Conn, COpts) -> emqx_connection:start_link(emqx_quic_stream, Conn, COpts). diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 2f3048277..607bee44b 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -217,10 +217,14 @@ t_connect_will_message(Config) -> ok = emqtt:disconnect(Client4). t_batch_subscribe(init, Config) -> + emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true), + emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], true), ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), Config; t_batch_subscribe('end', _Config) -> + emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], false), + emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], false), meck:unload(emqx_access_control). t_batch_subscribe(Config) -> @@ -284,52 +288,22 @@ t_connect_will_retain(Config) -> t_connect_idle_timeout(_Config) -> IdleTimeout = 2000, - emqx_zone:set_env(external, idle_timeout, IdleTimeout), - + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], IdleTimeout), + emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], IdleTimeout), {ok, Sock} = emqtt_sock:connect({127,0,0,1}, 1883, [], 60000), timer:sleep(IdleTimeout), ?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)). -t_connect_limit_timeout(init, Config) -> - ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]), - meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1; - (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) - end), - Config; -t_connect_limit_timeout('end', _Config) -> - catch meck:unload(proplists). - -t_connect_limit_timeout(Config) -> - ConnFun = ?config(conn_fun, Config), - Topic = nth(1, ?TOPICS), - emqx_zone:set_env(external, publish_limit, {3, 5}), - - {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60} | Config]), - {ok, _} = emqtt:ConnFun(Client), - [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), - - ?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))), - Payload = <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, - {ok, 2} = emqtt:publish(Client, Topic, Payload, 1), - {ok, 3} = emqtt:publish(Client, Topic, Payload, 1), - {ok, 4} = emqtt:publish(Client, Topic, Payload, 1), - timer:sleep(250), - ?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))), - - ok = emqtt:disconnect(Client), - emqx_zone:set_env(external, publish_limit, undefined), - meck:unload(proplists). - t_connect_emit_stats_timeout(init, Config) -> NewIdleTimeout = 1000, - OldIdleTimeout = emqx_zone:get_env(external, idle_timeout), - emqx_zone:set_env(external, idle_timeout, NewIdleTimeout), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], NewIdleTimeout), + emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], NewIdleTimeout), ok = snabbkaffe:start_trace(), - [{idle_timeout, NewIdleTimeout}, {old_idle_timeout, OldIdleTimeout} | Config]; -t_connect_emit_stats_timeout('end', Config) -> + [{idle_timeout, NewIdleTimeout} | Config]; +t_connect_emit_stats_timeout('end', _Config) -> snabbkaffe:stop(), - {_, OldIdleTimeout} = lists:keyfind(old_idle_timeout, 1, Config), - emqx_zone:set_env(external, idle_timeout, OldIdleTimeout), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 15000), + emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], 15000), ok. t_connect_emit_stats_timeout(Config) -> @@ -497,7 +471,8 @@ t_connack_session_present(Config) -> t_connack_max_qos_allowed(init, Config) -> Config; t_connack_max_qos_allowed('end', _Config) -> - emqx_zone:set_env(external, max_qos_allowed, 2), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2), + emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2), ok. t_connack_max_qos_allowed(Config) -> ConnFun = ?config(conn_fun, Config), @@ -505,9 +480,8 @@ t_connack_max_qos_allowed(Config) -> Topic = nth(1, ?TOPICS), %% max_qos_allowed = 0 - emqx_zone:set_env(external, max_qos_allowed, 0), - persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), - persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 0), + emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 0), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Connack1} = emqtt:ConnFun(Client1), @@ -532,9 +506,8 @@ t_connack_max_qos_allowed(Config) -> waiting_client_process_exit(Client2), %% max_qos_allowed = 1 - emqx_zone:set_env(external, max_qos_allowed, 1), - persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), - persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 1), + emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 1), {ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Connack3} = emqtt:ConnFun(Client3), @@ -559,9 +532,8 @@ t_connack_max_qos_allowed(Config) -> waiting_client_process_exit(Client4), %% max_qos_allowed = 2 - emqx_zone:set_env(external, max_qos_allowed, 2), - persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), - persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2), + emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2), {ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, Connack5} = emqtt:ConnFun(Client5),