refactor(retainer): refactor emqx_retainer test case
This commit is contained in:
parent
47378e0e96
commit
5a87d941f6
|
@ -187,8 +187,8 @@ init([]) ->
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
handle_call({update_config, Conf}, _, State) ->
|
handle_call({update_config, Conf}, _, State) ->
|
||||||
State2 = update_config(State, Conf),
|
{ok, Config} = emqx:update_config([?APP], Conf),
|
||||||
_ = emqx:update_config([?APP], Conf),
|
State2 = update_config(State, maps:get(config, Config)),
|
||||||
{reply, ok, State2};
|
{reply, ok, State2};
|
||||||
|
|
||||||
handle_call({wait_semaphore, Id}, From, #{wait_quotas := Waits} = State) ->
|
handle_call({wait_semaphore, Id}, From, #{wait_quotas := Waits} = State) ->
|
||||||
|
|
|
@ -137,11 +137,7 @@ config(put, Req) ->
|
||||||
try
|
try
|
||||||
{ok, Body, _} = cowboy_req:read_body(Req),
|
{ok, Body, _} = cowboy_req:read_body(Req),
|
||||||
Cfg = emqx_json:decode(Body),
|
Cfg = emqx_json:decode(Body),
|
||||||
{ok, RawConf} = hocon:binary(jsx:encode(#{<<"mqtt_retainer">> => Cfg}),
|
emqx_retainer:update_config(Cfg),
|
||||||
#{format => richmap}),
|
|
||||||
RichConf = hocon_schema:check(emqx_retainer_schema, RawConf, #{atom_key => true}),
|
|
||||||
#{mqtt_retainer := Conf} = hocon_schema:richmap_to_map(RichConf),
|
|
||||||
emqx_retainer:update_config(Conf),
|
|
||||||
{200, #{<<"content-type">> => <<"text/plain">>}, <<"Update configs successfully">>}
|
{200, #{<<"content-type">> => <<"text/plain">>}, <<"Update configs successfully">>}
|
||||||
catch _:Reason:_ ->
|
catch _:Reason:_ ->
|
||||||
{400,
|
{400,
|
||||||
|
|
|
@ -26,59 +26,38 @@
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
-define(BASE_CONF, <<"""
|
||||||
|
emqx_retainer {
|
||||||
|
enable = true
|
||||||
|
msg_clear_interval = 0s
|
||||||
|
msg_expiry_interval = 0s
|
||||||
|
max_payload_size = 1MB
|
||||||
|
flow_control {
|
||||||
|
max_read_number = 0
|
||||||
|
msg_deliver_quota = 0
|
||||||
|
quota_release_interval = 0s
|
||||||
|
}
|
||||||
|
config {
|
||||||
|
type = built_in_database
|
||||||
|
storage_type = ram
|
||||||
|
max_retained_messages = 0
|
||||||
|
}
|
||||||
|
}""">>).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Setups
|
%% Setups
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
application:stop(emqx_retainer),
|
ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF),
|
||||||
emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1),
|
emqx_ct_helpers:start_apps([emqx_retainer]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([emqx_retainer]).
|
emqx_ct_helpers:stop_apps([emqx_retainer]).
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) ->
|
|
||||||
emqx_retainer:clean(),
|
|
||||||
DefaultCfg = new_emqx_retainer_conf(),
|
|
||||||
NewCfg = case TestCase of
|
|
||||||
t_message_expiry_2 ->
|
|
||||||
DefaultCfg#{msg_expiry_interval := 2000};
|
|
||||||
t_flow_control ->
|
|
||||||
DefaultCfg#{flow_control := #{max_read_number => 1,
|
|
||||||
msg_deliver_quota => 1,
|
|
||||||
quota_release_interval => timer:seconds(1)}};
|
|
||||||
_ ->
|
|
||||||
DefaultCfg
|
|
||||||
end,
|
|
||||||
emqx_retainer:update_config(NewCfg),
|
|
||||||
application:ensure_all_started(emqx_retainer),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
set_special_configs(emqx_retainer) ->
|
|
||||||
init_emqx_retainer_conf();
|
|
||||||
set_special_configs(_) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
init_emqx_retainer_conf() ->
|
|
||||||
emqx_config:put([?APP], new_emqx_retainer_conf()).
|
|
||||||
|
|
||||||
new_emqx_retainer_conf() ->
|
|
||||||
#{enable => true,
|
|
||||||
msg_expiry_interval => 0,
|
|
||||||
msg_clear_interval => 0,
|
|
||||||
config => #{type => built_in_database,
|
|
||||||
max_retained_messages => 0,
|
|
||||||
storage_type => ram},
|
|
||||||
flow_control => #{max_read_number => 0,
|
|
||||||
msg_deliver_quota => 0,
|
|
||||||
quota_release_interval => 0},
|
|
||||||
max_payload_size => 1024 * 1024}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test Cases
|
%% Test Cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_store_and_clean(_) ->
|
t_store_and_clean(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
@ -184,13 +163,14 @@ t_message_expiry(_) ->
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
t_message_expiry_2(_) ->
|
t_message_expiry_2(_) ->
|
||||||
|
emqx_retainer:update_config(#{<<"msg_expiry_interval">> => <<"2s">>}),
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
timer:sleep(3000),
|
timer:sleep(4000),
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
||||||
|
@ -216,6 +196,9 @@ t_clean(_) ->
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
t_flow_control(_) ->
|
t_flow_control(_) ->
|
||||||
|
emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1,
|
||||||
|
<<"msg_deliver_quota">> => 1,
|
||||||
|
<<"quota_release_interval">> => <<"1s">>}}),
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
||||||
|
|
|
@ -21,27 +21,38 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(BASE_CONF, <<"""
|
||||||
|
emqx_retainer {
|
||||||
|
enable = true
|
||||||
|
msg_clear_interval = 0s
|
||||||
|
msg_expiry_interval = 0s
|
||||||
|
max_payload_size = 1MB
|
||||||
|
flow_control {
|
||||||
|
max_read_number = 0
|
||||||
|
msg_deliver_quota = 0
|
||||||
|
quota_release_interval = 0s
|
||||||
|
}
|
||||||
|
config {
|
||||||
|
type = built_in_database
|
||||||
|
storage_type = ram
|
||||||
|
max_retained_messages = 0
|
||||||
|
}
|
||||||
|
}""">>).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF),
|
||||||
%% Meck emqtt
|
%% Meck emqtt
|
||||||
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
|
ok = meck:new(emqtt, [non_strict, passthrough, no_history, no_link]),
|
||||||
%% Start Apps
|
%% Start Apps
|
||||||
emqx_ct_helpers:start_apps([emqx_retainer], fun set_special_configs/1),
|
emqx_ct_helpers:start_apps([emqx_retainer]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = meck:unload(emqtt),
|
ok = meck:unload(emqtt),
|
||||||
emqx_ct_helpers:stop_apps([emqx_retainer]).
|
emqx_ct_helpers:stop_apps([emqx_retainer]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Helpers
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
set_special_configs(emqx_retainer) ->
|
|
||||||
emqx_retainer_SUITE:init_emqx_retainer_conf();
|
|
||||||
set_special_configs(_) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
client_info(Key, Client) ->
|
client_info(Key, Client) ->
|
||||||
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
|
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue