diff --git a/apps/emqx_statsd/i18n/emqx_statsd_schema_i18n.conf b/apps/emqx_statsd/i18n/emqx_statsd_schema_i18n.conf index 9c6eb5afb..46d654a46 100644 --- a/apps/emqx_statsd/i18n/emqx_statsd_schema_i18n.conf +++ b/apps/emqx_statsd/i18n/emqx_statsd_schema_i18n.conf @@ -45,6 +45,12 @@ emqx_statsd_schema { zh: """指标的推送间隔。""" } } + tags { + desc { + en: """The tags for metrics.""" + zh: """指标的标签。""" + } + } enable { desc { diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index 52f8774c0..92d856670 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -1,5 +1,2 @@ -define(APP, emqx_statsd). --define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). --define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). --define(DEFAULT_HOST, "127.0.0.1"). --define(DEFAULT_PORT, 8125). +-define(STATSD, [statsd]). diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 76b04204b..5f32567d6 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_statsd, [ - {description, "An OTP application"}, - {vsn, "5.0.2"}, + {description, "EMQX Statsd"}, + {vsn, "5.0.3"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, [ diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 8154c9027..4b0a98cd3 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -28,18 +28,17 @@ -include_lib("emqx/include/logger.hrl"). -export([ - update/1, start/0, stop/0, restart/0, - %% for rpc + %% for rpc: remove after 5.1.x do_start/0, do_stop/0, do_restart/0 ]). %% Interface --export([start_link/1]). +-export([start_link/0]). %% Internal Exports -export([ @@ -51,40 +50,15 @@ terminate/2 ]). 
--record(state, { - timer :: reference() | undefined, - sample_time_interval :: pos_integer(), - flush_time_interval :: pos_integer(), - estatsd_pid :: pid() -}). - -update(Config) -> - case - emqx_conf:update( - [statsd], - Config, - #{rawconf_with_defaults => true, override_to => cluster} - ) - of - {ok, #{raw_config := NewConfigRows}} -> - ok = stop(), - case maps:get(<<"enable">>, Config, true) of - true -> - ok = restart(); - false -> - ok = stop() - end, - {ok, NewConfigRows}; - {error, Reason} -> - {error, Reason} - end. +-define(SAMPLE_TIMEOUT, sample_timeout). +%% Remove after 5.1.x start() -> check_multicall_result(emqx_statsd_proto_v1:start(mria_mnesia:running_nodes())). stop() -> check_multicall_result(emqx_statsd_proto_v1:stop(mria_mnesia:running_nodes())). restart() -> check_multicall_result(emqx_statsd_proto_v1:restart(mria_mnesia:running_nodes())). do_start() -> - emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})). + emqx_statsd_sup:ensure_child_started(?APP). do_stop() -> emqx_statsd_sup:ensure_child_stopped(?APP). @@ -94,59 +68,51 @@ do_restart() -> ok = do_start(), ok. -start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
-init([Opts]) -> +init([]) -> process_flag(trap_exit, true), - Tags = tags(maps:get(tags, Opts, #{})), - {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}), - Opts1 = maps:without( - [ - sample_time_interval, - flush_time_interval - ], - Opts#{ - tags => Tags, - host => Host, - port => Port, - prefix => <<"emqx">> - } - ), - {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), - SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), - FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), + #{ + tags := TagsRaw, + server := {Host, Port}, + sample_time_interval := SampleTimeInterval, + flush_time_interval := FlushTimeInterval + } = emqx_conf:get([statsd]), + Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw), + Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}], + {ok, Pid} = estatsd:start_link(Opts), {ok, - ensure_timer(#state{ - sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid + ensure_timer(#{ + sample_time_interval => SampleTimeInterval, + flush_time_interval => FlushTimeInterval, + estatsd_pid => Pid })}. handle_call(_Req, _From, State) -> - {noreply, State}. + {reply, ignore, State}. handle_cast(_Msg, State) -> {noreply, State}. 
handle_info( - {timeout, Ref, sample_timeout}, - State = #state{ - sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid, - timer = Ref + {timeout, Ref, ?SAMPLE_TIMEOUT}, + State = #{ + sample_time_interval := SampleTimeInterval, + flush_time_interval := FlushTimeInterval, + estatsd_pid := Pid, + timer := Ref } ) -> Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), SampleRate = SampleTimeInterval / FlushTimeInterval, StatsdMetrics = [ - {gauge, trans_metrics_name(Name), Value, SampleRate, []} + {gauge, Name, Value, SampleRate, []} || {Name, Value} <- Metrics ], - estatsd:submit(Pid, StatsdMetrics), - {noreply, ensure_timer(State)}; -handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) -> + ok = estatsd:submit(Pid, StatsdMetrics), + {noreply, ensure_timer(State), hibernate}; +handle_info({'EXIT', Pid, Error}, State = #{estatsd_pid := Pid}) -> {stop, {shutdown, Error}, State}; handle_info(_Msg, State) -> {noreply, State}. @@ -154,16 +120,13 @@ handle_info(_Msg, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(_Reason, #state{estatsd_pid = Pid}) -> +terminate(_Reason, #{estatsd_pid := Pid}) -> estatsd:stop(Pid), ok. %%------------------------------------------------------------------------------ %% Internal function %%------------------------------------------------------------------------------ -trans_metrics_name(Name) -> - Name0 = atom_to_binary(Name, utf8), - binary_to_atom(<<"emqx.", Name0/binary>>, utf8). emqx_vm_data() -> Idle = @@ -179,12 +142,8 @@ emqx_vm_data() -> {cpu_use, 100 - Idle} ] ++ emqx_vm:mem_info(). -tags(Map) -> - Tags = maps:to_list(Map), - [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. - -ensure_timer(State = #state{sample_time_interval = SampleTimeInterval}) -> - State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}. 
+ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) -> + State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}. check_multicall_result({Results, []}) -> case @@ -201,3 +160,8 @@ check_multicall_result({Results, []}) -> end; check_multicall_result({_, _}) -> error(multicall_failed). + +to_bin(B) when is_binary(B) -> B; +to_bin(I) when is_integer(I) -> integer_to_binary(I); +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl index 2f2e42303..6007a3327 100644 --- a/apps/emqx_statsd/src/emqx_statsd_api.erl +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -77,15 +77,16 @@ statsd_config_schema() -> statsd_example() -> #{ enable => true, - flush_time_interval => "32s", - sample_time_interval => "32s", - server => "127.0.0.1:8125" + flush_time_interval => "30s", + sample_time_interval => "30s", + server => "127.0.0.1:8125", + tags => #{} }. statsd(get, _Params) -> {200, emqx:get_raw_config([<<"statsd">>], #{})}; statsd(put, #{body := Body}) -> - case emqx_statsd:update(Body) of + case emqx_statsd_config:update(Body) of {ok, NewConfig} -> {200, NewConfig}; {error, Reason} -> diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 4b34006ac..b885772e0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -27,15 +27,8 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_statsd_sup:start_link(), - maybe_enable_statsd(), + emqx_statsd_config:add_handler(), {ok, Sup}. stop(_) -> + emqx_statsd_config:remove_handler(), ok. - -maybe_enable_statsd() -> - case emqx_conf:get([statsd, enable], false) of - true -> - emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})); - false -> - ok - end. 
diff --git a/apps/emqx_statsd/src/emqx_statsd_config.erl b/apps/emqx_statsd/src/emqx_statsd_config.erl new file mode 100644 index 000000000..4ec71ed32 --- /dev/null +++ b/apps/emqx_statsd/src/emqx_statsd_config.erl @@ -0,0 +1,54 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_statsd_config). + +-behaviour(emqx_config_handler). + +-include("emqx_statsd.hrl"). + +-export([add_handler/0, remove_handler/0]). +-export([post_config_update/5]). +-export([update/1]). + +update(Config) -> + case + emqx_conf:update( + ?STATSD, + Config, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, #{raw_config := NewConfigRows}} -> + {ok, NewConfigRows}; + {error, Reason} -> + {error, Reason} + end. + +add_handler() -> + ok = emqx_config_handler:add_handler(?STATSD, ?MODULE), + ok. + +remove_handler() -> + ok = emqx_config_handler:remove_handler(?STATSD), + ok. + +post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) -> + emqx_statsd_sup:ensure_child_stopped(?APP), + emqx_statsd_sup:ensure_child_started(?APP); +post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) -> + emqx_statsd_sup:ensure_child_stopped(?APP); +post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) -> + ok. 
diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 9efde5afc..3fb51f3bd 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -25,7 +25,8 @@ namespace/0, roots/0, fields/1, - desc/1 + desc/1, + validations/0 ]). namespace() -> "statsd". @@ -45,7 +46,8 @@ fields("statsd") -> )}, {server, fun server/1}, {sample_time_interval, fun sample_interval/1}, - {flush_time_interval, fun flush_interval/1} + {flush_time_interval, fun flush_interval/1}, + {tags, fun tags/1} ]. desc("statsd") -> ?DESC(statsd); @@ -59,12 +61,37 @@ server(_) -> undefined. sample_interval(type) -> emqx_schema:duration_ms(); sample_interval(required) -> true; -sample_interval(default) -> "10s"; +sample_interval(default) -> "30s"; sample_interval(desc) -> ?DESC(?FUNCTION_NAME); sample_interval(_) -> undefined. flush_interval(type) -> emqx_schema:duration_ms(); flush_interval(required) -> true; -flush_interval(default) -> "10s"; +flush_interval(default) -> "30s"; flush_interval(desc) -> ?DESC(?FUNCTION_NAME); flush_interval(_) -> undefined. + +tags(type) -> map(); +tags(required) -> false; +tags(default) -> #{}; +tags(desc) -> ?DESC(?FUNCTION_NAME); +tags(_) -> undefined. + +validations() -> + [ + {check_interval, fun check_interval/1} + ]. + +check_interval(Conf) -> + case hocon_maps:get("statsd.sample_time_interval", Conf) of + undefined -> + ok; + Sample -> + Flush = hocon_maps:get("statsd.flush_time_interval", Conf), + case Sample =< Flush of + true -> + true; + false -> + {bad_interval, #{sample_time_interval => Sample, flush_time_interval => Flush}} + end + end. 
diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 851dbf8cc..f14242113 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -10,7 +10,6 @@ -export([ start_link/0, ensure_child_started/1, - ensure_child_started/2, ensure_child_stopped/1 ]). @@ -19,7 +18,7 @@ %% Helper macro for declaring children of supervisor -define(CHILD(Mod, Opts), #{ id => Mod, - start => {Mod, start_link, [Opts]}, + start => {Mod, start_link, Opts}, restart => permanent, shutdown => 5000, type => worker, @@ -29,13 +28,9 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec ensure_child_started(supervisor:child_spec()) -> ok. -ensure_child_started(ChildSpec) when is_map(ChildSpec) -> - assert_started(supervisor:start_child(?MODULE, ChildSpec)). - --spec ensure_child_started(atom(), map()) -> ok. -ensure_child_started(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> - assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). +-spec ensure_child_started(atom()) -> ok. +ensure_child_started(Mod) when is_atom(Mod) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))). %% @doc Stop the child worker process. -spec ensure_child_stopped(any()) -> ok. @@ -50,13 +45,17 @@ ensure_child_stopped(ChildId) -> end. init([]) -> - {ok, {{one_for_one, 10, 3600}, []}}. + Children = + case emqx_conf:get([statsd, enable], false) of + true -> [?CHILD(emqx_statsd, [])]; + false -> [] + end, + {ok, {{one_for_one, 100, 3600}, Children}}. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- assert_started({ok, _Pid}) -> ok; -assert_started({ok, _Pid, _Info}) -> ok; assert_started({error, {already_started, _Pid}}) -> ok; assert_started({error, Reason}) -> erlang:error(Reason). 
diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 08c78dd07..2b5074f48 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -5,28 +5,104 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]). + +-define(BASE_CONF, << + "\n" + "statsd {\n" + "enable = true\n" + "flush_time_interval = 4s\n" + "sample_time_interval = 4s\n" + "server = \"127.0.0.1:8126\"\n" + "tags {\"t1\" = \"good\", test = 100}\n" + "}\n" +>>). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_statsd]), + emqx_common_test_helpers:start_apps( + [emqx_conf, emqx_dashboard, emqx_statsd], + fun set_special_configs/1 + ), + ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF, #{ + raw_with_default => true + }), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_statsd]). + emqx_common_test_helpers:stop_apps([emqx_statsd, emqx_dashboard, emqx_conf]). + +set_special_configs(emqx_dashboard) -> + emqx_dashboard_api_test_helpers:set_default_config(); +set_special_configs(_) -> + ok. all() -> emqx_common_test_helpers:all(?MODULE). t_statsd(_) -> - {ok, Socket} = gen_udp:open(8125), + {ok, Socket} = gen_udp:open(8126, [{active, true}]), receive - {udp, _Socket, _Host, _Port, Bin} -> - ?assert(length(Bin) > 50) - after 11 * 1000 -> - ?assert(true, failed) + {udp, Socket1, Host, Port, Data} -> + ct:pal("receive:~p~n", [{Socket, Socket1, Host, Port}]), + ?assert(length(Data) > 50), + ?assert(nomatch =/= string:find(Data, "\nemqx.cpu_use:")) + after 10 * 1000 -> + error(timeout) end, gen_udp:close(Socket). t_management(_) -> ?assertMatch(ok, emqx_statsd:start()), + ?assertMatch(ok, emqx_statsd:start()), + ?assertMatch(ok, emqx_statsd:stop()), ?assertMatch(ok, emqx_statsd:stop()), ?assertMatch(ok, emqx_statsd:restart()). 
+ +t_rest_http(_) -> + {ok, Res0} = request(get), + ?assertEqual( + #{ + <<"enable">> => true, + <<"flush_time_interval">> => <<"4s">>, + <<"sample_time_interval">> => <<"4s">>, + <<"server">> => <<"127.0.0.1:8126">>, + <<"tags">> => #{<<"t1">> => <<"good">>, <<"test">> => 100} + }, + Res0 + ), + {ok, Res1} = request(put, #{enable => false}), + ?assertMatch(#{<<"enable">> := false}, Res1), + ?assertEqual(maps:remove(<<"enable">>, Res0), maps:remove(<<"enable">>, Res1)), + {ok, Res2} = request(get), + ?assertEqual(Res1, Res2), + ?assertEqual( + error, request(put, #{sample_time_interval => "11s", flush_time_interval => "10s"}) + ), + {ok, _} = request(put, #{enable => true}), + ok. + +t_kill_exit(_) -> + {ok, _} = request(put, #{enable => true}), + Pid = erlang:whereis(emqx_statsd), + ?assertEqual(ignore, gen_server:call(Pid, whatever)), + ?assertEqual(ok, gen_server:cast(Pid, whatever)), + ?assertEqual(Pid, erlang:whereis(emqx_statsd)), + #{estatsd_pid := Estatsd} = sys:get_state(emqx_statsd), + ?assert(erlang:exit(Estatsd, kill)), + ?assertEqual(false, is_process_alive(Estatsd)), + ct:sleep(150), + Pid1 = erlang:whereis(emqx_statsd), + ?assertNotEqual(Pid, Pid1), + #{estatsd_pid := Estatsd1} = sys:get_state(emqx_statsd), + ?assertNotEqual(Estatsd, Estatsd1), + ok. + +request(Method) -> request(Method, []). + +request(Method, Body) -> + case request(Method, uri(["statsd"]), Body) of + {ok, 200, Res} -> + {ok, emqx_json:decode(Res, [return_maps])}; + {ok, _Status, _} -> + error + end. diff --git a/changes/v5.0.11-en.md b/changes/v5.0.11-en.md index 45547e1eb..59ef42b4d 100644 --- a/changes/v5.0.11-en.md +++ b/changes/v5.0.11-en.md @@ -15,8 +15,11 @@ automatically if needed. Use `PUT /gateways/{name}/enable/{true|false}` to enable or disable gateway. No more `DELETE /gateways/{name}`. +- Support `statsd {tags: {"user-defined-tag" = "tag-value"}}` configuration and improve stability of `emqx_statsd` [#9363](https://github.com/emqx/emqx/pull/9363).
+ - Improve node name generation rules to avoid potential atom table overflow risk [#9387](https://github.com/emqx/emqx/pull/9387). + ## Bug fixes - Fix create trace sometime failed by end_at time has already passed. [#9303](https://github.com/emqx/emqx/pull/9303) diff --git a/changes/v5.0.11-zh.md b/changes/v5.0.11-zh.md index d372c15cb..b48c09e7a 100644 --- a/changes/v5.0.11-zh.md +++ b/changes/v5.0.11-zh.md @@ -13,6 +13,8 @@ - 重新设计了 /gateways API [9364](https://github.com/emqx/emqx/pull/9364)。 使用 PUT /gateways/{name} 代替了 POST /gateways,现在网关将在需要时自动加载,然后删除了 DELETE /gateways/{name},之后可以使用 PUT /gateways/{name}/enable/{true|false} 来开启或禁用网关。 +- 支持 `statsd {tags: {"user-defined-tag" = "tag-value"}}` 配置,并提升 `emqx_statsd` 的稳定性 [#9363](https://github.com/emqx/emqx/pull/9363)。 + - 改进了节点名称生成规则,以避免潜在的原子表溢出风险 [#9387](https://github.com/emqx/emqx/pull/9387)。 ## 修复