fix(prom_data_integration): fix metric type

This commit is contained in:
JimMoen 2024-01-16 22:59:00 +08:00
parent c3e9533260
commit 94032aafb2
No known key found for this signature in database
1 changed files with 123 additions and 215 deletions

View File

@ -26,8 +26,6 @@
-export([add_collect_family/4]). -export([add_collect_family/4]).
-export([actions_exec_count/0, actions_exec_count_data/0]).
-include("emqx_prometheus.hrl"). -include("emqx_prometheus.hrl").
-include_lib("prometheus/include/prometheus.hrl"). -include_lib("prometheus/include/prometheus.hrl").
@ -36,7 +34,8 @@
[ [
create_mf/5, create_mf/5,
gauge_metric/1, gauge_metric/1,
gauge_metrics/1 gauge_metrics/1,
counter_metrics/1
] ]
). ).
@ -53,6 +52,58 @@
-define(MG(K, MAP), maps:get(K, MAP)). -define(MG(K, MAP), maps:get(K, MAP)).
-define(MG0(K, MAP), maps:get(K, MAP, 0)). -define(MG0(K, MAP), maps:get(K, MAP, 0)).
-define(RULES_WITH_TYPE, [
{emqx_rules_count, gauge}
]).
-define(CONNECTORS_WITH_TYPE, [
{emqx_connectors_count, gauge}
]).
-define(RULES_SPECIFIC_WITH_TYPE, [
{emqx_rule_matched, counter},
{emqx_rule_failed, counter},
{emqx_rule_passed, counter},
{emqx_rule_failed_exception, counter},
{emqx_rule_failed_no_result, counter},
{emqx_rule_actions_total, counter},
{emqx_rule_actions_success, counter},
{emqx_rule_actions_failed, counter},
{emqx_rule_actions_failed_out_of_service, counter},
{emqx_rule_actions_failed_unknown, counter}
]).
-define(ACTION_SPECIFIC_WITH_TYPE, [
{emqx_action_matched, counter},
{emqx_action_dropped, counter},
{emqx_action_success, counter},
{emqx_action_failed, counter},
{emqx_action_inflight, gauge},
{emqx_action_received, counter},
{emqx_action_late_reply, counter},
{emqx_action_retried, counter},
{emqx_action_retried_success, counter},
{emqx_action_retried_failed, counter},
{emqx_action_dropped_resource_stopped, counter},
{emqx_action_dropped_resource_not_found, counter},
{emqx_action_dropped_queue_full, counter},
{emqx_action_dropped_other, counter},
{emqx_action_dropped_expired, counter},
{emqx_action_queuing, gauge}
]).
-define(CONNECTOR_SPECIFIC_WITH_TYPE, [
{emqx_connector_enable, gauge},
{emqx_connector_status, gauge}
]).
-if(?EMQX_RELEASE_EDITION == ee).
-define(SCHEMA_REGISTRY_WITH_TYPE, [
emqx_schema_registrys_count
]).
-else.
-endif.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Collector API %% Collector API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -68,31 +119,20 @@ collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(), Bridges = emqx_bridge:list(),
%% Data Integration Overview %% Data Integration Overview
_ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()], ok = add_collect_family(Callback, ?RULES_WITH_TYPE, rules_data(Rules)),
_ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()], ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, connectors_data(Bridges)),
_ = [
add_collect_family(Name, connectors_data(Bridges), Callback, gauge)
|| Name <- connectors()
],
ok = maybe_collect_family_schema_registry(Callback), ok = maybe_collect_family_schema_registry(Callback),
%% Rule Specific %% Rule Specific
_ = [ ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, rule_specific_data(Rules)),
add_collect_family(Name, rule_specific_data(Rules), Callback, gauge)
|| Name <- rule_specific()
],
%% Action Specific %% Action Specific
_ = [ ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, action_specific_data(Bridges)),
add_collect_family(Name, action_specific_data(Bridges), Callback, gauge)
|| Name <- action_specific()
],
%% Connector Specific %% Connector Specific
_ = [ ok = add_collect_family(
add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge) Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, connector_specific_data(Bridges)
|| Name <- connector_specific() ),
],
ok; ok;
collect_mf(_, _) -> collect_mf(_, _) ->
@ -114,6 +154,10 @@ collect(<<"prometheus">>) ->
%%==================== %%====================
%% API Helpers %% API Helpers
add_collect_family(Callback, MetricWithType, Data) ->
_ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType],
ok.
add_collect_family(Name, Data, Callback, Type) -> add_collect_family(Name, Data, Callback, Type) ->
%% TODO: help document from Name %% TODO: help document from Name
Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)). Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
@ -123,15 +167,21 @@ collect_metrics(Name, Metrics) ->
collect_data_integration_overview(Rules, Bridges) -> collect_data_integration_overview(Rules, Bridges) ->
RulesD = rules_data(Rules), RulesD = rules_data(Rules),
ActionsD = actions_data(Rules),
ConnectorsD = connectors_data(Bridges), ConnectorsD = connectors_data(Bridges),
M1 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, #{}, rules()), M1 = lists:foldl(
M2 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ActionsD)} end, #{}, actions()), fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
M3 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, #{}, connectors()), #{},
M4 = maybe_collect_schema_registry(), metric_names(?RULES_WITH_TYPE)
),
M2 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
#{},
metric_names(?CONNECTORS_WITH_TYPE)
),
M3 = maybe_collect_schema_registry(),
lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3, M4]). lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
collect_data_integration(Type, DataSeed) -> collect_data_integration(Type, DataSeed) ->
maps:fold( maps:fold(
@ -144,10 +194,7 @@ collect_data_integration(Type, DataSeed) ->
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
maybe_collect_family_schema_registry(Callback) -> maybe_collect_family_schema_registry(Callback) ->
_ = [ ok = add_collect_family(Callback, ?SCHEMA_REGISTRY_WITH_TYPE, schema_registry_data()),
add_collect_family(Name, schema_registry_data(), Callback, gauge)
|| Name <- schema_registry()
],
ok. ok.
maybe_collect_schema_registry() -> maybe_collect_schema_registry() ->
@ -171,25 +218,15 @@ maybe_collect_schema_registry() ->
%%==================== %%====================
%% All Rules %% All Rules
%% Rules %% Rules
collect_di(K = emqx_rule_count, Data) -> collect_di(K = emqx_rules_count, Data) ->
gauge_metric(?MG(K, Data));
collect_di(K = emqx_rules_matched_rate, Data) ->
gauge_metric(?MG(K, Data));
collect_di(K = emqx_rules_matched_rate_last5m, Data) ->
gauge_metric(?MG(K, Data));
%%====================
%% All Actions
collect_di(K = emqx_rules_actions_rate, Data) ->
gauge_metric(?MG(K, Data));
collect_di(K = emqx_rules_actions_rate_last5m, Data) ->
gauge_metric(?MG(K, Data)); gauge_metric(?MG(K, Data));
%%==================== %%====================
%% Schema Registry %% Schema Registry
collect_di(K = emqx_schema_registry_count, Data) -> collect_di(K = emqx_schema_registrys_count, Data) ->
gauge_metric(?MG(K, Data)); gauge_metric(?MG(K, Data));
%%==================== %%====================
%% Connectors %% Connectors
collect_di(K = emqx_connector_count, Data) -> collect_di(K = emqx_connectors_count, Data) ->
gauge_metric(?MG(K, Data)); gauge_metric(?MG(K, Data));
%%======================================== %%========================================
%% Data Integration for Specific: Rule && Action && Connector %% Data Integration for Specific: Rule && Action && Connector
@ -198,71 +235,61 @@ collect_di(K = emqx_connector_count, Data) ->
%%==================== %%====================
%% Specific Rule %% Specific Rule
collect_di(K = emqx_rule_matched, Data) -> collect_di(K = emqx_rule_matched, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_failed, Data) -> collect_di(K = emqx_rule_failed, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_passed, Data) -> collect_di(K = emqx_rule_passed, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_failed_exception, Data) -> collect_di(K = emqx_rule_failed_exception, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_failed_no_result, Data) -> collect_di(K = emqx_rule_failed_no_result, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_actions_total, Data) -> collect_di(K = emqx_rule_actions_total, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_actions_success, Data) -> collect_di(K = emqx_rule_actions_success, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_actions_failed, Data) -> collect_di(K = emqx_rule_actions_failed, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> collect_di(K = emqx_rule_actions_failed_out_of_service, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_actions_failed_unknown, Data) -> collect_di(K = emqx_rule_actions_failed_unknown, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_rule_matched_rate, Data) ->
gauge_metrics(?MG(K, Data));
collect_di(K = emqx_rule_matched_rate_last5m, Data) ->
gauge_metrics(?MG(K, Data));
collect_di(K = emqx_rule_matched_rate_max, Data) ->
gauge_metrics(?MG(K, Data));
%%==================== %%====================
%% Specific Action %% Specific Action
collect_di(K = emqx_action_matched, Data) -> collect_di(K = emqx_action_matched, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_dropped, Data) -> collect_di(K = emqx_action_dropped, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_success, Data) -> collect_di(K = emqx_action_success, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_failed, Data) -> collect_di(K = emqx_action_failed, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_rate, Data) ->
gauge_metrics(?MG(K, Data));
collect_di(K = emqx_action_inflight, Data) -> collect_di(K = emqx_action_inflight, Data) ->
%% inflight type: gauge
gauge_metrics(?MG(K, Data)); gauge_metrics(?MG(K, Data));
collect_di(K = emqx_action_received, Data) -> collect_di(K = emqx_action_received, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_late_reply, Data) -> collect_di(K = emqx_action_late_reply, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_retried, Data) -> collect_di(K = emqx_action_retried, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_retried_success, Data) -> collect_di(K = emqx_action_retried_success, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_retried_failed, Data) -> collect_di(K = emqx_action_retried_failed, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_dropped_resource_stopped, Data) -> collect_di(K = emqx_action_dropped_resource_stopped, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_dropped_resource_not_found, Data) -> collect_di(K = emqx_action_dropped_resource_not_found, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_dropped_queue_full, Data) -> collect_di(K = emqx_action_dropped_queue_full, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_dropped_other, Data) -> collect_di(K = emqx_action_dropped_other, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_dropped_expired, Data) -> collect_di(K = emqx_action_dropped_expired, Data) ->
gauge_metrics(?MG(K, Data)); counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_queuing, Data) -> collect_di(K = emqx_action_queuing, Data) ->
gauge_metrics(?MG(K, Data)); %% queuing type: gauge
collect_di(K = emqx_action_rate_last5m, Data) ->
gauge_metrics(?MG(K, Data));
collect_di(K = emqx_action_rate_max, Data) ->
gauge_metrics(?MG(K, Data)); gauge_metrics(?MG(K, Data));
%%==================== %%====================
%% Specific Connector %% Specific Connector
@ -283,100 +310,30 @@ collect_di(K = emqx_connector_status, Data) ->
%%==================== %%====================
%% All Rules %% All Rules
rules() ->
[
emqx_rule_count,
emqx_rules_matched_rate,
emqx_rules_matched_rate_last5m
].
-define(RULE_TAB, emqx_rule_engine). -define(RULE_TAB, emqx_rule_engine).
rules_data(_Rules) ->
rules_data(Rules) -> #{
Rate = lists:foldl( emqx_rules_count => ets:info(?RULE_TAB, size)
fun( }.
#{id := Id},
#{emqx_rules_matched_rate := Rate, emqx_rules_matched_rate_last5m := RateLast5m} = AccIn
) ->
RuleMetrics = emqx_metrics_worker:get_metrics(rule_metrics, Id),
AccIn#{
emqx_rules_matched_rate => Rate +
emqx_utils_maps:deep_get([rate, matched, current], RuleMetrics, 0),
emqx_rules_matched_rate_last5m => RateLast5m +
emqx_utils_maps:deep_get([rate, matched, last5m], RuleMetrics, 0)
}
end,
_InitAcc = maps:from_keys(rules(), 0),
Rules
),
Rate#{emqx_rule_count => ets:info(?RULE_TAB, size)}.
%%====================
%% All Actions
actions() ->
[
emqx_rules_actions_rate,
emqx_rules_actions_rate_last5m
].
actions_data(Rules) ->
lists:foldl(
fun(
#{id := Id},
#{emqx_rules_actions_rate := Rate, emqx_rules_actions_rate_last5m := RateLast5m} =
_AccIn
) ->
RuleMetrics = emqx_metrics_worker:get_metrics(rule_metrics, Id),
_AccIn#{
emqx_rules_actions_rate => Rate +
emqx_utils_maps:deep_get([rate, matched, current], RuleMetrics, 0),
emqx_rules_actions_rate_last5m => RateLast5m +
emqx_utils_maps:deep_get([rate, matched, last5m], RuleMetrics, 0)
}
end,
_InitAcc = maps:from_keys(actions(), 0),
Rules
).
actions_exec_count() ->
[
emqx_action_sink,
emqx_action_source
].
actions_exec_count_data() ->
#{}.
%%==================== %%====================
%% Schema Registry %% Schema Registry
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
schema_registry() ->
[
emqx_schema_registry_count
].
schema_registry_data() -> schema_registry_data() ->
#{ #{
emqx_schema_registry_count => erlang:map_size(emqx_schema_registry:list_schemas()) emqx_schema_registrys_count => erlang:map_size(emqx_schema_registry:list_schemas())
}. }.
-else. -else.
-endif. -endif.
%%==================== %%====================
%% Connectors %% Connectors
connectors() ->
[
emqx_connector_count
].
connectors_data(Brdiges) -> connectors_data(Brdiges) ->
#{ #{
%% Both Bridge V1 and V2 %% Both Bridge V1 and V2
emqx_connector_count => erlang:length(Brdiges) emqx_connectors_count => erlang:length(Brdiges)
}. }.
%%======================================== %%========================================
@ -387,29 +344,12 @@ connectors_data(Brdiges) ->
%% Specific Rule %% Specific Rule
%% With rule_id as label key: `rule_id` %% With rule_id as label key: `rule_id`
rule_specific() ->
[
emqx_rule_matched,
emqx_rule_failed,
emqx_rule_passed,
emqx_rule_failed_exception,
emqx_rule_failed_no_result,
emqx_rule_actions_total,
emqx_rule_actions_success,
emqx_rule_actions_failed,
emqx_rule_actions_failed_out_of_service,
emqx_rule_actions_failed_unknown,
emqx_rule_matched_rate,
emqx_rule_matched_rate_last5m,
emqx_rule_matched_rate_max
].
rule_specific_data(Rules) -> rule_specific_data(Rules) ->
lists:foldl( lists:foldl(
fun(#{id := Id} = Rule, AccIn) -> fun(#{id := Id} = Rule, AccIn) ->
merge_acc_with_rules(Id, get_metric(Rule), AccIn) merge_acc_with_rules(Id, get_metric(Rule), AccIn)
end, end,
maps:from_keys(rule_specific(), []), maps:from_keys(metric_names(?RULES_SPECIFIC_WITH_TYPE), []),
Rules Rules
). ).
@ -427,7 +367,7 @@ rule_point(Id, V) ->
get_metric(#{id := Id} = _Rule) -> get_metric(#{id := Id} = _Rule) ->
case emqx_metrics_worker:get_metrics(rule_metrics, Id) of case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
#{counters := Counters, rate := #{matched := MatchedRate}} -> #{counters := Counters} ->
#{ #{
emqx_rule_matched => ?MG(matched, Counters), emqx_rule_matched => ?MG(matched, Counters),
emqx_rule_failed => ?MG(failed, Counters), emqx_rule_failed => ?MG(failed, Counters),
@ -440,10 +380,7 @@ get_metric(#{id := Id} = _Rule) ->
emqx_rule_actions_failed_out_of_service => ?MG( emqx_rule_actions_failed_out_of_service => ?MG(
'actions.failed.out_of_service', Counters 'actions.failed.out_of_service', Counters
), ),
emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters), emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters)
emqx_rule_matched_rate => ?MG(current, MatchedRate),
emqx_rule_matched_rate_last5m => ?MG(last5m, MatchedRate),
emqx_rule_matched_rate_max => ?MG(max, MatchedRate)
} }
end. end.
@ -451,36 +388,13 @@ get_metric(#{id := Id} = _Rule) ->
%% Specific Action %% Specific Action
%% With action_id: `{type}:{name}` as label key: `action_id` %% With action_id: `{type}:{name}` as label key: `action_id`
action_specific() ->
[
emqx_action_matched,
emqx_action_dropped,
emqx_action_success,
emqx_action_failed,
emqx_action_rate,
emqx_action_inflight,
emqx_action_received,
emqx_action_late_reply,
emqx_action_retried,
emqx_action_retried_success,
emqx_action_retried_failed,
emqx_action_dropped_resource_stopped,
emqx_action_dropped_resource_not_found,
emqx_action_dropped_queue_full,
emqx_action_dropped_other,
emqx_action_dropped_expired,
emqx_action_queuing,
emqx_action_rate_last5m,
emqx_action_rate_max
].
action_specific_data(Bridges) -> action_specific_data(Bridges) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = _Bridge, AccIn) -> fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn)
end, end,
maps:from_keys(action_specific(), []), maps:from_keys(metric_names(?ACTION_SPECIFIC_WITH_TYPE), []),
Bridges Bridges
). ).
@ -498,13 +412,12 @@ action_point(Id, V) ->
get_bridge_metric(Type, Name) -> get_bridge_metric(Type, Name) ->
case emqx_bridge:get_metrics(Type, Name) of case emqx_bridge:get_metrics(Type, Name) of
#{counters := Counters, rate := #{matched := MatchedRate}, gauges := Gauges} -> #{counters := Counters, gauges := Gauges} ->
#{ #{
emqx_action_matched => ?MG0(matched, Counters), emqx_action_matched => ?MG0(matched, Counters),
emqx_action_dropped => ?MG0(dropped, Counters), emqx_action_dropped => ?MG0(dropped, Counters),
emqx_action_success => ?MG0(success, Counters), emqx_action_success => ?MG0(success, Counters),
emqx_action_failed => ?MG0(failed, Counters), emqx_action_failed => ?MG0(failed, Counters),
emqx_action_rate => ?MG0(current, MatchedRate),
emqx_action_inflight => ?MG0(inflight, Gauges), emqx_action_inflight => ?MG0(inflight, Gauges),
emqx_action_received => ?MG0(received, Counters), emqx_action_received => ?MG0(received, Counters),
emqx_action_late_reply => ?MG0(late_reply, Counters), emqx_action_late_reply => ?MG0(late_reply, Counters),
@ -518,9 +431,7 @@ get_bridge_metric(Type, Name) ->
emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters), emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters),
emqx_action_dropped_other => ?MG0('dropped.other', Counters), emqx_action_dropped_other => ?MG0('dropped.other', Counters),
emqx_action_dropped_expired => ?MG0('dropped.expired', Counters), emqx_action_dropped_expired => ?MG0('dropped.expired', Counters),
emqx_action_queuing => ?MG0(queuing, Gauges), emqx_action_queuing => ?MG0(queuing, Gauges)
emqx_action_rate_last5m => ?MG0(last5m, MatchedRate),
emqx_action_rate_max => ?MG0(max, MatchedRate)
} }
end. end.
@ -528,19 +439,13 @@ get_bridge_metric(Type, Name) ->
%% Specific Connector %% Specific Connector
%% With connector_id: `{type}:{name}` as label key: `connector_id` %% With connector_id: `{type}:{name}` as label key: `connector_id`
connector_specific() ->
[
emqx_connector_enable,
emqx_connector_status
].
connector_specific_data(Bridges) -> connector_specific_data(Bridges) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = Bridge, AccIn) -> fun(#{type := Type, name := Name} = Bridge, AccIn) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn) merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn)
end, end,
maps:from_keys(connector_specific(), []), maps:from_keys(metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE), []),
Bridges Bridges
). ).
@ -623,3 +528,6 @@ di_data(connectors, Bridges) -> connector_specific_data(Bridges).
label_key(rules) -> id; label_key(rules) -> id;
label_key(actions) -> id; label_key(actions) -> id;
label_key(connectors) -> id. label_key(connectors) -> id.
metric_names(MetricWithType) when is_list(MetricWithType) ->
[Name || {Name, _Type} <- MetricWithType].