feat(prometheus): Support swagger prometheus API

This commit is contained in:
Turtle 2021-07-26 19:51:34 +08:00 committed by turtleDeng
parent 190c7d8f6c
commit 7a24878436
11 changed files with 217 additions and 59 deletions

View File

@ -93,6 +93,7 @@ includes() ->
, "emqx_management"
, "emqx_dashboard"
, "emqx_gateway"
, "emqx_prometheus"
].
-endif.

View File

@ -4,4 +4,5 @@
emqx_prometheus:{
push_gateway_server: "http://127.0.0.1:9091"
interval: "15s"
enable: true
}

View File

@ -0,0 +1 @@
-define(APP, emqx_prometheus).

View File

@ -25,12 +25,6 @@
-include_lib("prometheus/include/prometheus.hrl").
-include_lib("prometheus/include/prometheus_model.hrl").
-rest_api(#{name => stats,
method => 'GET',
path => "/emqx_prometheus",
func => stats,
descr => "Get emqx all stats info"
}).
-import(prometheus_model_helpers,
[ create_mf/5
@ -38,11 +32,8 @@
, counter_metric/1
]).
%% REST APIs
-export([stats/2]).
%% APIs
-export([start_link/2]).
-export([start_link/1]).
%% gen_server callbacks
-export([ init/1
@ -59,6 +50,8 @@
, collect_metrics/2
]).
-export([collect/1]).
-define(C(K, L), proplists:get_value(K, L, 0)).
-define(TIMER_MSG, '#interval').
@ -69,25 +62,17 @@
%% APIs
%%--------------------------------------------------------------------
start_link(PushGateway, Interval) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [PushGateway, Interval], []).
%%--------------------------------------------------------------------
%% REST APIs
stats(_Bindings, Params) ->
collect(proplists:get_value(<<"type">>, Params, <<"json">>)).
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([undefined, Interval]) ->
{ok, #state{interval = Interval}};
init([PushGateway, Interval]) ->
Ref = erlang:start_timer(Interval, self(), ?TIMER_MSG),
{ok, #state{timer = Ref, push_gateway = PushGateway, interval = Interval}}.
init([Opts]) ->
Interval = maps:get(interval, Opts),
PushGateway = maps:get(push_gateway_server, Opts),
{ok, ensure_timer(#state{push_gateway = PushGateway, interval = Interval})}.
handle_call(_Msg, _From, State) ->
{noreply, State}.
@ -95,12 +80,12 @@ handle_call(_Msg, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, R, ?TIMER_MSG}, S = #state{interval=I, timer=R, push_gateway=Uri}) ->
handle_info({timeout, R, ?TIMER_MSG}, State = #state{timer=R, push_gateway=Uri}) ->
[Name, Ip] = string:tokens(atom_to_list(node()), "@"),
Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/",Name, "~", Ip]),
Data = prometheus_text_format:format(),
httpc:request(post, {Url, [], "text/plain", Data}, [{autoredirect, true}], []),
{noreply, S#state{timer = erlang:start_timer(I, self(), ?TIMER_MSG)}};
{noreply, ensure_timer(State)};
handle_info(_Msg, State) ->
{noreply, State}.
@ -111,6 +96,8 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, _State) ->
ok.
ensure_timer(State = #state{interval = Interval}) ->
State#state{timer = emqx_misc:start_timer(Interval, ?TIMER_MSG)}.
%%--------------------------------------------------------------------
%% prometheus callbacks
%%--------------------------------------------------------------------
@ -138,18 +125,16 @@ collect(<<"json">>) ->
Metrics = emqx_metrics:all(),
Stats = emqx_stats:getstats(),
VMData = emqx_vm_data(),
Data = [{stats, [collect_stats(Name, Stats) || Name <- emqx_stats()]},
{metrics, [collect_stats(Name, VMData) || Name <- emqx_vm()]},
{packets, [collect_stats(Name, Metrics) || Name <- emqx_metrics_packets()]},
{messages, [collect_stats(Name, Metrics) || Name <- emqx_metrics_messages()]},
{delivery, [collect_stats(Name, Metrics) || Name <- emqx_metrics_delivery()]},
{client, [collect_stats(Name, Metrics) || Name <- emqx_metrics_client()]},
{session, [collect_stats(Name, Metrics) || Name <- emqx_metrics_session()]}],
return({ok, Data});
[{stats, [collect_stats(Name, Stats) || Name <- emqx_stats()]},
{metrics, [collect_stats(Name, VMData) || Name <- emqx_vm()]},
{packets, [collect_stats(Name, Metrics) || Name <- emqx_metrics_packets()]},
{messages, [collect_stats(Name, Metrics) || Name <- emqx_metrics_messages()]},
{delivery, [collect_stats(Name, Metrics) || Name <- emqx_metrics_delivery()]},
{client, [collect_stats(Name, Metrics) || Name <- emqx_metrics_client()]},
{session, [collect_stats(Name, Metrics) || Name <- emqx_metrics_session()]}];
collect(<<"prometheus">>) ->
Data = prometheus_text_format:format(),
{ok, #{<<"content-type">> => <<"text/plain">>}, Data}.
prometheus_text_format:format().
%% @private
collect_stats(Name, Stats) ->
@ -608,7 +593,3 @@ emqx_cluster_data() ->
#{running_nodes := Running, stopped_nodes := Stopped} = ekka_mnesia:cluster_info(),
[{nodes_running, length(Running)},
{nodes_stopped, length(Stopped)}].
%% TODO: V5 API
return(_) ->
ok.

View File

@ -0,0 +1,138 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_api).
-behaviour(minirest_api).
-include("emqx_prometheus.hrl").
-import(emqx_mgmt_util, [ response_schema/1
, response_schema/2
, request_body_schema/1
]).
-export([api_spec/0]).
-export([ prometheus/2
% , stats/2
]).
api_spec() ->
{[prometheus_api()], schemas()}.
schemas() ->
[#{prometheus => #{
type => object,
properties => #{
push_gateway_server => #{
type => string,
description => <<"prometheus PushGateway Server">>,
example => get_raw(<<"push_gateway_server">>, <<"http://127.0.0.1:9091">>)},
interval => #{
type => string,
description => <<"Interval">>,
example => get_raw(<<"interval">>, <<"15s">>)},
enable => #{
type => boolean,
description => <<"Prometheus status">>,
example => get_raw(<<"enable">>, false)}
}
}}].
prometheus_api() ->
Metadata = #{
get => #{
description => <<"Get Prometheus info">>,
responses => #{
<<"200">> => response_schema(prometheus)
}
},
put => #{
description => <<"Update Prometheus">>,
'requestBody' => request_body_schema(prometheus),
responses => #{
<<"200">> =>
response_schema(<<"Update Prometheus successfully">>),
<<"400">> =>
response_schema(<<"Bad Request">>, #{
type => object,
properties => #{
message => #{type => string},
code => #{type => string}
}
})
}
}
},
{"/prometheus", Metadata, prometheus}.
% prometheus_data_api() ->
% Metadata = #{
% get => #{
% description => <<"Get Prometheus Data">>,
% parameters => [#{
% name => format_type,
% in => path,
% schema => #{type => string}
% }],
% responses => #{
% <<"200">> =>
% response_schema(<<"Update Prometheus successfully">>),
% <<"400">> =>
% response_schema(<<"Bad Request">>, #{
% type => object,
% properties => #{
% message => #{type => string},
% code => #{type => string}
% }
% })
% }
% }
% },
% {"/prometheus/stats", Metadata, stats}.
prometheus(get, _Request) ->
Response = emqx_config:get_raw([<<"emqx_prometheus">>], #{}),
{200, Response};
prometheus(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
Enable = maps:get(<<"enable">>, Params),
ok = emqx_config:update([?APP], Params),
enable_prometheus(Enable).
% stats(_Bindings, Params) ->
% Type = proplists:get_value(<<"format_type">>, Params, <<"json">>),
% Data = emqx_prometheus:collect(Type),
% case Type of
% <<"json">> ->
% {ok, Data};
% <<"prometheus">> ->
% {ok, #{<<"content-type">> => <<"text/plain">>}, Data}
% end.
enable_prometheus(true) ->
ok = emqx_prometheus_sup:stop_child(?APP),
emqx_prometheus_sup:start_child(?APP, emqx_config:get([?APP], #{})),
{200};
enable_prometheus(false) ->
_ = emqx_prometheus_sup:stop_child(?APP),
{200}.
get_raw(Key, Def) ->
emqx_config:get_raw([<<"emqx_prometheus">>] ++ [Key], Def).

View File

@ -18,20 +18,25 @@
-behaviour(application).
-emqx_plugin(?MODULE).
-include("emqx_prometheus.hrl").
%% Application callbacks
-export([ start/2
, stop/1
]).
-define(APP, emqx_prometheus).
start(_StartType, _StartArgs) ->
PushGateway = emqx_config:get([?APP, push_gateway_server], undefined),
Interval = emqx_config:get([?APP, interval], 15000),
emqx_prometheus_sup:start_link(PushGateway, Interval).
{ok, Sup} = emqx_prometheus_sup:start_link(),
maybe_enable_prometheus(),
{ok, Sup}.
stop(_State) ->
ok.
maybe_enable_prometheus() ->
case emqx_config:get([?APP, enable], false) of
true ->
emqx_prometheus_sup:start_child(?APP, emqx_config:get([?APP], #{}));
false ->
ok
end.

View File

@ -27,4 +27,5 @@ structs() -> ["emqx_prometheus"].
fields("emqx_prometheus") ->
[ {push_gateway_server, emqx_schema:t(string())}
, {interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "15s")}
, {enable, emqx_schema:t(boolean(), undefined, false)}
].

View File

@ -18,19 +18,48 @@
-behaviour(supervisor).
-export([start_link/2]).
-export([ start_link/0
, start_child/1
, start_child/2
, stop_child/1
]).
-export([init/1]).
start_link(PushGateway, Interval) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [PushGateway, Interval]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Opts), #{id => Mod,
start => {Mod, start_link, [Opts]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]}).
init([PushGateway, Interval]) ->
{ok, {#{strategy => one_for_one, intensity => 10, period => 100},
[#{id => emqx_prometheus,
start => {emqx_prometheus, start_link, [PushGateway, Interval]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_prometheus]}]}}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_child(supervisor:child_spec()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)).
-spec start_child(atom(), map()) -> ok.
start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))).
-spec(stop_child(any()) -> ok | {error, term()}).
stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId);
Error -> Error
end.
init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -99,7 +99,7 @@ enable_statsd(true) ->
emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})),
{200};
enable_statsd(false) ->
ok = emqx_statsd_sup:stop_child(?APP),
_ = emqx_statsd_sup:stop_child(?APP),
{200}.
get_raw(Key, Def) ->

View File

@ -34,7 +34,7 @@ stop(_) ->
maybe_enable_statsd() ->
case emqx_config:get([?APP, enable], false) of
true ->
emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{}));
emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{}));
false ->
ok
end.

View File

@ -272,6 +272,7 @@ relx_apps(ReleaseType) ->
, emqx_dashboard
, emqx_retainer
, emqx_statsd
, emqx_prometheus
]
++ [quicer || is_quicer_supported()]
++ [emqx_license || is_enterprise()]