test(willmsg): session taken over before willmsg delay /session expire
This commit is contained in:
parent
5397402396
commit
6311b582ec
|
@ -54,7 +54,9 @@ groups() ->
|
|||
emqx_common_test_helpers:all(?MODULE) --
|
||||
[
|
||||
t_session_expire_with_delayed_willmsg,
|
||||
t_no_takeover_with_delayed_willmsg
|
||||
t_no_takeover_with_delayed_willmsg,
|
||||
t_takeover_before_session_expire,
|
||||
t_takeover_before_willmsg_expire
|
||||
]},
|
||||
{mqttv5, [], emqx_common_test_helpers:all(?MODULE)}
|
||||
].
|
||||
|
@ -405,6 +407,144 @@ t_session_expire_with_delayed_willmsg(Config) ->
|
|||
?assert(not is_process_alive(CPid1)),
|
||||
ok.
|
||||
|
||||
t_takeover_before_session_expire(Config) ->
|
||||
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
|
||||
process_flag(trap_exit, true),
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||
Client1Msgs = messages(ClientId, 0, 10),
|
||||
emqx_logger:set_log_level(debug),
|
||||
WillOpts = [
|
||||
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||
{clean_start, false},
|
||||
{will_topic, WillTopic},
|
||||
{will_payload, <<"willpayload_delay10">>},
|
||||
{will_qos, 1},
|
||||
{will_props, #{'Will-Delay-Interval' => 10}},
|
||||
{properties, #{'Session-Expiry-Interval' => 3}}
|
||||
],
|
||||
Commands =
|
||||
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
|
||||
%% and delay-interval 10s > session expiry 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||
[
|
||||
%% avoid two clients race for session takeover
|
||||
{
|
||||
fun(CTX) ->
|
||||
timer:sleep(100),
|
||||
CTX
|
||||
end,
|
||||
[]
|
||||
}
|
||||
] ++
|
||||
%% WHEN: client session is taken over with in 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||
apply(Fun, [Ctx | Args])
|
||||
end,
|
||||
#{},
|
||||
Commands
|
||||
),
|
||||
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
||||
ct:pal("FCtx: ~p", [FCtx]),
|
||||
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
||||
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
assert_messages_missed(Client1Msgs, Received),
|
||||
|
||||
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>),
|
||||
?assertNot(IsWill),
|
||||
?assertEqual([], ReceivedNoWill),
|
||||
%% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published.
|
||||
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
|
||||
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>),
|
||||
?assertEqual([], ReceivedNoWill11),
|
||||
?assertNot(IsWill11),
|
||||
emqtt:stop(CPidSub),
|
||||
emqtt:stop(CPid2),
|
||||
?assert(not is_process_alive(CPid1)),
|
||||
ok.
|
||||
|
||||
t_takeover_before_willmsg_expire(Config) ->
|
||||
?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"),
|
||||
process_flag(trap_exit, true),
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
WillTopic = <<ClientId/binary, <<"willtopic">>/binary>>,
|
||||
Client1Msgs = messages(ClientId, 0, 10),
|
||||
emqx_logger:set_log_level(debug),
|
||||
WillOpts = [
|
||||
{proto_ver, ?config(mqtt_vsn, Config)},
|
||||
{clean_start, false},
|
||||
{will_topic, WillTopic},
|
||||
{will_payload, <<"willpayload_delay10">>},
|
||||
{will_qos, 1},
|
||||
{will_props, #{'Will-Delay-Interval' => 3}},
|
||||
{properties, #{'Session-Expiry-Interval' => 10}}
|
||||
],
|
||||
Commands =
|
||||
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
|
||||
%% and will-delay-interval 3s < session expiry 10s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
|
||||
[
|
||||
{fun start_client/5, [
|
||||
<<ClientId/binary, <<"_willsub">>/binary>>, WillTopic, ?QOS_1, []
|
||||
]}
|
||||
] ++
|
||||
[{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++
|
||||
[
|
||||
%% avoid two clients race for session takeover
|
||||
{
|
||||
fun(CTX) ->
|
||||
timer:sleep(100),
|
||||
CTX
|
||||
end,
|
||||
[]
|
||||
}
|
||||
] ++
|
||||
%% WHEN: another client takeover the session with in 3s.
|
||||
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}],
|
||||
|
||||
FCtx = lists:foldl(
|
||||
fun({Fun, Args}, Ctx) ->
|
||||
ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]),
|
||||
apply(Fun, [Ctx | Args])
|
||||
end,
|
||||
#{},
|
||||
Commands
|
||||
),
|
||||
#{client := [CPid2, CPidSub, CPid1]} = FCtx,
|
||||
ct:pal("FCtx: ~p", [FCtx]),
|
||||
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
|
||||
|
||||
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
|
||||
assert_messages_missed(Client1Msgs, Received),
|
||||
|
||||
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
|
||||
{IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>),
|
||||
?assertNot(IsWill),
|
||||
?assertEqual([], ReceivedNoWill),
|
||||
%% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published after 3s.
|
||||
Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)],
|
||||
{IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>),
|
||||
?assertEqual([], ReceivedNoWill11),
|
||||
?assertNot(IsWill11),
|
||||
emqtt:stop(CPidSub),
|
||||
emqtt:stop(CPid2),
|
||||
?assert(not is_process_alive(CPid1)),
|
||||
ok.
|
||||
|
||||
t_kick_session(Config) ->
|
||||
process_flag(trap_exit, true),
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
|
|
Loading…
Reference in New Issue