diff --git a/.gitignore b/.gitignore index 7068c1c7d..5e91d4bc5 100644 --- a/.gitignore +++ b/.gitignore @@ -72,5 +72,7 @@ ct_run*/ apps/emqx_conf/etc/emqx.conf.all.rendered* rebar-git-cache.tar # build docker image locally +.dockerignore .docker_image_tag +.emqx_docker_image_tags .git/ diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ebdfff0e7..ce694ba33 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -435,7 +435,7 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), delete_route_if_needed({Group, Topic}), maybe_delete_round_robin_count({Group, Topic}), - {reply, ok, State}; + {reply, ok, update_stats(State)}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", req => Req}), {reply, ignored, State}. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index cfbec3eb8..b136742d0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -264,6 +264,8 @@ merge_cluster_rate(Node, Cluster) -> NCluster#{topics => V}; (retained_msg_count, V, NCluster) -> NCluster#{retained_msg_count => V}; + (shared_subscriptions, V, NCluster) -> + NCluster#{shared_subscriptions => V}; (license_quota, V, NCluster) -> NCluster#{license_quota => V}; %% for cluster sample, ignore node_uptime diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 93305727e..e13d63f45 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -21,24 +21,53 @@ -import(emqx_dashboard_SUITE, [auth_header_/0]). --include_lib("eunit/include/eunit.hrl"). -include("emqx_dashboard.hrl"). +-include_lib("eunit/include/eunit.hrl"). -define(SERVER, "http://127.0.0.1:18083"). -define(BASE_PATH, "/api/v5"). +-define(BASE_RETAINER_CONF, << + "retainer {\n" + " enable = true\n" + " msg_clear_interval = 0s\n" + " msg_expiry_interval = 0s\n" + " max_payload_size = 1MB\n" + " flow_control {\n" + " batch_read_number = 0\n" + " batch_deliver_number = 0\n" + " }\n" + " backend {\n" + " type = built_in_database\n" + " storage_type = ram\n" + " max_retained_messages = 0\n" + " }\n" + "}" +>>). + +%%-------------------------------------------------------------------- +%% CT boilerplate +%%-------------------------------------------------------------------- + all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_retainer, retained_count, fun() -> 0 end), - emqx_mgmt_api_test_util:init_suite([]), + ok = emqx_mgmt_api_test_util:init_suite([emqx, emqx_conf, emqx_retainer]), Config. end_per_suite(_Config) -> - meck:unload([emqx_retainer]), - emqx_mgmt_api_test_util:end_suite([]). + emqx_mgmt_api_test_util:end_suite([emqx_retainer]). + +set_special_configs(emqx_retainer) -> + emqx_retainer:update_config(?BASE_RETAINER_CONF), + ok; +set_special_configs(_App) -> + ok. + +%%-------------------------------------------------------------------- +%% Test Cases +%%-------------------------------------------------------------------- t_monitor_samplers_all(_Config) -> timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20), @@ -112,6 +141,65 @@ t_monitor_current_api_live_connections(_) -> {ok, _} = emqtt:connect(C2), ok = emqtt:disconnect(C2). +t_monitor_current_retained_count(_) -> + process_flag(trap_exit, true), + ClientId = <<"live_conn_tests">>, + {ok, C} = emqtt:start_link([{clean_start, false}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(C), + _ = emqtt:publish(C, <<"t1">>, <<"qos1-retain">>, [{qos, 1}, {retain, true}]), + + ok = waiting_emqx_stats_and_monitor_update('retained.count'), + {ok, Res} = request(["monitor_current"]), + {ok, ResNode} = request(["monitor_current", "nodes", node()]), + + ?assertEqual(1, maps:get(<<"retained_msg_count">>, Res)), + ?assertEqual(1, maps:get(<<"retained_msg_count">>, ResNode)), + ok = emqtt:disconnect(C), + ok. + +t_monitor_current_shared_subscription(_) -> + process_flag(trap_exit, true), + ShareT = <<"$share/group1/t/1">>, + AssertFun = fun(Num) -> + {ok, Res} = request(["monitor_current"]), + {ok, ResNode} = request(["monitor_current", "nodes", node()]), + ?assertEqual(Num, maps:get(<<"shared_subscriptions">>, Res)), + ?assertEqual(Num, maps:get(<<"shared_subscriptions">>, ResNode)), + ok + end, + + ok = AssertFun(0), + + ClientId1 = <<"live_conn_tests1">>, + ClientId2 = <<"live_conn_tests2">>, + {ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, ClientId1}]), + {ok, _} = emqtt:connect(C1), + _ = emqtt:subscribe(C1, {ShareT, 1}), + + ok = AssertFun(1), + + {ok, C2} = emqtt:start_link([{clean_start, true}, {clientid, ClientId2}]), + {ok, _} = emqtt:connect(C2), + _ = emqtt:subscribe(C2, {ShareT, 1}), + ok = AssertFun(2), + + _ = emqtt:unsubscribe(C2, ShareT), + ok = AssertFun(1), + _ = emqtt:subscribe(C2, {ShareT, 1}), + ok = AssertFun(2), + + ok = emqtt:disconnect(C1), + %% C1: clean_start = false, proto_ver = 3.1.1 + %% means disconnected but the session pid with a share-subscription is still alive + ok = AssertFun(2), + + _ = emqx_cm:kick_session(ClientId1), + ok = AssertFun(1), + + ok = emqtt:disconnect(C2), + ok = AssertFun(0), + ok. + t_monitor_reset(_) -> restart_monitor(), {ok, Rate} = request(["monitor_current"]), diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index b2aca37b6..8556e82d3 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -195,7 +195,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)), ok = add_collect_family( Callback, - stats_metric_cluster_consistened_meta(), + stats_metric_cluster_consistented_meta(), ?MG(stats_data_cluster_consistented, RawData) ), ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)), @@ -502,8 +502,6 @@ stats_metric_meta() -> {emqx_sessions_max, gauge, 'sessions.max'}, {emqx_channels_count, gauge, 'channels.count'}, {emqx_channels_max, gauge, 'channels.max'}, - {emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'}, - {emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'}, %% pub/sub stats {emqx_suboptions_count, gauge, 'suboptions.count'}, {emqx_suboptions_max, gauge, 'suboptions.max'}, @@ -511,21 +509,25 @@ stats_metric_meta() -> {emqx_subscribers_max, gauge, 'subscribers.max'}, {emqx_subscriptions_count, gauge, 'subscriptions.count'}, {emqx_subscriptions_max, gauge, 'subscriptions.max'}, - {emqx_subscriptions_shared_count, gauge, 'subscriptions.shared.count'}, - {emqx_subscriptions_shared_max, gauge, 'subscriptions.shared.max'}, %% delayed {emqx_delayed_count, gauge, 'delayed.count'}, {emqx_delayed_max, gauge, 'delayed.max'} ]. -stats_metric_cluster_consistened_meta() -> +stats_metric_cluster_consistented_meta() -> [ + %% sessions + {emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'}, + {emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'}, %% topics {emqx_topics_max, gauge, 'topics.max'}, {emqx_topics_count, gauge, 'topics.count'}, %% retained {emqx_retained_count, gauge, 'retained.count'}, - {emqx_retained_max, gauge, 'retained.max'} + {emqx_retained_max, gauge, 'retained.max'}, + %% shared subscriptions + {emqx_subscriptions_shared_count, gauge, 'subscriptions.shared.count'}, + {emqx_subscriptions_shared_max, gauge, 'subscriptions.shared.max'} ]. stats_data(Mode) -> @@ -545,7 +547,7 @@ stats_data_cluster_consistented() -> AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]} end, #{}, - stats_metric_cluster_consistened_meta() + stats_metric_cluster_consistented_meta() ). %%======================================== @@ -589,12 +591,19 @@ cluster_metric_meta() -> {emqx_cluster_nodes_stopped, gauge, undefined} ]. -cluster_data(Mode) -> +cluster_data(node) -> + Labels = [], + do_cluster_data(Labels); +cluster_data(_) -> + Labels = [{node, node(self())}], + do_cluster_data(Labels). + +do_cluster_data(Labels) -> Running = emqx:cluster_nodes(running), Stopped = emqx:cluster_nodes(stopped), #{ - emqx_cluster_nodes_running => [{with_node_label(Mode, []), length(Running)}], - emqx_cluster_nodes_stopped => [{with_node_label(Mode, []), length(Stopped)}] + emqx_cluster_nodes_running => [{Labels, length(Running)}], + emqx_cluster_nodes_stopped => [{Labels, length(Stopped)}] }. %%======================================== diff --git a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl index 00a464811..8bba311dc 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl @@ -23,8 +23,6 @@ collect_json_data/2, - aggre_cluster/3, - point_to_map_fun/1, boolean_to_number/1, @@ -83,9 +81,6 @@ aggre_cluster(Module, Mode) -> Module:aggre_or_zip_init_acc() ). -aggre_cluster(LogicSumKs, ResL, Init) -> - do_aggre_cluster(LogicSumKs, ResL, Init). - do_aggre_cluster(_LogicSumKs, [], AccIn) -> AccIn; do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) -> diff --git a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl index a0009b8b2..72cbf8f96 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl @@ -287,12 +287,18 @@ assert_stats_metric_labels([MetricName | R] = _Metric, Mode) -> undefined -> ok; N when is_integer(N) -> - %% ct:print( - %% "====================~n" - %% "%% Metric: ~p~n" - %% "%% Expect labels count: ~p in Mode: ~p~n", - %% [_Metric, N, Mode] - %% ), + case N =:= length(lists:droplast(R)) of + true -> + ok; + false -> + ct:print( + "====================~n" + "%% Metric: ~p~n" + "%% Expect labels count: ~p in Mode: ~p~n" + "%% But got labels: ~p~n", + [_Metric, N, Mode, length(lists:droplast(R))] + ) + end, ?assertEqual(N, length(lists:droplast(R))) end. @@ -304,10 +310,14 @@ assert_stats_metric_labels([MetricName | R] = _Metric, Mode) -> %% `/prometheus/stats` %% BEGIN always no label +metric_meta(<<"emqx_cluster_sessions_count">>) -> ?meta(0, 0, 0); +metric_meta(<<"emqx_cluster_sessions_max">>) -> ?meta(0, 0, 0); metric_meta(<<"emqx_topics_max">>) -> ?meta(0, 0, 0); metric_meta(<<"emqx_topics_count">>) -> ?meta(0, 0, 0); metric_meta(<<"emqx_retained_count">>) -> ?meta(0, 0, 0); metric_meta(<<"emqx_retained_max">>) -> ?meta(0, 0, 0); +metric_meta(<<"emqx_subscriptions_shared_count">>) -> ?meta(0, 0, 0); +metric_meta(<<"emqx_subscriptions_shared_max">>) -> ?meta(0, 0, 0); %% END %% BEGIN no label in mode `node` metric_meta(<<"emqx_vm_cpu_use">>) -> ?meta(0, 1, 1); @@ -316,6 +326,8 @@ metric_meta(<<"emqx_vm_run_queue">>) -> ?meta(0, 1, 1); metric_meta(<<"emqx_vm_process_messages_in_queues">>) -> ?meta(0, 1, 1); metric_meta(<<"emqx_vm_total_memory">>) -> ?meta(0, 1, 1); metric_meta(<<"emqx_vm_used_memory">>) -> ?meta(0, 1, 1); +metric_meta(<<"emqx_cluster_nodes_running">>) -> ?meta(0, 1, 1); +metric_meta(<<"emqx_cluster_nodes_stopped">>) -> ?meta(0, 1, 1); %% END metric_meta(<<"emqx_cert_expiry_at">>) -> ?meta(2, 2, 2); metric_meta(<<"emqx_license_expiry_at">>) -> ?meta(0, 0, 0); diff --git a/changes/ce/fix-12714.en.md b/changes/ce/fix-12714.en.md new file mode 100644 index 000000000..4c021f72f --- /dev/null +++ b/changes/ce/fix-12714.en.md @@ -0,0 +1,12 @@ +Fixed some field errors in prometheus api `/prometheus/stats`. + +Related metrics names: +- `emqx_cluster_sessions_count` +- `emqx_cluster_sessions_max` +- `emqx_cluster_nodes_running` +- `emqx_cluster_nodes_stopped` +- `emqx_subscriptions_shared_count` +- `emqx_subscriptions_shared_max` + +Fixed the issue in endpoint: `/stats` that the values of fields `subscriptions.shared.count` and `subscriptions.shared.max` +can not be updated in time when the client disconnected or unsubscribed the Shared-Subscription.