diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 8e3ee400b..7300281d7 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -447,7 +447,7 @@ handle_msg({Closed, _Sock}, State) handle_info({sock_closed, Closed}, close_socket(State)); handle_msg({Passive, _Sock}, State) - when Passive == tcp_passive; Passive == ssl_passive -> + when Passive == tcp_passive; Passive == ssl_passive; Passive =:= quic_passive -> %% In Stats Pubs = emqx_pd:reset_counter(incoming_pubs), Bytes = emqx_pd:reset_counter(incoming_bytes), @@ -738,9 +738,15 @@ handle_info({sock_error, Reason}, State) -> end, handle_info({sock_closed, Reason}, close_socket(State)); +handle_info({quic, peer_send_shutdown, _Stream}, State) -> + handle_info({sock_closed, force}, close_socket(State)); + handle_info({quic, closed, _Channel, ReasonFlag}, State) -> handle_info({sock_closed, ReasonFlag}, State); +handle_info({quic, closed, _Stream}, State) -> + handle_info({sock_closed, force}, State); + handle_info(Info, State) -> with_channel(handle_info, [Info], State). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index c7d42e2e4..48e99926d 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -157,6 +157,7 @@ start_listener(quic, ListenOn, Options) -> ConnectionOpts = [ {conn_callback, emqx_quic_connection} , {peer_unidi_stream_count, 1} , {peer_bidi_stream_count, 10} + | Options ], StreamOpts = [], quicer:start_listener('mqtt:quic', ListenOn, {ListenOpts, ConnectionOpts, StreamOpts}). diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index e5cd4c3fc..64e851758 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -52,7 +52,8 @@ getstat(Socket, Stats) -> Res -> Res end. -setopts(_Socket, _Opts) -> +setopts(Socket, Opts) -> + [ quicer:setopt(Socket, Opt, V) || {Opt, V} <- Opts ], ok. getopts(_Socket, _Opts) -> @@ -64,7 +65,7 @@ getopts(_Socket, _Opts) -> {buffer,80000}]}. fast_close(Stream) -> - quicer:close_stream(Stream), + quicer:async_close_stream(Stream), %% Stream might be closed already. ok. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 440dd8117..d32073ee5 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -392,8 +392,7 @@ fields("wss_listener_settings") -> lists:keydelete("high_watermark", 1, Settings); fields("quic_listener_settings") -> - Unsupported = [ "active_n" - , "access" + Unsupported = [ "access" , "proxy_protocol" , "proxy_protocol_timeout" , "backlog" diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index ab4d96eea..2f3048277 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("common_test/include/ct.hrl"). -import(lists, [nth/2]). @@ -32,18 +33,37 @@ -define(WILD_TOPICS, [<<"TopicA/+">>, <<"+/C">>, <<"#">>, <<"/#">>, <<"/+">>, <<"+/+">>, <<"TopicA/#">>]). -all() -> emqx_ct:all(?MODULE). +all() -> + [ {group, tcp} + , {group, quic} + ]. + +groups() -> + TCs = emqx_ct:all(?MODULE), + [ {tcp, [], TCs} + , {quic, [], TCs} + ]. + +init_per_group(tcp, Config) -> + emqx_ct_helpers:start_apps([]), + [ {port, 1883}, {conn_fun, connect} | Config]; +init_per_group(quic, Config) -> + emqx_ct_helpers:start_apps([]), + [ {port, 14567}, {conn_fun, quic_connect} | Config]; +init_per_group(_, Config) -> + emqx_ct_helpers:stop_apps([]), + Config. + +end_per_group(_Group, _Config) -> + ok. init_per_suite(Config) -> - %% Meck emqtt - ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]), %% Start Apps emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. end_per_suite(_Config) -> - ok = meck:unload(emqtt), emqx_ct_helpers:stop_apps([]). init_per_testcase(TestCase, Config) -> @@ -97,9 +117,10 @@ waiting_client_process_exit(C) -> 1000 -> error({waiting_timeout, C}) end. -clean_retained(Topic) -> - {ok, Clean} = emqtt:start_link([{clean_start, true}]), - {ok, _} = emqtt:connect(Clean), +clean_retained(Topic, Config) -> + ConnFun = ?config(conn_fun, Config), + {ok, Clean} = emqtt:start_link([{clean_start, true} | Config]), + {ok, _} = emqtt:ConnFun(Clean), {ok, _} = emqtt:publish(Clean, Topic, #{}, <<"">>, [{qos, ?QOS_1}, {retain, true}]), ok = emqtt:disconnect(Clean). @@ -107,11 +128,12 @@ clean_retained(Topic) -> %% Test Cases %%-------------------------------------------------------------------- -t_basic_test(_) -> +t_basic_test(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), ct:print("Basic test starting"), - {ok, C} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(C), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(C), {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2), {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), @@ -124,16 +146,17 @@ t_basic_test(_) -> %% Connection %%-------------------------------------------------------------------- -t_connect_clean_start(_) -> +t_connect_clean_start(Config) -> + ConnFun = ?config(conn_fun, Config), process_flag(trap_exit, true), {ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>}, - {proto_ver, v5},{clean_start, true}]), - {ok, _} = emqtt:connect(Client1), + {proto_ver, v5},{clean_start, true} | Config]), + {ok, _} = emqtt:ConnFun(Client1), ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4] ok = emqtt:pause(Client1), {ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>}, - {proto_ver, v5},{clean_start, false}]), - {ok, _} = emqtt:connect(Client2), + {proto_ver, v5},{clean_start, false} | Config]), + {ok, _} = emqtt:ConnFun(Client2), ?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5] ?assertEqual(142, receive_disconnect_reasoncode()), waiting_client_process_exit(Client1), @@ -142,32 +165,32 @@ t_connect_clean_start(_) -> waiting_client_process_exit(Client2), {ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>}, - {proto_ver, v5},{clean_start, false}]), - {ok, _} = emqtt:connect(Client3), + {proto_ver, v5},{clean_start, false} | Config]), + {ok, _} = emqtt:ConnFun(Client3), ?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6] ok = emqtt:disconnect(Client3), waiting_client_process_exit(Client3), process_flag(trap_exit, false). -t_connect_will_message(_) -> +t_connect_will_message(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), Payload = "will message", - {ok, Client1} = emqtt:start_link([ - {proto_ver, v5}, - {clean_start, true}, - {will_flag, true}, - {will_topic, Topic}, - {will_payload, Payload} - ]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload} | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)), Info = emqx_connection:info(sys:get_state(ClientPid)), ?assertNotEqual(undefined, maps:find(will_msg, Info)), %% [MQTT-3.1.2-7] - {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client2), {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), ok = emqtt:disconnect(Client1, 4), %% [MQTT-3.14.2-1] @@ -178,27 +201,32 @@ t_connect_will_message(_) -> ?assertEqual({ok, 0}, maps:find(qos, Msg)), ok = emqtt:disconnect(Client2), - {ok, Client3} = emqtt:start_link([ - {proto_ver, v5}, - {clean_start, true}, - {will_flag, true}, - {will_topic, Topic}, - {will_payload, Payload} - ]), - {ok, _} = emqtt:connect(Client3), + {ok, Client3} = emqtt:start_link([ {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload} | Config + ]), + {ok, _} = emqtt:ConnFun(Client3), - {ok, Client4} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client4), + {ok, Client4} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client4), {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2), ok = emqtt:disconnect(Client3), ?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.1.2-10] ok = emqtt:disconnect(Client4). -t_batch_subscribe(_) -> - {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]), - {ok, _} = emqtt:connect(Client), +t_batch_subscribe(init, Config) -> ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), + Config; +t_batch_subscribe('end', _Config) -> + meck:unload(emqx_access_control). + +t_batch_subscribe(Config) -> + ConnFun = ?config(conn_fun, Config), + {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>} | Config]), + {ok, _} = emqtt:ConnFun(Client), {ok, _, [?RC_NOT_AUTHORIZED, ?RC_NOT_AUTHORIZED, ?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1}, @@ -209,25 +237,25 @@ t_batch_subscribe(_) -> ?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>, <<"t2">>, <<"t3">>]), - meck:unload(emqx_access_control), emqtt:disconnect(Client). -t_connect_will_retain(_) -> +t_connect_will_retain(Config) -> + ConnFun = ?config(conn_fun, Config), + process_flag(trap_exit, true), Topic = nth(1, ?TOPICS), Payload = "will message", - {ok, Client1} = emqtt:start_link([ - {proto_ver, v5}, - {clean_start, true}, - {will_flag, true}, - {will_topic, Topic}, - {will_payload, Payload}, - {will_retain, false} - ]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_retain, false} | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), - {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client2), {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), ok = emqtt:disconnect(Client1, 4), @@ -235,27 +263,26 @@ t_connect_will_retain(_) -> ?assertEqual({ok, false}, maps:find(retain, Msg1)), %% [MQTT-3.1.2-14] ok = emqtt:disconnect(Client2), - {ok, Client3} = emqtt:start_link([ - {proto_ver, v5}, - {clean_start, true}, - {will_flag, true}, - {will_topic, Topic}, - {will_payload, Payload}, - {will_retain, true} - ]), - {ok, _} = emqtt:connect(Client3), + {ok, Client3} = emqtt:start_link([ {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_retain, true} | Config + ]), + {ok, _} = emqtt:ConnFun(Client3), - {ok, Client4} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client4), + {ok, Client4} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client4), {ok, _, [2]} = emqtt:subscribe(Client4, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), ok = emqtt:disconnect(Client3, 4), [Msg2 | _ ] = receive_messages(1), ?assertEqual({ok, true}, maps:find(retain, Msg2)), %% [MQTT-3.1.2-15] ok = emqtt:disconnect(Client4), - clean_retained(Topic). + clean_retained(Topic, Config). -t_connect_idle_timeout(_) -> +t_connect_idle_timeout(_Config) -> IdleTimeout = 2000, emqx_zone:set_env(external, idle_timeout, IdleTimeout), @@ -263,25 +290,30 @@ t_connect_idle_timeout(_) -> timer:sleep(IdleTimeout), ?assertMatch({error, closed}, emqtt_sock:recv(Sock,1024)). -t_connect_limit_timeout(_) -> +t_connect_limit_timeout(init, Config) -> ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]), meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1; (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) end), + Config; +t_connect_limit_timeout('end', _Config) -> + catch meck:unload(proplists). +t_connect_limit_timeout(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), emqx_zone:set_env(external, publish_limit, {3, 5}), - {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), - {ok, _} = emqtt:connect(Client), + {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60} | Config]), + {ok, _} = emqtt:ConnFun(Client), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))), Payload = <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, - ok = emqtt:publish(Client, Topic, Payload, 0), - ok = emqtt:publish(Client, Topic, Payload, 0), - ok = emqtt:publish(Client, Topic, Payload, 0), - timer:sleep(200), + {ok, 2} = emqtt:publish(Client, Topic, Payload, 1), + {ok, 3} = emqtt:publish(Client, Topic, Payload, 1), + {ok, 4} = emqtt:publish(Client, Topic, Payload, 1), + timer:sleep(250), ?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))), ok = emqtt:disconnect(Client), @@ -301,9 +333,10 @@ t_connect_emit_stats_timeout('end', Config) -> ok. t_connect_emit_stats_timeout(Config) -> + ConnFun = ?config(conn_fun, Config), {_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config), - {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), - {ok, _} = emqtt:connect(Client), + {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60} | Config]), + {ok, _} = emqtt:ConnFun(Client), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))), ?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0), @@ -311,15 +344,16 @@ t_connect_emit_stats_timeout(Config) -> ok = emqtt:disconnect(Client). %% [MQTT-3.1.2-22] -t_connect_keepalive_timeout(_) -> +t_connect_keepalive_timeout(Config) -> + ConnFun = ?config(conn_fun, Config), %% Prevent the emqtt client bringing us down on the disconnect. process_flag(trap_exit, true), Keepalive = 2, {ok, Client} = emqtt:start_link([{proto_ver, v5}, - {keepalive, Keepalive}]), - {ok, _} = emqtt:connect(Client), + {keepalive, Keepalive} | Config]), + {ok, _} = emqtt:ConnFun(Client), emqtt:pause(Client), receive {disconnected, ReasonCode, _Channel} -> ?assertEqual(141, ReasonCode) @@ -328,30 +362,30 @@ t_connect_keepalive_timeout(_) -> end. %% [MQTT-3.1.2-23] -t_connect_session_expiry_interval(_) -> +t_connect_session_expiry_interval(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), Payload = "test message", - {ok, Client1} = emqtt:start_link([ - {clientid, <<"t_connect_session_expiry_interval">>}, - {proto_ver, v5}, - {properties, #{'Session-Expiry-Interval' => 7200}} + {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}} + | Config ]), - {ok, _} = emqtt:connect(Client1), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), ok = emqtt:disconnect(Client1), - {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client2), {ok, 2} = emqtt:publish(Client2, Topic, Payload, 2), ok = emqtt:disconnect(Client2), - {ok, Client3} = emqtt:start_link([ - {clientid, <<"t_connect_session_expiry_interval">>}, - {proto_ver, v5}, - {clean_start, false} + {ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>}, + {proto_ver, v5}, + {clean_start, false} | Config ]), - {ok, _} = emqtt:connect(Client3), + {ok, _} = emqtt:ConnFun(Client3), [Msg | _ ] = receive_messages(1), ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), @@ -360,13 +394,13 @@ t_connect_session_expiry_interval(_) -> %% [MQTT-3.1.3-9] %% !!!REFACTOR NEED: -%t_connect_will_delay_interval(_) -> +%t_connect_will_delay_interval(Config) -> % process_flag(trap_exit, true), % Topic = nth(1, ?TOPICS), % Payload = "will message", % -% {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), -% {ok, _} = emqtt:connect(Client1), +% {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), +% {ok, _} = emqtt:ConnFun(Client1), % {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), % % {ok, Client2} = emqtt:start_link([ @@ -379,9 +413,9 @@ t_connect_session_expiry_interval(_) -> % {will_payload, Payload}, % {will_props, #{'Will-Delay-Interval' => 3}}, % {properties, #{'Session-Expiry-Interval' => 7200}}, -% {keepalive, 2} +% {keepalive, 2} | Config % ]), -% {ok, _} = emqtt:connect(Client2), +% {ok, _} = emqtt:ConnFun(Client2), % timer:sleep(50), % erlang:exit(Client2, kill), % timer:sleep(2000), @@ -399,9 +433,9 @@ t_connect_session_expiry_interval(_) -> % {will_payload, Payload}, % {will_props, #{'Will-Delay-Interval' => 7200}}, % {properties, #{'Session-Expiry-Interval' => 3}}, -% {keepalive, 2} +% {keepalive, 2} | Config % ]), -% {ok, _} = emqtt:connect(Client3), +% {ok, _} = emqtt:ConnFun(Client3), % timer:sleep(50), % erlang:exit(Client3, kill), % @@ -418,18 +452,17 @@ t_connect_session_expiry_interval(_) -> % process_flag(trap_exit, false). %% [MQTT-3.1.4-3] -t_connect_duplicate_clientid(_) -> +t_connect_duplicate_clientid(Config) -> + ConnFun = ?config(conn_fun, Config), process_flag(trap_exit, true), - {ok, Client1} = emqtt:start_link([ - {clientid, <<"t_connect_duplicate_clientid">>}, - {proto_ver, v5} - ]), - {ok, _} = emqtt:connect(Client1), - {ok, Client2} = emqtt:start_link([ - {clientid, <<"t_connect_duplicate_clientid">>}, - {proto_ver, v5} - ]), - {ok, _} = emqtt:connect(Client2), + {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5} | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, Client2} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5} | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), ?assertEqual(142, receive_disconnect_reasoncode()), waiting_client_process_exit(Client1), @@ -441,28 +474,33 @@ t_connect_duplicate_clientid(_) -> %% Connack %%-------------------------------------------------------------------- -t_connack_session_present(_) -> - {ok, Client1} = emqtt:start_link([ - {clientid, <<"t_connect_duplicate_clientid">>}, - {proto_ver, v5}, - {properties, #{'Session-Expiry-Interval' => 7200}}, - {clean_start, true} - ]), - {ok, _} = emqtt:connect(Client1), +t_connack_session_present(Config) -> + ConnFun = ?config(conn_fun, Config), + {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {clean_start, true} | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.2.2-2] ok = emqtt:disconnect(Client1), - {ok, Client2} = emqtt:start_link([ - {clientid, <<"t_connect_duplicate_clientid">>}, - {proto_ver, v5}, - {properties, #{'Session-Expiry-Interval' => 7200}}, - {clean_start, false} - ]), - {ok, _} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([ {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {clean_start, false} | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), ?assertEqual(1, client_info(session_present, Client2)), %% [[MQTT-3.2.2-3]] ok = emqtt:disconnect(Client2). -t_connack_max_qos_allowed(_) -> +t_connack_max_qos_allowed(init, Config) -> + Config; +t_connack_max_qos_allowed('end', _Config) -> + emqx_zone:set_env(external, max_qos_allowed, 2), + ok. +t_connack_max_qos_allowed(Config) -> + ConnFun = ?config(conn_fun, Config), process_flag(trap_exit, true), Topic = nth(1, ?TOPICS), @@ -471,8 +509,8 @@ t_connack_max_qos_allowed(_) -> persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, Connack1} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, Connack1} = emqtt:ConnFun(Client1), ?assertEqual(0, maps:get('Maximum-QoS', Connack1)), %% [MQTT-3.2.2-9] {ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10] @@ -483,14 +521,13 @@ t_connack_max_qos_allowed(_) -> ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] waiting_client_process_exit(Client1), - {ok, Client2} = emqtt:start_link([ - {proto_ver, v5}, - {will_flag, true}, - {will_topic, Topic}, - {will_payload, <<"Unsupported Qos">>}, - {will_qos, 2} - ]), - {error, Connack2} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, <<"Unsupported Qos">>}, + {will_qos, 2} | Config + ]), + {error, Connack2} = emqtt:ConnFun(Client2), ?assertMatch({qos_not_supported, _}, Connack2), %% [MQTT-3.2.2-12] waiting_client_process_exit(Client2), @@ -499,8 +536,8 @@ t_connack_max_qos_allowed(_) -> persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), - {ok, Client3} = emqtt:start_link([{proto_ver, v5}]), - {ok, Connack3} = emqtt:connect(Client3), + {ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, Connack3} = emqtt:ConnFun(Client3), ?assertEqual(1, maps:get('Maximum-QoS', Connack3)), %% [MQTT-3.2.2-9] {ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10] @@ -511,14 +548,13 @@ t_connack_max_qos_allowed(_) -> ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] waiting_client_process_exit(Client3), - {ok, Client4} = emqtt:start_link([ - {proto_ver, v5}, - {will_flag, true}, - {will_topic, Topic}, - {will_payload, <<"Unsupported Qos">>}, - {will_qos, 2} - ]), - {error, Connack4} = emqtt:connect(Client4), + {ok, Client4} = emqtt:start_link([ {proto_ver, v5}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, <<"Unsupported Qos">>}, + {will_qos, 2} | Config + ]), + {error, Connack4} = emqtt:ConnFun(Client4), ?assertMatch({qos_not_supported, _}, Connack4), %% [MQTT-3.2.2-12] waiting_client_process_exit(Client4), @@ -527,17 +563,18 @@ t_connack_max_qos_allowed(_) -> persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), - {ok, Client5} = emqtt:start_link([{proto_ver, v5}]), - {ok, Connack5} = emqtt:connect(Client5), + {ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, Connack5} = emqtt:ConnFun(Client5), ?assertEqual(undefined, maps:get('Maximum-QoS', Connack5, undefined)), %% [MQTT-3.2.2-9] ok = emqtt:disconnect(Client5), waiting_client_process_exit(Client5), process_flag(trap_exit, false). -t_connack_assigned_clienid(_) -> - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), +t_connack_assigned_clienid(Config) -> + ConnFun = ?config(conn_fun, Config), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), ?assert(is_binary(client_info(clientid, Client1))), %% [MQTT-3.2.2-16] ok = emqtt:disconnect(Client1). @@ -545,11 +582,12 @@ t_connack_assigned_clienid(_) -> %% Publish %%-------------------------------------------------------------------- -t_publish_rap(_) -> +t_publish_rap(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), @@ -557,8 +595,8 @@ t_publish_rap(_) -> ?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12] ok = emqtt:disconnect(Client1), - {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client2), {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]), {ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), @@ -566,44 +604,47 @@ t_publish_rap(_) -> ?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13] ok = emqtt:disconnect(Client2), - clean_retained(Topic). + clean_retained(Topic, Config). -t_publish_wildtopic(_) -> +t_publish_wildtopic(Config) -> + ConnFun = ?config(conn_fun, Config), process_flag(trap_exit, true), Topic = nth(1, ?WILD_TOPICS), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), ok = emqtt:publish(Client1, Topic, <<"error topic">>), ?assertEqual(144, receive_disconnect_reasoncode()), waiting_client_process_exit(Client1), process_flag(trap_exit, false). -t_publish_payload_format_indicator(_) -> +t_publish_payload_format_indicator(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), Properties = #{'Payload-Format-Indicator' => 233}, - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), ok = emqtt:publish(Client1, Topic, Properties, <<"Payload Format Indicator">>, [{qos, ?QOS_0}]), [Msg1 | _] = receive_messages(1), ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-6] ok = emqtt:disconnect(Client1). -t_publish_topic_alias(_) -> +t_publish_topic_alias(Config) -> + ConnFun = ?config(conn_fun, Config), process_flag(trap_exit, true), Topic = nth(1, ?TOPICS), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), ?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8] waiting_client_process_exit(Client1), - {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client2), {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), @@ -615,12 +656,13 @@ t_publish_topic_alias(_) -> process_flag(trap_exit, false). -t_publish_response_topic(_) -> +t_publish_response_topic(Config) -> + ConnFun = ?config(conn_fun, Config), process_flag(trap_exit, true), Topic = nth(1, ?TOPICS), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]), ?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14] @@ -628,7 +670,8 @@ t_publish_response_topic(_) -> process_flag(trap_exit, false). -t_publish_properties(_) -> +t_publish_properties(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), Properties = #{ 'Response-Topic' => Topic, %% [MQTT-3.3.2-15] @@ -637,20 +680,21 @@ t_publish_properties(_) -> 'Content-Type' => <<"2333">> %% [MQTT-3.3.2-20] }, - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), ok = emqtt:publish(Client1, Topic, Properties, <<"Publish Properties">>, [{qos, ?QOS_0}]), [Msg1 | _] = receive_messages(1), ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-16] ok = emqtt:disconnect(Client1). -t_publish_overlapping_subscriptions(_) -> +t_publish_overlapping_subscriptions(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), Properties = #{'Subscription-Identifier' => 2333}, - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1), {ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0), {ok, _} = emqtt:publish(Client1, Topic, #{}, @@ -665,13 +709,15 @@ t_publish_overlapping_subscriptions(_) -> %% Subsctibe %%-------------------------------------------------------------------- -t_subscribe_topic_alias(_) -> +t_subscribe_topic_alias(Config) -> + ConnFun = ?config(conn_fun, Config), Topic1 = nth(1, ?TOPICS), Topic2 = nth(2, ?TOPICS), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}, - {properties, #{'Topic-Alias-Maximum' => 1}} + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {properties, #{'Topic-Alias-Maximum' => 1}} + | Config ]), - {ok, _} = emqtt:connect(Client1), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic2, qos2), @@ -692,27 +738,29 @@ t_subscribe_topic_alias(_) -> ok = emqtt:disconnect(Client1). -t_subscribe_no_local(_) -> +t_subscribe_no_local(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), - {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client2), + {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client2), {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), ok = emqtt:publish(Client1, Topic, <<"t_subscribe_no_local">>, 0), ?assertEqual(1, length(receive_messages(2))), %% [MQTT-3.8.3-3] ok = emqtt:disconnect(Client1). -t_subscribe_actions(_) -> +t_subscribe_actions(Config) -> + ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), Properties = #{'Subscription-Identifier' => 2333}, - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, Properties, Topic, qos2), {ok, _, [1]} = emqtt:subscribe(Client1, Properties, Topic, qos1), {ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2), @@ -726,12 +774,13 @@ t_subscribe_actions(_) -> %% Unsubsctibe Unsuback %%-------------------------------------------------------------------- -t_unscbsctibe(_) -> +t_unscbsctibe(Config) -> + ConnFun = ?config(conn_fun, Config), Topic1 = nth(1, ?TOPICS), Topic2 = nth(2, ?TOPICS), - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2), {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic1), %% [MQTT-3.10.4-4] {ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5] @@ -745,9 +794,10 @@ t_unscbsctibe(_) -> %% Pingreq %%-------------------------------------------------------------------- -t_pingreq(_) -> - {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), - {ok, _} = emqtt:connect(Client1), +t_pingreq(Config) -> + ConnFun = ?config(conn_fun, Config), + {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:ConnFun(Client1), pong = emqtt:ping(Client1), %% [MQTT-3.12.4-1] ok = emqtt:disconnect(Client1). @@ -755,7 +805,14 @@ t_pingreq(_) -> %% Shared Subscriptions %%-------------------------------------------------------------------- -t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> +t_shared_subscriptions_client_terminates_when_qos_eq_2(init, Config) -> + ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]), + Config; +t_shared_subscriptions_client_terminates_when_qos_eq_2('end', _Config) -> + catch meck:unload(emqtt). + +t_shared_subscriptions_client_terminates_when_qos_eq_2(Config) -> + ConnFun = ?config(conn_fun, Config), process_flag(trap_exit, true), application:set_env(emqx, shared_dispatch_ack_enabled, true), @@ -766,32 +823,33 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> meck:expect(emqtt, connected, fun(cast, ?PUBLISH_PACKET(?QOS_2, _PacketId), _State) -> ok = counters:add(CRef, 1, 1), - {stop, {shutdown, for_testiong}}; + {stop, {shutdown, for_testing}}; (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) end), - {ok, Sub1} = emqtt:start_link([{proto_ver, v5}, + {ok, Sub1} = emqtt:start_link([ {proto_ver, v5}, {clientid, <<"sub_client_1">>}, - {keepalive, 5}]), - {ok, _} = emqtt:connect(Sub1), + {keepalive, 5} | Config + ]), + {ok, _} = emqtt:ConnFun(Sub1), {ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2), {ok, Sub2} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"sub_client_2">>}, - {keepalive, 5}]), - {ok, _} = emqtt:connect(Sub2), + {keepalive, 5} | Config]), + {ok, _} = emqtt:ConnFun(Sub2), {ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2), - {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]), - {ok, _} = emqtt:connect(Pub), + {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>} | Config]), + {ok, _} = emqtt:ConnFun(Pub), {ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2), receive - {'EXIT', _,{shutdown, for_testiong}} -> + {'EXIT', _,{shutdown, for_testing}} -> ok after 1000 -> - error("disconnected timeout") + ct:fail("disconnected timeout") end, ?assertEqual(1, counters:get(CRef, 1)),