diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index d7b143e43..b8544d02d 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -50,7 +50,12 @@ end_per_suite(Config) -> groups() -> [ - {mqttv3, [], emqx_common_test_helpers:all(?MODULE)}, + {mqttv3, [], + emqx_common_test_helpers:all(?MODULE) -- + [ + t_session_expire_with_delayed_willmsg, + t_no_takeover_with_delayed_willmsg + ]}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. @@ -239,7 +244,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> {will_payload, <<"willpayload_delay10">>}, {will_qos, 1}, %% mqttv5 only - {properties, #{'Will-Delay-Interval' => 10000}} + {will_props, #{'Will-Delay-Interval' => 10}} ], WillOptsClean = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -286,11 +291,124 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_no_takeover_with_delayed_willmsg(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay3">>}, + {will_qos, 1}, + %secs + {will_props, #{'Will-Delay-Interval' => 3}}, + % secs + {properties, #{'Session-Expiry-Interval' => 10}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + #{client := [CPidSub, CPid1]} = FCtx, + %% WHEN: client disconnects abnormally AND no reconnect after 3s. + exit(CPid1, kill), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay3">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>), + ?assertEqual([], ReceivedNoWill11), + ?assert(IsWill11), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +t_session_expire_with_delayed_willmsg(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + #{client := [CPidSub, CPid1]} = FCtx, + %% WHEN: client disconnects abnormally AND no reconnect after 3s. + exit(CPid1, kill), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill11), + ?assert(IsWill11), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + t_kick_session(Config) -> process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), WillTopic = <>/binary>>, - % emqx_logger:set_log_level(debug), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -440,4 +558,6 @@ assert_client_exit(Pid, v3, takenover) -> assert_client_exit(Pid, v3, kicked) -> ?assertReceive({'EXIT', Pid, _}); assert_client_exit(Pid, v5, kicked) -> - ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}). + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}); +assert_client_exit(Pid, _, killed) -> + ?assertReceive({'EXIT', Pid, killed}).