feat(quic): Add tests and support more listener option

- support option `active_n`
- add quic group in emqx_mqtt_protocol_v5_SUITE
- fix rate limit
This commit is contained in:
William Yang 2021-06-29 16:20:58 +02:00 committed by turtleDeng
parent 41870a00b3
commit 82d0f2b016
5 changed files with 276 additions and 211 deletions

View File

@ -447,7 +447,7 @@ handle_msg({Closed, _Sock}, State)
handle_info({sock_closed, Closed}, close_socket(State)); handle_info({sock_closed, Closed}, close_socket(State));
handle_msg({Passive, _Sock}, State) handle_msg({Passive, _Sock}, State)
when Passive == tcp_passive; Passive == ssl_passive -> when Passive == tcp_passive; Passive == ssl_passive; Passive =:= quic_passive ->
%% In Stats %% In Stats
Pubs = emqx_pd:reset_counter(incoming_pubs), Pubs = emqx_pd:reset_counter(incoming_pubs),
Bytes = emqx_pd:reset_counter(incoming_bytes), Bytes = emqx_pd:reset_counter(incoming_bytes),
@ -738,9 +738,15 @@ handle_info({sock_error, Reason}, State) ->
end, end,
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_info({quic, peer_send_shutdown, _Stream}, State) ->
handle_info({sock_closed, force}, close_socket(State));
handle_info({quic, closed, _Channel, ReasonFlag}, State) -> handle_info({quic, closed, _Channel, ReasonFlag}, State) ->
handle_info({sock_closed, ReasonFlag}, State); handle_info({sock_closed, ReasonFlag}, State);
handle_info({quic, closed, _Stream}, State) ->
handle_info({sock_closed, force}, State);
handle_info(Info, State) -> handle_info(Info, State) ->
with_channel(handle_info, [Info], State). with_channel(handle_info, [Info], State).

View File

@ -157,6 +157,7 @@ start_listener(quic, ListenOn, Options) ->
ConnectionOpts = [ {conn_callback, emqx_quic_connection} ConnectionOpts = [ {conn_callback, emqx_quic_connection}
, {peer_unidi_stream_count, 1} , {peer_unidi_stream_count, 1}
, {peer_bidi_stream_count, 10} , {peer_bidi_stream_count, 10}
| Options
], ],
StreamOpts = [], StreamOpts = [],
quicer:start_listener('mqtt:quic', ListenOn, {ListenOpts, ConnectionOpts, StreamOpts}). quicer:start_listener('mqtt:quic', ListenOn, {ListenOpts, ConnectionOpts, StreamOpts}).

View File

@ -52,7 +52,8 @@ getstat(Socket, Stats) ->
Res -> Res Res -> Res
end. end.
setopts(_Socket, _Opts) -> setopts(Socket, Opts) ->
[ quicer:setopt(Socket, Opt, V) || {Opt, V} <- Opts ],
ok. ok.
getopts(_Socket, _Opts) -> getopts(_Socket, _Opts) ->
@ -64,7 +65,7 @@ getopts(_Socket, _Opts) ->
{buffer,80000}]}. {buffer,80000}]}.
fast_close(Stream) -> fast_close(Stream) ->
quicer:close_stream(Stream), quicer:async_close_stream(Stream),
%% Stream might be closed already. %% Stream might be closed already.
ok. ok.

View File

