From e22f8ff2a84d937ded206c5346797c637b4f545b Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 26 Oct 2022 17:06:32 +0800 Subject: [PATCH] refactor: sync emqx_prometheus via emqx_config_handler --- .../include/emqx_prometheus.hrl | 1 + apps/emqx_prometheus/src/emqx_prometheus.erl | 112 +++++++----------- .../src/emqx_prometheus_api.erl | 10 +- .../src/emqx_prometheus_app.erl | 15 +-- .../src/emqx_prometheus_config.erl | 51 ++++++++ .../src/emqx_prometheus_schema.erl | 9 +- .../src/emqx_prometheus_sup.erl | 21 ++-- .../test/emqx_prometheus_SUITE.erl | 2 +- .../test/emqx_prometheus_api_SUITE.erl | 17 ++- 9 files changed, 138 insertions(+), 100 deletions(-) create mode 100644 apps/emqx_prometheus/src/emqx_prometheus_config.erl diff --git a/apps/emqx_prometheus/include/emqx_prometheus.hrl b/apps/emqx_prometheus/include/emqx_prometheus.hrl index 589bbd024..36066a55d 100644 --- a/apps/emqx_prometheus/include/emqx_prometheus.hrl +++ b/apps/emqx_prometheus/include/emqx_prometheus.hrl @@ -1 +1,2 @@ -define(APP, emqx_prometheus). +-define(PROMETHEUS, [prometheus]). diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index d2c09774c..4edf371cc 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -37,18 +37,8 @@ ] ). --export([ - update/1, - start/0, - stop/0, - restart/0, - % for rpc - do_start/0, - do_stop/0 -]). - %% APIs --export([start_link/1]). +-export([start_link/1, info/0]). %% gen_server callbacks -export([ @@ -73,84 +63,59 @@ -define(TIMER_MSG, '#interval'). --record(state, {push_gateway, timer, interval}). - -%%-------------------------------------------------------------------- -%% update new config -update(Config) -> - case - emqx_conf:update( - [prometheus], - Config, - #{rawconf_with_defaults => true, override_to => cluster} - ) - of - {ok, #{raw_config := NewConfigRows}} -> - case maps:get(<<"enable">>, Config, true) of - true -> - ok = restart(); - false -> - ok = stop() - end, - {ok, NewConfigRows}; - {error, Reason} -> - {error, Reason} - end. - -start() -> - {_, []} = emqx_prometheus_proto_v1:start(mria_mnesia:running_nodes()), - ok. - -stop() -> - {_, []} = emqx_prometheus_proto_v1:stop(mria_mnesia:running_nodes()), - ok. - -restart() -> - ok = stop(), - ok = start(). - -do_start() -> - emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])). - -do_stop() -> - case emqx_prometheus_sup:stop_child(?APP) of - ok -> - ok; - {error, not_found} -> - ok - end. +-define(HTTP_OPTIONS, [{autoredirect, true}, {timeout, 60000}]). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link([]) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +info() -> + gen_server:call(?MODULE, info). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([Opts]) -> - Interval = maps:get(interval, Opts), - PushGateway = maps:get(push_gateway_server, Opts), - {ok, ensure_timer(#state{push_gateway = PushGateway, interval = Interval})}. +init([]) -> + #{interval := Interval} = opts(), + {ok, #{timer => ensure_timer(Interval), ok => 0, failed => 0}}. +handle_call(info, _From, State) -> + {reply, State#{opts => opts()}, State}; handle_call(_Msg, _From, State) -> - {noreply, State}. + {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info({timeout, R, ?TIMER_MSG}, State = #state{timer = R, push_gateway = Uri}) -> +handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) -> + #{interval := Interval, push_gateway_server := Server} = opts(), + PushRes = push_to_push_gateway(Server), + NewTimer = ensure_timer(Interval), + NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}), + %% Data is too big, hibernate for saving memory and stop system monitor warning. + {noreply, NewState, hibernate}; +handle_info(_Msg, State) -> + {noreply, State}. + +push_to_push_gateway(Uri) -> [Name, Ip] = string:tokens(atom_to_list(node()), "@"), Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/", Name, "~", Ip]), Data = prometheus_text_format:format(), - httpc:request(post, {Url, [], "text/plain", Data}, [{autoredirect, true}], []), - %% Data is too big, hibernate for saving memory and stop system monitor warning. - {noreply, ensure_timer(State), hibernate}; -handle_info(_Msg, State) -> - {noreply, State}. + case httpc:request(post, {Url, [], "text/plain", Data}, ?HTTP_OPTIONS, []) of + {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, _Body}} -> + ok; + Error -> + ?SLOG(error, #{ + msg => "post_to_push_gateway_failed", + error => Error, + url => Url + }), + failed + end. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -158,11 +123,14 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, _State) -> ok. -ensure_timer(State = #state{interval = Interval}) -> - State#state{timer = emqx_misc:start_timer(Interval, ?TIMER_MSG)}. +ensure_timer(Interval) -> + emqx_misc:start_timer(Interval, ?TIMER_MSG). + %%-------------------------------------------------------------------- %% prometheus callbacks %%-------------------------------------------------------------------- +opts() -> + emqx_conf:get(?PROMETHEUS). deregister_cleanup(_Registry) -> ok. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_api.erl b/apps/emqx_prometheus/src/emqx_prometheus_api.erl index 9a81f3ea3..125eed560 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_api.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_api.erl @@ -84,7 +84,7 @@ schema("/prometheus/stats") -> prometheus(get, _Params) -> {200, emqx:get_raw_config([<<"prometheus">>], #{})}; prometheus(put, #{body := Body}) -> - case emqx_prometheus:update(Body) of + case emqx_prometheus_config:update(Body) of {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> @@ -120,7 +120,13 @@ prometheus_config_example() -> #{ enable => true, interval => "15s", - push_gateway_server => <<"http://127.0.0.1:9091">> + push_gateway_server => <<"http://127.0.0.1:9091">>, + vm_dist_collector => enabled, + mnesia_collector => enabled, + vm_statistics_collector => enabled, + vm_system_info_collector => enabled, + vm_memory_collector => enabled, + vm_msacc_collector => enabled }. prometheus_data_schema() -> diff --git a/apps/emqx_prometheus/src/emqx_prometheus_app.erl b/apps/emqx_prometheus/src/emqx_prometheus_app.erl index b9dd9c466..bdee12d0e 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_app.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_app.erl @@ -27,17 +27,10 @@ ]). start(_StartType, _StartArgs) -> - {ok, Sup} = emqx_prometheus_sup:start_link(), - maybe_enable_prometheus(), - {ok, Sup}. + Res = emqx_prometheus_sup:start_link(), + emqx_prometheus_config:add_handler(), + Res. stop(_State) -> + emqx_prometheus_config:remove_handler(), ok. - -maybe_enable_prometheus() -> - case emqx_conf:get([prometheus, enable], false) of - true -> - emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus], #{})); - false -> - ok - end. diff --git a/apps/emqx_prometheus/src/emqx_prometheus_config.erl b/apps/emqx_prometheus/src/emqx_prometheus_config.erl new file mode 100644 index 000000000..83762b7c7 --- /dev/null +++ b/apps/emqx_prometheus/src/emqx_prometheus_config.erl @@ -0,0 +1,51 @@ +%%%------------------------------------------------------------------- +%%% @author zhongwen +%%% @copyright (C) 2022, +%%% @doc +%%% +%%% @end +%%% Created : 26. 10月 2022 11:14 +%%%------------------------------------------------------------------- +-module(emqx_prometheus_config). + +-behaviour(emqx_config_handler). + +-include("emqx_prometheus.hrl"). + +-export([add_handler/0, remove_handler/0]). +-export([post_config_update/5]). +-export([update/1]). + +update(Config) -> + case + emqx_conf:update( + [prometheus], + Config, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, #{raw_config := NewConfigRows}} -> + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + +add_handler() -> + ok = emqx_config_handler:add_handler(?PROMETHEUS, ?MODULE), + ok. + +remove_handler() -> + ok = emqx_config_handler:remove_handler(?PROMETHEUS), + ok. + +post_config_update(?PROMETHEUS, _Req, New, _Old, AppEnvs) -> + application:set_env(AppEnvs), + update_prometheus(New), + ok; +post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> + ok. + +update_prometheus(#{enable := true}) -> + emqx_prometheus_sup:start_child(?APP); +update_prometheus(#{enable := false}) -> + emqx_prometheus_sup:stop_child(?APP). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl index 4149485ff..09908167c 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl @@ -24,12 +24,13 @@ namespace/0, roots/0, fields/1, - desc/1 + desc/1, + translation/1 ]). namespace() -> "prometheus". -roots() -> ["prometheus"]. +roots() -> [{"prometheus", ?HOCON(?R_REF("prometheus"), #{translate_to => ["prometheus"]})}]. fields("prometheus") -> [ @@ -124,3 +125,7 @@ fields("prometheus") -> desc("prometheus") -> ?DESC(prometheus); desc(_) -> undefined. + +%% for CI test, CI don't load the whole emqx_conf_schema. +translation(Name) -> + emqx_conf_schema:translation(Name). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_sup.erl b/apps/emqx_prometheus/src/emqx_prometheus_sup.erl index 65023da14..eaf96af43 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_sup.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_sup.erl @@ -21,7 +21,6 @@ -export([ start_link/0, start_child/1, - start_child/2, stop_child/1 ]). @@ -40,23 +39,27 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec start_child(supervisor:child_spec()) -> ok. +-spec start_child(supervisor:child_spec() | atom()) -> ok. start_child(ChildSpec) when is_map(ChildSpec) -> - assert_started(supervisor:start_child(?MODULE, ChildSpec)). - --spec start_child(atom(), map()) -> ok. -start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> - assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). + assert_started(supervisor:start_child(?MODULE, ChildSpec)); +start_child(Mod) when is_atom(Mod) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))). -spec stop_child(any()) -> ok | {error, term()}. stop_child(ChildId) -> case supervisor:terminate_child(?MODULE, ChildId) of ok -> supervisor:delete_child(?MODULE, ChildId); + {error, not_found} -> ok; Error -> Error end. init([]) -> - {ok, {{one_for_one, 10, 3600}, []}}. + Children = + case emqx_conf:get([prometheus, enable], false) of + false -> []; + true -> [?CHILD(emqx_prometheus, [])] + end, + {ok, {{one_for_one, 10, 3600}, Children}}. %%-------------------------------------------------------------------- %% Internal functions @@ -64,5 +67,5 @@ init([]) -> assert_started({ok, _Pid}) -> ok; assert_started({ok, _Pid, _Info}) -> ok; -assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, {already_started, _Pid}}) -> ok; assert_started({error, Reason}) -> erlang:error(Reason). diff --git a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl index 3ae21511d..1a72b8952 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl @@ -74,7 +74,7 @@ t_start_stop(_) -> ?assertMatch(ok, emqx_prometheus:start()), ?assertMatch(ok, emqx_prometheus:stop()), ?assertMatch(ok, emqx_prometheus:restart()), - %% wait the interval timer tigger + %% wait the interval timer trigger timer:sleep(2000). t_collector_no_crash_test(_) -> diff --git a/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl index e72d7865a..59b3b9a17 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_api_SUITE.erl @@ -71,16 +71,27 @@ t_prometheus_api(_) -> #{ <<"push_gateway_server">> := _, <<"interval">> := _, - <<"enable">> := _ + <<"enable">> := _, + <<"vm_statistics_collector">> := _, + <<"vm_system_info_collector">> := _, + <<"vm_memory_collector">> := _, + <<"vm_msacc_collector">> := _ }, Conf ), - - NewConf = Conf#{<<"interval">> := <<"2s">>}, + #{<<"enable">> := Enable} = Conf, + ?assertEqual(Enable, undefined =/= erlang:whereis(emqx_prometheus)), + NewConf = Conf#{<<"interval">> => <<"2s">>, <<"vm_statistics_collector">> => <<"disabled">>}, {ok, Response2} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf), Conf2 = emqx_json:decode(Response2, [return_maps]), ?assertMatch(NewConf, Conf2), + ?assertEqual({ok, []}, application:get_env(prometheus, vm_statistics_collector_metrics)), + ?assertEqual({ok, all}, application:get_env(prometheus, vm_memory_collector_metrics)), + + NewConf1 = Conf#{<<"enable">> => (not Enable)}, + {ok, _Response3} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf1), + ?assertEqual((not Enable), undefined =/= erlang:whereis(emqx_prometheus)), ok. t_stats_api(_) ->