emqx/apps/emqx_prometheus/src/emqx_prometheus_api.erl

364 lines
11 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_api).
-behaviour(minirest_api).
-include("emqx_prometheus.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-import(
hoconsc,
[
mk/2,
ref/1
]
).
-export([
api_spec/0,
paths/0,
schema/1,
fields/1,
namespace/0
]).
%% handlers
-export([
setting/2,
stats/2,
auth/2,
data_integration/2,
schema_validation/2,
message_transformation/2
]).
-export([lookup_from_local_nodes/3]).
-define(TAGS, [<<"Monitor">>]).
-define(IS_TRUE(Val), ((Val =:= true) orelse (Val =:= <<"true">>))).
-define(IS_FALSE(Val), ((Val =:= false) orelse (Val =:= <<"false">>))).
namespace() -> undefined.
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/prometheus",
"/prometheus/auth",
"/prometheus/stats",
"/prometheus/data_integration"
] ++ paths_ee().
-if(?EMQX_RELEASE_EDITION == ee).
paths_ee() ->
[
"/prometheus/schema_validation",
"/prometheus/message_transformation"
].
%% ELSE if(?EMQX_RELEASE_EDITION == ee).
-else.
paths_ee() ->
[].
%% END if(?EMQX_RELEASE_EDITION == ee).
-endif.
schema("/prometheus") ->
#{
'operationId' => setting,
get =>
#{
description => ?DESC(get_prom_conf_info),
tags => ?TAGS,
responses =>
#{200 => prometheus_setting_response()}
},
put =>
#{
description => ?DESC(update_prom_conf_info),
tags => ?TAGS,
'requestBody' => prometheus_setting_request(),
responses =>
#{200 => prometheus_setting_response()}
}
};
schema("/prometheus/auth") ->
#{
'operationId' => auth,
get =>
#{
description => ?DESC(get_prom_auth_data),
tags => ?TAGS,
parameters => [ref(mode)],
security => security(),
responses =>
#{200 => prometheus_data_schema()}
}
};
schema("/prometheus/stats") ->
#{
'operationId' => stats,
get =>
#{
description => ?DESC(get_prom_data),
tags => ?TAGS,
parameters => [ref(mode)],
security => security(),
responses =>
#{200 => prometheus_data_schema()}
}
};
schema("/prometheus/data_integration") ->
#{
'operationId' => data_integration,
get =>
#{
description => ?DESC(get_prom_data_integration_data),
tags => ?TAGS,
parameters => [ref(mode)],
security => security(),
responses =>
#{200 => prometheus_data_schema()}
}
};
schema("/prometheus/schema_validation") ->
#{
'operationId' => schema_validation,
get =>
#{
description => ?DESC(get_prom_schema_validation),
tags => ?TAGS,
parameters => [ref(mode)],
security => security(),
responses =>
#{200 => prometheus_data_schema()}
}
};
schema("/prometheus/message_transformation") ->
#{
'operationId' => message_transformation,
get =>
#{
description => ?DESC(get_prom_message_transformation),
tags => ?TAGS,
parameters => [ref(mode)],
security => security(),
responses =>
#{200 => prometheus_data_schema()}
}
}.
security() ->
case emqx_config:get([prometheus, enable_basic_auth], false) of
true -> [#{'basicAuth' => []}, #{'bearerAuth' => []}];
false -> []
end.
%% erlfmt-ignore
fields(mode) ->
[
{mode,
mk(
hoconsc:enum(?PROM_DATA_MODES),
#{
default => node,
desc => <<"
Metrics format mode.
`node`:
Return metrics from local node. And it is the default behaviour if `mode` not specified.
`all_nodes_aggregated`:
Return metrics for all nodes.
And if possible, calculate the arithmetic sum or logical sum of the indicators of all nodes.
`all_nodes_unaggregated`:
Return metrics from all nodes, and the metrics are not aggregated.
The node name will be included in the returned results to
indicate that certain metrics were returned on a certain node.
">>,
in => query,
required => false,
example => node
}
)}
].
%% bpapi
lookup_from_local_nodes(M, F, A) ->
erlang:apply(M, F, A).
%%--------------------------------------------------------------------
%% API Handler funcs
%%--------------------------------------------------------------------
setting(get, _Params) ->
Raw = emqx:get_raw_config([<<"prometheus">>], #{}),
Conf =
case emqx_prometheus_schema:is_recommend_type(Raw) of
true -> Raw;
false -> emqx_prometheus_config:to_recommend_type(Raw)
end,
{200, Conf};
setting(put, #{body := Body}) ->
case emqx_prometheus_config:update(Body) of
{ok, NewConfig} ->
{200, NewConfig};
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
{500, 'INTERNAL_ERROR', Message}
end.
stats(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus, collect_opts(Headers, Qs)).
auth(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus_auth, collect_opts(Headers, Qs)).
data_integration(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)).
schema_validation(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus_schema_validation, collect_opts(Headers, Qs)).
message_transformation(get, #{headers := Headers, query_string := Qs}) ->
collect(emqx_prometheus_message_transformation, collect_opts(Headers, Qs)).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
collect(Module, #{type := Type, mode := Mode}) ->
%% `Mode` is used to control the format of the returned data
%% It will used in callback `Module:collect_mf/1` to fetch data from node or cluster
%% And use this mode parameter to determine the formatting method of the returned information.
%% Since the arity of the callback function has been fixed.
%% so it is placed in the process dictionary of the current process.
?PUT_PROM_DATA_MODE(Mode),
Data =
case erlang:function_exported(Module, collect, 1) of
true ->
erlang:apply(Module, collect, [Type]);
false ->
?SLOG(error, #{
msg => "prometheus callback module not found, empty data responded",
module_name => Module
}),
<<>>
end,
gen_response(Type, Data).
collect_opts(Headers, Qs) ->
#{type => response_type(Headers), mode => mode(Qs)}.
response_type(#{<<"accept">> := <<"application/json">>}) ->
<<"json">>;
response_type(_) ->
<<"prometheus">>.
mode(#{<<"mode">> := Mode}) ->
case lists:member(Mode, ?PROM_DATA_MODES) of
true -> Mode;
false -> ?PROM_DATA_MODE__NODE
end;
mode(_) ->
?PROM_DATA_MODE__NODE.
gen_response(<<"json">>, Data) ->
{200, Data};
gen_response(<<"prometheus">>, Data) ->
{200, #{<<"content-type">> => <<"text/plain">>}, Data}.
prometheus_setting_request() ->
[{prometheus, #{type := Setting}}] = emqx_prometheus_schema:roots(),
emqx_dashboard_swagger:schema_with_examples(
Setting,
[
recommend_setting_example(),
legacy_setting_example()
]
).
%% Always return recommend setting
prometheus_setting_response() ->
{_, #{value := Example}} = recommend_setting_example(),
emqx_dashboard_swagger:schema_with_example(
?R_REF(emqx_prometheus_schema, recommend_setting),
Example
).
legacy_setting_example() ->
Summary = <<"legacy_deprecated_setting">>,
{Summary, #{
summary => Summary,
value => #{
enable => true,
interval => <<"15s">>,
push_gateway_server => <<"http://127.0.0.1:9091">>,
headers => #{<<"Authorization">> => <<"Basic YWRtaW46Y2JraG55eWd5QDE=">>},
job_name => <<"${name}/instance/${name}~${host}">>,
vm_dist_collector => <<"disabled">>,
vm_memory_collector => <<"disabled">>,
vm_msacc_collector => <<"disabled">>,
mnesia_collector => <<"disabled">>,
vm_statistics_collector => <<"disabled">>,
vm_system_info_collector => <<"disabled">>
}
}}.
recommend_setting_example() ->
Summary = <<"recommend_setting">>,
{Summary, #{
summary => Summary,
value => #{
enable_basic_auth => false,
push_gateway => #{
interval => <<"15s">>,
url => <<"http://127.0.0.1:9091">>,
headers => #{<<"Authorization">> => <<"Basic YWRtaW46Y2JraG55eWd5QDE=">>},
job_name => <<"${name}/instance/${name}~${host}">>
},
collectors => #{
vm_dist => <<"disabled">>,
vm_memory => <<"disabled">>,
vm_msacc => <<"disabled">>,
mnesia => <<"disabled">>,
vm_statistics => <<"disabled">>,
vm_system_info => <<"disabled">>
}
}
}}.
prometheus_data_schema() ->
#{
description =>
<<"Get Prometheus Data.">>,
content =>
[
{'text/plain', #{schema => #{type => string}}},
{'application/json', #{schema => #{type => object}}}
]
}.