refactor: move telemetry to its own app

This commit is contained in:
Zaiming (Stone) Shi 2023-05-03 09:03:21 +02:00
parent c94b639886
commit 4a2e583e3f
21 changed files with 379 additions and 173 deletions

View File

@ -235,14 +235,18 @@ render_and_load_app_config(App, Opts) ->
%% turn throw into error %% turn throw into error
error({Conf, E, St}) error({Conf, E, St})
end. end.
do_render_app_config(App, Schema, ConfigFile, Opts) -> do_render_app_config(App, Schema, ConfigFile, Opts) ->
try
Vars = mustache_vars(App, Opts), Vars = mustache_vars(App, Opts),
RenderedConfigFile = render_config_file(ConfigFile, Vars), RenderedConfigFile = render_config_file(ConfigFile, Vars),
read_schema_configs(Schema, RenderedConfigFile), read_schema_configs(Schema, RenderedConfigFile),
force_set_config_file_paths(App, [RenderedConfigFile]), force_set_config_file_paths(App, [RenderedConfigFile]),
copy_certs(App, RenderedConfigFile), copy_certs(App, RenderedConfigFile),
ok. ok
catch
throw:skip ->
ok
end.
start_app(App, SpecAppConfig, Opts) -> start_app(App, SpecAppConfig, Opts) ->
render_and_load_app_config(App, Opts), render_and_load_app_config(App, Opts),
@ -290,6 +294,7 @@ render_config_file(ConfigFile, Vars0) ->
Temp = Temp =
case file:read_file(ConfigFile) of case file:read_file(ConfigFile) of
{ok, T} -> T; {ok, T} -> T;
{error, enoent} -> throw(skip);
{error, Reason} -> error({failed_to_read_config_template, ConfigFile, Reason}) {error, Reason} -> error({failed_to_read_config_template, ConfigFile, Reason})
end, end,
Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- maps:to_list(Vars0)], Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- maps:to_list(Vars0)],

View File

@ -53,6 +53,7 @@
emqx_authn_schema, emqx_authn_schema,
emqx_authz_schema, emqx_authz_schema,
emqx_auto_subscribe_schema, emqx_auto_subscribe_schema,
{emqx_telemetry_schema, ce},
emqx_modules_schema, emqx_modules_schema,
emqx_plugins_schema, emqx_plugins_schema,
emqx_dashboard_schema, emqx_dashboard_schema,
@ -109,11 +110,25 @@ roots() ->
] ++ ] ++
emqx_schema:roots(medium) ++ emqx_schema:roots(medium) ++
emqx_schema:roots(low) ++ emqx_schema:roots(low) ++
lists:flatmap(fun roots/1, ?MERGED_CONFIGS). lists:flatmap(fun roots/1, common_apps()).
validations() -> validations() ->
hocon_schema:validations(emqx_schema) ++ hocon_schema:validations(emqx_schema) ++
lists:flatmap(fun hocon_schema:validations/1, ?MERGED_CONFIGS). lists:flatmap(fun hocon_schema:validations/1, common_apps()).
common_apps() ->
Edition = emqx_release:edition(),
lists:filtermap(
fun
({N, E}) ->
case E =:= Edition of
true -> {true, N};
false -> false
end;
(N) when is_atom(N) -> {true, N}
end,
?MERGED_CONFIGS
).
fields("cluster") -> fields("cluster") ->
[ [

View File

@ -149,8 +149,8 @@ basic_reboot_apps() ->
emqx_plugins emqx_plugins
], ],
case emqx_release:edition() of case emqx_release:edition() of
ce -> CE; ce -> CE ++ [emqx_telemetry];
ee -> CE ++ [] ee -> CE
end. end.
sorted_reboot_apps() -> sorted_reboot_apps() ->

View File

@ -14,11 +14,5 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% The destination URL for the telemetry data report
-define(TELEMETRY_URL, "https://telemetry.emqx.io/api/telemetry").
%% Interval for reporting telemetry data, Default: 7d
-define(REPORT_INTERVAL, 604800).
-define(API_TAG_MQTT, [<<"MQTT">>]). -define(API_TAG_MQTT, [<<"MQTT">>]).
-define(API_SCHEMA_MODULE, emqx_modules_schema). -define(API_SCHEMA_MODULE, emqx_modules_schema).

