From c5869b62dc964cbda97c677f6251883896e53fcb Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 4 Nov 2021 11:42:25 -0300 Subject: [PATCH] test(ws): add websocket tests --- test/emqx_broker_SUITE.erl | 122 +++++++++++++++++++++++++++---------- 1 file changed, 90 insertions(+), 32 deletions(-) diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 693f9b51e..4dc73701f 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -28,16 +28,58 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -all() -> emqx_ct:all(?MODULE). +all() -> + [ {group, all_cases} + , {group, connected_client_count_group} + ]. -init_per_suite(Config) -> +groups() -> + TCs = emqx_ct:all(?MODULE), + ConnClientTCs = [ t_connected_client_count_refresh + , t_connected_client_count_persistent + , t_connected_client_stats + ], + OtherTCs = TCs -- ConnClientTCs, + [ {all_cases, [], OtherTCs} + , {connected_client_count_group, [ {group, tcp} + , {group, ws} + ]} + , {tcp, [], ConnClientTCs} + , {ws, [], ConnClientTCs} + ]. + +init_per_group(connected_client_count_group, Config) -> + Config; +init_per_group(tcp, Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + [{conn_fun, connect} | Config]; +init_per_group(ws, Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + [ {ssl, false} + , {enable_websocket, true} + , {conn_fun, ws_connect} + , {port, 8083} + , {host, "localhost"} + | Config + ]; +init_per_group(_Group, Config) -> emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. -end_per_suite(_Config) -> +end_per_group(connected_client_count_group, _Config) -> + ok; +end_per_group(_Group, _Config) -> emqx_ct_helpers:stop_apps([]). +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + init_per_testcase(Case, Config) -> ?MODULE:Case({init, Config}). @@ -285,10 +327,13 @@ t_connected_client_count_persistent({init, Config}) -> process_flag(trap_exit, true), Config; t_connected_client_count_persistent(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), ?assertEqual(0, emqx_cm:get_connected_client_count()), - {ok, ConnPid0} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {ok, ConnPid0} = emqtt:start_link([ {clean_start, false} + , {clientid, <<"clientid">>} + | Config]), {{ok, _}, _} = wait_for_events( - fun() -> emqtt:connect(ConnPid0) end, + fun() -> emqtt:ConnFun(ConnPid0) end, [emqx_cm_connected_client_count_inc] ), ?assertEqual(1, emqx_cm:get_connected_client_count()), @@ -298,16 +343,22 @@ t_connected_client_count_persistent(Config) when is_list(Config) -> ), ?assertEqual(0, emqx_cm:get_connected_client_count()), %% reconnecting - {ok, ConnPid1} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {ok, ConnPid1} = emqtt:start_link([ {clean_start, false} + , {clientid, <<"clientid">>} + | Config + ]), {{ok, _}, _} = wait_for_events( - fun() -> emqtt:connect(ConnPid1) end, + fun() -> emqtt:ConnFun(ConnPid1) end, [emqx_cm_connected_client_count_inc] ), ?assertEqual(1, emqx_cm:get_connected_client_count()), %% taking over - {ok, ConnPid2} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {ok, ConnPid2} = emqtt:start_link([ {clean_start, false} + , {clientid, <<"clientid">>} + | Config + ]), {{ok, _}, _} = wait_for_events( - fun() -> emqtt:connect(ConnPid2) end, + fun() -> emqtt:ConnFun(ConnPid2) end, [ emqx_cm_connected_client_count_dec , emqx_cm_connected_client_count_inc ] @@ -327,16 +378,20 @@ t_connected_client_count_persistent({'end', _Config}) -> t_connected_client_count_refresh({init, Config}) -> ok = snabbkaffe:start_trace(), - OldConfig = application:get_env(emqx, connected_client_count_refresh_period), + OldConfig = application:get_env(emqx, connected_client_count_refresh_period, undefined), application:set_env(emqx, connected_client_count_refresh_period, 100), ok = supervisor:terminate_child(emqx_cm_sup, manager), {ok, _} = supervisor:restart_child(emqx_cm_sup, manager), [{old_config, OldConfig} | Config]; t_connected_client_count_refresh(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), ?assertEqual(0, emqx_cm:get_connected_client_count()), - {ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {ok, ConnPid} = emqtt:start_link([ {clean_start, false} + , {clientid, <<"clientid">>} + | Config + ]), {{ok, _}, _} = wait_for_events( - fun() -> emqtt:connect(ConnPid) end, + fun() -> emqtt:ConnFun(ConnPid) end, [emqx_cm_connected_client_count_inc] ), %% simulate count mismatch @@ -349,14 +404,15 @@ t_connected_client_count_refresh(Config) when is_list(Config) -> 150 ), ?assertEqual(10, emqx_cm:get_connected_client_count()), + emqtt:disconnect(ConnPid), ok; t_connected_client_count_refresh({'end', Config}) -> OldConfig = proplists:get_value(old_config, Config), case OldConfig of - undefined -> - skip; + Val when is_integer(Val) -> + application:set_env(emqx, connected_client_count_refresh_period, OldConfig); _ -> - application:set_env(emqx, connected_client_count_refresh_period, OldConfig) + skip end, snabbkaffe:stop(), ets:delete_all_objects(emqx_channel_info), @@ -365,15 +421,21 @@ t_connected_client_count_refresh({'end', Config}) -> ok. t_connected_client_stats({init, Config}) -> + ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), + {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), ok = snabbkaffe:start_trace(), Config; t_connected_client_stats(Config) when is_list(Config) -> + ConnFun = ?config(conn_fun, Config), ?assertEqual(0, emqx_cm:get_connected_client_count()), ?assertEqual(0, emqx_stats:getstat('live_connections.count')), ?assertEqual(0, emqx_stats:getstat('live_connections.max')), - {ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]), + {ok, ConnPid} = emqtt:start_link([ {clean_start, true} + , {clientid, <<"clientid">>} + | Config + ]), {{ok, _}, _} = wait_for_events( - fun() -> emqtt:connect(ConnPid) end, + fun() -> emqtt:ConnFun(ConnPid) end, [emqx_cm_connected_client_count_inc] ), %% ensure stats are synchronized @@ -384,7 +446,10 @@ t_connected_client_stats(Config) when is_list(Config) -> ), ?assertEqual(1, emqx_stats:getstat('live_connections.count')), ?assertEqual(1, emqx_stats:getstat('live_connections.max')), - ok = emqtt:disconnect(ConnPid), + {ok, _} = wait_for_events( + fun() -> emqtt:disconnect(ConnPid) end, + [emqx_cm_connected_client_count_dec] + ), %% ensure stats are synchronized wait_for_stats( fun emqx_cm:stats_fun/0, @@ -396,6 +461,8 @@ t_connected_client_stats(Config) when is_list(Config) -> ok; t_connected_client_stats({'end', _Config}) -> ok = snabbkaffe:stop(), + ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats), + {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats), ok. wait_for_events(Action, Kinds) -> @@ -439,21 +506,12 @@ insert_fake_channels() -> Tab = emqx_channel_info, Key = ets:first(Tab), [{_Chan, ChanInfo = #{conn_state := connected}, Stats}] = ets:lookup(Tab, Key), - lists:foreach( - fun(N) -> - ClientID = "fake" ++ integer_to_list(N), - ets:insert(Tab, {{ClientID, undefined}, ChanInfo, Stats}) - end, - lists:seq(1, 9) - ), + ets:insert(Tab, [ {{"fake" ++ integer_to_list(N), undefined}, ChanInfo, Stats} + || N <- lists:seq(1, 9)]), %% these should not be counted - lists:foreach( - fun(N) -> - ClientID = "fake" ++ integer_to_list(N), - ets:insert(Tab, {{ClientID, undefined}, ChanInfo#{conn_state := disconnected}, Stats}) - end, - lists:seq(10, 20) - ). + ets:insert(Tab, [ { {"fake" ++ integer_to_list(N), undefined} + , ChanInfo#{conn_state := disconnected}, Stats} + || N <- lists:seq(10, 20)]). recv_msgs(Count) -> recv_msgs(Count, []).