diff --git a/apps/emqx_statsd/etc/emqx_statsd.conf b/apps/emqx_statsd/etc/emqx_statsd.conf index 27c08d343..c714bbb2e 100644 --- a/apps/emqx_statsd/etc/emqx_statsd.conf +++ b/apps/emqx_statsd/etc/emqx_statsd.conf @@ -2,41 +2,12 @@ ## Statsd for EMQ X ##-------------------------------------------------------------------- - ## The statsd server host - ## - ## Default: "127.0.0.1" - statsd.host = "127.0.0.1" - - ## The statsd server port - ## - ## Default: "127.0.0.1" - statsd.port = 8125 - - ## statsd prefix - ## - ## Default: emqx - # statsd.prefix = emqx - - ## statsd tag key - ## - # statsd.tag.1.key = from - - ## statsd tag value - ## - # statsd.tag.1.value = emqx - - ## statsd batch_size - ## - ## Default: 10 - statsd.batch_size = 10 - - ## statsd sample time interval - ## - ## Default: 10s - statsd.sample_time_interval = 10s - - ## statsd flush time interval - ## - ## Default: 10s - statsd.flush_time_interval = 10s - \ No newline at end of file +emqx_statsd:{ + # statsd server + server: "127.0.0.1:8125" + batch_size: 10 + prefix: "emqx" + tags: {"from": "emqx"} + sample_time_interval: "10s" + flush_time_interval: "10s" +} diff --git a/apps/emqx_statsd/priv/emqx_statsd.schema b/apps/emqx_statsd/priv/emqx_statsd.schema deleted file mode 100644 index b509b4065..000000000 --- a/apps/emqx_statsd/priv/emqx_statsd.schema +++ /dev/null @@ -1,53 +0,0 @@ -%% emqx_statsd config - - {mapping, "statsd.host", "emqx_statsd.host", [ - {default, "127.0.0.1"}, - {datatype, string} - ]}. - - {mapping, "statsd.port", "emqx_statsd.port", [ - {default, 8125}, - {datatype, integer} - ]}. - - {mapping, "statsd.prefix", "emqx_statsd.prefix", [ - {datatype, string} - ]}. - - {mapping, "statsd.tag.$id.key", "emqx_statsd.tag", [ - {datatype, string} - ]}. - - {mapping, "statsd.tag.$id.value", "emqx_statsd.tag", [ - {datatype, string} - ]}. - - {mapping, "statsd.batch_size", "emqx_statsd.batch_size", [ - {default, 10}, - {datatype, integer} - ]}. - - {mapping, "statsd.sample_time_interval", "emqx_statsd.sample_time_interval", [ - {default, "2s"}, - {datatype, {duration, ms}} - ]}. - - {mapping, "statsd.flush_time_interval", "emqx_statsd.flush_time_interval", [ - {default, "10s"}, - {datatype, {duration, ms}} - ]}. - - {translation, "emqx_stasd.host", fun(Conf) -> - {ok, IPAddress} = inet:parse_address(cuttlefish:conf_get("statsd.host", Conf, "127.0.0.1")), - IPAddress - end}. - - {translation, "emqx_statsd.tag", fun(Conf) -> - Tags = cuttlefish_variable:filter_by_prefix("statsd.tag", Conf), - lists:foldl( - fun({["statsd", "tag", Id, "key"], Key}, AccIn) -> - [{Key, cuttlefish:conf_get("statsd." ++ Id ++ ".value", Conf)} | AccIn]; - (_, AccIn) -> - AccIn - end, [], Tags) - end}. diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 45d2c9708..cd998b158 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -28,11 +28,11 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_statsd_sup:start_link(), - emqx_statsd_sup:start_statsd(), + {ok, _} = emqx_statsd_sup:start_statsd(), ?LOG(info, "emqx statsd start: successfully"), {ok, Sup}. - stop(_) -> - emqx_statsd_sup:stop_statsd(), +stop(_) -> + ok = emqx_statsd_sup:stop_statsd(), ?LOG(info, "emqx statsd stop: successfully"), ok. diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl new file mode 100644 index 000000000..fc45b41fc --- /dev/null +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -0,0 +1,38 @@ +-module(emqx_statsd_schema). + +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + +-export([ structs/0 + , fields/1]). + +structs() -> ["emqx_statsd"]. + +fields("emqx_statsd") -> + [ {server, fun server/1} + , {prefix, fun prefix/1} + , {tags, map()} + , {batch_size, fun batch_size/1} + , {sample_time_interval, fun duration_s/1} + , {flush_time_interval, fun duration_s/1}]. + +server(type) -> string(); +server(default) -> "192.168.1.1:8125"; +server(not_nullable) -> true; +server(_) -> undefined. + +prefix(type) -> string(); +prefix(default) -> "emqx"; +prefix(not_nullable) -> false; +prefix(_) -> undefined. + +batch_size(type) -> integer(); +batch_size(not_nullable) -> true; +batch_size(default) -> 10; +batch_size(_) -> undefined. + +duration_s(type) -> emqx_schema:duration_s(); +duration_s(not_nullable) -> true; +duration_s(default) -> "10s"; +duration_s(_) -> undefined. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index e72f43225..91356f00f 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -19,15 +19,15 @@ supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, { {one_for_one, 10, 100}, []} }. + {ok, {{one_for_one, 10, 100}, []}}. start_statsd() -> {ok, Pid} = supervisor:start_child(?MODULE, estatsd_child_spec()), {ok, _Pid1} = supervisor:start_child(?MODULE, emqx_statsd_child_spec(Pid)). stop_statsd() -> - supervisor:terminate_child(emqx_statsd_sup, emqx_statsd), - supervisor:terminate_child(emqx_statsd_sup, estatsd). + ok = supervisor:terminate_child(?MODULE, emqx_statsd), + ok = supervisor:terminate_child(?MODULE, estatsd). %%============================================================================================== %% internal estatsd_child_spec() -> @@ -39,22 +39,36 @@ estatsd_child_spec() -> , modules => [estatsd]}. estatsd_options() -> - Host = application:get_env(?APP, host, ?DEFAULT_HOST), - Port = application:get_env(?APP, port, ?DEFAULT_PORT), - Prefix = application:get_env(?APP, prefix, ?DEFAULT_PREFIX), - Tags = application:get_env(?APP, tags, ?DEFAULT_TAGS), - BatchSize = application:get_env(?APP, batch_size, ?DEFAULT_BATCH_SIZE), + Server = get_conf(server, {?DEFAULT_HOST, ?DEFAULT_PORT}), + {Host, Port} = host_port(Server), + Prefix = get_conf(prefix, ?DEFAULT_PREFIX), + Tags = tags(get_conf(tags, ?DEFAULT_TAGS)), + BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE), [{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}]. +host_port({Host, Port}) -> {Host, Port}; +host_port(Server) -> + case string:tokens(Server, ":") of + [Domain] -> {Domain, ?DEFAULT_PORT}; + [Domain, Port] -> {Domain, list_to_integer(Port)} + end. + +tags(Map) -> + Tags = maps:to_list(Map), + [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. + emqx_statsd_child_spec(Pid) -> #{id => emqx_statsd - , start => {emqx_statsd, start_link, [emqx_statsd_options(Pid)]} + , start => {emqx_statsd, start_link, [[{estatsd_pid, Pid} | emqx_statsd_options()]]} , restart => permanent , shutdown => 5000 , type => worker , modules => [emqx_statsd]}. -emqx_statsd_options(Pid) -> - SampleTimeInterval = application:get_env(?APP, sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL), - FlushTimeInterval = application:get_env(?APP, flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL), - [{estatsd_pid, Pid}, {sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}]. +emqx_statsd_options() -> + SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL), + FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL), + [{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}]. + +get_conf(Key, Default) -> + emqx_config:get([?APP, Key], Default).