refactor: sync emqx_prometheus via emqx_config_handler

This commit is contained in:
Zhongwen Deng 2022-10-26 17:06:32 +08:00
parent dc3c0094f0
commit e22f8ff2a8
9 changed files with 138 additions and 100 deletions

View File

@ -1 +1,2 @@
-define(APP, emqx_prometheus). -define(APP, emqx_prometheus).
-define(PROMETHEUS, [prometheus]).

View File

@ -37,18 +37,8 @@
] ]
). ).
-export([
update/1,
start/0,
stop/0,
restart/0,
% for rpc
do_start/0,
do_stop/0
]).
%% APIs %% APIs
-export([start_link/1]). -export([start_link/1, info/0]).
%% gen_server callbacks %% gen_server callbacks
-export([ -export([
@ -73,84 +63,59 @@
-define(TIMER_MSG, '#interval'). -define(TIMER_MSG, '#interval').
-record(state, {push_gateway, timer, interval}). -define(HTTP_OPTIONS, [{autoredirect, true}, {timeout, 60000}]).
%%--------------------------------------------------------------------
%% update new config
update(Config) ->
case
emqx_conf:update(
[prometheus],
Config,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
case maps:get(<<"enable">>, Config, true) of
true ->
ok = restart();
false ->
ok = stop()
end,
{ok, NewConfigRows};
{error, Reason} ->
{error, Reason}
end.
start() ->
{_, []} = emqx_prometheus_proto_v1:start(mria_mnesia:running_nodes()),
ok.
stop() ->
{_, []} = emqx_prometheus_proto_v1:stop(mria_mnesia:running_nodes()),
ok.
restart() ->
ok = stop(),
ok = start().
do_start() ->
emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus])).
do_stop() ->
case emqx_prometheus_sup:stop_child(?APP) of
ok ->
ok;
{error, not_found} ->
ok
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_link(Opts) -> start_link([]) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
info() ->
gen_server:call(?MODULE, info).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Opts]) -> init([]) ->
Interval = maps:get(interval, Opts), #{interval := Interval} = opts(),
PushGateway = maps:get(push_gateway_server, Opts), {ok, #{timer => ensure_timer(Interval), ok => 0, failed => 0}}.
{ok, ensure_timer(#state{push_gateway = PushGateway, interval = Interval})}.
handle_call(info, _From, State) ->
{reply, State#{opts => opts()}, State};
handle_call(_Msg, _From, State) -> handle_call(_Msg, _From, State) ->
{noreply, State}. {reply, ok, State}.
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, R, ?TIMER_MSG}, State = #state{timer = R, push_gateway = Uri}) -> handle_info({timeout, Timer, ?TIMER_MSG}, State = #{timer := Timer}) ->
#{interval := Interval, push_gateway_server := Server} = opts(),
PushRes = push_to_push_gateway(Server),
NewTimer = ensure_timer(Interval),
NewState = maps:update_with(PushRes, fun(C) -> C + 1 end, 1, State#{timer => NewTimer}),
%% Data is too big, hibernate for saving memory and stop system monitor warning.
{noreply, NewState, hibernate};
handle_info(_Msg, State) ->
{noreply, State}.
push_to_push_gateway(Uri) ->
[Name, Ip] = string:tokens(atom_to_list(node()), "@"), [Name, Ip] = string:tokens(atom_to_list(node()), "@"),
Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/", Name, "~", Ip]), Url = lists:concat([Uri, "/metrics/job/", Name, "/instance/", Name, "~", Ip]),
Data = prometheus_text_format:format(), Data = prometheus_text_format:format(),
httpc:request(post, {Url, [], "text/plain", Data}, [{autoredirect, true}], []), case httpc:request(post, {Url, [], "text/plain", Data}, ?HTTP_OPTIONS, []) of
%% Data is too big, hibernate for saving memory and stop system monitor warning. {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, _Body}} ->
{noreply, ensure_timer(State), hibernate}; ok;
handle_info(_Msg, State) -> Error ->
{noreply, State}. ?SLOG(error, #{
msg => "post_to_push_gateway_failed",
error => Error,
url => Url
}),
failed
end.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -158,11 +123,14 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. ok.
ensure_timer(State = #state{interval = Interval}) -> ensure_timer(Interval) ->
State#state{timer = emqx_misc:start_timer(Interval, ?TIMER_MSG)}. emqx_misc:start_timer(Interval, ?TIMER_MSG).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% prometheus callbacks %% prometheus callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
opts() ->
emqx_conf:get(?PROMETHEUS).
deregister_cleanup(_Registry) -> deregister_cleanup(_Registry) ->
ok. ok.

View File

@ -84,7 +84,7 @@ schema("/prometheus/stats") ->
prometheus(get, _Params) -> prometheus(get, _Params) ->
{200, emqx:get_raw_config([<<"prometheus">>], #{})}; {200, emqx:get_raw_config([<<"prometheus">>], #{})};
prometheus(put, #{body := Body}) -> prometheus(put, #{body := Body}) ->
case emqx_prometheus:update(Body) of case emqx_prometheus_config:update(Body) of
{ok, NewConfig} -> {ok, NewConfig} ->
{200, NewConfig}; {200, NewConfig};
{error, Reason} -> {error, Reason} ->
@ -120,7 +120,13 @@ prometheus_config_example() ->
#{ #{
enable => true, enable => true,
interval => "15s", interval => "15s",
push_gateway_server => <<"http://127.0.0.1:9091">> push_gateway_server => <<"http://127.0.0.1:9091">>,
vm_dist_collector => enabled,
mnesia_collector => enabled,
vm_statistics_collector => enabled,
vm_system_info_collector => enabled,
vm_memory_collector => enabled,
vm_msacc_collector => enabled
}. }.
prometheus_data_schema() -> prometheus_data_schema() ->

View File

@ -27,17 +27,10 @@
]). ]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_prometheus_sup:start_link(), Res = emqx_prometheus_sup:start_link(),
maybe_enable_prometheus(), emqx_prometheus_config:add_handler(),
{ok, Sup}. Res.
stop(_State) -> stop(_State) ->
emqx_prometheus_config:remove_handler(),
ok. ok.
maybe_enable_prometheus() ->
case emqx_conf:get([prometheus, enable], false) of
true ->
emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus], #{}));
false ->
ok
end.

