Merge pull request #12714 from JimMoen/fix-shared-sub-count-in-api
0315 Fix `shared_subscriptions` counting error. followup #12670
This commit is contained in:
commit
59ef14ed9c
|
@ -72,5 +72,7 @@ ct_run*/
|
||||||
apps/emqx_conf/etc/emqx.conf.all.rendered*
|
apps/emqx_conf/etc/emqx.conf.all.rendered*
|
||||||
rebar-git-cache.tar
|
rebar-git-cache.tar
|
||||||
# build docker image locally
|
# build docker image locally
|
||||||
|
.dockerignore
|
||||||
.docker_image_tag
|
.docker_image_tag
|
||||||
|
.emqx_docker_image_tags
|
||||||
.git/
|
.git/
|
||||||
|
|
|
@ -435,7 +435,7 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
||||||
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
delete_route_if_needed({Group, Topic}),
|
delete_route_if_needed({Group, Topic}),
|
||||||
maybe_delete_round_robin_count({Group, Topic}),
|
maybe_delete_round_robin_count({Group, Topic}),
|
||||||
{reply, ok, State};
|
{reply, ok, update_stats(State)};
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
?SLOG(error, #{msg => "unexpected_call", req => Req}),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
|
@ -264,6 +264,8 @@ merge_cluster_rate(Node, Cluster) ->
|
||||||
NCluster#{topics => V};
|
NCluster#{topics => V};
|
||||||
(retained_msg_count, V, NCluster) ->
|
(retained_msg_count, V, NCluster) ->
|
||||||
NCluster#{retained_msg_count => V};
|
NCluster#{retained_msg_count => V};
|
||||||
|
(shared_subscriptions, V, NCluster) ->
|
||||||
|
NCluster#{shared_subscriptions => V};
|
||||||
(license_quota, V, NCluster) ->
|
(license_quota, V, NCluster) ->
|
||||||
NCluster#{license_quota => V};
|
NCluster#{license_quota => V};
|
||||||
%% for cluster sample, ignore node_uptime
|
%% for cluster sample, ignore node_uptime
|
||||||
|
|
|
@ -21,24 +21,53 @@
|
||||||
|
|
||||||
-import(emqx_dashboard_SUITE, [auth_header_/0]).
|
-import(emqx_dashboard_SUITE, [auth_header_/0]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
|
||||||
-include("emqx_dashboard.hrl").
|
-include("emqx_dashboard.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(SERVER, "http://127.0.0.1:18083").
|
-define(SERVER, "http://127.0.0.1:18083").
|
||||||
-define(BASE_PATH, "/api/v5").
|
-define(BASE_PATH, "/api/v5").
|
||||||
|
|
||||||
|
-define(BASE_RETAINER_CONF, <<
|
||||||
|
"retainer {\n"
|
||||||
|
" enable = true\n"
|
||||||
|
" msg_clear_interval = 0s\n"
|
||||||
|
" msg_expiry_interval = 0s\n"
|
||||||
|
" max_payload_size = 1MB\n"
|
||||||
|
" flow_control {\n"
|
||||||
|
" batch_read_number = 0\n"
|
||||||
|
" batch_deliver_number = 0\n"
|
||||||
|
" }\n"
|
||||||
|
" backend {\n"
|
||||||
|
" type = built_in_database\n"
|
||||||
|
" storage_type = ram\n"
|
||||||
|
" max_retained_messages = 0\n"
|
||||||
|
" }\n"
|
||||||
|
"}"
|
||||||
|
>>).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% CT boilerplate
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]),
|
ok = emqx_mgmt_api_test_util:init_suite([emqx, emqx_conf, emqx_retainer]),
|
||||||
meck:expect(emqx_retainer, retained_count, fun() -> 0 end),
|
|
||||||
emqx_mgmt_api_test_util:init_suite([]),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
meck:unload([emqx_retainer]),
|
emqx_mgmt_api_test_util:end_suite([emqx_retainer]).
|
||||||
emqx_mgmt_api_test_util:end_suite([]).
|
|
||||||
|
set_special_configs(emqx_retainer) ->
|
||||||
|
emqx_retainer:update_config(?BASE_RETAINER_CONF),
|
||||||
|
ok;
|
||||||
|
set_special_configs(_App) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test Cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_monitor_samplers_all(_Config) ->
|
t_monitor_samplers_all(_Config) ->
|
||||||
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
|
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
|
||||||
|
@ -112,6 +141,65 @@ t_monitor_current_api_live_connections(_) ->
|
||||||
{ok, _} = emqtt:connect(C2),
|
{ok, _} = emqtt:connect(C2),
|
||||||
ok = emqtt:disconnect(C2).
|
ok = emqtt:disconnect(C2).
|
||||||
|
|
||||||
|
t_monitor_current_retained_count(_) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
ClientId = <<"live_conn_tests">>,
|
||||||
|
{ok, C} = emqtt:start_link([{clean_start, false}, {clientid, ClientId}]),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
_ = emqtt:publish(C, <<"t1">>, <<"qos1-retain">>, [{qos, 1}, {retain, true}]),
|
||||||
|
|
||||||
|
ok = waiting_emqx_stats_and_monitor_update('retained.count'),
|
||||||
|
{ok, Res} = request(["monitor_current"]),
|
||||||
|
{ok, ResNode} = request(["monitor_current", "nodes", node()]),
|
||||||
|
|
||||||
|
?assertEqual(1, maps:get(<<"retained_msg_count">>, Res)),
|
||||||
|
?assertEqual(1, maps:get(<<"retained_msg_count">>, ResNode)),
|
||||||
|
ok = emqtt:disconnect(C),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_monitor_current_shared_subscription(_) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
ShareT = <<"$share/group1/t/1">>,
|
||||||
|
AssertFun = fun(Num) ->
|
||||||
|
{ok, Res} = request(["monitor_current"]),
|
||||||
|
{ok, ResNode} = request(["monitor_current", "nodes", node()]),
|
||||||
|
?assertEqual(Num, maps:get(<<"shared_subscriptions">>, Res)),
|
||||||
|
?assertEqual(Num, maps:get(<<"shared_subscriptions">>, ResNode)),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
ok = AssertFun(0),
|
||||||
|
|
||||||
|
ClientId1 = <<"live_conn_tests1">>,
|
||||||
|
ClientId2 = <<"live_conn_tests2">>,
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, false}, {clientid, ClientId1}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
_ = emqtt:subscribe(C1, {ShareT, 1}),
|
||||||
|
|
||||||
|
ok = AssertFun(1),
|
||||||
|
|
||||||
|
{ok, C2} = emqtt:start_link([{clean_start, true}, {clientid, ClientId2}]),
|
||||||
|
{ok, _} = emqtt:connect(C2),
|
||||||
|
_ = emqtt:subscribe(C2, {ShareT, 1}),
|
||||||
|
ok = AssertFun(2),
|
||||||
|
|
||||||
|
_ = emqtt:unsubscribe(C2, ShareT),
|
||||||
|
ok = AssertFun(1),
|
||||||
|
_ = emqtt:subscribe(C2, {ShareT, 1}),
|
||||||
|
ok = AssertFun(2),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
%% C1: clean_start = false, proto_ver = 3.1.1
|
||||||
|
%% means disconnected but the session pid with a share-subscription is still alive
|
||||||
|
ok = AssertFun(2),
|
||||||
|
|
||||||
|
_ = emqx_cm:kick_session(ClientId1),
|
||||||
|
ok = AssertFun(1),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C2),
|
||||||
|
ok = AssertFun(0),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_monitor_reset(_) ->
|
t_monitor_reset(_) ->
|
||||||
restart_monitor(),
|
restart_monitor(),
|
||||||
{ok, Rate} = request(["monitor_current"]),
|
{ok, Rate} = request(["monitor_current"]),
|
||||||
|
|
|
@ -195,7 +195,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
|
||||||
ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)),
|
ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)),
|
||||||
ok = add_collect_family(
|
ok = add_collect_family(
|
||||||
Callback,
|
Callback,
|
||||||
stats_metric_cluster_consistened_meta(),
|
stats_metric_cluster_consistented_meta(),
|
||||||
?MG(stats_data_cluster_consistented, RawData)
|
?MG(stats_data_cluster_consistented, RawData)
|
||||||
),
|
),
|
||||||
ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)),
|
ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)),
|
||||||
|
@ -502,8 +502,6 @@ stats_metric_meta() ->
|
||||||
{emqx_sessions_max, gauge, 'sessions.max'},
|
{emqx_sessions_max, gauge, 'sessions.max'},
|
||||||
{emqx_channels_count, gauge, 'channels.count'},
|
{emqx_channels_count, gauge, 'channels.count'},
|
||||||
{emqx_channels_max, gauge, 'channels.max'},
|
{emqx_channels_max, gauge, 'channels.max'},
|
||||||
{emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'},
|
|
||||||
{emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'},
|
|
||||||
%% pub/sub stats
|
%% pub/sub stats
|
||||||
{emqx_suboptions_count, gauge, 'suboptions.count'},
|
{emqx_suboptions_count, gauge, 'suboptions.count'},
|
||||||
{emqx_suboptions_max, gauge, 'suboptions.max'},
|
{emqx_suboptions_max, gauge, 'suboptions.max'},
|
||||||
|
@ -511,21 +509,25 @@ stats_metric_meta() ->
|
||||||
{emqx_subscribers_max, gauge, 'subscribers.max'},
|
{emqx_subscribers_max, gauge, 'subscribers.max'},
|
||||||
{emqx_subscriptions_count, gauge, 'subscriptions.count'},
|
{emqx_subscriptions_count, gauge, 'subscriptions.count'},
|
||||||
{emqx_subscriptions_max, gauge, 'subscriptions.max'},
|
{emqx_subscriptions_max, gauge, 'subscriptions.max'},
|
||||||
{emqx_subscriptions_shared_count, gauge, 'subscriptions.shared.count'},
|
|
||||||
{emqx_subscriptions_shared_max, gauge, 'subscriptions.shared.max'},
|
|
||||||
%% delayed
|
%% delayed
|
||||||
{emqx_delayed_count, gauge, 'delayed.count'},
|
{emqx_delayed_count, gauge, 'delayed.count'},
|
||||||
{emqx_delayed_max, gauge, 'delayed.max'}
|
{emqx_delayed_max, gauge, 'delayed.max'}
|
||||||
].
|
].
|
||||||
|
|
||||||
stats_metric_cluster_consistened_meta() ->
|
stats_metric_cluster_consistented_meta() ->
|
||||||
[
|
[
|
||||||
|
%% sessions
|
||||||
|
{emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'},
|
||||||
|
{emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'},
|
||||||
%% topics
|
%% topics
|
||||||
{emqx_topics_max, gauge, 'topics.max'},
|
{emqx_topics_max, gauge, 'topics.max'},
|
||||||
{emqx_topics_count, gauge, 'topics.count'},
|
{emqx_topics_count, gauge, 'topics.count'},
|
||||||
%% retained
|
%% retained
|
||||||
{emqx_retained_count, gauge, 'retained.count'},
|
{emqx_retained_count, gauge, 'retained.count'},
|
||||||
{emqx_retained_max, gauge, 'retained.max'}
|
{emqx_retained_max, gauge, 'retained.max'},
|
||||||
|
%% shared subscriptions
|
||||||
|
{emqx_subscriptions_shared_count, gauge, 'subscriptions.shared.count'},
|
||||||
|
{emqx_subscriptions_shared_max, gauge, 'subscriptions.shared.max'}
|
||||||
].
|
].
|
||||||
|
|
||||||
stats_data(Mode) ->
|
stats_data(Mode) ->
|
||||||
|
@ -545,7 +547,7 @@ stats_data_cluster_consistented() ->
|
||||||
AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]}
|
AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]}
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
stats_metric_cluster_consistened_meta()
|
stats_metric_cluster_consistented_meta()
|
||||||
).
|
).
|
||||||
|
|
||||||
%%========================================
|
%%========================================
|
||||||
|
@ -589,12 +591,19 @@ cluster_metric_meta() ->
|
||||||
{emqx_cluster_nodes_stopped, gauge, undefined}
|
{emqx_cluster_nodes_stopped, gauge, undefined}
|
||||||
].
|
].
|
||||||
|
|
||||||
cluster_data(Mode) ->
|
cluster_data(node) ->
|
||||||
|
Labels = [],
|
||||||
|
do_cluster_data(Labels);
|
||||||
|
cluster_data(_) ->
|
||||||
|
Labels = [{node, node(self())}],
|
||||||
|
do_cluster_data(Labels).
|
||||||
|
|
||||||
|
do_cluster_data(Labels) ->
|
||||||
Running = emqx:cluster_nodes(running),
|
Running = emqx:cluster_nodes(running),
|
||||||
Stopped = emqx:cluster_nodes(stopped),
|
Stopped = emqx:cluster_nodes(stopped),
|
||||||
#{
|
#{
|
||||||
emqx_cluster_nodes_running => [{with_node_label(Mode, []), length(Running)}],
|
emqx_cluster_nodes_running => [{Labels, length(Running)}],
|
||||||
emqx_cluster_nodes_stopped => [{with_node_label(Mode, []), length(Stopped)}]
|
emqx_cluster_nodes_stopped => [{Labels, length(Stopped)}]
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%========================================
|
%%========================================
|
||||||
|
|
|
@ -23,8 +23,6 @@
|
||||||
|
|
||||||
collect_json_data/2,
|
collect_json_data/2,
|
||||||
|
|
||||||
aggre_cluster/3,
|
|
||||||
|
|
||||||
point_to_map_fun/1,
|
point_to_map_fun/1,
|
||||||
|
|
||||||
boolean_to_number/1,
|
boolean_to_number/1,
|
||||||
|
@ -83,9 +81,6 @@ aggre_cluster(Module, Mode) ->
|
||||||
Module:aggre_or_zip_init_acc()
|
Module:aggre_or_zip_init_acc()
|
||||||
).
|
).
|
||||||
|
|
||||||
aggre_cluster(LogicSumKs, ResL, Init) ->
|
|
||||||
do_aggre_cluster(LogicSumKs, ResL, Init).
|
|
||||||
|
|
||||||
do_aggre_cluster(_LogicSumKs, [], AccIn) ->
|
do_aggre_cluster(_LogicSumKs, [], AccIn) ->
|
||||||
AccIn;
|
AccIn;
|
||||||
do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
|
do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
|
||||||
|
|
|
@ -287,12 +287,18 @@ assert_stats_metric_labels([MetricName | R] = _Metric, Mode) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
N when is_integer(N) ->
|
N when is_integer(N) ->
|
||||||
%% ct:print(
|
case N =:= length(lists:droplast(R)) of
|
||||||
%% "====================~n"
|
true ->
|
||||||
%% "%% Metric: ~p~n"
|
ok;
|
||||||
%% "%% Expect labels count: ~p in Mode: ~p~n",
|
false ->
|
||||||
%% [_Metric, N, Mode]
|
ct:print(
|
||||||
%% ),
|
"====================~n"
|
||||||
|
"%% Metric: ~p~n"
|
||||||
|
"%% Expect labels count: ~p in Mode: ~p~n"
|
||||||
|
"%% But got labels: ~p~n",
|
||||||
|
[_Metric, N, Mode, length(lists:droplast(R))]
|
||||||
|
)
|
||||||
|
end,
|
||||||
?assertEqual(N, length(lists:droplast(R)))
|
?assertEqual(N, length(lists:droplast(R)))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -304,10 +310,14 @@ assert_stats_metric_labels([MetricName | R] = _Metric, Mode) ->
|
||||||
|
|
||||||
%% `/prometheus/stats`
|
%% `/prometheus/stats`
|
||||||
%% BEGIN always no label
|
%% BEGIN always no label
|
||||||
|
metric_meta(<<"emqx_cluster_sessions_count">>) -> ?meta(0, 0, 0);
|
||||||
|
metric_meta(<<"emqx_cluster_sessions_max">>) -> ?meta(0, 0, 0);
|
||||||
metric_meta(<<"emqx_topics_max">>) -> ?meta(0, 0, 0);
|
metric_meta(<<"emqx_topics_max">>) -> ?meta(0, 0, 0);
|
||||||
metric_meta(<<"emqx_topics_count">>) -> ?meta(0, 0, 0);
|
metric_meta(<<"emqx_topics_count">>) -> ?meta(0, 0, 0);
|
||||||
metric_meta(<<"emqx_retained_count">>) -> ?meta(0, 0, 0);
|
metric_meta(<<"emqx_retained_count">>) -> ?meta(0, 0, 0);
|
||||||
metric_meta(<<"emqx_retained_max">>) -> ?meta(0, 0, 0);
|
metric_meta(<<"emqx_retained_max">>) -> ?meta(0, 0, 0);
|
||||||
|
metric_meta(<<"emqx_subscriptions_shared_count">>) -> ?meta(0, 0, 0);
|
||||||
|
metric_meta(<<"emqx_subscriptions_shared_max">>) -> ?meta(0, 0, 0);
|
||||||
%% END
|
%% END
|
||||||
%% BEGIN no label in mode `node`
|
%% BEGIN no label in mode `node`
|
||||||
metric_meta(<<"emqx_vm_cpu_use">>) -> ?meta(0, 1, 1);
|
metric_meta(<<"emqx_vm_cpu_use">>) -> ?meta(0, 1, 1);
|
||||||
|
@ -316,6 +326,8 @@ metric_meta(<<"emqx_vm_run_queue">>) -> ?meta(0, 1, 1);
|
||||||
metric_meta(<<"emqx_vm_process_messages_in_queues">>) -> ?meta(0, 1, 1);
|
metric_meta(<<"emqx_vm_process_messages_in_queues">>) -> ?meta(0, 1, 1);
|
||||||
metric_meta(<<"emqx_vm_total_memory">>) -> ?meta(0, 1, 1);
|
metric_meta(<<"emqx_vm_total_memory">>) -> ?meta(0, 1, 1);
|
||||||
metric_meta(<<"emqx_vm_used_memory">>) -> ?meta(0, 1, 1);
|
metric_meta(<<"emqx_vm_used_memory">>) -> ?meta(0, 1, 1);
|
||||||
|
metric_meta(<<"emqx_cluster_nodes_running">>) -> ?meta(0, 1, 1);
|
||||||
|
metric_meta(<<"emqx_cluster_nodes_stopped">>) -> ?meta(0, 1, 1);
|
||||||
%% END
|
%% END
|
||||||
metric_meta(<<"emqx_cert_expiry_at">>) -> ?meta(2, 2, 2);
|
metric_meta(<<"emqx_cert_expiry_at">>) -> ?meta(2, 2, 2);
|
||||||
metric_meta(<<"emqx_license_expiry_at">>) -> ?meta(0, 0, 0);
|
metric_meta(<<"emqx_license_expiry_at">>) -> ?meta(0, 0, 0);
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
Fixed some field errors in the Prometheus API endpoint `/prometheus/stats`.
|
||||||
|
|
||||||
|
Related metric names:
|
||||||
|
- `emqx_cluster_sessions_count`
|
||||||
|
- `emqx_cluster_sessions_max`
|
||||||
|
- `emqx_cluster_nodes_running`
|
||||||
|
- `emqx_cluster_nodes_stopped`
|
||||||
|
- `emqx_subscriptions_shared_count`
|
||||||
|
- `emqx_subscriptions_shared_max`
|
||||||
|
|
||||||
|
Fixed an issue in the `/stats` endpoint where the values of the fields `subscriptions.shared.count` and `subscriptions.shared.max`
|
||||||
|
could not be updated in time when a client disconnected or unsubscribed from the shared subscription.
|
Loading…
Reference in New Issue