fix(test): update the testcases for emqx_mqtt_protocol_v5_SUITE

This commit is contained in:
Shawn 2021-07-13 16:37:18 +08:00
parent 6d9918d3e5
commit 868b31d123
3 changed files with 29 additions and 54 deletions

View File

@ -466,14 +466,13 @@ handle_msg({Passive, _Sock}, State)
handle_msg(Deliver = {deliver, _Topic, _Msg}, #state{zone = Zone,
listener = Listener} = State) ->
ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]),
ActiveN = get_active_n(Zone, Listener),
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{zone = Zone, listener = Listener}) ->
case emqx_pd:get_counter(outgoing_pubs) >
emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]) of
case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Zone, Listener) of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pubs),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
@ -823,7 +822,7 @@ activate_socket(State = #state{sockstate = blocked}) ->
{ok, State};
activate_socket(State = #state{transport = Transport, socket = Socket,
zone = Zone, listener = Listener}) ->
ActiveN = emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n]),
ActiveN = get_active_n(Zone, Listener),
case Transport:setopts(Socket, [{active, ActiveN}]) of
ok -> {ok, State#state{sockstate = running}};
Error -> Error
@ -905,3 +904,9 @@ get_state(Pid) ->
State = sys:get_state(Pid),
maps:from_list(lists:zip(record_info(fields, state),
tl(tuple_to_list(State)))).
get_active_n(Zone, Listener) ->
case emqx_config:get([zones, Zone, listeners, Listener, type]) of
quic -> 100;
_ -> emqx_config:get_listener_conf(Zone, Listener, [tcp, active_n])
end.

View File

@ -21,6 +21,4 @@
]).
new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) ->
new_conn(Conn, maps:to_list(COpts));
new_conn(Conn, COpts) ->
emqx_connection:start_link(emqx_quic_stream, Conn, COpts).

View File

@ -217,10 +217,14 @@ t_connect_will_message(Config) ->
ok = emqtt:disconnect(Client4).
t_batch_subscribe(init, Config) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], true),
emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], true),
ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end),
Config;
t_batch_subscribe('end', _Config) ->
emqx_config:put_listener_conf(default, mqtt_tcp, [acl, enable], false),
emqx_config:put_listener_conf(default, mqtt_quic, [acl, enable], false),
meck:unload(emqx_access_control).
t_batch_subscribe(Config) ->
@ -284,52 +288,22 @@ t_connect_will_retain(Config) ->
t_connect_idle_timeout(_Config) ->
IdleTimeout = 2000,
emqx_zone:set_env(external, idle_timeout, IdleTimeout),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], IdleTimeout),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], IdleTimeout),
{ok, Sock} = emqtt_sock:connect({127,0,0,1}, 1883, [], 60000),
timer:sleep(IdleTimeout),
?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)).
t_connect_limit_timeout(init, Config) ->
ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]),
meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1;
(Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3])
end),
Config;
t_connect_limit_timeout('end', _Config) ->
catch meck:unload(proplists).
t_connect_limit_timeout(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS),
emqx_zone:set_env(external, publish_limit, {3, 5}),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60} | Config]),
{ok, _} = emqtt:ConnFun(Client),
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))),
Payload = <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>,
{ok, 2} = emqtt:publish(Client, Topic, Payload, 1),
{ok, 3} = emqtt:publish(Client, Topic, Payload, 1),
{ok, 4} = emqtt:publish(Client, Topic, Payload, 1),
timer:sleep(250),
?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))),
ok = emqtt:disconnect(Client),
emqx_zone:set_env(external, publish_limit, undefined),
meck:unload(proplists).
t_connect_emit_stats_timeout(init, Config) ->
NewIdleTimeout = 1000,
OldIdleTimeout = emqx_zone:get_env(external, idle_timeout),
emqx_zone:set_env(external, idle_timeout, NewIdleTimeout),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], NewIdleTimeout),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], NewIdleTimeout),
ok = snabbkaffe:start_trace(),
[{idle_timeout, NewIdleTimeout}, {old_idle_timeout, OldIdleTimeout} | Config];
t_connect_emit_stats_timeout('end', Config) ->
[{idle_timeout, NewIdleTimeout} | Config];
t_connect_emit_stats_timeout('end', _Config) ->
snabbkaffe:stop(),
{_, OldIdleTimeout} = lists:keyfind(old_idle_timeout, 1, Config),
emqx_zone:set_env(external, idle_timeout, OldIdleTimeout),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, idle_timeout], 15000),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, idle_timeout], 15000),
ok.
t_connect_emit_stats_timeout(Config) ->
@ -497,7 +471,8 @@ t_connack_session_present(Config) ->
t_connack_max_qos_allowed(init, Config) ->
Config;
t_connack_max_qos_allowed('end', _Config) ->
emqx_zone:set_env(external, max_qos_allowed, 2),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2),
ok.
t_connack_max_qos_allowed(Config) ->
ConnFun = ?config(conn_fun, Config),
@ -505,9 +480,8 @@ t_connack_max_qos_allowed(Config) ->
Topic = nth(1, ?TOPICS),
%% max_qos_allowed = 0
emqx_zone:set_env(external, max_qos_allowed, 0),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 0),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 0),
{ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack1} = emqtt:ConnFun(Client1),
@ -532,9 +506,8 @@ t_connack_max_qos_allowed(Config) ->
waiting_client_process_exit(Client2),
%% max_qos_allowed = 1
emqx_zone:set_env(external, max_qos_allowed, 1),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 1),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 1),
{ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack3} = emqtt:ConnFun(Client3),
@ -559,9 +532,8 @@ t_connack_max_qos_allowed(Config) ->
waiting_client_process_exit(Client4),
%% max_qos_allowed = 2
emqx_zone:set_env(external, max_qos_allowed, 2),
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
emqx_config:put_listener_conf(default, mqtt_tcp, [mqtt, max_qos_allowed], 2),
emqx_config:put_listener_conf(default, mqtt_quic, [mqtt, max_qos_allowed], 2),
{ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack5} = emqtt:ConnFun(Client5),