diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index b8544d02d..bce5d3761 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -54,7 +54,9 @@ groups() -> emqx_common_test_helpers:all(?MODULE) -- [ t_session_expire_with_delayed_willmsg, - t_no_takeover_with_delayed_willmsg + t_no_takeover_with_delayed_willmsg, + t_takeover_before_session_expire, + t_takeover_before_willmsg_expire ]}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. @@ -405,6 +407,144 @@ t_session_expire_with_delayed_willmsg(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_takeover_before_session_expire(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + emqx_logger:set_log_level(debug), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: client session is taken over with in 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + ct:pal("FCtx: ~p", [FCtx]), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill11), + ?assertNot(IsWill11), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_before_willmsg_expire(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + emqx_logger:set_log_level(debug), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 3}}, + {properties, #{'Session-Expiry-Interval' => 10}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and will-delay-interval 3s < session expiry 10s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: another client takeover the session with in 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + ct:pal("FCtx: ~p", [FCtx]), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published after 3s. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill11), + ?assertNot(IsWill11), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + t_kick_session(Config) -> process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME),