diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 93305727e..e13d63f45 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -21,24 +21,53 @@ -import(emqx_dashboard_SUITE, [auth_header_/0]). --include_lib("eunit/include/eunit.hrl"). -include("emqx_dashboard.hrl"). +-include_lib("eunit/include/eunit.hrl"). -define(SERVER, "http://127.0.0.1:18083"). -define(BASE_PATH, "/api/v5"). +-define(BASE_RETAINER_CONF, << + "retainer {\n" + " enable = true\n" + " msg_clear_interval = 0s\n" + " msg_expiry_interval = 0s\n" + " max_payload_size = 1MB\n" + " flow_control {\n" + " batch_read_number = 0\n" + " batch_deliver_number = 0\n" + " }\n" + " backend {\n" + " type = built_in_database\n" + " storage_type = ram\n" + " max_retained_messages = 0\n" + " }\n" + "}" +>>). + +%%-------------------------------------------------------------------- +%% CT boilerplate +%%-------------------------------------------------------------------- + all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_retainer, retained_count, fun() -> 0 end), - emqx_mgmt_api_test_util:init_suite([]), + ok = emqx_mgmt_api_test_util:init_suite([emqx, emqx_conf, emqx_retainer]), Config. end_per_suite(_Config) -> - meck:unload([emqx_retainer]), - emqx_mgmt_api_test_util:end_suite([]). + emqx_mgmt_api_test_util:end_suite([emqx_retainer]). + +set_special_configs(emqx_retainer) -> + emqx_retainer:update_config(?BASE_RETAINER_CONF), + ok; +set_special_configs(_App) -> + ok. + +%%-------------------------------------------------------------------- +%% Test Cases +%%-------------------------------------------------------------------- t_monitor_samplers_all(_Config) -> timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), @@ -112,6 +141,65 @@ t_monitor_current_api_live_connections(_) -> {ok, _} = emqtt:connect(C2), ok = emqtt:disconnect(C2). +t_monitor_current_retained_count(_) -> + process_flag(trap_exit, true), + ClientId = <<"live_conn_tests">>, + {ok, C} = emqtt:start_link([{clean_start, false}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(C), + _ = emqtt:publish(C, <<"t1">>, <<"qos1-retain">>, [{qos, 1}, {retain, true}]), + + ok = waiting_emqx_stats_and_monitor_update('retained.count'), + {ok, Res} = request(["monitor_current"]), + {ok, ResNode} = request(["monitor_current", "nodes", node()]), + + ?assertEqual(1, maps:get(<<"retained_msg_count">>, Res)), + ?assertEqual(1, maps:get(<<"retained_msg_count">>, ResNode)), + ok = emqtt:disconnect(C), + ok. + +t_monitor_current_shared_subscription(_) -> + process_flag(trap_exit, true), + ShareT = <<"$share/group1/t/1">>, + AssertFun = fun(Num) -> + {ok, Res} = request(["monitor_current"]), + {ok, ResNode} = request(["monitor_current", "nodes", node()]), + ?assertEqual(Num, maps:get(<<"shared_subscriptions">>, Res)), + ?assertEqual(Num, maps:get(<<"shared_subscriptions">>, ResNode)), + ok + end, + + ok = AssertFun(0), + + ClientId1 = <<"live_conn_tests1">>, + ClientId2 = <<"live_conn_tests2">>, + {ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, ClientId1}]), + {ok, _} = emqtt:connect(C1), + _ = emqtt:subscribe(C1, {ShareT, 1}), + + ok = AssertFun(1), + + {ok, C2} = emqtt:start_link([{clean_start, true}, {clientid, ClientId2}]), + {ok, _} = emqtt:connect(C2), + _ = emqtt:subscribe(C2, {ShareT, 1}), + ok = AssertFun(2), + + _ = emqtt:unsubscribe(C2, ShareT), + ok = AssertFun(1), + _ = emqtt:subscribe(C2, {ShareT, 1}), + ok = AssertFun(2), + + ok = emqtt:disconnect(C1), + %% C1: clean_start = false, proto_ver = 3.1.1 + %% means disconnected but the session pid with a share-subscription is still alive + ok = AssertFun(2), + + _ = emqx_cm:kick_session(ClientId1), + ok = AssertFun(1), + + ok = emqtt:disconnect(C2), + ok = AssertFun(0), + ok. + t_monitor_reset(_) -> restart_monitor(), {ok, Rate} = request(["monitor_current"]),