From f29a8b0d5ce6d267959203baeece3a59f5a252dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=A5=87=E6=80=AA?= Date: Fri, 7 Feb 2020 10:07:36 +0800 Subject: [PATCH] Mqtt protocol tests (#3237) Add more test cases for MQTT --- src/emqx_connection.erl | 8 +- src/emqx_packet.erl | 3 + test/mqtt_protocol_v5_SUITE.erl | 499 +++++++++++++++++++++++++++++++- 3 files changed, 496 insertions(+), 14 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 11683a6f2..a3d3e41a6 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -136,10 +136,10 @@ info(sockstate, #state{sockstate = SockSt}) -> SockSt; info(active_n, #state{active_n = ActiveN}) -> ActiveN; -info(stats_timer, #state{stats_timer = Stats_timer}) -> - Stats_timer; -info(limit_timer, #state{limit_timer = Limit_timer}) -> - Limit_timer; +info(stats_timer, #state{stats_timer = StatsTimer}) -> + StatsTimer; +info(limit_timer, #state{limit_timer = LimitTimer}) -> + LimitTimer; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter). diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 6c17c17cb..354229f99 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -345,6 +345,9 @@ check_will_msg(#mqtt_packet_connect{will_flag = false}, _Caps) -> check_will_msg(#mqtt_packet_connect{will_retain = true}, _Opts = #{mqtt_retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; +check_will_msg(#mqtt_packet_connect{will_qos = WillQoS}, + _Opts = #{max_qos_allowed := MaxQoS}) when WillQoS > MaxQoS -> + {error, ?RC_QOS_NOT_SUPPORTED}; check_will_msg(#mqtt_packet_connect{will_topic = WillTopic}, _Opts) -> try emqx_topic:validate(name, WillTopic) of true -> ok diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 01d4391f0..ddc20c040 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -48,6 +48,9 @@ end_per_suite(_Config) -> %% Helpers %%-------------------------------------------------------------------- +client_info(Key, Client) -> + maps:get(Key, maps:from_list(emqtt:info(Client)), undefined). + receive_messages(Count) -> receive_messages(Count, []). @@ -63,6 +66,14 @@ receive_messages(Count, Msgs) -> Msgs end. +receive_disconnect_reasoncode() -> + receive + {disconnected, ReasonCode, _} -> ReasonCode; + _Other -> receive_disconnect_reasoncode() + after 100 -> + error("no disconnect packet") + end. + clean_retained(Topic) -> {ok, Clean} = emqtt:start_link([{clean_start, true}]), {ok, _} = emqtt:connect(Clean), @@ -90,6 +101,106 @@ t_basic_test(_) -> %% Connection %%-------------------------------------------------------------------- +t_connect_clean_start(_) -> + {ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]), + {ok, _} = emqtt:connect(Client1), + ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4] + ok = emqtt:pause(Client1), + {ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]), + {ok, _} = emqtt:connect(Client2), + ?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5] + ok = emqtt:disconnect(Client2), + {ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]), + {ok, _} = emqtt:connect(Client3), + ?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6] + ok = emqtt:disconnect(Client3). + +t_connect_will_message(_) -> + Topic = nth(1, ?TOPICS), + Payload = "will message", + + {ok, Client1} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload} + ]), + {ok, _} = emqtt:connect(Client1), + [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)), + ?assertNotEqual(undefined, maps:find(will_msg, emqx_connection:info(sys:get_state(ClientPid)))), %% [MQTT-3.1.2-7] + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), + + ok = emqtt:disconnect(Client1, 4), %% [MQTT-3.14.2-1] + [Msg | _ ] = receive_messages(1), + %% [MQTT-3.1.2-8] + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), + ?assertEqual({ok, 0}, maps:find(qos, Msg)), + ok = emqtt:disconnect(Client2), + + {ok, Client3} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload} + ]), + {ok, _} = emqtt:connect(Client3), + + {ok, Client4} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client4), + {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2), + ok = emqtt:disconnect(Client3), + ?assertEqual(0, length(receive_messages(1))), %% [MQTT-3.1.2-10] + ok = emqtt:disconnect(Client4). + +t_connect_will_retain(_) -> + Topic = nth(1, ?TOPICS), + Payload = "will message", + + {ok, Client1} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_retain, false} + ]), + {ok, _} = emqtt:connect(Client1), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), + + ok = emqtt:disconnect(Client1, 4), + [Msg1 | _ ] = receive_messages(1), + ?assertEqual({ok, false}, maps:find(retain, Msg1)), %% [MQTT-3.1.2-14] + ok = emqtt:disconnect(Client2), + + {ok, Client3} = emqtt:start_link([ + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_retain, true} + ]), + {ok, _} = emqtt:connect(Client3), + + {ok, Client4} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client4), + {ok, _, [2]} = emqtt:subscribe(Client4, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), + + ok = emqtt:disconnect(Client3, 4), + [Msg2 | _ ] = receive_messages(1), + ?assertEqual({ok, true}, maps:find(retain, Msg2)), %% [MQTT-3.1.2-15] + ok = emqtt:disconnect(Client4), + clean_retained(Topic). + t_connect_idle_timeout(_) -> IdleTimeout = 2000, emqx_zone:set_env(external, idle_timeout, IdleTimeout), @@ -107,9 +218,9 @@ t_connect_limit_timeout(_) -> Topic = nth(1, ?TOPICS), emqx_zone:set_env(external, publish_limit, {2.0, 3}), - {ok, Client} = emqtt:start_link([{clientid, <<"t_connect_limit_timeout">>},{proto_ver, v5},{keepalive, 60}]), + {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, _} = emqtt:connect(Client), - [ClientPid] = emqx_cm:lookup_channels(<<"t_connect_limit_timeout">>), + [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))), ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), @@ -119,21 +230,21 @@ t_connect_limit_timeout(_) -> timer:sleep(200), ?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))), - emqtt:disconnect(Client), + ok = emqtt:disconnect(Client), meck:unload(proplists). t_connect_emit_stats_timeout(_) -> IdleTimeout = 2000, emqx_zone:set_env(external, idle_timeout, IdleTimeout), - {ok, Client} = emqtt:start_link([{clientid, <<"t_connect_emit_stats_timeout">>},{proto_ver, v5},{keepalive, 60}]), + {ok, Client} = emqtt:start_link([{proto_ver, v5},{keepalive, 60}]), {ok, _} = emqtt:connect(Client), - [ClientPid] = emqx_cm:lookup_channels(<<"t_connect_emit_stats_timeout">>), + [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))), timer:sleep(IdleTimeout), ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))), - emqtt:disconnect(Client). + ok = emqtt:disconnect(Client). %% [MQTT-3.1.2-22] t_connect_keepalive_timeout(_) -> @@ -144,13 +255,380 @@ t_connect_keepalive_timeout(_) -> {ok, _} = emqtt:connect(Client), emqtt:pause(Client), receive - Msg -> - ReasonCode = 141, - ?assertMatch({disconnected, ReasonCode, _Channel}, Msg) + {disconnected, ReasonCode, _Channel} -> ?assertEqual(141, ReasonCode) after round(timer:seconds(Keepalive) * 2 * 1.5 ) -> error("keepalive timeout") end. +%% [MQTT-3.1.2-23] +t_connect_session_expiry_interval(_) -> + Topic = nth(1, ?TOPICS), + Payload = "test message", + + {ok, Client1} = emqtt:start_link([ + {clientid, <<"t_connect_session_expiry_interval">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}} + ]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, 2} = emqtt:publish(Client2, Topic, Payload, 2), + ok = emqtt:disconnect(Client2), + + {ok, Client3} = emqtt:start_link([ + {clientid, <<"t_connect_session_expiry_interval">>}, + {proto_ver, v5}, + {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client3), + [Msg | _ ] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), + ?assertEqual({ok, 2}, maps:find(qos, Msg)), + ok = emqtt:disconnect(Client3). + +%% [MQTT-3.1.3-9] +t_connect_will_delay_interval(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + Payload = "will message", + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"t_connect_will_delay_interval">>}, + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_qos, 2}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_props, #{'Will-Delay-Interval' => 3}}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {keepalive, 2} + ]), + {ok, _} = emqtt:connect(Client2), + + timer:sleep(5000), + ?assertEqual(0, length(receive_messages(1))), + timer:sleep(7000), + ?assertEqual(1, length(receive_messages(1))), + + {ok, Client3} = emqtt:start_link([ + {clientid, <<"t_connect_will_delay_interval">>}, + {proto_ver, v5}, + {clean_start, true}, + {will_flag, true}, + {will_qos, 2}, + {will_topic, Topic}, + {will_payload, Payload}, + {will_props, #{'Will-Delay-Interval' => 7200}}, + {properties, #{'Session-Expiry-Interval' => 3}}, + {keepalive, 2} + ]), + {ok, _} = emqtt:connect(Client3), + + timer:sleep(5000), + ?assertEqual(0, length(receive_messages(1))), + timer:sleep(7000), + ?assertEqual(1, length(receive_messages(1))), + + ok = emqtt:disconnect(Client1), + process_flag(trap_exit, false). + +%% [MQTT-3.1.4-3] +t_connect_duplicate_clientid(_) -> + {ok, Client1} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client1), + {ok, Client2} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client2), + ?assertEqual(142, receive_disconnect_reasoncode()). + +%%-------------------------------------------------------------------- +%% Connack +%%-------------------------------------------------------------------- + +t_connack_session_present(_) -> + {ok, Client1} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {clean_start, true} + ]), + {ok, _} = emqtt:connect(Client1), + ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.2.2-2] + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"t_connect_duplicate_clientid">>}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 7200}}, + {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client2), + ?assertEqual(1, client_info(session_present, Client2)), %% [[MQTT-3.2.2-3]] + ok = emqtt:disconnect(Client2). + +t_connack_max_qos_allowed(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + + %% max_qos_allowed = 0 + emqx_zone:set_env(external, max_qos_allowed, 0), + persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), + persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, Connack1} = emqtt:connect(Client1), + ?assertEqual(0, maps:get('Maximum-QoS',Connack1)), %% [MQTT-3.2.2-9] + + {ok, _, [0]} = emqtt:subscribe(Client1, Topic, 0), %% [MQTT-3.2.2-10] + {ok, _, [1]} = emqtt:subscribe(Client1, Topic, 1), %% [MQTT-3.2.2-10] + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), %% [MQTT-3.2.2-10] + + {ok, _} = emqtt:publish(Client1, Topic, <<"Unsupported Qos 1">>, qos1), + ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] + + {ok, Client2} = emqtt:start_link([ + {proto_ver, v5}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, <<"Unsupported Qos">>}, + {will_qos, 2} + ]), + {error, Connack2} = emqtt:connect(Client2), + ?assertMatch({qos_not_supported,_ }, Connack2), %% [MQTT-3.2.2-12] + + %% max_qos_allowed = 1 + emqx_zone:set_env(external, max_qos_allowed, 1), + persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), + persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + + {ok, Client3} = emqtt:start_link([{proto_ver, v5}]), + {ok, Connack3} = emqtt:connect(Client3), + ?assertEqual(1, maps:get('Maximum-QoS',Connack3)), %% [MQTT-3.2.2-9] + + {ok, _, [0]} = emqtt:subscribe(Client3, Topic, 0), %% [MQTT-3.2.2-10] + {ok, _, [1]} = emqtt:subscribe(Client3, Topic, 1), %% [MQTT-3.2.2-10] + {ok, _, [2]} = emqtt:subscribe(Client3, Topic, 2), %% [MQTT-3.2.2-10] + + {ok, _} = emqtt:publish(Client3, Topic, <<"Unsupported Qos 2">>, qos2), + ?assertEqual(155, receive_disconnect_reasoncode()), %% [MQTT-3.2.2-11] + + {ok, Client4} = emqtt:start_link([ + {proto_ver, v5}, + {will_flag, true}, + {will_topic, Topic}, + {will_payload, <<"Unsupported Qos">>}, + {will_qos, 2} + ]), + {error, Connack4} = emqtt:connect(Client4), + ?assertMatch({qos_not_supported,_ }, Connack4), %% [MQTT-3.2.2-12] + receive + {'EXIT', _, {shutdown,qos_not_supported}} -> ok + after 100 -> error("t_connack_max_qos_allowed") + end, + + %% max_qos_allowed = 2 + emqx_zone:set_env(external, max_qos_allowed, 2), + persistent_term:erase({emqx_zone, external, '$mqtt_caps'}), + persistent_term:erase({emqx_zone, external, '$mqtt_pub_caps'}), + + {ok, Client5} = emqtt:start_link([{proto_ver, v5}]), + {ok, Connack5} = emqtt:connect(Client5), + ?assertEqual(2, maps:get('Maximum-QoS',Connack5)), %% [MQTT-3.2.2-9] + ok = emqtt:disconnect(Client5), + + process_flag(trap_exit, false). + +t_connack_assigned_clienid(_) -> + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ?assert(is_binary(client_info(clientid, Client1))), %% [MQTT-3.2.2-16] + ok = emqtt:disconnect(Client1). + +%%-------------------------------------------------------------------- +%% Publish +%%-------------------------------------------------------------------- + +t_publish_rap(_) -> + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), + {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), + [Msg1 | _] = receive_messages(1), + ?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12] + ok = emqtt:disconnect(Client1), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]), + {ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), + [Msg2 | _] = receive_messages(1), + ?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13] + ok = emqtt:disconnect(Client2), + + clean_retained(Topic). + +t_publish_wildtopic(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?WILD_TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:publish(Client1, Topic, <<"error topic">>), + ?assertEqual(144, receive_disconnect_reasoncode()), + + process_flag(trap_exit, false). + +t_publish_payload_format_indicator(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{'Payload-Format-Indicator' => 233}, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + ok = emqtt:publish(Client1, Topic, Properties, <<"Payload Format Indicator">>, [{qos, ?QOS_0}]), + [Msg1 | _] = receive_messages(1), + ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-6] + ok = emqtt:disconnect(Client1). + +t_publish_topic_alias(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:publish(Client1, Topic, #{'Topic-Alias' => 0}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ?assertEqual(148, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-8] + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), + ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12] + ok = emqtt:disconnect(Client2), + process_flag(trap_exit, false). + +t_publish_response_topic(_) -> + process_flag(trap_exit, true), + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]), + ?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14] + + process_flag(trap_exit, false). + +t_publish_properties(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{ + 'Response-Topic' => Topic, %% [MQTT-3.3.2-15] + 'Correlation-Data' => <<"233">>, %% [MQTT-3.3.2-16] + 'User-Property' => [{<<"a">>, <<"2333">>}], %% [MQTT-3.3.2-18] + 'Content-Type' => <<"2333">> %% [MQTT-3.3.2-20] + }, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), + ok = emqtt:publish(Client1, Topic, Properties, <<"Publish Properties">>, [{qos, ?QOS_0}]), + [Msg1 | _] = receive_messages(1), + ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.2-16] + ok = emqtt:disconnect(Client1). + +t_publish_overlapping_subscriptions(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{'Subscription-Identifier' => 2333}, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1), + {ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0), + {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"t_publish_overlapping_subscriptions">>, [{qos, ?QOS_2}]), + + [Msg1 | _ ] = receive_messages(2), + ?assert( maps:get(qos, Msg1) < 2 ), %% [MQTT-3.3.4-2] + ?assertEqual(Properties, maps:get(properties, Msg1)), %% [MQTT-3.3.4-3] + ok = emqtt:disconnect(Client1). + +%%-------------------------------------------------------------------- +%% Subsctibe +%%-------------------------------------------------------------------- + +t_subscribe_no_local(_) -> + Topic = nth(1, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), + + {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{nl, true}, {qos, 2}]}]), + + ok = emqtt:publish(Client1, Topic, <<"t_subscribe_no_local">>, 0), + ?assertEqual(1, length(receive_messages(2))), %% [MQTT-3.8.3-3] + ok = emqtt:disconnect(Client1). + +t_subscribe_actions(_) -> + Topic = nth(1, ?TOPICS), + Properties = #{'Subscription-Identifier' => 2333}, + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Properties, Topic, qos2), + {ok, _, [1]} = emqtt:subscribe(Client1, Properties, Topic, qos1), + {ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2), + [Msg1 | _ ] = receive_messages(1), + ?assertEqual(1, maps:get(qos, Msg1)), %% [MQTT-3.8.4-3] [MQTT-3.8.4-8] + + {ok, _, [2,2]} = emqtt:subscribe(Client1, [{nth(1, ?TOPICS), qos2}, {nth(2, ?TOPICS), qos2}] ), %% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7] + ok = emqtt:disconnect(Client1). +%%-------------------------------------------------------------------- +%% Unsubsctibe Unsuback +%%-------------------------------------------------------------------- + +t_unscbsctibe(_) -> + Topic1 = nth(1, ?TOPICS), + Topic2 = nth(2, ?TOPICS), + + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, Topic1, qos2), + {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic1), %% [MQTT-3.10.4-4] + {ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5] + + {ok, _, [2, 2]} = emqtt:subscribe(Client1, [{Topic1, qos2}, {Topic2, qos2}]), + {ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]), %% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2] + ok = emqtt:disconnect(Client1). + +%%-------------------------------------------------------------------- +%% Pingreq +%%-------------------------------------------------------------------- + +t_pingreq(_) -> + {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + pong = emqtt:ping(Client1), %% [MQTT-3.12.4-1] + ok = emqtt:disconnect(Client1). + %%-------------------------------------------------------------------- %% Shared Subscriptions %%-------------------------------------------------------------------- @@ -193,4 +671,5 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> error("disconnected timeout") end, - ?assertEqual(1, counters:get(CRef, 1)). + ?assertEqual(1, counters:get(CRef, 1)), + process_flag(trap_exit, false).