fix: restart emqx_statsd with the updated configuration

emqx_config_handler:post_config_update/5 cb is called before an updated config is saved.
Thus, a process being restarted in that callback cannot get the latest config by calling
emqx_conf:get/2, because that update is not saved yet.

Relates to EMQX-9055
This commit is contained in:
Serge Tupchii 2023-03-03 18:03:32 +02:00
parent d90e7d568e
commit bff087f40a
4 changed files with 42 additions and 11 deletions

View File

@ -38,7 +38,7 @@
]). ]).
%% Interface %% Interface
-export([start_link/0]). -export([start_link/1]).
%% Internal Exports %% Internal Exports
-export([ -export([
@ -68,17 +68,17 @@ do_restart() ->
ok = do_start(), ok = do_start(),
ok. ok.
start_link() -> start_link(Conf) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
init([]) -> init(Conf) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
#{ #{
tags := TagsRaw, tags := TagsRaw,
server := Server, server := Server,
sample_time_interval := SampleTimeInterval, sample_time_interval := SampleTimeInterval,
flush_time_interval := FlushTimeInterval flush_time_interval := FlushTimeInterval
} = emqx_conf:get([statsd]), } = Conf,
{Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), {Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],

View File

@ -45,9 +45,9 @@ remove_handler() ->
ok = emqx_config_handler:remove_handler(?STATSD), ok = emqx_config_handler:remove_handler(?STATSD),
ok. ok.
post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) -> post_config_update(?STATSD, _Req, #{enable := true} = New, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP), emqx_statsd_sup:ensure_child_stopped(?APP),
emqx_statsd_sup:ensure_child_started(?APP); emqx_statsd_sup:ensure_child_started(?APP, New);
post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) -> post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP); emqx_statsd_sup:ensure_child_stopped(?APP);
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->

View File

@ -25,6 +25,7 @@
-export([ -export([
start_link/0, start_link/0,
ensure_child_started/1, ensure_child_started/1,
ensure_child_started/2,
ensure_child_stopped/1 ensure_child_stopped/1
]). ]).
@ -45,7 +46,11 @@ start_link() ->
-spec ensure_child_started(atom()) -> ok. -spec ensure_child_started(atom()) -> ok.
ensure_child_started(Mod) when is_atom(Mod) -> ensure_child_started(Mod) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))). ensure_child_started(Mod, emqx_conf:get([statsd], #{})).
-spec ensure_child_started(atom(), map()) -> ok.
ensure_child_started(Mod, Conf) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, [Conf]))).
%% @doc Stop the child worker process. %% @doc Stop the child worker process.
-spec ensure_child_stopped(any()) -> ok. -spec ensure_child_stopped(any()) -> ok.
@ -61,9 +66,9 @@ ensure_child_stopped(ChildId) ->
init([]) -> init([]) ->
Children = Children =
case emqx_conf:get([statsd, enable], false) of case emqx_conf:get([statsd], #{}) of
true -> [?CHILD(emqx_statsd, [])]; #{enable := true} = Conf -> [?CHILD(emqx_statsd, [Conf])];
false -> [] _ -> []
end, end,
{ok, {{one_for_one, 100, 3600}, Children}}. {ok, {{one_for_one, 100, 3600}, Children}}.

View File

@ -113,6 +113,32 @@ t_kill_exit(_) ->
?assertNotEqual(Estatsd, Estatsd1), ?assertNotEqual(Estatsd, Estatsd1),
ok. ok.
t_config_update(_) ->
OldRawConf = emqx_conf:get_raw([statsd]),
{ok, _} = emqx_statsd_config:update(OldRawConf#{<<"enable">> => true}),
CommonKeys = [flush_time_interval, sample_time_interval],
OldConf = emqx_conf:get([statsd]),
OldStatsDState = sys:get_state(emqx_statsd),
OldPid = erlang:whereis(emqx_statsd),
?assertEqual(maps:with(CommonKeys, OldConf), maps:with(CommonKeys, OldStatsDState)),
NewRawConfExpect = OldRawConf#{
<<"flush_time_interval">> := <<"42s">>,
<<"sample_time_interval">> := <<"42s">>
},
try
{ok, _} = emqx_statsd_config:update(NewRawConfExpect),
NewRawConf = emqx_conf:get_raw([statsd]),
NewConf = emqx_conf:get([statsd]),
NewStatsDState = sys:get_state(emqx_statsd),
NewPid = erlang:whereis(emqx_statsd),
?assertNotEqual(OldRawConf, NewRawConf),
?assertEqual(NewRawConfExpect, NewRawConf),
?assertEqual(maps:with(CommonKeys, NewConf), maps:with(CommonKeys, NewStatsDState)),
?assertNotEqual(OldPid, NewPid)
after
{ok, _} = emqx_statsd_config:update(OldRawConf)
end.
request(Method) -> request(Method, []). request(Method) -> request(Method, []).
request(Method, Body) -> request(Method, Body) ->