diff --git a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl index 02010aaf7..5fa9057da 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_auth.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_auth.erl @@ -24,8 +24,12 @@ -export([collect/1]). %% for bpapi +-behaviour(emqx_prometheus_cluster). -export([ - fetch_metric_data_from_local_node/0 + fetch_data_from_local_node/0, + fetch_cluster_consistented_data/0, + aggre_or_zip_init_acc/0, + logic_sum_metrics/0 ]). %% %% @private @@ -127,7 +131,7 @@ deregister_cleanup(_) -> ok. Callback :: prometheus_collector:collect_mf_callback(). %% erlfmt-ignore collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) -> - RawData = raw_data(?GET_PROM_DATA_MODE()), + RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)), ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)), ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)), @@ -139,8 +143,7 @@ collect_mf(_, _) -> %% @private collect(<<"json">>) -> - RawData = raw_data(?GET_PROM_DATA_MODE()), - %% TODO: merge node name in json format + RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), #{ emqx_authn => collect_json_data(?MG(authn, RawData)), emqx_authz => collect_json_data(?MG(authz, RawData)), @@ -159,37 +162,28 @@ add_collect_family(Name, Data, Callback, Type) -> collect_metrics(Name, Metrics) -> collect_auth(Name, Metrics). -%% @private -fetch_metric_data_from_local_node() -> +%% behaviour +fetch_data_from_local_node() -> {node(self()), #{ authn => authn_data(), authz => authz_data() }}. -fetch_cluster_consistented_metric_data() -> +fetch_cluster_consistented_data() -> #{ authn_users_count => authn_users_count_data(), authz_rules_count => authz_rules_count_data(), banned_count => banned_count_data() }. -%% raw data for different format modes -raw_data(?PROM_DATA_MODE__ALL_NODES_AGGREGATED) -> - AggregatedNodesMetrics = aggre_cluster(all_nodes_metrics()), - maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data()); -raw_data(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) -> - %% then fold from all nodes - AllNodesMetrics = with_node_name_label(all_nodes_metrics()), - maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data()); -raw_data(?PROM_DATA_MODE__NODE) -> - {_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(), - maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()). +aggre_or_zip_init_acc() -> + #{ + authn => maps:from_keys(authn_metric_names(), []), + authz => maps:from_keys(authz_metric_names(), []) + }. -all_nodes_metrics() -> - Nodes = mria:running_nodes(), - _ResL = emqx_prometheus_proto_v2:raw_prom_data( - Nodes, ?MODULE, fetch_metric_data_from_local_node, [] - ). +logic_sum_metrics() -> + ?LOGICAL_SUM_METRIC_NAMES. %%-------------------------------------------------------------------- %% Collector @@ -286,7 +280,7 @@ lookup_authn_metrics_local(Id) -> case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of {ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} -> #{ - emqx_authn_status => emqx_prometheus_utils:status_to_number(Status), + emqx_authn_status => emqx_prometheus_cluster:status_to_number(Status), emqx_authn_nomatch => ?MG0(nomatch, Counters), emqx_authn_total => ?MG0(total, Counters), emqx_authn_success => ?MG0(success, Counters), @@ -297,7 +291,7 @@ lookup_authn_metrics_local(Id) -> end. authn_metric_names() -> - emqx_prometheus_utils:metric_names(?AUTHNS_WITH_TYPE). + emqx_prometheus_cluster:metric_names(?AUTHNS_WITH_TYPE). %%==================== %% Authn users count @@ -364,7 +358,7 @@ lookup_authz_metrics_local(Type) -> case emqx_authz_api_sources:lookup_from_local_node(Type) of {ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} -> #{ - emqx_authz_status => emqx_prometheus_utils:status_to_number(Status), + emqx_authz_status => emqx_prometheus_cluster:status_to_number(Status), emqx_authz_nomatch => ?MG0(nomatch, Counters), emqx_authz_total => ?MG0(total, Counters), emqx_authz_success => ?MG0(success, Counters), @@ -375,7 +369,7 @@ lookup_authz_metrics_local(Type) -> end. authz_metric_names() -> - emqx_prometheus_utils:metric_names(?AUTHZS_WITH_TYPE). + emqx_prometheus_cluster:metric_names(?AUTHZS_WITH_TYPE). %%==================== %% Authz rules count @@ -418,7 +412,7 @@ banned_count_data() -> %% merge / zip formatting funcs for type `application/json` collect_json_data(Data) -> - emqx_prometheus_utils:collect_json_data(Data, fun zip_json_auth_metrics/3). + emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_auth_metrics/3). collect_banned_data() -> #{emqx_banned_count => banned_count_data()}. @@ -440,7 +434,7 @@ zip_json_auth_metrics(Key, Points, [] = _AccIn) -> Points ); zip_json_auth_metrics(Key, Points, AllResultedAcc) -> - ThisKeyResult = lists:foldl(emqx_prometheus_utils:point_to_map_fun(Key), [], Points), + ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult). users_or_rule_count(#{id := Id}) -> @@ -462,14 +456,6 @@ users_or_rule_count(#{type := Type}) -> users_or_rule_count(_) -> #{}. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% merge / zip formatting funcs for type `text/plain` -aggre_cluster(ResL) -> - emqx_prometheus_utils:aggre_cluster(?LOGICAL_SUM_METRIC_NAMES, ResL, aggre_or_zip_init_acc()). - -with_node_name_label(ResL) -> - emqx_prometheus_utils:with_node_name_label(ResL, aggre_or_zip_init_acc()). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Helper funcs @@ -488,12 +474,6 @@ mnesia_size(Tab) -> mnesia:table_info(Tab, size). do_metric(emqx_authn_enable, #{enable := B}, _) -> - emqx_prometheus_utils:boolean_to_number(B); + emqx_prometheus_cluster:boolean_to_number(B); do_metric(K, _, Metrics) -> ?MG0(K, Metrics). - -aggre_or_zip_init_acc() -> - #{ - authn => maps:from_keys(authn_metric_names(), []), - authz => maps:from_keys(authz_metric_names(), []) - }. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_utils.erl b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl similarity index 76% rename from apps/emqx_prometheus/src/emqx_prometheus_utils.erl rename to apps/emqx_prometheus/src/emqx_prometheus_cluster.erl index fadfb5c47..e48df0f8b 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_utils.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl @@ -13,9 +13,13 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_prometheus_utils). +-module(emqx_prometheus_cluster). + +-include("emqx_prometheus.hrl"). -export([ + raw_data/2, + collect_json_data/2, aggre_cluster/3, @@ -28,9 +32,34 @@ metric_names/1 ]). +-callback fetch_cluster_consistented_data() -> map(). + +-callback fetch_data_from_local_node() -> {node(), map()}. + +-callback aggre_or_zip_init_acc() -> map(). + -define(MG(K, MAP), maps:get(K, MAP)). -define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)). +raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) -> + AllNodesMetrics = aggre_cluster(Module), + Cluster = Module:fetch_cluster_consistented_data(), + maps:merge(AllNodesMetrics, Cluster); +raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) -> + AllNodesMetrics = with_node_name_label(Module), + Cluster = Module:fetch_cluster_consistented_data(), + maps:merge(AllNodesMetrics, Cluster); +raw_data(Module, ?PROM_DATA_MODE__NODE) -> + {_Node, LocalNodeMetrics} = Module:fetch_data_from_local_node(), + Cluster = Module:fetch_cluster_consistented_data(), + maps:merge(LocalNodeMetrics, Cluster). + +metrics_data_from_all_nodes(Module) -> + Nodes = mria:running_nodes(), + _ResL = emqx_prometheus_proto_v2:raw_prom_data( + Nodes, Module, fetch_data_from_local_node, [] + ). + collect_json_data(Data, Func) when is_function(Func, 3) -> maps:fold( fun(K, V, Acc) -> @@ -42,6 +71,17 @@ collect_json_data(Data, Func) when is_function(Func, 3) -> collect_json_data(_, _) -> error(badarg). +aggre_cluster(Module) -> + do_aggre_cluster( + Module:logic_sum_metrics(), + metrics_data_from_all_nodes(Module), + Module:aggre_or_zip_init_acc() + ). + +with_node_name_label(Module) -> + ResL = metrics_data_from_all_nodes(Module), + do_with_node_name_label(ResL, Module:aggre_or_zip_init_acc()). + aggre_cluster(LogicSumKs, ResL, Init) -> do_aggre_cluster(LogicSumKs, ResL, Init). @@ -58,7 +98,6 @@ do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) -> AccIn, NodeMetric ) - %% merge_node_and_acc() ); do_aggre_cluster(LogicSumKs, [{_, _} | Rest], AccIn) -> do_aggre_cluster(LogicSumKs, Rest, AccIn). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 729e1f640..bfd011eaa 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -29,8 +29,12 @@ ]). %% for bpapi +-behaviour(emqx_prometheus_cluster). -export([ - fetch_metric_data_from_local_node/0 + fetch_data_from_local_node/0, + fetch_cluster_consistented_data/0, + aggre_or_zip_init_acc/0, + logic_sum_metrics/0 ]). -export([add_collect_family/4]). @@ -120,6 +124,37 @@ emqx_connector_status ]). +%%-------------------------------------------------------------------- +%% Callback for emqx_prometheus_cluster +%%-------------------------------------------------------------------- + +fetch_data_from_local_node() -> + Rules = emqx_rule_engine:get_rules(), + Bridges = emqx_bridge:list(), + {node(self()), #{ + rule_specific_data => rule_specific_data(Rules), + action_specific_data => action_specific_data(Bridges), + connector_specific_data => connector_specific_data(Bridges) + }}. + +fetch_cluster_consistented_data() -> + Rules = emqx_rule_engine:get_rules(), + Bridges = emqx_bridge:list(), + (maybe_collect_schema_registry())#{ + rules_data => rules_data(Rules), + connectors_data => connectors_data(Bridges) + }. + +aggre_or_zip_init_acc() -> + #{ + rule_specific_data => maps:from_keys(rule_specific_metric_names(), []), + action_specific_data => maps:from_keys(action_specific_metric_names(), []), + connector_specific_data => maps:from_keys(connectr_specific_metric_names(), []) + }. + +logic_sum_metrics() -> + ?LOGICAL_SUM_METRIC_NAMES. + %%-------------------------------------------------------------------- %% Collector API %%-------------------------------------------------------------------- @@ -132,7 +167,7 @@ deregister_cleanup(_) -> ok. _Registry :: prometheus_registry:registry(), Callback :: prometheus_collector:collect_mf_callback(). collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> - RawData = raw_data(?GET_PROM_DATA_MODE()), + RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), %% Data Integration Overview ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)), @@ -157,7 +192,7 @@ collect_mf(_, _) -> %% @private collect(<<"json">>) -> - RawData = raw_data(?GET_PROM_DATA_MODE()), + RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), Rules = emqx_rule_engine:get_rules(), Bridges = emqx_bridge:list(), #{ @@ -183,24 +218,6 @@ add_collect_family(Name, Data, Callback, Type) -> collect_metrics(Name, Metrics) -> collect_di(Name, Metrics). -%% @private -fetch_metric_data_from_local_node() -> - Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), - {node(self()), #{ - rule_specific_data => rule_specific_data(Rules), - action_specific_data => action_specific_data(Bridges), - connector_specific_data => connector_specific_data(Bridges) - }}. - -fetch_cluster_consistented_metric_data() -> - Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), - (maybe_collect_schema_registry())#{ - rules_data => rules_data(Rules), - connectors_data => connectors_data(Bridges) - }. - -if(?EMQX_RELEASE_EDITION == ee). maybe_collect_family_schema_registry(Callback) -> ok = add_collect_family(Callback, ?SCHEMA_REGISTRY_WITH_TYPE, schema_registry_data()), @@ -216,24 +233,6 @@ maybe_collect_schema_registry() -> #{}. -endif. -%% raw data for different format modes -raw_data(?PROM_DATA_MODE__ALL_NODES_AGGREGATED) -> - AggregatedNodesMetrics = aggre_cluster(metrics_data_from_all_nodes()), - maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data()); -raw_data(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) -> - %% then fold from all nodes - AllNodesMetrics = with_node_name_label(metrics_data_from_all_nodes()), - maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data()); -raw_data(?PROM_DATA_MODE__NODE) -> - {_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(), - maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()). - -metrics_data_from_all_nodes() -> - Nodes = mria:running_nodes(), - _ResL = emqx_prometheus_proto_v2:raw_prom_data( - Nodes, ?MODULE, fetch_metric_data_from_local_node, [] - ). - %%-------------------------------------------------------------------- %% Collector %%-------------------------------------------------------------------- @@ -398,7 +397,7 @@ get_metric(#{id := Id, enable := Bool} = _Rule) -> case emqx_metrics_worker:get_metrics(rule_metrics, Id) of #{counters := Counters} -> #{ - emqx_rule_enable => emqx_prometheus_utils:boolean_to_number(Bool), + emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool), emqx_rule_matched => ?MG(matched, Counters), emqx_rule_failed => ?MG(failed, Counters), emqx_rule_passed => ?MG(passed, Counters), @@ -415,7 +414,7 @@ get_metric(#{id := Id, enable := Bool} = _Rule) -> end. rule_specific_metric_names() -> - emqx_prometheus_utils:metric_names(?RULES_SPECIFIC_WITH_TYPE). + emqx_prometheus_cluster:metric_names(?RULES_SPECIFIC_WITH_TYPE). %%==================== %% Specific Action @@ -469,7 +468,7 @@ get_bridge_metric(Type, Name) -> end. action_specific_metric_names() -> - emqx_prometheus_utils:metric_names(?ACTION_SPECIFIC_WITH_TYPE). + emqx_prometheus_cluster:metric_names(?ACTION_SPECIFIC_WITH_TYPE). %%==================== %% Specific Connector @@ -501,12 +500,12 @@ get_connector_status(#{resource_data := ResourceData} = _Bridge) -> Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Status = ?MG(status, ResourceData), #{ - emqx_connector_enable => emqx_prometheus_utils:boolean_to_number(Enabled), - emqx_connector_status => emqx_prometheus_utils:status_to_number(Status) + emqx_connector_enable => emqx_prometheus_cluster:boolean_to_number(Enabled), + emqx_connector_status => emqx_prometheus_cluster:status_to_number(Status) }. connectr_specific_metric_names() -> - emqx_prometheus_utils:metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE). + emqx_prometheus_cluster:metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE). %%-------------------------------------------------------------------- %% Collect functions @@ -521,19 +520,19 @@ collect_data_integration_overview(Rules, Bridges) -> M1 = lists:foldl( fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, #{}, - emqx_prometheus_utils:metric_names(?RULES_WITH_TYPE) + emqx_prometheus_cluster:metric_names(?RULES_WITH_TYPE) ), M2 = lists:foldl( fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, #{}, - emqx_prometheus_utils:metric_names(?CONNECTORS_WITH_TYPE) + emqx_prometheus_cluster:metric_names(?CONNECTORS_WITH_TYPE) ), M3 = maybe_collect_schema_registry(), lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]). collect_json_data(Data) -> - emqx_prometheus_utils:collect_json_data(Data, fun zip_json_data_integration_metrics/3). + emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_data_integration_metrics/3). %% for initialized empty AccIn %% The following fields will be put into Result @@ -555,23 +554,5 @@ zip_json_data_integration_metrics(Key, Points, [] = _AccIn) -> Points ); zip_json_data_integration_metrics(Key, Points, AllResultedAcc) -> - ThisKeyResult = lists:foldl(emqx_prometheus_utils:point_to_map_fun(Key), [], Points), + ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points), lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% merge / zip formatting funcs for type `text/plain` -aggre_cluster(ResL) -> - emqx_prometheus_utils:aggre_cluster(?LOGICAL_SUM_METRIC_NAMES, ResL, aggre_or_zip_init_acc()). - -with_node_name_label(ResL) -> - emqx_prometheus_utils:with_node_name_label(ResL, aggre_or_zip_init_acc()). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Helper funcs - -aggre_or_zip_init_acc() -> - #{ - rule_specific_data => maps:from_keys(rule_specific_metric_names(), []), - action_specific_data => maps:from_keys(action_specific_metric_names(), []), - connector_specific_data => maps:from_keys(connectr_specific_metric_names(), []) - }.