Merge pull request #10600 from zmstone/0504-delete-statsd-all-together
0504 delete statsd all together
This commit is contained in:
commit
fe81e9521a
|
@ -45,6 +45,5 @@
|
|||
{emqx_rule_engine,1}.
|
||||
{emqx_shared_sub,1}.
|
||||
{emqx_slow_subs,1}.
|
||||
{emqx_statsd,1}.
|
||||
{emqx_telemetry,1}.
|
||||
{emqx_topic_metrics,1}.
|
||||
|
|
|
@ -51,6 +51,8 @@
|
|||
"gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria, amqp_client, rabbit_common"
|
||||
).
|
||||
-define(IGNORED_MODULES, "emqx_rpc").
|
||||
-define(FORCE_DELETED_MODULES, [emqx_statsd, emqx_statsd_proto_v1]).
|
||||
-define(FORCE_DELETED_APIS, [{emqx_statsd, 1}]).
|
||||
%% List of known RPC backend modules:
|
||||
-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
|
||||
%% List of known functions also known to do RPC:
|
||||
|
@ -127,11 +129,16 @@ check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api
|
|||
Val ->
|
||||
ok;
|
||||
undefined ->
|
||||
setnok(),
|
||||
logger:error(
|
||||
"API ~p v~p was removed in release ~p without being deprecated.",
|
||||
[API, Version, Rel2]
|
||||
);
|
||||
case lists:member({API, Version}, ?FORCE_DELETED_APIS) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
setnok(),
|
||||
logger:error(
|
||||
"API ~p v~p was removed in release ~p without being deprecated.",
|
||||
[API, Version, Rel2]
|
||||
)
|
||||
end;
|
||||
_Val ->
|
||||
setnok(),
|
||||
logger:error(
|
||||
|
@ -146,16 +153,24 @@ check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api
|
|||
check_api_immutability(_, _) ->
|
||||
ok.
|
||||
|
||||
filter_calls(Calls) ->
|
||||
F = fun({{Mf, _, _}, {Mt, _, _}}) ->
|
||||
(not lists:member(Mf, ?FORCE_DELETED_MODULES)) andalso
|
||||
(not lists:member(Mt, ?FORCE_DELETED_MODULES))
|
||||
end,
|
||||
lists:filter(F, Calls).
|
||||
|
||||
%% Note: sets nok flag
|
||||
-spec typecheck_apis(fulldump(), fulldump()) -> ok.
|
||||
typecheck_apis(
|
||||
#{release := CallerRelease, api := CallerAPIs, signatures := CallerSigs},
|
||||
#{release := CalleeRelease, signatures := CalleeSigs}
|
||||
) ->
|
||||
AllCalls = lists:flatten([
|
||||
AllCalls0 = lists:flatten([
|
||||
[Calls, Casts]
|
||||
|| #{calls := Calls, casts := Casts} <- maps:values(CallerAPIs)
|
||||
]),
|
||||
AllCalls = filter_calls(AllCalls0),
|
||||
lists:foreach(
|
||||
fun({From, To}) ->
|
||||
Caller = get_param_types(CallerSigs, From),
|
||||
|
@ -213,7 +228,7 @@ get_param_types(Signatures, {M, F, A}) ->
|
|||
maps:from_list(lists:zip(A, AttrTypes));
|
||||
_ ->
|
||||
logger:critical("Call ~p:~p/~p is not found in PLT~n", [M, F, Arity]),
|
||||
error(badkey)
|
||||
error({badkey, {M, F, A}})
|
||||
end.
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
|
|
@ -49,7 +49,6 @@
|
|||
-define(MERGED_CONFIGS, [
|
||||
emqx_bridge_schema,
|
||||
emqx_retainer_schema,
|
||||
emqx_statsd_schema,
|
||||
emqx_authn_schema,
|
||||
emqx_authz_schema,
|
||||
emqx_auto_subscribe_schema,
|
||||
|
|
|
@ -134,7 +134,6 @@ basic_reboot_apps() ->
|
|||
emqx_dashboard,
|
||||
emqx_connector,
|
||||
emqx_gateway,
|
||||
emqx_statsd,
|
||||
emqx_resource,
|
||||
emqx_rule_engine,
|
||||
emqx_bridge,
|
||||
|
|
|
@ -48,7 +48,6 @@ init_per_suite(Config) ->
|
|||
emqx_modules,
|
||||
emqx_dashboard,
|
||||
emqx_gateway,
|
||||
emqx_statsd,
|
||||
emqx_resource,
|
||||
emqx_rule_engine,
|
||||
emqx_bridge,
|
||||
|
|
|
@ -1,19 +0,0 @@
|
|||
.rebar3
|
||||
_*
|
||||
.eunit
|
||||
*.o
|
||||
*.beam
|
||||
*.plt
|
||||
*.swp
|
||||
*.swo
|
||||
.erlang.cookie
|
||||
ebin
|
||||
log
|
||||
erl_crash.dump
|
||||
.rebar
|
||||
logs
|
||||
_build
|
||||
.idea
|
||||
*.iml
|
||||
rebar3.crashdump
|
||||
*~
|
|
@ -1,9 +0,0 @@
|
|||
emqx_statsd
|
||||
=====
|
||||
|
||||
An OTP application
|
||||
|
||||
Build
|
||||
-----
|
||||
|
||||
$ rebar3 compile
|
|
@ -1,19 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(APP, emqx_statsd).
|
||||
-define(STATSD, [statsd]).
|
||||
-define(SERVER_PARSE_OPTS, #{default_port => 8125}).
|
|
@ -1,15 +0,0 @@
|
|||
%% -*- mode: erlang -*-
|
||||
|
||||
{erl_opts, [debug_info]}.
|
||||
{deps, [
|
||||
{emqx, {path, "../emqx"}},
|
||||
{emqx_utils, {path, "../emqx_utils"}},
|
||||
{estatsd, {git, "https://github.com/emqx/estatsd", {tag, "0.1.0"}}}
|
||||
]}.
|
||||
|
||||
{shell, [
|
||||
% {config, "config/sys.config"},
|
||||
{apps, [emqx_statsd]}
|
||||
]}.
|
||||
|
||||
{project_plugins, [erlfmt]}.
|
|
@ -1,18 +0,0 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_statsd, [
|
||||
{description, "EMQX Statsd"},
|
||||
{vsn, "5.0.9"},
|
||||
{registered, []},
|
||||
{mod, {emqx_statsd_app, []}},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib,
|
||||
estatsd,
|
||||
emqx,
|
||||
emqx_management
|
||||
]},
|
||||
{env, []},
|
||||
{modules, []},
|
||||
{licenses, ["Apache 2.0"]},
|
||||
{links, []}
|
||||
]}.
|
|
@ -1,168 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_statsd).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-ifdef(TEST).
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-include("emqx_statsd.hrl").
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-export([
|
||||
start/0,
|
||||
stop/0,
|
||||
restart/0,
|
||||
%% for rpc: remove after 5.1.x
|
||||
do_start/0,
|
||||
do_stop/0,
|
||||
do_restart/0
|
||||
]).
|
||||
|
||||
%% Interface
|
||||
-export([start_link/1]).
|
||||
|
||||
%% Internal Exports
|
||||
-export([
|
||||
init/1,
|
||||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
code_change/3,
|
||||
terminate/2
|
||||
]).
|
||||
|
||||
-define(SAMPLE_TIMEOUT, sample_timeout).
|
||||
|
||||
%% Remove after 5.1.x
|
||||
start() -> check_multicall_result(emqx_statsd_proto_v1:start(mria:running_nodes())).
|
||||
stop() -> check_multicall_result(emqx_statsd_proto_v1:stop(mria:running_nodes())).
|
||||
restart() -> check_multicall_result(emqx_statsd_proto_v1:restart(mria:running_nodes())).
|
||||
|
||||
do_start() ->
|
||||
emqx_statsd_sup:ensure_child_started(?APP).
|
||||
|
||||
do_stop() ->
|
||||
emqx_statsd_sup:ensure_child_stopped(?APP).
|
||||
|
||||
do_restart() ->
|
||||
ok = do_stop(),
|
||||
ok = do_start(),
|
||||
ok.
|
||||
|
||||
start_link(Conf) ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
|
||||
|
||||
init(Conf) ->
|
||||
process_flag(trap_exit, true),
|
||||
#{
|
||||
tags := TagsRaw,
|
||||
server := Server,
|
||||
sample_time_interval := SampleTimeInterval,
|
||||
flush_time_interval := FlushTimeInterval
|
||||
} = Conf,
|
||||
FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval),
|
||||
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
|
||||
Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
|
||||
Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],
|
||||
{ok, Pid} = estatsd:start_link(Opts),
|
||||
{ok,
|
||||
ensure_timer(#{
|
||||
sample_time_interval => SampleTimeInterval,
|
||||
flush_time_interval => FlushTimeInterval1,
|
||||
estatsd_pid => Pid
|
||||
})}.
|
||||
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, ignore, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(
|
||||
{timeout, Ref, ?SAMPLE_TIMEOUT},
|
||||
State = #{
|
||||
sample_time_interval := SampleTimeInterval,
|
||||
flush_time_interval := FlushTimeInterval,
|
||||
estatsd_pid := Pid,
|
||||
timer := Ref
|
||||
}
|
||||
) ->
|
||||
Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_mgmt:vm_stats(),
|
||||
SampleRate = SampleTimeInterval / FlushTimeInterval,
|
||||
StatsdMetrics = [
|
||||
{gauge, Name, Value, SampleRate, []}
|
||||
|| {Name, Value} <- Metrics
|
||||
],
|
||||
ok = estatsd:submit(Pid, StatsdMetrics),
|
||||
{noreply, ensure_timer(State), hibernate};
|
||||
handle_info({'EXIT', Pid, Error}, State = #{estatsd_pid := Pid}) ->
|
||||
{stop, {shutdown, Error}, State};
|
||||
handle_info(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
terminate(_Reason, #{estatsd_pid := Pid}) ->
|
||||
estatsd:stop(Pid),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal function
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
flush_interval(FlushInterval, SampleInterval) when FlushInterval >= SampleInterval ->
|
||||
FlushInterval;
|
||||
flush_interval(_FlushInterval, SampleInterval) ->
|
||||
?SLOG(
|
||||
warning,
|
||||
#{
|
||||
msg =>
|
||||
"Configured flush_time_interval is lower than sample_time_interval, "
|
||||
"setting: flush_time_interval = sample_time_interval."
|
||||
}
|
||||
),
|
||||
SampleInterval.
|
||||
|
||||
ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) ->
|
||||
State#{timer => emqx_utils:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}.
|
||||
|
||||
check_multicall_result({Results, []}) ->
|
||||
case
|
||||
lists:all(
|
||||
fun
|
||||
(ok) -> true;
|
||||
(_) -> false
|
||||
end,
|
||||
Results
|
||||
)
|
||||
of
|
||||
true -> ok;
|
||||
false -> error({bad_result, Results})
|
||||
end;
|
||||
check_multicall_result({_, _}) ->
|
||||
error(multicall_failed).
|
||||
|
||||
to_bin(B) when is_binary(B) -> B;
|
||||
to_bin(I) when is_integer(I) -> integer_to_binary(I);
|
||||
to_bin(L) when is_list(L) -> list_to_binary(L);
|
||||
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
|
@ -1,97 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_statsd_api).
|
||||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
-include("emqx_statsd.hrl").
|
||||
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, ref/2]).
|
||||
|
||||
-export([statsd/2]).
|
||||
|
||||
-export([
|
||||
api_spec/0,
|
||||
paths/0,
|
||||
schema/1
|
||||
]).
|
||||
|
||||
-define(API_TAG_STATSD, [<<"Monitor">>]).
|
||||
-define(SCHEMA_MODULE, emqx_statsd_schema).
|
||||
|
||||
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
||||
paths() ->
|
||||
["/statsd"].
|
||||
|
||||
schema("/statsd") ->
|
||||
#{
|
||||
'operationId' => statsd,
|
||||
get =>
|
||||
#{
|
||||
deprecated => true,
|
||||
description => ?DESC(get_statsd_config_api),
|
||||
tags => ?API_TAG_STATSD,
|
||||
responses =>
|
||||
#{200 => statsd_config_schema()}
|
||||
},
|
||||
put =>
|
||||
#{
|
||||
deprecated => true,
|
||||
description => ?DESC(update_statsd_config_api),
|
||||
tags => ?API_TAG_STATSD,
|
||||
'requestBody' => statsd_config_schema(),
|
||||
responses =>
|
||||
#{200 => statsd_config_schema()}
|
||||
}
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helper funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
statsd_config_schema() ->
|
||||
emqx_dashboard_swagger:schema_with_example(
|
||||
ref(?SCHEMA_MODULE, "statsd"),
|
||||
statsd_example()
|
||||
).
|
||||
|
||||
statsd_example() ->
|
||||
#{
|
||||
enable => true,
|
||||
flush_time_interval => <<"30s">>,
|
||||
sample_time_interval => <<"30s">>,
|
||||
server => <<"127.0.0.1:8125">>,
|
||||
tags => #{}
|
||||
}.
|
||||
|
||||
statsd(get, _Params) ->
|
||||
{200, emqx:get_raw_config([<<"statsd">>], #{})};
|
||||
statsd(put, #{body := Body}) ->
|
||||
case emqx_statsd_config:update(Body) of
|
||||
{ok, NewConfig} ->
|
||||
{200, NewConfig};
|
||||
{error, Reason} ->
|
||||
Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
|
||||
{500, ?INTERNAL_ERROR, Message}
|
||||
end.
|
|
@ -1,34 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_statsd_app).
|
||||
|
||||
-behaviour(application).
|
||||
|
||||
-include("emqx_statsd.hrl").
|
||||
|
||||
-export([
|
||||
start/2,
|
||||
stop/1
|
||||
]).
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
{ok, Sup} = emqx_statsd_sup:start_link(),
|
||||
emqx_statsd_config:add_handler(),
|
||||
{ok, Sup}.
|
||||
stop(_) ->
|
||||
emqx_statsd_config:remove_handler(),
|
||||
ok.
|
|
@ -1,54 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_statsd_config).
|
||||
|
||||
-behaviour(emqx_config_handler).
|
||||
|
||||
-include("emqx_statsd.hrl").
|
||||
|
||||
-export([add_handler/0, remove_handler/0]).
|
||||
-export([post_config_update/5]).
|
||||
-export([update/1]).
|
||||
|
||||
update(Config) ->
|
||||
case
|
||||
emqx_conf:update(
|
||||
?STATSD,
|
||||
Config,
|
||||
#{rawconf_with_defaults => true, override_to => cluster}
|
||||
)
|
||||
of
|
||||
{ok, #{raw_config := NewConfigRows}} ->
|
||||
{ok, NewConfigRows};
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
add_handler() ->
|
||||
ok = emqx_config_handler:add_handler(?STATSD, ?MODULE),
|
||||
ok.
|
||||
|
||||
remove_handler() ->
|
||||
ok = emqx_config_handler:remove_handler(?STATSD),
|
||||
ok.
|
||||
|
||||
post_config_update(?STATSD, _Req, #{enable := true} = New, _Old, _AppEnvs) ->
|
||||
emqx_statsd_sup:ensure_child_stopped(?APP),
|
||||
emqx_statsd_sup:ensure_child_started(?APP, New);
|
||||
post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) ->
|
||||
emqx_statsd_sup:ensure_child_stopped(?APP);
|
||||
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
|
||||
ok.
|
|
@ -1,96 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_statsd_schema).
|
||||
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include("emqx_statsd.hrl").
|
||||
|
||||
-behaviour(hocon_schema).
|
||||
|
||||
-export([
|
||||
namespace/0,
|
||||
roots/0,
|
||||
fields/1,
|
||||
desc/1,
|
||||
validations/0
|
||||
]).
|
||||
|
||||
namespace() -> "statsd".
|
||||
|
||||
roots() ->
|
||||
[{"statsd", hoconsc:mk(hoconsc:ref(?MODULE, "statsd"), #{importance => ?IMPORTANCE_HIDDEN})}].
|
||||
|
||||
fields("statsd") ->
|
||||
[
|
||||
{enable,
|
||||
hoconsc:mk(
|
||||
boolean(),
|
||||
#{
|
||||
default => false,
|
||||
desc => ?DESC(enable)
|
||||
}
|
||||
)},
|
||||
{server, server()},
|
||||
{sample_time_interval, fun sample_interval/1},
|
||||
{flush_time_interval, fun flush_interval/1},
|
||||
{tags, fun tags/1}
|
||||
].
|
||||
|
||||
desc("statsd") -> ?DESC(statsd);
|
||||
desc(_) -> undefined.
|
||||
|
||||
server() ->
|
||||
Meta = #{
|
||||
default => <<"127.0.0.1:8125">>,
|
||||
desc => ?DESC(?FUNCTION_NAME)
|
||||
},
|
||||
emqx_schema:servers_sc(Meta, ?SERVER_PARSE_OPTS).
|
||||
|
||||
sample_interval(type) -> emqx_schema:duration_ms();
|
||||
sample_interval(default) -> <<"30s">>;
|
||||
sample_interval(desc) -> ?DESC(?FUNCTION_NAME);
|
||||
sample_interval(_) -> undefined.
|
||||
|
||||
flush_interval(type) -> emqx_schema:duration_ms();
|
||||
flush_interval(default) -> <<"30s">>;
|
||||
flush_interval(desc) -> ?DESC(?FUNCTION_NAME);
|
||||
flush_interval(_) -> undefined.
|
||||
|
||||
tags(type) -> map();
|
||||
tags(default) -> #{};
|
||||
tags(desc) -> ?DESC(?FUNCTION_NAME);
|
||||
tags(_) -> undefined.
|
||||
|
||||
validations() ->
|
||||
[
|
||||
{check_interval, fun check_interval/1}
|
||||
].
|
||||
|
||||
check_interval(Conf) ->
|
||||
case hocon_maps:get("statsd.sample_time_interval", Conf) of
|
||||
undefined ->
|
||||
ok;
|
||||
Sample ->
|
||||
Flush = hocon_maps:get("statsd.flush_time_interval", Conf),
|
||||
case Sample =< Flush of
|
||||
true ->
|
||||
true;
|
||||
false ->
|
||||
{bad_interval, #{sample_time_interval => Sample, flush_time_interval => Flush}}
|
||||
end
|
||||
end.
|
|
@ -1,81 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
%%%-------------------------------------------------------------------
|
||||
%% @doc emqx_statsd top level supervisor.
|
||||
%% @end
|
||||
%%%-------------------------------------------------------------------
|
||||
|
||||
-module(emqx_statsd_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([
|
||||
start_link/0,
|
||||
ensure_child_started/1,
|
||||
ensure_child_started/2,
|
||||
ensure_child_stopped/1
|
||||
]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
%% Helper macro for declaring children of supervisor
|
||||
-define(CHILD(Mod, Opts), #{
|
||||
id => Mod,
|
||||
start => {Mod, start_link, Opts},
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => [Mod]
|
||||
}).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
-spec ensure_child_started(atom()) -> ok.
|
||||
ensure_child_started(Mod) when is_atom(Mod) ->
|
||||
ensure_child_started(Mod, emqx_conf:get([statsd], #{})).
|
||||
|
||||
-spec ensure_child_started(atom(), map()) -> ok.
|
||||
ensure_child_started(Mod, Conf) when is_atom(Mod) ->
|
||||
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, [Conf]))).
|
||||
|
||||
%% @doc Stop the child worker process.
|
||||
-spec ensure_child_stopped(any()) -> ok.
|
||||
ensure_child_stopped(ChildId) ->
|
||||
case supervisor:terminate_child(?MODULE, ChildId) of
|
||||
ok ->
|
||||
%% with terminate_child/2 returned 'ok', it's not possible
|
||||
%% for supervisor:delete_child/2 to return {error, Reason}
|
||||
ok = supervisor:delete_child(?MODULE, ChildId);
|
||||
{error, not_found} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
init([]) ->
|
||||
Children =
|
||||
case emqx_conf:get([statsd], #{}) of
|
||||
#{enable := true} = Conf -> [?CHILD(emqx_statsd, [Conf])];
|
||||
_ -> []
|
||||
end,
|
||||
{ok, {{one_for_one, 100, 3600}, Children}}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
assert_started({ok, _Pid}) -> ok;
|
||||
assert_started({error, {already_started, _Pid}}) -> ok;
|
||||
assert_started({error, Reason}) -> erlang:error(Reason).
|
|
@ -1,44 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_statsd_proto_v1).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
|
||||
start/1,
|
||||
stop/1,
|
||||
restart/1
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.0".
|
||||
|
||||
-spec start([node()]) -> emqx_rpc:multicall_result().
|
||||
start(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_statsd, do_start, [], 5000).
|
||||
|
||||
-spec stop([node()]) -> emqx_rpc:multicall_result().
|
||||
stop(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_statsd, do_stop, [], 5000).
|
||||
|
||||
-spec restart([node()]) -> emqx_rpc:multicall_result().
|
||||
restart(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_statsd, do_restart, [], 5000).
|
|
@ -1,204 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_statsd_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
|
||||
|
||||
-define(BASE_CONF, <<
|
||||
"\n"
|
||||
"statsd {\n"
|
||||
"enable = true\n"
|
||||
"flush_time_interval = 4s\n"
|
||||
"sample_time_interval = 4s\n"
|
||||
"server = \"127.0.0.1:8126\"\n"
|
||||
"tags {\"t1\" = \"good\", test = 100}\n"
|
||||
"}\n"
|
||||
>>).
|
||||
-define(BAD_CONF, <<
|
||||
"\n"
|
||||
"statsd {\n"
|
||||
"enable = true\n"
|
||||
"flush_time_interval = 4s\n"
|
||||
"sample_time_interval = 4s\n"
|
||||
"server = \"\"\n"
|
||||
"tags {\"t1\" = \"good\", test = 100}\n"
|
||||
"}\n"
|
||||
>>).
|
||||
|
||||
-define(DEFAULT_CONF, <<
|
||||
"\n"
|
||||
"statsd {\n"
|
||||
"enable = true\n"
|
||||
"flush_time_interval = 4s\n"
|
||||
"sample_time_interval = 4s\n"
|
||||
"tags {\"t1\" = \"good\", test = 100}\n"
|
||||
"}\n"
|
||||
>>).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:start_apps(
|
||||
[emqx_conf, emqx_dashboard, emqx_statsd],
|
||||
fun set_special_configs/1
|
||||
),
|
||||
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_common_test_helpers:stop_apps([emqx_statsd, emqx_dashboard, emqx_conf]).
|
||||
|
||||
set_special_configs(emqx_dashboard) ->
|
||||
emqx_dashboard_api_test_helpers:set_default_config();
|
||||
set_special_configs(_) ->
|
||||
ok.
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_server_validator(_) ->
|
||||
Server0 = emqx_conf:get_raw([statsd, server]),
|
||||
?assertThrow(
|
||||
#{
|
||||
kind := validation_error,
|
||||
path := "statsd.server",
|
||||
reason := "cannot_be_empty",
|
||||
value := ""
|
||||
},
|
||||
emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BAD_CONF)
|
||||
),
|
||||
%% default
|
||||
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?DEFAULT_CONF),
|
||||
DefaultServer = default_server(),
|
||||
?assertEqual(DefaultServer, emqx_conf:get_raw([statsd, server])),
|
||||
DefaultServerStr = binary_to_list(DefaultServer),
|
||||
?assertEqual(DefaultServerStr, emqx_conf:get([statsd, server])),
|
||||
%% recover
|
||||
ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF),
|
||||
Server2 = emqx_conf:get_raw([statsd, server]),
|
||||
?assertMatch(Server0, Server2),
|
||||
ok.
|
||||
|
||||
t_statsd(_) ->
|
||||
{ok, Socket} = gen_udp:open(8126, [{active, true}]),
|
||||
receive
|
||||
{udp, Socket1, Host, Port, Data} ->
|
||||
ct:pal("receive:~p~n", [{Socket, Socket1, Host, Port}]),
|
||||
?assert(length(Data) > 50),
|
||||
?assert(nomatch =/= string:find(Data, "\nemqx.cpu_use:"))
|
||||
after 10 * 1000 ->
|
||||
error(timeout)
|
||||
end,
|
||||
gen_udp:close(Socket).
|
||||
|
||||
t_management(_) ->
|
||||
?assertMatch(ok, emqx_statsd:start()),
|
||||
?assertMatch(ok, emqx_statsd:start()),
|
||||
?assertMatch(ok, emqx_statsd:stop()),
|
||||
?assertMatch(ok, emqx_statsd:stop()),
|
||||
?assertMatch(ok, emqx_statsd:restart()).
|
||||
|
||||
t_rest_http(_) ->
|
||||
{ok, Res0} = request(get),
|
||||
?assertEqual(
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"flush_time_interval">> => <<"4s">>,
|
||||
<<"sample_time_interval">> => <<"4s">>,
|
||||
<<"server">> => <<"127.0.0.1:8126">>,
|
||||
<<"tags">> => #{<<"t1">> => <<"good">>, <<"test">> => 100}
|
||||
},
|
||||
Res0
|
||||
),
|
||||
{ok, Res1} = request(put, #{enable => false}),
|
||||
?assertMatch(#{<<"enable">> := false}, Res1),
|
||||
?assertEqual(maps:remove(<<"enable">>, Res0), maps:remove(<<"enable">>, Res1)),
|
||||
{ok, Res2} = request(get),
|
||||
?assertEqual(Res1, Res2),
|
||||
?assertEqual(
|
||||
error, request(put, #{sample_time_interval => "11s", flush_time_interval => "10s"})
|
||||
),
|
||||
{ok, _} = request(put, #{enable => true}),
|
||||
ok.
|
||||
|
||||
t_kill_exit(_) ->
|
||||
{ok, _} = request(put, #{enable => true}),
|
||||
Pid = erlang:whereis(emqx_statsd),
|
||||
?assertEqual(ignore, gen_server:call(Pid, whatever)),
|
||||
?assertEqual(ok, gen_server:cast(Pid, whatever)),
|
||||
?assertEqual(Pid, erlang:whereis(emqx_statsd)),
|
||||
#{estatsd_pid := Estatsd} = sys:get_state(emqx_statsd),
|
||||
?assert(erlang:exit(Estatsd, kill)),
|
||||
?assertEqual(false, is_process_alive(Estatsd)),
|
||||
ct:sleep(150),
|
||||
Pid1 = erlang:whereis(emqx_statsd),
|
||||
?assertNotEqual(Pid, Pid1),
|
||||
#{estatsd_pid := Estatsd1} = sys:get_state(emqx_statsd),
|
||||
?assertNotEqual(Estatsd, Estatsd1),
|
||||
ok.
|
||||
|
||||
t_config_update(_) ->
|
||||
OldRawConf = emqx_conf:get_raw([statsd]),
|
||||
{ok, _} = emqx_statsd_config:update(OldRawConf#{<<"enable">> => true}),
|
||||
CommonKeys = [flush_time_interval, sample_time_interval],
|
||||
OldConf = emqx_conf:get([statsd]),
|
||||
OldStatsDState = sys:get_state(emqx_statsd),
|
||||
OldPid = erlang:whereis(emqx_statsd),
|
||||
?assertEqual(maps:with(CommonKeys, OldConf), maps:with(CommonKeys, OldStatsDState)),
|
||||
NewRawConfExpect = OldRawConf#{
|
||||
<<"flush_time_interval">> := <<"42s">>,
|
||||
<<"sample_time_interval">> := <<"42s">>
|
||||
},
|
||||
try
|
||||
{ok, _} = emqx_statsd_config:update(NewRawConfExpect),
|
||||
NewRawConf = emqx_conf:get_raw([statsd]),
|
||||
NewConf = emqx_conf:get([statsd]),
|
||||
NewStatsDState = sys:get_state(emqx_statsd),
|
||||
NewPid = erlang:whereis(emqx_statsd),
|
||||
?assertNotEqual(OldRawConf, NewRawConf),
|
||||
?assertEqual(NewRawConfExpect, NewRawConf),
|
||||
?assertEqual(maps:with(CommonKeys, NewConf), maps:with(CommonKeys, NewStatsDState)),
|
||||
?assertNotEqual(OldPid, NewPid)
|
||||
after
|
||||
{ok, _} = emqx_statsd_config:update(OldRawConf)
|
||||
end,
|
||||
%% bad server url
|
||||
BadRawConf = OldRawConf#{<<"server">> := <<"">>},
|
||||
{error, #{
|
||||
kind := validation_error,
|
||||
path := "statsd.server",
|
||||
reason := "cannot_be_empty",
|
||||
value := ""
|
||||
}} = emqx_statsd_config:update(BadRawConf),
|
||||
ok.
|
||||
|
||||
request(Method) -> request(Method, []).
|
||||
|
||||
request(Method, Body) ->
|
||||
case request(Method, uri(["statsd"]), Body) of
|
||||
{ok, 200, Res} ->
|
||||
{ok, emqx_utils_json:decode(Res, [return_maps])};
|
||||
{ok, _Status, _} ->
|
||||
error
|
||||
end.
|
||||
|
||||
default_server() ->
|
||||
{server, Schema} = lists:keyfind(server, 1, emqx_statsd_schema:fields("statsd")),
|
||||
hocon_schema:field_schema(Schema, default).
|
|
@ -0,0 +1,2 @@
|
|||
Deleted emqx_statsd application.
|
||||
|
2
mix.exs
2
mix.exs
|
@ -303,7 +303,6 @@ defmodule EMQXUmbrella.MixProject do
|
|||
:emqx_bridge,
|
||||
:emqx_modules,
|
||||
:emqx_management,
|
||||
:emqx_statsd,
|
||||
:emqx_retainer,
|
||||
:emqx_prometheus,
|
||||
:emqx_auto_subscribe,
|
||||
|
@ -369,7 +368,6 @@ defmodule EMQXUmbrella.MixProject do
|
|||
emqx_management: :permanent,
|
||||
emqx_dashboard: :permanent,
|
||||
emqx_retainer: :permanent,
|
||||
emqx_statsd: :permanent,
|
||||
emqx_prometheus: :permanent,
|
||||
emqx_psk: :permanent,
|
||||
emqx_slow_subs: :permanent,
|
||||
|
|
|
@ -431,7 +431,6 @@ relx_apps(ReleaseType, Edition) ->
|
|||
emqx_management,
|
||||
emqx_dashboard,
|
||||
emqx_retainer,
|
||||
emqx_statsd,
|
||||
emqx_prometheus,
|
||||
emqx_psk,
|
||||
emqx_slow_subs,
|
||||
|
|
|
@ -1,9 +0,0 @@
|
|||
emqx_statsd_api {
|
||||
|
||||
get_statsd_config_api.desc:
|
||||
"""List the configuration of StatsD metrics collection and push service."""
|
||||
|
||||
update_statsd_config_api.desc:
|
||||
"""Update the configuration of StatsD metrics collection and push service."""
|
||||
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
emqx_statsd_schema {
|
||||
|
||||
enable.desc:
|
||||
"""Enable or disable StatsD metrics collection and push service."""
|
||||
|
||||
flush_interval.desc:
|
||||
"""The push interval for metrics."""
|
||||
|
||||
get_statsd_config_api.desc:
|
||||
"""List the configuration of StatsD metrics collection and push service."""
|
||||
|
||||
sample_interval.desc:
|
||||
"""The sampling interval for metrics."""
|
||||
|
||||
server.desc:
|
||||
"""StatsD server address."""
|
||||
|
||||
statsd.desc:
|
||||
"""StatsD metrics collection and push configuration."""
|
||||
|
||||
statsd.label:
|
||||
"""StatsD"""
|
||||
|
||||
tags.desc:
|
||||
"""The tags for metrics."""
|
||||
|
||||
update_statsd_config_api.desc:
|
||||
"""Update the configuration of StatsD metrics collection and push service."""
|
||||
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
emqx_statsd_api {
|
||||
|
||||
get_statsd_config_api.desc:
|
||||
"""列出 StatsD 指标采集和推送服务的的配置。"""
|
||||
|
||||
update_statsd_config_api.desc:
|
||||
"""更新 StatsD 指标采集和推送服务的配置。"""
|
||||
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
emqx_statsd_schema {
|
||||
|
||||
enable.desc:
|
||||
"""启用或禁用 StatsD 指标采集和推送服务。"""
|
||||
|
||||
flush_interval.desc:
|
||||
"""指标的推送间隔。"""
|
||||
|
||||
get_statsd_config_api.desc:
|
||||
"""列出 StatsD 指标采集和推送服务的的配置。"""
|
||||
|
||||
sample_interval.desc:
|
||||
"""指标的采样间隔。"""
|
||||
|
||||
server.desc:
|
||||
"""StatsD 服务器地址。"""
|
||||
|
||||
statsd.desc:
|
||||
"""StatsD 指标采集与推送配置。"""
|
||||
|
||||
statsd.label:
|
||||
"""StatsD"""
|
||||
|
||||
tags.desc:
|
||||
"""指标的标签。"""
|
||||
|
||||
update_statsd_config_api.desc:
|
||||
"""更新 StatsD 指标采集和推送服务的配置。"""
|
||||
|
||||
}
|
|
@ -30,6 +30,10 @@ for app in ${APPS}; do
|
|||
else
|
||||
old_app_version='not_found'
|
||||
fi
|
||||
if [ ! -f "$src_file" ]; then
|
||||
# app is deleted
|
||||
continue
|
||||
fi
|
||||
now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
|
||||
|
||||
if [ "$old_app_version" = 'not_found' ]; then
|
||||
|
|
Loading…
Reference in New Issue