fix(prom_di): compatibility with bridge_v1

This commit is contained in:
JimMoen 2024-01-25 19:31:59 +08:00
parent 7b041683bb
commit 4dca1ef848
No known key found for this signature in database
2 changed files with 58 additions and 17 deletions

View File

@ -24,7 +24,6 @@
collect_json_data/2, collect_json_data/2,
aggre_cluster/3, aggre_cluster/3,
%% with_node_name_label/2,
point_to_map_fun/1, point_to_map_fun/1,

View File

@ -73,20 +73,23 @@
fetch_from_local_node(Mode) -> fetch_from_local_node(Mode) ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), BridgesV1 = emqx:get_config([bridges], #{}),
BridgeV2Actions = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS),
Connectors = emqx_connector:list(), Connectors = emqx_connector:list(),
{node(self()), #{ {node(self()), #{
rule_metric_data => rule_metric_data(Mode, Rules), rule_metric_data => rule_metric_data(Mode, Rules),
action_metric_data => action_metric_data(Mode, Bridges), action_metric_data => action_metric_data(Mode, BridgeV2Actions),
connector_metric_data => connector_metric_data(Mode, Connectors) connector_metric_data => connector_metric_data(Mode, BridgesV1, Connectors)
}}. }}.
fetch_cluster_consistented_data() -> fetch_cluster_consistented_data() ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
%% for bridge v1
BridgesV1 = emqx:get_config([bridges], #{}),
Connectors = emqx_connector:list(), Connectors = emqx_connector:list(),
(maybe_collect_schema_registry())#{ (maybe_collect_schema_registry())#{
rules_ov_data => rules_ov_data(Rules), rules_ov_data => rules_ov_data(Rules),
connectors_ov_data => connectors_ov_data(Connectors) connectors_ov_data => connectors_ov_data(BridgesV1, Connectors)
}. }.
aggre_or_zip_init_acc() -> aggre_or_zip_init_acc() ->
@ -144,9 +147,13 @@ collect_mf(_, _) ->
collect(<<"json">>) -> collect(<<"json">>) ->
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(), Connectors = emqx_connector:list(),
%% for bridge v1
BridgesV1 = emqx:get_config([bridges], #{}),
#{ #{
data_integration_overview => collect_data_integration_overview(Rules, Bridges), data_integration_overview => collect_data_integration_overview(
Rules, BridgesV1, Connectors
),
rules => collect_json_data(?MG(rule_metric_data, RawData)), rules => collect_json_data(?MG(rule_metric_data, RawData)),
actions => collect_json_data(?MG(action_metric_data, RawData)), actions => collect_json_data(?MG(action_metric_data, RawData)),
connectors => collect_json_data(?MG(connector_metric_data, RawData)) connectors => collect_json_data(?MG(connector_metric_data, RawData))
@ -298,10 +305,17 @@ connectors_ov_metric_meta() ->
connectors_ov_metric(names) -> connectors_ov_metric(names) ->
emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
connectors_ov_data(Connectors) -> connectors_ov_data(BridgesV1, Connectors) ->
%% Both Bridge V1 and V2
V1ConnectorsCnt = maps:fold(
fun(_Type, NameAndConf, AccIn) ->
AccIn + maps:size(NameAndConf)
end,
0,
BridgesV1
),
#{ #{
%% Both Bridge V1 and V2 emqx_connectors_count => erlang:length(Connectors) + V1ConnectorsCnt
emqx_connectors_count => erlang:length(Connectors)
}. }.
%%======================================== %%========================================
@ -466,16 +480,44 @@ connector_metric_meta() ->
connectr_metric(names) -> connectr_metric(names) ->
emqx_prometheus_cluster:metric_names(connector_metric_meta()). emqx_prometheus_cluster:metric_names(connector_metric_meta()).
connector_metric_data(Mode, Connectors) -> connector_metric_data(Mode, BridgesV1, Connectors) ->
AccIn = maps:from_keys(connectr_metric(names), []),
Acc0 = connector_metric_data_v1(Mode, BridgesV1, AccIn),
_AccOut = connector_metric_data_v2(Mode, Connectors, Acc0).
connector_metric_data_v2(Mode, Connectors, InitAcc) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = Connector, AccIn) -> fun(#{type := Type, name := Name, resource_data := ResourceData} = _Connector, AccIn) ->
Id = emqx_connector_resource:connector_id(Type, Name), Id = emqx_connector_resource:connector_id(Type, Name),
merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn) merge_acc_with_connectors(Mode, Id, get_connector_status(ResourceData), AccIn)
end, end,
maps:from_keys(connectr_metric(names), []), InitAcc,
Connectors Connectors
). ).
connector_metric_data_v1(Mode, BridgesV1, InitAcc) ->
maps:fold(
fun(Type, NameAndConfMap, Acc0) ->
maps:fold(
fun(Name, _Conf, Acc1) ->
BridgeV1Id = emqx_bridge_resource:resource_id(Type, Name),
case emqx_resource:get_instance(BridgeV1Id) of
{error, not_found} ->
Acc1;
{ok, _, ResourceData} ->
merge_acc_with_connectors(
Mode, BridgeV1Id, get_connector_status(ResourceData), Acc1
)
end
end,
Acc0,
NameAndConfMap
)
end,
InitAcc,
BridgesV1
).
merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
maps:fold( maps:fold(
fun(K, V, AccIn) -> fun(K, V, AccIn) ->
@ -488,7 +530,7 @@ merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
connector_point(Mode, Id, V) -> connector_point(Mode, Id, V) ->
{with_node_label(Mode, [{id, Id}]), V}. {with_node_label(Mode, [{id, Id}]), V}.
get_connector_status(#{resource_data := ResourceData} = _Connector) -> get_connector_status(ResourceData) ->
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
Status = ?MG(status, ResourceData), Status = ?MG(status, ResourceData),
#{ #{
@ -502,9 +544,9 @@ get_connector_status(#{resource_data := ResourceData} = _Connector) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json` %% merge / zip formatting funcs for type `application/json`
collect_data_integration_overview(Rules, Bridges) -> collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
RulesD = rules_ov_data(Rules), RulesD = rules_ov_data(Rules),
ConnectorsD = connectors_ov_data(Bridges), ConnectorsD = connectors_ov_data(BridgesV1, Connectors),
M1 = lists:foldl( M1 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,