feat(prom_stats): aggregated/unaggregated prometheus data
This commit is contained in:
parent
6b064dd8eb
commit
b424f8ac12
File diff suppressed because it is too large
Load Diff
|
@ -81,43 +81,6 @@
|
||||||
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
||||||
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
|
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
|
||||||
|
|
||||||
-define(AUTHNS_WITH_TYPE, [
|
|
||||||
{emqx_authn_enable, gauge},
|
|
||||||
{emqx_authn_status, gauge},
|
|
||||||
{emqx_authn_nomatch, counter},
|
|
||||||
{emqx_authn_total, counter},
|
|
||||||
{emqx_authn_success, counter},
|
|
||||||
{emqx_authn_failed, counter}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(AUTHZS_WITH_TYPE, [
|
|
||||||
{emqx_authz_enable, gauge},
|
|
||||||
{emqx_authz_status, gauge},
|
|
||||||
{emqx_authz_nomatch, counter},
|
|
||||||
{emqx_authz_total, counter},
|
|
||||||
{emqx_authz_success, counter},
|
|
||||||
{emqx_authz_failed, counter}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(AUTHN_USERS_COUNT_WITH_TYPE, [
|
|
||||||
{emqx_authn_users_count, gauge}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(AUTHZ_RULES_COUNT_WITH_TYPE, [
|
|
||||||
{emqx_authz_rules_count, gauge}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(BANNED_WITH_TYPE, [
|
|
||||||
{emqx_banned_count, gauge}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(LOGICAL_SUM_METRIC_NAMES, [
|
|
||||||
emqx_authn_enable,
|
|
||||||
emqx_authn_status,
|
|
||||||
emqx_authz_enable,
|
|
||||||
emqx_authz_status
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Collector API
|
%% Collector API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -132,11 +95,11 @@ deregister_cleanup(_) -> ok.
|
||||||
%% erlfmt-ignore
|
%% erlfmt-ignore
|
||||||
collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) ->
|
collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) ->
|
||||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||||
ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)),
|
ok = add_collect_family(Callback, authn_metric_meta(), ?MG(authn_data, RawData)),
|
||||||
ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)),
|
ok = add_collect_family(Callback, authn_users_count_metric_meta(), ?MG(authn_users_count_data, RawData)),
|
||||||
ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)),
|
ok = add_collect_family(Callback, authz_metric_meta(), ?MG(authz_data, RawData)),
|
||||||
ok = add_collect_family(Callback, ?AUTHZ_RULES_COUNT_WITH_TYPE, ?MG(authz_rules_count, RawData)),
|
ok = add_collect_family(Callback, authz_rules_count_metric_meta(), ?MG(authz_rules_count_data, RawData)),
|
||||||
ok = add_collect_family(Callback, ?BANNED_WITH_TYPE, ?MG(banned_count, RawData)),
|
ok = add_collect_family(Callback, banned_count_metric_meta(), ?MG(banned_count_data, RawData)),
|
||||||
ok;
|
ok;
|
||||||
collect_mf(_, _) ->
|
collect_mf(_, _) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -145,8 +108,8 @@ collect_mf(_, _) ->
|
||||||
collect(<<"json">>) ->
|
collect(<<"json">>) ->
|
||||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||||
#{
|
#{
|
||||||
emqx_authn => collect_json_data(?MG(authn, RawData)),
|
emqx_authn => collect_json_data(?MG(authn_data, RawData)),
|
||||||
emqx_authz => collect_json_data(?MG(authz, RawData)),
|
emqx_authz => collect_json_data(?MG(authz_data, RawData)),
|
||||||
emqx_banned => collect_banned_data()
|
emqx_banned => collect_banned_data()
|
||||||
};
|
};
|
||||||
collect(<<"prometheus">>) ->
|
collect(<<"prometheus">>) ->
|
||||||
|
@ -165,25 +128,30 @@ collect_metrics(Name, Metrics) ->
|
||||||
%% behaviour
|
%% behaviour
|
||||||
fetch_data_from_local_node() ->
|
fetch_data_from_local_node() ->
|
||||||
{node(self()), #{
|
{node(self()), #{
|
||||||
authn => authn_data(),
|
authn_data => authn_data(),
|
||||||
authz => authz_data()
|
authz_data => authz_data()
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
fetch_cluster_consistented_data() ->
|
fetch_cluster_consistented_data() ->
|
||||||
#{
|
#{
|
||||||
authn_users_count => authn_users_count_data(),
|
authn_users_count_data => authn_users_count_data(),
|
||||||
authz_rules_count => authz_rules_count_data(),
|
authz_rules_count_data => authz_rules_count_data(),
|
||||||
banned_count => banned_count_data()
|
banned_count_data => banned_count_data()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
aggre_or_zip_init_acc() ->
|
aggre_or_zip_init_acc() ->
|
||||||
#{
|
#{
|
||||||
authn => maps:from_keys(authn_metric_names(), []),
|
authn_data => maps:from_keys(authn_metric(names), []),
|
||||||
authz => maps:from_keys(authz_metric_names(), [])
|
authz_data => maps:from_keys(authz_metric(names), [])
|
||||||
}.
|
}.
|
||||||
|
|
||||||
logic_sum_metrics() ->
|
logic_sum_metrics() ->
|
||||||
?LOGICAL_SUM_METRIC_NAMES.
|
[
|
||||||
|
emqx_authn_enable,
|
||||||
|
emqx_authn_status,
|
||||||
|
emqx_authz_enable,
|
||||||
|
emqx_authz_status
|
||||||
|
].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Collector
|
%% Collector
|
||||||
|
@ -243,6 +211,19 @@ collect_auth(emqx_banned_count, Data) ->
|
||||||
%%====================
|
%%====================
|
||||||
%% Authn overview
|
%% Authn overview
|
||||||
|
|
||||||
|
authn_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_authn_enable, gauge},
|
||||||
|
{emqx_authn_status, gauge},
|
||||||
|
{emqx_authn_nomatch, counter},
|
||||||
|
{emqx_authn_total, counter},
|
||||||
|
{emqx_authn_success, counter},
|
||||||
|
{emqx_authn_failed, counter}
|
||||||
|
].
|
||||||
|
|
||||||
|
authn_metric(names) ->
|
||||||
|
emqx_prometheus_cluster:metric_names(authn_metric_meta()).
|
||||||
|
|
||||||
-spec authn_data() -> #{Key => [Point]} when
|
-spec authn_data() -> #{Key => [Point]} when
|
||||||
Key :: authn_metric_name(),
|
Key :: authn_metric_name(),
|
||||||
Point :: {[Label], Metric},
|
Point :: {[Label], Metric},
|
||||||
|
@ -256,7 +237,7 @@ authn_data() ->
|
||||||
AccIn#{Key => authn_backend_to_points(Key, Authns)}
|
AccIn#{Key => authn_backend_to_points(Key, Authns)}
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
authn_metric_names()
|
authn_metric(names)
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when
|
-spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when
|
||||||
|
@ -287,15 +268,17 @@ lookup_authn_metrics_local(Id) ->
|
||||||
emqx_authn_failed => ?MG0(failed, Counters)
|
emqx_authn_failed => ?MG0(failed, Counters)
|
||||||
};
|
};
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
maps:from_keys(authn_metric_names() -- [emqx_authn_enable], 0)
|
maps:from_keys(authn_metric(names) -- [emqx_authn_enable], 0)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
authn_metric_names() ->
|
|
||||||
emqx_prometheus_cluster:metric_names(?AUTHNS_WITH_TYPE).
|
|
||||||
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Authn users count
|
%% Authn users count
|
||||||
|
|
||||||
|
authn_users_count_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_authn_users_count, gauge}
|
||||||
|
].
|
||||||
|
|
||||||
-define(AUTHN_MNESIA, emqx_authn_mnesia).
|
-define(AUTHN_MNESIA, emqx_authn_mnesia).
|
||||||
-define(AUTHN_SCRAM_MNESIA, emqx_authn_scram_mnesia).
|
-define(AUTHN_SCRAM_MNESIA, emqx_authn_scram_mnesia).
|
||||||
|
|
||||||
|
@ -321,6 +304,19 @@ authn_users_count_data() ->
|
||||||
%%====================
|
%%====================
|
||||||
%% Authz overview
|
%% Authz overview
|
||||||
|
|
||||||
|
authz_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_authz_enable, gauge},
|
||||||
|
{emqx_authz_status, gauge},
|
||||||
|
{emqx_authz_nomatch, counter},
|
||||||
|
{emqx_authz_total, counter},
|
||||||
|
{emqx_authz_success, counter},
|
||||||
|
{emqx_authz_failed, counter}
|
||||||
|
].
|
||||||
|
|
||||||
|
authz_metric(names) ->
|
||||||
|
emqx_prometheus_cluster:metric_names(authz_metric_meta()).
|
||||||
|
|
||||||
-spec authz_data() -> #{Key => [Point]} when
|
-spec authz_data() -> #{Key => [Point]} when
|
||||||
Key :: authz_metric_name(),
|
Key :: authz_metric_name(),
|
||||||
Point :: {[Label], Metric},
|
Point :: {[Label], Metric},
|
||||||
|
@ -334,7 +330,7 @@ authz_data() ->
|
||||||
AccIn#{Key => authz_backend_to_points(Key, Authzs)}
|
AccIn#{Key => authz_backend_to_points(Key, Authzs)}
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
authz_metric_names()
|
authz_metric(names)
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when
|
-spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when
|
||||||
|
@ -365,15 +361,17 @@ lookup_authz_metrics_local(Type) ->
|
||||||
emqx_authz_failed => ?MG0(failed, Counters)
|
emqx_authz_failed => ?MG0(failed, Counters)
|
||||||
};
|
};
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
maps:from_keys(authz_metric_names() -- [emqx_authz_enable], 0)
|
maps:from_keys(authz_metric(names) -- [emqx_authz_enable], 0)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
authz_metric_names() ->
|
|
||||||
emqx_prometheus_cluster:metric_names(?AUTHZS_WITH_TYPE).
|
|
||||||
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Authz rules count
|
%% Authz rules count
|
||||||
|
|
||||||
|
authz_rules_count_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_authz_rules_count, gauge}
|
||||||
|
].
|
||||||
|
|
||||||
-define(ACL_TABLE, emqx_acl).
|
-define(ACL_TABLE, emqx_acl).
|
||||||
|
|
||||||
authz_rules_count_data() ->
|
authz_rules_count_data() ->
|
||||||
|
@ -400,7 +398,13 @@ authz_rules_count_data() ->
|
||||||
%%====================
|
%%====================
|
||||||
%% Banned count
|
%% Banned count
|
||||||
|
|
||||||
-define(BANNED_TABLE, emqx_banned).
|
banned_count_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_banned_count, gauge}
|
||||||
|
].
|
||||||
|
-define(BANNED_TABLE,
|
||||||
|
emqx_banned
|
||||||
|
).
|
||||||
banned_count_data() ->
|
banned_count_data() ->
|
||||||
mnesia_size(?BANNED_TABLE).
|
mnesia_size(?BANNED_TABLE).
|
||||||
|
|
||||||
|
|
|
@ -65,65 +65,6 @@
|
||||||
-define(MG(K, MAP), maps:get(K, MAP)).
|
-define(MG(K, MAP), maps:get(K, MAP)).
|
||||||
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
||||||
|
|
||||||
-define(RULES_WITH_TYPE, [
|
|
||||||
{emqx_rules_count, gauge}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(CONNECTORS_WITH_TYPE, [
|
|
||||||
{emqx_connectors_count, gauge}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(RULES_SPECIFIC_WITH_TYPE, [
|
|
||||||
{emqx_rule_enable, gauge},
|
|
||||||
{emqx_rule_matched, counter},
|
|
||||||
{emqx_rule_failed, counter},
|
|
||||||
{emqx_rule_passed, counter},
|
|
||||||
{emqx_rule_failed_exception, counter},
|
|
||||||
{emqx_rule_failed_no_result, counter},
|
|
||||||
{emqx_rule_actions_total, counter},
|
|
||||||
{emqx_rule_actions_success, counter},
|
|
||||||
{emqx_rule_actions_failed, counter},
|
|
||||||
{emqx_rule_actions_failed_out_of_service, counter},
|
|
||||||
{emqx_rule_actions_failed_unknown, counter}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(ACTION_SPECIFIC_WITH_TYPE, [
|
|
||||||
{emqx_action_matched, counter},
|
|
||||||
{emqx_action_dropped, counter},
|
|
||||||
{emqx_action_success, counter},
|
|
||||||
{emqx_action_failed, counter},
|
|
||||||
{emqx_action_inflight, gauge},
|
|
||||||
{emqx_action_received, counter},
|
|
||||||
{emqx_action_late_reply, counter},
|
|
||||||
{emqx_action_retried, counter},
|
|
||||||
{emqx_action_retried_success, counter},
|
|
||||||
{emqx_action_retried_failed, counter},
|
|
||||||
{emqx_action_dropped_resource_stopped, counter},
|
|
||||||
{emqx_action_dropped_resource_not_found, counter},
|
|
||||||
{emqx_action_dropped_queue_full, counter},
|
|
||||||
{emqx_action_dropped_other, counter},
|
|
||||||
{emqx_action_dropped_expired, counter},
|
|
||||||
{emqx_action_queuing, gauge}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(CONNECTOR_SPECIFIC_WITH_TYPE, [
|
|
||||||
{emqx_connector_enable, gauge},
|
|
||||||
{emqx_connector_status, gauge}
|
|
||||||
]).
|
|
||||||
|
|
||||||
-if(?EMQX_RELEASE_EDITION == ee).
|
|
||||||
-define(SCHEMA_REGISTRY_WITH_TYPE, [
|
|
||||||
emqx_schema_registrys_count
|
|
||||||
]).
|
|
||||||
-else.
|
|
||||||
-endif.
|
|
||||||
|
|
||||||
-define(LOGICAL_SUM_METRIC_NAMES, [
|
|
||||||
emqx_rule_enable,
|
|
||||||
emqx_connector_enable,
|
|
||||||
emqx_connector_status
|
|
||||||
]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Callback for emqx_prometheus_cluster
|
%% Callback for emqx_prometheus_cluster
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -132,28 +73,32 @@ fetch_data_from_local_node() ->
|
||||||
Rules = emqx_rule_engine:get_rules(),
|
Rules = emqx_rule_engine:get_rules(),
|
||||||
Bridges = emqx_bridge:list(),
|
Bridges = emqx_bridge:list(),
|
||||||
{node(self()), #{
|
{node(self()), #{
|
||||||
rule_specific_data => rule_specific_data(Rules),
|
rule_metric_data => rule_metric_data(Rules),
|
||||||
action_specific_data => action_specific_data(Bridges),
|
action_metric_data => action_metric_data(Bridges),
|
||||||
connector_specific_data => connector_specific_data(Bridges)
|
connector_metric_data => connector_metric_data(Bridges)
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
fetch_cluster_consistented_data() ->
|
fetch_cluster_consistented_data() ->
|
||||||
Rules = emqx_rule_engine:get_rules(),
|
Rules = emqx_rule_engine:get_rules(),
|
||||||
Bridges = emqx_bridge:list(),
|
Bridges = emqx_bridge:list(),
|
||||||
(maybe_collect_schema_registry())#{
|
(maybe_collect_schema_registry())#{
|
||||||
rules_data => rules_data(Rules),
|
rules_ov_data => rules_ov_data(Rules),
|
||||||
connectors_data => connectors_data(Bridges)
|
connectors_ov_data => connectors_ov_data(Bridges)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
aggre_or_zip_init_acc() ->
|
aggre_or_zip_init_acc() ->
|
||||||
#{
|
#{
|
||||||
rule_specific_data => maps:from_keys(rule_specific_metric_names(), []),
|
rule_metric_data => maps:from_keys(rule_metric(names), []),
|
||||||
action_specific_data => maps:from_keys(action_specific_metric_names(), []),
|
action_metric_data => maps:from_keys(action_metric(names), []),
|
||||||
connector_specific_data => maps:from_keys(connectr_specific_metric_names(), [])
|
connector_metric_data => maps:from_keys(connectr_metric(names), [])
|
||||||
}.
|
}.
|
||||||
|
|
||||||
logic_sum_metrics() ->
|
logic_sum_metrics() ->
|
||||||
?LOGICAL_SUM_METRIC_NAMES.
|
[
|
||||||
|
emqx_rule_enable,
|
||||||
|
emqx_connector_enable,
|
||||||
|
emqx_connector_status
|
||||||
|
].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Collector API
|
%% Collector API
|
||||||
|
@ -170,21 +115,23 @@ collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
|
||||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||||
|
|
||||||
%% Data Integration Overview
|
%% Data Integration Overview
|
||||||
ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)),
|
ok = add_collect_family(Callback, rules_ov_metric_meta(), ?MG(rules_ov_data, RawData)),
|
||||||
ok = add_collect_family(Callback, ?CONNECTORS_WITH_TYPE, ?MG(connectors_data, RawData)),
|
ok = add_collect_family(
|
||||||
|
Callback, connectors_ov_metric_meta(), ?MG(connectors_ov_data, RawData)
|
||||||
|
),
|
||||||
ok = maybe_collect_family_schema_registry(Callback),
|
ok = maybe_collect_family_schema_registry(Callback),
|
||||||
|
|
||||||
%% Rule Specific
|
%% Rule Metric
|
||||||
RuleSpecificDs = ?MG(rule_specific_data, RawData),
|
RuleMetricDs = ?MG(rule_metric_data, RawData),
|
||||||
ok = add_collect_family(Callback, ?RULES_SPECIFIC_WITH_TYPE, RuleSpecificDs),
|
ok = add_collect_family(Callback, rule_metric_meta(), RuleMetricDs),
|
||||||
|
|
||||||
%% Action Specific
|
%% Action Metric
|
||||||
ActionSpecificDs = ?MG(action_specific_data, RawData),
|
ActionMetricDs = ?MG(action_metric_data, RawData),
|
||||||
ok = add_collect_family(Callback, ?ACTION_SPECIFIC_WITH_TYPE, ActionSpecificDs),
|
ok = add_collect_family(Callback, action_metric_meta(), ActionMetricDs),
|
||||||
|
|
||||||
%% Connector Specific
|
%% Connector Metric
|
||||||
ConnectorSpecificDs = ?MG(connector_specific_data, RawData),
|
ConnectorMetricDs = ?MG(connector_metric_data, RawData),
|
||||||
ok = add_collect_family(Callback, ?CONNECTOR_SPECIFIC_WITH_TYPE, ConnectorSpecificDs),
|
ok = add_collect_family(Callback, connector_metric_meta(), ConnectorMetricDs),
|
||||||
|
|
||||||
ok;
|
ok;
|
||||||
collect_mf(_, _) ->
|
collect_mf(_, _) ->
|
||||||
|
@ -197,9 +144,9 @@ collect(<<"json">>) ->
|
||||||
Bridges = emqx_bridge:list(),
|
Bridges = emqx_bridge:list(),
|
||||||
#{
|
#{
|
||||||
data_integration_overview => collect_data_integration_overview(Rules, Bridges),
|
data_integration_overview => collect_data_integration_overview(Rules, Bridges),
|
||||||
rules => collect_json_data(?MG(rule_specific_data, RawData)),
|
rules => collect_json_data(?MG(rule_metric_data, RawData)),
|
||||||
actions => collect_json_data(?MG(action_specific_data, RawData)),
|
actions => collect_json_data(?MG(action_metric_data, RawData)),
|
||||||
connectors => collect_json_data(?MG(connector_specific_data, RawData))
|
connectors => collect_json_data(?MG(connector_metric_data, RawData))
|
||||||
};
|
};
|
||||||
collect(<<"prometheus">>) ->
|
collect(<<"prometheus">>) ->
|
||||||
prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY).
|
prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY).
|
||||||
|
@ -218,21 +165,6 @@ add_collect_family(Name, Data, Callback, Type) ->
|
||||||
collect_metrics(Name, Metrics) ->
|
collect_metrics(Name, Metrics) ->
|
||||||
collect_di(Name, Metrics).
|
collect_di(Name, Metrics).
|
||||||
|
|
||||||
-if(?EMQX_RELEASE_EDITION == ee).
|
|
||||||
maybe_collect_family_schema_registry(Callback) ->
|
|
||||||
ok = add_collect_family(Callback, ?SCHEMA_REGISTRY_WITH_TYPE, schema_registry_data()),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
maybe_collect_schema_registry() ->
|
|
||||||
schema_registry_data().
|
|
||||||
-else.
|
|
||||||
maybe_collect_family_schema_registry(_) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
maybe_collect_schema_registry() ->
|
|
||||||
#{}.
|
|
||||||
-endif.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Collector
|
%% Collector
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -244,88 +176,54 @@ maybe_collect_schema_registry() ->
|
||||||
%%====================
|
%%====================
|
||||||
%% All Rules
|
%% All Rules
|
||||||
%% Rules
|
%% Rules
|
||||||
collect_di(K = emqx_rules_count, Data) ->
|
collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data));
|
||||||
gauge_metric(?MG(K, Data));
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Schema Registry
|
%% Schema Registry
|
||||||
collect_di(K = emqx_schema_registrys_count, Data) ->
|
collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data));
|
||||||
gauge_metric(?MG(K, Data));
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Connectors
|
%% Connectors
|
||||||
collect_di(K = emqx_connectors_count, Data) ->
|
collect_di(K = emqx_connectors_count, Data) -> gauge_metric(?MG(K, Data));
|
||||||
gauge_metric(?MG(K, Data));
|
|
||||||
%%========================================
|
%%========================================
|
||||||
%% Data Integration for Specific: Rule && Action && Connector
|
%% Data Integration Metric for: Rule && Action && Connector
|
||||||
%%========================================
|
%%========================================
|
||||||
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Specific Rule
|
%% Rule Metric
|
||||||
collect_di(K = emqx_rule_enable, Data) ->
|
collect_di(K = emqx_rule_enable, Data) -> gauge_metrics(?MG(K, Data));
|
||||||
gauge_metrics(?MG(K, Data));
|
collect_di(K = emqx_rule_matched, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_rule_matched, Data) ->
|
collect_di(K = emqx_rule_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_rule_passed, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_rule_failed, Data) ->
|
collect_di(K = emqx_rule_failed_exception, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_rule_failed_no_result, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_rule_passed, Data) ->
|
collect_di(K = emqx_rule_actions_total, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_rule_actions_success, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_rule_failed_exception, Data) ->
|
collect_di(K = emqx_rule_actions_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_rule_failed_no_result, Data) ->
|
collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_rule_actions_total, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_rule_actions_success, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_rule_actions_failed, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_rule_actions_failed_out_of_service, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_rule_actions_failed_unknown, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Specific Action
|
%% Action Metric
|
||||||
|
collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_matched, Data) ->
|
collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_dropped, Data) ->
|
collect_di(K = emqx_action_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
%% inflight type: gauge
|
||||||
collect_di(K = emqx_action_success, Data) ->
|
collect_di(K = emqx_action_inflight, Data) -> gauge_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_action_received, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_failed, Data) ->
|
collect_di(K = emqx_action_late_reply, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_action_retried, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_inflight, Data) ->
|
collect_di(K = emqx_action_retried_success, Data) -> counter_metrics(?MG(K, Data));
|
||||||
%% inflight type: gauge
|
collect_di(K = emqx_action_retried_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||||
gauge_metrics(?MG(K, Data));
|
collect_di(K = emqx_action_dropped_resource_stopped, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_received, Data) ->
|
collect_di(K = emqx_action_dropped_resource_not_found, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_action_dropped_queue_full, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_late_reply, Data) ->
|
collect_di(K = emqx_action_dropped_other, Data) -> counter_metrics(?MG(K, Data));
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_action_dropped_expired, Data) -> counter_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_retried, Data) ->
|
%% queuing type: gauge
|
||||||
counter_metrics(?MG(K, Data));
|
collect_di(K = emqx_action_queuing, Data) -> gauge_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_action_retried_success, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_action_retried_failed, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_action_dropped_resource_stopped, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_action_dropped_resource_not_found, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_action_dropped_queue_full, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_action_dropped_other, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_action_dropped_expired, Data) ->
|
|
||||||
counter_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_action_queuing, Data) ->
|
|
||||||
%% queuing type: gauge
|
|
||||||
gauge_metrics(?MG(K, Data));
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Specific Connector
|
%% Connector Metric
|
||||||
|
collect_di(K = emqx_connector_enable, Data) -> gauge_metrics(?MG(K, Data));
|
||||||
collect_di(K = emqx_connector_enable, Data) ->
|
collect_di(K = emqx_connector_status, Data) -> gauge_metrics(?MG(K, Data)).
|
||||||
gauge_metrics(?MG(K, Data));
|
|
||||||
collect_di(K = emqx_connector_status, Data) ->
|
|
||||||
gauge_metrics(?MG(K, Data)).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
|
@ -338,8 +236,16 @@ collect_di(K = emqx_connector_status, Data) ->
|
||||||
%%====================
|
%%====================
|
||||||
%% All Rules
|
%% All Rules
|
||||||
|
|
||||||
|
rules_ov_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_rules_count, gauge}
|
||||||
|
].
|
||||||
|
|
||||||
|
rules_ov_metric(names) ->
|
||||||
|
emqx_prometheus_cluster:metric_names(rules_ov_metric_meta()).
|
||||||
|
|
||||||
-define(RULE_TAB, emqx_rule_engine).
|
-define(RULE_TAB, emqx_rule_engine).
|
||||||
rules_data(_Rules) ->
|
rules_ov_data(_Rules) ->
|
||||||
#{
|
#{
|
||||||
emqx_rules_count => ets:info(?RULE_TAB, size)
|
emqx_rules_count => ets:info(?RULE_TAB, size)
|
||||||
}.
|
}.
|
||||||
|
@ -348,36 +254,83 @@ rules_data(_Rules) ->
|
||||||
%% Schema Registry
|
%% Schema Registry
|
||||||
|
|
||||||
-if(?EMQX_RELEASE_EDITION == ee).
|
-if(?EMQX_RELEASE_EDITION == ee).
|
||||||
|
|
||||||
|
maybe_collect_family_schema_registry(Callback) ->
|
||||||
|
ok = add_collect_family(Callback, schema_registry_metric_meta(), schema_registry_data()),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
schema_registry_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_schema_registrys_count, gauge}
|
||||||
|
].
|
||||||
|
|
||||||
schema_registry_data() ->
|
schema_registry_data() ->
|
||||||
#{
|
#{
|
||||||
emqx_schema_registrys_count => erlang:map_size(emqx_schema_registry:list_schemas())
|
emqx_schema_registrys_count => erlang:map_size(emqx_schema_registry:list_schemas())
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
maybe_collect_schema_registry() ->
|
||||||
|
schema_registry_data().
|
||||||
|
|
||||||
-else.
|
-else.
|
||||||
|
|
||||||
|
maybe_collect_family_schema_registry(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
maybe_collect_schema_registry() ->
|
||||||
|
#{}.
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Connectors
|
%% Connectors
|
||||||
|
|
||||||
connectors_data(Brdiges) ->
|
connectors_ov_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_connectors_count, gauge}
|
||||||
|
].
|
||||||
|
|
||||||
|
connectors_ov_metric(names) ->
|
||||||
|
emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
|
||||||
|
|
||||||
|
connectors_ov_data(Brdiges) ->
|
||||||
#{
|
#{
|
||||||
%% Both Bridge V1 and V2
|
%% Both Bridge V1 and V2
|
||||||
emqx_connectors_count => erlang:length(Brdiges)
|
emqx_connectors_count => erlang:length(Brdiges)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%========================================
|
%%========================================
|
||||||
%% Data Integration for Specific: Rule && Action && Connector
|
%% Data Integration Metric for: Rule && Action && Connector
|
||||||
%%========================================
|
%%========================================
|
||||||
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Specific Rule
|
%% Rule Metric
|
||||||
%% With rule_id as label key: `rule_id`
|
%% With rule_id as label key: `rule_id`
|
||||||
|
|
||||||
rule_specific_data(Rules) ->
|
rule_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_rule_enable, gauge},
|
||||||
|
{emqx_rule_matched, counter},
|
||||||
|
{emqx_rule_failed, counter},
|
||||||
|
{emqx_rule_passed, counter},
|
||||||
|
{emqx_rule_failed_exception, counter},
|
||||||
|
{emqx_rule_failed_no_result, counter},
|
||||||
|
{emqx_rule_actions_total, counter},
|
||||||
|
{emqx_rule_actions_success, counter},
|
||||||
|
{emqx_rule_actions_failed, counter},
|
||||||
|
{emqx_rule_actions_failed_out_of_service, counter},
|
||||||
|
{emqx_rule_actions_failed_unknown, counter}
|
||||||
|
].
|
||||||
|
|
||||||
|
rule_metric(names) ->
|
||||||
|
emqx_prometheus_cluster:metric_names(rule_metric_meta()).
|
||||||
|
|
||||||
|
rule_metric_data(Rules) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#{id := Id} = Rule, AccIn) ->
|
fun(#{id := Id} = Rule, AccIn) ->
|
||||||
merge_acc_with_rules(Id, get_metric(Rule), AccIn)
|
merge_acc_with_rules(Id, get_metric(Rule), AccIn)
|
||||||
end,
|
end,
|
||||||
maps:from_keys(rule_specific_metric_names(), []),
|
maps:from_keys(rule_metric(names), []),
|
||||||
Rules
|
Rules
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -413,20 +366,40 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
|
||||||
}
|
}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
rule_specific_metric_names() ->
|
|
||||||
emqx_prometheus_cluster:metric_names(?RULES_SPECIFIC_WITH_TYPE).
|
|
||||||
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Specific Action
|
%% Action Metric
|
||||||
%% With action_id: `{type}:{name}` as label key: `action_id`
|
%% With action_id: `{type}:{name}` as label key: `action_id`
|
||||||
|
|
||||||
action_specific_data(Bridges) ->
|
action_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_action_matched, counter},
|
||||||
|
{emqx_action_dropped, counter},
|
||||||
|
{emqx_action_success, counter},
|
||||||
|
{emqx_action_failed, counter},
|
||||||
|
{emqx_action_inflight, gauge},
|
||||||
|
{emqx_action_received, counter},
|
||||||
|
{emqx_action_late_reply, counter},
|
||||||
|
{emqx_action_retried, counter},
|
||||||
|
{emqx_action_retried_success, counter},
|
||||||
|
{emqx_action_retried_failed, counter},
|
||||||
|
{emqx_action_dropped_resource_stopped, counter},
|
||||||
|
{emqx_action_dropped_resource_not_found, counter},
|
||||||
|
{emqx_action_dropped_queue_full, counter},
|
||||||
|
{emqx_action_dropped_other, counter},
|
||||||
|
{emqx_action_dropped_expired, counter},
|
||||||
|
{emqx_action_queuing, gauge}
|
||||||
|
].
|
||||||
|
|
||||||
|
action_metric(names) ->
|
||||||
|
emqx_prometheus_cluster:metric_names(action_metric_meta()).
|
||||||
|
|
||||||
|
action_metric_data(Bridges) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
|
fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
|
||||||
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
||||||
merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn)
|
merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn)
|
||||||
end,
|
end,
|
||||||
maps:from_keys(action_specific_metric_names(), []),
|
maps:from_keys(action_metric(names), []),
|
||||||
Bridges
|
Bridges
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -467,20 +440,26 @@ get_bridge_metric(Type, Name) ->
|
||||||
}
|
}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
action_specific_metric_names() ->
|
|
||||||
emqx_prometheus_cluster:metric_names(?ACTION_SPECIFIC_WITH_TYPE).
|
|
||||||
|
|
||||||
%%====================
|
%%====================
|
||||||
%% Specific Connector
|
%% Connector Metric
|
||||||
%% With connector_id: `{type}:{name}` as label key: `connector_id`
|
%% With connector_id: `{type}:{name}` as label key: `connector_id`
|
||||||
|
|
||||||
connector_specific_data(Bridges) ->
|
connector_metric_meta() ->
|
||||||
|
[
|
||||||
|
{emqx_connector_enable, gauge},
|
||||||
|
{emqx_connector_status, gauge}
|
||||||
|
].
|
||||||
|
|
||||||
|
connectr_metric(names) ->
|
||||||
|
emqx_prometheus_cluster:metric_names(connector_metric_meta()).
|
||||||
|
|
||||||
|
connector_metric_data(Bridges) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(#{type := Type, name := Name} = Bridge, AccIn) ->
|
fun(#{type := Type, name := Name} = Bridge, AccIn) ->
|
||||||
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
||||||
merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn)
|
merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn)
|
||||||
end,
|
end,
|
||||||
maps:from_keys(connectr_specific_metric_names(), []),
|
maps:from_keys(connectr_metric(names), []),
|
||||||
Bridges
|
Bridges
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -504,9 +483,6 @@ get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
|
||||||
emqx_connector_status => emqx_prometheus_cluster:status_to_number(Status)
|
emqx_connector_status => emqx_prometheus_cluster:status_to_number(Status)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
connectr_specific_metric_names() ->
|
|
||||||
emqx_prometheus_cluster:metric_names(?CONNECTOR_SPECIFIC_WITH_TYPE).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Collect functions
|
%% Collect functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -514,18 +490,18 @@ connectr_specific_metric_names() ->
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
%% merge / zip formatting funcs for type `application/json`
|
%% merge / zip formatting funcs for type `application/json`
|
||||||
collect_data_integration_overview(Rules, Bridges) ->
|
collect_data_integration_overview(Rules, Bridges) ->
|
||||||
RulesD = rules_data(Rules),
|
RulesD = rules_ov_data(Rules),
|
||||||
ConnectorsD = connectors_data(Bridges),
|
ConnectorsD = connectors_ov_data(Bridges),
|
||||||
|
|
||||||
M1 = lists:foldl(
|
M1 = lists:foldl(
|
||||||
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
|
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
|
||||||
#{},
|
#{},
|
||||||
emqx_prometheus_cluster:metric_names(?RULES_WITH_TYPE)
|
rules_ov_metric(names)
|
||||||
),
|
),
|
||||||
M2 = lists:foldl(
|
M2 = lists:foldl(
|
||||||
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
|
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
|
||||||
#{},
|
#{},
|
||||||
emqx_prometheus_cluster:metric_names(?CONNECTORS_WITH_TYPE)
|
connectors_ov_metric(names)
|
||||||
),
|
),
|
||||||
M3 = maybe_collect_schema_registry(),
|
M3 = maybe_collect_schema_registry(),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue