feat(statsd): add statsd http API

This commit is contained in:
Turtle 2021-07-23 17:37:10 +08:00
parent 1776acee8a
commit 6617ba2772
8 changed files with 196 additions and 61 deletions

View File

@ -250,5 +250,4 @@ emqx_feature() ->
, emqx_bridge_mqtt
, emqx_modules
, emqx_management
, emqx_retainer
, emqx_statsd].
, emqx_retainer].

View File

@ -3,11 +3,8 @@
##--------------------------------------------------------------------
emqx_statsd:{
host: "127.0.0.1"
port: 8125
batch_size: 10
prefix: "emqx"
tags: {"from": "emqx"}
enable: true
server: "127.0.0.1:8125"
sample_time_interval: "10s"
flush_time_interval: "10s"
}

View File

@ -1,9 +1,5 @@
-define(APP, emqx_statsd).
-define(DEFAULT_HOST, {127, 0, 0, 1}).
-define(DEFAULT_PORT, 8125).
-define(DEFAULT_PREFIX, undefined).
-define(DEFAULT_TAGS, #{}).
-define(DEFAULT_BATCH_SIZE, 10).
-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000).
-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000).
-define(DEFAULT_HOST, "127.0.0.1").
-define(DEFAULT_PORT, 8125).

View File

@ -49,11 +49,15 @@ start_link(Opts) ->
init([Opts]) ->
process_flag(trap_exit, true),
Tags = tags(maps:get(tags, Opts, ?DEFAULT_TAGS)),
Tags = tags(maps:get(tags, Opts, #{})),
{Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}),
Opts1 = maps:without([sample_time_interval,
flush_time_interval], Opts#{tags => Tags}),
flush_time_interval], Opts#{tags => Tags,
host => Host,
port => Port,
prefix => <<"emqx">>}),
{ok, Pid} = estatsd:start_link(maps:to_list(Opts1)),
SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_SAMPLE_TIME_INTERVAL),
SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
{ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,

View File

@ -0,0 +1,108 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_api).
-behaviour(minirest_api).
-include("emqx_statsd.hrl").
-import(emqx_mgmt_util, [ response_schema/1
, response_schema/2
, request_body_schema/1
, request_body_schema/2
]).
-export([api_spec/0]).
-export([ statsd/2
]).
api_spec() ->
{statsd_api(), schemas()}.
schemas() ->
[#{statsd => #{
type => object,
properties => #{
server => #{
type => string,
description => <<"Statsd Server">>,
example => get_raw(<<"server">>, <<"127.0.0.1:8125">>)},
enable => #{
type => boolean,
description => <<"Statsd status">>,
example => get_raw(<<"enable">>, false)},
sample_time_interval => #{
type => string,
description => <<"Sample Time Interval">>,
example => get_raw(<<"sample_time_interval">>, <<"10s">>)},
flush_time_interval => #{
type => string,
description => <<"Flush Time Interval">>,
example => get_raw(<<"flush_time_interval">>, <<"10s">>)}
}
}}].
statsd_api() ->
Metadata = #{
get => #{
description => <<"Get statsd info">>,
responses => #{
<<"200">> => response_schema(<<"statsd">>)
}
},
put => #{
description => <<"Update Statsd">>,
'requestBody' => request_body_schema(<<"statsd">>),
responses => #{
<<"200">> =>
response_schema(<<"Update Statsd successfully">>),
<<"400">> =>
response_schema(<<"Bad Request">>, #{
type => object,
properties => #{
message => #{type => string},
code => #{type => string}
}
})
}
}
},
[{"/statsd", Metadata, statsd}].
statsd(get, _Request) ->
Response = emqx_config:get_raw([<<"emqx_statsd">>], #{}),
{200, Response};
statsd(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
Enable = maps:get(<<"enable">>, Params),
ok = emqx_config:update_config([emqx_statsd], Params),
enable_statsd(Enable).
enable_statsd(true) ->
ok = emqx_statsd_sup:stop_child(?APP),
emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})),
{200};
enable_statsd(false) ->
ok = emqx_statsd_sup:stop_child(?APP),
{200}.
get_raw(Key, Def) ->
emqx_config:get_raw([<<"emqx_statsd">>]++ [Key], Def).

View File

@ -18,12 +18,23 @@
-behaviour(application).
-include("emqx_statsd.hrl").
-export([ start/2
, stop/1
]).
start(_StartType, _StartArgs) ->
emqx_statsd_sup:start_link().
{ok, Sup} = emqx_statsd_sup:start_link(),
maybe_enable_statsd(),
{ok, Sup}.
stop(_) ->
ok.
maybe_enable_statsd() ->
case emqx_config:get([?APP, enable], false) of
true ->
emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{}));
false ->
ok
end.

View File

@ -4,41 +4,38 @@
-behaviour(hocon_schema).
-export([to_ip_port/1]).
-export([ structs/0
, fields/1]).
-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}).
structs() -> ["emqx_statsd"].
fields("emqx_statsd") ->
[ {host, fun host/1}
, {port, fun port/1}
, {prefix, fun prefix/1}
, {tags, map()}
, {batch_size, fun batch_size/1}
, {sample_time_interval, fun duration_s/1}
, {flush_time_interval, fun duration_s/1}].
[ {enable, emqx_schema:t(boolean(), undefined, false)}
, {server, fun server/1}
, {sample_time_interval, fun duration_ms/1}
, {flush_time_interval, fun duration_ms/1}
].
host(type) -> string();
host(default) -> "127.0.0.1";
host(nullable) -> false;
host(_) -> undefined.
server(type) -> emqx_schema:ip_port();
server(default) -> "127.0.0.1:8125";
server(nullable) -> false;
server(_) -> undefined.
port(type) -> integer();
port(default) -> 8125;
port(nullable) -> true;
port(_) -> undefined.
duration_ms(type) -> emqx_schema:duration_ms();
duration_ms(nullable) -> false;
duration_ms(default) -> "10s";
duration_ms(_) -> undefined.
prefix(type) -> string();
prefix(default) -> "emqx";
prefix(nullable) -> true;
prefix(_) -> undefined.
batch_size(type) -> integer();
batch_size(nullable) -> false;
batch_size(default) -> 10;
batch_size(_) -> undefined.
duration_s(type) -> emqx_schema:duration_s();
duration_s(nullable) -> false;
duration_s(default) -> "10s";
duration_s(_) -> undefined.
to_ip_port(Str) ->
case string:tokens(Str, ":") of
[Ip, Port] ->
case inet:parse_address(Ip) of
{ok, R} -> {ok, {R, list_to_integer(Port)}};
_ -> {error, Str}
end;
_ -> {error, Str}
end.

View File

@ -7,25 +7,48 @@
-behaviour(supervisor).
-include("emqx_statsd.hrl").
-export([start_link/0]).
-export([ start_link/0
, start_child/1
, start_child/2
, stop_child/1
]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Opts = emqx_config:get([?APP], #{}),
{ok, {{one_for_one, 10, 3600},
[#{id => emqx_statsd,
start => {emqx_statsd, start_link, [Opts]},
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Opts), #{id => Mod,
start => {Mod, start_link, [Opts]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_statsd]}]}}.
modules => [Mod]}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_child(supervisor:child_spec()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)).
-spec start_child(atom(), map()) -> ok.
start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))).
-spec(stop_child(any()) -> ok | {error, term()}).
stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId);
Error -> Error
end.
init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason).