From 6edd66c9ad01325fcb327bbcffe15582581a2561 Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 22 Jul 2021 18:08:55 +0800 Subject: [PATCH 1/4] refactor(statsd): optimize default value --- apps/emqx_statsd/include/emqx_statsd.hrl | 4 +- apps/emqx_statsd/src/emqx_statsd.app.src | 2 +- apps/emqx_statsd/src/emqx_statsd.erl | 179 +++++++++++--------- apps/emqx_statsd/src/emqx_statsd_app.erl | 11 +- apps/emqx_statsd/src/emqx_statsd_sup.erl | 56 +----- apps/emqx_statsd/test/emqx_statsd_SUITE.erl | 2 +- 6 files changed, 111 insertions(+), 143 deletions(-) diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index d88dbccbd..9e44c9fa6 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -5,5 +5,5 @@ -define(DEFAULT_PREFIX, undefined). -define(DEFAULT_TAGS, #{}). -define(DEFAULT_BATCH_SIZE, 10). --define(DEFAULT_SAMPLE_TIME_INTERVAL, 10). --define(DEFAULT_FLUSH_TIME_INTERVAL, 10). \ No newline at end of file +-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). +-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 04338fd62..2885f4dc9 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,6 +1,6 @@ {application, emqx_statsd, [{description, "An OTP application"}, - {vsn, "0.1.0"}, + {vsn, "5.0.0"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 3a73de664..6301e4563 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -1,100 +1,115 @@ %%-------------------------------------------------------------------- - %% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. - %% - %% Licensed under the Apache License, Version 2.0 (the "License"); - %% you may not use this file except in compliance with the License. - %% You may obtain a copy of the License at - %% - %% http://www.apache.org/licenses/LICENSE-2.0 - %% - %% Unless required by applicable law or agreed to in writing, software - %% distributed under the License is distributed on an "AS IS" BASIS, - %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - %% See the License for the specific language governing permissions and - %% limitations under the License. - %%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- - -module(emqx_statsd). +-module(emqx_statsd). - -behaviour(gen_server). +-behaviour(gen_server). - -ifdef(TEST). - -compile(export_all). - -compile(nowarn_export_all). - -endif. +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. - -include_lib("emqx/include/logger.hrl"). +-include("emqx_statsd.hrl"). - %% Interface - -export([start_link/1]). +%% Interface +-export([start_link/1]). - %% Internal Exports - -export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , code_change/3 - , terminate/2 - ]). +%% Internal Exports +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , code_change/3 + , terminate/2 + ]). - -record(state, { - timer :: reference(), - sample_time_interval :: pos_integer(), - flush_time_interval :: pos_integer(), - estatsd_pid :: pid() - }). +-record(state, { + timer :: reference() | undefined, + sample_time_interval :: pos_integer(), + flush_time_interval :: pos_integer(), + estatsd_pid :: pid() +}). - start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - init([Opts]) -> - SampleTimeInterval = proplists:get_value(sample_time_interval, Opts), - FlushTimeInterval = proplists:get_value(flush_time_interval, Opts), - Ref = erlang:start_timer(SampleTimeInterval, self(), sample_timeout), - Pid = proplists:get_value(estatsd_pid, Opts), - {ok, #state{timer = Ref, - sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid}}. +init([Opts]) -> + process_flag(trap_exit, true), + Tags = tags(maps:get(tags, Opts, ?DEFAULT_TAGS)), + Opts1 = maps:without([sample_time_interval, + flush_time_interval], Opts#{tags => Tags}), + {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), + SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_SAMPLE_TIME_INTERVAL), + FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), + {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid})}. - handle_call(_Req, _From, State) -> - {noreply, State}. +handle_call(_Req, _From, State) -> + {noreply, State}. - handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(_Msg, State) -> + {noreply, State}. - handle_info({timeout, Ref, sample_timeout}, State = #state{sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid, - timer = Ref}) -> - Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), - SampleRate = SampleTimeInterval / FlushTimeInterval, - StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], - estatsd:submit(Pid, StatsdMetrics), - {noreply, State#state{timer = erlang:start_timer(SampleTimeInterval, self(), sample_timeout)}}; +handle_info({timeout, Ref, sample_timeout}, + State = #state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid, + timer = Ref}) -> + Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), + SampleRate = SampleTimeInterval / FlushTimeInterval, + StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], + estatsd:submit(Pid, StatsdMetrics), + {noreply, ensure_timer(State)}; - handle_info(_Msg, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) -> + {stop, {shutdown, Error}, State}; - code_change(_OldVsn, State, _Extra) -> - {ok, State}. +handle_info(_Msg, State) -> + {noreply, State}. - terminate(_Reason, _State) -> - ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. - %%------------------------------------------------------------------------------ - %% Internale function - %%------------------------------------------------------------------------------ - trans_metrics_name(Name) -> - Name0 = atom_to_binary(Name, utf8), - binary_to_atom(<<"emqx.", Name0/binary>>, utf8). +terminate(_Reason, #state{estatsd_pid = Pid}) -> + estatsd:stop(Pid), + ok. - emqx_vm_data() -> - Idle = case cpu_sup:util([detailed]) of - {_, 0, 0, _} -> 0; %% Not support for Windows - {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) - end, - RunQueue = erlang:statistics(run_queue), - [{run_queue, RunQueue}, - {cpu_idle, Idle}, - {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). +%%------------------------------------------------------------------------------ +%% Internale function +%%------------------------------------------------------------------------------ +trans_metrics_name(Name) -> + Name0 = atom_to_binary(Name, utf8), + binary_to_atom(<<"emqx.", Name0/binary>>, utf8). + +emqx_vm_data() -> + Idle = case cpu_sup:util([detailed]) of + {_, 0, 0, _} -> 0; %% Not support for Windows + {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) + end, + RunQueue = erlang:statistics(run_queue), + [{run_queue, RunQueue}, + {cpu_idle, Idle}, + {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). + +tags(Map) -> + Tags = maps:to_list(Map), + [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. + + +ensure_timer(State =#state{sample_time_interval = SampleTimeInterval}) -> + State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}. diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index cd998b158..34e7ffd08 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -18,21 +18,12 @@ -behaviour(application). --include_lib("emqx/include/logger.hrl"). - - -emqx_plugin(?MODULE). - -export([ start/2 , stop/1 ]). start(_StartType, _StartArgs) -> - {ok, Sup} = emqx_statsd_sup:start_link(), - {ok, _} = emqx_statsd_sup:start_statsd(), - ?LOG(info, "emqx statsd start: successfully"), - {ok, Sup}. + emqx_statsd_sup:start_link(). stop(_) -> - ok = emqx_statsd_sup:stop_statsd(), - ?LOG(info, "emqx statsd stop: successfully"), ok. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index e33ea5493..84b36d323 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -11,59 +11,21 @@ -export([start_link/0]). --export([start_statsd/0, stop_statsd/0]). - -export([init/1]). --export([estatsd_options/0]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). - init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. +init([]) -> + Opts = emqx_config:get([?APP], #{}), + {ok, {{one_for_one, 10, 3600}, + [#{id => emqx_statsd, + start => {emqx_statsd, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_statsd]}]}}. -start_statsd() -> - {ok, Pid} = supervisor:start_child(?MODULE, estatsd_child_spec()), - {ok, _Pid1} = supervisor:start_child(?MODULE, emqx_statsd_child_spec(Pid)). -stop_statsd() -> - ok = supervisor:terminate_child(?MODULE, emqx_statsd), - ok = supervisor:terminate_child(?MODULE, estatsd). -%%============================================================================================== -%% internal -estatsd_child_spec() -> - #{id => estatsd - , start => {estatsd, start_link, [estatsd_options()]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [estatsd]}. -estatsd_options() -> - Host = get_conf(host, ?DEFAULT_HOST), - Port = get_conf(port, ?DEFAULT_PORT), - Prefix = get_conf(prefix, ?DEFAULT_PREFIX), - Tags = tags(get_conf(tags, ?DEFAULT_TAGS)), - BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE), - [{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}]. - -tags(Map) -> - Tags = maps:to_list(Map), - [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. - -emqx_statsd_child_spec(Pid) -> - #{id => emqx_statsd - , start => {emqx_statsd, start_link, [[{estatsd_pid, Pid} | emqx_statsd_options()]]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [emqx_statsd]}. - -emqx_statsd_options() -> - SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL) * 1000, - FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL) * 1000, - [{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}]. - -get_conf(Key, Default) -> - emqx_config:get([?APP, Key], Default). diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 2855e8ee5..9d06ee351 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -13,7 +13,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_statsd]). -all() -> +all() -> emqx_ct:all(?MODULE). t_statsd(_) -> From f42a26d862a8e28ec9fda3dacb57001f1d0394f3 Mon Sep 17 00:00:00 2001 From: Turtle Date: Fri, 23 Jul 2021 17:37:10 +0800 Subject: [PATCH 2/4] feat(statsd): add statsd http API --- apps/emqx/src/emqx.erl | 3 +- apps/emqx_statsd/etc/emqx_statsd.conf | 7 +- apps/emqx_statsd/include/emqx_statsd.hrl | 8 +- apps/emqx_statsd/src/emqx_statsd.erl | 10 +- apps/emqx_statsd/src/emqx_statsd_api.erl | 108 ++++++++++++++++++++ apps/emqx_statsd/src/emqx_statsd_app.erl | 15 ++- apps/emqx_statsd/src/emqx_statsd_schema.erl | 55 +++++----- apps/emqx_statsd/src/emqx_statsd_sup.erl | 51 ++++++--- 8 files changed, 196 insertions(+), 61 deletions(-) create mode 100644 apps/emqx_statsd/src/emqx_statsd_api.erl diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 82688017a..2b34b4845 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -263,5 +263,4 @@ emqx_feature() -> , emqx_bridge_mqtt , emqx_modules , emqx_management - , emqx_retainer - , emqx_statsd]. + , emqx_retainer]. diff --git a/apps/emqx_statsd/etc/emqx_statsd.conf b/apps/emqx_statsd/etc/emqx_statsd.conf index a2daa5521..994986898 100644 --- a/apps/emqx_statsd/etc/emqx_statsd.conf +++ b/apps/emqx_statsd/etc/emqx_statsd.conf @@ -3,11 +3,8 @@ ##-------------------------------------------------------------------- emqx_statsd:{ - host: "127.0.0.1" - port: 8125 - batch_size: 10 - prefix: "emqx" - tags: {"from": "emqx"} + enable: true + server: "127.0.0.1:8125" sample_time_interval: "10s" flush_time_interval: "10s" } diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index 9e44c9fa6..52f8774c0 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -1,9 +1,5 @@ -define(APP, emqx_statsd). - --define(DEFAULT_HOST, {127, 0, 0, 1}). --define(DEFAULT_PORT, 8125). --define(DEFAULT_PREFIX, undefined). --define(DEFAULT_TAGS, #{}). --define(DEFAULT_BATCH_SIZE, 10). -define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). -define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). +-define(DEFAULT_HOST, "127.0.0.1"). +-define(DEFAULT_PORT, 8125). diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 6301e4563..892731a6c 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -49,11 +49,15 @@ start_link(Opts) -> init([Opts]) -> process_flag(trap_exit, true), - Tags = tags(maps:get(tags, Opts, ?DEFAULT_TAGS)), + Tags = tags(maps:get(tags, Opts, #{})), + {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}), Opts1 = maps:without([sample_time_interval, - flush_time_interval], Opts#{tags => Tags}), + flush_time_interval], Opts#{tags => Tags, + host => Host, + port => Port, + prefix => <<"emqx">>}), {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), - SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_SAMPLE_TIME_INTERVAL), + SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, flush_time_interval = FlushTimeInterval, diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl new file mode 100644 index 000000000..13391646a --- /dev/null +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -0,0 +1,108 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_statsd_api). + +-behaviour(minirest_api). + +-include("emqx_statsd.hrl"). + +-import(emqx_mgmt_util, [ response_schema/1 + , response_schema/2 + , request_body_schema/1 + , request_body_schema/2 + ]). + +-export([api_spec/0]). + +-export([ statsd/2 + ]). + +api_spec() -> + {statsd_api(), schemas()}. + +schemas() -> + [#{statsd => #{ + type => object, + properties => #{ + server => #{ + type => string, + description => <<"Statsd Server">>, + example => get_raw(<<"server">>, <<"127.0.0.1:8125">>)}, + enable => #{ + type => boolean, + description => <<"Statsd status">>, + example => get_raw(<<"enable">>, false)}, + sample_time_interval => #{ + type => string, + description => <<"Sample Time Interval">>, + example => get_raw(<<"sample_time_interval">>, <<"10s">>)}, + flush_time_interval => #{ + type => string, + description => <<"Flush Time Interval">>, + example => get_raw(<<"flush_time_interval">>, <<"10s">>)} + } + }}]. + +statsd_api() -> + Metadata = #{ + get => #{ + description => <<"Get statsd info">>, + responses => #{ + <<"200">> => response_schema(<<"statsd">>) + } + }, + put => #{ + description => <<"Update Statsd">>, + 'requestBody' => request_body_schema(<<"statsd">>), + responses => #{ + <<"200">> => + response_schema(<<"Update Statsd successfully">>), + <<"400">> => + response_schema(<<"Bad Request">>, #{ + type => object, + properties => #{ + message => #{type => string}, + code => #{type => string} + } + }) + } + } + }, + [{"/statsd", Metadata, statsd}]. + +statsd(get, _Request) -> + Response = emqx_config:get_raw([<<"emqx_statsd">>], #{}), + {200, Response}; + +statsd(put, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + Enable = maps:get(<<"enable">>, Params), + ok = emqx_config:update_config([emqx_statsd], Params), + enable_statsd(Enable). + +enable_statsd(true) -> + ok = emqx_statsd_sup:stop_child(?APP), + emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})), + {200}; +enable_statsd(false) -> + ok = emqx_statsd_sup:stop_child(?APP), + {200}. + + +get_raw(Key, Def) -> + emqx_config:get_raw([<<"emqx_statsd">>]++ [Key], Def). diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 34e7ffd08..a5054a16c 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -18,12 +18,23 @@ -behaviour(application). +-include("emqx_statsd.hrl"). + -export([ start/2 , stop/1 ]). start(_StartType, _StartArgs) -> - emqx_statsd_sup:start_link(). - + {ok, Sup} = emqx_statsd_sup:start_link(), + maybe_enable_statsd(), + {ok, Sup}. stop(_) -> ok. + +maybe_enable_statsd() -> + case emqx_config:get([?APP, enable], false) of + true -> + emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{})); + false -> + ok + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 5600739e7..bbc7eedc0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -4,41 +4,38 @@ -behaviour(hocon_schema). +-export([to_ip_port/1]). + -export([ structs/0 , fields/1]). +-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}). + structs() -> ["emqx_statsd"]. fields("emqx_statsd") -> - [ {host, fun host/1} - , {port, fun port/1} - , {prefix, fun prefix/1} - , {tags, map()} - , {batch_size, fun batch_size/1} - , {sample_time_interval, fun duration_s/1} - , {flush_time_interval, fun duration_s/1}]. + [ {enable, emqx_schema:t(boolean(), undefined, false)} + , {server, fun server/1} + , {sample_time_interval, fun duration_ms/1} + , {flush_time_interval, fun duration_ms/1} + ]. -host(type) -> string(); -host(default) -> "127.0.0.1"; -host(nullable) -> false; -host(_) -> undefined. +server(type) -> emqx_schema:ip_port(); +server(default) -> "127.0.0.1:8125"; +server(nullable) -> false; +server(_) -> undefined. -port(type) -> integer(); -port(default) -> 8125; -port(nullable) -> true; -port(_) -> undefined. +duration_ms(type) -> emqx_schema:duration_ms(); +duration_ms(nullable) -> false; +duration_ms(default) -> "10s"; +duration_ms(_) -> undefined. -prefix(type) -> string(); -prefix(default) -> "emqx"; -prefix(nullable) -> true; -prefix(_) -> undefined. - -batch_size(type) -> integer(); -batch_size(nullable) -> false; -batch_size(default) -> 10; -batch_size(_) -> undefined. - -duration_s(type) -> emqx_schema:duration_s(); -duration_s(nullable) -> false; -duration_s(default) -> "10s"; -duration_s(_) -> undefined. +to_ip_port(Str) -> + case string:tokens(Str, ":") of + [Ip, Port] -> + case inet:parse_address(Ip) of + {ok, R} -> {ok, {R, list_to_integer(Port)}}; + _ -> {error, Str} + end; + _ -> {error, Str} + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 84b36d323..02b50e3ca 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -7,25 +7,48 @@ -behaviour(supervisor). --include("emqx_statsd.hrl"). - --export([start_link/0]). +-export([ start_link/0 + , start_child/1 + , start_child/2 + , stop_child/1 + ]). -export([init/1]). +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Opts), #{id => Mod, + start => {Mod, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod]}). - start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec start_child(supervisor:child_spec()) -> ok. +start_child(ChildSpec) when is_map(ChildSpec) -> + assert_started(supervisor:start_child(?MODULE, ChildSpec)). + +-spec start_child(atom(), map()) -> ok. +start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). + +-spec(stop_child(any()) -> ok | {error, term()}). +stop_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error + end. init([]) -> - Opts = emqx_config:get([?APP], #{}), - {ok, {{one_for_one, 10, 3600}, - [#{id => emqx_statsd, - start => {emqx_statsd, start_link, [Opts]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_statsd]}]}}. - + {ok, {{one_for_one, 10, 3600}, []}}. +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +assert_started({ok, _Pid}) -> ok; +assert_started({ok, _Pid, _Info}) -> ok; +assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, Reason}) -> erlang:error(Reason). From 1776acee8a6d7be44d03d849f61b916756adb2cc Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 22 Jul 2021 18:08:55 +0800 Subject: [PATCH 3/4] refactor(statsd): optimize default value --- apps/emqx_statsd/include/emqx_statsd.hrl | 4 +- apps/emqx_statsd/src/emqx_statsd.app.src | 2 +- apps/emqx_statsd/src/emqx_statsd.erl | 179 +++++++++++--------- apps/emqx_statsd/src/emqx_statsd_app.erl | 11 +- apps/emqx_statsd/src/emqx_statsd_sup.erl | 56 +----- apps/emqx_statsd/test/emqx_statsd_SUITE.erl | 2 +- 6 files changed, 111 insertions(+), 143 deletions(-) diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index d88dbccbd..9e44c9fa6 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -5,5 +5,5 @@ -define(DEFAULT_PREFIX, undefined). -define(DEFAULT_TAGS, #{}). -define(DEFAULT_BATCH_SIZE, 10). --define(DEFAULT_SAMPLE_TIME_INTERVAL, 10). --define(DEFAULT_FLUSH_TIME_INTERVAL, 10). \ No newline at end of file +-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). +-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 04338fd62..2885f4dc9 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,6 +1,6 @@ {application, emqx_statsd, [{description, "An OTP application"}, - {vsn, "0.1.0"}, + {vsn, "5.0.0"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 3a73de664..6301e4563 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -1,100 +1,115 @@ %%-------------------------------------------------------------------- - %% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. - %% - %% Licensed under the Apache License, Version 2.0 (the "License"); - %% you may not use this file except in compliance with the License. - %% You may obtain a copy of the License at - %% - %% http://www.apache.org/licenses/LICENSE-2.0 - %% - %% Unless required by applicable law or agreed to in writing, software - %% distributed under the License is distributed on an "AS IS" BASIS, - %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - %% See the License for the specific language governing permissions and - %% limitations under the License. - %%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- - -module(emqx_statsd). +-module(emqx_statsd). - -behaviour(gen_server). +-behaviour(gen_server). - -ifdef(TEST). - -compile(export_all). - -compile(nowarn_export_all). - -endif. +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. - -include_lib("emqx/include/logger.hrl"). +-include("emqx_statsd.hrl"). - %% Interface - -export([start_link/1]). +%% Interface +-export([start_link/1]). - %% Internal Exports - -export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , code_change/3 - , terminate/2 - ]). +%% Internal Exports +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , code_change/3 + , terminate/2 + ]). - -record(state, { - timer :: reference(), - sample_time_interval :: pos_integer(), - flush_time_interval :: pos_integer(), - estatsd_pid :: pid() - }). +-record(state, { + timer :: reference() | undefined, + sample_time_interval :: pos_integer(), + flush_time_interval :: pos_integer(), + estatsd_pid :: pid() +}). - start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - init([Opts]) -> - SampleTimeInterval = proplists:get_value(sample_time_interval, Opts), - FlushTimeInterval = proplists:get_value(flush_time_interval, Opts), - Ref = erlang:start_timer(SampleTimeInterval, self(), sample_timeout), - Pid = proplists:get_value(estatsd_pid, Opts), - {ok, #state{timer = Ref, - sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid}}. +init([Opts]) -> + process_flag(trap_exit, true), + Tags = tags(maps:get(tags, Opts, ?DEFAULT_TAGS)), + Opts1 = maps:without([sample_time_interval, + flush_time_interval], Opts#{tags => Tags}), + {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), + SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_SAMPLE_TIME_INTERVAL), + FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), + {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid})}. - handle_call(_Req, _From, State) -> - {noreply, State}. +handle_call(_Req, _From, State) -> + {noreply, State}. - handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(_Msg, State) -> + {noreply, State}. - handle_info({timeout, Ref, sample_timeout}, State = #state{sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid, - timer = Ref}) -> - Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), - SampleRate = SampleTimeInterval / FlushTimeInterval, - StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], - estatsd:submit(Pid, StatsdMetrics), - {noreply, State#state{timer = erlang:start_timer(SampleTimeInterval, self(), sample_timeout)}}; +handle_info({timeout, Ref, sample_timeout}, + State = #state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid, + timer = Ref}) -> + Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), + SampleRate = SampleTimeInterval / FlushTimeInterval, + StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], + estatsd:submit(Pid, StatsdMetrics), + {noreply, ensure_timer(State)}; - handle_info(_Msg, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) -> + {stop, {shutdown, Error}, State}; - code_change(_OldVsn, State, _Extra) -> - {ok, State}. +handle_info(_Msg, State) -> + {noreply, State}. - terminate(_Reason, _State) -> - ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. - %%------------------------------------------------------------------------------ - %% Internale function - %%------------------------------------------------------------------------------ - trans_metrics_name(Name) -> - Name0 = atom_to_binary(Name, utf8), - binary_to_atom(<<"emqx.", Name0/binary>>, utf8). +terminate(_Reason, #state{estatsd_pid = Pid}) -> + estatsd:stop(Pid), + ok. - emqx_vm_data() -> - Idle = case cpu_sup:util([detailed]) of - {_, 0, 0, _} -> 0; %% Not support for Windows - {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) - end, - RunQueue = erlang:statistics(run_queue), - [{run_queue, RunQueue}, - {cpu_idle, Idle}, - {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). +%%------------------------------------------------------------------------------ +%% Internale function +%%------------------------------------------------------------------------------ +trans_metrics_name(Name) -> + Name0 = atom_to_binary(Name, utf8), + binary_to_atom(<<"emqx.", Name0/binary>>, utf8). + +emqx_vm_data() -> + Idle = case cpu_sup:util([detailed]) of + {_, 0, 0, _} -> 0; %% Not support for Windows + {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) + end, + RunQueue = erlang:statistics(run_queue), + [{run_queue, RunQueue}, + {cpu_idle, Idle}, + {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). + +tags(Map) -> + Tags = maps:to_list(Map), + [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. + + +ensure_timer(State =#state{sample_time_interval = SampleTimeInterval}) -> + State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}. diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index cd998b158..34e7ffd08 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -18,21 +18,12 @@ -behaviour(application). --include_lib("emqx/include/logger.hrl"). - - -emqx_plugin(?MODULE). - -export([ start/2 , stop/1 ]). start(_StartType, _StartArgs) -> - {ok, Sup} = emqx_statsd_sup:start_link(), - {ok, _} = emqx_statsd_sup:start_statsd(), - ?LOG(info, "emqx statsd start: successfully"), - {ok, Sup}. + emqx_statsd_sup:start_link(). stop(_) -> - ok = emqx_statsd_sup:stop_statsd(), - ?LOG(info, "emqx statsd stop: successfully"), ok. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index e33ea5493..84b36d323 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -11,59 +11,21 @@ -export([start_link/0]). --export([start_statsd/0, stop_statsd/0]). - -export([init/1]). --export([estatsd_options/0]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). - init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. +init([]) -> + Opts = emqx_config:get([?APP], #{}), + {ok, {{one_for_one, 10, 3600}, + [#{id => emqx_statsd, + start => {emqx_statsd, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_statsd]}]}}. -start_statsd() -> - {ok, Pid} = supervisor:start_child(?MODULE, estatsd_child_spec()), - {ok, _Pid1} = supervisor:start_child(?MODULE, emqx_statsd_child_spec(Pid)). -stop_statsd() -> - ok = supervisor:terminate_child(?MODULE, emqx_statsd), - ok = supervisor:terminate_child(?MODULE, estatsd). -%%============================================================================================== -%% internal -estatsd_child_spec() -> - #{id => estatsd - , start => {estatsd, start_link, [estatsd_options()]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [estatsd]}. -estatsd_options() -> - Host = get_conf(host, ?DEFAULT_HOST), - Port = get_conf(port, ?DEFAULT_PORT), - Prefix = get_conf(prefix, ?DEFAULT_PREFIX), - Tags = tags(get_conf(tags, ?DEFAULT_TAGS)), - BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE), - [{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}]. - -tags(Map) -> - Tags = maps:to_list(Map), - [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. - -emqx_statsd_child_spec(Pid) -> - #{id => emqx_statsd - , start => {emqx_statsd, start_link, [[{estatsd_pid, Pid} | emqx_statsd_options()]]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [emqx_statsd]}. - -emqx_statsd_options() -> - SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL) * 1000, - FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL) * 1000, - [{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}]. - -get_conf(Key, Default) -> - emqx_config:get([?APP, Key], Default). diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 2855e8ee5..9d06ee351 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -13,7 +13,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_statsd]). -all() -> +all() -> emqx_ct:all(?MODULE). t_statsd(_) -> From 6617ba2772de4f00108f6f87eb5a9ef57d2f5874 Mon Sep 17 00:00:00 2001 From: Turtle Date: Fri, 23 Jul 2021 17:37:10 +0800 Subject: [PATCH 4/4] feat(statsd): add statsd http API --- apps/emqx/src/emqx.erl | 3 +- apps/emqx_statsd/etc/emqx_statsd.conf | 7 +- apps/emqx_statsd/include/emqx_statsd.hrl | 8 +- apps/emqx_statsd/src/emqx_statsd.erl | 10 +- apps/emqx_statsd/src/emqx_statsd_api.erl | 108 ++++++++++++++++++++ apps/emqx_statsd/src/emqx_statsd_app.erl | 15 ++- apps/emqx_statsd/src/emqx_statsd_schema.erl | 55 +++++----- apps/emqx_statsd/src/emqx_statsd_sup.erl | 51 ++++++--- 8 files changed, 196 insertions(+), 61 deletions(-) create mode 100644 apps/emqx_statsd/src/emqx_statsd_api.erl diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 2fa0e630c..d969012da 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -250,5 +250,4 @@ emqx_feature() -> , emqx_bridge_mqtt , emqx_modules , emqx_management - , emqx_retainer - , emqx_statsd]. + , emqx_retainer]. diff --git a/apps/emqx_statsd/etc/emqx_statsd.conf b/apps/emqx_statsd/etc/emqx_statsd.conf index a2daa5521..994986898 100644 --- a/apps/emqx_statsd/etc/emqx_statsd.conf +++ b/apps/emqx_statsd/etc/emqx_statsd.conf @@ -3,11 +3,8 @@ ##-------------------------------------------------------------------- emqx_statsd:{ - host: "127.0.0.1" - port: 8125 - batch_size: 10 - prefix: "emqx" - tags: {"from": "emqx"} + enable: true + server: "127.0.0.1:8125" sample_time_interval: "10s" flush_time_interval: "10s" } diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index 9e44c9fa6..52f8774c0 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -1,9 +1,5 @@ -define(APP, emqx_statsd). - --define(DEFAULT_HOST, {127, 0, 0, 1}). --define(DEFAULT_PORT, 8125). --define(DEFAULT_PREFIX, undefined). --define(DEFAULT_TAGS, #{}). --define(DEFAULT_BATCH_SIZE, 10). -define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). -define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). +-define(DEFAULT_HOST, "127.0.0.1"). +-define(DEFAULT_PORT, 8125). diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 6301e4563..892731a6c 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -49,11 +49,15 @@ start_link(Opts) -> init([Opts]) -> process_flag(trap_exit, true), - Tags = tags(maps:get(tags, Opts, ?DEFAULT_TAGS)), + Tags = tags(maps:get(tags, Opts, #{})), + {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}), Opts1 = maps:without([sample_time_interval, - flush_time_interval], Opts#{tags => Tags}), + flush_time_interval], Opts#{tags => Tags, + host => Host, + port => Port, + prefix => <<"emqx">>}), {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), - SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_SAMPLE_TIME_INTERVAL), + SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, flush_time_interval = FlushTimeInterval, diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl new file mode 100644 index 000000000..13391646a --- /dev/null +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -0,0 +1,108 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_statsd_api). + +-behaviour(minirest_api). + +-include("emqx_statsd.hrl"). + +-import(emqx_mgmt_util, [ response_schema/1 + , response_schema/2 + , request_body_schema/1 + , request_body_schema/2 + ]). + +-export([api_spec/0]). + +-export([ statsd/2 + ]). + +api_spec() -> + {statsd_api(), schemas()}. + +schemas() -> + [#{statsd => #{ + type => object, + properties => #{ + server => #{ + type => string, + description => <<"Statsd Server">>, + example => get_raw(<<"server">>, <<"127.0.0.1:8125">>)}, + enable => #{ + type => boolean, + description => <<"Statsd status">>, + example => get_raw(<<"enable">>, false)}, + sample_time_interval => #{ + type => string, + description => <<"Sample Time Interval">>, + example => get_raw(<<"sample_time_interval">>, <<"10s">>)}, + flush_time_interval => #{ + type => string, + description => <<"Flush Time Interval">>, + example => get_raw(<<"flush_time_interval">>, <<"10s">>)} + } + }}]. + +statsd_api() -> + Metadata = #{ + get => #{ + description => <<"Get statsd info">>, + responses => #{ + <<"200">> => response_schema(<<"statsd">>) + } + }, + put => #{ + description => <<"Update Statsd">>, + 'requestBody' => request_body_schema(<<"statsd">>), + responses => #{ + <<"200">> => + response_schema(<<"Update Statsd successfully">>), + <<"400">> => + response_schema(<<"Bad Request">>, #{ + type => object, + properties => #{ + message => #{type => string}, + code => #{type => string} + } + }) + } + } + }, + [{"/statsd", Metadata, statsd}]. + +statsd(get, _Request) -> + Response = emqx_config:get_raw([<<"emqx_statsd">>], #{}), + {200, Response}; + +statsd(put, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + Enable = maps:get(<<"enable">>, Params), + ok = emqx_config:update_config([emqx_statsd], Params), + enable_statsd(Enable). + +enable_statsd(true) -> + ok = emqx_statsd_sup:stop_child(?APP), + emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})), + {200}; +enable_statsd(false) -> + ok = emqx_statsd_sup:stop_child(?APP), + {200}. + + +get_raw(Key, Def) -> + emqx_config:get_raw([<<"emqx_statsd">>]++ [Key], Def). diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 34e7ffd08..a5054a16c 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -18,12 +18,23 @@ -behaviour(application). +-include("emqx_statsd.hrl"). + -export([ start/2 , stop/1 ]). start(_StartType, _StartArgs) -> - emqx_statsd_sup:start_link(). - + {ok, Sup} = emqx_statsd_sup:start_link(), + maybe_enable_statsd(), + {ok, Sup}. stop(_) -> ok. + +maybe_enable_statsd() -> + case emqx_config:get([?APP, enable], false) of + true -> + emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{})); + false -> + ok + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 5600739e7..bbc7eedc0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -4,41 +4,38 @@ -behaviour(hocon_schema). +-export([to_ip_port/1]). + -export([ structs/0 , fields/1]). +-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}). + structs() -> ["emqx_statsd"]. fields("emqx_statsd") -> - [ {host, fun host/1} - , {port, fun port/1} - , {prefix, fun prefix/1} - , {tags, map()} - , {batch_size, fun batch_size/1} - , {sample_time_interval, fun duration_s/1} - , {flush_time_interval, fun duration_s/1}]. + [ {enable, emqx_schema:t(boolean(), undefined, false)} + , {server, fun server/1} + , {sample_time_interval, fun duration_ms/1} + , {flush_time_interval, fun duration_ms/1} + ]. -host(type) -> string(); -host(default) -> "127.0.0.1"; -host(nullable) -> false; -host(_) -> undefined. +server(type) -> emqx_schema:ip_port(); +server(default) -> "127.0.0.1:8125"; +server(nullable) -> false; +server(_) -> undefined. -port(type) -> integer(); -port(default) -> 8125; -port(nullable) -> true; -port(_) -> undefined. +duration_ms(type) -> emqx_schema:duration_ms(); +duration_ms(nullable) -> false; +duration_ms(default) -> "10s"; +duration_ms(_) -> undefined. -prefix(type) -> string(); -prefix(default) -> "emqx"; -prefix(nullable) -> true; -prefix(_) -> undefined. - -batch_size(type) -> integer(); -batch_size(nullable) -> false; -batch_size(default) -> 10; -batch_size(_) -> undefined. - -duration_s(type) -> emqx_schema:duration_s(); -duration_s(nullable) -> false; -duration_s(default) -> "10s"; -duration_s(_) -> undefined. +to_ip_port(Str) -> + case string:tokens(Str, ":") of + [Ip, Port] -> + case inet:parse_address(Ip) of + {ok, R} -> {ok, {R, list_to_integer(Port)}}; + _ -> {error, Str} + end; + _ -> {error, Str} + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 84b36d323..02b50e3ca 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -7,25 +7,48 @@ -behaviour(supervisor). --include("emqx_statsd.hrl"). - --export([start_link/0]). +-export([ start_link/0 + , start_child/1 + , start_child/2 + , stop_child/1 + ]). -export([init/1]). +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Opts), #{id => Mod, + start => {Mod, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod]}). - start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec start_child(supervisor:child_spec()) -> ok. +start_child(ChildSpec) when is_map(ChildSpec) -> + assert_started(supervisor:start_child(?MODULE, ChildSpec)). + +-spec start_child(atom(), map()) -> ok. +start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). + +-spec(stop_child(any()) -> ok | {error, term()}). +stop_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error + end. init([]) -> - Opts = emqx_config:get([?APP], #{}), - {ok, {{one_for_one, 10, 3600}, - [#{id => emqx_statsd, - start => {emqx_statsd, start_link, [Opts]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_statsd]}]}}. - + {ok, {{one_for_one, 10, 3600}, []}}. +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +assert_started({ok, _Pid}) -> ok; +assert_started({ok, _Pid, _Info}) -> ok; +assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, Reason}) -> erlang:error(Reason).