From 58b60181d791a14e3d81f1e7428bbb6200b4fd53 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Jan 2024 17:22:34 +0800 Subject: [PATCH 1/4] fix(prom_mria): move mria callback into `emqx_prometheus.erl` - they will return in same api endpoint. --- Makefile | 2 +- apps/emqx_conf/src/emqx_conf_schema.erl | 3 +- apps/emqx_prometheus/src/emqx_prometheus.erl | 93 +++++++++++++++- .../src/emqx_prometheus_mria.erl | 102 ------------------ 4 files changed, 93 insertions(+), 107 deletions(-) delete mode 100644 apps/emqx_prometheus/src/emqx_prometheus_mria.erl diff --git a/Makefile b/Makefile index 0ccd48410..f2bb688c4 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 export EMQX_DASHBOARD_VERSION ?= v1.6.1 -export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.8 +export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.10 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index abb2e14e3..bc0c97fee 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -1107,8 +1107,7 @@ tr_prometheus_collectors(Conf) -> %% emqx collectors emqx_prometheus, {'/prometheus/auth', emqx_prometheus_auth}, - {'/prometheus/data_integration', emqx_prometheus_data_integration}, - emqx_prometheus_mria + {'/prometheus/data_integration', emqx_prometheus_data_integration} %% builtin vm collectors | prometheus_collectors(Conf) ]. diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index ffa113a23..f9eddfedb 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -82,6 +82,7 @@ %%-------------------------------------------------------------------- -define(MG(K, MAP), maps:get(K, MAP)). +-define(MG(K, MAP, DEFAULT), maps:get(K, MAP, DEFAULT)). -define(MG0(K, MAP), maps:get(K, MAP, 0)). -define(C(K, L), proplists:get_value(K, L, 0)). @@ -210,6 +211,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, authn_metric_meta(), ?MG(emqx_authn_data, RawData)), ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), + ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)), ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> @@ -259,7 +261,8 @@ fetch_from_local_node(Mode) -> emqx_session_data => emqx_metric_data(session_metric_meta(), Mode), emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode), emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode), - emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode) + emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode), + mria_data => mria_data(Mode) }}. fetch_cluster_consistented_data() -> @@ -460,7 +463,19 @@ emqx_collect(K = emqx_authentication_failure, D) -> counter_metrics(?MG(K, D)); emqx_collect(K = emqx_license_expiry_at, D) -> gauge_metric(?MG(K, D)); %%-------------------------------------------------------------------- %% Certs -emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D)). +emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D)); +%% Mria +%% ========== core +emqx_collect(K = emqx_mria_last_intercepted_trans, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_mria_weight, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_mria_replicants, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_mria_server_mql, D) -> gauge_metrics(?MG(K, D, [])); +%% ========== replicant +emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators @@ -889,6 +904,80 @@ utc_time_to_datetime(Str) -> date_to_expiry_epoch(DateTime) -> calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START. +%%======================================== +%% Mria +%%======================================== + +mria_metric_meta() -> + mria_metric_meta(core) ++ mria_metric_meta(replicant). + +mria_metric_meta(core) -> + [ + {emqx_mria_last_intercepted_trans, gauge, last_intercepted_trans}, + {emqx_mria_weight, gauge, weight}, + {emqx_mria_replicants, gauge, replicants}, + {emqx_mria_server_mql, gauge, server_mql} + ]; +mria_metric_meta(replicant) -> + [ + {emqx_mria_lag, gauge, lag}, + {emqx_mria_bootstrap_time, gauge, bootstrap_time}, + {emqx_mria_bootstrap_num_keys, gauge, bootstrap_num_keys}, + {emqx_mria_message_queue_len, gauge, message_queue_len}, + {emqx_mria_replayq_len, gauge, replayq_len} + ]. + +mria_data(Mode) -> + case mria_rlog:backend() of + rlog -> + mria_data(mria_rlog:role(), Mode); + mnesia -> + #{} + end. + +mria_data(Role, Mode) -> + lists:foldl( + fun({Name, _Type, MetricK}, AccIn) -> + %% TODO: only report shards that are up + DataFun = fun() -> get_shard_metrics(Mode, MetricK) end, + AccIn#{ + Name => catch_all(DataFun) + } + end, + #{}, + mria_metric_meta(Role) + ). + +get_shard_metrics(Mode, MetricK) -> + Labels = + case Mode of + node -> + []; + _ -> + [{node, node(self())}] + end, + [ + {[{shard, Shard} | Labels], get_shard_metric(MetricK, Shard)} + || Shard <- mria_schema:shards(), Shard =/= undefined + ]. + +get_shard_metric(replicants, Shard) -> + length(mria_status:agents(Shard)); +get_shard_metric(Metric, Shard) -> + case mria_status:get_shard_stats(Shard) of + #{Metric := Value} when is_number(Value) -> + Value; + _ -> + undefined + end. + +catch_all(DataFun) -> + try + DataFun() + catch + _:_ -> undefined + end. + %%-------------------------------------------------------------------- %% Collect functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_prometheus/src/emqx_prometheus_mria.erl b/apps/emqx_prometheus/src/emqx_prometheus_mria.erl deleted file mode 100644 index e08e2c405..000000000 --- a/apps/emqx_prometheus/src/emqx_prometheus_mria.erl +++ /dev/null @@ -1,102 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_prometheus_mria). - --export([ - deregister_cleanup/1, - collect_mf/2 -]). - --include_lib("prometheus/include/prometheus.hrl"). - -%% Please don't remove this attribute, prometheus uses it to -%% automatically register collectors. --behaviour(prometheus_collector). - -%%==================================================================== -%% Macros -%%==================================================================== - --define(METRIC_NAME_PREFIX, "emqx_mria_"). - -%%==================================================================== -%% Collector API -%%==================================================================== - -%% @private -deregister_cleanup(_) -> ok. - -%% @private --spec collect_mf(_Registry, Callback) -> ok when - _Registry :: prometheus_registry:registry(), - Callback :: prometheus_collector:collect_mf_callback(). -collect_mf(_Registry, Callback) -> - case mria_rlog:backend() of - rlog -> - Metrics = metrics(), - _ = [add_metric_family(Metric, Callback) || Metric <- Metrics], - ok; - mnesia -> - ok - end. - -add_metric_family({Name, Metrics}, Callback) -> - Callback( - prometheus_model_helpers:create_mf( - ?METRIC_NAME(Name), - <<"">>, - gauge, - catch_all(Metrics) - ) - ). - -%%==================================================================== -%% Internal functions -%%==================================================================== - -metrics() -> - Metrics = - case mria_rlog:role() of - replicant -> - [lag, bootstrap_time, bootstrap_num_keys, message_queue_len, replayq_len]; - core -> - [last_intercepted_trans, weight, replicants, server_mql] - end, - [{MetricId, fun() -> get_shard_metric(MetricId) end} || MetricId <- Metrics]. - -get_shard_metric(Metric) -> - %% TODO: only report shards that are up - [ - {[{shard, Shard}], get_shard_metric(Metric, Shard)} - || Shard <- mria_schema:shards(), Shard =/= undefined - ]. - -get_shard_metric(replicants, Shard) -> - length(mria_status:agents(Shard)); -get_shard_metric(Metric, Shard) -> - case mria_status:get_shard_stats(Shard) of - #{Metric := Value} when is_number(Value) -> - Value; - _ -> - undefined - end. - -catch_all(DataFun) -> - try - DataFun() - catch - _:_ -> undefined - end. From beeedd33d10dda83a18f51f9f717645ad47c857c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Jan 2024 17:53:59 +0800 Subject: [PATCH 2/4] fix(prom_mria): mria_data in init_acc --- apps/emqx_prometheus/src/emqx_prometheus.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index f9eddfedb..1e6862814 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -283,7 +283,8 @@ aggre_or_zip_init_acc() -> emqx_session_data => maps:from_keys(metrics_name(session_metric_meta()), []), emqx_olp_data => maps:from_keys(metrics_name(olp_metric_meta()), []), emqx_acl_data => maps:from_keys(metrics_name(acl_metric_meta()), []), - emqx_authn_data => maps:from_keys(metrics_name(authn_metric_meta()), []) + emqx_authn_data => maps:from_keys(metrics_name(authn_metric_meta()), []), + mria_data => maps:from_keys(metrics_name(mria_metric_meta()), []) }. logic_sum_metrics() -> From 7b041683bb13c5b95ba0fe65b58353dc8593d38c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Jan 2024 18:06:47 +0800 Subject: [PATCH 3/4] fix(prom_di): action enable and status --- .../src/emqx_prometheus_data_integration.erl | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index f3bc98f50..32dc934c7 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -205,6 +205,8 @@ collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> counter_metrics collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data)); %%==================== %% Action Metric +collect_di(K = emqx_action_enable, Data) -> gauge_metrics(?MG(K, Data)); +collect_di(K = emqx_action_status, Data) -> gauge_metrics(?MG(K, Data)); collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data)); @@ -375,6 +377,8 @@ get_metric(#{id := Id, enable := Bool} = _Rule) -> action_metric_meta() -> [ + {emqx_action_enable, gauge}, + {emqx_action_status, gauge}, {emqx_action_matched, counter}, {emqx_action_dropped, counter}, {emqx_action_success, counter}, @@ -398,9 +402,11 @@ action_metric(names) -> action_metric_data(Mode, Bridges) -> lists:foldl( - fun(#{type := Type, name := Name} = _Bridge, AccIn) -> + fun(#{type := Type, name := Name} = Action, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), - merge_acc_with_bridges(Mode, Id, get_bridge_metric(Type, Name), AccIn) + Status = get_action_status(Action), + Metrics = get_action_metric(Type, Name), + merge_acc_with_bridges(Mode, Id, maps:merge(Status, Metrics), AccIn) end, maps:from_keys(action_metric(names), []), Bridges @@ -415,10 +421,18 @@ merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) -> BridgeMetrics ). +get_action_status(#{resource_data := ResourceData} = _Action) -> + Enable = emqx_utils_maps:deep_get([config, enable], ResourceData), + Status = ?MG(status, ResourceData), + #{ + emqx_action_enable => emqx_prometheus_cluster:boolean_to_number(Enable), + emqx_action_status => emqx_prometheus_cluster:status_to_number(Status) + }. + action_point(Mode, Id, V) -> {with_node_label(Mode, [{id, Id}]), V}. -get_bridge_metric(Type, Name) -> +get_action_metric(Type, Name) -> #{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name), #{ emqx_action_matched => ?MG0(matched, Counters), From 4dca1ef84815611911c37ad7cb432658391bdea4 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Jan 2024 19:31:59 +0800 Subject: [PATCH 4/4] fix(prom_di): compatibility with bridge_v1 --- .../src/emqx_prometheus_cluster.erl | 1 - .../src/emqx_prometheus_data_integration.erl | 74 +++++++++++++++---- 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl index c4f1dc3b8..00a464811 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_cluster.erl @@ -24,7 +24,6 @@ collect_json_data/2, aggre_cluster/3, - %% with_node_name_label/2, point_to_map_fun/1, diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 32dc934c7..7f122aaed 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -73,20 +73,23 @@ fetch_from_local_node(Mode) -> Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), + BridgesV1 = emqx:get_config([bridges], #{}), + BridgeV2Actions = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), Connectors = emqx_connector:list(), {node(self()), #{ rule_metric_data => rule_metric_data(Mode, Rules), - action_metric_data => action_metric_data(Mode, Bridges), - connector_metric_data => connector_metric_data(Mode, Connectors) + action_metric_data => action_metric_data(Mode, BridgeV2Actions), + connector_metric_data => connector_metric_data(Mode, BridgesV1, Connectors) }}. fetch_cluster_consistented_data() -> Rules = emqx_rule_engine:get_rules(), + %% for bridge v1 + BridgesV1 = emqx:get_config([bridges], #{}), Connectors = emqx_connector:list(), (maybe_collect_schema_registry())#{ rules_ov_data => rules_ov_data(Rules), - connectors_ov_data => connectors_ov_data(Connectors) + connectors_ov_data => connectors_ov_data(BridgesV1, Connectors) }. aggre_or_zip_init_acc() -> @@ -144,9 +147,13 @@ collect_mf(_, _) -> collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), Rules = emqx_rule_engine:get_rules(), - Bridges = emqx_bridge:list(), + Connectors = emqx_connector:list(), + %% for bridge v1 + BridgesV1 = emqx:get_config([bridges], #{}), #{ - data_integration_overview => collect_data_integration_overview(Rules, Bridges), + data_integration_overview => collect_data_integration_overview( + Rules, BridgesV1, Connectors + ), rules => collect_json_data(?MG(rule_metric_data, RawData)), actions => collect_json_data(?MG(action_metric_data, RawData)), connectors => collect_json_data(?MG(connector_metric_data, RawData)) @@ -298,10 +305,17 @@ connectors_ov_metric_meta() -> connectors_ov_metric(names) -> emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). -connectors_ov_data(Connectors) -> +connectors_ov_data(BridgesV1, Connectors) -> + %% Both Bridge V1 and V2 + V1ConnectorsCnt = maps:fold( + fun(_Type, NameAndConf, AccIn) -> + AccIn + maps:size(NameAndConf) + end, + 0, + BridgesV1 + ), #{ - %% Both Bridge V1 and V2 - emqx_connectors_count => erlang:length(Connectors) + emqx_connectors_count => erlang:length(Connectors) + V1ConnectorsCnt }. %%======================================== @@ -466,16 +480,44 @@ connector_metric_meta() -> connectr_metric(names) -> emqx_prometheus_cluster:metric_names(connector_metric_meta()). -connector_metric_data(Mode, Connectors) -> +connector_metric_data(Mode, BridgesV1, Connectors) -> + AccIn = maps:from_keys(connectr_metric(names), []), + Acc0 = connector_metric_data_v1(Mode, BridgesV1, AccIn), + _AccOut = connector_metric_data_v2(Mode, Connectors, Acc0). + +connector_metric_data_v2(Mode, Connectors, InitAcc) -> lists:foldl( - fun(#{type := Type, name := Name} = Connector, AccIn) -> + fun(#{type := Type, name := Name, resource_data := ResourceData} = _Connector, AccIn) -> Id = emqx_connector_resource:connector_id(Type, Name), - merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn) + merge_acc_with_connectors(Mode, Id, get_connector_status(ResourceData), AccIn) end, - maps:from_keys(connectr_metric(names), []), + InitAcc, Connectors ). +connector_metric_data_v1(Mode, BridgesV1, InitAcc) -> + maps:fold( + fun(Type, NameAndConfMap, Acc0) -> + maps:fold( + fun(Name, _Conf, Acc1) -> + BridgeV1Id = emqx_bridge_resource:resource_id(Type, Name), + case emqx_resource:get_instance(BridgeV1Id) of + {error, not_found} -> + Acc1; + {ok, _, ResourceData} -> + merge_acc_with_connectors( + Mode, BridgeV1Id, get_connector_status(ResourceData), Acc1 + ) + end + end, + Acc0, + NameAndConfMap + ) + end, + InitAcc, + BridgesV1 + ). + merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> maps:fold( fun(K, V, AccIn) -> @@ -488,7 +530,7 @@ merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> connector_point(Mode, Id, V) -> {with_node_label(Mode, [{id, Id}]), V}. -get_connector_status(#{resource_data := ResourceData} = _Connector) -> +get_connector_status(ResourceData) -> Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Status = ?MG(status, ResourceData), #{ @@ -502,9 +544,9 @@ get_connector_status(#{resource_data := ResourceData} = _Connector) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% merge / zip formatting funcs for type `application/json` -collect_data_integration_overview(Rules, Bridges) -> +collect_data_integration_overview(Rules, BridgesV1, Connectors) -> RulesD = rules_ov_data(Rules), - ConnectorsD = connectors_ov_data(Bridges), + ConnectorsD = connectors_ov_data(BridgesV1, Connectors), M1 = lists:foldl( fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,