diff --git a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl index 72fd7a6e9..c41d9a6fb 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_data_integration.erl @@ -26,8 +26,6 @@ -export([add_collect_family/4]). --export([actions_exec_count/0, actions_exec_count_data/0]). - -include("emqx_prometheus.hrl"). -include_lib("prometheus/include/prometheus.hrl"). @@ -36,7 +34,8 @@ [ create_mf/5, gauge_metric/1, - gauge_metrics/1 + gauge_metrics/1, + counter_metrics/1 ] ). @@ -53,6 +52,58 @@ -define(MG(K, MAP), maps:get(K, MAP)). -define(MG0(K, MAP), maps:get(K, MAP, 0)). +-define(RULES_WITH_TYPE, [ + {emqx_rules_count, gauge} +]). + +-define(CONNECTORS_WITH_TYPE, [ + {emqx_connectors_count, gauge} +]). + +-define(RULES_SPECIFIC_WITH_TYPE, [ + {emqx_rule_matched, counter}, + {emqx_rule_failed, counter}, + {emqx_rule_passed, counter}, + {emqx_rule_failed_exception, counter}, + {emqx_rule_failed_no_result, counter}, + {emqx_rule_actions_total, counter}, + {emqx_rule_actions_success, counter}, + {emqx_rule_actions_failed, counter}, + {emqx_rule_actions_failed_out_of_service, counter}, + {emqx_rule_actions_failed_unknown, counter} +]). + +-define(ACTION_SPECIFIC_WITH_TYPE, [ + {emqx_action_matched, counter}, + {emqx_action_dropped, counter}, + {emqx_action_success, counter}, + {emqx_action_failed, counter}, + {emqx_action_inflight, gauge}, + {emqx_action_received, counter}, + {emqx_action_late_reply, counter}, + {emqx_action_retried, counter}, + {emqx_action_retried_success, counter}, + {emqx_action_retried_failed, counter}, + {emqx_action_dropped_resource_stopped, counter}, + {emqx_action_dropped_resource_not_found, counter}, + {emqx_action_dropped_queue_full, counter}, + {emqx_action_dropped_other, counter}, + {emqx_action_dropped_expired, counter}, + {emqx_action_queuing, gauge} +]). + +-define(CONNECTOR_SPECIFIC_WITH_TYPE, [ + {emqx_connector_enable, gauge}, + {emqx_connector_status, gauge} +]). + +-if(?EMQX_RELEASE_EDITION == ee). +-define(SCHEMA_REGISTRY_WITH_TYPE, [ + emqx_schema_registrys_count +]). +-else. +-endif. + %%-------------------------------------------------------------------- %% Collector API %%-------------------------------------------------------------------- @@ -68,31 +119,20 @@ collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> Rules = emqx_rule_engine:get_rules(), Bridges = emqx_bridge:list(), %% Data Integration Overview - _ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()], - _ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()], - _ = [ - add_collect_family(Name, connectors_data(Bridges), Callback, gauge) - || Name <- connectors() - ], + ok = add_collect_family(Callback, ?RULES_WITH_TYPE, rules_data(Rules)), + ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, connectors_data(Bridges)), ok = maybe_collect_family_schema_registry(Callback), %% Rule Specific - _ = [ - add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) - || Name <- rule_specific() - ], + ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, rule_specific_data(Rules)), %% Action Specific - _ = [ - add_collect_family(Name, action_specific_data(Bridges), Callback, gauge) - || Name <- action_specific() - ], + ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, action_specific_data(Bridges)), %% Connector Specific - _ = [ - add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge) - || Name <- connector_specific() - ], + ok = add_collect_family( + Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, connector_specific_data(Bridges) + ), ok; collect_mf(_, _) -> @@ -114,6 +154,10 @@ collect(<<"prometheus">>) -> %%==================== %% API Helpers +add_collect_family(Callback, MetricWithType, Data) -> + _ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType], + ok. + add_collect_family(Name, Data, Callback, Type) -> %% TODO: help document from Name Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). @@ -123,15 +167,21 @@ collect_metrics(Name, Metrics) -> collect_data_integration_overview(Rules, Bridges) -> RulesD = rules_data(Rules), - ActionsD = actions_data(Rules), ConnectorsD = connectors_data(Bridges), - M1 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, #{}, rules()), - M2 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ActionsD)} end, #{}, actions()), - M3 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, #{}, connectors()), - M4 = maybe_collect_schema_registry(), + M1 = lists:foldl( + fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, + #{}, + metric_names(?RULES_WITH_TYPE) + ), + M2 = lists:foldl( + fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, + #{}, + metric_names(?CONNECTORS_WITH_TYPE) + ), + M3 = maybe_collect_schema_registry(), - lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3, M4]). + lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]). collect_data_integration(Type, DataSeed) -> maps:fold( @@ -144,10 +194,7 @@ collect_data_integration(Type, DataSeed) -> -if(?EMQX_RELEASE_EDITION == ee). maybe_collect_family_schema_registry(Callback) -> - _ = [ - add_collect_family(Name, schema_registry_data(), Callback, gauge) - || Name <- schema_registry() - ], + ok = add_collect_family(Callback, ?SCHEMA_REGISTRY_WITH_TYPE, schema_registry_data()), ok. maybe_collect_schema_registry() -> @@ -171,25 +218,15 @@ maybe_collect_schema_registry() -> %%==================== %% All Rules %% Rules -collect_di(K = emqx_rule_count, Data) -> - gauge_metric(?MG(K, Data)); -collect_di(K = emqx_rules_matched_rate, Data) -> - gauge_metric(?MG(K, Data)); -collect_di(K = emqx_rules_matched_rate_last5m, Data) -> - gauge_metric(?MG(K, Data)); -%%==================== -%% All Actions -collect_di(K = emqx_rules_actions_rate, Data) -> - gauge_metric(?MG(K, Data)); -collect_di(K = emqx_rules_actions_rate_last5m, Data) -> +collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data)); %%==================== %% Schema Registry -collect_di(K = emqx_schema_registry_count, Data) -> +collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data)); %%==================== %% Connectors -collect_di(K = emqx_connector_count, Data) -> +collect_di(K = emqx_connectors_count, Data) -> gauge_metric(?MG(K, Data)); %%======================================== %% Data Integration for Specific: Rule && Action && Connector @@ -198,71 +235,61 @@ collect_di(K = emqx_connector_count, Data) -> %%==================== %% Specific Rule collect_di(K = emqx_rule_matched, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_failed, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_passed, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_failed_exception, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_failed_no_result, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_actions_total, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_actions_success, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_actions_failed, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_actions_failed_unknown, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_matched_rate, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_matched_rate_last5m, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_rule_matched_rate_max, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); %%==================== %% Specific Action collect_di(K = emqx_action_matched, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_success, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_failed, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_action_rate, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_inflight, Data) -> + %% inflight type: gauge gauge_metrics(?MG(K, Data)); collect_di(K = emqx_action_received, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_late_reply, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_retried, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_retried_success, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_retried_failed, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped_resource_stopped, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped_resource_not_found, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped_queue_full, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped_other, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped_expired, Data) -> - gauge_metrics(?MG(K, Data)); + counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_queuing, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_action_rate_last5m, Data) -> - gauge_metrics(?MG(K, Data)); -collect_di(K = emqx_action_rate_max, Data) -> + %% queuing type: gauge gauge_metrics(?MG(K, Data)); %%==================== %% Specific Connector @@ -283,100 +310,30 @@ collect_di(K = emqx_connector_status, Data) -> %%==================== %% All Rules -rules() -> - [ - emqx_rule_count, - emqx_rules_matched_rate, - emqx_rules_matched_rate_last5m - ]. - -define(RULE_TAB, emqx_rule_engine). - -rules_data(Rules) -> - Rate = lists:foldl( - fun( - #{id := Id}, - #{emqx_rules_matched_rate := Rate, emqx_rules_matched_rate_last5m := RateLast5m} = AccIn - ) -> - RuleMetrics = emqx_metrics_worker:get_metrics(rule_metrics, Id), - AccIn#{ - emqx_rules_matched_rate => Rate + - emqx_utils_maps:deep_get([rate, matched, current], RuleMetrics, 0), - emqx_rules_matched_rate_last5m => RateLast5m + - emqx_utils_maps:deep_get([rate, matched, last5m], RuleMetrics, 0) - } - end, - _InitAcc = maps:from_keys(rules(), 0), - Rules - ), - Rate#{emqx_rule_count => ets:info(?RULE_TAB, size)}. - -%%==================== -%% All Actions - -actions() -> - [ - emqx_rules_actions_rate, - emqx_rules_actions_rate_last5m - ]. - -actions_data(Rules) -> - lists:foldl( - fun( - #{id := Id}, - #{emqx_rules_actions_rate := Rate, emqx_rules_actions_rate_last5m := RateLast5m} = - _AccIn - ) -> - RuleMetrics = emqx_metrics_worker:get_metrics(rule_metrics, Id), - _AccIn#{ - emqx_rules_actions_rate => Rate + - emqx_utils_maps:deep_get([rate, matched, current], RuleMetrics, 0), - emqx_rules_actions_rate_last5m => RateLast5m + - emqx_utils_maps:deep_get([rate, matched, last5m], RuleMetrics, 0) - } - end, - _InitAcc = maps:from_keys(actions(), 0), - Rules - ). - -actions_exec_count() -> - [ - emqx_action_sink, - emqx_action_source - ]. - -actions_exec_count_data() -> - #{}. +rules_data(_Rules) -> + #{ + emqx_rules_count => ets:info(?RULE_TAB, size) + }. %%==================== %% Schema Registry -if(?EMQX_RELEASE_EDITION == ee). -schema_registry() -> - [ - emqx_schema_registry_count - ]. - schema_registry_data() -> #{ - emqx_schema_registry_count => erlang:map_size(emqx_schema_registry:list_schemas()) + emqx_schema_registrys_count => erlang:map_size(emqx_schema_registry:list_schemas()) }. -else. - -endif. %%==================== %% Connectors -connectors() -> - [ - emqx_connector_count - ]. - connectors_data(Brdiges) -> #{ %% Both Bridge V1 and V2 - emqx_connector_count => erlang:length(Brdiges) + emqx_connectors_count => erlang:length(Brdiges) }. %%======================================== @@ -387,29 +344,12 @@ connectors_data(Brdiges) -> %% Specific Rule %% With rule_id as label key: `rule_id` -rule_specific() -> - [ - emqx_rule_matched, - emqx_rule_failed, - emqx_rule_passed, - emqx_rule_failed_exception, - emqx_rule_failed_no_result, - emqx_rule_actions_total, - emqx_rule_actions_success, - emqx_rule_actions_failed, - emqx_rule_actions_failed_out_of_service, - emqx_rule_actions_failed_unknown, - emqx_rule_matched_rate, - emqx_rule_matched_rate_last5m, - emqx_rule_matched_rate_max - ]. - rule_specific_data(Rules) -> lists:foldl( fun(#{id := Id} = Rule, AccIn) -> merge_acc_with_rules(Id, get_metric(Rule), AccIn) end, - maps:from_keys(rule_specific(), []), + maps:from_keys(metric_names(?RULES_SPECIFIC_WITH_TYPE), []), Rules ). @@ -427,7 +367,7 @@ rule_point(Id, V) -> get_metric(#{id := Id} = _Rule) -> case emqx_metrics_worker:get_metrics(rule_metrics, Id) of - #{counters := Counters, rate := #{matched := MatchedRate}} -> + #{counters := Counters} -> #{ emqx_rule_matched => ?MG(matched, Counters), emqx_rule_failed => ?MG(failed, Counters), @@ -440,10 +380,7 @@ get_metric(#{id := Id} = _Rule) -> emqx_rule_actions_failed_out_of_service => ?MG( 'actions.failed.out_of_service', Counters ), - emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters), - emqx_rule_matched_rate => ?MG(current, MatchedRate), - emqx_rule_matched_rate_last5m => ?MG(last5m, MatchedRate), - emqx_rule_matched_rate_max => ?MG(max, MatchedRate) + emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters) } end. @@ -451,36 +388,13 @@ get_metric(#{id := Id} = _Rule) -> %% Specific Action %% With action_id: `{type}:{name}` as label key: `action_id` -action_specific() -> - [ - emqx_action_matched, - emqx_action_dropped, - emqx_action_success, - emqx_action_failed, - emqx_action_rate, - emqx_action_inflight, - emqx_action_received, - emqx_action_late_reply, - emqx_action_retried, - emqx_action_retried_success, - emqx_action_retried_failed, - emqx_action_dropped_resource_stopped, - emqx_action_dropped_resource_not_found, - emqx_action_dropped_queue_full, - emqx_action_dropped_other, - emqx_action_dropped_expired, - emqx_action_queuing, - emqx_action_rate_last5m, - emqx_action_rate_max - ]. - action_specific_data(Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = _Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) end, - maps:from_keys(action_specific(), []), + maps:from_keys(metric_names(?ACTION_SPECIFIC_WITH_TYPE), []), Bridges ). @@ -498,13 +412,12 @@ action_point(Id, V) -> get_bridge_metric(Type, Name) -> case emqx_bridge:get_metrics(Type, Name) of - #{counters := Counters, rate := #{matched := MatchedRate}, gauges := Gauges} -> + #{counters := Counters, gauges := Gauges} -> #{ emqx_action_matched => ?MG0(matched, Counters), emqx_action_dropped => ?MG0(dropped, Counters), emqx_action_success => ?MG0(success, Counters), emqx_action_failed => ?MG0(failed, Counters), - emqx_action_rate => ?MG0(current, MatchedRate), emqx_action_inflight => ?MG0(inflight, Gauges), emqx_action_received => ?MG0(received, Counters), emqx_action_late_reply => ?MG0(late_reply, Counters), @@ -518,9 +431,7 @@ get_bridge_metric(Type, Name) -> emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), emqx_action_dropped_other => ?MG0('dropped.other', Counters), emqx_action_dropped_expired => ?MG0('dropped.expired', Counters), - emqx_action_queuing => ?MG0(queuing, Gauges), - emqx_action_rate_last5m => ?MG0(last5m, MatchedRate), - emqx_action_rate_max => ?MG0(max, MatchedRate) + emqx_action_queuing => ?MG0(queuing, Gauges) } end. @@ -528,19 +439,13 @@ get_bridge_metric(Type, Name) -> %% Specific Connector %% With connector_id: `{type}:{name}` as label key: `connector_id` -connector_specific() -> - [ - emqx_connector_enable, - emqx_connector_status - ]. - connector_specific_data(Bridges) -> lists:foldl( fun(#{type := Type, name := Name} = Bridge, AccIn) -> Id = emqx_bridge_resource:bridge_id(Type, Name), merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) end, - maps:from_keys(connector_specific(), []), + maps:from_keys(metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE), []), Bridges ). @@ -623,3 +528,6 @@ di_data(connectors, Bridges) -> connector_specific_data(Bridges). label_key(rules) -> id; label_key(actions) -> id; label_key(connectors) -> id. + +metric_names(MetricWithType) when is_list(MetricWithType) -> + [Name || {Name, _Type} <- MetricWithType].