fix: statsd hocon schema & add test suit (#5060)

This commit is contained in:
DDDHuang 2021-06-25 09:18:52 +08:00 committed by zhanghongtong
parent b88c71b481
commit 44412549ab
5 changed files with 54 additions and 24 deletions

View File

@ -3,8 +3,8 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
emqx_statsd:{ emqx_statsd:{
# statsd server host: "127.0.0.1"
server: "127.0.0.1:8125" port: 8125
batch_size: 10 batch_size: 10
prefix: "emqx" prefix: "emqx"
tags: {"from": "emqx"} tags: {"from": "emqx"}

View File

@ -3,7 +3,7 @@
-define(DEFAULT_HOST, {127, 0, 0, 1}). -define(DEFAULT_HOST, {127, 0, 0, 1}).
-define(DEFAULT_PORT, 8125). -define(DEFAULT_PORT, 8125).
-define(DEFAULT_PREFIX, undefined). -define(DEFAULT_PREFIX, undefined).
-define(DEFAULT_TAGS, []). -define(DEFAULT_TAGS, #{}).
-define(DEFAULT_BATCH_SIZE, 10). -define(DEFAULT_BATCH_SIZE, 10).
-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). -define(DEFAULT_SAMPLE_TIME_INTERVAL, 10).
-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). -define(DEFAULT_FLUSH_TIME_INTERVAL, 10).

View File

@ -10,29 +10,35 @@
structs() -> ["emqx_statsd"]. structs() -> ["emqx_statsd"].
fields("emqx_statsd") -> fields("emqx_statsd") ->
[ {server, fun server/1} [ {host, fun host/1}
, {port, fun port/1}
, {prefix, fun prefix/1} , {prefix, fun prefix/1}
, {tags, map()} , {tags, map()}
, {batch_size, fun batch_size/1} , {batch_size, fun batch_size/1}
, {sample_time_interval, fun duration_s/1} , {sample_time_interval, fun duration_s/1}
, {flush_time_interval, fun duration_s/1}]. , {flush_time_interval, fun duration_s/1}].
server(type) -> string(); host(type) -> string();
server(default) -> "192.168.1.1:8125"; host(default) -> "127.0.0.1";
server(not_nullable) -> true; host(nullable) -> false;
server(_) -> undefined. host(_) -> undefined.
port(type) -> integer();
port(default) -> 8125;
port(nullable) -> true;
port(_) -> undefined.
prefix(type) -> string(); prefix(type) -> string();
prefix(default) -> "emqx"; prefix(default) -> "emqx";
prefix(not_nullable) -> false; prefix(nullable) -> true;
prefix(_) -> undefined. prefix(_) -> undefined.
batch_size(type) -> integer(); batch_size(type) -> integer();
batch_size(not_nullable) -> true; batch_size(nullable) -> false;
batch_size(default) -> 10; batch_size(default) -> 10;
batch_size(_) -> undefined. batch_size(_) -> undefined.
duration_s(type) -> emqx_schema:duration_s(); duration_s(type) -> emqx_schema:duration_s();
duration_s(not_nullable) -> true; duration_s(nullable) -> false;
duration_s(default) -> "10s"; duration_s(default) -> "10s";
duration_s(_) -> undefined. duration_s(_) -> undefined.

View File

@ -15,6 +15,8 @@
-export([init/1]). -export([init/1]).
-export([estatsd_options/0]).
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -39,20 +41,13 @@ estatsd_child_spec() ->
, modules => [estatsd]}. , modules => [estatsd]}.
estatsd_options() -> estatsd_options() ->
Server = get_conf(server, {?DEFAULT_HOST, ?DEFAULT_PORT}), Host = get_conf(host, ?DEFAULT_HOST),
{Host, Port} = host_port(Server), Port = get_conf(port, ?DEFAULT_PORT),
Prefix = get_conf(prefix, ?DEFAULT_PREFIX), Prefix = get_conf(prefix, ?DEFAULT_PREFIX),
Tags = tags(get_conf(tags, ?DEFAULT_TAGS)), Tags = tags(get_conf(tags, ?DEFAULT_TAGS)),
BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE), BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE),
[{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}]. [{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}].
host_port({Host, Port}) -> {Host, Port};
host_port(Server) ->
case string:tokens(Server, ":") of
[Domain] -> {Domain, ?DEFAULT_PORT};
[Domain, Port] -> {Domain, list_to_integer(Port)}
end.
tags(Map) -> tags(Map) ->
Tags = maps:to_list(Map), Tags = maps:to_list(Map),
[{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags].
@ -66,8 +61,8 @@ emqx_statsd_child_spec(Pid) ->
, modules => [emqx_statsd]}. , modules => [emqx_statsd]}.
emqx_statsd_options() -> emqx_statsd_options() ->
SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL), SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL) * 1000,
FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL), FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL) * 1000,
[{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}]. [{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}].
get_conf(Key, Default) -> get_conf(Key, Default) ->

View File

@ -0,0 +1,29 @@
-module(emqx_statsd_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_statsd]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_statsd]).
all() ->
emqx_ct:all(?MODULE).
t_statsd(_) ->
{ok, Socket} = gen_udp:open(8125),
receive
{udp, _Socket, _Host, _Port, Bin} ->
?assert(length(Bin) > 50)
after
11*1000 ->
?assert(true, failed)
end,
gen_udp:close(Socket).