fix(prom): connectors specific data

This commit is contained in:
JimMoen 2024-01-15 13:49:38 +08:00
parent 76d9ace582
commit 36f009b0c2
No known key found for this signature in database
1 changed files with 52 additions and 13 deletions

View File

@ -36,13 +36,13 @@
schema_registry/0, schema_registry/0,
schema_registry_data/0, schema_registry_data/0,
connectors/0, connectors/0,
connectors_data/0, connectors_data/1,
rule_specific/0, rule_specific/0,
rule_specific_data/1, rule_specific_data/1,
action_specific/0, action_specific/0,
action_specific_data/0, action_specific_data/1,
connector_specific/0, connector_specific/0,
connector_specific_data/0 connector_specific_data/1
]). ]).
-include("emqx_prometheus.hrl"). -include("emqx_prometheus.hrl").
@ -84,11 +84,13 @@ deregister_cleanup(_) -> ok.
%% erlfmt-ignore %% erlfmt-ignore
collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges =emqx_bridge:list(),
_ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()], _ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()],
_ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()], _ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()],
_ = [add_collect_family(Name, connectors_data(), Callback, gauge) || Name <- connectors()], _ = [add_collect_family(Name, connectors_data(Bridges), Callback, gauge) || Name <- connectors()],
_ = [add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) || Name <- rule_specific()], _ = [add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) || Name <- rule_specific()],
_ = [add_collect_family(Name, action_specific_data(), Callback, gauge) || Name <- action_specific()], _ = [add_collect_family(Name, action_specific_data(Bridges), Callback, gauge) || Name <- action_specific()],
_ = [add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge) || Name <- connector_specific()],
ok = maybe_collect_family_schema_registry(Callback), ok = maybe_collect_family_schema_registry(Callback),
ok; ok;
collect_mf(_, _) -> collect_mf(_, _) ->
@ -222,11 +224,15 @@ collect_di(K = emqx_action_queuing, Data) ->
collect_di(K = emqx_action_rate_last5m, Data) -> collect_di(K = emqx_action_rate_last5m, Data) ->
gauge_metrics(?MG(K, Data)); gauge_metrics(?MG(K, Data));
collect_di(K = emqx_action_rate_max, Data) -> collect_di(K = emqx_action_rate_max, Data) ->
gauge_metrics(?MG(K, Data)). gauge_metrics(?MG(K, Data));
%%==================== %%====================
%% Specific Connector %% Specific Connector
collect_di(K = emqx_connector_enable, Data) ->
gauge_metrics(?MG(K, Data));
collect_di(K = emqx_connector_status, Data) ->
gauge_metrics(?MG(K, Data)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -328,10 +334,10 @@ connectors() ->
emqx_connector_count emqx_connector_count
]. ].
connectors_data() -> connectors_data(Brdiges) ->
#{ #{
%% Both Bridge V1 and V2 %% Both Bridge V1 and V2
emqx_connector_count => erlang:length(emqx_bridge:list()) emqx_connector_count => erlang:length(Brdiges)
}. }.
%%======================================== %%========================================
@ -429,14 +435,14 @@ action_specific() ->
emqx_action_rate_max emqx_action_rate_max
]. ].
action_specific_data() -> action_specific_data(Bridges) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = _Bridge, AccIn) -> fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn) merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn)
end, end,
maps:from_keys(action_specific(), []), maps:from_keys(action_specific(), []),
emqx_bridge:list() Bridges
). ).
merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) -> merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) ->
@ -491,10 +497,43 @@ connector_specific() ->
emqx_connector_status emqx_connector_status
]. ].
connector_specific_data() -> connector_specific_data(Bridges) ->
[]. lists:foldl(
fun(#{type := Type, name := Name} = Bridge, AccIn) ->
Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn)
end,
maps:from_keys(connector_specific(), []),
Bridges
).
merge_acc_with_connectors(Id, ConnectorMetrics, PointsAcc) ->
maps:fold(
fun(K, V, AccIn) ->
AccIn#{K => [connector_point(Id, V) | ?MG(K, AccIn)]}
end,
PointsAcc,
ConnectorMetrics
).
connector_point(Id, V) ->
{[{id, Id}], V}.
get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
Status = ?MG(status, ResourceData),
#{
emqx_connector_enable => boolean_to_number(Enabled),
emqx_connector_status => status_to_number(Status)
}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Help funcs %% Help funcs
boolean_to_number(true) -> 1;
boolean_to_number(false) -> 0.
status_to_number(connected) -> 1;
status_to_number(disconnected) -> 0.