refactor: delete emqx_statsd all together

This commit is contained in:
Zaiming (Stone) Shi 2023-05-04 12:40:36 +02:00
parent 2bf6c593fd
commit 6e2cde8224
26 changed files with 26 additions and 950 deletions

View File

@ -45,6 +45,5 @@
{emqx_rule_engine,1}.
{emqx_shared_sub,1}.
{emqx_slow_subs,1}.
{emqx_statsd,1}.
{emqx_telemetry,1}.
{emqx_topic_metrics,1}.

View File

@ -51,6 +51,8 @@
"gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common"
).
-define(IGNORED_MODULES, "emqx_rpc").
-define(FORCE_DELETED_MODULES, [emqx_statsd, emqx_statsd_proto_v1]).
-define(FORCE_DELETED_APIS, [{emqx_statsd, 1}]).
%% List of known RPC backend modules:
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
%% List of known functions also known to do RPC:
@ -127,11 +129,16 @@ check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api
Val ->
ok;
undefined ->
case lists:member({API, Version}, ?FORCE_DELETED_APIS) of
true ->
ok;
false ->
setnok(),
logger:error(
"API ~p v~p was removed in release ~p without being deprecated.",
[API, Version, Rel2]
);
)
end;
_Val ->
setnok(),
logger:error(
@ -146,16 +153,24 @@ check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api
check_api_immutability(_, _) ->
ok.
filter_calls(Calls) ->
F = fun({{Mf, _, _}, {Mt, _, _}}) ->
(not lists:member(Mf, ?FORCE_DELETED_MODULES)) andalso
(not lists:member(Mt, ?FORCE_DELETED_MODULES))
end,
lists:filter(F, Calls).
%% Note: sets nok flag
-spec typecheck_apis(fulldump(), fulldump()) -> ok.
typecheck_apis(
#{release := CallerRelease, api := CallerAPIs, signatures := CallerSigs},
#{release := CalleeRelease, signatures := CalleeSigs}
) ->
AllCalls = lists:flatten([
AllCalls0 = lists:flatten([
[Calls, Casts]
|| #{calls := Calls, casts := Casts} <- maps:values(CallerAPIs)
]),
AllCalls = filter_calls(AllCalls0),
lists:foreach(
fun({From, To}) ->
Caller = get_param_types(CallerSigs, From),
@ -213,7 +228,7 @@ get_param_types(Signatures, {M, F, A}) ->
maps:from_list(lists:zip(A, AttrTypes));
_ ->
logger:critical("Call ~p:~p/~p is not found in PLT~n", [M, F, Arity]),
error(badkey)
error({badkey, {M, F, A}})
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -49,7 +49,6 @@
-define(MERGED_CONFIGS, [
emqx_bridge_schema,
emqx_retainer_schema,
emqx_statsd_schema,
emqx_authn_schema,
emqx_authz_schema,
emqx_auto_subscribe_schema,

View File

@ -134,7 +134,6 @@ basic_reboot_apps() ->
emqx_dashboard,
emqx_connector,
emqx_gateway,
emqx_statsd,
emqx_resource,
emqx_rule_engine,
emqx_bridge,

View File

@ -48,7 +48,6 @@ init_per_suite(Config) ->
emqx_modules,
emqx_dashboard,
emqx_gateway,
emqx_statsd,
emqx_resource,
emqx_rule_engine,
emqx_bridge,

View File

@ -1,19 +0,0 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~

View File

@ -1,9 +0,0 @@
emqx_statsd
=====
An OTP application
Build
-----
$ rebar3 compile

View File

@ -1,19 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(APP, emqx_statsd).
-define(STATSD, [statsd]).
-define(SERVER_PARSE_OPTS, #{default_port => 8125}).

View File

@ -1,15 +0,0 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{estatsd, {git, "https://github.com/emqx/estatsd", {tag, "0.1.0"}}}
]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_statsd]}
]}.
{project_plugins, [erlfmt]}.

View File

@ -1,18 +0,0 @@
%% -*- mode: erlang -*-
{application, emqx_statsd, [
{description, "EMQX Statsd"},
{vsn, "5.0.9"},
{registered, []},
{mod, {emqx_statsd_app, []}},
{applications, [
kernel,
stdlib,
estatsd,
emqx,
emqx_management
]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.

View File

@ -1,168 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd).
-behaviour(gen_server).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-include("emqx_statsd.hrl").
-include_lib("emqx/include/logger.hrl").
-export([
start/0,
stop/0,
restart/0,
%% for rpc: remove after 5.1.x
do_start/0,
do_stop/0,
do_restart/0
]).
%% Interface
-export([start_link/1]).
%% Internal Exports
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
terminate/2
]).
-define(SAMPLE_TIMEOUT, sample_timeout).
%% Remove after 5.1.x
start() -> check_multicall_result(emqx_statsd_proto_v1:start(mria:running_nodes())).
stop() -> check_multicall_result(emqx_statsd_proto_v1:stop(mria:running_nodes())).
restart() -> check_multicall_result(emqx_statsd_proto_v1:restart(mria:running_nodes())).
do_start() ->
emqx_statsd_sup:ensure_child_started(?APP).
do_stop() ->
emqx_statsd_sup:ensure_child_stopped(?APP).
do_restart() ->
ok = do_stop(),
ok = do_start(),
ok.
start_link(Conf) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
init(Conf) ->
process_flag(trap_exit, true),
#{
tags := TagsRaw,
server := Server,
sample_time_interval := SampleTimeInterval,
flush_time_interval := FlushTimeInterval
} = Conf,
FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],
{ok, Pid} = estatsd:start_link(Opts),
{ok,
ensure_timer(#{
sample_time_interval => SampleTimeInterval,
flush_time_interval => FlushTimeInterval1,
estatsd_pid => Pid
})}.
handle_call(_Req, _From, State) ->
{reply, ignore, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(
{timeout, Ref, ?SAMPLE_TIMEOUT},
State = #{
sample_time_interval := SampleTimeInterval,
flush_time_interval := FlushTimeInterval,
estatsd_pid := Pid,
timer := Ref
}
) ->
Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_mgmt:vm_stats(),
SampleRate = SampleTimeInterval / FlushTimeInterval,
StatsdMetrics = [
{gauge, Name, Value, SampleRate, []}
|| {Name, Value} <- Metrics
],
ok = estatsd:submit(Pid, StatsdMetrics),
{noreply, ensure_timer(State), hibernate};
handle_info({'EXIT', Pid, Error}, State = #{estatsd_pid := Pid}) ->
{stop, {shutdown, Error}, State};
handle_info(_Msg, State) ->
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, #{estatsd_pid := Pid}) ->
estatsd:stop(Pid),
ok.
%%------------------------------------------------------------------------------
%% Internal function
%%------------------------------------------------------------------------------
flush_interval(FlushInterval, SampleInterval) when FlushInterval >= SampleInterval ->
FlushInterval;
flush_interval(_FlushInterval, SampleInterval) ->
?SLOG(
warning,
#{
msg =>
"Configured flush_time_interval is lower than sample_time_interval, "
"setting: flush_time_interval = sample_time_interval."
}
),
SampleInterval.
ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) ->
State#{timer => emqx_utils:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}.
check_multicall_result({Results, []}) ->
case
lists:all(
fun
(ok) -> true;
(_) -> false
end,
Results
)
of
true -> ok;
false -> error({bad_result, Results})
end;
check_multicall_result({_, _}) ->
error(multicall_failed).
to_bin(B) when is_binary(B) -> B;
to_bin(I) when is_integer(I) -> integer_to_binary(I);
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

