Merge pull request #7495 from savonarola/emqx_rewrite_api-emqx_telemetry_api-cov

chore(emqx_modules): improve emqx_rewrite_api and emqx_telemetry_api coverage
This commit is contained in:
Ilya Averyanov 2022-04-04 21:18:49 +03:00 committed by GitHub
commit c798374b36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 384 additions and 146 deletions

View File

@ -1,5 +1,7 @@
%% -*- mode: erlang -*-
{deps, [{emqx, {path, "../emqx"}}]}.
{deps, [
{emqx, {path, "../emqx"}},
{emqx_conf, {path, "../emqx_conf"}}
]}.
{project_plugins, [erlfmt]}.

View File

@ -46,7 +46,7 @@ maybe_enable_modules() ->
}
),
DelayedEnabled andalso emqx_delayed:enable(),
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
emqx_modules_conf:telemetry_status() andalso emqx_telemetry:enable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
emqx_conf_cli:load(),
ok = emqx_rewrite:enable(),
@ -55,7 +55,7 @@ maybe_enable_modules() ->
maybe_disable_modules() ->
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
emqx_modules_conf:telemetry_status() andalso emqx_telemetry:disable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
emqx_rewrite:disable(),
emqx_conf_cli:unload(),

View File

@ -25,11 +25,12 @@
unload/0
]).
%% topci-metrics
-export([
topic_metrics/0,
add_topic_metrics/1,
remove_topic_metrics/1
remove_topic_metrics/1,
telemetry_status/0,
set_telemetry_status/1
]).
%% config handlers
@ -40,17 +41,21 @@
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
-spec load() -> ok.
load() ->
emqx_conf:add_handler([topic_metrics], ?MODULE).
emqx_conf:add_handler([topic_metrics], ?MODULE),
emqx_conf:add_handler([telemetry], ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_conf:remove_handler([telemetry]),
emqx_conf:remove_handler([topic_metrics]).
%%--------------------------------------------------------------------
%% Topic-Metrics
%%--------------------------------------------------------------------
-spec topic_metrics() -> [emqx_types:topic()].
topic_metrics() ->
@ -63,7 +68,7 @@ topic_metrics() ->
{ok, emqx_types:topic()}
| {error, term()}.
add_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
case cfg_update([topic_metrics], ?FUNCTION_NAME, Topic) of
{ok, _} -> {ok, Topic};
{error, Reason} -> {error, Reason}
end.
@ -72,24 +77,21 @@ add_topic_metrics(Topic) ->
ok
| {error, term()}.
remove_topic_metrics(Topic) ->
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
case cfg_update([topic_metrics], ?FUNCTION_NAME, Topic) of
{ok, _} -> ok;
{error, Reason} -> {error, Reason}
end.
cfg_update(topic_metrics, Action, Params) ->
res(
emqx_conf:update(
[topic_metrics],
{Action, Params},
#{override_to => cluster}
)
).
-spec telemetry_status() -> boolean().
telemetry_status() ->
emqx:get_config([telemetry, enable], true).
res({ok, Result}) -> {ok, Result};
res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason};
res({error, {post_config_update, ?MODULE, Reason}}) -> {error, Reason};
res({error, Reason}) -> {error, Reason}.
-spec set_telemetry_status(boolean()) -> ok | {error, term()}.
set_telemetry_status(Status) ->
case cfg_update([telemetry], set_telemetry_status, Status) of
{ok, _} -> ok;
{error, _} = Error -> Error
end.
%%--------------------------------------------------------------------
%% Config Handler
@ -116,7 +118,9 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
{ok, RawConf -- [Topic]};
_ ->
{error, not_found}
end.
end;
pre_config_update(_, {set_telemetry_status, Status}, RawConf) ->
{ok, RawConf#{<<"enable">> => Status}}.
-spec post_config_update(
list(atom()),
@ -148,4 +152,33 @@ post_config_update(
case emqx_topic_metrics:deregister(Topic) of
ok -> ok;
{error, Reason} -> {error, Reason}
end;
post_config_update(
_,
{set_telemetry_status, Status},
_NewConfig,
_OldConfig,
_AppEnvs
) ->
case Status of
true -> emqx_telemetry:enable();
false -> emqx_telemetry:disable()
end.
%%--------------------------------------------------------------------
%% Private
%%--------------------------------------------------------------------
res({ok, Result}) -> {ok, Result};
res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason};
res({error, {post_config_update, ?MODULE, Reason}}) -> {error, Reason};
res({error, Reason}) -> {error, Reason}.
cfg_update(Path, Action, Params) ->
res(
emqx_conf:update(
Path,
{Action, Params},
#{override_to => cluster}
)
).

View File

@ -21,7 +21,6 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_modules.hrl").
@ -49,8 +48,7 @@
-export([
get_uuid/0,
get_telemetry/0,
get_status/0
get_telemetry/0
]).
-export([official_version/1]).
@ -119,9 +117,6 @@ enable() ->
disable() ->
gen_server:call(?MODULE, disable).
get_status() ->
emqx_conf:get([telemetry, enable], true).
get_uuid() ->
gen_server:call(?MODULE, get_uuid).
@ -188,12 +183,10 @@ handle_continue(Continue, State) ->
{noreply, State}.
handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) ->
State =
case get_status() of
true -> report_telemetry(State0);
false -> State0
end,
State = report_telemetry(State0),
{noreply, ensure_report_timer(State)};
handle_info({timeout, _TRef, time_to_report_telemetry_data}, State = #state{timer = undefined}) ->
{noreply, State};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.

View File

@ -22,15 +22,11 @@
-import(hoconsc, [mk/2, ref/1, ref/2, array/1]).
% -export([cli/1]).
-export([
status/2,
data/2
]).
-export([enable_telemetry/2]).
-export([
api_spec/0,
paths/0,
@ -209,11 +205,12 @@ fields(telemetry) ->
%%--------------------------------------------------------------------
%% HTTP API
%%--------------------------------------------------------------------
status(get, _Params) ->
{200, get_telemetry_status()};
status(put, #{body := Body}) ->
Enable = maps:get(<<"enable">>, Body),
case Enable =:= emqx_telemetry:get_status() of
case Enable =:= emqx_modules_conf:telemetry_status() of
true ->
Reason =
case Enable of
@ -222,78 +219,30 @@ status(put, #{body := Body}) ->
end,
{400, #{code => 'BAD_REQUEST', message => Reason}};
false ->
enable_telemetry(Enable),
{200, #{<<"enable">> => emqx_telemetry:get_status()}}
case enable_telemetry(Enable) of
ok ->
{200, get_telemetry_status()};
{error, Reason} ->
{400, #{
code => 'BAD_REQUEST',
message => Reason
}}
end
end.
data(get, _Request) ->
{200, emqx_json:encode(get_telemetry_data())}.
%%--------------------------------------------------------------------
%% CLI
%%--------------------------------------------------------------------
% cli(["enable", Enable0]) ->
% Enable = list_to_atom(Enable0),
% case Enable =:= emqx_telemetry:is_enabled() of
% true ->
% case Enable of
% true -> emqx_ctl:print("Telemetry status is already enabled~n");
% false -> emqx_ctl:print("Telemetry status is already disable~n")
% end;
% false ->
% enable_telemetry(Enable),
% case Enable of
% true -> emqx_ctl:print("Enable telemetry successfully~n");
% false -> emqx_ctl:print("Disable telemetry successfully~n")
% end
% end;
% cli(["get", "status"]) ->
% case get_telemetry_status() of
% [{enabled, true}] ->
% emqx_ctl:print("Telemetry is enabled~n");
% [{enabled, false}] ->
% emqx_ctl:print("Telemetry is disabled~n")
% end;
% cli(["get", "data"]) ->
% TelemetryData = get_telemetry_data(),
% case emqx_json:safe_encode(TelemetryData, [pretty]) of
% {ok, Bin} ->
% emqx_ctl:print("~ts~n", [Bin]);
% {error, _Reason} ->
% emqx_ctl:print("Failed to get telemetry data")
% end;
% cli(_) ->
% emqx_ctl:usage([{"telemetry enable", "Enable telemetry"},
% {"telemetry disable", "Disable telemetry"},
% {"telemetry get data", "Get reported telemetry data"}]).
%%--------------------------------------------------------------------
%% internal function
%% Internal functions
%%--------------------------------------------------------------------
enable_telemetry(Enable) ->
lists:foreach(
fun(Node) ->
enable_telemetry(Node, Enable)
end,
mria_mnesia:running_nodes()
).
enable_telemetry(Node, true) ->
is_ok(emqx_telemetry_proto_v1:enable_telemetry(Node));
enable_telemetry(Node, false) ->
is_ok(emqx_telemetry_proto_v1:disable_telemetry(Node)).
emqx_modules_conf:set_telemetry_status(Enable).
get_telemetry_status() ->
#{enabled => emqx_telemetry:get_status()}.
#{enable => emqx_modules_conf:telemetry_status()}.
get_telemetry_data() ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
TelemetryData.
is_ok(Result) ->
case Result of
{badrpc, Reason} -> {error, Reason};
Result -> Result
end.

View File

@ -20,10 +20,7 @@
-export([
introduced_in/0,
get_uuid/1,
enable_telemetry/1,
disable_telemetry/1
get_uuid/1
]).
-include_lib("emqx/include/bpapi.hrl").
@ -34,11 +31,3 @@ introduced_in() ->
-spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
get_uuid(Node) ->
rpc:call(Node, emqx_telemetry, get_uuid, []).
-spec enable_telemetry(node()) -> _.
enable_telemetry(Node) ->
rpc:call(Node, emqx_telemetry, enable, []).
-spec disable_telemetry(node()) -> _.
disable_telemetry(Node) ->
rpc:call(Node, emqx_telemetry, disable, []).

View File

@ -35,13 +35,11 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
mria:start(),
ok = emqx_delayed:mnesia(boot),
emqx_common_test_helpers:start_apps([emqx_modules]),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
Config.
end_per_suite(_) ->
emqx_common_test_helpers:stop_apps([emqx_modules]).
emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]).
init_per_testcase(t_load_case, Config) ->
Config;

View File

@ -22,31 +22,28 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(REWRITE, <<
""
"\n"
"rewrite: [\n"
" {\n"
" action : publish\n"
" source_topic : \"x/#\"\n"
" re : \"^x/y/(.+)$\"\n"
" dest_topic : \"z/y/$1\"\n"
" },\n"
" {\n"
" action : subscribe\n"
" source_topic : \"y/+/z/#\"\n"
" re : \"^y/(.+)/z/(.+)$\"\n"
" dest_topic : \"y/z/$2\"\n"
" },\n"
" {\n"
" action : all\n"
" source_topic : \"all/+/x/#\"\n"
" re : \"^all/(.+)/x/(.+)$\"\n"
" dest_topic : \"all/x/$2\"\n"
" }\n"
"]"
""
>>).
-define(REWRITE, #{
<<"rewrite">> => [
#{
<<"action">> => <<"publish">>,
<<"dest_topic">> => <<"z/y/$1">>,
<<"re">> => <<"^x/y/(.+)$">>,
<<"source_topic">> => <<"x/#">>
},
#{
<<"action">> => <<"subscribe">>,
<<"dest_topic">> => <<"y/z/$2">>,
<<"re">> => <<"^y/(.+)/z/(.+)$">>,
<<"source_topic">> => <<"y/+/z/#">>
},
#{
<<"action">> => <<"all">>,
<<"dest_topic">> => <<"all/x/$2">>,
<<"re">> => <<"^all/(.+)/x/(.+)$">>,
<<"source_topic">> => <<"all/+/x/#">>
}
]
}).
all() -> emqx_common_test_helpers:all(?MODULE).

View File

@ -0,0 +1,110 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rewrite_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BASE_CONF, #{}).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_modules, emqx_dashboard],
fun set_special_configs/1
),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]),
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(_App) ->
ok.
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
t_mqtt_topic_rewrite(_) ->
Rules = [
#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"test/*">>,
<<"dest_topic">> => <<"test1/$2">>,
<<"action">> => <<"publish">>
}
],
?assertMatch(
{ok, 200, _},
request(
put,
uri(["mqtt", "topic_rewrite"]),
Rules
)
),
{ok, 200, Result} =
request(get, uri(["mqtt", "topic_rewrite"])),
?assertEqual(
Rules,
jsx:decode(Result)
).
t_mqtt_topic_rewrite_limit(_) ->
Rule =
#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"test/*">>,
<<"dest_topic">> => <<"test1/$2">>,
<<"action">> => <<"publish">>
},
Rules = [Rule || _ <- lists:seq(1, 21)],
?assertMatch(
{ok, 413, _},
request(
put,
uri(["mqtt", "topic_rewrite"]),
Rules
)
).
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
request(Method, Url) ->
request(Method, Url, []).