View File

@ -0,0 +1,51 @@
%%%-------------------------------------------------------------------
%%% @author zhongwen
%%% @copyright (C) 2022, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 26. 10 2022 11:14
%%%-------------------------------------------------------------------
-module(emqx_prometheus_config).
-behaviour(emqx_config_handler).
-include("emqx_prometheus.hrl").
-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
-export([update/1]).
update(Config) ->
case
emqx_conf:update(
[prometheus],
Config,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
{ok, NewConfigRows};
{error, Reason} ->
{error, Reason}
end.
add_handler() ->
ok = emqx_config_handler:add_handler(?PROMETHEUS, ?MODULE),
ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?PROMETHEUS),
ok.
post_config_update(?PROMETHEUS, _Req, New, _Old, AppEnvs) ->
application:set_env(AppEnvs),
update_prometheus(New),
ok;
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
ok.
update_prometheus(#{enable := true}) ->
emqx_prometheus_sup:start_child(?APP);
update_prometheus(#{enable := false}) ->
emqx_prometheus_sup:stop_child(?APP).

View File

@ -24,12 +24,13 @@
namespace/0, namespace/0,
roots/0, roots/0,
fields/1, fields/1,
desc/1 desc/1,
translation/1
]). ]).
namespace() -> "prometheus". namespace() -> "prometheus".
roots() -> ["prometheus"]. roots() -> [{"prometheus", ?HOCON(?R_REF("prometheus"), #{translate_to => ["prometheus"]})}].
fields("prometheus") -> fields("prometheus") ->
[ [
@ -124,3 +125,7 @@ fields("prometheus") ->
desc("prometheus") -> ?DESC(prometheus); desc("prometheus") -> ?DESC(prometheus);
desc(_) -> undefined. desc(_) -> undefined.
%% for CI test, CI don't load the whole emqx_conf_schema.
translation(Name) ->
emqx_conf_schema:translation(Name).

View File

@ -21,7 +21,6 @@
-export([ -export([
start_link/0, start_link/0,
start_child/1, start_child/1,
start_child/2,
stop_child/1 stop_child/1
]). ]).
@ -40,23 +39,27 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_child(supervisor:child_spec()) -> ok. -spec start_child(supervisor:child_spec() | atom()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) -> start_child(ChildSpec) when is_map(ChildSpec) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)). assert_started(supervisor:start_child(?MODULE, ChildSpec));
start_child(Mod) when is_atom(Mod) ->
-spec start_child(atom(), map()) -> ok. assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))).
start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))).
-spec stop_child(any()) -> ok | {error, term()}. -spec stop_child(any()) -> ok | {error, term()}.
stop_child(ChildId) -> stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId); ok -> supervisor:delete_child(?MODULE, ChildId);
{error, not_found} -> ok;
Error -> Error Error -> Error
end. end.
init([]) -> init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}. Children =
case emqx_conf:get([prometheus, enable], false) of
false -> [];
true -> [?CHILD(emqx_prometheus, [])]
end,
{ok, {{one_for_one, 10, 3600}, Children}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
@ -64,5 +67,5 @@ init([]) ->
assert_started({ok, _Pid}) -> ok; assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok; assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok; assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason). assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -74,7 +74,7 @@ t_start_stop(_) ->
?assertMatch(ok, emqx_prometheus:start()), ?assertMatch(ok, emqx_prometheus:start()),
?assertMatch(ok, emqx_prometheus:stop()), ?assertMatch(ok, emqx_prometheus:stop()),
?assertMatch(ok, emqx_prometheus:restart()), ?assertMatch(ok, emqx_prometheus:restart()),
%% wait the interval timer tigger %% wait the interval timer trigger
timer:sleep(2000). timer:sleep(2000).
t_collector_no_crash_test(_) -> t_collector_no_crash_test(_) ->

