diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 28ee1f30f..f3c5d97f9 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -109,6 +109,7 @@ start(Nodes, ClusterOpts) -> start(NodeSpecs). start(NodeSpecs) -> + emqx_common_test_helpers:clear_screen(), ct:pal("(Re)starting nodes:\n ~p", [NodeSpecs]), % 1. Start bare nodes with only basic applications running ok = start_nodes_init(NodeSpecs, ?TIMEOUT_NODE_START_MS), diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 8d6a1dbc9..a11d875f3 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -119,7 +119,7 @@ current_rate(all) -> current_rate(Node) when Node == node() -> try {ok, Rate} = do_call(current_rate), - {ok, Rate} + {ok, adjust_individual_node_metrics(Rate)} catch _E:R -> ?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}), @@ -156,8 +156,8 @@ current_rate_cluster() -> case lists:foldl(Fun, #{}, mria:cluster_nodes(running)) of {badrpc, Reason} -> {badrpc, Reason}; - Rate -> - {ok, Rate} + Metrics -> + {ok, adjust_synthetic_cluster_metrics(Metrics)} end. %% ------------------------------------------------------------------------------------------------- @@ -281,22 +281,23 @@ merge_cluster_rate(Node, Cluster) -> ClusterValue = maps:get(Key, NCluster, 0), NCluster#{Key => Value + ClusterValue} end, - Metrics = maps:fold(Fun, Cluster, Node), - adjust_synthetic_cluster_metrics(Metrics). + maps:fold(Fun, Cluster, Node). + +adjust_individual_node_metrics(Metrics0) -> + %% ensure renamed + emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0). adjust_synthetic_cluster_metrics(Metrics0) -> - %% ensure renamed - Metrics1 = emqx_utils_maps:rename(durable_subscriptions, subscriptions_durable, Metrics0), - DSSubs = maps:get(subscriptions_durable, Metrics1, 0), - RamSubs = maps:get(subscriptions, Metrics1, 0), - DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics1, 0), - Metrics2 = maps:update_with( + DSSubs = maps:get(subscriptions_durable, Metrics0, 0), + RamSubs = maps:get(subscriptions, Metrics0, 0), + DisconnectedDSs = maps:get(disconnected_durable_sessions, Metrics0, 0), + Metrics1 = maps:update_with( subscriptions, fun(Subs) -> Subs + DSSubs end, 0, - Metrics1 + Metrics0 ), - Metrics = maps:put(subscriptions_ram, RamSubs, Metrics2), + Metrics = maps:put(subscriptions_ram, RamSubs, Metrics1), maps:update_with( connections, fun(RamConns) -> RamConns + DisconnectedDSs end, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 3d9536299..2560f8f2a 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -253,7 +253,7 @@ maybe_reject_cluster_only_metrics(<<"all">>, Rates) -> Rates; maybe_reject_cluster_only_metrics(_Node, Rates) -> ClusterOnlyMetrics = [ - durable_subscriptions, + subscriptions_durable, disconnected_durable_sessions ], maps:without(ClusterOnlyMetrics, Rates). diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 8286810db..edf7c9bbf 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -49,6 +49,8 @@ "}" >>). +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + %%-------------------------------------------------------------------- %% CT boilerplate %%-------------------------------------------------------------------- @@ -79,21 +81,37 @@ end_per_suite(_Config) -> ok. init_per_group(persistent_sessions = Group, Config) -> - Apps = emqx_cth_suite:start( + AppSpecsFn = fun(Enable) -> + Port = + case Enable of + true -> "18083"; + false -> "0" + end, [ emqx_conf, {emqx, "durable_sessions {enable = true}"}, {emqx_retainer, ?BASE_RETAINER_CONF}, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard( - "dashboard.listeners.http { enable = true, bind = 18083 }\n" - "dashboard.sample_interval = 1s" + lists:concat([ + "dashboard.listeners.http { bind = " ++ Port ++ " }\n", + "dashboard.sample_interval = 1s\n", + "dashboard.listeners.http.enable = " ++ atom_to_list(Enable) + ]) ) - ], - #{work_dir => emqx_cth_suite:work_dir(Group, Config)} - ), - {ok, _} = emqx_common_test_http:create_default_app(), - [{apps, Apps} | Config]; + ] + end, + NodeSpecs = [ + {dashboard_monitor1, #{apps => AppSpecsFn(true)}}, + {dashboard_monitor2, #{apps => AppSpecsFn(false)}} + ], + Nodes = + [N1 | _] = emqx_cth_cluster:start( + NodeSpecs, + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ), + ?ON(N1, {ok, _} = emqx_common_test_http:create_default_app()), + [{cluster, Nodes} | Config]; init_per_group(common = Group, Config) -> Apps = emqx_cth_suite:start( [ @@ -111,7 +129,11 @@ init_per_group(common = Group, Config) -> {ok, _} = emqx_common_test_http:create_default_app(), [{apps, Apps} | Config]. -end_per_group(_Group, Config) -> +end_per_group(persistent_sessions, Config) -> + Cluster = ?config(cluster, Config), + emqx_cth_cluster:stop(Cluster), + ok; +end_per_group(common, Config) -> Apps = ?config(apps, Config), emqx_cth_suite:stop(Apps), ok. @@ -325,26 +347,36 @@ t_monitor_api_error(_) -> ok. %% Verifies that subscriptions from persistent sessions are correctly accounted for. -t_persistent_session_stats(_Config) -> +t_persistent_session_stats(Config) -> + [N1, N2 | _] = ?config(cluster, Config), %% pre-condition - true = emqx_persistent_message:is_persistence_enabled(), + true = ?ON(N1, emqx_persistent_message:is_persistence_enabled()), + Port1 = get_mqtt_port(N1, tcp), + Port2 = get_mqtt_port(N2, tcp), NonPSClient = start_and_connect(#{ + port => Port1, clientid => <<"non-ps">>, expiry_interval => 0 }), - PSClient = start_and_connect(#{ - clientid => <<"ps">>, + PSClient1 = start_and_connect(#{ + port => Port1, + clientid => <<"ps1">>, + expiry_interval => 30 + }), + PSClient2 = start_and_connect(#{ + port => Port2, + clientid => <<"ps2">>, expiry_interval => 30 }), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2), - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic/+">>, 2), - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic">>, 2), - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic/+">>, 2), - {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic/+">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"ps/topic">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic/+">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient1, <<"common/topic">>, 2), {ok, _} = snabbkaffe:block_until( ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}), @@ -353,7 +385,7 @@ t_persistent_session_stats(_Config) -> ?retry(1_000, 10, begin ?assertMatch( {ok, #{ - <<"connections">> := 2, + <<"connections">> := 3, <<"disconnected_durable_sessions">> := 0, %% N.B.: we currently don't perform any deduplication between persistent %% and non-persistent routes, so we count `commont/topic' twice and get 8 @@ -363,25 +395,25 @@ t_persistent_session_stats(_Config) -> <<"subscriptions_ram">> := 4, <<"subscriptions_durable">> := 4 }}, - request(["monitor_current"]) + ?ON(N1, request(["monitor_current"])) ) end), %% Sanity checks - PSRouteCount = emqx_persistent_session_ds_router:stats(n_routes), + PSRouteCount = ?ON(N1, emqx_persistent_session_ds_router:stats(n_routes)), ?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}), - PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(), + PSSubCount = ?ON(N1, emqx_persistent_session_bookkeeper:get_subscription_count()), ?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}), %% Now with disconnected but alive persistent sessions {ok, {ok, _}} = ?wait_async_action( - emqtt:disconnect(PSClient), + emqtt:disconnect(PSClient1), #{?snk_kind := dashboard_monitor_flushed} ), ?retry(1_000, 10, begin ?assertMatch( {ok, #{ - <<"connections">> := 2, + <<"connections">> := 3, <<"disconnected_durable_sessions">> := 1, %% N.B.: we currently don't perform any deduplication between persistent %% and non-persistent routes, so we count `commont/topic' twice and get 8 @@ -391,7 +423,28 @@ t_persistent_session_stats(_Config) -> <<"subscriptions_ram">> := 4, <<"subscriptions_durable">> := 4 }}, - request(["monitor_current"]) + ?ON(N1, request(["monitor_current"])) + ) + end), + {ok, {ok, _}} = + ?wait_async_action( + emqtt:disconnect(PSClient2), + #{?snk_kind := dashboard_monitor_flushed} + ), + ?retry(1_000, 10, begin + ?assertMatch( + {ok, #{ + <<"connections">> := 3, + <<"disconnected_durable_sessions">> := 2, + %% N.B.: we currently don't perform any deduplication between persistent + %% and non-persistent routes, so we count `commont/topic' twice and get 8 + %% instead of 6 here. + <<"topics">> := 8, + <<"subscriptions">> := 8, + <<"subscriptions_ram">> := 4, + <<"subscriptions_durable">> := 4 + }}, + ?ON(N1, request(["monitor_current"])) ) end), @@ -467,15 +520,21 @@ waiting_emqx_stats_and_monitor_update(WaitKey) -> ok. start_and_connect(Opts) -> - Defaults = #{clean_start => false, expiry_interval => 30}, + Defaults = #{ + clean_start => false, + expiry_interval => 30, + port => 1883 + }, #{ clientid := ClientId, clean_start := CleanStart, - expiry_interval := EI + expiry_interval := EI, + port := Port } = maps:merge(Defaults, Opts), {ok, Client} = emqtt:start_link([ {clientid, ClientId}, {clean_start, CleanStart}, + {port, Port}, {proto_ver, v5}, {properties, #{'Session-Expiry-Interval' => EI}} ]), @@ -484,3 +543,7 @@ start_and_connect(Opts) -> end), {ok, _} = emqtt:connect(Client), Client. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = ?ON(Node, emqx_config:get([listeners, Type, default, bind])), + Port.