test: fields in `/monitor_current`
- `retained_msg_count` - `shared_subscriptions`
This commit is contained in:
parent
02b3292025
commit
55a8488a4f
|
@ -21,24 +21,53 @@
|
||||||
|
|
||||||
-import(emqx_dashboard_SUITE, [auth_header_/0]).
|
-import(emqx_dashboard_SUITE, [auth_header_/0]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
|
||||||
-include("emqx_dashboard.hrl").
|
-include("emqx_dashboard.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(SERVER, "http://127.0.0.1:18083").
|
-define(SERVER, "http://127.0.0.1:18083").
|
||||||
-define(BASE_PATH, "/api/v5").
|
-define(BASE_PATH, "/api/v5").
|
||||||
|
|
||||||
|
-define(BASE_RETAINER_CONF, <<
|
||||||
|
"retainer {\n"
|
||||||
|
" enable = true\n"
|
||||||
|
" msg_clear_interval = 0s\n"
|
||||||
|
" msg_expiry_interval = 0s\n"
|
||||||
|
" max_payload_size = 1MB\n"
|
||||||
|
" flow_control {\n"
|
||||||
|
" batch_read_number = 0\n"
|
||||||
|
" batch_deliver_number = 0\n"
|
||||||
|
" }\n"
|
||||||
|
" backend {\n"
|
||||||
|
" type = built_in_database\n"
|
||||||
|
" storage_type = ram\n"
|
||||||
|
" max_retained_messages = 0\n"
|
||||||
|
" }\n"
|
||||||
|
"}"
|
||||||
|
>>).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% CT boilerplate
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]),
|
ok = emqx_mgmt_api_test_util:init_suite([emqx, emqx_conf, emqx_retainer]),
|
||||||
meck:expect(emqx_retainer, retained_count, fun() -> 0 end),
|
|
||||||
emqx_mgmt_api_test_util:init_suite([]),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
meck:unload([emqx_retainer]),
|
emqx_mgmt_api_test_util:end_suite([emqx_retainer]).
|
||||||
emqx_mgmt_api_test_util:end_suite([]).
|
|
||||||
|
set_special_configs(emqx_retainer) ->
|
||||||
|
emqx_retainer:update_config(?BASE_RETAINER_CONF),
|
||||||
|
ok;
|
||||||
|
set_special_configs(_App) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test Cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_monitor_samplers_all(_Config) ->
|
t_monitor_samplers_all(_Config) ->
|
||||||
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
|
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
|
||||||
|
@ -112,6 +141,65 @@ t_monitor_current_api_live_connections(_) ->
|
||||||
{ok, _} = emqtt:connect(C2),
|
{ok, _} = emqtt:connect(C2),
|
||||||
ok = emqtt:disconnect(C2).
|
ok = emqtt:disconnect(C2).
|
||||||
|
|
||||||
|
t_monitor_current_retained_count(_) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
ClientId = <<"live_conn_tests">>,
|
||||||
|
{ok, C} = emqtt:start_link([{clean_start, false}, {clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
_ = emqtt:publish(C, <<"t1">>, <<"qos1-retain">>, [{qos, 1}, {retain, true}]),
|
||||||
|
|
||||||
|
ok = waiting_emqx_stats_and_monitor_update('retained.count'),
|
||||||
|
{ok, Res} = request(["monitor_current"]),
|
||||||
|
{ok, ResNode} = request(["monitor_current", "nodes", node()]),
|
||||||
|
|
||||||
|
?assertEqual(1, maps:get(<<"retained_msg_count">>, Res)),
|
||||||
|
?assertEqual(1, maps:get(<<"retained_msg_count">>, ResNode)),
|
||||||
|
ok = emqtt:disconnect(C),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_monitor_current_shared_subscription(_) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
ShareT = <<"$share/group1/t/1">>,
|
||||||
|
AssertFun = fun(Num) ->
|
||||||
|
{ok, Res} = request(["monitor_current"]),
|
||||||
|
{ok, ResNode} = request(["monitor_current", "nodes", node()]),
|
||||||
|
?assertEqual(Num, maps:get(<<"shared_subscriptions">>, Res)),
|
||||||
|
?assertEqual(Num, maps:get(<<"shared_subscriptions">>, ResNode)),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
ok = AssertFun(0),
|
||||||
|
|
||||||
|
ClientId1 = <<"live_conn_tests1">>,
|
||||||
|
ClientId2 = <<"live_conn_tests2">>,
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, ClientId1}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
_ = emqtt:subscribe(C1, {ShareT, 1}),
|
||||||
|
|
||||||
|
ok = AssertFun(1),
|
||||||
|
|
||||||
|
{ok, C2} = emqtt:start_link([{clean_start, true}, {clientid, ClientId2}]),
|
||||||
|
{ok, _} = emqtt:connect(C2),
|
||||||
|
_ = emqtt:subscribe(C2, {ShareT, 1}),
|
||||||
|
ok = AssertFun(2),
|
||||||
|
|
||||||
|
_ = emqtt:unsubscribe(C2, ShareT),
|
||||||
|
ok = AssertFun(1),
|
||||||
|
_ = emqtt:subscribe(C2, {ShareT, 1}),
|
||||||
|
ok = AssertFun(2),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
%% C1: clean_start = false, proto_ver = 3.1.1
|
||||||
|
%% means disconnected but the session pid with a share-subscription is still alive
|
||||||
|
ok = AssertFun(2),
|
||||||
|
|
||||||
|
_ = emqx_cm:kick_session(ClientId1),
|
||||||
|
ok = AssertFun(1),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C2),
|
||||||
|
ok = AssertFun(0),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_monitor_reset(_) ->
|
t_monitor_reset(_) ->
|
||||||
restart_monitor(),
|
restart_monitor(),
|
||||||
{ok, Rate} = request(["monitor_current"]),
|
{ok, Rate} = request(["monitor_current"]),
|
||||||
|
|
Loading…
Reference in New Issue