Merge pull request #7206 from lafirest/coverage/retainer

test(retainer): improve test coverage from 63% to 92%
This commit is contained in:
lafirest 2022-03-04 16:34:20 +08:00 committed by GitHub
commit e298ff9dca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 202 additions and 141 deletions

View File

@ -104,7 +104,7 @@ parameters() ->
})}].
fields(message_summary) ->
[ {id, mk(binary(), #{desc => <<"Message ID">>})}
[ {msgid, mk(binary(), #{desc => <<"Message ID">>})}
, {topic, mk(binary(), #{desc => "The topic"})}
, {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})}
, {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})}

View File

@ -23,10 +23,8 @@
]).
start(_Type, _Args) ->
{ok, Sup} = emqx_retainer_sup:start_link(),
emqx_retainer_cli:load(),
{ok, Sup}.
emqx_retainer_sup:start_link().
stop(_State) ->
emqx_retainer_cli:unload().
ok.

View File

@ -1,37 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_retainer_cli).
-include("emqx_retainer.hrl").
%% APIs
-export([ load/0
, cmd/1
, unload/0
]).
load() ->
emqx_ctl:register_command(retainer, {?MODULE, cmd}, []).
cmd(_) ->
emqx_ctl:usage([{"retainer info", "Show the count of retained messages"},
{"retainer topics", "Show all topics of retained messages"},
{"retainer clean", "Clean all retained messages"},
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"}]).
unload() ->
emqx_ctl:unregister_command(retainer).

View File

@ -25,6 +25,7 @@
-export([ start_link/2
, dispatch/2
, refresh_limiter/0
, worker/0
]).
%% gen_server callbacks
@ -50,6 +51,9 @@ refresh_limiter() ->
end,
Workers).
worker() ->
gproc_pool:pick_worker(?POOL, self()).
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
@ -79,6 +83,7 @@ start_link(Pool, Id) ->
{stop, Reason :: term()} |
ignore.
init([Pool, Id]) ->
erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
Limiter = emqx_limiter_server:connect(shared, Bucket),
@ -188,10 +193,6 @@ format_status(_Opt, Status) ->
cast(Msg) ->
gen_server:cast(worker(), Msg).
%% @private
worker() ->
gproc_pool:pick_worker(?POOL, self()).
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
dispatch(Context, Pid, Topic, Cursor, Limiter) ->
Mod = emqx_retainer:get_backend_module(),

View File

@ -99,6 +99,10 @@ t_store_and_clean(_) ->
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]),
timer:sleep(100),
{ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10),
?assertEqual(1, length(List)),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))),
@ -109,6 +113,10 @@ t_store_and_clean(_) ->
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(0, length(receive_messages(1))),
ok = emqx_retainer:clean(),
{ok, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10),
?assertEqual(0, length(List2)),
ok = emqtt:disconnect(C1).
t_retain_handling(_) ->
@ -337,6 +345,96 @@ t_flow_control(_) ->
timer:sleep(500),
ok.
t_clear_expired(_) ->
ConfMod = fun(Conf) ->
Conf#{<<"msg_clear_interval">> := <<"1s">>, <<"msg_expiry_interval">> := <<"3s">>}
end,
Case = fun() ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
lists:foreach(fun(I) ->
emqtt:publish(C1,
<<"retained/", (I + 60):8/unsigned-integer>>,
#{'Message-Expiry-Interval' => 3},
<<"retained">>,
[{qos, 0}, {retain, true}])
end,
lists:seq(1, 5)),
timer:sleep(1000),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
?assertEqual(5, erlang:length(List)),
timer:sleep(4500),
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
?assertEqual(0, erlang:length(List2)),
ok = emqtt:disconnect(C1)
end,
with_conf(ConfMod, Case).
t_max_payload_size(_) ->
ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end,
Case = fun() ->
emqx_retainer:clean(),
timer:sleep(500),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(C1,
<<"retained/1">>, #{}, <<"1234">>, [{qos, 0}, {retain, true}]),
emqtt:publish(C1,
<<"retained/2">>, #{}, <<"1234567">>, [{qos, 0}, {retain, true}]),
timer:sleep(500),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
?assertEqual(1, erlang:length(List)),
ok = emqtt:disconnect(C1)
end,
with_conf(ConfMod, Case).
t_page_read(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
ok = emqx_retainer:clean(),
timer:sleep(500),
Fun = fun(I) ->
emqtt:publish(C1,
<<"retained/", (I + 60)>>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]
)
end,
lists:foreach(Fun, lists:seq(1, 9)),
timer:sleep(200),
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5),
?assertEqual(5, length(List)),
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5),
?assertEqual(4, length(List2)),
ok = emqtt:disconnect(C1).
t_only_for_coverage(_) ->
?assertEqual("retainer", emqx_retainer_schema:namespace()),
ignored = gen_server:call(emqx_retainer, unexpected),
ok = gen_server:cast(emqx_retainer, unexpected),
unexpected = erlang:send(erlang:whereis(emqx_retainer), unexpected),
Dispatcher = emqx_retainer_dispatcher:worker(),
ignored = gen_server:call(Dispatcher, unexpected),
ok = gen_server:cast(Dispatcher, unexpected),
unexpected = erlang:send(Dispatcher, unexpected),
true = erlang:exit(Dispatcher, normal),
ok.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
@ -356,3 +454,15 @@ receive_messages(Count, Msgs) ->
after 2000 ->
Msgs
end.
with_conf(ConfMod, Case) ->
Conf = emqx:get_raw_config([retainer]),
NewConf = ConfMod(Conf),
emqx_retainer:update_config(NewConf),
try
Case(),
emqx_retainer:update_config(Conf)
catch Type:Error:Strace ->
emqx_retainer:update_config(Conf),
erlang:raise(Type, Error, Strace)
end.

