emqx/apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl

447 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_retainer.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-import(emqx_mgmt_api_test_util, [
request_api/2, request_api/4, request_api/5, api_path/1, auth_header_/0
]).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx,
emqx_retainer,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{
work_dir => emqx_cth_suite:work_dir(Config)
}
),
_ = emqx_common_test_http:create_default_app(),
%% make sure no "$SYS/#" topics
_ = emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
[{apps, Apps} | Config].
end_per_suite(Config) ->
emqx_common_test_http:delete_default_app(),
emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_, Config) ->
snabbkaffe:start_trace(),
emqx_retainer:clean(),
{ok, C} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C),
[{client, C} | Config].
end_per_testcase(_, Config) ->
ok = emqtt:disconnect(?config(client, Config)),
snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------
%% Test Cases
%%------------------------------------------------------------------------------
t_config(_Config) ->
Path = api_path(["mqtt", "retainer"]),
{ok, ConfJson} = request_api(get, Path),
ReturnConf = decode_json(ConfJson),
?assertMatch(
#{
backend := _,
enable := _,
max_payload_size := _,
msg_clear_interval := _,
msg_expiry_interval := _
},
ReturnConf
),
UpdateConf = fun(Enable) ->
RawConf = emqx_utils_json:decode(ConfJson, [return_maps]),
UpdateJson = RawConf#{<<"enable">> := Enable},
{ok, UpdateResJson} = request_api(
put,
Path,
[],
auth_header_(),
UpdateJson
),
UpdateRawConf = emqx_utils_json:decode(UpdateResJson, [return_maps]),
?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf))
end,
UpdateConf(false),
UpdateConf(true).
t_messages1(Config) ->
C = ?config(client, Config),
Each = fun(I) ->
emqtt:publish(
C,
<<"retained/", (I + 60)>>,
<<"retained">>,
[{qos, 0}, {retain, true}]
)
end,
?assertWaitEvent(
lists:foreach(Each, lists:seq(1, 5)),
#{?snk_kind := message_retained, topic := <<"retained/A">>},
500
),
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
#{data := Msgs, meta := _} = decode_json(MsgsJson),
MsgLen = erlang:length(Msgs),
?assert(
MsgLen =:= 5,
io_lib:format("message length is:~p~n", [MsgLen])
),
[First | _] = Msgs,
?assertMatch(
#{
msgid := _,
topic := _,
qos := _,
publish_at := _,
from_clientid := _,
from_username := _
},
First
).
t_messages2(Config) ->
C = ?config(client, Config),
ok = lists:foreach(
fun(Topic) ->
?assertWaitEvent(
emqtt:publish(C, Topic, <<"retained">>, [{qos, 0}, {retain, true}]),
#{?snk_kind := message_retained, topic := Topic},
500
)
end,
[<<"c">>, <<"c/1">>, <<"c/1/1">>]
),
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages?topic=c"])),
#{data := Msgs, meta := _} = decode_json(MsgsJson),
?assertEqual(1, length(Msgs)).
t_message1(Config) ->
C = ?config(client, Config),
?assertWaitEvent(
emqtt:publish(C, <<"c/1">>, <<"retained">>, [{qos, 0}, {retain, true}]),
#{?snk_kind := message_retained, topic := <<"c/1">>},
500
),
?assertMatch(
{error, {_, 404, _}},
request_api(
get,
api_path(["mqtt", "retainer", "message", "c"])
)
),
{ok, Json} =
request_api(
get,
api_path(["mqtt", "retainer", "message", "c%2F1"])
),
?assertMatch(
#{
topic := <<"c/1">>,
payload := <<"cmV0YWluZWQ=">>
},
decode_json(Json)
).
t_message2(Config) ->
C = ?config(client, Config),
?assertWaitEvent(
emqtt:publish(C, <<"c">>, <<"retained">>, [{qos, 0}, {retain, true}]),
#{?snk_kind := message_retained, topic := <<"c">>},
500
),
?assertMatch(
{error, {_, 404, _}},
request_api(
get,
api_path(["mqtt", "retainer", "message", "c%2F%2B"])
)
),
{ok, Json0} =
request_api(
get,
api_path(["mqtt", "retainer", "message", "c"])
),
?assertMatch(
#{
topic := <<"c">>,
payload := <<"cmV0YWluZWQ=">>
},
decode_json(Json0)
),
{ok, Json1} =
request_api(
get,
api_path(["mqtt", "retainer", "message", "c%2F%23"])
),
?assertMatch(
#{
topic := <<"c">>,
payload := <<"cmV0YWluZWQ=">>
},
decode_json(Json1)
).
t_messages_page(Config) ->
C = ?config(client, Config),
Each = fun(I) ->
emqtt:publish(
C,
<<"retained/", (I + 60)>>,
<<"retained">>,
[{qos, 0}, {retain, true}]
)
end,
?assertWaitEvent(
lists:foreach(Each, lists:seq(1, 5)),
#{?snk_kind := message_retained, topic := <<"retained/A">>},
500
),
Page = 4,
{ok, MsgsJson} = request_api(
get,
api_path([
"mqtt", "retainer", "messages?page=" ++ erlang:integer_to_list(Page) ++ "&limit=1"
])
),
#{data := Msgs, meta := #{page := Page, limit := 1}} = decode_json(MsgsJson),
MsgLen = erlang:length(Msgs),
?assert(
MsgLen =:= 1,
io_lib:format("message length is:~p~n", [MsgLen])
),
[OnlyOne] = Msgs,
Topic = <<"retained/", (Page + 60)>>,
?assertMatch(
#{
msgid := _,
topic := Topic,
qos := _,
publish_at := _,
from_clientid := _,
from_username := _
},
OnlyOne
).
t_lookup_and_delete(Config) ->
C = ?config(client, Config),
timer:sleep(300),
emqtt:publish(C, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
timer:sleep(300),
API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
{ok, LookupJson} = request_api(get, API),
LookupResult = decode_json(LookupJson),
?assertMatch(
#{
msgid := _,
topic := _,
qos := _,
payload := _,
publish_at := _,
from_clientid := _,
from_username := _
},
LookupResult
),
{ok, []} = request_api(delete, API),
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API),
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(delete, API).
t_change_storage_type(_Config) ->
Path = api_path(["mqtt", "retainer"]),
{ok, ConfJson} = request_api(get, Path),
RawConf = emqx_utils_json:decode(ConfJson, [return_maps]),
%% pre-conditions
?assertMatch(
#{
<<"backend">> := #{
<<"type">> := <<"built_in_database">>,
<<"storage_type">> := <<"ram">>
},
<<"enable">> := true
},
RawConf
),
?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
?assertEqual(ram_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
%% insert some retained messages
{ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C0),
ok = snabbkaffe:start_trace(),
Topic = <<"retained">>,
Payload = <<"retained">>,
{ok, {ok, _}} =
?wait_async_action(
emqtt:publish(C0, Topic, Payload, [{qos, 0}, {retain, true}]),
#{?snk_kind := message_retained, topic := Topic},
500
),
emqtt:stop(C0),
ok = snabbkaffe:stop(),
{ok, MsgsJson0} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
#{data := Msgs0, meta := _} = decode_json(MsgsJson0),
?assertEqual(1, length(Msgs0)),
ChangedConf = emqx_utils_maps:deep_merge(
RawConf,
#{
<<"backend">> =>
#{<<"storage_type">> => <<"disc">>}
}
),
{ok, UpdateResJson} = request_api(
put,
Path,
[],
auth_header_(),
ChangedConf
),
UpdatedRawConf = emqx_utils_json:decode(UpdateResJson, [return_maps]),
?assertMatch(
#{
<<"backend">> := #{
<<"type">> := <<"built_in_database">>,
<<"storage_type">> := <<"disc">>
},
<<"enable">> := true
},
UpdatedRawConf
),
?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
?assertEqual(disc_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
%% keep retained messages
{ok, MsgsJson1} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
#{data := Msgs1, meta := _} = decode_json(MsgsJson1),
?assertEqual(1, length(Msgs1)),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
{ok, _, _} = emqtt:subscribe(C1, Topic),
receive
{publish, #{topic := T, payload := P, retain := R}} ->
?assertEqual(Payload, P),
?assertEqual(Topic, T),
?assert(R),
ok
after 500 ->
emqtt:stop(C1),
ct:fail("should have preserved retained messages")
end,
emqtt:stop(C1),
ok.
t_match_and_clean(Config) ->
C = ?config(client, Config),
timer:sleep(300),
_ = [
emqtt:publish(C, <<P/binary, "/", S/binary>>, <<"retained">>, [{qos, 0}, {retain, true}])
|| P <- [<<"t">>, <<"f">>], S <- [<<"1">>, <<"2">>, <<"3">>]
],
timer:sleep(1000),
API = api_path(["mqtt", "retainer", "messages"]),
{ok, LookupJson} = request_api(get, API, "topic=t/%2B", auth_header_()),
LookupResult = decode_json(LookupJson),
Expected = lists:usort([<<"t/1">>, <<"t/2">>, <<"t/3">>]),
?assertMatch(
Expected,
lists:usort([Topic || #{topic := Topic} <- maps:get(data, LookupResult)])
),
CleanAPI = api_path(["mqtt", "retainer", "messages"]),
{ok, []} = request_api(delete, CleanAPI),
{ok, LookupJson2} = request_api(get, API),
?assertMatch(#{data := []}, decode_json(LookupJson2)).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
decode_json(Data) ->
BinJson = emqx_utils_json:decode(Data, [return_maps]),
emqx_utils_maps:unsafe_atom_key_map(BinJson).
raw_systopic_conf() ->
#{
<<"sys_event_messages">> =>
#{
<<"client_connected">> => false,
<<"client_disconnected">> => false,
<<"client_subscribed">> => false,
<<"client_unsubscribed">> => false
},
<<"sys_heartbeat_interval">> => <<"1440m">>,
<<"sys_msg_interval">> => <<"1440m">>
}.