test(ws): add websocket tests
This commit is contained in:
parent
8f853982a6
commit
c5869b62dc
|
@ -28,16 +28,58 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() ->
|
||||||
|
[ {group, all_cases}
|
||||||
|
, {group, connected_client_count_group}
|
||||||
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
groups() ->
|
||||||
|
TCs = emqx_ct:all(?MODULE),
|
||||||
|
ConnClientTCs = [ t_connected_client_count_refresh
|
||||||
|
, t_connected_client_count_persistent
|
||||||
|
, t_connected_client_stats
|
||||||
|
],
|
||||||
|
OtherTCs = TCs -- ConnClientTCs,
|
||||||
|
[ {all_cases, [], OtherTCs}
|
||||||
|
, {connected_client_count_group, [ {group, tcp}
|
||||||
|
, {group, ws}
|
||||||
|
]}
|
||||||
|
, {tcp, [], ConnClientTCs}
|
||||||
|
, {ws, [], ConnClientTCs}
|
||||||
|
].
|
||||||
|
|
||||||
|
init_per_group(connected_client_count_group, Config) ->
|
||||||
|
Config;
|
||||||
|
init_per_group(tcp, Config) ->
|
||||||
|
emqx_ct_helpers:boot_modules(all),
|
||||||
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
[{conn_fun, connect} | Config];
|
||||||
|
init_per_group(ws, Config) ->
|
||||||
|
emqx_ct_helpers:boot_modules(all),
|
||||||
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
[ {ssl, false}
|
||||||
|
, {enable_websocket, true}
|
||||||
|
, {conn_fun, ws_connect}
|
||||||
|
, {port, 8083}
|
||||||
|
, {host, "localhost"}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
|
init_per_group(_Group, Config) ->
|
||||||
emqx_ct_helpers:boot_modules(all),
|
emqx_ct_helpers:boot_modules(all),
|
||||||
emqx_ct_helpers:start_apps([]),
|
emqx_ct_helpers:start_apps([]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_group(connected_client_count_group, _Config) ->
|
||||||
|
ok;
|
||||||
|
end_per_group(_Group, _Config) ->
|
||||||
emqx_ct_helpers:stop_apps([]).
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
init_per_testcase(Case, Config) ->
|
init_per_testcase(Case, Config) ->
|
||||||
?MODULE:Case({init, Config}).
|
?MODULE:Case({init, Config}).
|
||||||
|
|
||||||
|
@ -285,10 +327,13 @@ t_connected_client_count_persistent({init, Config}) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
Config;
|
Config;
|
||||||
t_connected_client_count_persistent(Config) when is_list(Config) ->
|
t_connected_client_count_persistent(Config) when is_list(Config) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
{ok, ConnPid0} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]),
|
{ok, ConnPid0} = emqtt:start_link([ {clean_start, false}
|
||||||
|
, {clientid, <<"clientid">>}
|
||||||
|
| Config]),
|
||||||
{{ok, _}, _} = wait_for_events(
|
{{ok, _}, _} = wait_for_events(
|
||||||
fun() -> emqtt:connect(ConnPid0) end,
|
fun() -> emqtt:ConnFun(ConnPid0) end,
|
||||||
[emqx_cm_connected_client_count_inc]
|
[emqx_cm_connected_client_count_inc]
|
||||||
),
|
),
|
||||||
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
||||||
|
@ -298,16 +343,22 @@ t_connected_client_count_persistent(Config) when is_list(Config) ->
|
||||||
),
|
),
|
||||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
%% reconnecting
|
%% reconnecting
|
||||||
{ok, ConnPid1} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]),
|
{ok, ConnPid1} = emqtt:start_link([ {clean_start, false}
|
||||||
|
, {clientid, <<"clientid">>}
|
||||||
|
| Config
|
||||||
|
]),
|
||||||
{{ok, _}, _} = wait_for_events(
|
{{ok, _}, _} = wait_for_events(
|
||||||
fun() -> emqtt:connect(ConnPid1) end,
|
fun() -> emqtt:ConnFun(ConnPid1) end,
|
||||||
[emqx_cm_connected_client_count_inc]
|
[emqx_cm_connected_client_count_inc]
|
||||||
),
|
),
|
||||||
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
?assertEqual(1, emqx_cm:get_connected_client_count()),
|
||||||
%% taking over
|
%% taking over
|
||||||
{ok, ConnPid2} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]),
|
{ok, ConnPid2} = emqtt:start_link([ {clean_start, false}
|
||||||
|
, {clientid, <<"clientid">>}
|
||||||
|
| Config
|
||||||
|
]),
|
||||||
{{ok, _}, _} = wait_for_events(
|
{{ok, _}, _} = wait_for_events(
|
||||||
fun() -> emqtt:connect(ConnPid2) end,
|
fun() -> emqtt:ConnFun(ConnPid2) end,
|
||||||
[ emqx_cm_connected_client_count_dec
|
[ emqx_cm_connected_client_count_dec
|
||||||
, emqx_cm_connected_client_count_inc
|
, emqx_cm_connected_client_count_inc
|
||||||
]
|
]
|
||||||
|
@ -327,16 +378,20 @@ t_connected_client_count_persistent({'end', _Config}) ->
|
||||||
|
|
||||||
t_connected_client_count_refresh({init, Config}) ->
|
t_connected_client_count_refresh({init, Config}) ->
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
OldConfig = application:get_env(emqx, connected_client_count_refresh_period),
|
OldConfig = application:get_env(emqx, connected_client_count_refresh_period, undefined),
|
||||||
application:set_env(emqx, connected_client_count_refresh_period, 100),
|
application:set_env(emqx, connected_client_count_refresh_period, 100),
|
||||||
ok = supervisor:terminate_child(emqx_cm_sup, manager),
|
ok = supervisor:terminate_child(emqx_cm_sup, manager),
|
||||||
{ok, _} = supervisor:restart_child(emqx_cm_sup, manager),
|
{ok, _} = supervisor:restart_child(emqx_cm_sup, manager),
|
||||||
[{old_config, OldConfig} | Config];
|
[{old_config, OldConfig} | Config];
|
||||||
t_connected_client_count_refresh(Config) when is_list(Config) ->
|
t_connected_client_count_refresh(Config) when is_list(Config) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
{ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]),
|
{ok, ConnPid} = emqtt:start_link([ {clean_start, false}
|
||||||
|
, {clientid, <<"clientid">>}
|
||||||
|
| Config
|
||||||
|
]),
|
||||||
{{ok, _}, _} = wait_for_events(
|
{{ok, _}, _} = wait_for_events(
|
||||||
fun() -> emqtt:connect(ConnPid) end,
|
fun() -> emqtt:ConnFun(ConnPid) end,
|
||||||
[emqx_cm_connected_client_count_inc]
|
[emqx_cm_connected_client_count_inc]
|
||||||
),
|
),
|
||||||
%% simulate count mismatch
|
%% simulate count mismatch
|
||||||
|
@ -349,14 +404,15 @@ t_connected_client_count_refresh(Config) when is_list(Config) ->
|
||||||
150
|
150
|
||||||
),
|
),
|
||||||
?assertEqual(10, emqx_cm:get_connected_client_count()),
|
?assertEqual(10, emqx_cm:get_connected_client_count()),
|
||||||
|
emqtt:disconnect(ConnPid),
|
||||||
ok;
|
ok;
|
||||||
t_connected_client_count_refresh({'end', Config}) ->
|
t_connected_client_count_refresh({'end', Config}) ->
|
||||||
OldConfig = proplists:get_value(old_config, Config),
|
OldConfig = proplists:get_value(old_config, Config),
|
||||||
case OldConfig of
|
case OldConfig of
|
||||||
undefined ->
|
Val when is_integer(Val) ->
|
||||||
skip;
|
application:set_env(emqx, connected_client_count_refresh_period, OldConfig);
|
||||||
_ ->
|
_ ->
|
||||||
application:set_env(emqx, connected_client_count_refresh_period, OldConfig)
|
skip
|
||||||
end,
|
end,
|
||||||
snabbkaffe:stop(),
|
snabbkaffe:stop(),
|
||||||
ets:delete_all_objects(emqx_channel_info),
|
ets:delete_all_objects(emqx_channel_info),
|
||||||
|
@ -365,15 +421,21 @@ t_connected_client_count_refresh({'end', Config}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_connected_client_stats({init, Config}) ->
|
t_connected_client_stats({init, Config}) ->
|
||||||
|
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
|
||||||
|
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
Config;
|
Config;
|
||||||
t_connected_client_stats(Config) when is_list(Config) ->
|
t_connected_client_stats(Config) when is_list(Config) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
?assertEqual(0, emqx_cm:get_connected_client_count()),
|
||||||
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
|
?assertEqual(0, emqx_stats:getstat('live_connections.count')),
|
||||||
?assertEqual(0, emqx_stats:getstat('live_connections.max')),
|
?assertEqual(0, emqx_stats:getstat('live_connections.max')),
|
||||||
{ok, ConnPid} = emqtt:start_link([{clean_start, false}, {clientid, <<"clientid">>}]),
|
{ok, ConnPid} = emqtt:start_link([ {clean_start, true}
|
||||||
|
, {clientid, <<"clientid">>}
|
||||||
|
| Config
|
||||||
|
]),
|
||||||
{{ok, _}, _} = wait_for_events(
|
{{ok, _}, _} = wait_for_events(
|
||||||
fun() -> emqtt:connect(ConnPid) end,
|
fun() -> emqtt:ConnFun(ConnPid) end,
|
||||||
[emqx_cm_connected_client_count_inc]
|
[emqx_cm_connected_client_count_inc]
|
||||||
),
|
),
|
||||||
%% ensure stats are synchronized
|
%% ensure stats are synchronized
|
||||||
|
@ -384,7 +446,10 @@ t_connected_client_stats(Config) when is_list(Config) ->
|
||||||
),
|
),
|
||||||
?assertEqual(1, emqx_stats:getstat('live_connections.count')),
|
?assertEqual(1, emqx_stats:getstat('live_connections.count')),
|
||||||
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
|
?assertEqual(1, emqx_stats:getstat('live_connections.max')),
|
||||||
ok = emqtt:disconnect(ConnPid),
|
{ok, _} = wait_for_events(
|
||||||
|
fun() -> emqtt:disconnect(ConnPid) end,
|
||||||
|
[emqx_cm_connected_client_count_dec]
|
||||||
|
),
|
||||||
%% ensure stats are synchronized
|
%% ensure stats are synchronized
|
||||||
wait_for_stats(
|
wait_for_stats(
|
||||||
fun emqx_cm:stats_fun/0,
|
fun emqx_cm:stats_fun/0,
|
||||||
|
@ -396,6 +461,8 @@ t_connected_client_stats(Config) when is_list(Config) ->
|
||||||
ok;
|
ok;
|
||||||
t_connected_client_stats({'end', _Config}) ->
|
t_connected_client_stats({'end', _Config}) ->
|
||||||
ok = snabbkaffe:stop(),
|
ok = snabbkaffe:stop(),
|
||||||
|
ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
|
||||||
|
{ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
wait_for_events(Action, Kinds) ->
|
wait_for_events(Action, Kinds) ->
|
||||||
|
@ -439,21 +506,12 @@ insert_fake_channels() ->
|
||||||
Tab = emqx_channel_info,
|
Tab = emqx_channel_info,
|
||||||
Key = ets:first(Tab),
|
Key = ets:first(Tab),
|
||||||
[{_Chan, ChanInfo = #{conn_state := connected}, Stats}] = ets:lookup(Tab, Key),
|
[{_Chan, ChanInfo = #{conn_state := connected}, Stats}] = ets:lookup(Tab, Key),
|
||||||
lists:foreach(
|
ets:insert(Tab, [ {{"fake" ++ integer_to_list(N), undefined}, ChanInfo, Stats}
|
||||||
fun(N) ->
|
|| N <- lists:seq(1, 9)]),
|
||||||
ClientID = "fake" ++ integer_to_list(N),
|
|
||||||
ets:insert(Tab, {{ClientID, undefined}, ChanInfo, Stats})
|
|
||||||
end,
|
|
||||||
lists:seq(1, 9)
|
|
||||||
),
|
|
||||||
%% these should not be counted
|
%% these should not be counted
|
||||||
lists:foreach(
|
ets:insert(Tab, [ { {"fake" ++ integer_to_list(N), undefined}
|
||||||
fun(N) ->
|
, ChanInfo#{conn_state := disconnected}, Stats}
|
||||||
ClientID = "fake" ++ integer_to_list(N),
|
|| N <- lists:seq(10, 20)]).
|
||||||
ets:insert(Tab, {{ClientID, undefined}, ChanInfo#{conn_state := disconnected}, Stats})
|
|
||||||
end,
|
|
||||||
lists:seq(10, 20)
|
|
||||||
).
|
|
||||||
|
|
||||||
recv_msgs(Count) ->
|
recv_msgs(Count) ->
|
||||||
recv_msgs(Count, []).
|
recv_msgs(Count, []).
|
||||||
|
|
Loading…
Reference in New Issue