diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 2fa0e630c..d969012da 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -250,5 +250,4 @@ emqx_feature() -> , emqx_bridge_mqtt , emqx_modules , emqx_management - , emqx_retainer - , emqx_statsd]. + , emqx_retainer]. diff --git a/apps/emqx_statsd/etc/emqx_statsd.conf b/apps/emqx_statsd/etc/emqx_statsd.conf index a2daa5521..994986898 100644 --- a/apps/emqx_statsd/etc/emqx_statsd.conf +++ b/apps/emqx_statsd/etc/emqx_statsd.conf @@ -3,11 +3,8 @@ ##-------------------------------------------------------------------- emqx_statsd:{ - host: "127.0.0.1" - port: 8125 - batch_size: 10 - prefix: "emqx" - tags: {"from": "emqx"} + enable: true + server: "127.0.0.1:8125" sample_time_interval: "10s" flush_time_interval: "10s" } diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index 9e44c9fa6..52f8774c0 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -1,9 +1,5 @@ -define(APP, emqx_statsd). - --define(DEFAULT_HOST, {127, 0, 0, 1}). --define(DEFAULT_PORT, 8125). --define(DEFAULT_PREFIX, undefined). --define(DEFAULT_TAGS, #{}). --define(DEFAULT_BATCH_SIZE, 10). -define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). -define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). +-define(DEFAULT_HOST, "127.0.0.1"). +-define(DEFAULT_PORT, 8125). diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 6301e4563..892731a6c 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -49,11 +49,15 @@ start_link(Opts) -> init([Opts]) -> process_flag(trap_exit, true), - Tags = tags(maps:get(tags, Opts, ?DEFAULT_TAGS)), + Tags = tags(maps:get(tags, Opts, #{})), + {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}), Opts1 = maps:without([sample_time_interval, - flush_time_interval], Opts#{tags => Tags}), + flush_time_interval], Opts#{tags => Tags, + host => Host, + port => Port, + prefix => <<"emqx">>}), {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), - SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_SAMPLE_TIME_INTERVAL), + SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, flush_time_interval = FlushTimeInterval, diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl new file mode 100644 index 000000000..13391646a --- /dev/null +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -0,0 +1,108 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_statsd_api). + +-behaviour(minirest_api). + +-include("emqx_statsd.hrl"). + +-import(emqx_mgmt_util, [ response_schema/1 + , response_schema/2 + , request_body_schema/1 + , request_body_schema/2 + ]). + +-export([api_spec/0]). + +-export([ statsd/2 + ]). + +api_spec() -> + {statsd_api(), schemas()}. + +schemas() -> + [#{statsd => #{ + type => object, + properties => #{ + server => #{ + type => string, + description => <<"Statsd Server">>, + example => get_raw(<<"server">>, <<"127.0.0.1:8125">>)}, + enable => #{ + type => boolean, + description => <<"Statsd status">>, + example => get_raw(<<"enable">>, false)}, + sample_time_interval => #{ + type => string, + description => <<"Sample Time Interval">>, + example => get_raw(<<"sample_time_interval">>, <<"10s">>)}, + flush_time_interval => #{ + type => string, + description => <<"Flush Time Interval">>, + example => get_raw(<<"flush_time_interval">>, <<"10s">>)} + } + }}]. + +statsd_api() -> + Metadata = #{ + get => #{ + description => <<"Get statsd info">>, + responses => #{ + <<"200">> => response_schema(<<"statsd">>) + } + }, + put => #{ + description => <<"Update Statsd">>, + 'requestBody' => request_body_schema(<<"statsd">>), + responses => #{ + <<"200">> => + response_schema(<<"Update Statsd successfully">>), + <<"400">> => + response_schema(<<"Bad Request">>, #{ + type => object, + properties => #{ + message => #{type => string}, + code => #{type => string} + } + }) + } + } + }, + [{"/statsd", Metadata, statsd}]. + +statsd(get, _Request) -> + Response = emqx_config:get_raw([<<"emqx_statsd">>], #{}), + {200, Response}; + +statsd(put, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + Enable = maps:get(<<"enable">>, Params), + ok = emqx_config:update_config([emqx_statsd], Params), + enable_statsd(Enable). + +enable_statsd(true) -> + ok = emqx_statsd_sup:stop_child(?APP), + emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})), + {200}; +enable_statsd(false) -> + ok = emqx_statsd_sup:stop_child(?APP), + {200}. + + +get_raw(Key, Def) -> + emqx_config:get_raw([<<"emqx_statsd">>]++ [Key], Def). diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 34e7ffd08..a5054a16c 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -18,12 +18,23 @@ -behaviour(application). +-include("emqx_statsd.hrl"). + -export([ start/2 , stop/1 ]). start(_StartType, _StartArgs) -> - emqx_statsd_sup:start_link(). - + {ok, Sup} = emqx_statsd_sup:start_link(), + maybe_enable_statsd(), + {ok, Sup}. stop(_) -> ok. + +maybe_enable_statsd() -> + case emqx_config:get([?APP, enable], false) of + true -> + emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{})); + false -> + ok + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 5600739e7..bbc7eedc0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -4,41 +4,38 @@ -behaviour(hocon_schema). +-export([to_ip_port/1]). + -export([ structs/0 , fields/1]). +-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}). + structs() -> ["emqx_statsd"]. fields("emqx_statsd") -> - [ {host, fun host/1} - , {port, fun port/1} - , {prefix, fun prefix/1} - , {tags, map()} - , {batch_size, fun batch_size/1} - , {sample_time_interval, fun duration_s/1} - , {flush_time_interval, fun duration_s/1}]. + [ {enable, emqx_schema:t(boolean(), undefined, false)} + , {server, fun server/1} + , {sample_time_interval, fun duration_ms/1} + , {flush_time_interval, fun duration_ms/1} + ]. -host(type) -> string(); -host(default) -> "127.0.0.1"; -host(nullable) -> false; -host(_) -> undefined. +server(type) -> emqx_schema:ip_port(); +server(default) -> "127.0.0.1:8125"; +server(nullable) -> false; +server(_) -> undefined. -port(type) -> integer(); -port(default) -> 8125; -port(nullable) -> true; -port(_) -> undefined. +duration_ms(type) -> emqx_schema:duration_ms(); +duration_ms(nullable) -> false; +duration_ms(default) -> "10s"; +duration_ms(_) -> undefined. -prefix(type) -> string(); -prefix(default) -> "emqx"; -prefix(nullable) -> true; -prefix(_) -> undefined. - -batch_size(type) -> integer(); -batch_size(nullable) -> false; -batch_size(default) -> 10; -batch_size(_) -> undefined. - -duration_s(type) -> emqx_schema:duration_s(); -duration_s(nullable) -> false; -duration_s(default) -> "10s"; -duration_s(_) -> undefined. +to_ip_port(Str) -> + case string:tokens(Str, ":") of + [Ip, Port] -> + case inet:parse_address(Ip) of + {ok, R} -> {ok, {R, list_to_integer(Port)}}; + _ -> {error, Str} + end; + _ -> {error, Str} + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index 84b36d323..02b50e3ca 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -7,25 +7,48 @@ -behaviour(supervisor). --include("emqx_statsd.hrl"). - --export([start_link/0]). +-export([ start_link/0 + , start_child/1 + , start_child/2 + , stop_child/1 + ]). -export([init/1]). +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Opts), #{id => Mod, + start => {Mod, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod]}). - start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec start_child(supervisor:child_spec()) -> ok. +start_child(ChildSpec) when is_map(ChildSpec) -> + assert_started(supervisor:start_child(?MODULE, ChildSpec)). + +-spec start_child(atom(), map()) -> ok. +start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). + +-spec(stop_child(any()) -> ok | {error, term()}). +stop_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error + end. init([]) -> - Opts = emqx_config:get([?APP], #{}), - {ok, {{one_for_one, 10, 3600}, - [#{id => emqx_statsd, - start => {emqx_statsd, start_link, [Opts]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_statsd]}]}}. - + {ok, {{one_for_one, 10, 3600}, []}}. +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +assert_started({ok, _Pid}) -> ok; +assert_started({ok, _Pid, _Info}) -> ok; +assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, Reason}) -> erlang:error(Reason).