diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 7bbfd5bd5..81eacc9d0 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -192,6 +192,11 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), %% TODO: license expiry epoch and cert expiry epoch should be cached ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)), + ok = add_collect_family( + Callback, + stats_metric_cluster_consistened_meta(), + ?MG(stats_data_cluster_consistented, RawData) + ), ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)), ok = add_collect_family(Callback, cluster_metric_meta(), ?MG(cluster_data, RawData)), @@ -214,8 +219,8 @@ collect_mf(_Registry, _Callback) -> collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), (maybe_license_collect_json_data(RawData))#{ - stats => collect_json_data(?MG(stats_data, RawData)), - metrics => collect_json_data(?MG(vm_data, RawData)), + stats => collect_stats_json_data(RawData), + metrics => collect_vm_json_data(?MG(vm_data, RawData)), packets => collect_json_data(?MG(emqx_packet_data, RawData)), messages => collect_json_data(?MG(emqx_message_data, RawData)), delivery => collect_json_data(?MG(emqx_delivery_data, RawData)), @@ -259,6 +264,7 @@ fetch_from_local_node(Mode) -> fetch_cluster_consistented_data() -> (maybe_license_fetch_data())#{ + stats_data_cluster_consistented => stats_data_cluster_consistented(), cert_data => cert_data() }. @@ -477,8 +483,6 @@ stats_metric_meta() -> {emqx_channels_count, counter, 'channels.count'}, {emqx_channels_max, counter, 'channels.max'}, %% pub/sub stats - {emqx_topics_count, counter, 'topics.count'}, - {emqx_topics_max, counter, 'topics.max'}, {emqx_suboptions_count, counter, 'suboptions.count'}, {emqx_suboptions_max, counter, 'suboptions.max'}, {emqx_subscribers_count, counter, 'subscribers.count'}, @@ -487,14 +491,21 @@ stats_metric_meta() -> {emqx_subscriptions_max, counter, 'subscriptions.max'}, {emqx_subscriptions_shared_count, counter, 'subscriptions.shared.count'}, {emqx_subscriptions_shared_max, counter, 'subscriptions.shared.max'}, - %% retained - {emqx_retained_count, counter, 'retained.count'}, - {emqx_retained_max, counter, 'retained.max'}, %% delayed {emqx_delayed_count, counter, 'delayed.count'}, {emqx_delayed_max, counter, 'delayed.max'} ]. +stats_metric_cluster_consistened_meta() -> + [ + %% topics + {emqx_topics_max, counter, 'topics.max'}, + {emqx_topics_count, counter, 'topics.count'}, + %% retained + {emqx_retained_count, counter, 'retained.count'}, + {emqx_retained_max, counter, 'retained.max'} + ]. + stats_data(Mode) -> Stats = emqx_stats:getstats(), lists:foldl( @@ -505,6 +516,16 @@ stats_data(Mode) -> stats_metric_meta() ). +stats_data_cluster_consistented() -> + Stats = emqx_stats:getstats(), + lists:foldl( + fun({Name, _Type, MetricKAtom}, AccIn) -> + AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]} + end, + #{}, + stats_metric_cluster_consistened_meta() + ). + %%======================================== %% Erlang VM %%======================================== @@ -875,10 +896,25 @@ date_to_expiry_epoch(DateTime) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% merge / zip formatting funcs for type `application/json` +collect_stats_json_data(RawData) -> + StatsData = ?MG(stats_data, RawData), + StatsClData = ?MG(stats_data_cluster_consistented, RawData), + D = maps:merge(StatsData, StatsClData), + collect_json_data(D). + %% always return json array collect_cert_json_data(Data) -> collect_json_data_(Data). +collect_vm_json_data(Data) -> + DataListPerNode = collect_json_data_(Data), + case {?GET_PROM_DATA_MODE(), DataListPerNode} of + {?PROM_DATA_MODE__NODE, [NData | _]} -> + NData; + {_, _} -> + DataListPerNode + end. + collect_json_data(Data0) -> DataListPerNode = collect_json_data_(Data0), case {?GET_PROM_DATA_MODE(), DataListPerNode} of