diff --git a/Makefile b/Makefile index 48ca7ebcb..0ccd48410 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 export EMQX_DASHBOARD_VERSION ?= v1.6.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.3 +export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.8 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 95471ae5b..6dcd24355 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -1177,6 +1177,9 @@ format_resource( ) ). +%% FIXME: +%% missing metrics: +%% 'retried.success' and 'retried.failed' format_metrics(#{ counters := #{ 'dropped' := Dropped, diff --git a/apps/emqx_prometheus/rebar.config b/apps/emqx_prometheus/rebar.config index 649437765..578881d71 100644 --- a/apps/emqx_prometheus/rebar.config +++ b/apps/emqx_prometheus/rebar.config @@ -4,6 +4,7 @@ {emqx, {path, "../emqx"}}, {emqx_utils, {path, "../emqx_utils"}}, {emqx_auth, {path, "../emqx_auth"}}, + {emqx_resource, {path, "../emqx_resource"}}, {prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}} ]}. diff --git a/apps/emqx_prometheus/src/emqx_prometheus.app.src b/apps/emqx_prometheus/src/emqx_prometheus.app.src index 75c608087..9e9952d6c 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.app.src +++ b/apps/emqx_prometheus/src/emqx_prometheus.app.src @@ -5,7 +5,7 @@ {vsn, "5.0.19"}, {modules, []}, {registered, [emqx_prometheus_sup]}, - {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_management]}, + {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]}, {mod, {emqx_prometheus_app, []}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index af35acc36..ffa113a23 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -24,7 +24,7 @@ -behaviour(emqx_prometheus_cluster). -export([ - fetch_data_from_local_node/0, + fetch_from_local_node/1, fetch_cluster_consistented_data/0, aggre_or_zip_init_acc/0, logic_sum_metrics/0 @@ -90,8 +90,6 @@ -define(HTTP_OPTIONS, [{autoredirect, true}, {timeout, 60000}]). --define(LOGICAL_SUM_METRIC_NAMES, []). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -194,6 +192,11 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), %% TODO: license expiry epoch and cert expiry epoch should be cached ok = add_collect_family(Callback, stats_metric_meta(), ?MG(stats_data, RawData)), + ok = add_collect_family( + Callback, + stats_metric_cluster_consistened_meta(), + ?MG(stats_data_cluster_consistented, RawData) + ), ok = add_collect_family(Callback, vm_metric_meta(), ?MG(vm_data, RawData)), ok = add_collect_family(Callback, cluster_metric_meta(), ?MG(cluster_data, RawData)), @@ -216,8 +219,8 @@ collect_mf(_Registry, _Callback) -> collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), (maybe_license_collect_json_data(RawData))#{ - stats => collect_json_data(?MG(stats_data, RawData)), - metrics => collect_json_data(?MG(vm_data, RawData)), + stats => collect_stats_json_data(RawData), + metrics => collect_vm_json_data(?MG(vm_data, RawData)), packets => collect_json_data(?MG(emqx_packet_data, RawData)), messages => collect_json_data(?MG(emqx_message_data, RawData)), delivery => collect_json_data(?MG(emqx_delivery_data, RawData)), @@ -243,24 +246,25 @@ add_collect_family(Name, Data, Callback, Type) -> Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). %% behaviour -fetch_data_from_local_node() -> +fetch_from_local_node(Mode) -> {node(self()), #{ - stats_data => stats_data(), - vm_data => vm_data(), - cluster_data => cluster_data(), + stats_data => stats_data(Mode), + vm_data => vm_data(Mode), + cluster_data => cluster_data(Mode), %% Metrics - emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta()), - emqx_message_data => emqx_metric_data(message_metric_meta()), - emqx_delivery_data => emqx_metric_data(delivery_metric_meta()), - emqx_client_data => emqx_metric_data(client_metric_meta()), - emqx_session_data => emqx_metric_data(session_metric_meta()), - emqx_olp_data => emqx_metric_data(olp_metric_meta()), - emqx_acl_data => emqx_metric_data(acl_metric_meta()), - emqx_authn_data => emqx_metric_data(authn_metric_meta()) + emqx_packet_data => emqx_metric_data(emqx_packet_metric_meta(), Mode), + emqx_message_data => emqx_metric_data(message_metric_meta(), Mode), + emqx_delivery_data => emqx_metric_data(delivery_metric_meta(), Mode), + emqx_client_data => emqx_metric_data(client_metric_meta(), Mode), + emqx_session_data => emqx_metric_data(session_metric_meta(), Mode), + emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode), + emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode), + emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode) }}. fetch_cluster_consistented_data() -> (maybe_license_fetch_data())#{ + stats_data_cluster_consistented => stats_data_cluster_consistented(), cert_data => cert_data() }. @@ -280,7 +284,7 @@ aggre_or_zip_init_acc() -> }. logic_sum_metrics() -> - ?LOGICAL_SUM_METRIC_NAMES. + []. %%-------------------------------------------------------------------- %% Collector @@ -469,42 +473,57 @@ emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D)). stats_metric_meta() -> [ %% connections - {emqx_connections_count, counter, 'connections.count'}, - {emqx_connections_max, counter, 'connections.max'}, - {emqx_live_connections_count, counter, 'live_connections.count'}, - {emqx_live_connections_max, counter, 'live_connections.max'}, + {emqx_connections_count, gauge, 'connections.count'}, + {emqx_connections_max, gauge, 'connections.max'}, + {emqx_live_connections_count, gauge, 'live_connections.count'}, + {emqx_live_connections_max, gauge, 'live_connections.max'}, %% sessions - {emqx_sessions_count, counter, 'sessions.count'}, - {emqx_sessions_max, counter, 'sessions.max'}, - {emqx_channels_count, counter, 'channels.count'}, - {emqx_channels_max, counter, 'channels.max'}, + {emqx_sessions_count, gauge, 'sessions.count'}, + {emqx_sessions_max, gauge, 'sessions.max'}, + {emqx_channels_count, gauge, 'channels.count'}, + {emqx_channels_max, gauge, 'channels.max'}, %% pub/sub stats - {emqx_topics_count, counter, 'topics.count'}, - {emqx_topics_max, counter, 'topics.max'}, - {emqx_suboptions_count, counter, 'suboptions.count'}, - {emqx_suboptions_max, counter, 'suboptions.max'}, - {emqx_subscribers_count, counter, 'subscribers.count'}, - {emqx_subscribers_max, counter, 'subscribers.max'}, - {emqx_subscriptions_count, counter, 'subscriptions.count'}, - {emqx_subscriptions_max, counter, 'subscriptions.max'}, - {emqx_subscriptions_shared_count, counter, 'subscriptions.shared.count'}, - {emqx_subscriptions_shared_max, counter, 'subscriptions.shared.max'}, - %% retained - {emqx_retained_count, counter, 'retained.count'}, - {emqx_retained_max, counter, 'retained.max'}, + {emqx_suboptions_count, gauge, 'suboptions.count'}, + {emqx_suboptions_max, gauge, 'suboptions.max'}, + {emqx_subscribers_count, gauge, 'subscribers.count'}, + {emqx_subscribers_max, gauge, 'subscribers.max'}, + {emqx_subscriptions_count, gauge, 'subscriptions.count'}, + {emqx_subscriptions_max, gauge, 'subscriptions.max'}, + {emqx_subscriptions_shared_count, gauge, 'subscriptions.shared.count'}, + {emqx_subscriptions_shared_max, gauge, 'subscriptions.shared.max'}, %% delayed - {emqx_delayed_count, counter, 'delayed.count'}, - {emqx_delayed_max, counter, 'delayed.max'} + {emqx_delayed_count, gauge, 'delayed.count'}, + {emqx_delayed_max, gauge, 'delayed.max'} ]. -stats_data() -> +stats_metric_cluster_consistened_meta() -> + [ + %% topics + {emqx_topics_max, gauge, 'topics.max'}, + {emqx_topics_count, gauge, 'topics.count'}, + %% retained + {emqx_retained_count, gauge, 'retained.count'}, + {emqx_retained_max, gauge, 'retained.max'} + ]. + +stats_data(Mode) -> + Stats = emqx_stats:getstats(), + lists:foldl( + fun({Name, _Type, MetricKAtom}, AccIn) -> + AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Stats)}]} + end, + #{}, + stats_metric_meta() + ). + +stats_data_cluster_consistented() -> Stats = emqx_stats:getstats(), lists:foldl( fun({Name, _Type, MetricKAtom}, AccIn) -> AccIn#{Name => [{[], ?C(MetricKAtom, Stats)}]} end, #{}, - stats_metric_meta() + stats_metric_cluster_consistened_meta() ). %%======================================== @@ -521,11 +540,18 @@ vm_metric_meta() -> {emqx_vm_used_memory, gauge, 'used_memory'} ]. -vm_data() -> +vm_data(Mode) -> VmStats = emqx_mgmt:vm_stats(), lists:foldl( fun({Name, _Type, MetricKAtom}, AccIn) -> - AccIn#{Name => [{[], ?C(MetricKAtom, VmStats)}]} + Labels = + case Mode of + node -> + []; + _ -> + [{node, node(self())}] + end, + AccIn#{Name => [{Labels, ?C(MetricKAtom, VmStats)}]} end, #{}, vm_metric_meta() @@ -541,23 +567,23 @@ cluster_metric_meta() -> {emqx_cluster_nodes_stopped, gauge, undefined} ]. -cluster_data() -> +cluster_data(Mode) -> Running = emqx:cluster_nodes(running), Stopped = emqx:cluster_nodes(stopped), #{ - emqx_cluster_nodes_running => [{[], length(Running)}], - emqx_cluster_nodes_stopped => [{[], length(Stopped)}] + emqx_cluster_nodes_running => [{with_node_label(Mode, []), length(Running)}], + emqx_cluster_nodes_stopped => [{with_node_label(Mode, []), length(Stopped)}] }. %%======================================== %% Metrics %%======================================== -emqx_metric_data(MetricNameTypeKeyL) -> +emqx_metric_data(MetricNameTypeKeyL, Mode) -> Metrics = emqx_metrics:all(), lists:foldl( fun({Name, _Type, MetricKAtom}, AccIn) -> - AccIn#{Name => [{[], ?C(MetricKAtom, Metrics)}]} + AccIn#{Name => [{with_node_label(Mode, []), ?C(MetricKAtom, Metrics)}]} end, #{}, MetricNameTypeKeyL @@ -870,10 +896,25 @@ date_to_expiry_epoch(DateTime) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% merge / zip formatting funcs for type `application/json` +collect_stats_json_data(RawData) -> + StatsData = ?MG(stats_data, RawData), + StatsClData = ?MG(stats_data_cluster_consistented, RawData), + D = maps:merge(StatsData, StatsClData), + collect_json_data(D). + %% always return json array collect_cert_json_data(Data) -> collect_json_data_(Data). +collect_vm_json_data(Data) -> + DataListPerNode = collect_json_data_(Data), + case {?GET_PROM_DATA_MODE(), DataListPerNode} of + {?PROM_DATA_MODE__NODE, [NData | _]} -> + NData; + {_, _} -> + DataListPerNode + end. + collect_json_data(Data0) -> DataListPerNode = collect_json_data_(Data0), case {?GET_PROM_DATA_MODE(), DataListPerNode} of @@ -913,6 +954,13 @@ zip_json_prom_stats_metrics(Key, Points, AllResultedAcc) -> metrics_name(MetricsAll) -> [Name || {Name, _, _} <- MetricsAll]. +with_node_label(?PROM_DATA_MODE__NODE, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) -> + [{node, node(self())} | Labels]. + %%-------------------------------------------------------------------- %% bpapi diff --git a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl index 0d0607518..f9086cfb8 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl @@ -26,7 +26,7 @@ %% for bpapi -behaviour(emqx_prometheus_cluster). -export([ - fetch_data_from_local_node/0, + fetch_from_local_node/1, fetch_cluster_consistented_data/0, aggre_or_zip_init_acc/0, logic_sum_metrics/0 @@ -64,8 +64,8 @@ | emqx_authz_status | emqx_authz_nomatch | emqx_authz_total - | emqx_authz_success - | emqx_authz_failed. + | emqx_authz_allow + | emqx_authz_deny. %% Please don't remove this attribute, prometheus uses it to %% automatically register collectors. @@ -126,10 +126,10 @@ collect_metrics(Name, Metrics) -> collect_auth(Name, Metrics). %% behaviour -fetch_data_from_local_node() -> +fetch_from_local_node(Mode) -> {node(self()), #{ - authn_data => authn_data(), - authz_data => authz_data() + authn_data => authn_data(Mode), + authz_data => authz_data(Mode) }}. fetch_cluster_consistented_data() -> @@ -186,9 +186,9 @@ collect_auth(K = emqx_authz_nomatch, Data) -> counter_metrics(?MG(K, Data)); collect_auth(K = emqx_authz_total, Data) -> counter_metrics(?MG(K, Data)); -collect_auth(K = emqx_authz_success, Data) -> +collect_auth(K = emqx_authz_allow, Data) -> counter_metrics(?MG(K, Data)); -collect_auth(K = emqx_authz_failed, Data) -> +collect_auth(K = emqx_authz_deny, Data) -> counter_metrics(?MG(K, Data)); %%==================== %% Authz rules count @@ -224,38 +224,41 @@ authn_metric_meta() -> authn_metric(names) -> emqx_prometheus_cluster:metric_names(authn_metric_meta()). --spec authn_data() -> #{Key => [Point]} when +-spec authn_data(atom()) -> #{Key => [Point]} when Key :: authn_metric_name(), Point :: {[Label], Metric}, Label :: IdLabel, IdLabel :: {id, AuthnName :: binary()}, Metric :: number(). -authn_data() -> +authn_data(Mode) -> Authns = emqx_config:get([authentication]), lists:foldl( fun(Key, AccIn) -> - AccIn#{Key => authn_backend_to_points(Key, Authns)} + AccIn#{Key => authn_backend_to_points(Mode, Key, Authns)} end, #{}, authn_metric(names) ). --spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when +-spec authn_backend_to_points(atom(), Key, list(Authn)) -> list(Point) when Key :: authn_metric_name(), Authn :: map(), Point :: {[Label], Metric}, Label :: IdLabel, IdLabel :: {id, AuthnName :: binary()}, Metric :: number(). -authn_backend_to_points(Key, Authns) -> - do_authn_backend_to_points(Key, Authns, []). +authn_backend_to_points(Mode, Key, Authns) -> + do_authn_backend_to_points(Mode, Key, Authns, []). -do_authn_backend_to_points(_K, [], AccIn) -> +do_authn_backend_to_points(_Mode, _K, [], AccIn) -> lists:reverse(AccIn); -do_authn_backend_to_points(K, [Authn | Rest], AccIn) -> +do_authn_backend_to_points(Mode, K, [Authn | Rest], AccIn) -> Id = authenticator_id(Authn), - Point = {[{id, Id}], do_metric(K, Authn, lookup_authn_metrics_local(Id))}, - do_authn_backend_to_points(K, Rest, [Point | AccIn]). + Point = { + with_node_label(Mode, [{id, Id}]), + do_metric(K, Authn, lookup_authn_metrics_local(Id)) + }, + do_authn_backend_to_points(Mode, K, Rest, [Point | AccIn]). lookup_authn_metrics_local(Id) -> case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of @@ -310,45 +313,48 @@ authz_metric_meta() -> {emqx_authz_status, gauge}, {emqx_authz_nomatch, counter}, {emqx_authz_total, counter}, - {emqx_authz_success, counter}, - {emqx_authz_failed, counter} + {emqx_authz_allow, counter}, + {emqx_authz_deny, counter} ]. authz_metric(names) -> emqx_prometheus_cluster:metric_names(authz_metric_meta()). --spec authz_data() -> #{Key => [Point]} when +-spec authz_data(atom()) -> #{Key => [Point]} when Key :: authz_metric_name(), Point :: {[Label], Metric}, Label :: TypeLabel, TypeLabel :: {type, AuthZType :: binary()}, Metric :: number(). -authz_data() -> +authz_data(Mode) -> Authzs = emqx_config:get([authorization, sources]), lists:foldl( fun(Key, AccIn) -> - AccIn#{Key => authz_backend_to_points(Key, Authzs)} + AccIn#{Key => authz_backend_to_points(Mode, Key, Authzs)} end, #{}, authz_metric(names) ). --spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when +-spec authz_backend_to_points(atom(), Key, list(Authz)) -> list(Point) when Key :: authz_metric_name(), Authz :: map(), Point :: {[Label], Metric}, Label :: TypeLabel, TypeLabel :: {type, AuthZType :: binary()}, Metric :: number(). -authz_backend_to_points(Key, Authzs) -> - do_authz_backend_to_points(Key, Authzs, []). +authz_backend_to_points(Mode, Key, Authzs) -> + do_authz_backend_to_points(Mode, Key, Authzs, []). -do_authz_backend_to_points(_K, [], AccIn) -> +do_authz_backend_to_points(_Mode, _K, [], AccIn) -> lists:reverse(AccIn); -do_authz_backend_to_points(K, [Authz | Rest], AccIn) -> +do_authz_backend_to_points(Mode, K, [Authz | Rest], AccIn) -> Type = maps:get(type, Authz), - Point = {[{type, Type}], do_metric(K, Authz, lookup_authz_metrics_local(Type))}, - do_authz_backend_to_points(K, Rest, [Point | AccIn]). + Point = { + with_node_label(Mode, [{type, Type}]), + do_metric(K, Authz, lookup_authz_metrics_local(Type)) + }, + do_authz_backend_to_points(Mode, K, Rest, [Point | AccIn]). lookup_authz_metrics_local(Type) -> case emqx_authz_api_sources:lookup_from_local_node(Type) of @@ -357,8 +363,8 @@ lookup_authz_metrics_local(Type) -> emqx_authz_status => emqx_prometheus_cluster:status_to_number(Status), emqx_authz_nomatch => ?MG0(nomatch, Counters), emqx_authz_total => ?MG0(total, Counters), - emqx_authz_success => ?MG0(success, Counters), - emqx_authz_failed => ?MG0(failed, Counters) + emqx_authz_allow => ?MG0(allow, Counters), + emqx_authz_deny => ?MG0(deny, Counters) }; {error, _Reason} -> maps:from_keys(authz_metric(names) -- [emqx_authz_enable], 0) @@ -479,5 +485,14 @@ mnesia_size(Tab) -> do_metric(emqx_authn_enable, #{enable := B}, _) -> emqx_prometheus_cluster:boolean_to_number(B); +do_metric(emqx_authz_enable, #{enable := B}, _) -> + emqx_prometheus_cluster:boolean_to_number(B); do_metric(K, _, Metrics) -> ?MG0(K, Metrics). + +with_node_label(?PROM_DATA_MODE__NODE, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) -> + [{node, node(self())} | Labels]. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl index 2a68c7b3b..c4f1dc3b8 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl @@ -16,6 +16,7 @@ -module(emqx_prometheus_cluster). -include("emqx_prometheus.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -export([ raw_data/2, @@ -23,7 +24,7 @@ collect_json_data/2, aggre_cluster/3, - with_node_name_label/2, + %% with_node_name_label/2, point_to_map_fun/1, @@ -34,33 +35,35 @@ -callback fetch_cluster_consistented_data() -> map(). --callback fetch_data_from_local_node() -> {node(), map()}. +-callback fetch_from_local_node(atom()) -> {node(), map()}. -callback aggre_or_zip_init_acc() -> map(). +-callback logic_sum_metrics() -> list(). + -define(MG(K, MAP), maps:get(K, MAP)). -define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)). raw_data(Module, undefined) -> %% TODO: for push gateway, the format mode should be configurable raw_data(Module, ?PROM_DATA_MODE__NODE); -raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) -> - AllNodesMetrics = aggre_cluster(Module), +raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED = Mode) -> + AllNodesMetrics = aggre_cluster(Module, Mode), Cluster = Module:fetch_cluster_consistented_data(), maps:merge(AllNodesMetrics, Cluster); -raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) -> - AllNodesMetrics = with_node_name_label(Module), +raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED = Mode) -> + AllNodesMetrics = zip_cluster_data(Module, Mode), Cluster = Module:fetch_cluster_consistented_data(), maps:merge(AllNodesMetrics, Cluster); -raw_data(Module, ?PROM_DATA_MODE__NODE) -> - {_Node, LocalNodeMetrics} = Module:fetch_data_from_local_node(), +raw_data(Module, ?PROM_DATA_MODE__NODE = Mode) -> + {_Node, LocalNodeMetrics} = Module:fetch_from_local_node(Mode), Cluster = Module:fetch_cluster_consistented_data(), maps:merge(LocalNodeMetrics, Cluster). -metrics_data_from_all_nodes(Module) -> +fetch_data_from_all_nodes(Module, Mode) -> Nodes = mria:running_nodes(), _ResL = emqx_prometheus_proto_v2:raw_prom_data( - Nodes, Module, fetch_data_from_local_node, [] + Nodes, Module, fetch_from_local_node, [Mode] ). collect_json_data(Data, Func) when is_function(Func, 3) -> @@ -74,17 +77,13 @@ collect_json_data(Data, Func) when is_function(Func, 3) -> collect_json_data(_, _) -> error(badarg). -aggre_cluster(Module) -> +aggre_cluster(Module, Mode) -> do_aggre_cluster( Module:logic_sum_metrics(), - metrics_data_from_all_nodes(Module), + fetch_data_from_all_nodes(Module, Mode), Module:aggre_or_zip_init_acc() ). -with_node_name_label(Module) -> - ResL = metrics_data_from_all_nodes(Module), - do_with_node_name_label(ResL, Module:aggre_or_zip_init_acc()). - aggre_cluster(LogicSumKs, ResL, Init) -> do_aggre_cluster(LogicSumKs, ResL, Init). @@ -119,61 +118,65 @@ aggre_metric(LogicSumKs, NodeMetrics, AccIn0) -> do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) -> lists:foldl( - fun({Labels, Metric}, AccIn) -> - NMetric = - case lists:member(K, LogicSumKs) of - true -> - logic_sum(Metric, ?PG0(Labels, AccIn)); - false -> - Metric + ?PG0(Labels, AccIn) - end, - [{Labels, NMetric} | AccIn] + fun(Point = {_Labels, _Metric}, AccIn) -> + sum(K, LogicSumKs, Point, AccIn) end, AccL, NodeMetrics ). -with_node_name_label(ResL, Init) -> - do_with_node_name_label(ResL, Init). +sum(K, LogicSumKs, {Labels, Metric} = Point, MetricAccL) -> + case lists:keytake(Labels, 1, MetricAccL) of + {value, {Labels, MetricAcc}, NMetricAccL} -> + NPoint = {Labels, do_sum(K, LogicSumKs, Metric, MetricAcc)}, + [NPoint | NMetricAccL]; + false -> + [Point | MetricAccL] + end. -do_with_node_name_label([], AccIn) -> +do_sum(K, LogicSumKs, Metric, MetricAcc) -> + case lists:member(K, LogicSumKs) of + true -> + logic_sum(Metric, MetricAcc); + false -> + Metric + MetricAcc + end. + +zip_cluster_data(Module, Mode) -> + zip_cluster( + fetch_data_from_all_nodes(Module, Mode), + Module:aggre_or_zip_init_acc() + ). + +zip_cluster([], AccIn) -> AccIn; -do_with_node_name_label([{ok, {NodeName, NodeMetric}} | Rest], AccIn) -> - do_with_node_name_label( +zip_cluster([{ok, {_NodeName, NodeMetric}} | Rest], AccIn) -> + zip_cluster( Rest, maps:fold( fun(K, V, AccIn0) -> AccIn0#{ - K => zip_with_node_name(NodeName, V, ?MG(K, AccIn0)) + K => do_zip_cluster(V, ?MG(K, AccIn0)) } end, AccIn, NodeMetric ) ); -do_with_node_name_label([{_, _} | Rest], AccIn) -> - do_with_node_name_label(Rest, AccIn). +zip_cluster([{_, _} | Rest], AccIn) -> + zip_cluster(Rest, AccIn). -zip_with_node_name(NodeName, NodeMetrics, AccIn0) -> +do_zip_cluster(NodeMetrics, AccIn0) -> lists:foldl( fun(K, AccIn) -> - NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)), + AccMetricL = ?MG(K, AccIn), + NAccL = ?MG(K, NodeMetrics) ++ AccMetricL, AccIn#{K => NAccL} end, AccIn0, maps:keys(NodeMetrics) ). -do_zip_with_node_name(NodeName, NodeMetrics, AccL) -> - lists:foldl( - fun({Labels, Metric}, AccIn) -> - NLabels = [{node, NodeName} | Labels], - [{NLabels, Metric} | AccIn] - end, - AccL, - NodeMetrics - ). - point_to_map_fun(Key) -> fun({Lables, Metric}, AccIn2) -> LablesKVMap = maps:from_list(Lables), @@ -192,11 +195,11 @@ logic_sum(_, _) -> boolean_to_number(true) -> 1; boolean_to_number(false) -> 0. -status_to_number(connected) -> 1; -%% for auth -status_to_number(stopped) -> 0; -%% for data_integration -status_to_number(disconnected) -> 0. +status_to_number(?status_connected) -> 1; +status_to_number(?status_connecting) -> 0; +status_to_number(?status_disconnected) -> 0; +status_to_number(?rm_status_stopped) -> 0; +status_to_number(_) -> 0. metric_names(MetricWithType) when is_list(MetricWithType) -> [Name || {Name, _Type} <- MetricWithType]. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 008a029a8..f3bc98f50 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -31,7 +31,7 @@ %% for bpapi -behaviour(emqx_prometheus_cluster). -export([ - fetch_data_from_local_node/0, + fetch_from_local_node/1, fetch_cluster_consistented_data/0, aggre_or_zip_init_acc/0, logic_sum_metrics/0 @@ -69,21 +69,24 @@ %% Callback for emqx_prometheus_cluster %%-------------------------------------------------------------------- -fetch_data_from_local_node() -> +-define(ROOT_KEY_ACTIONS, actions). + +fetch_from_local_node(Mode) -> Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), + Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), + Connectors = emqx_connector:list(), {node(self()), #{ - rule_metric_data => rule_metric_data(Rules), - action_metric_data => action_metric_data(Bridges), - connector_metric_data => connector_metric_data(Bridges) + rule_metric_data => rule_metric_data(Mode, Rules), + action_metric_data => action_metric_data(Mode, Bridges), + connector_metric_data => connector_metric_data(Mode, Connectors) }}. fetch_cluster_consistented_data() -> Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), + Connectors = emqx_connector:list(), (maybe_collect_schema_registry())#{ rules_ov_data => rules_ov_data(Rules), - connectors_ov_data => connectors_ov_data(Bridges) + connectors_ov_data => connectors_ov_data(Connectors) }. aggre_or_zip_init_acc() -> @@ -293,10 +296,10 @@ connectors_ov_metric_meta() -> connectors_ov_metric(names) -> emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). -connectors_ov_data(Brdiges) -> +connectors_ov_data(Connectors) -> #{ %% Both Bridge V1 and V2 - emqx_connectors_count => erlang:length(Brdiges) + emqx_connectors_count => erlang:length(Connectors) }. %%======================================== @@ -325,26 +328,26 @@ rule_metric_meta() -> rule_metric(names) -> emqx_prometheus_cluster:metric_names(rule_metric_meta()). -rule_metric_data(Rules) -> +rule_metric_data(Mode, Rules) -> lists:foldl( fun(#{id := Id} = Rule, AccIn) -> - merge_acc_with_rules(Id, get_metric(Rule), AccIn) + merge_acc_with_rules(Mode, Id, get_metric(Rule), AccIn) end, maps:from_keys(rule_metric(names), []), Rules ). -merge_acc_with_rules(Id, RuleMetrics, PointsAcc) -> +merge_acc_with_rules(Mode, Id, RuleMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> - AccIn#{K => [rule_point(Id, V) | ?MG(K, AccIn)]} + AccIn#{K => [rule_point(Mode, Id, V) | ?MG(K, AccIn)]} end, PointsAcc, RuleMetrics ). -rule_point(Id, V) -> - {[{id, Id}], V}. +rule_point(Mode, Id, V) -> + {with_node_label(Mode, [{id, Id}]), V}. get_metric(#{id := Id, enable := Bool} = _Rule) -> case emqx_metrics_worker:get_metrics(rule_metrics, Id) of @@ -393,52 +396,48 @@ action_metric_meta() -> action_metric(names) -> emqx_prometheus_cluster:metric_names(action_metric_meta()). -action_metric_data(Bridges) -> +action_metric_data(Mode, Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = _Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), - merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) + merge_acc_with_bridges(Mode, Id, get_bridge_metric(Type, Name), AccIn) end, maps:from_keys(action_metric(names), []), Bridges ). -merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) -> +merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> - AccIn#{K => [action_point(Id, V) | ?MG(K, AccIn)]} + AccIn#{K => [action_point(Mode, Id, V) | ?MG(K, AccIn)]} end, PointsAcc, BridgeMetrics ). -action_point(Id, V) -> - {[{id, Id}], V}. +action_point(Mode, Id, V) -> + {with_node_label(Mode, [{id, Id}]), V}. get_bridge_metric(Type, Name) -> - case emqx_bridge:get_metrics(Type, Name) of - #{counters := Counters, gauges := Gauges} -> - #{ - emqx_action_matched => ?MG0(matched, Counters), - emqx_action_dropped => ?MG0(dropped, Counters), - emqx_action_success => ?MG0(success, Counters), - emqx_action_failed => ?MG0(failed, Counters), - emqx_action_inflight => ?MG0(inflight, Gauges), - emqx_action_received => ?MG0(received, Counters), - emqx_action_late_reply => ?MG0(late_reply, Counters), - emqx_action_retried => ?MG0(retried, Counters), - emqx_action_retried_success => ?MG0('retried.success', Counters), - emqx_action_retried_failed => ?MG0('retried.failed', Counters), - emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters), - emqx_action_dropped_resource_not_found => ?MG0( - 'dropped.resource_not_found', Counters - ), - emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), - emqx_action_dropped_other => ?MG0('dropped.other', Counters), - emqx_action_dropped_expired => ?MG0('dropped.expired', Counters), - emqx_action_queuing => ?MG0(queuing, Gauges) - } - end. + #{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name), + #{ + emqx_action_matched => ?MG0(matched, Counters), + emqx_action_dropped => ?MG0(dropped, Counters), + emqx_action_success => ?MG0(success, Counters), + emqx_action_failed => ?MG0(failed, Counters), + emqx_action_inflight => ?MG0(inflight, Gauges), + emqx_action_received => ?MG0(received, Counters), + emqx_action_late_reply => ?MG0(late_reply, Counters), + emqx_action_retried => ?MG0(retried, Counters), + emqx_action_retried_success => ?MG0('retried.success', Counters), + emqx_action_retried_failed => ?MG0('retried.failed', Counters), + emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters), + emqx_action_dropped_resource_not_found => ?MG0('dropped.resource_not_found', Counters), + emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), + emqx_action_dropped_other => ?MG0('dropped.other', Counters), + emqx_action_dropped_expired => ?MG0('dropped.expired', Counters), + emqx_action_queuing => ?MG0(queuing, Gauges) + }. %%==================== %% Connector Metric @@ -453,29 +452,29 @@ connector_metric_meta() -> connectr_metric(names) -> emqx_prometheus_cluster:metric_names(connector_metric_meta()). -connector_metric_data(Bridges) -> +connector_metric_data(Mode, Connectors) -> lists:foldl( - fun(#{type := Type, name := Name} = Bridge, AccIn) -> - Id = emqx_bridge_resource:bridge_id(Type, Name), - merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) + fun(#{type := Type, name := Name} = Connector, AccIn) -> + Id = emqx_connector_resource:connector_id(Type, Name), + merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn) end, maps:from_keys(connectr_metric(names), []), - Bridges + Connectors ). -merge_acc_with_connectors(Id, ConnectorMetrics, PointsAcc) -> +merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> - AccIn#{K => [connector_point(Id, V) | ?MG(K, AccIn)]} + AccIn#{K => [connector_point(Mode, Id, V) | ?MG(K, AccIn)]} end, PointsAcc, ConnectorMetrics ). -connector_point(Id, V) -> - {[{id, Id}], V}. +connector_point(Mode, Id, V) -> + {with_node_label(Mode, [{id, Id}]), V}. -get_connector_status(#{resource_data := ResourceData} = _Bridge) -> +get_connector_status(#{resource_data := ResourceData} = _Connector) -> Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Status = ?MG(status, ResourceData), #{ @@ -532,3 +531,13 @@ zip_json_data_integration_metrics(Key, Points, [] = _AccIn) -> zip_json_data_integration_metrics(Key, Points, AllResultedAcc) -> ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper funcs + +with_node_label(?PROM_DATA_MODE__NODE, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Labels) -> + Labels; +with_node_label(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, Labels) -> + [{node, node(self())} | Labels].