diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 27f842ce2..67825162e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_statsd, [ {description, "EMQX Statsd"}, - {vsn, "5.0.5"}, + {vsn, "5.0.6"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, [ diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 770320ddd..75c15fa9e 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -38,7 +38,7 @@ ]). %% Interface --export([start_link/0]). +-export([start_link/1]). %% Internal Exports -export([ @@ -68,17 +68,18 @@ do_restart() -> ok = do_start(), ok. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(Conf) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []). -init([]) -> +init(Conf) -> process_flag(trap_exit, true), #{ tags := TagsRaw, server := Server, sample_time_interval := SampleTimeInterval, flush_time_interval := FlushTimeInterval - } = emqx_conf:get([statsd]), + } = Conf, + FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval), {Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS), Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], @@ -86,7 +87,7 @@ init([]) -> {ok, ensure_timer(#{ sample_time_interval => SampleTimeInterval, - flush_time_interval => FlushTimeInterval, + flush_time_interval => FlushTimeInterval1, estatsd_pid => Pid })}. @@ -129,6 +130,19 @@ terminate(_Reason, #{estatsd_pid := Pid}) -> %% Internal function %%------------------------------------------------------------------------------ +flush_interval(FlushInterval, SampleInterval) when FlushInterval >= SampleInterval -> + FlushInterval; +flush_interval(_FlushInterval, SampleInterval) -> + ?SLOG( + warning, + #{ + msg => + "Configured flush_time_interval is lower than sample_time_interval, " + "setting: flush_time_interval = sample_time_interval." + } + ), + SampleInterval. + ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) -> State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}. diff --git a/apps/emqx_statsd/src/emqx_statsd_config.erl b/apps/emqx_statsd/src/emqx_statsd_config.erl index b818d2691..6bc430956 100644 --- a/apps/emqx_statsd/src/emqx_statsd_config.erl +++ b/apps/emqx_statsd/src/emqx_statsd_config.erl @@ -45,9 +45,9 @@ remove_handler() -> ok = emqx_config_handler:remove_handler(?STATSD), ok. -post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) -> +post_config_update(?STATSD, _Req, #{enable := true} = New, _Old, _AppEnvs) -> emqx_statsd_sup:ensure_child_stopped(?APP), - emqx_statsd_sup:ensure_child_started(?APP); + emqx_statsd_sup:ensure_child_started(?APP, New); post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) -> emqx_statsd_sup:ensure_child_stopped(?APP); post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 2845fb505..35c1d332c 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -25,6 +25,7 @@ -export([ start_link/0, ensure_child_started/1, + ensure_child_started/2, ensure_child_stopped/1 ]). @@ -45,7 +46,11 @@ start_link() -> -spec ensure_child_started(atom()) -> ok. ensure_child_started(Mod) when is_atom(Mod) -> - assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))). + ensure_child_started(Mod, emqx_conf:get([statsd], #{})). + +-spec ensure_child_started(atom(), map()) -> ok. +ensure_child_started(Mod, Conf) when is_atom(Mod) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, [Conf]))). %% @doc Stop the child worker process. -spec ensure_child_stopped(any()) -> ok. @@ -61,9 +66,9 @@ ensure_child_stopped(ChildId) -> init([]) -> Children = - case emqx_conf:get([statsd, enable], false) of - true -> [?CHILD(emqx_statsd, [])]; - false -> [] + case emqx_conf:get([statsd], #{}) of + #{enable := true} = Conf -> [?CHILD(emqx_statsd, [Conf])]; + _ -> [] end, {ok, {{one_for_one, 100, 3600}, Children}}. diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 2f8fa5a69..a203ef7d5 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -113,6 +113,32 @@ t_kill_exit(_) -> ?assertNotEqual(Estatsd, Estatsd1), ok. +t_config_update(_) -> + OldRawConf = emqx_conf:get_raw([statsd]), + {ok, _} = emqx_statsd_config:update(OldRawConf#{<<"enable">> => true}), + CommonKeys = [flush_time_interval, sample_time_interval], + OldConf = emqx_conf:get([statsd]), + OldStatsDState = sys:get_state(emqx_statsd), + OldPid = erlang:whereis(emqx_statsd), + ?assertEqual(maps:with(CommonKeys, OldConf), maps:with(CommonKeys, OldStatsDState)), + NewRawConfExpect = OldRawConf#{ + <<"flush_time_interval">> := <<"42s">>, + <<"sample_time_interval">> := <<"42s">> + }, + try + {ok, _} = emqx_statsd_config:update(NewRawConfExpect), + NewRawConf = emqx_conf:get_raw([statsd]), + NewConf = emqx_conf:get([statsd]), + NewStatsDState = sys:get_state(emqx_statsd), + NewPid = erlang:whereis(emqx_statsd), + ?assertNotEqual(OldRawConf, NewRawConf), + ?assertEqual(NewRawConfExpect, NewRawConf), + ?assertEqual(maps:with(CommonKeys, NewConf), maps:with(CommonKeys, NewStatsDState)), + ?assertNotEqual(OldPid, NewPid) + after + {ok, _} = emqx_statsd_config:update(OldRawConf) + end. + request(Method) -> request(Method, []). request(Method, Body) ->