From 1776acee8a6d7be44d03d849f61b916756adb2cc Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 22 Jul 2021 18:08:55 +0800 Subject: [PATCH] refactor(statsd): optimize default value --- apps/emqx_statsd/include/emqx_statsd.hrl | 4 +- apps/emqx_statsd/src/emqx_statsd.app.src | 2 +- apps/emqx_statsd/src/emqx_statsd.erl | 179 +++++++++++--------- apps/emqx_statsd/src/emqx_statsd_app.erl | 11 +- apps/emqx_statsd/src/emqx_statsd_sup.erl | 56 +----- apps/emqx_statsd/test/emqx_statsd_SUITE.erl | 2 +- 6 files changed, 111 insertions(+), 143 deletions(-) diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index d88dbccbd..9e44c9fa6 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -5,5 +5,5 @@ -define(DEFAULT_PREFIX, undefined). -define(DEFAULT_TAGS, #{}). -define(DEFAULT_BATCH_SIZE, 10). --define(DEFAULT_SAMPLE_TIME_INTERVAL, 10). --define(DEFAULT_FLUSH_TIME_INTERVAL, 10). \ No newline at end of file +-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). +-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 04338fd62..2885f4dc9 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,6 +1,6 @@ {application, emqx_statsd, [{description, "An OTP application"}, - {vsn, "0.1.0"}, + {vsn, "5.0.0"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 3a73de664..6301e4563 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -1,100 +1,115 @@ %%-------------------------------------------------------------------- - %% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. - %% - %% Licensed under the Apache License, Version 2.0 (the "License"); - %% you may not use this file except in compliance with the License. - %% You may obtain a copy of the License at - %% - %% http://www.apache.org/licenses/LICENSE-2.0 - %% - %% Unless required by applicable law or agreed to in writing, software - %% distributed under the License is distributed on an "AS IS" BASIS, - %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - %% See the License for the specific language governing permissions and - %% limitations under the License. - %%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- - -module(emqx_statsd). +-module(emqx_statsd). - -behaviour(gen_server). +-behaviour(gen_server). - -ifdef(TEST). - -compile(export_all). - -compile(nowarn_export_all). - -endif. +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. - -include_lib("emqx/include/logger.hrl"). +-include("emqx_statsd.hrl"). - %% Interface - -export([start_link/1]). +%% Interface +-export([start_link/1]). - %% Internal Exports - -export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , code_change/3 - , terminate/2 - ]). +%% Internal Exports +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , code_change/3 + , terminate/2 + ]). - -record(state, { - timer :: reference(), - sample_time_interval :: pos_integer(), - flush_time_interval :: pos_integer(), - estatsd_pid :: pid() - }). +-record(state, { + timer :: reference() | undefined, + sample_time_interval :: pos_integer(), + flush_time_interval :: pos_integer(), + estatsd_pid :: pid() +}). - start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - init([Opts]) -> - SampleTimeInterval = proplists:get_value(sample_time_interval, Opts), - FlushTimeInterval = proplists:get_value(flush_time_interval, Opts), - Ref = erlang:start_timer(SampleTimeInterval, self(), sample_timeout), - Pid = proplists:get_value(estatsd_pid, Opts), - {ok, #state{timer = Ref, - sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid}}. +init([Opts]) -> + process_flag(trap_exit, true), + Tags = tags(maps:get(tags, Opts, ?DEFAULT_TAGS)), + Opts1 = maps:without([sample_time_interval, + flush_time_interval], Opts#{tags => Tags}), + {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), + SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_SAMPLE_TIME_INTERVAL), + FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), + {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid})}. - handle_call(_Req, _From, State) -> - {noreply, State}. +handle_call(_Req, _From, State) -> + {noreply, State}. - handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(_Msg, State) -> + {noreply, State}. - handle_info({timeout, Ref, sample_timeout}, State = #state{sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid, - timer = Ref}) -> - Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), - SampleRate = SampleTimeInterval / FlushTimeInterval, - StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], - estatsd:submit(Pid, StatsdMetrics), - {noreply, State#state{timer = erlang:start_timer(SampleTimeInterval, self(), sample_timeout)}}; +handle_info({timeout, Ref, sample_timeout}, + State = #state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid, + timer = Ref}) -> + Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), + SampleRate = SampleTimeInterval / FlushTimeInterval, + StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], + estatsd:submit(Pid, StatsdMetrics), + {noreply, ensure_timer(State)}; - handle_info(_Msg, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) -> + {stop, {shutdown, Error}, State}; - code_change(_OldVsn, State, _Extra) -> - {ok, State}. +handle_info(_Msg, State) -> + {noreply, State}. - terminate(_Reason, _State) -> - ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. - %%------------------------------------------------------------------------------ - %% Internale function - %%------------------------------------------------------------------------------ - trans_metrics_name(Name) -> - Name0 = atom_to_binary(Name, utf8), - binary_to_atom(<<"emqx.", Name0/binary>>, utf8). +terminate(_Reason, #state{estatsd_pid = Pid}) -> + estatsd:stop(Pid), + ok. - emqx_vm_data() -> - Idle = case cpu_sup:util([detailed]) of - {_, 0, 0, _} -> 0; %% Not support for Windows - {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) - end, - RunQueue = erlang:statistics(run_queue), - [{run_queue, RunQueue}, - {cpu_idle, Idle}, - {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). +%%------------------------------------------------------------------------------ +%% Internale function +%%------------------------------------------------------------------------------ +trans_metrics_name(Name) -> + Name0 = atom_to_binary(Name, utf8), + binary_to_atom(<<"emqx.", Name0/binary>>, utf8). + +emqx_vm_data() -> + Idle = case cpu_sup:util([detailed]) of + {_, 0, 0, _} -> 0; %% Not support for Windows + {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) + end, + RunQueue = erlang:statistics(run_queue), + [{run_queue, RunQueue}, + {cpu_idle, Idle}, + {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). + +tags(Map) -> + Tags = maps:to_list(Map), + [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. + + +ensure_timer(State =#state{sample_time_interval = SampleTimeInterval}) -> + State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}. diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index cd998b158..34e7ffd08 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -18,21 +18,12 @@ -behaviour(application). --include_lib("emqx/include/logger.hrl"). - - -emqx_plugin(?MODULE). - -export([ start/2 , stop/1 ]). start(_StartType, _StartArgs) -> - {ok, Sup} = emqx_statsd_sup:start_link(), - {ok, _} = emqx_statsd_sup:start_statsd(), - ?LOG(info, "emqx statsd start: successfully"), - {ok, Sup}. + emqx_statsd_sup:start_link(). stop(_) -> - ok = emqx_statsd_sup:stop_statsd(), - ?LOG(info, "emqx statsd stop: successfully"), ok. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index e33ea5493..84b36d323 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -11,59 +11,21 @@ -export([start_link/0]). --export([start_statsd/0, stop_statsd/0]). - -export([init/1]). --export([estatsd_options/0]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). - init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. +init([]) -> + Opts = emqx_config:get([?APP], #{}), + {ok, {{one_for_one, 10, 3600}, + [#{id => emqx_statsd, + start => {emqx_statsd, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_statsd]}]}}. -start_statsd() -> - {ok, Pid} = supervisor:start_child(?MODULE, estatsd_child_spec()), - {ok, _Pid1} = supervisor:start_child(?MODULE, emqx_statsd_child_spec(Pid)). -stop_statsd() -> - ok = supervisor:terminate_child(?MODULE, emqx_statsd), - ok = supervisor:terminate_child(?MODULE, estatsd). -%%============================================================================================== -%% internal -estatsd_child_spec() -> - #{id => estatsd - , start => {estatsd, start_link, [estatsd_options()]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [estatsd]}. -estatsd_options() -> - Host = get_conf(host, ?DEFAULT_HOST), - Port = get_conf(port, ?DEFAULT_PORT), - Prefix = get_conf(prefix, ?DEFAULT_PREFIX), - Tags = tags(get_conf(tags, ?DEFAULT_TAGS)), - BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE), - [{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}]. - -tags(Map) -> - Tags = maps:to_list(Map), - [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. - -emqx_statsd_child_spec(Pid) -> - #{id => emqx_statsd - , start => {emqx_statsd, start_link, [[{estatsd_pid, Pid} | emqx_statsd_options()]]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [emqx_statsd]}. - -emqx_statsd_options() -> - SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL) * 1000, - FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL) * 1000, - [{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}]. - -get_conf(Key, Default) -> - emqx_config:get([?APP, Key], Default). diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 2855e8ee5..9d06ee351 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -13,7 +13,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_statsd]). -all() -> +all() -> emqx_ct:all(?MODULE). t_statsd(_) ->