feat(prom): data integration metrics in josn format

This commit is contained in:
JimMoen 2024-01-15 14:41:26 +08:00
parent 36f009b0c2
commit 8f7964f435
No known key found for this signature in database
2 changed files with 130 additions and 32 deletions

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -95,7 +95,6 @@ collect_mf(_, _) ->
%% @private
collect(<<"json">>) ->
%% TODO
#{
emqx_authn => collect_auth_data(authn),
emqx_authz => collect_auth_data(authz),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -26,24 +26,7 @@
-export([add_collect_family/4]).
-export([
rules/0,
rules_data/1,
actions/0,
actions_data/1,
actions_exec_count/0,
actions_exec_count_data/0,
schema_registry/0,
schema_registry_data/0,
connectors/0,
connectors_data/1,
rule_specific/0,
rule_specific_data/1,
action_specific/0,
action_specific_data/1,
connector_specific/0,
connector_specific_data/1
]).
-export([actions_exec_count/0, actions_exec_count_data/0]).
-include("emqx_prometheus.hrl").
-include_lib("prometheus/include/prometheus.hrl").
@ -81,34 +64,97 @@ deregister_cleanup(_) -> ok.
-spec collect_mf(_Registry, Callback) -> ok when
_Registry :: prometheus_registry:registry(),
Callback :: prometheus_collector:collect_mf_callback().
%% erlfmt-ignore
collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
Rules = emqx_rule_engine:get_rules(),
Bridges =emqx_bridge:list(),
Bridges = emqx_bridge:list(),
%% Data Integration Overview
_ = [add_collect_family(Name, rules_data(Rules), Callback, gauge) || Name <- rules()],
_ = [add_collect_family(Name, actions_data(Rules), Callback, gauge) || Name <- actions()],
_ = [add_collect_family(Name, connectors_data(Bridges), Callback, gauge) || Name <- connectors()],
_ = [add_collect_family(Name, rule_specific_data(Rules), Callback, gauge) || Name <- rule_specific()],
_ = [add_collect_family(Name, action_specific_data(Bridges), Callback, gauge) || Name <- action_specific()],
_ = [add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge) || Name <- connector_specific()],
_ = [
add_collect_family(Name, connectors_data(Bridges), Callback, gauge)
|| Name <- connectors()
],
ok = maybe_collect_family_schema_registry(Callback),
%% Rule Specific
_ = [
add_collect_family(Name, rule_specific_data(Rules), Callback, gauge)
|| Name <- rule_specific()
],
%% Action Specific
_ = [
add_collect_family(Name, action_specific_data(Bridges), Callback, gauge)
|| Name <- action_specific()
],
%% Connector Specific
_ = [
add_collect_family(Name, connector_specific_data(Bridges), Callback, gauge)
|| Name <- connector_specific()
],
ok;
collect_mf(_, _) ->
ok.
%% @private
collect(<<"json">>) ->
%% TODO
#{};
Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(),
#{
data_integration_overview => collect_data_integration(overview, {Rules, Bridges}),
rules => collect_data_integration(rules, Rules),
actions => collect_data_integration(actions, Bridges),
connectors => collect_data_integration(connectors, Bridges)
};
collect(<<"prometheus">>) ->
prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
add_collect_family(Name, Data, Callback, Type) ->
Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
collect_metrics(Name, Metrics) ->
collect_di(Name, Metrics).
collect_data_integration(overview, {Rules, Bridges}) ->
RulesD = rules_data(Rules),
ActionsD = actions_data(Rules),
ConnectorsD = connectors_data(Bridges),
M1 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, #{}, rules()),
M2 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ActionsD)} end, #{}, actions()),
M3 = lists:foldl(fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end, #{}, connectors()),
M4 = maybe_collect_schema_registry(),
lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3, M4]);
collect_data_integration(Type = rules, Rules) ->
maps:fold(
fun(K, V, Acc) ->
zip_metrics(Type, K, V, Acc)
end,
[],
di_data(Type, Rules)
);
collect_data_integration(Type = actions, Rules) ->
maps:fold(
fun(K, V, Acc) ->
zip_metrics(Type, K, V, Acc)
end,
[],
di_data(Type, Rules)
);
collect_data_integration(Type = connectors, Bridges) ->
maps:fold(
fun(K, V, Acc) ->
zip_metrics(Type, K, V, Acc)
end,
[],
di_data(Type, Bridges)
).
-if(?EMQX_RELEASE_EDITION == ee).
maybe_collect_family_schema_registry(Callback) ->
_ = [
@ -116,9 +162,15 @@ maybe_collect_family_schema_registry(Callback) ->
|| Name <- schema_registry()
],
ok.
maybe_collect_schema_registry() ->
schema_registry_data().
-else.
maybe_collect_family_schema_registry(_) ->
ok.
maybe_collect_schema_registry() ->
#{}.
-endif.
%%--------------------------------------------------------------------
@ -307,7 +359,7 @@ actions_exec_count() ->
].
actions_exec_count_data() ->
[].
#{}.
%%====================
%% Schema Registry
@ -485,8 +537,6 @@ get_bridge_metric(Type, Name) ->
}
end.
%% TODO: Bridge V2
%%====================
%% Specific Connector
%% With connector_id: `{type}:{name}` as label key: `connector_id`
@ -537,3 +587,52 @@ boolean_to_number(false) -> 0.
status_to_number(connected) -> 1;
status_to_number(disconnected) -> 0.
zip_metrics(Type, K, V, Acc) ->
LabelK = label_key(Type),
do_zip_metrics(LabelK, K, V, Acc).
do_zip_metrics(LabelK, Key, Points, [] = _AccIn) ->
lists:foldl(
fun({[{K, LabelV}], Metric}, AccIn2) when K =:= LabelK ->
%% for initialized empty AccIn
%% The following fields will be put into Result
%% For Rules:
%% `id` => [RULE_ID]
%% For Actions
%% `id` => [ACTION_ID]
%% FOR Connectors
%% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID
%% formatted with {type}:{name}
Point =
#{
LabelK => LabelV, Key => Metric
},
[Point | AccIn2]
end,
[],
Points
);
do_zip_metrics(LabelK, Key, Points, AllResultedAcc) ->
ThisKeyResult = lists:foldl(
fun({[{K, Id}], Metric}, AccIn2) when K =:= LabelK ->
[#{LabelK => Id, Key => Metric} | AccIn2]
end,
[],
Points
),
lists:zipwith(
fun(AllResulted, ThisKeyMetricOut) ->
maps:merge(AllResulted, ThisKeyMetricOut)
end,
AllResultedAcc,
ThisKeyResult
).
di_data(rules, Rules) -> rule_specific_data(Rules);
di_data(actions, Bridges) -> action_specific_data(Bridges);
di_data(connectors, Bridges) -> connector_specific_data(Bridges).
label_key(rules) -> id;
label_key(actions) -> id;
label_key(connectors) -> id.