From 7a24878436f61a830c4686d459be6e4b087ae288 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 26 Jul 2021 19:51:34 +0800 Subject: [PATCH] feat(prometheus): Support swagger prometheus API --- apps/emqx/src/emqx_schema.erl | 1 + apps/emqx_prometheus/etc/emqx_prometheus.conf | 1 + .../include/emqx_prometheus.hrl | 1 + apps/emqx_prometheus/src/emqx_prometheus.erl | 61 +++----- .../src/emqx_prometheus_api.erl | 138 ++++++++++++++++++ .../src/emqx_prometheus_app.erl | 17 ++- .../src/emqx_prometheus_schema.erl | 1 + .../src/emqx_prometheus_sup.erl | 51 +++++-- apps/emqx_statsd/src/emqx_statsd_api.erl | 2 +- apps/emqx_statsd/src/emqx_statsd_app.erl | 2 +- rebar.config.erl | 1 + 11 files changed, 217 insertions(+), 59 deletions(-) create mode 100644 apps/emqx_prometheus/src/emqx_prometheus_api.erl diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 19ca12142..05a0e4838 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -93,6 +93,7 @@ includes() -> , "emqx_management" , "emqx_dashboard" , "emqx_gateway" + , "emqx_prometheus" ]. -endif. diff --git a/apps/emqx_prometheus/etc/emqx_prometheus.conf b/apps/emqx_prometheus/etc/emqx_prometheus.conf index c450846fe..91fa5b284 100644 --- a/apps/emqx_prometheus/etc/emqx_prometheus.conf +++ b/apps/emqx_prometheus/etc/emqx_prometheus.conf @@ -4,4 +4,5 @@ emqx_prometheus:{ push_gateway_server: "http://127.0.0.1:9091" interval: "15s" + enable: true } diff --git a/apps/emqx_prometheus/include/emqx_prometheus.hrl b/apps/emqx_prometheus/include/emqx_prometheus.hrl index e69de29bb..589bbd024 100644 --- a/apps/emqx_prometheus/include/emqx_prometheus.hrl +++ b/apps/emqx_prometheus/include/emqx_prometheus.hrl @@ -0,0 +1 @@ +-define(APP, emqx_prometheus). diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 04ebd78d3..e369179ee 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -25,12 +25,6 @@ -include_lib("prometheus/include/prometheus.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). --rest_api(#{name => stats, - method => 'GET', - path => "/emqx_prometheus", - func => stats, - descr => "Get emqx all stats info" - }). -import(prometheus_model_helpers, [ create_mf/5 @@ -38,11 +32,8 @@ , counter_metric/1 ]). -%% REST APIs --export([stats/2]). - %% APIs --export([start_link/2]). +-export([start_link/1]). %% gen_server callbacks -export([ init/1 @@ -59,6 +50,8 @@ , collect_metrics/2 ]). +-export([collect/1]). + -define(C(K, L), proplists:get_value(K, L, 0)). -define(TIMER_MSG, '#interval'). @@ -69,25 +62,17 @@ %% APIs %%-------------------------------------------------------------------- -start_link(PushGateway, Interval) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [PushGateway, Interval], []). - -%%-------------------------------------------------------------------- -%% REST APIs - -stats(_Bindings, Params) -> - collect(proplists:get_value(<<"type">>, Params, <<"json">>)). +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([undefined, Interval]) -> - {ok, #state{interval = Interval}}; - -init([PushGateway, Interval]) -> - Ref = erlang:start_timer(Interval, self(), ?TIMER_MSG), - {ok, #state{timer = Ref, push_gateway = PushGateway, interval = Interval}}. +init([Opts]) -> + Interval = maps:get(interval, Opts), + PushGateway = maps:get(push_gateway_server, Opts), + {ok, ensure_timer(#state{push_gateway = PushGateway, interval = Interval})}. handle_call(_Msg, _From, State) -> {noreply, State}. @@ -95,12 +80,12 @@ handle_call(_Msg, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({timeout, R, ?TIMER_MSG}, S = #state{interval=I, timer=R, push_gateway=Uri}) -> +handle_info({timeout, R, ?TIMER_MSG}, State = #state{timer=R, push_gateway=Uri}) -> [Name, Ip] = string:tokens(atom_to_list(node()), "@"), Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/",Name, "~", Ip]), Data = prometheus_text_format:format(), httpc:request(post, {Url, [], "text/plain", Data}, [{autoredirect, true}], []), - {noreply, S#state{timer = erlang:start_timer(I, self(), ?TIMER_MSG)}}; + {noreply, ensure_timer(State)}; handle_info(_Msg, State) -> {noreply, State}. @@ -111,6 +96,8 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, _State) -> ok. +ensure_timer(State = #state{interval = Interval}) -> + State#state{timer = emqx_misc:start_timer(Interval, ?TIMER_MSG)}. %%-------------------------------------------------------------------- %% prometheus callbacks %%-------------------------------------------------------------------- @@ -138,18 +125,16 @@ collect(<<"json">>) -> Metrics = emqx_metrics:all(), Stats = emqx_stats:getstats(), VMData = emqx_vm_data(), - Data = [{stats, [collect_stats(Name, Stats) || Name <- emqx_stats()]}, - {metrics, [collect_stats(Name, VMData) || Name <- emqx_vm()]}, - {packets, [collect_stats(Name, Metrics) || Name <- emqx_metrics_packets()]}, - {messages, [collect_stats(Name, Metrics) || Name <- emqx_metrics_messages()]}, - {delivery, [collect_stats(Name, Metrics) || Name <- emqx_metrics_delivery()]}, - {client, [collect_stats(Name, Metrics) || Name <- emqx_metrics_client()]}, - {session, [collect_stats(Name, Metrics) || Name <- emqx_metrics_session()]}], - return({ok, Data}); + [{stats, [collect_stats(Name, Stats) || Name <- emqx_stats()]}, + {metrics, [collect_stats(Name, VMData) || Name <- emqx_vm()]}, + {packets, [collect_stats(Name, Metrics) || Name <- emqx_metrics_packets()]}, + {messages, [collect_stats(Name, Metrics) || Name <- emqx_metrics_messages()]}, + {delivery, [collect_stats(Name, Metrics) || Name <- emqx_metrics_delivery()]}, + {client, [collect_stats(Name, Metrics) || Name <- emqx_metrics_client()]}, + {session, [collect_stats(Name, Metrics) || Name <- emqx_metrics_session()]}]; collect(<<"prometheus">>) -> - Data = prometheus_text_format:format(), - {ok, #{<<"content-type">> => <<"text/plain">>}, Data}. + prometheus_text_format:format(). %% @private collect_stats(Name, Stats) -> @@ -608,7 +593,3 @@ emqx_cluster_data() -> #{running_nodes := Running, stopped_nodes := Stopped} = ekka_mnesia:cluster_info(), [{nodes_running, length(Running)}, {nodes_stopped, length(Stopped)}]. - -%% TODO: V5 API -return(_) -> - ok. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl new file mode 100644 index 000000000..c4d7da7a4 --- /dev/null +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -0,0 +1,138 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_prometheus_api). + +-behaviour(minirest_api). + +-include("emqx_prometheus.hrl"). + +-import(emqx_mgmt_util, [ response_schema/1 + , response_schema/2 + , request_body_schema/1 + ]). + +-export([api_spec/0]). + +-export([ prometheus/2 + % , stats/2 + ]). + +api_spec() -> + {[prometheus_api()], schemas()}. + +schemas() -> + [#{prometheus => #{ + type => object, + properties => #{ + push_gateway_server => #{ + type => string, + description => <<"prometheus PushGateway Server">>, + example => get_raw(<<"push_gateway_server">>, <<"http://127.0.0.1:9091">>)}, + interval => #{ + type => string, + description => <<"Interval">>, + example => get_raw(<<"interval">>, <<"15s">>)}, + enable => #{ + type => boolean, + description => <<"Prometheus status">>, + example => get_raw(<<"enable">>, false)} + } + }}]. + +prometheus_api() -> + Metadata = #{ + get => #{ + description => <<"Get Prometheus info">>, + responses => #{ + <<"200">> => response_schema(prometheus) + } + }, + put => #{ + description => <<"Update Prometheus">>, + 'requestBody' => request_body_schema(prometheus), + responses => #{ + <<"200">> => + response_schema(<<"Update Prometheus successfully">>), + <<"400">> => + response_schema(<<"Bad Request">>, #{ + type => object, + properties => #{ + message => #{type => string}, + code => #{type => string} + } + }) + } + } + }, + {"/prometheus", Metadata, prometheus}. + +% prometheus_data_api() -> +% Metadata = #{ +% get => #{ +% description => <<"Get Prometheus Data">>, +% parameters => [#{ +% name => format_type, +% in => path, +% schema => #{type => string} +% }], +% responses => #{ +% <<"200">> => +% response_schema(<<"Update Prometheus successfully">>), +% <<"400">> => +% response_schema(<<"Bad Request">>, #{ +% type => object, +% properties => #{ +% message => #{type => string}, +% code => #{type => string} +% } +% }) +% } +% } +% }, +% {"/prometheus/stats", Metadata, stats}. + +prometheus(get, _Request) -> + Response = emqx_config:get_raw([<<"emqx_prometheus">>], #{}), + {200, Response}; + +prometheus(put, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + Enable = maps:get(<<"enable">>, Params), + ok = emqx_config:update([?APP], Params), + enable_prometheus(Enable). + +% stats(_Bindings, Params) -> +% Type = proplists:get_value(<<"format_type">>, Params, <<"json">>), +% Data = emqx_prometheus:collect(Type), +% case Type of +% <<"json">> -> +% {ok, Data}; +% <<"prometheus">> -> +% {ok, #{<<"content-type">> => <<"text/plain">>}, Data} +% end. + +enable_prometheus(true) -> + ok = emqx_prometheus_sup:stop_child(?APP), + emqx_prometheus_sup:start_child(?APP, emqx_config:get([?APP], #{})), + {200}; +enable_prometheus(false) -> + _ = emqx_prometheus_sup:stop_child(?APP), + {200}. + +get_raw(Key, Def) -> + emqx_config:get_raw([<<"emqx_prometheus">>] ++ [Key], Def). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_app.erl b/apps/emqx_prometheus/src/emqx_prometheus_app.erl index 9024bd583..865bda543 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_app.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_app.erl @@ -18,20 +18,25 @@ -behaviour(application). --emqx_plugin(?MODULE). +-include("emqx_prometheus.hrl"). %% Application callbacks -export([ start/2 , stop/1 ]). --define(APP, emqx_prometheus). - start(_StartType, _StartArgs) -> - PushGateway = emqx_config:get([?APP, push_gateway_server], undefined), - Interval = emqx_config:get([?APP, interval], 15000), - emqx_prometheus_sup:start_link(PushGateway, Interval). + {ok, Sup} = emqx_prometheus_sup:start_link(), + maybe_enable_prometheus(), + {ok, Sup}. stop(_State) -> ok. +maybe_enable_prometheus() -> + case emqx_config:get([?APP, enable], false) of + true -> + emqx_prometheus_sup:start_child(?APP, emqx_config:get([?APP], #{})); + false -> + ok + end. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl index 486362e7b..10a74cd0a 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl @@ -27,4 +27,5 @@ structs() -> ["emqx_prometheus"]. fields("emqx_prometheus") -> [ {push_gateway_server, emqx_schema:t(string())} , {interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "15s")} + , {enable, emqx_schema:t(boolean(), undefined, false)} ]. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_sup.erl b/apps/emqx_prometheus/src/emqx_prometheus_sup.erl index 8ebbb02ae..fd5223b6d 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_sup.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_sup.erl @@ -18,19 +18,48 @@ -behaviour(supervisor). --export([start_link/2]). +-export([ start_link/0 + , start_child/1 + , start_child/2 + , stop_child/1 + ]). -export([init/1]). -start_link(PushGateway, Interval) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [PushGateway, Interval]). +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Opts), #{id => Mod, + start => {Mod, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod]}). -init([PushGateway, Interval]) -> - {ok, {#{strategy => one_for_one, intensity => 10, period => 100}, - [#{id => emqx_prometheus, - start => {emqx_prometheus, start_link, [PushGateway, Interval]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_prometheus]}]}}. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). +-spec start_child(supervisor:child_spec()) -> ok. +start_child(ChildSpec) when is_map(ChildSpec) -> + assert_started(supervisor:start_child(?MODULE, ChildSpec)). + +-spec start_child(atom(), map()) -> ok. +start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). + +-spec(stop_child(any()) -> ok | {error, term()}). +stop_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error + end. + +init([]) -> + {ok, {{one_for_one, 10, 3600}, []}}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +assert_started({ok, _Pid}) -> ok; +assert_started({ok, _Pid, _Info}) -> ok; +assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, Reason}) -> erlang:error(Reason). diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 2276684a2..b10dfdca9 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -99,7 +99,7 @@ enable_statsd(true) -> emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})), {200}; enable_statsd(false) -> - ok = emqx_statsd_sup:stop_child(?APP), + _ = emqx_statsd_sup:stop_child(?APP), {200}. get_raw(Key, Def) -> diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index a5054a16c..4764c04e1 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -34,7 +34,7 @@ stop(_) -> maybe_enable_statsd() -> case emqx_config:get([?APP, enable], false) of true -> - emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{})); + emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})); false -> ok end. diff --git a/rebar.config.erl b/rebar.config.erl index eddd6bac1..6364c369b 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -272,6 +272,7 @@ relx_apps(ReleaseType) -> , emqx_dashboard , emqx_retainer , emqx_statsd + , emqx_prometheus ] ++ [quicer || is_quicer_supported()] ++ [emqx_license || is_enterprise()]