211 lines
7.3 KiB
Erlang
211 lines
7.3 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
-module(emqx_auto_subscribe_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-define(APP, emqx_auto_subscribe).
|
|
|
|
-define(TOPIC_C, <<"/c/${clientid}">>).
|
|
-define(TOPIC_U, <<"/u/${username}">>).
|
|
-define(TOPIC_H, <<"/h/${host}">>).
|
|
-define(TOPIC_P, <<"/p/${port}">>).
|
|
-define(TOPIC_A, <<"/client/${clientid}/username/${username}/host/${host}/port/${port}">>).
|
|
-define(TOPIC_S, <<"/topic/simple">>).
|
|
|
|
-define(TOPICS, [?TOPIC_C, ?TOPIC_U, ?TOPIC_H, ?TOPIC_P, ?TOPIC_A, ?TOPIC_S]).
|
|
|
|
-define(ENSURE_TOPICS, [
|
|
<<"/c/auto_sub_c">>,
|
|
<<"/u/auto_sub_u">>,
|
|
?TOPIC_S
|
|
]).
|
|
|
|
-define(CLIENT_ID, <<"auto_sub_c">>).
|
|
-define(CLIENT_USERNAME, <<"auto_sub_u">>).
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
init_per_suite(Config) ->
|
|
mria:start(),
|
|
application:stop(?APP),
|
|
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
|
|
meck:expect(emqx_schema, fields, fun
|
|
("auto_subscribe") ->
|
|
meck:passthrough(["auto_subscribe"]) ++
|
|
emqx_auto_subscribe_schema:fields("auto_subscribe");
|
|
(F) ->
|
|
meck:passthrough([F])
|
|
end),
|
|
|
|
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
|
|
meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
|
|
meck:expect(emqx_resource, update, fun(_, _, _, _) -> {ok, meck_data} end),
|
|
meck:expect(emqx_resource, remove, fun(_) -> ok end),
|
|
|
|
application:load(emqx_dashboard),
|
|
application:load(?APP),
|
|
ok = emqx_common_test_helpers:load_config(
|
|
emqx_auto_subscribe_schema,
|
|
<<
|
|
"auto_subscribe {\n"
|
|
" topics = [\n"
|
|
" {\n"
|
|
" topic = \"/c/${clientid}\"\n"
|
|
" },\n"
|
|
" {\n"
|
|
" topic = \"/u/${username}\"\n"
|
|
" },\n"
|
|
" {\n"
|
|
" topic = \"/h/${host}\"\n"
|
|
" },\n"
|
|
" {\n"
|
|
" topic = \"/p/${port}\"\n"
|
|
" },\n"
|
|
" {\n"
|
|
" topic = \"/client/${clientid}/username/${username}/host/${host}/port/${port}\"\n"
|
|
" },\n"
|
|
" {\n"
|
|
" topic = \"/topic/simple\"\n"
|
|
" qos = 1\n"
|
|
" rh = 0\n"
|
|
" rap = 0\n"
|
|
" nl = 0\n"
|
|
" }\n"
|
|
" ]\n"
|
|
" }"
|
|
>>
|
|
),
|
|
emqx_mgmt_api_test_util:init_suite(
|
|
[emqx_conf, ?APP]
|
|
),
|
|
Config.
|
|
|
|
init_per_testcase(t_get_basic_usage_info, Config) ->
|
|
{ok, _} = emqx_auto_subscribe:update([]),
|
|
Config;
|
|
init_per_testcase(_TestCase, Config) ->
|
|
Config.
|
|
|
|
end_per_testcase(t_get_basic_usage_info, _Config) ->
|
|
{ok, _} = emqx_auto_subscribe:update([]),
|
|
ok;
|
|
end_per_testcase(_TestCase, _Config) ->
|
|
ok.
|
|
|
|
topic_config(T) ->
|
|
#{
|
|
topic => T,
|
|
qos => 0,
|
|
rh => 0,
|
|
rap => 0,
|
|
nl => 0
|
|
}.
|
|
|
|
end_per_suite(_) ->
|
|
application:unload(emqx_management),
|
|
application:unload(emqx_conf),
|
|
application:unload(?APP),
|
|
meck:unload(emqx_resource),
|
|
meck:unload(emqx_schema),
|
|
emqx_mgmt_api_test_util:end_suite([emqx_conf, ?APP]).
|
|
|
|
t_auto_subscribe(_) ->
|
|
emqx_auto_subscribe:update([#{<<"topic">> => Topic} || Topic <- ?TOPICS]),
|
|
{ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}),
|
|
{ok, _} = emqtt:connect(Client),
|
|
timer:sleep(200),
|
|
?assertEqual(check_subs(length(?TOPICS)), ok),
|
|
emqtt:disconnect(Client),
|
|
ok.
|
|
|
|
t_update(_) ->
|
|
Path = emqx_mgmt_api_test_util:api_path(["mqtt", "auto_subscribe"]),
|
|
Auth = emqx_mgmt_api_test_util:auth_header_(),
|
|
Body = [#{topic => ?TOPIC_S}],
|
|
{ok, Response} = emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, Body),
|
|
ResponseMap = emqx_utils_json:decode(Response, [return_maps]),
|
|
?assertEqual(1, erlang:length(ResponseMap)),
|
|
|
|
BadBody1 = #{topic => ?TOPIC_S},
|
|
?assertMatch(
|
|
{error, {"HTTP/1.1", 400, "Bad Request"}},
|
|
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, BadBody1)
|
|
),
|
|
BadBody2 = [#{topic => ?TOPIC_S, qos => 3}],
|
|
?assertMatch(
|
|
{error, {"HTTP/1.1", 400, "Bad Request"}},
|
|
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, BadBody2)
|
|
),
|
|
BadBody3 = [#{topic => ?TOPIC_S, rh => 10}],
|
|
?assertMatch(
|
|
{error, {"HTTP/1.1", 400, "Bad Request"}},
|
|
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, BadBody3)
|
|
),
|
|
BadBody4 = [#{topic => ?TOPIC_S, rap => -1}],
|
|
?assertMatch(
|
|
{error, {"HTTP/1.1", 400, "Bad Request"}},
|
|
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, BadBody4)
|
|
),
|
|
BadBody5 = [#{topic => ?TOPIC_S, nl => -1}],
|
|
?assertMatch(
|
|
{error, {"HTTP/1.1", 400, "Bad Request"}},
|
|
emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, BadBody5)
|
|
),
|
|
|
|
{ok, Client} = emqtt:start_link(#{username => ?CLIENT_USERNAME, clientid => ?CLIENT_ID}),
|
|
{ok, _} = emqtt:connect(Client),
|
|
timer:sleep(100),
|
|
?assertEqual(check_subs(ets:tab2list(emqx_suboption), [?TOPIC_S]), ok),
|
|
emqtt:disconnect(Client),
|
|
|
|
{ok, GETResponse} = emqx_mgmt_api_test_util:request_api(get, Path),
|
|
GETResponseMap = emqx_utils_json:decode(GETResponse, [return_maps]),
|
|
?assertEqual(1, erlang:length(GETResponseMap)),
|
|
ok.
|
|
|
|
t_get_basic_usage_info(_Config) ->
|
|
?assertEqual(#{auto_subscribe_count => 0}, emqx_auto_subscribe:get_basic_usage_info()),
|
|
AutoSubscribeTopics =
|
|
lists:map(
|
|
fun(N) ->
|
|
Num = integer_to_binary(N),
|
|
Topic = <<"auto/", Num/binary>>,
|
|
#{<<"topic">> => Topic}
|
|
end,
|
|
lists:seq(1, 3)
|
|
),
|
|
{ok, _} = emqx_auto_subscribe:update(AutoSubscribeTopics),
|
|
?assertEqual(#{auto_subscribe_count => 3}, emqx_auto_subscribe:get_basic_usage_info()),
|
|
ok.
|
|
|
|
check_subs(Count) ->
|
|
Subs = ets:tab2list(emqx_suboption),
|
|
ct:pal("---> ~p ~p ~n", [Subs, Count]),
|
|
?assert(length(Subs) >= Count),
|
|
check_subs((Subs), ?ENSURE_TOPICS).
|
|
|
|
check_subs([], []) ->
|
|
ok;
|
|
check_subs([{{Topic, _}, #{subid := ?CLIENT_ID}} | Subs], List) ->
|
|
check_subs(Subs, lists:delete(Topic, List));
|
|
check_subs([_ | Subs], List) ->
|
|
check_subs(Subs, List).
|