View File

@ -22,128 +22,117 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(emqx_common_test_http, [ request_api/3
, request_api/5
, get_http_data/1
, create_default_app/0
, delete_default_app/0
, default_auth_header/0
]).
-define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
-define(CFG_URI, "/configs/retainer").
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]).
all() ->
%% TODO: V5 API
%% emqx_common_test_helpers:all(?MODULE).
[].
groups() ->
[].
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_conf),
ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok),
application:stop(emqx_retainer),
emqx_common_test_helpers:start_apps([emqx_retainer, emqx_management], fun set_special_configs/1),
create_default_app(),
emqx_retainer_SUITE:load_base_conf(),
emqx_mgmt_api_test_util:init_suite([emqx_retainer]),
Config.
end_per_suite(_Config) ->
delete_default_app(),
emqx_common_test_helpers:stop_apps([emqx_management, emqx_retainer]).
end_per_suite(Config) ->
ekka:stop(),
mria:stop(),
mria_mnesia:delete_schema(),
meck:unload(emqx_alarm),
emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]),
Config.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(),
application:ensure_all_started(emqx_retainer),
timer:sleep(500),
Config.
set_special_configs(emqx_retainer) ->
emqx_retainer_SUITE:init_emqx_retainer_conf();
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_) ->
ok.
%%------------------------------------------------------------------------------
%% Test Cases
%%------------------------------------------------------------------------------
t_config(_Config) ->
{ok, Return} = request_http_rest_lookup([?CFG_URI]),
NowCfg = get_http_data(Return),
NewCfg = NowCfg#{<<"msg_expiry_interval">> => timer:seconds(60)},
RetainerConf = #{<<"emqx_retainer">> => NewCfg},
Path = api_path(["mqtt", "retainer"]),
{ok, ConfJson} = request_api(get, Path),
ReturnConf = decode_json(ConfJson),
?assertMatch(#{backend := _, enable := _, flow_control := _,
max_payload_size := _, msg_clear_interval := _,
msg_expiry_interval := _},
ReturnConf),
{ok, _} = request_http_rest_update([?CFG_URI], RetainerConf),
{ok, UpdateReturn} = request_http_rest_lookup(["retainer"]),
?assertEqual(NewCfg, get_http_data(UpdateReturn)),
ok.
UpdateConf = fun(Enable) ->
RawConf = emqx_json:decode(ConfJson, [return_maps]),
UpdateJson = RawConf#{<<"enable">> := Enable},
{ok, UpdateResJson} = request_api(put,
Path, [], auth_header_(), UpdateJson),
UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]),
?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf))
end,
t_enable_disable(_Config) ->
Conf = switch_emqx_retainer(undefined, true),
UpdateConf(false),
UpdateConf(true).
t_messages(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
timer:sleep(500),
Each = fun(I) ->
emqtt:publish(C1, <<"retained/", (I + 60)>>,
<<"retained">>,
[{qos, 0}, {retain, true}])
end,
lists:foreach(Each, lists:seq(1, 5)),
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
Msgs = decode_json(MsgsJson),
?assert(erlang:length(Msgs) >= 5), %% maybe has $SYS messages
[First | _] = Msgs,
?assertMatch(#{msgid := _, topic := _, qos := _,
publish_at := _, from_clientid := _, from_username := _
},
First),
ok = emqtt:disconnect(C1).
t_lookup_and_delete(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqx_retainer:clean(),
timer:sleep(500),
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
timer:sleep(100),
emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))),
API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
{ok, LookupJson} = request_api(get, API),
LookupResult = decode_json(LookupJson),
_ = switch_emqx_retainer(Conf, false),
?assertMatch(#{msgid := _, topic := _, qos := _, payload := _,
publish_at := _, from_clientid := _, from_username := _
},
LookupResult),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
timer:sleep(100),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(0, length(receive_messages(1))),
{ok, []} = request_api(delete, API),
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API),
ok = emqtt:disconnect(C1).
%%--------------------------------------------------------------------
%% HTTP Request
%%--------------------------------------------------------------------
request_http_rest_lookup(Path) ->
request_api(get, uri([Path]), default_auth_header()).
request_http_rest_update(Path, Params) ->
request_api(put, uri([Path]), [], default_auth_header(), Params).
uri(Parts) when is_list(Parts) ->
NParts = [b2l(E) || E <- Parts],
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
%% @private
b2l(B) when is_binary(B) ->
binary_to_list(B);
b2l(L) when is_list(L) ->
L.
receive_messages(Count) ->
receive_messages(Count, []).
receive_messages(0, Msgs) ->
Msgs;
receive_messages(Count, Msgs) ->
receive
{publish, Msg} ->
ct:log("Msg: ~p ~n", [Msg]),
receive_messages(Count-1, [Msg|Msgs]);
Other ->
ct:log("Other Msg: ~p~n",[Other]),
receive_messages(Count, Msgs)
after 2000 ->
Msgs
end.
switch_emqx_retainer(undefined, IsEnable) ->
{ok, Return} = request_http_rest_lookup([?COMMON_SHARD]),
NowCfg = get_http_data(Return),
switch_emqx_retainer(NowCfg, IsEnable);
switch_emqx_retainer(NowCfg, IsEnable) ->
NewCfg = NowCfg#{<<"enable">> => IsEnable},
RetainerConf = #{<<"emqx_retainer">> => NewCfg},
{ok, _} = request_http_rest_update([?CFG_URI], RetainerConf),
NewCfg.
decode_json(Data) ->
BinJson = emqx_json:decode(Data, [return_maps]),
emqx_map_lib:unsafe_atom_key_map(BinJson).