View File

@ -1,97 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_api).
-behaviour(minirest_api).
-include("emqx_statsd.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-import(hoconsc, [mk/2, ref/2]).
-export([statsd/2]).
-export([
api_spec/0,
paths/0,
schema/1
]).
-define(API_TAG_STATSD, [<<"Monitor">>]).
-define(SCHEMA_MODULE, emqx_statsd_schema).
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
["/statsd"].
schema("/statsd") ->
#{
'operationId' => statsd,
get =>
#{
deprecated => true,
description => ?DESC(get_statsd_config_api),
tags => ?API_TAG_STATSD,
responses =>
#{200 => statsd_config_schema()}
},
put =>
#{
deprecated => true,
description => ?DESC(update_statsd_config_api),
tags => ?API_TAG_STATSD,
'requestBody' => statsd_config_schema(),
responses =>
#{200 => statsd_config_schema()}
}
}.
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------
statsd_config_schema() ->
emqx_dashboard_swagger:schema_with_example(
ref(?SCHEMA_MODULE, "statsd"),
statsd_example()
).
statsd_example() ->
#{
enable => true,
flush_time_interval => <<"30s">>,
sample_time_interval => <<"30s">>,
server => <<"127.0.0.1:8125">>,
tags => #{}
}.
statsd(get, _Params) ->
{200, emqx:get_raw_config([<<"statsd">>], #{})};
statsd(put, #{body := Body}) ->
case emqx_statsd_config:update(Body) of
{ok, NewConfig} ->
{200, NewConfig};
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
{500, ?INTERNAL_ERROR, Message}
end.

View File

@ -1,34 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_app).
-behaviour(application).
-include("emqx_statsd.hrl").
-export([
start/2,
stop/1
]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_statsd_sup:start_link(),
emqx_statsd_config:add_handler(),
{ok, Sup}.
stop(_) ->
emqx_statsd_config:remove_handler(),
ok.

View File

@ -1,54 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_config).
-behaviour(emqx_config_handler).
-include("emqx_statsd.hrl").
-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
-export([update/1]).
update(Config) ->
case
emqx_conf:update(
?STATSD,
Config,
#{rawconf_with_defaults => true, override_to => cluster}
)
of
{ok, #{raw_config := NewConfigRows}} ->
{ok, NewConfigRows};
{error, Reason} ->
{error, Reason}
end.
add_handler() ->
ok = emqx_config_handler:add_handler(?STATSD, ?MODULE),
ok.
remove_handler() ->
ok = emqx_config_handler:remove_handler(?STATSD),
ok.
post_config_update(?STATSD, _Req, #{enable := true} = New, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP),
emqx_statsd_sup:ensure_child_started(?APP, New);
post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP);
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
ok.

View File

@ -1,96 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-include("emqx_statsd.hrl").
-behaviour(hocon_schema).
-export([
namespace/0,
roots/0,
fields/1,
desc/1,
validations/0
]).
namespace() -> "statsd".
roots() ->
[{"statsd", hoconsc:mk(hoconsc:ref(?MODULE, "statsd"), #{importance => ?IMPORTANCE_HIDDEN})}].
fields("statsd") ->
[
{enable,
hoconsc:mk(
boolean(),
#{
default => false,
desc => ?DESC(enable)
}
)},
{server, server()},
{sample_time_interval, fun sample_interval/1},
{flush_time_interval, fun flush_interval/1},
{tags, fun tags/1}
].
desc("statsd") -> ?DESC(statsd);
desc(_) -> undefined.
server() ->
Meta = #{
default => <<"127.0.0.1:8125">>,
desc => ?DESC(?FUNCTION_NAME)
},
emqx_schema:servers_sc(Meta, ?SERVER_PARSE_OPTS).
sample_interval(type) -> emqx_schema:duration_ms();
sample_interval(default) -> <<"30s">>;
sample_interval(desc) -> ?DESC(?FUNCTION_NAME);
sample_interval(_) -> undefined.
flush_interval(type) -> emqx_schema:duration_ms();
flush_interval(default) -> <<"30s">>;
flush_interval(desc) -> ?DESC(?FUNCTION_NAME);
flush_interval(_) -> undefined.
tags(type) -> map();
tags(default) -> #{};
tags(desc) -> ?DESC(?FUNCTION_NAME);
tags(_) -> undefined.
validations() ->
[
{check_interval, fun check_interval/1}
].
check_interval(Conf) ->
case hocon_maps:get("statsd.sample_time_interval", Conf) of
undefined ->
ok;
Sample ->
Flush = hocon_maps:get("statsd.flush_time_interval", Conf),
case Sample =< Flush of
true ->
true;
false ->
{bad_interval, #{sample_time_interval => Sample, flush_time_interval => Flush}}
end
end.

View File

@ -1,81 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%%%-------------------------------------------------------------------
%% @doc emqx_statsd top level supervisor.
%% @end
%%%-------------------------------------------------------------------
-module(emqx_statsd_sup).
-behaviour(supervisor).
-export([
start_link/0,
ensure_child_started/1,
ensure_child_started/2,
ensure_child_stopped/1
]).
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Opts), #{
id => Mod,
start => {Mod, start_link, Opts},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]
}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec ensure_child_started(atom()) -> ok.
ensure_child_started(Mod) when is_atom(Mod) ->
ensure_child_started(Mod, emqx_conf:get([statsd], #{})).
-spec ensure_child_started(atom(), map()) -> ok.
ensure_child_started(Mod, Conf) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, [Conf]))).
%% @doc Stop the child worker process.
-spec ensure_child_stopped(any()) -> ok.
ensure_child_stopped(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok ->
%% with terminate_child/2 returned 'ok', it's not possible
%% for supervisor:delete_child/2 to return {error, Reason}
ok = supervisor:delete_child(?MODULE, ChildId);
{error, not_found} ->
ok
end.
init([]) ->
Children =
case emqx_conf:get([statsd], #{}) of
#{enable := true} = Conf -> [?CHILD(emqx_statsd, [Conf])];
_ -> []
end,
{ok, {{one_for_one, 100, 3600}, Children}}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
assert_started({ok, _Pid}) -> ok;
assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -1,44 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
start/1,
stop/1,
restart/1
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec start([node()]) -> emqx_rpc:multicall_result().
start(Nodes) ->
rpc:multicall(Nodes, emqx_statsd, do_start, [], 5000).
-spec stop([node()]) -> emqx_rpc:multicall_result().
stop(Nodes) ->
rpc:multicall(Nodes, emqx_statsd, do_stop, [], 5000).
-spec restart([node()]) -> emqx_rpc:multicall_result().
restart(Nodes) ->
rpc:multicall(Nodes, emqx_statsd, do_restart, [], 5000).

View File

@ -1,204 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-define(BASE_CONF, <<
"\n"
"statsd {\n"
"enable = true\n"
"flush_time_interval = 4s\n"
"sample_time_interval = 4s\n"
"server = \"127.0.0.1:8126\"\n"
"tags {\"t1\" = \"good\", test = 100}\n"
"}\n"
>>).
-define(BAD_CONF, <<
"\n"
"statsd {\n"
"enable = true\n"
"flush_time_interval = 4s\n"
"sample_time_interval = 4s\n"
"server = \"\"\n"
"tags {\"t1\" = \"good\", test = 100}\n"
"}\n"
>>).
-define(DEFAULT_CONF, <<
"\n"
"statsd {\n"
"enable = true\n"
"flush_time_interval = 4s\n"
"sample_time_interval = 4s\n"
"tags {\"t1\" = \"good\", test = 100}\n"
"}\n"
>>).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_dashboard, emqx_statsd],
fun set_special_configs/1
),
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_statsd, emqx_dashboard, emqx_conf]).
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_) ->
ok.
all() ->
emqx_common_test_helpers:all(?MODULE).
t_server_validator(_) ->
Server0 = emqx_conf:get_raw([statsd, server]),
?assertThrow(
#{
kind := validation_error,
path := "statsd.server",
reason := "cannot_be_empty",
value := ""
},
emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BAD_CONF)
),
%% default
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?DEFAULT_CONF),
DefaultServer = default_server(),
?assertEqual(DefaultServer, emqx_conf:get_raw([statsd, server])),
DefaultServerStr = binary_to_list(DefaultServer),
?assertEqual(DefaultServerStr, emqx_conf:get([statsd, server])),
%% recover
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF),
Server2 = emqx_conf:get_raw([statsd, server]),
?assertMatch(Server0, Server2),
ok.
t_statsd(_) ->
{ok, Socket} = gen_udp:open(8126, [{active, true}]),
receive
{udp, Socket1, Host, Port, Data} ->
ct:pal("receive:~p~n", [{Socket, Socket1, Host, Port}]),
?assert(length(Data) > 50),
?assert(nomatch =/= string:find(Data, "\nemqx.cpu_use:"))
after 10 * 1000 ->
error(timeout)
end,
gen_udp:close(Socket).
t_management(_) ->
?assertMatch(ok, emqx_statsd:start()),
?assertMatch(ok, emqx_statsd:start()),
?assertMatch(ok, emqx_statsd:stop()),
?assertMatch(ok, emqx_statsd:stop()),
?assertMatch(ok, emqx_statsd:restart()).
t_rest_http(_) ->
{ok, Res0} = request(get),
?assertEqual(
#{
<<"enable">> => true,
<<"flush_time_interval">> => <<"4s">>,
<<"sample_time_interval">> => <<"4s">>,
<<"server">> => <<"127.0.0.1:8126">>,
<<"tags">> => #{<<"t1">> => <<"good">>, <<"test">> => 100}
},
Res0
),
{ok, Res1} = request(put, #{enable => false}),
?assertMatch(#{<<"enable">> := false}, Res1),
?assertEqual(maps:remove(<<"enable">>, Res0), maps:remove(<<"enable">>, Res1)),
{ok, Res2} = request(get),
?assertEqual(Res1, Res2),
?assertEqual(
error, request(put, #{sample_time_interval => "11s", flush_time_interval => "10s"})
),
{ok, _} = request(put, #{enable => true}),
ok.
t_kill_exit(_) ->
{ok, _} = request(put, #{enable => true}),
Pid = erlang:whereis(emqx_statsd),
?assertEqual(ignore, gen_server:call(Pid, whatever)),
?assertEqual(ok, gen_server:cast(Pid, whatever)),
?assertEqual(Pid, erlang:whereis(emqx_statsd)),
#{estatsd_pid := Estatsd} = sys:get_state(emqx_statsd),
?assert(erlang:exit(Estatsd, kill)),
?assertEqual(false, is_process_alive(Estatsd)),
ct:sleep(150),
Pid1 = erlang:whereis(emqx_statsd),
?assertNotEqual(Pid, Pid1),
#{estatsd_pid := Estatsd1} = sys:get_state(emqx_statsd),
?assertNotEqual(Estatsd, Estatsd1),
ok.
t_config_update(_) ->
OldRawConf = emqx_conf:get_raw([statsd]),
{ok, _} = emqx_statsd_config:update(OldRawConf#{<<"enable">> => true}),
CommonKeys = [flush_time_interval, sample_time_interval],
OldConf = emqx_conf:get([statsd]),
OldStatsDState = sys:get_state(emqx_statsd),
OldPid = erlang:whereis(emqx_statsd),
?assertEqual(maps:with(CommonKeys, OldConf), maps:with(CommonKeys, OldStatsDState)),
NewRawConfExpect = OldRawConf#{
<<"flush_time_interval">> := <<"42s">>,
<<"sample_time_interval">> := <<"42s">>
},
try
{ok, _} = emqx_statsd_config:update(NewRawConfExpect),
NewRawConf = emqx_conf:get_raw([statsd]),
NewConf = emqx_conf:get([statsd]),
NewStatsDState = sys:get_state(emqx_statsd),
NewPid = erlang:whereis(emqx_statsd),
?assertNotEqual(OldRawConf, NewRawConf),
?assertEqual(NewRawConfExpect, NewRawConf),
?assertEqual(maps:with(CommonKeys, NewConf), maps:with(CommonKeys, NewStatsDState)),
?assertNotEqual(OldPid, NewPid)
after
{ok, _} = emqx_statsd_config:update(OldRawConf)
end,
%% bad server url
BadRawConf = OldRawConf#{<<"server">> := <<"">>},
{error, #{
kind := validation_error,
path := "statsd.server",
reason := "cannot_be_empty",
value := ""
}} = emqx_statsd_config:update(BadRawConf),
ok.
request(Method) -> request(Method, []).
request(Method, Body) ->
case request(Method, uri(["statsd"]), Body) of
{ok, 200, Res} ->
{ok, emqx_utils_json:decode(Res, [return_maps])};
{ok, _Status, _} ->
error
end.
default_server() ->
{server, Schema} = lists:keyfind(server, 1, emqx_statsd_schema:fields("statsd")),
hocon_schema:field_schema(Schema, default).

View File

@ -303,7 +303,6 @@ defmodule EMQXUmbrella.MixProject do
:emqx_bridge,
:emqx_modules,
:emqx_management,
:emqx_statsd,
:emqx_retainer,
:emqx_prometheus,
:emqx_auto_subscribe,
@ -369,7 +368,6 @@ defmodule EMQXUmbrella.MixProject do
emqx_management: :permanent,
emqx_dashboard: :permanent,
emqx_retainer: :permanent,
emqx_statsd: :permanent,
emqx_prometheus: :permanent,
emqx_psk: :permanent,
emqx_slow_subs: :permanent,

View File

@ -431,7 +431,6 @@ relx_apps(ReleaseType, Edition) ->
emqx_management,
emqx_dashboard,
emqx_retainer,
emqx_statsd,
emqx_prometheus,
emqx_psk,
emqx_slow_subs,

View File

@ -1,9 +0,0 @@
emqx_statsd_api {
get_statsd_config_api.desc:
"""List the configuration of StatsD metrics collection and push service."""
update_statsd_config_api.desc:
"""Update the configuration of StatsD metrics collection and push service."""
}

View File

@ -1,30 +0,0 @@
emqx_statsd_schema {
enable.desc:
"""Enable or disable StatsD metrics collection and push service."""
flush_interval.desc:
"""The push interval for metrics."""
get_statsd_config_api.desc:
"""List the configuration of StatsD metrics collection and push service."""
sample_interval.desc:
"""The sampling interval for metrics."""
server.desc:
"""StatsD server address."""
statsd.desc:
"""StatsD metrics collection and push configuration."""
statsd.label:
"""StatsD"""
tags.desc:
"""The tags for metrics."""
update_statsd_config_api.desc:
"""Update the configuration of StatsD metrics collection and push service."""
}

View File

@ -1,9 +0,0 @@
emqx_statsd_api {
get_statsd_config_api.desc:
"""列出 StatsD 指标采集和推送服务的的配置。"""
update_statsd_config_api.desc:
"""更新 StatsD 指标采集和推送服务的配置。"""
}

View File

@ -1,30 +0,0 @@
emqx_statsd_schema {
enable.desc:
"""启用或禁用 StatsD 指标采集和推送服务。"""
flush_interval.desc:
"""指标的推送间隔。"""
get_statsd_config_api.desc:
"""列出 StatsD 指标采集和推送服务的的配置。"""
sample_interval.desc:
"""指标的采样间隔。"""
server.desc:
"""StatsD 服务器地址。"""
statsd.desc:
"""StatsD 指标采集与推送配置。"""
statsd.label:
"""StatsD"""
tags.desc:
"""指标的标签。"""
update_statsd_config_api.desc:
"""更新 StatsD 指标采集和推送服务的配置。"""
}

View File

@ -30,6 +30,10 @@ for app in ${APPS}; do
else
old_app_version='not_found'
fi
if [ ! -f "$src_file" ]; then
# app is deleted
continue
fi
now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
if [ "$old_app_version" = 'not_found' ]; then