fix(prom): use name `mode` and macros to put/get format mode

This commit is contained in:
JimMoen 2024-01-19 11:23:40 +08:00
parent 8cb12c6a74
commit b480c5b371
No known key found for this signature in database
4 changed files with 47 additions and 28 deletions

View File

@ -28,3 +28,18 @@
?PROMETHEUS_AUTH_REGISTRY, ?PROMETHEUS_AUTH_REGISTRY,
?PROMETHEUS_DATA_INTEGRATION_REGISTRY ?PROMETHEUS_DATA_INTEGRATION_REGISTRY
]). ]).
-define(PROM_DATA_MODE__NODE, node).
-define(PROM_DATA_MODE__ALL_NODES_AGGREGATED, all_nodes_aggregated).
-define(PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, all_nodes_unaggregated).
-define(PROM_DATA_MODES, [
?PROM_DATA_MODE__NODE,
?PROM_DATA_MODE__ALL_NODES_AGGREGATED,
?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED
]).
-define(PROM_DATA_MODE_KEY__, prom_data_mode).
-define(PUT_PROM_DATA_MODE(MODE__), erlang:put(?PROM_DATA_MODE_KEY__, MODE__)).
-define(GET_PROM_DATA_MODE(), erlang:get(?PROM_DATA_MODE_KEY__)).

View File

@ -18,6 +18,7 @@
-behaviour(minirest_api). -behaviour(minirest_api).
-include("emqx_prometheus.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -86,7 +87,7 @@ schema("/prometheus/auth") ->
#{ #{
description => ?DESC(get_prom_auth_data), description => ?DESC(get_prom_auth_data),
tags => ?TAGS, tags => ?TAGS,
parameters => [ref(format_mode)], parameters => [ref(mode)],
security => security(), security => security(),
responses => responses =>
#{200 => prometheus_data_schema()} #{200 => prometheus_data_schema()}
@ -99,7 +100,7 @@ schema("/prometheus/stats") ->
#{ #{
description => ?DESC(get_prom_data), description => ?DESC(get_prom_data),
tags => ?TAGS, tags => ?TAGS,
parameters => [ref(format_mode)], parameters => [ref(mode)],
security => security(), security => security(),
responses => responses =>
#{200 => prometheus_data_schema()} #{200 => prometheus_data_schema()}
@ -112,7 +113,7 @@ schema("/prometheus/data_integration") ->
#{ #{
description => ?DESC(get_prom_data_integration_data), description => ?DESC(get_prom_data_integration_data),
tags => ?TAGS, tags => ?TAGS,
parameters => [ref(format_mode)], parameters => [ref(mode)],
security => security(), security => security(),
responses => responses =>
#{200 => prometheus_data_schema()} #{200 => prometheus_data_schema()}
@ -125,11 +126,11 @@ security() ->
false -> [] false -> []
end. end.
fields(format_mode) -> fields(mode) ->
[ [
{format_mode, {mode,
mk( mk(
hoconsc:enum([node, nodes_aggregated, nodes_unaggregated]), hoconsc:enum(?PROM_DATA_MODES),
#{ #{
default => node, default => node,
desc => <<"Metrics format mode.">>, desc => <<"Metrics format mode.">>,
@ -178,8 +179,13 @@ data_integration(get, #{headers := Headers, query_string := Qs}) ->
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
collect(Module, #{type := Type, format_mode := FormatMode}) -> collect(Module, #{type := Type, mode := Mode}) ->
erlang:put(format_mode, FormatMode), %% `Mode` is used to control the format of the returned data
%% It will used in callback `Module:collect_mf/1` to fetch data from node or cluster
%% And use this mode parameter to determine the formatting method of the returned information.
%% Since the arity of the callback function has been fixed.
%% so it is placed in the process dictionary of the current process.
?PUT_PROM_DATA_MODE(Mode),
Data = Data =
case erlang:function_exported(Module, collect, 1) of case erlang:function_exported(Module, collect, 1) of
true -> true ->
@ -194,21 +200,20 @@ collect(Module, #{type := Type, format_mode := FormatMode}) ->
gen_response(Type, Data). gen_response(Type, Data).
collect_opts(Headers, Qs) -> collect_opts(Headers, Qs) ->
#{type => response_type(Headers), format_mode => format_mode(Qs)}. #{type => response_type(Headers), mode => mode(Qs)}.
response_type(#{<<"accept">> := <<"application/json">>}) -> response_type(#{<<"accept">> := <<"application/json">>}) ->
<<"json">>; <<"json">>;
response_type(_) -> response_type(_) ->
<<"prometheus">>. <<"prometheus">>.
format_mode(#{<<"format_mode">> := node}) -> mode(#{<<"mode">> := Mode}) ->
node; case lists:member(Mode, ?PROM_DATA_MODES) of
format_mode(#{<<"format_mode">> := nodes_aggregated}) -> true -> Mode;
nodes_aggregated; false -> ?PROM_DATA_MODE__NODE
format_mode(#{<<"format_mode">> := nodes_unaggregated}) -> end;
nodes_unaggregated; mode(_) ->
format_mode(_) -> ?PROM_DATA_MODE__NODE.
node.
gen_response(<<"json">>, Data) -> gen_response(<<"json">>, Data) ->
{200, Data}; {200, Data};

View File

@ -127,7 +127,7 @@ deregister_cleanup(_) -> ok.
Callback :: prometheus_collector:collect_mf_callback(). Callback :: prometheus_collector:collect_mf_callback().
%% erlfmt-ignore %% erlfmt-ignore
collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) -> collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) ->
RawData = raw_data(erlang:get(format_mode)), RawData = raw_data(?GET_PROM_DATA_MODE()),
ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)), ok = add_collect_family(Callback, ?AUTHNS_WITH_TYPE, ?MG(authn, RawData)),
ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)), ok = add_collect_family(Callback, ?AUTHN_USERS_COUNT_WITH_TYPE, ?MG(authn_users_count, RawData)),
ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)), ok = add_collect_family(Callback, ?AUTHZS_WITH_TYPE, ?MG(authz, RawData)),
@ -139,8 +139,7 @@ collect_mf(_, _) ->
%% @private %% @private
collect(<<"json">>) -> collect(<<"json">>) ->
FormatMode = erlang:get(format_mode), RawData = raw_data(?GET_PROM_DATA_MODE()),
RawData = raw_data(FormatMode),
%% TODO: merge node name in json format %% TODO: merge node name in json format
#{ #{
emqx_authn => collect_json_data(?MG(authn, RawData)), emqx_authn => collect_json_data(?MG(authn, RawData)),
@ -175,14 +174,14 @@ fetch_cluster_consistented_metric_data() ->
}. }.
%% raw data for different format modes %% raw data for different format modes
raw_data(nodes_aggregated) -> raw_data(?PROM_DATA_MODE__ALL_NODES_AGGREGATED) ->
AggregatedNodesMetrics = aggre_cluster(all_nodes_metrics()), AggregatedNodesMetrics = aggre_cluster(all_nodes_metrics()),
maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data()); maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(nodes_unaggregated) -> raw_data(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
%% then fold from all nodes %% then fold from all nodes
AllNodesMetrics = with_node_name_label(all_nodes_metrics()), AllNodesMetrics = with_node_name_label(all_nodes_metrics()),
maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data()); maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(node) -> raw_data(?PROM_DATA_MODE__NODE) ->
{_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(), {_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(),
maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()). maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()).

View File

@ -132,7 +132,7 @@ deregister_cleanup(_) -> ok.
_Registry :: prometheus_registry:registry(), _Registry :: prometheus_registry:registry(),
Callback :: prometheus_collector:collect_mf_callback(). Callback :: prometheus_collector:collect_mf_callback().
collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) -> collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
RawData = raw_data(erlang:get(format_mode)), RawData = raw_data(?GET_PROM_DATA_MODE()),
%% Data Integration Overview %% Data Integration Overview
ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)), ok = add_collect_family(Callback, ?RULES_WITH_TYPE, ?MG(rules_data, RawData)),
@ -157,7 +157,7 @@ collect_mf(_, _) ->
%% @private %% @private
collect(<<"json">>) -> collect(<<"json">>) ->
RawData = raw_data(erlang:get(format_mode)), RawData = raw_data(?GET_PROM_DATA_MODE()),
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(), Bridges = emqx_bridge:list(),
#{ #{
@ -217,14 +217,14 @@ maybe_collect_schema_registry() ->
-endif. -endif.
%% raw data for different format modes %% raw data for different format modes
raw_data(nodes_aggregated) -> raw_data(?PROM_DATA_MODE__ALL_NODES_AGGREGATED) ->
AggregatedNodesMetrics = aggre_cluster(metrics_data_from_all_nodes()), AggregatedNodesMetrics = aggre_cluster(metrics_data_from_all_nodes()),
maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data()); maps:merge(AggregatedNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(nodes_unaggregated) -> raw_data(?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
%% then fold from all nodes %% then fold from all nodes
AllNodesMetrics = with_node_name_label(metrics_data_from_all_nodes()), AllNodesMetrics = with_node_name_label(metrics_data_from_all_nodes()),
maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data()); maps:merge(AllNodesMetrics, fetch_cluster_consistented_metric_data());
raw_data(node) -> raw_data(?PROM_DATA_MODE__NODE) ->
{_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(), {_Node, LocalNodeMetrics} = fetch_metric_data_from_local_node(),
maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()). maps:merge(LocalNodeMetrics, fetch_cluster_consistented_metric_data()).