View File

@ -34,7 +34,6 @@ stop(_State) ->
maybe_enable_modules() -> maybe_enable_modules() ->
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:load(), emqx_conf:get([delayed, enable], true) andalso emqx_delayed:load(),
emqx_modules_conf:is_telemetry_enabled() andalso emqx_telemetry:enable(),
emqx_observer_cli:enable(), emqx_observer_cli:enable(),
emqx_conf_cli:load(), emqx_conf_cli:load(),
ok = emqx_rewrite:enable(), ok = emqx_rewrite:enable(),
@ -43,7 +42,6 @@ maybe_enable_modules() ->
maybe_disable_modules() -> maybe_disable_modules() ->
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:unload(), emqx_conf:get([delayed, enable], true) andalso emqx_delayed:unload(),
emqx_modules_conf:is_telemetry_enabled() andalso emqx_telemetry:disable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
emqx_rewrite:disable(), emqx_rewrite:disable(),
emqx_conf_cli:unload(), emqx_conf_cli:unload(),

View File

@ -28,9 +28,7 @@
-export([ -export([
topic_metrics/0, topic_metrics/0,
add_topic_metrics/1, add_topic_metrics/1,
remove_topic_metrics/1, remove_topic_metrics/1
is_telemetry_enabled/0,
set_telemetry_status/1
]). ]).
%% config handlers %% config handlers
@ -45,12 +43,10 @@
-spec load() -> ok. -spec load() -> ok.
load() -> load() ->
emqx_conf:add_handler([topic_metrics], ?MODULE), emqx_conf:add_handler([topic_metrics], ?MODULE).
emqx_conf:add_handler([telemetry], ?MODULE).
-spec unload() -> ok. -spec unload() -> ok.
unload() -> unload() ->
emqx_conf:remove_handler([telemetry]),
emqx_conf:remove_handler([topic_metrics]). emqx_conf:remove_handler([topic_metrics]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -82,18 +78,6 @@ remove_topic_metrics(Topic) ->
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
-spec is_telemetry_enabled() -> boolean().
is_telemetry_enabled() ->
IsOfficial = emqx_telemetry:official_version(emqx_release:version()),
emqx_conf:get([telemetry, enable], IsOfficial).
-spec set_telemetry_status(boolean()) -> ok | {error, term()}.
set_telemetry_status(Status) ->
case cfg_update([telemetry], set_telemetry_status, Status) of
{ok, _} -> ok;
{error, _} = Error -> Error
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Config Handler %% Config Handler
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -119,9 +103,7 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
{ok, RawConf -- [Topic]}; {ok, RawConf -- [Topic]};
_ -> _ ->
{error, not_found} {error, not_found}
end; end.
pre_config_update(_, {set_telemetry_status, Status}, RawConf) ->
{ok, RawConf#{<<"enable">> => Status}}.
-spec post_config_update( -spec post_config_update(
list(atom()), list(atom()),
@ -153,17 +135,6 @@ post_config_update(
case emqx_topic_metrics:deregister(Topic) of case emqx_topic_metrics:deregister(Topic) of
ok -> ok; ok -> ok;
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end;
post_config_update(
_,
{set_telemetry_status, Status},
_NewConfig,
_OldConfig,
_AppEnvs
) ->
case Status of
true -> emqx_telemetry:enable();
false -> emqx_telemetry:disable()
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -33,7 +33,6 @@ namespace() -> modules.
roots() -> roots() ->
[ [
"delayed", "delayed",
"telemetry",
array("rewrite", #{ array("rewrite", #{
desc => "List of topic rewrite rules.", desc => "List of topic rewrite rules.",
importance => ?IMPORTANCE_HIDDEN, importance => ?IMPORTANCE_HIDDEN,
@ -46,8 +45,6 @@ roots() ->
}) })
]. ].
fields("telemetry") ->
[{enable, ?HOCON(boolean(), #{default => true, desc => "Enable telemetry."})}];
fields("delayed") -> fields("delayed") ->
[ [
{enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})},
@ -76,8 +73,6 @@ fields("rewrite") ->
fields("topic_metrics") -> fields("topic_metrics") ->
[{topic, ?HOCON(binary(), #{desc => "Collect metrics for the topic."})}]. [{topic, ?HOCON(binary(), #{desc => "Collect metrics for the topic."})}].
desc("telemetry") ->
"Settings for the telemetry module.";
desc("delayed") -> desc("delayed") ->
"Settings for the delayed module."; "Settings for the delayed module.";
desc("rewrite") -> desc("rewrite") ->

View File

@ -41,7 +41,6 @@ start_link() ->
init([]) -> init([]) ->
{ok, {ok,
{{one_for_one, 10, 3600}, [ {{one_for_one, 10, 3600}, [
?CHILD(emqx_telemetry),
?CHILD(emqx_topic_metrics), ?CHILD(emqx_topic_metrics),
?CHILD(emqx_trace), ?CHILD(emqx_trace),
?CHILD(emqx_delayed) ?CHILD(emqx_delayed)

View File

@ -0,0 +1,8 @@
%% -*- mode: erlang -*-
{deps, [
{emqx, {path, "../emqx"}},
{emqx_utils, {path, "../emqx_utils"}},
{emqx_conf, {path, "../emqx_conf"}}
]}.
{project_plugins, [erlfmt]}.

View File

@ -0,0 +1,15 @@
{application, emqx_telemetry, [
{description, "Report telemetry data for EMQX Opensource edition"},
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_telemetry_app, []}},
{applications, [
kernel,
stdlib
]},
{env, []},
{modules, []},
{licenses, ["Apache-2.0"]},
{links, []}
]}.

View File

@ -18,13 +18,6 @@
-behaviour(gen_server). -behaviour(gen_server).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_modules.hrl").
-export([ -export([
start_link/0, start_link/0,
stop/0 stop/0
@ -41,9 +34,10 @@
code_change/3 code_change/3
]). ]).
%% config change hook points
-export([ -export([
enable/0, start_reporting/0,
disable/0 stop_reporting/0
]). ]).
-export([ -export([
@ -52,8 +46,6 @@
get_telemetry/0 get_telemetry/0
]). ]).
-export([official_version/1]).
%% Internal exports (RPC) %% Internal exports (RPC)
-export([ -export([
do_ensure_uuids/0 do_ensure_uuids/0
@ -72,6 +64,15 @@
get_value/3 get_value/3
]). ]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% The destination URL for the telemetry data report
-define(TELEMETRY_URL, "https://telemetry.emqx.io/api/telemetry").
-define(REPORT_INTERVAL, 604800).
-record(telemetry, { -record(telemetry, {
id :: atom(), id :: atom(),
uuid :: binary() uuid :: binary()
@ -112,11 +113,17 @@ start_link() ->
stop() -> stop() ->
gen_server:stop(?MODULE). gen_server:stop(?MODULE).
enable() -> %% @doc Re-start the reporting timer after disabled.
gen_server:call(?MODULE, enable, 15_000). %% This is an async notification which never fails.
%% This is a no-op in enterprise edition.
start_reporting() ->
gen_server:cast(?MODULE, start_reporting).
disable() -> %% @doc Stop the reporting timer.
gen_server:call(?MODULE, disable). %% This is an async notification which never fails.
%% This is a no-op in enterprise eidtion.
stop_reporting() ->
gen_server:cast(?MODULE, stop_reporting).
get_node_uuid() -> get_node_uuid() ->
gen_server:call(?MODULE, get_node_uuid). gen_server:call(?MODULE, get_node_uuid).
@ -132,7 +139,9 @@ get_telemetry() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init(_Opts) -> init(_Opts) ->
{ok, undefined, {continue, init}}. process_flag(trap_exit, true),
emqx_telemetry_config:on_server_start(),
{ok, "ignored", {continue, init}}.
handle_continue(init, _) -> handle_continue(init, _) ->
ok = mria:create_table( ok = mria:create_table(
@ -148,21 +157,12 @@ handle_continue(init, _) ->
ok = mria:wait_for_tables([?TELEMETRY]), ok = mria:wait_for_tables([?TELEMETRY]),
State0 = empty_state(), State0 = empty_state(),
{NodeUUID, ClusterUUID} = ensure_uuids(), {NodeUUID, ClusterUUID} = ensure_uuids(),
{noreply, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}; case is_enabled() of
handle_continue(Continue, State) -> true -> ok = start_reporting();
?SLOG(error, #{msg => "unexpected_continue", continue => Continue}), false -> ok
{noreply, State}. end,
{noreply, State0#state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}}.
handle_call(enable, _From, State) ->
%% Wait a few moments before reporting the first telemetry, as the
%% apps might still be starting up. Also, this avoids hanging
%% `emqx_modules_app' initialization in case the POST request
%% takes a lot of time.
FirstReportTimeoutMS = timer:seconds(10),
{reply, ok, ensure_report_timer(FirstReportTimeoutMS, State)};
handle_call(disable, _From, State = #state{timer = Timer}) ->
emqx_utils:cancel_timer(Timer),
{reply, ok, State#state{timer = undefined}};
handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) -> handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) ->
{reply, {ok, UUID}, State}; {reply, {ok, UUID}, State};
handle_call(get_cluster_uuid, _From, State = #state{cluster_uuid = UUID}) -> handle_call(get_cluster_uuid, _From, State = #state{cluster_uuid = UUID}) ->
@ -174,6 +174,14 @@ handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}), ?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast(start_reporting, State) ->
%% Wait a few moments before reporting the first telemetry, as the
%% apps might still be starting up.
FirstReportTimeoutMS = timer:seconds(10),
{noreply, ensure_report_timer(FirstReportTimeoutMS, State)};
handle_cast(stop_reporting, State = #state{timer = Timer}) ->
emqx_utils:cancel_timer(Timer),
{noreply, State#state{timer = undefined}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}. {noreply, State}.
@ -188,7 +196,7 @@ handle_info(Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. emqx_conf:remove_handler([telemetry]).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -197,9 +205,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
official_version(Version) -> is_enabled() ->
Pt = "^\\d+\\.\\d+(?:\\.\\d+)?(?:(-(?:alpha|beta|rc)\\.[1-9][0-9]*))?$", emqx_telemetry_config:is_enabled().
match =:= re:run(Version, Pt, [{capture, none}]).
ensure_report_timer(State = #state{report_interval = ReportInterval}) -> ensure_report_timer(State = #state{report_interval = ReportInterval}) ->
ensure_report_timer(ReportInterval, State). ensure_report_timer(ReportInterval, State).

View File

@ -220,7 +220,7 @@ status(get, _Params) ->
{200, get_telemetry_status()}; {200, get_telemetry_status()};
status(put, #{body := Body}) -> status(put, #{body := Body}) ->
Enable = maps:get(<<"enable">>, Body), Enable = maps:get(<<"enable">>, Body),
case Enable =:= emqx_modules_conf:is_telemetry_enabled() of case Enable =:= is_enabled() of
true -> true ->
Reason = Reason =
case Enable of case Enable of
@ -241,7 +241,7 @@ status(put, #{body := Body}) ->
end. end.
data(get, _Request) -> data(get, _Request) ->
case emqx_modules_conf:is_telemetry_enabled() of case is_enabled() of
true -> true ->
{200, emqx_utils_json:encode(get_telemetry_data())}; {200, emqx_utils_json:encode(get_telemetry_data())};
false -> false ->
@ -256,11 +256,14 @@ data(get, _Request) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
enable_telemetry(Enable) -> enable_telemetry(Enable) ->
emqx_modules_conf:set_telemetry_status(Enable). emqx_telemetry_config:set_telemetry_status(Enable).
get_telemetry_status() -> get_telemetry_status() ->
#{enable => emqx_modules_conf:is_telemetry_enabled()}. #{enable => is_enabled()}.
get_telemetry_data() -> get_telemetry_data() ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(), {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
TelemetryData. TelemetryData.
is_enabled() ->
emqx_telemetry_config:is_enabled().

View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_telemetry_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
emqx_telemetry_sup:start_link().
stop(_State) ->
ok.
%% internal functions

View File

@ -0,0 +1,79 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_telemetry_config).
%% Public API
-export([
set_telemetry_status/1,
is_enabled/0
]).
%% emqx_config_handler callback
-export([
pre_config_update/3,
post_config_update/5
]).
%% internal use
-export([
on_server_start/0,
on_server_stop/0,
is_official_version/1
]).
is_enabled() ->
IsOfficial = ?MODULE:is_official_version(emqx_release:version()),
emqx_conf:get([telemetry, enable], IsOfficial).
on_server_start() ->
emqx_conf:add_handler([telemetry], ?MODULE).
on_server_stop() ->
emqx_conf:remove_handler([telemetry]).
-spec set_telemetry_status(boolean()) -> ok | {error, term()}.
set_telemetry_status(Status) ->
case cfg_update([telemetry], set_telemetry_status, Status) of
{ok, _} -> ok;
{error, _} = Error -> Error
end.
pre_config_update(_, {set_telemetry_status, Status}, RawConf) ->
{ok, RawConf#{<<"enable">> => Status}}.
post_config_update(
_,
{set_telemetry_status, Status},
_NewConfig,
_OldConfig,
_AppEnvs
) ->
case Status of
true -> emqx_telemetry:start_reporting();
false -> emqx_telemetry:stop_reporting()
end.
cfg_update(Path, Action, Params) ->
emqx_conf:update(
Path,
{Action, Params},
#{override_to => cluster}
).
is_official_version(Version) ->
Pt = "^\\d+\\.\\d+(?:\\.\\d+)?(?:(-(?:alpha|beta|rc)\\.[1-9][0-9]*))?$",
match =:= re:run(Version, Pt, [{capture, none}]).

View File

@ -0,0 +1,36 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_telemetry_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
-export([
roots/0,
fields/1,
desc/1
]).
roots() -> ["telemetry"].
fields("telemetry") ->
[{enable, ?HOCON(boolean(), #{default => true, desc => "Enable telemetry."})}].
desc(_) ->
undefined.

View File

@ -0,0 +1,45 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_telemetry_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
-define(CHILD(Mod), #{
id => Mod,
start => {Mod, start_link, []},
restart => transient,
shutdown => 5000,
type => worker,
modules => [Mod]
}).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{
strategy => one_for_one,
intensity => 0,
period => 1
},
ChildSpecs = [?CHILD(emqx_telemetry)],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -25,13 +25,21 @@
-import(proplists, [get_value/2]). -import(proplists, [get_value/2]).
-define(BASE_CONF, #{ -define(MODULES_CONF, #{
<<"dealyed">> => <<"true">>, <<"dealyed">> => <<"true">>,
<<"max_delayed_messages">> => <<"0">> <<"max_delayed_messages">> => <<"0">>
}). }).
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
suite() ->
[
{timetrap, {minutes, 1}},
{repeat, 1}
].
apps() -> [emqx_conf, emqx_retainer, emqx_authn, emqx_authz, emqx_modules, emqx_telemetry].
init_per_suite(Config) -> init_per_suite(Config) ->
net_kernel:start(['master@127.0.0.1', longnames]), net_kernel:start(['master@127.0.0.1', longnames]),
ok = meck:new(emqx_authz, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_authz, [non_strict, passthrough, no_history, no_link]),
@ -42,12 +50,9 @@ init_per_suite(Config) ->
emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf") emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf")
end end
), ),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
emqx_gateway_test_utils:load_all_gateway_apps(), emqx_gateway_test_utils:load_all_gateway_apps(),
emqx_common_test_helpers:start_apps( start_apps(),
[emqx_conf, emqx_authn, emqx_authz, emqx_modules],
fun set_special_configs/1
),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
@ -61,7 +66,7 @@ end_per_suite(_Config) ->
), ),
mnesia:clear_table(cluster_rpc_commit), mnesia:clear_table(cluster_rpc_commit),
mnesia:clear_table(cluster_rpc_mfa), mnesia:clear_table(cluster_rpc_mfa),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]), stop_apps(),
meck:unload(emqx_authz), meck:unload(emqx_authz),
ok. ok.
@ -100,13 +105,9 @@ init_per_testcase(t_get_telemetry, Config) ->
Config; Config;
init_per_testcase(t_advanced_mqtt_features, Config) -> init_per_testcase(t_advanced_mqtt_features, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
{ok, Retainer} = emqx_retainer:start_link(),
{atomic, ok} = mria:clear_table(emqx_delayed), {atomic, ok} = mria:clear_table(emqx_delayed),
mock_advanced_mqtt_features(), mock_advanced_mqtt_features(),
[ Config;
{retainer, Retainer}
| Config
];
init_per_testcase(t_authn_authz_info, Config) -> init_per_testcase(t_authn_authz_info, Config) ->
mock_httpc(), mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
@ -116,13 +117,13 @@ init_per_testcase(t_authn_authz_info, Config) ->
create_authz(postgresql), create_authz(postgresql),
Config; Config;
init_per_testcase(t_enable, Config) -> init_per_testcase(t_enable, Config) ->
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_telemetry_config, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end), ok = meck:expect(emqx_telemetry_config, is_official_version, fun(_) -> true end),
mock_httpc(), mock_httpc(),
Config; Config;
init_per_testcase(t_send_after_enable, Config) -> init_per_testcase(t_send_after_enable, Config) ->
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_telemetry_config, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end), ok = meck:expect(emqx_telemetry_config, is_official_version, fun(_) -> true end),
mock_httpc(), mock_httpc(),
Config; Config;
init_per_testcase(t_rule_engine_and_data_bridge_info, Config) -> init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
@ -172,12 +173,9 @@ init_per_testcase(t_uuid_restored_from_file, Config) ->
%% clear the UUIDs in the DB %% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry), {atomic, ok} = mria:clear_table(emqx_telemetry),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]), stop_apps(),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
emqx_common_test_helpers:start_apps( start_apps(),
[emqx_conf, emqx_authn, emqx_authz, emqx_modules],
fun set_special_configs/1
),
Node = start_slave(n1), Node = start_slave(n1),
[ [
{n1, Node}, {n1, Node},
@ -205,11 +203,9 @@ end_per_testcase(t_get_telemetry, _Config) ->
meck:unload([httpc, emqx_telemetry]), meck:unload([httpc, emqx_telemetry]),
application:stop(emqx_gateway), application:stop(emqx_gateway),
ok; ok;
end_per_testcase(t_advanced_mqtt_features, Config) -> end_per_testcase(t_advanced_mqtt_features, _Config) ->
Retainer = ?config(retainer, Config),
process_flag(trap_exit, true), process_flag(trap_exit, true),
ok = emqx_retainer:clean(), ok = emqx_retainer:clean(),
exit(Retainer, kill),
{ok, _} = emqx_auto_subscribe:update([]), {ok, _} = emqx_auto_subscribe:update([]),
ok = emqx_rewrite:update([]), ok = emqx_rewrite:update([]),
{atomic, ok} = mria:clear_table(emqx_delayed), {atomic, ok} = mria:clear_table(emqx_delayed),
@ -228,9 +224,9 @@ end_per_testcase(t_authn_authz_info, _Config) ->
), ),
ok; ok;
end_per_testcase(t_enable, _Config) -> end_per_testcase(t_enable, _Config) ->
meck:unload([httpc, emqx_telemetry]); meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_send_after_enable, _Config) -> end_per_testcase(t_send_after_enable, _Config) ->
meck:unload([httpc, emqx_telemetry]); meck:unload([httpc, emqx_telemetry_config]);
end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) -> end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
meck:unload(httpc), meck:unload(httpc),
lists:foreach( lists:foreach(
@ -278,10 +274,10 @@ t_node_uuid(_) ->
Parts = binary:split(UUID, <<"-">>, [global, trim]), Parts = binary:split(UUID, <<"-">>, [global, trim]),
?assertEqual(5, length(Parts)), ?assertEqual(5, length(Parts)),
{ok, NodeUUID2} = emqx_telemetry:get_node_uuid(), {ok, NodeUUID2} = emqx_telemetry:get_node_uuid(),
emqx_telemetry:disable(), emqx_telemetry:stop_reporting(),
emqx_telemetry:enable(), emqx_telemetry:start_reporting(),
emqx_modules_conf:set_telemetry_status(false), emqx_telemetry_config:set_telemetry_status(false),
emqx_modules_conf:set_telemetry_status(true), emqx_telemetry_config:set_telemetry_status(true),
{ok, NodeUUID3} = emqx_telemetry:get_node_uuid(), {ok, NodeUUID3} = emqx_telemetry:get_node_uuid(),
{ok, NodeUUID4} = emqx_telemetry_proto_v1:get_node_uuid(node()), {ok, NodeUUID4} = emqx_telemetry_proto_v1:get_node_uuid(node()),
?assertEqual(NodeUUID2, NodeUUID3), ?assertEqual(NodeUUID2, NodeUUID3),
@ -325,12 +321,9 @@ t_uuid_saved_to_file(_Config) ->
%% clear the UUIDs in the DB %% clear the UUIDs in the DB
{atomic, ok} = mria:clear_table(emqx_telemetry), {atomic, ok} = mria:clear_table(emqx_telemetry),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]), stop_apps(),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
emqx_common_test_helpers:start_apps( start_apps(),
[emqx_conf, emqx_authn, emqx_authz, emqx_modules],
fun set_special_configs/1
),
{ok, NodeUUID} = emqx_telemetry:get_node_uuid(), {ok, NodeUUID} = emqx_telemetry:get_node_uuid(),
{ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(), {ok, ClusterUUID} = emqx_telemetry:get_cluster_uuid(),
?assertEqual( ?assertEqual(
@ -343,30 +336,35 @@ t_uuid_saved_to_file(_Config) ->
), ),
ok. ok.
t_official_version(_) -> t_is_official_version(_) ->
true = emqx_telemetry:official_version("0.0.0"), Is = fun(V) -> is_official_version(V) end,
true = emqx_telemetry:official_version("1.1.1"), true = lists:all(Is, [
true = emqx_telemetry:official_version("10.10.10"), "0.0.0",
false = emqx_telemetry:official_version("0.0.0.0"), "1.1.1",
false = emqx_telemetry:official_version("1.1.a"), "10.10.10",
true = emqx_telemetry:official_version("0.0-alpha.1"), "0.0-alpha.1",
true = emqx_telemetry:official_version("1.1-alpha.1"), "1.1-alpha.1",
true = emqx_telemetry:official_version("10.10-alpha.10"), "10.10-alpha.10",
false = emqx_telemetry:official_version("1.1-alpha.0"), "1.1-rc.1",
true = emqx_telemetry:official_version("1.1-beta.1"), "1.1-beta.1",
true = emqx_telemetry:official_version("1.1-rc.1"), "5.0.0",
false = emqx_telemetry:official_version("1.1-alpha.a"), "5.0.0-alpha.1",
true = emqx_telemetry:official_version("5.0.0"), "5.0.0-beta.4",
true = emqx_telemetry:official_version("5.0.0-alpha.1"), "5.0-rc.1",
true = emqx_telemetry:official_version("5.0.0-beta.4"), "5.0.0-rc.1"
true = emqx_telemetry:official_version("5.0-rc.1"), ]),
true = emqx_telemetry:official_version("5.0.0-rc.1"),
false = emqx_telemetry:official_version("5.0.0-alpha.a"), false = lists:any(Is, [
false = emqx_telemetry:official_version("5.0.0-beta.a"), "1.1-alpha.a",
false = emqx_telemetry:official_version("5.0.0-rc.a"), "1.1-alpha.0",
false = emqx_telemetry:official_version("5.0.0-foo"), "0.0.0.0",
false = emqx_telemetry:official_version("5.0.0-rc.1-ccdf7920"), "1.1.a",
ok. "5.0.0-alpha.a",
"5.0.0-beta.a",
"5.0.0-rc.a",
"5.0.0-foo",
"5.0.0-rc.1-ccdf7920"
]).
t_get_telemetry(_Config) -> t_get_telemetry(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(), {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
@ -432,23 +430,25 @@ t_num_clients(_Config) ->
{port, 1883}, {port, 1883},
{clean_start, false} {clean_start, false}
]), ]),
?wait_async_action( {{ok, _}, _} = ?wait_async_action(
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
#{ #{
?snk_kind := emqx_stats_setstat, ?snk_kind := emqx_stats_setstat,
count_stat := 'live_connections.count', count_stat := 'live_connections.count',
value := 1 value := 1
} },
2000
), ),
{ok, TelemetryData0} = emqx_telemetry:get_telemetry(), {ok, TelemetryData0} = emqx_telemetry:get_telemetry(),
?assertEqual(1, get_value(num_clients, TelemetryData0)), ?assertEqual(1, get_value(num_clients, TelemetryData0)),
?wait_async_action( {ok, _} = ?wait_async_action(
ok = emqtt:disconnect(Client), ok = emqtt:disconnect(Client),
#{ #{
?snk_kind := emqx_stats_setstat, ?snk_kind := emqx_stats_setstat,
count_stat := 'live_connections.count', count_stat := 'live_connections.count',
value := 0 value := 0
} },
2000
), ),
{ok, TelemetryData1} = emqx_telemetry:get_telemetry(), {ok, TelemetryData1} = emqx_telemetry:get_telemetry(),
?assertEqual(0, get_value(num_clients, TelemetryData1)), ?assertEqual(0, get_value(num_clients, TelemetryData1)),
@ -485,19 +485,19 @@ t_authn_authz_info(_) ->
). ).
t_enable(_) -> t_enable(_) ->
ok = emqx_telemetry:enable(), ok = emqx_telemetry:start_reporting(),
ok = emqx_telemetry:disable(). ok = emqx_telemetry:stop_reporting().
t_send_after_enable(_) -> t_send_after_enable(_) ->
ok = emqx_telemetry:disable(), ok = emqx_telemetry:stop_reporting(),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
try try
ok = emqx_telemetry:enable(), ok = emqx_telemetry:start_reporting(),
Timeout = 12_000, Timeout = 12_000,
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
?wait_async_action( ?wait_async_action(
ok = emqx_telemetry:enable(), ok = emqx_telemetry:start_reporting(),
#{?snk_kind := telemetry_data_reported}, #{?snk_kind := telemetry_data_reported},
Timeout Timeout
) )
@ -818,11 +818,10 @@ start_slave(Name) ->
(emqx) -> (emqx) ->
application:set_env(emqx, boot_modules, []), application:set_env(emqx, boot_modules, []),
ekka:join(TestNode), ekka:join(TestNode),
emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
ok; ok;
(_App) -> (_App) ->
emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), emqx_common_test_helpers:load_config(emqx_modules_schema, ?MODULES_CONF),
ok ok
end, end,
Opts = #{ Opts = #{
@ -837,7 +836,7 @@ start_slave(Name) ->
env_handler => Handler, env_handler => Handler,
load_apps => [gen_rpc, emqx], load_apps => [gen_rpc, emqx],
listener_ports => [], listener_ports => [],
apps => [emqx_conf, emqx_modules] apps => [emqx, emqx_conf, emqx_retainer, emqx_modules, emqx_telemetry]
}, },
emqx_common_test_helpers:start_slave(Name, Opts). emqx_common_test_helpers:start_slave(Name, Opts).
@ -861,3 +860,12 @@ leave_cluster() ->
application:set_env(mria, db_backend, mnesia), application:set_env(mria, db_backend, mnesia),
ekka:leave() ekka:leave()
end. end.
is_official_version(V) ->
emqx_telemetry_config:is_official_version(V).
start_apps() ->
emqx_common_test_helpers:start_apps(apps(), fun set_special_configs/1).
stop_apps() ->
emqx_common_test_helpers:stop_apps(lists:reverse(apps())).

View File

@ -31,7 +31,7 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF), ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
ok = emqx_mgmt_api_test_util:init_suite( ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_authn, emqx_authz, emqx_modules], [emqx_conf, emqx_authn, emqx_authz, emqx_telemetry],
fun set_special_configs/1 fun set_special_configs/1
), ),
@ -47,7 +47,7 @@ end_per_suite(_Config) ->
} }
), ),
emqx_mgmt_api_test_util:end_suite([ emqx_mgmt_api_test_util:end_suite([
emqx_conf, emqx_authn, emqx_authz, emqx_modules emqx_conf, emqx_authn, emqx_authz, emqx_telemetry
]), ]),
ok. ok.

View File

@ -477,8 +477,7 @@ relx_apps_per_edition(ee) ->
emqx_ee_schema_registry emqx_ee_schema_registry
]; ];
relx_apps_per_edition(ce) -> relx_apps_per_edition(ce) ->
[]. [emqx_telemetry].
relx_overlay(ReleaseType, Edition) -> relx_overlay(ReleaseType, Edition) ->
[ [
{mkdir, "log/"}, {mkdir, "log/"},