View File

@ -71,16 +71,27 @@ t_prometheus_api(_) ->
#{ #{
<<"push_gateway_server">> := _, <<"push_gateway_server">> := _,
<<"interval">> := _, <<"interval">> := _,
<<"enable">> := _ <<"enable">> := _,
<<"vm_statistics_collector">> := _,
<<"vm_system_info_collector">> := _,
<<"vm_memory_collector">> := _,
<<"vm_msacc_collector">> := _
}, },
Conf Conf
), ),
#{<<"enable">> := Enable} = Conf,
NewConf = Conf#{<<"interval">> := <<"2s">>}, ?assertEqual(Enable, undefined =/= erlang:whereis(emqx_prometheus)),
NewConf = Conf#{<<"interval">> => <<"2s">>, <<"vm_statistics_collector">> => <<"disabled">>},
{ok, Response2} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf), {ok, Response2} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf),
Conf2 = emqx_json:decode(Response2, [return_maps]), Conf2 = emqx_json:decode(Response2, [return_maps]),
?assertMatch(NewConf, Conf2), ?assertMatch(NewConf, Conf2),
?assertEqual({ok, []}, application:get_env(prometheus, vm_statistics_collector_metrics)),
?assertEqual({ok, all}, application:get_env(prometheus, vm_memory_collector_metrics)),
NewConf1 = Conf#{<<"enable">> => (not Enable)},
{ok, _Response3} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, NewConf1),
?assertEqual((not Enable), undefined =/= erlang:whereis(emqx_prometheus)),
ok. ok.
t_stats_api(_) -> t_stats_api(_) ->