@ -392,8 +392,7 @@ fields("wss_listener_settings") ->
lists:keydelete("high_watermark", 1, Settings); lists:keydelete("high_watermark", 1, Settings);
fields("quic_listener_settings") -> fields("quic_listener_settings") ->
Unsupported = [ "active_n" Unsupported = [ "access"
, "access"
, "proxy_protocol" , "proxy_protocol"
, "proxy_protocol_timeout" , "proxy_protocol_timeout"
, "backlog" , "backlog"

View File

@ -23,6 +23,7 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("common_test/include/ct.hrl").
-import(lists, [nth/2]). -import(lists, [nth/2]).
@ -32,18 +33,37 @@
-define(WILD_TOPICS, [<<"TopicA/+">>, <<"+/C">>, <<"#">>, <<"/#">>, <<"/+">>, -define(WILD_TOPICS, [<<"TopicA/+">>, <<"+/C">>, <<"#">>, <<"/#">>, <<"/+">>,
<<"+/+">>, <<"TopicA/#">>]). <<"+/+">>, <<"TopicA/#">>]).
all() -> emqx_ct:all(?MODULE). all() ->
[ {group, tcp}
, {group, quic}
].
groups() ->
TCs = emqx_ct:all(?MODULE),
[ {tcp, [], TCs}
, {quic, [], TCs}
].
init_per_group(tcp, Config) ->
emqx_ct_helpers:start_apps([]),
[ {port, 1883}, {conn_fun, connect} | Config];
init_per_group(quic, Config) ->
emqx_ct_helpers:start_apps([]),
[ {port, 14567}, {conn_fun, quic_connect} | Config];
init_per_group(_, Config) ->
emqx_ct_helpers:stop_apps([]),
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_suite(Config) -> init_per_suite(Config) ->
%% Meck emqtt
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
%% Start Apps %% Start Apps
emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]), emqx_ct_helpers:start_apps([]),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = meck:unload(emqtt),
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
init_per_testcase(TestCase, Config) -> init_per_testcase(TestCase, Config) ->
@ -97,9 +117,10 @@ waiting_client_process_exit(C) ->
1000 -> error({waiting_timeout, C}) 1000 -> error({waiting_timeout, C})
end. end.
clean_retained(Topic) -> clean_retained(Topic, Config) ->
{ok, Clean} = emqtt:start_link([{clean_start, true}]), ConnFun = ?config(conn_fun, Config),
{ok, _} = emqtt:connect(Clean), {ok, Clean} = emqtt:start_link([{clean_start, true} | Config]),
{ok, _} = emqtt:ConnFun(Clean),
{ok, _} = emqtt:publish(Clean, Topic, #{}, <<"">>, [{qos, ?QOS_1}, {retain, true}]), {ok, _} = emqtt:publish(Clean, Topic, #{}, <<"">>, [{qos, ?QOS_1}, {retain, true}]),
ok = emqtt:disconnect(Clean). ok = emqtt:disconnect(Clean).
@ -107,11 +128,12 @@ clean_retained(Topic) ->
%% Test Cases %% Test Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_basic_test(_) -> t_basic_test(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"), ct:print("Basic test starting"),
{ok, C} = emqtt:start_link([{proto_ver, v5}]), {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:ConnFun(C),
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
{ok, _, [2]} = emqtt:subscribe(C, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2),
{ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2),
@ -124,16 +146,17 @@ t_basic_test(_) ->
%% Connection %% Connection
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_connect_clean_start(_) -> t_connect_clean_start(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>}, {ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},
{proto_ver, v5},{clean_start, true}]), {proto_ver, v5},{clean_start, true} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4] ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4]
ok = emqtt:pause(Client1), ok = emqtt:pause(Client1),
{ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>}, {ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},
{proto_ver, v5},{clean_start, false}]), {proto_ver, v5},{clean_start, false} | Config]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5] ?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5]
?assertEqual(142, receive_disconnect_reasoncode()), ?assertEqual(142, receive_disconnect_reasoncode()),
waiting_client_process_exit(Client1), waiting_client_process_exit(Client1),
@ -142,32 +165,32 @@ t_connect_clean_start(_) ->
waiting_client_process_exit(Client2), waiting_client_process_exit(Client2),
{ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>}, {ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},
{proto_ver, v5},{clean_start, false}]), {proto_ver, v5},{clean_start, false} | Config]),
{ok, _} = emqtt:connect(Client3), {ok, _} = emqtt:ConnFun(Client3),
?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6] ?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6]
ok = emqtt:disconnect(Client3), ok = emqtt:disconnect(Client3),
waiting_client_process_exit(Client3), waiting_client_process_exit(Client3),
process_flag(trap_exit, false). process_flag(trap_exit, false).
t_connect_will_message(_) -> t_connect_will_message(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Payload = "will message", Payload = "will message",
{ok, Client1} = emqtt:start_link([ {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{proto_ver, v5},
{clean_start, true}, {clean_start, true},
{will_flag, true}, {will_flag, true},
{will_topic, Topic}, {will_topic, Topic},
{will_payload, Payload} {will_payload, Payload} | Config
]), ]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)),
Info = emqx_connection:info(sys:get_state(ClientPid)), Info = emqx_connection:info(sys:get_state(ClientPid)),
?assertNotEqual(undefined, maps:find(will_msg, Info)), %% [MQTT-3.1.2-7] ?assertNotEqual(undefined, maps:find(will_msg, Info)), %% [MQTT-3.1.2-7]
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
{ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
ok = emqtt:disconnect(Client1, 4), %% [MQTT-3.14.2-1] ok = emqtt:disconnect(Client1, 4), %% [MQTT-3.14.2-1]
@ -178,27 +201,32 @@ t_connect_will_message(_) ->
?assertEqual({ok, 0}, maps:find(qos, Msg)), ?assertEqual({ok, 0}, maps:find(qos, Msg)),
ok = emqtt:disconnect(Client2), ok = emqtt:disconnect(Client2),
{ok, Client3} = emqtt:start_link([ {ok, Client3} = emqtt:start_link([ {proto_ver, v5},
{proto_ver, v5},
{clean_start, true}, {clean_start, true},
{will_flag, true}, {will_flag, true},
{will_topic, Topic}, {will_topic, Topic},
{will_payload, Payload} {will_payload, Payload} | Config
]), ]),
{ok, _} = emqtt:connect(Client3), {ok, _} = emqtt:ConnFun(Client3),
{ok, Client4} = emqtt:start_link([{proto_ver, v5}]), {ok, Client4} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client4), {ok, _} = emqtt:ConnFun(Client4),
{ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2),
ok = emqtt:disconnect(Client3), ok = emqtt:disconnect(Client3),
?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.1.2-10] ?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.1.2-10]
ok = emqtt:disconnect(Client4). ok = emqtt:disconnect(Client4).
t_batch_subscribe(_) -> t_batch_subscribe(init, Config) ->
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]),
{ok, _} = emqtt:connect(Client),
ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end),
Config;
t_batch_subscribe('end', _Config) ->
meck:unload(emqx_access_control).
t_batch_subscribe(Config) ->
ConnFun = ?config(conn_fun, Config),
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>} | Config]),
{ok, _} = emqtt:ConnFun(Client),
{ok, _, [?RC_NOT_AUTHORIZED, {ok, _, [?RC_NOT_AUTHORIZED,
?RC_NOT_AUTHORIZED, ?RC_NOT_AUTHORIZED,
?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1}, ?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1},
@ -209,25 +237,25 @@ t_batch_subscribe(_) ->
?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>, ?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>,
<<"t2">>, <<"t2">>,
<<"t3">>]), <<"t3">>]),
meck:unload(emqx_access_control),
emqtt:disconnect(Client). emqtt:disconnect(Client).
t_connect_will_retain(_) -> t_connect_will_retain(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Payload = "will message", Payload = "will message",
{ok, Client1} = emqtt:start_link([ {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{proto_ver, v5},
{clean_start, true}, {clean_start, true},
{will_flag, true}, {will_flag, true},
{will_topic, Topic}, {will_topic, Topic},
{will_payload, Payload}, {will_payload, Payload},
{will_retain, false} {will_retain, false} | Config
]), ]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
ok = emqtt:disconnect(Client1, 4), ok = emqtt:disconnect(Client1, 4),
@ -235,27 +263,26 @@ t_connect_will_retain(_) ->
?assertEqual({ok, false}, maps:find(retain, Msg1)), %% [MQTT-3.1.2-14] ?assertEqual({ok, false}, maps:find(retain, Msg1)), %% [MQTT-3.1.2-14]
ok = emqtt:disconnect(Client2), ok = emqtt:disconnect(Client2),
{ok, Client3} = emqtt:start_link([ {ok, Client3} = emqtt:start_link([ {proto_ver, v5},
{proto_ver, v5},
{clean_start, true}, {clean_start, true},
{will_flag, true}, {will_flag, true},
{will_topic, Topic}, {will_topic, Topic},
{will_payload, Payload}, {will_payload, Payload},
{will_retain, true} {will_retain, true} | Config
]), ]),
{ok, _} = emqtt:connect(Client3), {ok, _} = emqtt:ConnFun(Client3),
{ok, Client4} = emqtt:start_link([{proto_ver, v5}]), {ok, Client4} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client4), {ok, _} = emqtt:ConnFun(Client4),
{ok, _, [2]} = emqtt:subscribe(Client4, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client4, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
ok = emqtt:disconnect(Client3, 4), ok = emqtt:disconnect(Client3, 4),
[Msg2 | _ ] = receive_messages(1), [Msg2 | _ ] = receive_messages(1),
?assertEqual({ok, true}, maps:find(retain, Msg2)), %% [MQTT-3.1.2-15] ?assertEqual({ok, true}, maps:find(retain, Msg2)), %% [MQTT-3.1.2-15]
ok = emqtt:disconnect(Client4), ok = emqtt:disconnect(Client4),
clean_retained(Topic). clean_retained(Topic, Config).
t_connect_idle_timeout(_) -> t_connect_idle_timeout(_Config) ->
IdleTimeout = 2000, IdleTimeout = 2000,
emqx_zone:set_env(external, idle_timeout, IdleTimeout), emqx_zone:set_env(external, idle_timeout, IdleTimeout),
@ -263,25 +290,30 @@ t_connect_idle_timeout(_) ->
timer:sleep(IdleTimeout), timer:sleep(IdleTimeout),
?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)). ?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)).
t_connect_limit_timeout(_) -> t_connect_limit_timeout(init, Config) ->
ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]), ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]),
meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1; meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1;
(Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3])
end), end),
Config;
t_connect_limit_timeout('end', _Config) ->
catch meck:unload(proplists).
t_connect_limit_timeout(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
emqx_zone:set_env(external, publish_limit, {3, 5}), emqx_zone:set_env(external, publish_limit, {3, 5}),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60} | Config]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:ConnFun(Client),
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))), ?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))),
Payload = <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, Payload = <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>,
ok = emqtt:publish(Client, Topic, Payload, 0), {ok, 2} = emqtt:publish(Client, Topic, Payload, 1),
ok = emqtt:publish(Client, Topic, Payload, 0), {ok, 3} = emqtt:publish(Client, Topic, Payload, 1),
ok = emqtt:publish(Client, Topic, Payload, 0), {ok, 4} = emqtt:publish(Client, Topic, Payload, 1),
timer:sleep(200), timer:sleep(250),
?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))), ?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))),
ok = emqtt:disconnect(Client), ok = emqtt:disconnect(Client),
@ -301,9 +333,10 @@ t_connect_emit_stats_timeout('end', Config) ->
ok. ok.
t_connect_emit_stats_timeout(Config) -> t_connect_emit_stats_timeout(Config) ->
ConnFun = ?config(conn_fun, Config),
{_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config), {_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config),
{ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60} | Config]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:ConnFun(Client),
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))), ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0), ?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0),
@ -311,15 +344,16 @@ t_connect_emit_stats_timeout(Config) ->
ok = emqtt:disconnect(Client). ok = emqtt:disconnect(Client).
%% [MQTT-3.1.2-22] %% [MQTT-3.1.2-22]
t_connect_keepalive_timeout(_) -> t_connect_keepalive_timeout(Config) ->
ConnFun = ?config(conn_fun, Config),
%% Prevent the emqtt client bringing us down on the disconnect. %% Prevent the emqtt client bringing us down on the disconnect.
process_flag(trap_exit, true), process_flag(trap_exit, true),
Keepalive = 2, Keepalive = 2,
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {ok, Client} = emqtt:start_link([{proto_ver, v5},
{keepalive, Keepalive}]), {keepalive, Keepalive} | Config]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:ConnFun(Client),
emqtt:pause(Client), emqtt:pause(Client),
receive receive
{disconnected, ReasonCode, _Channel} -> ?assertEqual(141, ReasonCode) {disconnected, ReasonCode, _Channel} -> ?assertEqual(141, ReasonCode)
@ -328,30 +362,30 @@ t_connect_keepalive_timeout(_) ->
end. end.
%% [MQTT-3.1.2-23] %% [MQTT-3.1.2-23]
t_connect_session_expiry_interval(_) -> t_connect_session_expiry_interval(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Payload = "test message", Payload = "test message",
{ok, Client1} = emqtt:start_link([ {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
{clientid, <<"t_connect_session_expiry_interval">>},
{proto_ver, v5}, {proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7200}} {properties, #{'Session-Expiry-Interval' => 7200}}
| Config
]), ]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
{ok, 2} = emqtt:publish(Client2, Topic, Payload, 2), {ok, 2} = emqtt:publish(Client2, Topic, Payload, 2),
ok = emqtt:disconnect(Client2), ok = emqtt:disconnect(Client2),
{ok, Client3} = emqtt:start_link([ {ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
{clientid, <<"t_connect_session_expiry_interval">>},
{proto_ver, v5}, {proto_ver, v5},
{clean_start, false} {clean_start, false} | Config
]), ]),
{ok, _} = emqtt:connect(Client3), {ok, _} = emqtt:ConnFun(Client3),
[Msg | _ ] = receive_messages(1), [Msg | _ ] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
@ -360,13 +394,13 @@ t_connect_session_expiry_interval(_) ->
%% [MQTT-3.1.3-9] %% [MQTT-3.1.3-9]
%% !!!REFACTOR NEED: %% !!!REFACTOR NEED:
%t_connect_will_delay_interval(_) -> %t_connect_will_delay_interval(Config) ->
% process_flag(trap_exit, true), % process_flag(trap_exit, true),
% Topic = nth(1, ?TOPICS), % Topic = nth(1, ?TOPICS),
% Payload = "will message", % Payload = "will message",
% %
% {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), % {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
% {ok, _} = emqtt:connect(Client1), % {ok, _} = emqtt:ConnFun(Client1),
% {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), % {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
% %
% {ok, Client2} = emqtt:start_link([ % {ok, Client2} = emqtt:start_link([
@ -379,9 +413,9 @@ t_connect_session_expiry_interval(_) ->
% {will_payload, Payload}, % {will_payload, Payload},
% {will_props, #{'Will-Delay-Interval' => 3}}, % {will_props, #{'Will-Delay-Interval' => 3}},
% {properties, #{'Session-Expiry-Interval' => 7200}}, % {properties, #{'Session-Expiry-Interval' => 7200}},
% {keepalive, 2} % {keepalive, 2} | Config
% ]), % ]),
% {ok, _} = emqtt:connect(Client2), % {ok, _} = emqtt:ConnFun(Client2),
% timer:sleep(50), % timer:sleep(50),
% erlang:exit(Client2, kill), % erlang:exit(Client2, kill),
% timer:sleep(2000), % timer:sleep(2000),
@ -399,9 +433,9 @@ t_connect_session_expiry_interval(_) ->
% {will_payload, Payload}, % {will_payload, Payload},
% {will_props, #{'Will-Delay-Interval' => 7200}}, % {will_props, #{'Will-Delay-Interval' => 7200}},
% {properties, #{'Session-Expiry-Interval' => 3}}, % {properties, #{'Session-Expiry-Interval' => 3}},
% {keepalive, 2} % {keepalive, 2} | Config
% ]), % ]),
% {ok, _} = emqtt:connect(Client3), % {ok, _} = emqtt:ConnFun(Client3),
% timer:sleep(50), % timer:sleep(50),
% erlang:exit(Client3, kill), % erlang:exit(Client3, kill),
% %
@ -418,18 +452,17 @@ t_connect_session_expiry_interval(_) ->
% process_flag(trap_exit, false). % process_flag(trap_exit, false).
%% [MQTT-3.1.4-3] %% [MQTT-3.1.4-3]
t_connect_duplicate_clientid(_) -> t_connect_duplicate_clientid(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, Client1} = emqtt:start_link([ {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>},
{clientid, <<"t_connect_duplicate_clientid">>}, {proto_ver, v5} | Config
{proto_ver, v5}
]), ]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, Client2} = emqtt:start_link([ {ok, Client2} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>},
{clientid, <<"t_connect_duplicate_clientid">>}, {proto_ver, v5} | Config
{proto_ver, v5}
]), ]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
?assertEqual(142, receive_disconnect_reasoncode()), ?assertEqual(142, receive_disconnect_reasoncode()),
waiting_client_process_exit(Client1), waiting_client_process_exit(Client1),
@ -441,28 +474,33 @@ t_connect_duplicate_clientid(_) ->
%% Connack %% Connack
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_connack_session_present(_) -> t_connack_session_present(Config) ->
{ok, Client1} = emqtt:start_link([ ConnFun = ?config(conn_fun, Config),
{clientid, <<"t_connect_duplicate_clientid">>}, {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>},
{proto_ver, v5}, {proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7200}}, {properties, #{'Session-Expiry-Interval' => 7200}},
{clean_start, true} {clean_start, true} | Config
]), ]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.2.2-2] ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.2.2-2]
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
{ok, Client2} = emqtt:start_link([ {ok, Client2} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>},
{clientid, <<"t_connect_duplicate_clientid">>},
{proto_ver, v5}, {proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7200}}, {properties, #{'Session-Expiry-Interval' => 7200}},
{clean_start, false} {clean_start, false} | Config
]), ]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
?assertEqual(1, client_info(session_present, Client2)), %% [[MQTT-3.2.2-3]] ?assertEqual(1, client_info(session_present, Client2)), %% [[MQTT-3.2.2-3]]
ok = emqtt:disconnect(Client2). ok = emqtt:disconnect(Client2).
t_connack_max_qos_allowed(_) -> t_connack_max_qos_allowed(init, Config) ->
Config;
t_connack_max_qos_allowed('end', _Config) ->
emqx_zone:set_env(external, max_qos_allowed, 2),
ok.
t_connack_max_qos_allowed(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
@ -471,8 +509,8 @@ t_connack_max_qos_allowed(_) ->
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack1} = emqtt:connect(Client1), {ok, Connack1} = emqtt:ConnFun(Client1),
?assertEqual(0, maps:get('Maximum-QoS', Connack1)), %% [MQTT-3.2.2-9] ?assertEqual(0, maps:get('Maximum-QoS', Connack1)), %% [MQTT-3.2.2-9]
{ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10] {ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10]
@ -483,14 +521,13 @@ t_connack_max_qos_allowed(_) ->
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
waiting_client_process_exit(Client1), waiting_client_process_exit(Client1),
{ok, Client2} = emqtt:start_link([ {ok, Client2} = emqtt:start_link([ {proto_ver, v5},
{proto_ver, v5},
{will_flag, true}, {will_flag, true},
{will_topic, Topic}, {will_topic, Topic},
{will_payload, <<"Unsupported Qos">>}, {will_payload, <<"Unsupported Qos">>},
{will_qos, 2} {will_qos, 2} | Config
]), ]),
{error, Connack2} = emqtt:connect(Client2), {error, Connack2} = emqtt:ConnFun(Client2),
?assertMatch({qos_not_supported, _}, Connack2), %% [MQTT-3.2.2-12] ?assertMatch({qos_not_supported, _}, Connack2), %% [MQTT-3.2.2-12]
waiting_client_process_exit(Client2), waiting_client_process_exit(Client2),
@ -499,8 +536,8 @@ t_connack_max_qos_allowed(_) ->
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]), {ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack3} = emqtt:connect(Client3), {ok, Connack3} = emqtt:ConnFun(Client3),
?assertEqual(1, maps:get('Maximum-QoS', Connack3)), %% [MQTT-3.2.2-9] ?assertEqual(1, maps:get('Maximum-QoS', Connack3)), %% [MQTT-3.2.2-9]
{ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10] {ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10]
@ -511,14 +548,13 @@ t_connack_max_qos_allowed(_) ->
?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11]
waiting_client_process_exit(Client3), waiting_client_process_exit(Client3),
{ok, Client4} = emqtt:start_link([ {ok, Client4} = emqtt:start_link([ {proto_ver, v5},
{proto_ver, v5},
{will_flag, true}, {will_flag, true},
{will_topic, Topic}, {will_topic, Topic},
{will_payload, <<"Unsupported Qos">>}, {will_payload, <<"Unsupported Qos">>},
{will_qos, 2} {will_qos, 2} | Config
]), ]),
{error, Connack4} = emqtt:connect(Client4), {error, Connack4} = emqtt:ConnFun(Client4),
?assertMatch({qos_not_supported, _}, Connack4), %% [MQTT-3.2.2-12] ?assertMatch({qos_not_supported, _}, Connack4), %% [MQTT-3.2.2-12]
waiting_client_process_exit(Client4), waiting_client_process_exit(Client4),
@ -527,17 +563,18 @@ t_connack_max_qos_allowed(_) ->
persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_caps'}),
persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}),
{ok, Client5} = emqtt:start_link([{proto_ver, v5}]), {ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, Connack5} = emqtt:connect(Client5), {ok, Connack5} = emqtt:ConnFun(Client5),
?assertEqual(undefined, maps:get('Maximum-QoS', Connack5, undefined)), %% [MQTT-3.2.2-9] ?assertEqual(undefined, maps:get('Maximum-QoS', Connack5, undefined)), %% [MQTT-3.2.2-9]
ok = emqtt:disconnect(Client5), ok = emqtt:disconnect(Client5),
waiting_client_process_exit(Client5), waiting_client_process_exit(Client5),
process_flag(trap_exit, false). process_flag(trap_exit, false).
t_connack_assigned_clienid(_) -> t_connack_assigned_clienid(Config) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), ConnFun = ?config(conn_fun, Config),
{ok, _} = emqtt:connect(Client1), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:ConnFun(Client1),
?assert(is_binary(client_info(clientid, Client1))), %% [MQTT-3.2.2-16] ?assert(is_binary(client_info(clientid, Client1))), %% [MQTT-3.2.2-16]
ok = emqtt:disconnect(Client1). ok = emqtt:disconnect(Client1).
@ -545,11 +582,12 @@ t_connack_assigned_clienid(_) ->
%% Publish %% Publish
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_publish_rap(_) -> t_publish_rap(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>,
[{qos, ?QOS_1}, {retain, true}]), [{qos, ?QOS_1}, {retain, true}]),
@ -557,8 +595,8 @@ t_publish_rap(_) ->
?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12] ?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12]
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]),
{ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, {ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>,
[{qos, ?QOS_1}, {retain, true}]), [{qos, ?QOS_1}, {retain, true}]),
@ -566,44 +604,47 @@ t_publish_rap(_) ->
?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13] ?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13]
ok = emqtt:disconnect(Client2), ok = emqtt:disconnect(Client2),
clean_retained(Topic). clean_retained(Topic, Config).
t_publish_wildtopic(_) -> t_publish_wildtopic(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Topic = nth(1, ?WILD_TOPICS), Topic = nth(1, ?WILD_TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
ok = emqtt:publish(Client1, Topic, <<"error topic">>), ok = emqtt:publish(Client1, Topic, <<"error topic">>),
?assertEqual(144, receive_disconnect_reasoncode()), ?assertEqual(144, receive_disconnect_reasoncode()),
waiting_client_process_exit(Client1), waiting_client_process_exit(Client1),
process_flag(trap_exit, false). process_flag(trap_exit, false).
t_publish_payload_format_indicator(_) -> t_publish_payload_format_indicator(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Properties = #{'Payload-Format-Indicator' => 233}, Properties = #{'Payload-Format-Indicator' => 233},
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
ok = emqtt:publish(Client1, Topic, Properties, <<"Payload Format Indicator">>, [{qos, ?QOS_0}]), ok = emqtt:publish(Client1, Topic, Properties, <<"Payload Format Indicator">>, [{qos, ?QOS_0}]),
[Msg1 | _] = receive_messages(1), [Msg1 | _] = receive_messages(1),
?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-6] ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-6]
ok = emqtt:disconnect(Client1). ok = emqtt:disconnect(Client1).
t_publish_topic_alias(_) -> t_publish_topic_alias(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8] ?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8]
waiting_client_process_exit(Client1), waiting_client_process_exit(Client1),
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
{ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233},
<<"Topic-Alias">>, [{qos, ?QOS_0}]), <<"Topic-Alias">>, [{qos, ?QOS_0}]),
@ -615,12 +656,13 @@ t_publish_topic_alias(_) ->
process_flag(trap_exit, false). process_flag(trap_exit, false).
t_publish_response_topic(_) -> t_publish_response_topic(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)},
<<"Response-Topic">>, [{qos, ?QOS_0}]), <<"Response-Topic">>, [{qos, ?QOS_0}]),
?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14] ?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14]
@ -628,7 +670,8 @@ t_publish_response_topic(_) ->
process_flag(trap_exit, false). process_flag(trap_exit, false).
t_publish_properties(_) -> t_publish_properties(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Properties = #{ Properties = #{
'Response-Topic' => Topic, %% [MQTT-3.3.2-15] 'Response-Topic' => Topic, %% [MQTT-3.3.2-15]
@ -637,20 +680,21 @@ t_publish_properties(_) ->
'Content-Type' => <<"2333">> %% [MQTT-3.3.2-20] 'Content-Type' => <<"2333">> %% [MQTT-3.3.2-20]
}, },
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
ok = emqtt:publish(Client1, Topic, Properties, <<"Publish Properties">>, [{qos, ?QOS_0}]), ok = emqtt:publish(Client1, Topic, Properties, <<"Publish Properties">>, [{qos, ?QOS_0}]),
[Msg1 | _] = receive_messages(1), [Msg1 | _] = receive_messages(1),
?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-16] ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-16]
ok = emqtt:disconnect(Client1). ok = emqtt:disconnect(Client1).
t_publish_overlapping_subscriptions(_) -> t_publish_overlapping_subscriptions(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Properties = #{'Subscription-Identifier' => 2333}, Properties = #{'Subscription-Identifier' => 2333},
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1), {ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1),
{ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0), {ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0),
{ok, _} = emqtt:publish(Client1, Topic, #{}, {ok, _} = emqtt:publish(Client1, Topic, #{},
@ -665,13 +709,15 @@ t_publish_overlapping_subscriptions(_) ->
%% Subsctibe %% Subsctibe
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_subscribe_topic_alias(_) -> t_subscribe_topic_alias(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic1 = nth(1, ?TOPICS), Topic1 = nth(1, ?TOPICS),
Topic2 = nth(2, ?TOPICS), Topic2 = nth(2, ?TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}, {ok, Client1} = emqtt:start_link([ {proto_ver, v5},
{properties, #{'Topic-Alias-Maximum' => 1}} {properties, #{'Topic-Alias-Maximum' => 1}}
| Config
]), ]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic2, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic2, qos2),
@ -692,27 +738,29 @@ t_subscribe_topic_alias(_) ->
ok = emqtt:disconnect(Client1). ok = emqtt:disconnect(Client1).
t_subscribe_no_local(_) -> t_subscribe_no_local(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client2), {ok, _} = emqtt:ConnFun(Client2),
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{nl, true}, {qos, 2}]}]),
ok = emqtt:publish(Client1, Topic, <<"t_subscribe_no_local">>, 0), ok = emqtt:publish(Client1, Topic, <<"t_subscribe_no_local">>, 0),
?assertEqual(1, length(receive_messages(2))), %% [MQTT-3.8.3-3] ?assertEqual(1, length(receive_messages(2))), %% [MQTT-3.8.3-3]
ok = emqtt:disconnect(Client1). ok = emqtt:disconnect(Client1).
t_subscribe_actions(_) -> t_subscribe_actions(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
Properties = #{'Subscription-Identifier' => 2333}, Properties = #{'Subscription-Identifier' => 2333},
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, Properties, Topic, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Properties, Topic, qos2),
{ok, _, [1]} = emqtt:subscribe(Client1, Properties, Topic, qos1), {ok, _, [1]} = emqtt:subscribe(Client1, Properties, Topic, qos1),
{ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2), {ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2),
@ -726,12 +774,13 @@ t_subscribe_actions(_) ->
%% Unsubsctibe Unsuback %% Unsubsctibe Unsuback
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_unscbsctibe(_) -> t_unscbsctibe(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic1 = nth(1, ?TOPICS), Topic1 = nth(1, ?TOPICS),
Topic2 = nth(2, ?TOPICS), Topic2 = nth(2, ?TOPICS),
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:ConnFun(Client1),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2),
{ok, _, [0]} = emqtt:unsubscribe(Client1, Topic1), %% [MQTT-3.10.4-4] {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic1), %% [MQTT-3.10.4-4]
{ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5] {ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5]
@ -745,9 +794,10 @@ t_unscbsctibe(_) ->
%% Pingreq %% Pingreq
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_pingreq(_) -> t_pingreq(Config) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), ConnFun = ?config(conn_fun, Config),
{ok, _} = emqtt:connect(Client1), {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:ConnFun(Client1),
pong = emqtt:ping(Client1), %% [MQTT-3.12.4-1] pong = emqtt:ping(Client1), %% [MQTT-3.12.4-1]
ok = emqtt:disconnect(Client1). ok = emqtt:disconnect(Client1).
@ -755,7 +805,14 @@ t_pingreq(_) ->
%% Shared Subscriptions %% Shared Subscriptions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> t_shared_subscriptions_client_terminates_when_qos_eq_2(init, Config) ->
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
Config;
t_shared_subscriptions_client_terminates_when_qos_eq_2('end', _Config) ->
catch meck:unload(emqtt).
t_shared_subscriptions_client_terminates_when_qos_eq_2(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
application:set_env(emqx, shared_dispatch_ack_enabled, true), application:set_env(emqx, shared_dispatch_ack_enabled, true),
@ -766,32 +823,33 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
meck:expect(emqtt, connected, meck:expect(emqtt, connected,
fun(cast, ?PUBLISH_PACKET(?QOS_2, _PacketId), _State) -> fun(cast, ?PUBLISH_PACKET(?QOS_2, _PacketId), _State) ->
ok = counters:add(CRef, 1, 1), ok = counters:add(CRef, 1, 1),
{stop, {shutdown, for_testiong}}; {stop, {shutdown, for_testing}};
(Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3])
end), end),
{ok, Sub1} = emqtt:start_link([{proto_ver, v5}, {ok, Sub1} = emqtt:start_link([ {proto_ver, v5},
{clientid, <<"sub_client_1">>}, {clientid, <<"sub_client_1">>},
{keepalive, 5}]), {keepalive, 5} | Config
{ok, _} = emqtt:connect(Sub1), ]),
{ok, _} = emqtt:ConnFun(Sub1),
{ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2), {ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2),
{ok, Sub2} = emqtt:start_link([{proto_ver, v5}, {ok, Sub2} = emqtt:start_link([{proto_ver, v5},
{clientid, <<"sub_client_2">>}, {clientid, <<"sub_client_2">>},
{keepalive, 5}]), {keepalive, 5} | Config]),
{ok, _} = emqtt:connect(Sub2), {ok, _} = emqtt:ConnFun(Sub2),
{ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2), {ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2),
{ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]), {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>} | Config]),
{ok, _} = emqtt:connect(Pub), {ok, _} = emqtt:ConnFun(Pub),
{ok, _} = emqtt:publish(Pub, Topic, {ok, _} = emqtt:publish(Pub, Topic,
<<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2), <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2),
receive receive
{'EXIT', _,{shutdown, for_testiong}} -> {'EXIT', _,{shutdown, for_testing}} ->
ok ok
after 1000 -> after 1000 ->
error("disconnected timeout") ct:fail("disconnected timeout")
end, end,
?assertEqual(1, counters:get(CRef, 1)), ?assertEqual(1, counters:get(CRef, 1)),