chore(emqx_modules): add tests and refactor
* improve `emqx_rewrite_api` coverage * improve `emqx_telemetry_api` coverage * refactor `emqx_telemetry` to use config clustering for settings
This commit is contained in:
parent
5080c17ad6
commit
8f0497c441
|
@ -1,5 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
|
|
||||||
{deps, [{emqx, {path, "../emqx"}}]}.
|
{deps, [
|
||||||
|
{emqx, {path, "../emqx"}},
|
||||||
|
{emqx_conf, {path, "../emqx_conf"}}
|
||||||
|
]}.
|
||||||
{project_plugins, [erlfmt]}.
|
{project_plugins, [erlfmt]}.
|
||||||
|
|
|
@ -46,7 +46,7 @@ maybe_enable_modules() ->
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
DelayedEnabled andalso emqx_delayed:enable(),
|
DelayedEnabled andalso emqx_delayed:enable(),
|
||||||
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
|
emqx_modules_conf:telemetry_status() andalso emqx_telemetry:enable(),
|
||||||
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
|
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
|
||||||
emqx_conf_cli:load(),
|
emqx_conf_cli:load(),
|
||||||
ok = emqx_rewrite:enable(),
|
ok = emqx_rewrite:enable(),
|
||||||
|
@ -55,7 +55,7 @@ maybe_enable_modules() ->
|
||||||
|
|
||||||
maybe_disable_modules() ->
|
maybe_disable_modules() ->
|
||||||
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
|
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
|
||||||
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
|
emqx_modules_conf:telemetry_status() andalso emqx_telemetry:disable(),
|
||||||
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
|
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
|
||||||
emqx_rewrite:disable(),
|
emqx_rewrite:disable(),
|
||||||
emqx_conf_cli:unload(),
|
emqx_conf_cli:unload(),
|
||||||
|
|
|
@ -25,11 +25,12 @@
|
||||||
unload/0
|
unload/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% topci-metrics
|
|
||||||
-export([
|
-export([
|
||||||
topic_metrics/0,
|
topic_metrics/0,
|
||||||
add_topic_metrics/1,
|
add_topic_metrics/1,
|
||||||
remove_topic_metrics/1
|
remove_topic_metrics/1,
|
||||||
|
telemetry_status/0,
|
||||||
|
set_telemetry_status/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% config handlers
|
%% config handlers
|
||||||
|
@ -40,17 +41,21 @@
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Load/Unload
|
%% Load/Unload
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec load() -> ok.
|
-spec load() -> ok.
|
||||||
load() ->
|
load() ->
|
||||||
emqx_conf:add_handler([topic_metrics], ?MODULE).
|
emqx_conf:add_handler([topic_metrics], ?MODULE),
|
||||||
|
emqx_conf:add_handler([telemetry], ?MODULE).
|
||||||
|
|
||||||
-spec unload() -> ok.
|
-spec unload() -> ok.
|
||||||
unload() ->
|
unload() ->
|
||||||
|
emqx_conf:remove_handler([telemetry]),
|
||||||
emqx_conf:remove_handler([topic_metrics]).
|
emqx_conf:remove_handler([topic_metrics]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Topic-Metrics
|
%% Topic-Metrics
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec topic_metrics() -> [emqx_types:topic()].
|
-spec topic_metrics() -> [emqx_types:topic()].
|
||||||
topic_metrics() ->
|
topic_metrics() ->
|
||||||
|
@ -63,7 +68,7 @@ topic_metrics() ->
|
||||||
{ok, emqx_types:topic()}
|
{ok, emqx_types:topic()}
|
||||||
| {error, term()}.
|
| {error, term()}.
|
||||||
add_topic_metrics(Topic) ->
|
add_topic_metrics(Topic) ->
|
||||||
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
|
case cfg_update([topic_metrics], ?FUNCTION_NAME, Topic) of
|
||||||
{ok, _} -> {ok, Topic};
|
{ok, _} -> {ok, Topic};
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
@ -72,24 +77,21 @@ add_topic_metrics(Topic) ->
|
||||||
ok
|
ok
|
||||||
| {error, term()}.
|
| {error, term()}.
|
||||||
remove_topic_metrics(Topic) ->
|
remove_topic_metrics(Topic) ->
|
||||||
case cfg_update(topic_metrics, ?FUNCTION_NAME, Topic) of
|
case cfg_update([topic_metrics], ?FUNCTION_NAME, Topic) of
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cfg_update(topic_metrics, Action, Params) ->
|
-spec telemetry_status() -> boolean().
|
||||||
res(
|
telemetry_status() ->
|
||||||
emqx_conf:update(
|
emqx:get_config([telemetry, enable], true).
|
||||||
[topic_metrics],
|
|
||||||
{Action, Params},
|
|
||||||
#{override_to => cluster}
|
|
||||||
)
|
|
||||||
).
|
|
||||||
|
|
||||||
res({ok, Result}) -> {ok, Result};
|
-spec set_telemetry_status(boolean()) -> ok | {error, term()}.
|
||||||
res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason};
|
set_telemetry_status(Status) ->
|
||||||
res({error, {post_config_update, ?MODULE, Reason}}) -> {error, Reason};
|
case cfg_update([telemetry], set_telemetry_status, Status) of
|
||||||
res({error, Reason}) -> {error, Reason}.
|
{ok, _} -> ok;
|
||||||
|
{error, _} = Error -> Error
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Config Handler
|
%% Config Handler
|
||||||
|
@ -116,7 +118,9 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
|
||||||
{ok, RawConf -- [Topic]};
|
{ok, RawConf -- [Topic]};
|
||||||
_ ->
|
_ ->
|
||||||
{error, not_found}
|
{error, not_found}
|
||||||
end.
|
end;
|
||||||
|
pre_config_update(_, {set_telemetry_status, Status}, RawConf) ->
|
||||||
|
{ok, RawConf#{<<"enable">> => Status}}.
|
||||||
|
|
||||||
-spec post_config_update(
|
-spec post_config_update(
|
||||||
list(atom()),
|
list(atom()),
|
||||||
|
@ -148,4 +152,33 @@ post_config_update(
|
||||||
case emqx_topic_metrics:deregister(Topic) of
|
case emqx_topic_metrics:deregister(Topic) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
post_config_update(
|
||||||
|
_,
|
||||||
|
{set_telemetry_status, Status},
|
||||||
|
_NewConfig,
|
||||||
|
_OldConfig,
|
||||||
|
_AppEnvs
|
||||||
|
) ->
|
||||||
|
case Status of
|
||||||
|
true -> emqx_telemetry:enable();
|
||||||
|
false -> emqx_telemetry:disable()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Private
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
res({ok, Result}) -> {ok, Result};
|
||||||
|
res({error, {pre_config_update, ?MODULE, Reason}}) -> {error, Reason};
|
||||||
|
res({error, {post_config_update, ?MODULE, Reason}}) -> {error, Reason};
|
||||||
|
res({error, Reason}) -> {error, Reason}.
|
||||||
|
|
||||||
|
cfg_update(Path, Action, Params) ->
|
||||||
|
res(
|
||||||
|
emqx_conf:update(
|
||||||
|
Path,
|
||||||
|
{Action, Params},
|
||||||
|
#{override_to => cluster}
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
|
@ -21,7 +21,6 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-include_lib("kernel/include/file.hrl").
|
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-include("emqx_modules.hrl").
|
-include("emqx_modules.hrl").
|
||||||
|
@ -49,8 +48,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
get_uuid/0,
|
get_uuid/0,
|
||||||
get_telemetry/0,
|
get_telemetry/0
|
||||||
get_status/0
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([official_version/1]).
|
-export([official_version/1]).
|
||||||
|
@ -119,9 +117,6 @@ enable() ->
|
||||||
disable() ->
|
disable() ->
|
||||||
gen_server:call(?MODULE, disable).
|
gen_server:call(?MODULE, disable).
|
||||||
|
|
||||||
get_status() ->
|
|
||||||
emqx_conf:get([telemetry, enable], true).
|
|
||||||
|
|
||||||
get_uuid() ->
|
get_uuid() ->
|
||||||
gen_server:call(?MODULE, get_uuid).
|
gen_server:call(?MODULE, get_uuid).
|
||||||
|
|
||||||
|
@ -188,12 +183,10 @@ handle_continue(Continue, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) ->
|
handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) ->
|
||||||
State =
|
State = report_telemetry(State0),
|
||||||
case get_status() of
|
|
||||||
true -> report_telemetry(State0);
|
|
||||||
false -> State0
|
|
||||||
end,
|
|
||||||
{noreply, ensure_report_timer(State)};
|
{noreply, ensure_report_timer(State)};
|
||||||
|
handle_info({timeout, _TRef, time_to_report_telemetry_data}, State = #state{timer = undefined}) ->
|
||||||
|
{noreply, State};
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
|
@ -22,15 +22,11 @@
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, ref/1, ref/2, array/1]).
|
-import(hoconsc, [mk/2, ref/1, ref/2, array/1]).
|
||||||
|
|
||||||
% -export([cli/1]).
|
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
status/2,
|
status/2,
|
||||||
data/2
|
data/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([enable_telemetry/2]).
|
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
api_spec/0,
|
api_spec/0,
|
||||||
paths/0,
|
paths/0,
|
||||||
|
@ -209,11 +205,12 @@ fields(telemetry) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% HTTP API
|
%% HTTP API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
status(get, _Params) ->
|
status(get, _Params) ->
|
||||||
{200, get_telemetry_status()};
|
{200, get_telemetry_status()};
|
||||||
status(put, #{body := Body}) ->
|
status(put, #{body := Body}) ->
|
||||||
Enable = maps:get(<<"enable">>, Body),
|
Enable = maps:get(<<"enable">>, Body),
|
||||||
case Enable =:= emqx_telemetry:get_status() of
|
case Enable =:= emqx_modules_conf:telemetry_status() of
|
||||||
true ->
|
true ->
|
||||||
Reason =
|
Reason =
|
||||||
case Enable of
|
case Enable of
|
||||||
|
@ -222,78 +219,30 @@ status(put, #{body := Body}) ->
|
||||||
end,
|
end,
|
||||||
{400, #{code => 'BAD_REQUEST', message => Reason}};
|
{400, #{code => 'BAD_REQUEST', message => Reason}};
|
||||||
false ->
|
false ->
|
||||||
enable_telemetry(Enable),
|
case enable_telemetry(Enable) of
|
||||||
{200, #{<<"enable">> => emqx_telemetry:get_status()}}
|
ok ->
|
||||||
|
{200, get_telemetry_status()};
|
||||||
|
{error, Reason} ->
|
||||||
|
{400, #{
|
||||||
|
code => 'BAD_REQUEST',
|
||||||
|
message => Reason
|
||||||
|
}}
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
data(get, _Request) ->
|
data(get, _Request) ->
|
||||||
{200, emqx_json:encode(get_telemetry_data())}.
|
{200, emqx_json:encode(get_telemetry_data())}.
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% CLI
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
% cli(["enable", Enable0]) ->
|
|
||||||
% Enable = list_to_atom(Enable0),
|
|
||||||
% case Enable =:= emqx_telemetry:is_enabled() of
|
|
||||||
% true ->
|
|
||||||
% case Enable of
|
|
||||||
% true -> emqx_ctl:print("Telemetry status is already enabled~n");
|
|
||||||
% false -> emqx_ctl:print("Telemetry status is already disable~n")
|
|
||||||
% end;
|
|
||||||
% false ->
|
|
||||||
% enable_telemetry(Enable),
|
|
||||||
% case Enable of
|
|
||||||
% true -> emqx_ctl:print("Enable telemetry successfully~n");
|
|
||||||
% false -> emqx_ctl:print("Disable telemetry successfully~n")
|
|
||||||
% end
|
|
||||||
% end;
|
|
||||||
|
|
||||||
% cli(["get", "status"]) ->
|
|
||||||
% case get_telemetry_status() of
|
|
||||||
% [{enabled, true}] ->
|
|
||||||
% emqx_ctl:print("Telemetry is enabled~n");
|
|
||||||
% [{enabled, false}] ->
|
|
||||||
% emqx_ctl:print("Telemetry is disabled~n")
|
|
||||||
% end;
|
|
||||||
|
|
||||||
% cli(["get", "data"]) ->
|
|
||||||
% TelemetryData = get_telemetry_data(),
|
|
||||||
% case emqx_json:safe_encode(TelemetryData, [pretty]) of
|
|
||||||
% {ok, Bin} ->
|
|
||||||
% emqx_ctl:print("~ts~n", [Bin]);
|
|
||||||
% {error, _Reason} ->
|
|
||||||
% emqx_ctl:print("Failed to get telemetry data")
|
|
||||||
% end;
|
|
||||||
|
|
||||||
% cli(_) ->
|
|
||||||
% emqx_ctl:usage([{"telemetry enable", "Enable telemetry"},
|
|
||||||
% {"telemetry disable", "Disable telemetry"},
|
|
||||||
% {"telemetry get data", "Get reported telemetry data"}]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% internal function
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
enable_telemetry(Enable) ->
|
enable_telemetry(Enable) ->
|
||||||
lists:foreach(
|
emqx_modules_conf:set_telemetry_status(Enable).
|
||||||
fun(Node) ->
|
|
||||||
enable_telemetry(Node, Enable)
|
|
||||||
end,
|
|
||||||
mria_mnesia:running_nodes()
|
|
||||||
).
|
|
||||||
|
|
||||||
enable_telemetry(Node, true) ->
|
|
||||||
is_ok(emqx_telemetry_proto_v1:enable_telemetry(Node));
|
|
||||||
enable_telemetry(Node, false) ->
|
|
||||||
is_ok(emqx_telemetry_proto_v1:disable_telemetry(Node)).
|
|
||||||
|
|
||||||
get_telemetry_status() ->
|
get_telemetry_status() ->
|
||||||
#{enabled => emqx_telemetry:get_status()}.
|
#{enable => emqx_modules_conf:telemetry_status()}.
|
||||||
|
|
||||||
get_telemetry_data() ->
|
get_telemetry_data() ->
|
||||||
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
|
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
|
||||||
TelemetryData.
|
TelemetryData.
|
||||||
|
|
||||||
is_ok(Result) ->
|
|
||||||
case Result of
|
|
||||||
{badrpc, Reason} -> {error, Reason};
|
|
||||||
Result -> Result
|
|
||||||
end.
|
|
||||||
|
|
|
@ -20,10 +20,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
introduced_in/0,
|
introduced_in/0,
|
||||||
|
get_uuid/1
|
||||||
get_uuid/1,
|
|
||||||
enable_telemetry/1,
|
|
||||||
disable_telemetry/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
@ -34,11 +31,3 @@ introduced_in() ->
|
||||||
-spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
-spec get_uuid(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||||
get_uuid(Node) ->
|
get_uuid(Node) ->
|
||||||
rpc:call(Node, emqx_telemetry, get_uuid, []).
|
rpc:call(Node, emqx_telemetry, get_uuid, []).
|
||||||
|
|
||||||
-spec enable_telemetry(node()) -> _.
|
|
||||||
enable_telemetry(Node) ->
|
|
||||||
rpc:call(Node, emqx_telemetry, enable, []).
|
|
||||||
|
|
||||||
-spec disable_telemetry(node()) -> _.
|
|
||||||
disable_telemetry(Node) ->
|
|
||||||
rpc:call(Node, emqx_telemetry, disable, []).
|
|
||||||
|
|
|
@ -35,13 +35,11 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
mria:start(),
|
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
|
||||||
ok = emqx_delayed:mnesia(boot),
|
|
||||||
emqx_common_test_helpers:start_apps([emqx_modules]),
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_) ->
|
end_per_suite(_) ->
|
||||||
emqx_common_test_helpers:stop_apps([emqx_modules]).
|
emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]).
|
||||||
|
|
||||||
init_per_testcase(t_load_case, Config) ->
|
init_per_testcase(t_load_case, Config) ->
|
||||||
Config;
|
Config;
|
||||||
|
|
|
@ -22,31 +22,28 @@
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-define(REWRITE, <<
|
-define(REWRITE, #{
|
||||||
""
|
<<"rewrite">> => [
|
||||||
"\n"
|
#{
|
||||||
"rewrite: [\n"
|
<<"action">> => <<"publish">>,
|
||||||
" {\n"
|
<<"dest_topic">> => <<"z/y/$1">>,
|
||||||
" action : publish\n"
|
<<"re">> => <<"^x/y/(.+)$">>,
|
||||||
" source_topic : \"x/#\"\n"
|
<<"source_topic">> => <<"x/#">>
|
||||||
" re : \"^x/y/(.+)$\"\n"
|
},
|
||||||
" dest_topic : \"z/y/$1\"\n"
|
#{
|
||||||
" },\n"
|
<<"action">> => <<"subscribe">>,
|
||||||
" {\n"
|
<<"dest_topic">> => <<"y/z/$2">>,
|
||||||
" action : subscribe\n"
|
<<"re">> => <<"^y/(.+)/z/(.+)$">>,
|
||||||
" source_topic : \"y/+/z/#\"\n"
|
<<"source_topic">> => <<"y/+/z/#">>
|
||||||
" re : \"^y/(.+)/z/(.+)$\"\n"
|
},
|
||||||
" dest_topic : \"y/z/$2\"\n"
|
#{
|
||||||
" },\n"
|
<<"action">> => <<"all">>,
|
||||||
" {\n"
|
<<"dest_topic">> => <<"all/x/$2">>,
|
||||||
" action : all\n"
|
<<"re">> => <<"^all/(.+)/x/(.+)$">>,
|
||||||
" source_topic : \"all/+/x/#\"\n"
|
<<"source_topic">> => <<"all/+/x/#">>
|
||||||
" re : \"^all/(.+)/x/(.+)$\"\n"
|
}
|
||||||
" dest_topic : \"all/x/$2\"\n"
|
]
|
||||||
" }\n"
|
}).
|
||||||
"]"
|
|
||||||
""
|
|
||||||
>>).
|
|
||||||
|
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_rewrite_api_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
-define(BASE_CONF, #{}).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_testcase(_, Config) ->
|
||||||
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
|
||||||
|
|
||||||
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
|
[emqx_conf, emqx_modules, emqx_dashboard],
|
||||||
|
fun set_special_configs/1
|
||||||
|
),
|
||||||
|
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_dashboard, emqx_modules]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
set_special_configs(emqx_dashboard) ->
|
||||||
|
emqx_dashboard_api_test_helpers:set_default_config();
|
||||||
|
set_special_configs(_App) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Tests
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_mqtt_topic_rewrite(_) ->
|
||||||
|
Rules = [
|
||||||
|
#{
|
||||||
|
<<"source_topic">> => <<"test/#">>,
|
||||||
|
<<"re">> => <<"test/*">>,
|
||||||
|
<<"dest_topic">> => <<"test1/$2">>,
|
||||||
|
<<"action">> => <<"publish">>
|
||||||
|
}
|
||||||
|
],
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, 200, _},
|
||||||
|
request(
|
||||||
|
put,
|
||||||
|
uri(["mqtt", "topic_rewrite"]),
|
||||||
|
Rules
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 200, Result} =
|
||||||
|
request(get, uri(["mqtt", "topic_rewrite"])),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
Rules,
|
||||||
|
jsx:decode(Result)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_mqtt_topic_rewrite_limit(_) ->
|
||||||
|
Rule =
|
||||||
|
#{
|
||||||
|
<<"source_topic">> => <<"test/#">>,
|
||||||
|
<<"re">> => <<"test/*">>,
|
||||||
|
<<"dest_topic">> => <<"test1/$2">>,
|
||||||
|
<<"action">> => <<"publish">>
|
||||||
|
},
|
||||||
|
|
||||||
|
Rules = [Rule || _ <- lists:seq(1, 21)],
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, 413, _},
|
||||||
|
request(
|
||||||
|
put,
|
||||||
|
uri(["mqtt", "topic_rewrite"]),
|
||||||
|
Rules
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Helpers
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
request(Method, Url) ->
|
||||||
|
request(Method, Url, []).
|
|
@ -28,7 +28,7 @@
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
snabbkaffe:fix_ct_logging(),
|
ok = meck:new(emqx_authz, [non_strict, passthrough, no_history, no_link]),
|
||||||
meck:expect(
|
meck:expect(
|
||||||
emqx_authz,
|
emqx_authz,
|
||||||
acl_conf_file,
|
acl_conf_file,
|
||||||
|
@ -139,8 +139,8 @@ t_uuid(_) ->
|
||||||
{ok, UUID2} = emqx_telemetry:get_uuid(),
|
{ok, UUID2} = emqx_telemetry:get_uuid(),
|
||||||
emqx_telemetry:disable(),
|
emqx_telemetry:disable(),
|
||||||
emqx_telemetry:enable(),
|
emqx_telemetry:enable(),
|
||||||
emqx_telemetry_proto_v1:disable_telemetry(node()),
|
emqx_modules_conf:set_telemetry_status(false),
|
||||||
emqx_telemetry_proto_v1:enable_telemetry(node()),
|
emqx_modules_conf:set_telemetry_status(true),
|
||||||
{ok, UUID3} = emqx_telemetry:get_uuid(),
|
{ok, UUID3} = emqx_telemetry:get_uuid(),
|
||||||
{ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()),
|
{ok, UUID4} = emqx_telemetry_proto_v1:get_uuid(node()),
|
||||||
?assertEqual(UUID2, UUID3),
|
?assertEqual(UUID2, UUID3),
|
||||||
|
|
|
@ -0,0 +1,168 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_telemetry_api_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-import(emqx_dashboard_api_test_helpers, [request/2, request/3, uri/1]).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
-define(BASE_CONF, #{}).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
|
||||||
|
|
||||||
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
|
[emqx_conf, emqx_authn, emqx_authz, emqx_modules, emqx_dashboard],
|
||||||
|
fun set_special_configs/1
|
||||||
|
),
|
||||||
|
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
{ok, _} = emqx:update_config(
|
||||||
|
[authorization],
|
||||||
|
#{
|
||||||
|
<<"no_match">> => <<"allow">>,
|
||||||
|
<<"cache">> => #{<<"enable">> => <<"true">>},
|
||||||
|
<<"sources">> => []
|
||||||
|
}
|
||||||
|
),
|
||||||
|
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(t_status_fail, Config) ->
|
||||||
|
meck:new(emqx_telemetry, [non_strict, passthrough]),
|
||||||
|
meck:expect(emqx_telemetry, official_version, 1, false),
|
||||||
|
Config;
|
||||||
|
init_per_testcase(t_status, Config) ->
|
||||||
|
meck:new(emqx_telemetry, [non_strict, passthrough]),
|
||||||
|
meck:expect(emqx_telemetry, official_version, 1, true),
|
||||||
|
Config;
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(t_status_fail, _Config) ->
|
||||||
|
meck:unload(emqx_telemetry);
|
||||||
|
end_per_testcase(t_status, _Config) ->
|
||||||
|
meck:unload(emqx_telemetry);
|
||||||
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
set_special_configs(emqx_dashboard) ->
|
||||||
|
emqx_dashboard_api_test_helpers:set_default_config();
|
||||||
|
set_special_configs(emqx_authz) ->
|
||||||
|
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||||
|
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||||
|
{ok, _} = emqx:update_config([authorization, sources], []),
|
||||||
|
ok;
|
||||||
|
set_special_configs(_App) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Tests
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_status(_) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, 200, _},
|
||||||
|
request(
|
||||||
|
put,
|
||||||
|
uri(["telemetry", "status"]),
|
||||||
|
#{<<"enable">> => false}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 200, Result0} =
|
||||||
|
request(get, uri(["telemetry", "status"])),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
#{<<"enable">> => false},
|
||||||
|
jsx:decode(Result0)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, 400, _},
|
||||||
|
request(
|
||||||
|
put,
|
||||||
|
uri(["telemetry", "status"]),
|
||||||
|
#{<<"enable">> => false}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, 200, _},
|
||||||
|
request(
|
||||||
|
put,
|
||||||
|
uri(["telemetry", "status"]),
|
||||||
|
#{<<"enable">> => true}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, 200, Result1} =
|
||||||
|
request(get, uri(["telemetry", "status"])),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
#{<<"enable">> => true},
|
||||||
|
jsx:decode(Result1)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, 400, _},
|
||||||
|
request(
|
||||||
|
put,
|
||||||
|
uri(["telemetry", "status"]),
|
||||||
|
#{<<"enable">> => true}
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_status_fail(_) ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, 400, _},
|
||||||
|
request(
|
||||||
|
put,
|
||||||
|
uri(["telemetry", "status"]),
|
||||||
|
#{<<"enable">> => false}
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_data(_) ->
|
||||||
|
{ok, 200, Result} =
|
||||||
|
request(get, uri(["telemetry", "data"])),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"active_plugins">> := _,
|
||||||
|
<<"advanced_mqtt_features">> := _,
|
||||||
|
<<"build_info">> := _,
|
||||||
|
<<"emqx_version">> := _,
|
||||||
|
<<"license">> := _,
|
||||||
|
<<"messages_received">> := _,
|
||||||
|
<<"mqtt_runtime_insights">> := _,
|
||||||
|
<<"nodes_uuid">> := _,
|
||||||
|
<<"os_name">> := _,
|
||||||
|
<<"otp_version">> := _,
|
||||||
|
<<"uuid">> := _,
|
||||||
|
<<"vm_specs">> := _
|
||||||
|
},
|
||||||
|
jsx:decode(Result)
|
||||||
|
).
|
|
@ -28,12 +28,12 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_common_test_helpers:boot_modules(all),
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
emqx_common_test_helpers:start_apps([emqx_modules]),
|
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?TOPIC),
|
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?TOPIC),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:stop_apps([emqx_modules]).
|
emqx_common_test_helpers:stop_apps([emqx_modules, emqx_conf]).
|
||||||
|
|
||||||
init_per_testcase(_Case, Config) ->
|
init_per_testcase(_Case, Config) ->
|
||||||
emqx_topic_metrics:enable(),
|
emqx_topic_metrics:enable(),
|
||||||
|
|
|
@ -31,7 +31,6 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun emqx_modules_conf:remove_topic_metrics/1,
|
fun emqx_modules_conf:remove_topic_metrics/1,
|
||||||
emqx_modules_conf:topic_metrics()
|
emqx_modules_conf:topic_metrics()
|
||||||
|
|
Loading…
Reference in New Issue