test(willmsg): test will delay and session expires
will delay > session expire will delay < session expire timer triggered events are handled in seq, exclude the case of (will delay == session expire)
This commit is contained in:
parent
dd62280e04
commit
5397402396
|
@ -50,7 +50,12 @@ end_per_suite(Config) ->
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
[
|
[
|
||||||
{mqttv3, [], emqx_common_test_helpers:all(?MODULE)},
|
{mqttv3, [],
|
||||||
|
emqx_common_test_helpers:all(?MODULE) --
|
||||||
|
[
|
||||||
|
t_session_expire_with_delayed_willmsg,
|
||||||
|
t_no_takeover_with_delayed_willmsg
|
||||||
|
]},
|
||||||
{mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
|
{mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -239,7 +244,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) ->
|
||||||
{will_payload, <<"willpayload_delay10">>},
|
{will_payload, <<"willpayload_delay10">>},
|
||||||
{will_qos, 1},
|
{will_qos, 1},
|
||||||
%% mqttv5 only
|
%% mqttv5 only
|
||||||
{properties, #{'Will-Delay-Interval' => 10000}}
|
{will_props, #{'Will-Delay-Interval' => 10}}
|
||||||
],
|
],
|
||||||
WillOptsClean = [
|
WillOptsClean = [
|
||||||
{proto_ver, ?config(mqtt_vsn, Config)},
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||||
|
@ -286,11 +291,124 @@ t_takeover_clean_session_with_delayed_willmsg(Config) ->
|
||||||
?assert(not is_process_alive(CPid1)),
|
?assert(not is_process_alive(CPid1)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_no_takeover_with_delayed_willmsg(Config) ->
|
||||||
|
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||||
|
Client1Msgs = messages(ClientId, 0, 10),
|
||||||
|
WillOpts = [
|
||||||
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||||
|
{clean_start, false},
|
||||||
|
{will_topic, WillTopic},
|
||||||
|
{will_payload, <<"willpayload_delay3">>},
|
||||||
|
{will_qos, 1},
|
||||||
|
%secs
|
||||||
|
{will_props, #{'Will-Delay-Interval' => 3}},
|
||||||
|
% secs
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 10}}
|
||||||
|
],
|
||||||
|
Commands =
|
||||||
|
%% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s
|
||||||
|
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||||
|
[
|
||||||
|
{fun start_client/5, [
|
||||||
|
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||||
|
]}
|
||||||
|
] ++
|
||||||
|
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||||
|
|
||||||
|
FCtx = lists:foldl(
|
||||||
|
fun({Fun, Args}, Ctx) ->
|
||||||
|
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||||
|
apply(Fun, [Ctx | Args])
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
Commands
|
||||||
|
),
|
||||||
|
|
||||||
|
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||||
|
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||||
|
assert_messages_missed(Client1Msgs, Received),
|
||||||
|
#{client := [CPidSub, CPid1]} = FCtx,
|
||||||
|
%% WHEN: client disconnects abnormally AND no reconnect after 3s.
|
||||||
|
exit(CPid1, kill),
|
||||||
|
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed),
|
||||||
|
|
||||||
|
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||||
|
|
||||||
|
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay3">>),
|
||||||
|
?assertNot(IsWill),
|
||||||
|
?assertEqual([], ReceivedNoWill),
|
||||||
|
%% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay.
|
||||||
|
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
|
||||||
|
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>),
|
||||||
|
?assertEqual([], ReceivedNoWill11),
|
||||||
|
?assert(IsWill11),
|
||||||
|
emqtt:stop(CPidSub),
|
||||||
|
?assert(not is_process_alive(CPid1)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_session_expire_with_delayed_willmsg(Config) ->
|
||||||
|
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||||
|
Client1Msgs = messages(ClientId, 0, 10),
|
||||||
|
WillOpts = [
|
||||||
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||||
|
{clean_start, false},
|
||||||
|
{will_topic, WillTopic},
|
||||||
|
{will_payload, <<"willpayload_delay10">>},
|
||||||
|
{will_qos, 1},
|
||||||
|
{will_props, #{'Will-Delay-Interval' => 10}},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 3}}
|
||||||
|
],
|
||||||
|
Commands =
|
||||||
|
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
|
||||||
|
%% and delay-interval 10s > session expiry 3s.
|
||||||
|
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||||
|
[
|
||||||
|
{fun start_client/5, [
|
||||||
|
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||||
|
]}
|
||||||
|
] ++
|
||||||
|
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs],
|
||||||
|
|
||||||
|
FCtx = lists:foldl(
|
||||||
|
fun({Fun, Args}, Ctx) ->
|
||||||
|
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||||
|
apply(Fun, [Ctx | Args])
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
Commands
|
||||||
|
),
|
||||||
|
|
||||||
|
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||||
|
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||||
|
assert_messages_missed(Client1Msgs, Received),
|
||||||
|
#{client := [CPidSub, CPid1]} = FCtx,
|
||||||
|
%% WHEN: client disconnects abnormally AND no reconnect after 3s.
|
||||||
|
exit(CPid1, kill),
|
||||||
|
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed),
|
||||||
|
|
||||||
|
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||||
|
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>),
|
||||||
|
?assertNot(IsWill),
|
||||||
|
?assertEqual([], ReceivedNoWill),
|
||||||
|
%% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry.
|
||||||
|
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
|
||||||
|
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>),
|
||||||
|
?assertEqual([], ReceivedNoWill11),
|
||||||
|
?assert(IsWill11),
|
||||||
|
emqtt:stop(CPidSub),
|
||||||
|
?assert(not is_process_alive(CPid1)),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_kick_session(Config) ->
|
t_kick_session(Config) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||||
% emqx_logger:set_log_level(debug),
|
|
||||||
WillOpts = [
|
WillOpts = [
|
||||||
{proto_ver, ?config(mqtt_vsn, Config)},
|
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||||
{clean_start, false},
|
{clean_start, false},
|
||||||
|
@ -440,4 +558,6 @@ assert_client_exit(Pid, v3, takenover) ->
|
||||||
assert_client_exit(Pid, v3, kicked) ->
|
assert_client_exit(Pid, v3, kicked) ->
|
||||||
?assertReceive({'EXIT', Pid, _});
|
?assertReceive({'EXIT', Pid, _});
|
||||||
assert_client_exit(Pid, v5, kicked) ->
|
assert_client_exit(Pid, v5, kicked) ->
|
||||||
?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}).
|
?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}});
|
||||||
|
assert_client_exit(Pid, _, killed) ->
|
||||||
|
?assertReceive({'EXIT', Pid, killed}).
|
||||||
|
|
Loading…
Reference in New Issue