332 lines
9.8 KiB
Erlang
332 lines
9.8 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_retainer_api_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include("emqx_retainer.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
|
|
|
|
-import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]).
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
init_per_suite(Config) ->
|
|
application:load(emqx_conf),
|
|
ok = ekka:start(),
|
|
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
|
emqx_retainer_SUITE:load_conf(),
|
|
emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]),
|
|
%% make sure no "$SYS/#" topics
|
|
emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),
|
|
Config.
|
|
|
|
end_per_suite(Config) ->
|
|
ekka:stop(),
|
|
mria:stop(),
|
|
mria_mnesia:delete_schema(),
|
|
emqx_mgmt_api_test_util:end_suite([emqx_retainer, emqx_conf]),
|
|
Config.
|
|
|
|
init_per_testcase(_, Config) ->
|
|
{ok, _} = emqx_cluster_rpc:start_link(),
|
|
Config.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test Cases
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_config(_Config) ->
|
|
Path = api_path(["mqtt", "retainer"]),
|
|
{ok, ConfJson} = request_api(get, Path),
|
|
ReturnConf = decode_json(ConfJson),
|
|
?assertMatch(
|
|
#{
|
|
backend := _,
|
|
enable := _,
|
|
flow_control := _,
|
|
max_payload_size := _,
|
|
msg_clear_interval := _,
|
|
msg_expiry_interval := _
|
|
},
|
|
ReturnConf
|
|
),
|
|
|
|
UpdateConf = fun(Enable) ->
|
|
RawConf = emqx_utils_json:decode(ConfJson, [return_maps]),
|
|
UpdateJson = RawConf#{<<"enable">> := Enable},
|
|
{ok, UpdateResJson} = request_api(
|
|
put,
|
|
Path,
|
|
[],
|
|
auth_header_(),
|
|
UpdateJson
|
|
),
|
|
UpdateRawConf = emqx_utils_json:decode(UpdateResJson, [return_maps]),
|
|
?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf))
|
|
end,
|
|
|
|
UpdateConf(false),
|
|
UpdateConf(true).
|
|
|
|
t_messages(_) ->
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
emqx_retainer:clean(),
|
|
|
|
Each = fun(I) ->
|
|
emqtt:publish(
|
|
C1,
|
|
<<"retained/", (I + 60)>>,
|
|
<<"retained">>,
|
|
[{qos, 0}, {retain, true}]
|
|
)
|
|
end,
|
|
|
|
?check_trace(
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
lists:foreach(Each, lists:seq(1, 5)),
|
|
#{?snk_kind := message_retained, topic := <<"retained/A">>},
|
|
500
|
|
),
|
|
[]
|
|
),
|
|
|
|
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
|
|
#{data := Msgs, meta := _} = decode_json(MsgsJson),
|
|
MsgLen = erlang:length(Msgs),
|
|
?assert(
|
|
MsgLen =:= 5,
|
|
io_lib:format("message length is:~p~n", [MsgLen])
|
|
),
|
|
|
|
[First | _] = Msgs,
|
|
?assertMatch(
|
|
#{
|
|
msgid := _,
|
|
topic := _,
|
|
qos := _,
|
|
publish_at := _,
|
|
from_clientid := _,
|
|
from_username := _
|
|
},
|
|
First
|
|
),
|
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
t_messages_page(_) ->
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
emqx_retainer:clean(),
|
|
|
|
Each = fun(I) ->
|
|
emqtt:publish(
|
|
C1,
|
|
<<"retained/", (I + 60)>>,
|
|
<<"retained">>,
|
|
[{qos, 0}, {retain, true}]
|
|
)
|
|
end,
|
|
|
|
?check_trace(
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
lists:foreach(Each, lists:seq(1, 5)),
|
|
#{?snk_kind := message_retained, topic := <<"retained/A">>},
|
|
500
|
|
),
|
|
[]
|
|
),
|
|
Page = 4,
|
|
|
|
{ok, MsgsJson} = request_api(
|
|
get,
|
|
api_path([
|
|
"mqtt", "retainer", "messages?page=" ++ erlang:integer_to_list(Page) ++ "&limit=1"
|
|
])
|
|
),
|
|
#{data := Msgs, meta := #{page := Page, limit := 1}} = decode_json(MsgsJson),
|
|
MsgLen = erlang:length(Msgs),
|
|
?assert(
|
|
MsgLen =:= 1,
|
|
io_lib:format("message length is:~p~n", [MsgLen])
|
|
),
|
|
|
|
[OnlyOne] = Msgs,
|
|
Topic = <<"retained/", (Page + 60)>>,
|
|
?assertMatch(
|
|
#{
|
|
msgid := _,
|
|
topic := Topic,
|
|
qos := _,
|
|
publish_at := _,
|
|
from_clientid := _,
|
|
from_username := _
|
|
},
|
|
OnlyOne
|
|
),
|
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
t_lookup_and_delete(_) ->
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
emqx_retainer:clean(),
|
|
timer:sleep(300),
|
|
|
|
emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
|
|
timer:sleep(300),
|
|
|
|
API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
|
|
{ok, LookupJson} = request_api(get, API),
|
|
LookupResult = decode_json(LookupJson),
|
|
|
|
?assertMatch(
|
|
#{
|
|
msgid := _,
|
|
topic := _,
|
|
qos := _,
|
|
payload := _,
|
|
publish_at := _,
|
|
from_clientid := _,
|
|
from_username := _
|
|
},
|
|
LookupResult
|
|
),
|
|
|
|
{ok, []} = request_api(delete, API),
|
|
|
|
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API),
|
|
|
|
ok = emqtt:disconnect(C1).
|
|
|
|
t_change_storage_type(_Config) ->
|
|
Path = api_path(["mqtt", "retainer"]),
|
|
{ok, ConfJson} = request_api(get, Path),
|
|
RawConf = emqx_utils_json:decode(ConfJson, [return_maps]),
|
|
%% pre-conditions
|
|
?assertMatch(
|
|
#{
|
|
<<"backend">> := #{
|
|
<<"type">> := <<"built_in_database">>,
|
|
<<"storage_type">> := <<"ram">>
|
|
},
|
|
<<"enable">> := true
|
|
},
|
|
RawConf
|
|
),
|
|
?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
|
|
?assertEqual(ram_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
|
|
?assertEqual(ram_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
|
|
%% insert some retained messages
|
|
{ok, C0} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
{ok, _} = emqtt:connect(C0),
|
|
ok = snabbkaffe:start_trace(),
|
|
Topic = <<"retained">>,
|
|
Payload = <<"retained">>,
|
|
{ok, {ok, _}} =
|
|
?wait_async_action(
|
|
emqtt:publish(C0, Topic, Payload, [{qos, 0}, {retain, true}]),
|
|
#{?snk_kind := message_retained, topic := Topic},
|
|
500
|
|
),
|
|
emqtt:stop(C0),
|
|
ok = snabbkaffe:stop(),
|
|
{ok, MsgsJson0} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
|
|
#{data := Msgs0, meta := _} = decode_json(MsgsJson0),
|
|
?assertEqual(1, length(Msgs0)),
|
|
|
|
ChangedConf = emqx_utils_maps:deep_merge(
|
|
RawConf,
|
|
#{
|
|
<<"backend">> =>
|
|
#{<<"storage_type">> => <<"disc">>}
|
|
}
|
|
),
|
|
{ok, UpdateResJson} = request_api(
|
|
put,
|
|
Path,
|
|
[],
|
|
auth_header_(),
|
|
ChangedConf
|
|
),
|
|
UpdatedRawConf = emqx_utils_json:decode(UpdateResJson, [return_maps]),
|
|
?assertMatch(
|
|
#{
|
|
<<"backend">> := #{
|
|
<<"type">> := <<"built_in_database">>,
|
|
<<"storage_type">> := <<"disc">>
|
|
},
|
|
<<"enable">> := true
|
|
},
|
|
UpdatedRawConf
|
|
),
|
|
?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX_META, storage_type)),
|
|
?assertEqual(disc_copies, mnesia:table_info(?TAB_MESSAGE, storage_type)),
|
|
?assertEqual(disc_copies, mnesia:table_info(?TAB_INDEX, storage_type)),
|
|
%% keep retained messages
|
|
{ok, MsgsJson1} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
|
|
#{data := Msgs1, meta := _} = decode_json(MsgsJson1),
|
|
?assertEqual(1, length(Msgs1)),
|
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
{ok, _, _} = emqtt:subscribe(C1, Topic),
|
|
|
|
receive
|
|
{publish, #{topic := T, payload := P, retain := R}} ->
|
|
?assertEqual(Payload, P),
|
|
?assertEqual(Topic, T),
|
|
?assert(R),
|
|
ok
|
|
after 500 ->
|
|
emqtt:stop(C1),
|
|
ct:fail("should have preserved retained messages")
|
|
end,
|
|
emqtt:stop(C1),
|
|
|
|
ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% HTTP Request
|
|
%%--------------------------------------------------------------------
|
|
decode_json(Data) ->
|
|
BinJson = emqx_utils_json:decode(Data, [return_maps]),
|
|
emqx_utils_maps:unsafe_atom_key_map(BinJson).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal funcs
|
|
%%--------------------------------------------------------------------
|
|
raw_systopic_conf() ->
|
|
#{
|
|
<<"sys_event_messages">> =>
|
|
#{
|
|
<<"client_connected">> => false,
|
|
<<"client_disconnected">> => false,
|
|
<<"client_subscribed">> => false,
|
|
<<"client_unsubscribed">> => false
|
|
},
|
|
<<"sys_heartbeat_interval">> => <<"1440m">>,
|
|
<<"sys_msg_interval">> => <<"1440m">>
|
|
}.
|