View File

@ -28,7 +28,7 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
snabbkaffe:fix_ct_logging(),
ok = meck:new(emqx_authz, [non_strict, passthrough, no_history, no_link]),
meck:expect(
emqx_authz,
acl_conf_file,
@ -139,8 +139,8 @@ t_uuid(_) ->
{ok, UUID2} = emqx_telemetry:get_uuid(),
emqx_telemetry:disable(),
emqx_telemetry:enable(),
emqx_telemetry_proto_v1:disable_telemetry(node()),
emqx_telemetry_proto_v1:enable_telemetry(node()),
emqx_modules_conf:set_telemetry_status(false),
emqx_modules_conf:set_telemetry_status(true),
{ok, UUID3} = emqx_telemetry:get_uuid(),
{ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()),
?assertEqual(UUID2, UUID3),

View File

@ -0,0 +1,168 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_telemetry_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BASE_CONF, #{}).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_authn, emqx_authz, emqx_modules, emqx_dashboard],
fun set_special_configs/1
),
Config.
end_per_suite(_Config) ->
{ok, _} = emqx:update_config(
[authorization],
#{
<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []
}
),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]),
ok.
init_per_testcase(t_status_fail, Config) ->
meck:new(emqx_telemetry, [non_strict, passthrough]),
meck:expect(emqx_telemetry, official_version, 1, false),
Config;
init_per_testcase(t_status, Config) ->
meck:new(emqx_telemetry, [non_strict, passthrough]),
meck:expect(emqx_telemetry, official_version, 1, true),
Config;
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(t_status_fail, _Config) ->
meck:unload(emqx_telemetry);
end_per_testcase(t_status, _Config) ->
meck:unload(emqx_telemetry);
end_per_testcase(_TestCase, _Config) ->
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config();
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),
{ok, _} = emqx:update_config([authorization, sources], []),
ok;
set_special_configs(_App) ->
ok.
%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
t_status(_) ->
?assertMatch(
{ok, 200, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => false}
)
),
{ok, 200, Result0} =
request(get, uri(["telemetry", "status"])),
?assertEqual(
#{<<"enable">> => false},
jsx:decode(Result0)
),
?assertMatch(
{ok, 400, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => false}
)
),
?assertMatch(
{ok, 200, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
)
),
{ok, 200, Result1} =
request(get, uri(["telemetry", "status"])),
?assertEqual(
#{<<"enable">> => true},
jsx:decode(Result1)
),
?assertMatch(
{ok, 400, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
)
).
t_status_fail(_) ->
?assertMatch(
{ok, 400, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => false}
)
).
t_data(_) ->
{ok, 200, Result} =
request(get, uri(["telemetry", "data"])),
?assertMatch(
#{
<<"active_plugins">> := _,
<<"advanced_mqtt_features">> := _,
<<"build_info">> := _,
<<"emqx_version">> := _,
<<"license">> := _,
<<"messages_received">> := _,
<<"mqtt_runtime_insights">> := _,
<<"nodes_uuid">> := _,
<<"os_name">> := _,
<<"otp_version">> := _,
<<"uuid">> := _,
<<"vm_specs">> := _
},
jsx:decode(Result)
).

View File

@ -28,12 +28,12 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([emqx_modules]),
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?TOPIC),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_modules]).
emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]).
init_per_testcase(_Case, Config) ->
emqx_topic_metrics:enable(),

View File

@ -31,7 +31,6 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
lists:foreach(
fun emqx_modules_conf:remove_topic_metrics/1,
emqx_modules_conf:topic_metrics()