From a2d5b834dadedcaa48927583625ad725c9f96e9b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 8 Aug 2019 22:39:16 +0800 Subject: [PATCH] Improve emqx_pmon module and add more test cases --- src/emqx_pmon.erl | 49 +++--- test/emqx_alarm_handler_SUITE.erl | 103 ++++++++++++ test/emqx_channel_SUITE.erl | 57 +++++++ test/emqx_client_SUITE.erl | 259 ++++++++++++++++++++++++++++++ test/emqx_flapping_SUITE.erl | 56 +++++++ test/emqx_gc_SUITE.erl | 12 +- test/emqx_mod_rewrite_SUITE.erl | 81 ++++++++++ test/emqx_pmon_SUITE.erl | 4 + test/emqx_protocol_SUITE.erl | 144 +++++++++++++++++ test/emqx_shared_sub_SUITE.erl | 251 +++++++++++++++++++++++++++++ test/emqx_ws_channel_SUITE.erl | 57 +++++++ 11 files changed, 1047 insertions(+), 26 deletions(-) create mode 100644 test/emqx_alarm_handler_SUITE.erl create mode 100644 test/emqx_channel_SUITE.erl create mode 100644 test/emqx_client_SUITE.erl create mode 100644 test/emqx_flapping_SUITE.erl create mode 100644 test/emqx_mod_rewrite_SUITE.erl create mode 100644 test/emqx_protocol_SUITE.erl create mode 100644 test/emqx_shared_sub_SUITE.erl create mode 100644 test/emqx_ws_channel_SUITE.erl diff --git a/src/emqx_pmon.erl b/src/emqx_pmon.erl index 52d7a2619..eb74918cf 100644 --- a/src/emqx_pmon.erl +++ b/src/emqx_pmon.erl @@ -36,47 +36,49 @@ -opaque(pmon() :: {?MODULE, map()}). +-define(PMON(Map), {?MODULE, Map}). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -spec(new() -> pmon()). -new() -> - {?MODULE, maps:new()}. +new() -> ?PMON(maps:new()). -spec(monitor(pid(), pmon()) -> pmon()). -monitor(Pid, PM) -> - ?MODULE:monitor(Pid, undefined, PM). +monitor(Pid, PMon) -> + ?MODULE:monitor(Pid, undefined, PMon). -spec(monitor(pid(), term(), pmon()) -> pmon()). -monitor(Pid, Val, {?MODULE, PM}) -> - {?MODULE, case maps:is_key(Pid, PM) of - true -> PM; - false -> Ref = erlang:monitor(process, Pid), - maps:put(Pid, {Ref, Val}, PM) - end}. +monitor(Pid, Val, PMon = ?PMON(Map)) -> + case maps:is_key(Pid, Map) of + true -> PMon; + false -> + Ref = erlang:monitor(process, Pid), + ?PMON(maps:put(Pid, {Ref, Val}, Map)) + end. -spec(demonitor(pid(), pmon()) -> pmon()). -demonitor(Pid, {?MODULE, PM}) -> - {?MODULE, case maps:find(Pid, PM) of - {ok, {Ref, _Val}} -> - %% flush - _ = erlang:demonitor(Ref, [flush]), - maps:remove(Pid, PM); - error -> PM - end}. +demonitor(Pid, PMon = ?PMON(Map)) -> + case maps:find(Pid, Map) of + {ok, {Ref, _Val}} -> + %% flush + _ = erlang:demonitor(Ref, [flush]), + ?PMON(maps:remove(Pid, Map)); + error -> PMon + end. -spec(find(pid(), pmon()) -> error | {ok, term()}). -find(Pid, {?MODULE, PM}) -> - case maps:find(Pid, PM) of +find(Pid, ?PMON(Map)) -> + case maps:find(Pid, Map) of {ok, {_Ref, Val}} -> {ok, Val}; error -> error end. -spec(erase(pid(), pmon()) -> pmon()). -erase(Pid, {?MODULE, PM}) -> - {?MODULE, maps:remove(Pid, PM)}. +erase(Pid, ?PMON(Map)) -> + ?PMON(maps:remove(Pid, Map)). -spec(erase_all([pid()], pmon()) -> {[{pid(), term()}], pmon()}). erase_all(Pids, PMon0) -> @@ -90,6 +92,5 @@ erase_all(Pids, PMon0) -> end, {[], PMon0}, Pids). -spec(count(pmon()) -> non_neg_integer()). -count({?MODULE, PM}) -> - maps:size(PM). +count(?PMON(Map)) -> maps:size(Map). diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl new file mode 100644 index 000000000..91cde5c11 --- /dev/null +++ b/test/emqx_alarm_handler_SUITE.erl @@ -0,0 +1,103 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_alarm_handler_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([], fun set_special_configs/1), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +set_special_configs(emqx) -> + AclFile = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_deny_action.conf"), + application:set_env(emqx, acl_file, AclFile); +set_special_configs(_App) -> ok. + +t_alarm_handler(_) -> + with_connection( + fun(Sock) -> + emqx_client_sock:send(Sock, + raw_send_serialize( + ?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_ver = ?MQTT_PROTO_V5}) + )), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?RC_SUCCESS), <<>>, _} = raw_recv_parse(Data), + + Topic1 = emqx_topic:systop(<<"alarms/alert">>), + Topic2 = emqx_topic:systop(<<"alarms/clear">>), + SubOpts = #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}, + emqx_client_sock:send(Sock, + raw_send_serialize( + ?SUBSCRIBE_PACKET( + 1, + [{Topic1, SubOpts}, + {Topic2, SubOpts}]) + )), + + {ok, Data2} = gen_tcp:recv(Sock, 0), + {ok, ?SUBACK_PACKET(1, #{}, [2, 2]), <<>>, _} = raw_recv_parse(Data2), + + alarm_handler:set_alarm({alarm_for_test, #alarm{id = alarm_for_test, + severity = error, + title="alarm title", + summary="alarm summary" + }}), + + {ok, Data3} = gen_tcp:recv(Sock, 0), + + {ok, ?PUBLISH_PACKET(?QOS_0, Topic1, _, _), <<>>, _} = raw_recv_parse(Data3), + + ?assertEqual(true, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())), + + alarm_handler:clear_alarm(alarm_for_test), + + {ok, Data4} = gen_tcp:recv(Sock, 0), + + {ok, ?PUBLISH_PACKET(?QOS_0, Topic2, _, _), <<>>, _} = raw_recv_parse(Data4), + + ?assertEqual(false, lists:keymember(alarm_for_test, 1, emqx_alarm_handler:get_alarms())) + + end). + +with_connection(DoFun) -> + {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883, + [binary, {packet, raw}, {active, false}], + 3000), + try + DoFun(Sock) + after + emqx_client_sock:close(Sock) + end. + +raw_send_serialize(Packet) -> + emqx_frame:serialize(Packet, ?MQTT_PROTO_V5). + +raw_recv_parse(Bin) -> + emqx_frame:parse(Bin, emqx_frame:initial_parse_state(#{version => ?MQTT_PROTO_V5})). + diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl new file mode 100644 index 000000000..489a36a35 --- /dev/null +++ b/test/emqx_channel_SUITE.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_channel_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_basic(_) -> + Topic = <<"TopicA">>, + {ok, C} = emqtt:start_link([{port, 1883}]), + {ok, _} = emqtt:ws_connect(C), + {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), + {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), + ?assertEqual(3, length(recv_msgs(3))), + ok = emqtt:disconnect(C). + +recv_msgs(Count) -> + recv_msgs(Count, []). + +recv_msgs(0, Msgs) -> + Msgs; +recv_msgs(Count, Msgs) -> + receive + {publish, Msg} -> + recv_msgs(Count-1, [Msg|Msgs]) + after 100 -> + Msgs + end. + diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl new file mode 100644 index 000000000..c475b7ca8 --- /dev/null +++ b/test/emqx_client_SUITE.erl @@ -0,0 +1,259 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_client_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(lists, [nth/2]). + +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(TOPICS, [<<"TopicA">>, + <<"TopicA/B">>, + <<"Topic/C">>, + <<"TopicA/C">>, + <<"/TopicA">> + ]). + +-define(WILD_TOPICS, [<<"TopicA/+">>, + <<"+/C">>, + <<"#">>, + <<"/#">>, + <<"/+">>, + <<"+/+">>, + <<"TopicA/#">> + ]). + + +all() -> + [{group, mqttv3}, + {group, mqttv4}, + {group, mqttv5} + ]. + +groups() -> + [{mqttv3, [non_parallel_tests], + [t_basic_v3 + ]}, + {mqttv4, [non_parallel_tests], + [t_basic_v4, + t_will_message, + %% t_offline_message_queueing, + t_overlapping_subscriptions, + %% t_keepalive, + %% t_redelivery_on_reconnect, + %% subscribe_failure_test, + t_dollar_topics_test + ]}, + {mqttv5, [non_parallel_tests], + [t_basic_with_props_v5 + ]} + ]. + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +%%-------------------------------------------------------------------- +%% Test cases for MQTT v3 +%%-------------------------------------------------------------------- + +t_basic_v3(_) -> + t_basic([{proto_ver, v3}]). + +%%-------------------------------------------------------------------- +%% Test cases for MQTT v4 +%%-------------------------------------------------------------------- + +t_basic_v4(_Config) -> + t_basic([{proto_ver, v4}]). + +t_will_message(_Config) -> + {ok, C1} = emqx_client:start_link([{clean_start, true}, + {will_topic, nth(3, ?TOPICS)}, + {will_payload, <<"client disconnected">>}, + {keepalive, 1}]), + {ok, _} = emqx_client:connect(C1), + + {ok, C2} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(C2), + + {ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2), + timer:sleep(5), + ok = emqx_client:stop(C1), + timer:sleep(5), + ?assertEqual(1, length(recv_msgs(1))), + ok = emqx_client:disconnect(C2), + ct:pal("Will message test succeeded"). + +t_offline_message_queueing(_) -> + {ok, C1} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c1">>}]), + {ok, _} = emqx_client:connect(C1), + + {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), + ok = emqx_client:disconnect(C1), + {ok, C2} = emqx_client:start_link([{clean_start, true}, + {client_id, <<"c2">>}]), + {ok, _} = emqx_client:connect(C2), + + ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), + {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), + {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), + timer:sleep(10), + emqx_client:disconnect(C2), + {ok, C3} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c1">>}]), + {ok, _} = emqx_client:connect(C3), + + timer:sleep(10), + emqx_client:disconnect(C3), + ?assertEqual(3, length(recv_msgs(3))). + +t_overlapping_subscriptions(_) -> + {ok, C} = emqx_client:start_link([]), + {ok, _} = emqx_client:connect(C), + + {ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2}, + {nth(1, ?WILD_TOPICS), 1}]), + timer:sleep(10), + {ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2), + timer:sleep(10), + + Num = length(recv_msgs(2)), + ?assert(lists:member(Num, [1, 2])), + if + Num == 1 -> + ct:pal("This server is publishing one message for all + matching overlapping subscriptions, not one for each."); + Num == 2 -> + ct:pal("This server is publishing one message per each + matching overlapping subscription."); + true -> ok + end, + emqx_client:disconnect(C). + +%% t_keepalive_test(_) -> +%% ct:print("Keepalive test starting"), +%% {ok, C1, _} = emqx_client:start_link([{clean_start, true}, +%% {keepalive, 5}, +%% {will_flag, true}, +%% {will_topic, nth(5, ?TOPICS)}, +%% %% {will_qos, 2}, +%% {will_payload, <<"keepalive expiry">>}]), +%% ok = emqx_client:pause(C1), +%% {ok, C2, _} = emqx_client:start_link([{clean_start, true}, +%% {keepalive, 0}]), +%% {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2), +%% ok = emqx_client:disconnect(C2), +%% ?assertEqual(1, length(recv_msgs(1))), +%% ct:print("Keepalive test succeeded"). + +t_redelivery_on_reconnect(_) -> + ct:pal("Redelivery on reconnect test starting"), + {ok, C1} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c">>}]), + {ok, _} = emqx_client:connect(C1), + + {ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2), + timer:sleep(10), + ok = emqx_client:pause(C1), + {ok, _} = emqx_client:publish(C1, nth(2, ?TOPICS), <<>>, + [{qos, 1}, {retain, false}]), + {ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>, + [{qos, 2}, {retain, false}]), + timer:sleep(10), + ok = emqx_client:disconnect(C1), + ?assertEqual(0, length(recv_msgs(2))), + {ok, C2} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c">>}]), + {ok, _} = emqx_client:connect(C2), + + timer:sleep(10), + ok = emqx_client:disconnect(C2), + ?assertEqual(2, length(recv_msgs(2))). + +%% t_subscribe_sys_topics(_) -> +%% ct:print("Subscribe failure test starting"), +%% {ok, C, _} = emqx_client:start_link([]), +%% {ok, _, [2]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2), +%% timer:sleep(10), +%% ct:print("Subscribe failure test succeeded"). + +t_dollar_topics(_) -> + ct:pal("$ topics test starting"), + {ok, C} = emqx_client:start_link([{clean_start, true}, + {keepalive, 0}]), + {ok, _} = emqx_client:connect(C), + + {ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1), + {ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>, + <<"test">>, [{qos, 1}, {retain, false}]), + timer:sleep(10), + ?assertEqual(0, length(recv_msgs(1))), + ok = emqx_client:disconnect(C), + ct:pal("$ topics test succeeded"). + +%%-------------------------------------------------------------------- +%% Test cases for MQTT v5 +%%-------------------------------------------------------------------- + +t_basic_with_props_v5(_) -> + t_basic([{proto_ver, v5}, + {properties, #{'Receive-Maximum' => 4}} + ]). + +%%-------------------------------------------------------------------- +%% General test cases. +%%-------------------------------------------------------------------- + +t_basic(Opts) -> + Topic = nth(1, ?TOPICS), + {ok, C} = emqx_client:start_link([{proto_ver, v4}]), + {ok, _} = emqx_client:connect(C), + {ok, _, [1]} = emqx_client:subscribe(C, Topic, qos1), + {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2), + {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), + {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), + {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), + ?assertEqual(3, length(recv_msgs(3))), + ok = emqx_client:disconnect(C). + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +recv_msgs(Count) -> + recv_msgs(Count, []). + +recv_msgs(0, Msgs) -> + Msgs; +recv_msgs(Count, Msgs) -> + receive + {publish, Msg} -> + recv_msgs(Count-1, [Msg|Msgs]); + _Other -> recv_msgs(Count, Msgs) %%TODO:: remove the branch? + after 100 -> + Msgs + end. + diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl new file mode 100644 index 000000000..c587f25d4 --- /dev/null +++ b/test/emqx_flapping_SUITE.erl @@ -0,0 +1,56 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_flapping_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + prepare_for_test(), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_flapping(_Config) -> + process_flag(trap_exit, true), + flapping_connect(5), + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {error, _} = emqx_client:connect(C), + receive + {'EXIT', Client, _Reason} -> + ct:log("receive exit signal, Client: ~p", [Client]) + after 1000 -> + ct:log("timeout") + end. + +flapping_connect(Times) -> + lists:foreach(fun do_connect/1, lists:seq(1, Times)). + +do_connect(_I) -> + {ok, C} = emqx_client:start_link([{client_id, <<"Client">>}]), + {ok, _} = emqx_client:connect(C), + ok = emqx_client:disconnect(C). + +prepare_for_test() -> + emqx_zone:set_env(external, enable_flapping_detect, true), + emqx_zone:set_env(external, flapping_threshold, {10, 60}), + emqx_zone:set_env(external, flapping_expiry_interval, 3600). + diff --git a/test/emqx_gc_SUITE.erl b/test/emqx_gc_SUITE.erl index 5e6c13317..1cdecebba 100644 --- a/test/emqx_gc_SUITE.erl +++ b/test/emqx_gc_SUITE.erl @@ -33,6 +33,9 @@ t_init(_) -> ?assertEqual(#{cnt => {10, 10}, oct => {10, 10}}, emqx_gc:info(GC3)). t_run(_) -> + Undefined = emqx_gc:init(false), + ?assertEqual(undefined, Undefined), + ?assertEqual({false, undefined}, emqx_gc:run(1, 1, Undefined)), GC = emqx_gc:init(#{count => 10, bytes => 10}), ?assertEqual({true, GC}, emqx_gc:run(1, 1000, GC)), ?assertEqual({true, GC}, emqx_gc:run(1000, 1, GC)), @@ -42,7 +45,10 @@ t_run(_) -> ?assertEqual(#{cnt => {10, 7}, oct => {10, 7}}, emqx_gc:info(GC2)), {false, GC3} = emqx_gc:run(3, 3, GC2), ?assertEqual(#{cnt => {10, 4}, oct => {10, 4}}, emqx_gc:info(GC3)), - ?assertEqual({true, GC}, emqx_gc:run(4, 4, GC3)). + ?assertEqual({true, GC}, emqx_gc:run(4, 4, GC3)), + %% Disabled? + DisabledGC = emqx_gc:init(#{count => 0, bytes => 0}), + ?assertEqual({false, DisabledGC}, emqx_gc:run(1, 1, DisabledGC)). t_info(_) -> ?assertEqual(undefined, emqx_gc:info(undefined)), @@ -54,5 +60,7 @@ t_reset(_) -> GC = emqx_gc:init(#{count => 10, bytes => 10}), {false, GC1} = emqx_gc:run(5, 5, GC), ?assertEqual(#{cnt => {10, 5}, oct => {10, 5}}, emqx_gc:info(GC1)), - ?assertEqual(GC, emqx_gc:reset(GC1)). + ?assertEqual(GC, emqx_gc:reset(GC1)), + DisabledGC = emqx_gc:init(#{count => 0, bytes => 0}), + ?assertEqual(DisabledGC, emqx_gc:reset(DisabledGC)). diff --git a/test/emqx_mod_rewrite_SUITE.erl b/test/emqx_mod_rewrite_SUITE.erl new file mode 100644 index 000000000..675918286 --- /dev/null +++ b/test/emqx_mod_rewrite_SUITE.erl @@ -0,0 +1,81 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_rewrite_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(emqx_mod_rewrite, + [ rewrite_subscribe/4 + , rewrite_unsubscribe/4 + , rewrite_publish/2 + ]). + +-include_lib("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(TEST_RULES, [<<"x/# ^x/y/(.+)$ z/y/$1">>, + <<"y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2">> + ]). + +all() -> emqx_ct:all(?MODULE). + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_rewrite_subscribe(_) -> + ?assertEqual({ok, [{<<"test">>, #{}}]}, + rewrite(subscribe, [{<<"test">>, #{}}])), + ?assertEqual({ok, [{<<"z/y/test">>, #{}}]}, + rewrite(subscribe, [{<<"x/y/test">>, #{}}])), + ?assertEqual({ok, [{<<"y/z/test_topic">>, #{}}]}, + rewrite(subscribe, [{<<"y/test/z/test_topic">>, #{}}])). + +t_rewrite_unsubscribe(_) -> + ?assertEqual({ok, [{<<"test">>, #{}}]}, + rewrite(unsubscribe, [{<<"test">>, #{}}])), + ?assertEqual({ok, [{<<"z/y/test">>, #{}}]}, + rewrite(unsubscribe, [{<<"x/y/test">>, #{}}])), + ?assertEqual({ok, [{<<"y/z/test_topic">>, #{}}]}, + rewrite(unsubscribe, [{<<"y/test/z/test_topic">>, #{}}])). + +t_rewrite_publish(_) -> + ?assertMatch({ok, #message{topic = <<"test">>}}, + rewrite(publish, #message{topic = <<"test">>})), + ?assertMatch({ok, #message{topic = <<"z/y/test">>}}, + rewrite(publish, #message{topic = <<"x/y/test">>})), + ?assertMatch({ok, #message{topic = <<"y/z/test_topic">>}}, + rewrite(publish, #message{topic = <<"y/test/z/test_topic">>})). + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +rewrite(subscribe, TopicFilters) -> + rewrite_subscribe(#{}, #{}, TopicFilters, rules()); +rewrite(unsubscribe, TopicFilters) -> + rewrite_unsubscribe(#{}, #{}, TopicFilters, rules()); +rewrite(publish, Msg) -> rewrite_publish(Msg, rules()). + +rules() -> + [begin + [Topic, Re, Dest] = string:split(Rule, " ", all), + {ok, MP} = re:compile(Re), + {rewrite, Topic, MP, Dest} + end || Rule <- ?TEST_RULES]. + diff --git a/test/emqx_pmon_SUITE.erl b/test/emqx_pmon_SUITE.erl index f0c7ef64d..17c56729e 100644 --- a/test/emqx_pmon_SUITE.erl +++ b/test/emqx_pmon_SUITE.erl @@ -28,14 +28,17 @@ t_monitor(_) -> PMon1 = emqx_pmon:monitor(self(), PMon), ?assertEqual(1, emqx_pmon:count(PMon1)), PMon2 = emqx_pmon:demonitor(self(), PMon1), + PMon2 = emqx_pmon:demonitor(self(), PMon2), ?assertEqual(0, emqx_pmon:count(PMon2)). t_find(_) -> PMon = emqx_pmon:new(), PMon1 = emqx_pmon:monitor(self(), val, PMon), + PMon1 = emqx_pmon:monitor(self(), val, PMon1), ?assertEqual(1, emqx_pmon:count(PMon1)), ?assertEqual({ok, val}, emqx_pmon:find(self(), PMon1)), PMon2 = emqx_pmon:erase(self(), PMon1), + PMon2 = emqx_pmon:erase(self(), PMon1), ?assertEqual(error, emqx_pmon:find(self(), PMon2)). t_erase(_) -> @@ -44,6 +47,7 @@ t_erase(_) -> PMon2 = emqx_pmon:erase(self(), PMon1), ?assertEqual(0, emqx_pmon:count(PMon2)), {Items, PMon3} = emqx_pmon:erase_all([self()], PMon1), + {[], PMon3} = emqx_pmon:erase_all([self()], PMon3), ?assertEqual([{self(), val}], Items), ?assertEqual(0, emqx_pmon:count(PMon3)). diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl new file mode 100644 index 000000000..63aeda50e --- /dev/null +++ b/test/emqx_protocol_SUITE.erl @@ -0,0 +1,144 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_protocol_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(emqx_protocol, + [ handle_in/2 + , handle_out/2 + ]). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +%%-------------------------------------------------------------------- +%% Test cases for handle_in +%%-------------------------------------------------------------------- + +t_handle_in_connect(_) -> + 'TODO'. + +t_handle_in_publish(_) -> + 'TODO'. + +t_handle_in_puback(_) -> + 'TODO'. + +t_handle_in_pubrec(_) -> + 'TODO'. + +t_handle_in_pubrel(_) -> + 'TODO'. + +t_handle_in_pubcomp(_) -> + 'TODO'. + +t_handle_in_subscribe(_) -> + 'TODO'. + +t_handle_in_unsubscribe(_) -> + 'TODO'. + +t_handle_in_pingreq(_) -> + with_proto(fun(PState) -> + {ok, ?PACKET(?PINGRESP), PState} = handle_in(?PACKET(?PINGREQ), PState) + end). + +t_handle_in_disconnect(_) -> + 'TODO'. + +t_handle_in_auth(_) -> + 'TODO'. + +%%-------------------------------------------------------------------- +%% Test cases for handle_deliver +%%-------------------------------------------------------------------- + +t_handle_deliver(_) -> + 'TODO'. + +%%-------------------------------------------------------------------- +%% Test cases for handle_out +%%-------------------------------------------------------------------- + +t_handle_out_conack(_) -> + 'TODO'. + +t_handle_out_publish(_) -> + 'TODO'. + +t_handle_out_puback(_) -> + 'TODO'. + +t_handle_out_pubrec(_) -> + 'TODO'. + +t_handle_out_pubrel(_) -> + 'TODO'. + +t_handle_out_pubcomp(_) -> + 'TODO'. + +t_handle_out_suback(_) -> + 'TODO'. + +t_handle_out_unsuback(_) -> + 'TODO'. + +t_handle_out_disconnect(_) -> + 'TODO'. + +t_handle_out_auth(_) -> + 'TODO'. + +%%-------------------------------------------------------------------- +%% Test cases for handle_timeout +%%-------------------------------------------------------------------- + +t_handle_timeout(_) -> + 'TODO'. + +%%-------------------------------------------------------------------- +%% Test cases for terminate +%%-------------------------------------------------------------------- + +t_terminate(_) -> + 'TODO'. + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +with_proto(Fun) -> + Fun(emqx_protocol:init(#{peername => {{127,0,0,1}, 3456}, + sockname => {{127,0,0,1}, 1883}, + conn_mod => emqx_channel}, + #{zone => ?MODULE})). + diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl new file mode 100644 index 000000000..78928c82f --- /dev/null +++ b/test/emqx_shared_sub_SUITE.erl @@ -0,0 +1,251 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_shared_sub_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(SUITE, ?MODULE). +-define(wait(For, Timeout), + emqx_ct_helpers:wait_for( + ?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). + +all() -> emqx_ct:all(?SUITE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_random_basic(_) -> + ok = ensure_config(random), + ClientId = <<"ClientId">>, + {ok, ConnPid} = emqx_mock_client:start_link(ClientId), + {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), + Message1 = emqx_message:make(<<"ClientId">>, 2, <<"foo">>, <<"hello">>), + emqx_session:subscribe(SPid, [{<<"foo">>, #{qos => 2, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(subscribed(<<"group1">>, <<"foo">>, SPid), 1000), + PacketId = 1, + emqx_session:publish(SPid, PacketId, Message1), + ?wait(case emqx_mock_client:get_last_message(ConnPid) of + [{publish, 1, _}] -> true; + Other -> Other + end, 1000), + emqx_session:pubrec(SPid, PacketId, reasoncode), + emqx_session:pubcomp(SPid, PacketId, reasoncode), + emqx_mock_client:close_session(ConnPid), + ok. + +%% Start two subscribers share subscribe to "$share/g1/foo/bar" +%% Set 'sticky' dispatch strategy, send 1st message to find +%% out which member it picked, then close its connection +%% send the second message, the message should be 'nack'ed +%% by the sticky session and delivered to the 2nd session. +%% After the connection for the 2nd session is also closed, +%% i.e. when all clients are offline, the following message(s) +%% should be delivered randomly. +t_no_connection_nack(_) -> + ok = ensure_config(sticky), + Publisher = <<"publisher">>, + Subscriber1 = <<"Subscriber1">>, + Subscriber2 = <<"Subscriber2">>, + QoS = 1, + Group = <<"g1">>, + Topic = <<"foo/bar">>, + {ok, PubConnPid} = emqx_mock_client:start_link(Publisher), + {ok, SubConnPid1} = emqx_mock_client:start_link(Subscriber1), + {ok, SubConnPid2} = emqx_mock_client:start_link(Subscriber2), + %% allow session to persist after connection shutdown + Attrs = #{expiry_interval => timer:seconds(30)}, + {ok, P_Pid} = emqx_mock_client:open_session(PubConnPid, Publisher, internal, Attrs), + {ok, SPid1} = emqx_mock_client:open_session(SubConnPid1, Subscriber1, internal, Attrs), + {ok, SPid2} = emqx_mock_client:open_session(SubConnPid2, Subscriber2, internal, Attrs), + emqx_session:subscribe(SPid1, [{Topic, #{qos => QoS, share => Group}}]), + emqx_session:subscribe(SPid2, [{Topic, #{qos => QoS, share => Group}}]), + %% wait for the subscriptions to show up + ?wait(subscribed(Group, Topic, SPid1), 1000), + ?wait(subscribed(Group, Topic, SPid2), 1000), + MkPayload = fun(PacketId) -> iolist_to_binary(["hello-", integer_to_list(PacketId)]) end, + SendF = fun(PacketId) -> emqx_session:publish(P_Pid, PacketId, emqx_message:make(Publisher, QoS, Topic, MkPayload(PacketId))) end, + SendF(1), + Ref = make_ref(), + CasePid = self(), + Received = + fun(PacketId, ConnPid) -> + Payload = MkPayload(PacketId), + case emqx_mock_client:get_last_message(ConnPid) of + [{publish, _, #message{payload = Payload}}] -> + CasePid ! {Ref, PacketId, ConnPid}, + true; + _Other -> + false + end + end, + ?wait(Received(1, SubConnPid1) orelse Received(1, SubConnPid2), 1000), + %% This is the connection which was picked by broker to dispatch (sticky) for 1st message + ConnPid = receive {Ref, 1, Pid} -> Pid after 1000 -> error(timeout) end, + %% Now kill the connection, expect all following messages to be delivered to the other subscriber. + emqx_mock_client:stop(ConnPid), + %% sleep then make synced calls to session processes to ensure that + %% the connection pid's 'EXIT' message is propagated to the session process + %% also to be sure sessions are still alive + timer:sleep(2), + _ = emqx_session:info(SPid1), + _ = emqx_session:info(SPid2), + %% Now we know what is the other still alive connection + [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid], + %% Send some more messages + PacketIdList = lists:seq(2, 10), + lists:foreach(fun(Id) -> + SendF(Id), + ?wait(Received(Id, TheOtherConnPid), 1000) + end, PacketIdList), + %% Now close the 2nd (last connection) + emqx_mock_client:stop(TheOtherConnPid), + timer:sleep(2), + %% both sessions should have conn_pid = undefined + ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))), + ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))), + %% send more messages, but all should be queued in session state + lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList), + {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)), + {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)), + ?assertEqual(length(PacketIdList), L1 + L2), + %% clean up + emqx_mock_client:close_session(PubConnPid), + emqx_sm:close_session(SPid1), + emqx_sm:close_session(SPid2), + ok. + +t_random(_) -> + test_two_messages(random). + +t_round_robin(_) -> + test_two_messages(round_robin). + +t_sticky(_) -> + test_two_messages(sticky). + +t_hash(_) -> + test_two_messages(hash, false). + +%% if the original subscriber dies, change to another one alive +t_not_so_sticky(_) -> + ok = ensure_config(sticky), + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, <<"foo/bar">>, <<"hello2">>), + emqx_session:subscribe(SPid1, [{<<"foo/bar">>, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(subscribed(<<"group1">>, <<"foo/bar">>, SPid1), 1000), + emqx_session:publish(SPid1, 1, Message1), + ?wait(case emqx_mock_client:get_last_message(ConnPid1) of + [{publish, _, #message{payload = <<"hello1">>}}] -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid1), + ?wait(not subscribed(<<"group1">>, <<"foo/bar">>, SPid1), 1000), + emqx_session:subscribe(SPid2, [{<<"foo/#">>, #{qos => 0, share => <<"group1">>}}]), + ?wait(subscribed(<<"group1">>, <<"foo/#">>, SPid2), 1000), + emqx_session:publish(SPid2, 2, Message2), + ?wait(case emqx_mock_client:get_last_message(ConnPid2) of + [{publish, _, #message{payload = <<"hello2">>}}] -> true; + Other -> Other + end, 1000), + emqx_mock_client:close_session(ConnPid2), + ?wait(not subscribed(<<"group1">>, <<"foo/#">>, SPid2), 1000), + ok. + +test_two_messages(Strategy) -> + test_two_messages(Strategy, _WithAck = true). + +test_two_messages(Strategy, WithAck) -> + ok = ensure_config(Strategy, WithAck), + Topic = <<"foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + {ok, ConnPid1} = emqx_mock_client:start_link(ClientId1), + {ok, ConnPid2} = emqx_mock_client:start_link(ClientId2), + {ok, SPid1} = emqx_mock_client:open_session(ConnPid1, ClientId1, internal), + {ok, SPid2} = emqx_mock_client:open_session(ConnPid2, ClientId2, internal), + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>), + emqx_session:subscribe(SPid1, [{Topic, #{qos => 0, share => <<"group1">>}}]), + emqx_session:subscribe(SPid2, [{Topic, #{qos => 0, share => <<"group1">>}}]), + %% wait for the subscription to show up + ?wait(subscribed(<<"group1">>, Topic, SPid1) andalso + subscribed(<<"group1">>, Topic, SPid2), 1000), + emqx_broker:publish(Message1), + Me = self(), + WaitF = fun(ExpectedPayload) -> + case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of + {true, Pid} -> + Me ! {subscriber, Pid}, + true; + Other -> + Other + end + end, + ?wait(WaitF(<<"hello1">>), 2000), + UsedSubPid1 = receive {subscriber, P1} -> P1 end, + emqx_broker:publish(Message2), + ?wait(WaitF(<<"hello2">>), 2000), + UsedSubPid2 = receive {subscriber, P2} -> P2 end, + case Strategy of + sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); + round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); + hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); + _ -> ok + end, + emqx_mock_client:close_session(ConnPid1), + emqx_mock_client:close_session(ConnPid2), + ok. + +last_message(_ExpectedPayload, []) -> <<"not yet?">>; +last_message(ExpectedPayload, [Pid | Pids]) -> + case emqx_mock_client:get_last_message(Pid) of + [{publish, _, #message{payload = ExpectedPayload}}] -> {true, Pid}; + _Other -> last_message(ExpectedPayload, Pids) + end. + +%%-------------------------------------------------------------------- +%% help functions +%%-------------------------------------------------------------------- + +ensure_config(Strategy) -> + ensure_config(Strategy, _AckEnabled = true). + +ensure_config(Strategy, AckEnabled) -> + application:set_env(emqx, shared_subscription_strategy, Strategy), + application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled), + ok. + +subscribed(Group, Topic, Pid) -> + lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)). + diff --git a/test/emqx_ws_channel_SUITE.erl b/test/emqx_ws_channel_SUITE.erl new file mode 100644 index 000000000..7e08c125f --- /dev/null +++ b/test/emqx_ws_channel_SUITE.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ws_channel_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_basic(_) -> + Topic = <<"TopicA">>, + {ok, C} = emqtt:start_link([{port, 8083}]), + {ok, _} = emqtt:ws_connect(C), + {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), + {ok, _, [2]} = emqtt:subscribe(C, Topic, qos2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2">>, 2), + ?assertEqual(3, length(recv_msgs(3))), + ok = emqx_client:disconnect(C). + +recv_msgs(Count) -> + recv_msgs(Count, []). + +recv_msgs(0, Msgs) -> + Msgs; +recv_msgs(Count, Msgs) -> + receive + {publish, Msg} -> + recv_msgs(Count-1, [Msg|Msgs]) + after 100 -> + Msgs + end. +