diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 46a1a7f51..296cce949 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -145,6 +145,7 @@ ). -define(LIMITER_ROUTING, message_routing). +-define(chan_terminating, chan_terminating). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). @@ -873,6 +874,7 @@ do_unsubscribe( %% MQTT-v5.0: 3.14.4 DISCONNECT Actions maybe_clean_will_msg(?RC_SUCCESS, Channel) -> + %% [MQTT-3.14.4-3] Channel#channel{will_msg = undefined}; maybe_clean_will_msg(_ReasonCode, Channel) -> Channel. @@ -1165,7 +1167,8 @@ handle_call( shutdown(kicked, ok, Channel1) end; handle_call(discard, Channel) -> - disconnect_and_shutdown(discarded, ok, Channel); + Channel0 = maybe_publish_will_msg(discarded, Channel), + disconnect_and_shutdown(discarded, ok, Channel0); %% Session Takeover handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> reply(Session, Channel#channel{takeover = true}); @@ -1188,7 +1191,8 @@ handle_call( emqx_channel_takeover_end, #{clientid => ClientId} ), - disconnect_and_shutdown(takenover, AllPendings, Channel); + Channel0 = maybe_publish_will_msg(takenover, Channel), + disconnect_and_shutdown(takenover, AllPendings, Channel0); handle_call(list_authz_cache, Channel) -> {reply, emqx_authz_cache:list_authz_cache(), Channel}; handle_call( @@ -1240,7 +1244,6 @@ handle_info( ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> - {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), Channel2 = Channel1#channel{session = Session1}, @@ -1354,8 +1357,9 @@ handle_timeout( handle_out(publish, Replies, Channel#channel{session = NSession}) end; handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> + Channel0 = maybe_publish_will_msg(expired, Channel), ok = emqx_session:destroy(Session), - shutdown(expired, Channel); + shutdown(expired, Channel0); handle_timeout( _TRef, will_message = TimerName, @@ -1439,10 +1443,9 @@ terminate({shutdown, Reason}, Channel) when Reason =:= kicked orelse Reason =:= discarded -> - Channel1 = maybe_publish_will_msg(Reason, Channel), - run_terminate_hook(Reason, Channel1); + run_terminate_hook(Reason, Channel); terminate(Reason, Channel) -> - Channel1 = maybe_publish_will_msg(Reason, Channel), + Channel1 = maybe_publish_will_msg(?chan_terminating, Channel), run_terminate_hook(Reason, Channel1). run_terminate_hook(_Reason, #channel{session = undefined}) -> @@ -2234,74 +2237,149 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg +%% @doc May publish will message [MQTT-3.1.2-8] +%% When willmsg presents the decision whether or when to publish the Will Message are effected by +%% the followings: +%% - connecion state +%% - If it is MQTT normal disconnection (RC: 0) +%% - If it is MQTT normal disconnection (RC: 4) +%% - will delay interval (MQTT 5.0 only) +%% - session expire Session Expiry (MQTT 5.0 only) +%% - EMQX operations on the client +%% @NOTE: +%% Connection close with session expiry interval = 0 means session close. -spec maybe_publish_will_msg(Reason, channel()) -> channel() when - Reason :: takenover | kicked | discarded | expired | sock_closed | {shutdown, atom()}. + Reason :: + %% Connection will terminate because session is taken over by another process. + takenover + %% Connection will terminate because of EMQX mgmt operation, also delete the session. + | kicked + %% Connection will terminate because session is taken over by another process. + | discarded + %% Connection will terminate because session is expired + | expired + %% Connection will terminate because of socket close/error + | sock_closed + %% Connection will terminate with Reasons + | {shutdown, atom()} + %% Connection will terminate soon, delay willmsg publish is impossible. + | ?chan_terminating + %% Connection will terminate because of normal MQTT disconnection, implies delete the session. + | normal. +maybe_publish_will_msg(normal, Channel) -> + %% [MQTT-3.1.2-8] + Channel; maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> + %% No will message to publish Channel; -maybe_publish_will_msg({shutdown, not_authorized}, Channel) -> - Channel; -maybe_publish_will_msg(not_authorized, Channel) -> - Channel; -maybe_publish_will_msg(_Reason, Channel = #channel{conn_state = ConnState}) when +maybe_publish_will_msg( + _Reason, + Channel = #channel{ + conn_state = ConnState, + conninfo = #{clientid := ClientId} + } +) when ConnState =:= idle orelse ConnState =:= connecting orelse ConnState =:= reauthenticating -> + %% Wrong state to publish + ?tp(debug, willmsg_wrong_state, #{clientid => ClientId}), Channel; maybe_publish_will_msg( _Reason, - Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg} + Channel = #channel{ + conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg + } ) -> %% Unconditionally publish will message for MQTT 3.1.1 - ok = publish_will_msg(Channel#channel.clientinfo, WillMsg), + ?tp(debug, willmsg_v3, #{clientid => ClientId}), + _ = publish_will_msg(Channel#channel.clientinfo, WillMsg), Channel#channel{will_msg = undefined}; maybe_publish_will_msg( - Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} + Reason, + Channel = #channel{ + clientinfo = ClientInfo, + conninfo = #{clientid := ClientId}, + will_msg = WillMsg + } ) when Reason =:= expired orelse Reason =:= discarded orelse - %% Unsure... - Reason =:= {shutdown, internal_error} orelse - Reason =:= kicked + Reason =:= kicked orelse + Reason =:= ?chan_terminating orelse + %% Depends on the session backend, we may lost the session + Reason =:= {shutdown, internal_error} -> - %% For the cases that session MUST be gone. + %% For the cases that session MUST be gone impiles that the will message MUST be published %% a. expired (session expired) - %% c. discarded (Session ends because another process starts new session with the same clientid) - %% b. kicked. (kicked by operation) + %% b. discarded (Session ends because another process starts new session with the same clientid) + %% c. kicked. (kicked by operation) %% d. internal_error (maybe not recoverable) %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled - DelayedWillTimer = maps:get(will_message, Timers, undefined), - DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]), + ?tp(debug, willmsg_session_ends, #{clientid => ClientId}), _ = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)}; + remove_willmsg(Channel); maybe_publish_will_msg( - Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} + takenover, + Channel = #channel{ + clientinfo = ClientInfo, + will_msg = WillMsg, + conninfo = #{clientid := ClientId} + } ) -> - %% For the cases that session MAY/MAY NOT be gone, we don't care about session expired or not. - %% willmsg publish could be defered. - IsSessionExpirationInProgress = maps:is_key(expire_session, Timers), - IsWillmsgScheduled = maps:is_key(will_message, Timers), + %% TAKEOVER [MQTT-3.1.4-3] + ?tp(debug, willmsg_takeover, #{clientid => ClientId}), case will_delay_interval(WillMsg) of 0 -> - %% [MQTT-3.1.2-8], 0 means will delay Will Delay Interval has elapsed - false = IsWillmsgScheduled, - ok = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined}; - I when IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> - %% We delay the will message publishing - %% Willmsg will be published whatever which timer fired first - ensure_timer(will_message, timer:seconds(I), Channel); - _ when IsSessionExpirationInProgress andalso IsWillmsgScheduled -> - %% Willmsg will be published whatever which timer fired first [MQTT-3.1.3-9]. - Channel; - _I when Reason =:= takenover -> - %% don't see the point to delay the willmsg - Channel; - _I when not IsSessionExpirationInProgress andalso IsWillmsgScheduled -> - Channel; - I when not IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> - %% @FIXME: process may terminate before the timer fired + %% MQTT 5, Non-normative comment: + %% """" + %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, + %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start + %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at + %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent + %% because the Session ends. + %% """" + _ = publish_will_msg(ClientInfo, WillMsg); + I when I > 0 -> + %% @NOTE: We do not publish willmsg when EI > 0 but that does not mean we shall delay the willmsg publish + %% because the session is already takenover by another process. If we delay the willmsg publish, the willmsg + %% will be published as there is no chance to get it cancelled. This is not stated clearly in the MQTT spec. + skip + end, + remove_willmsg(Channel); +maybe_publish_will_msg( + {shutdown, _}, + Channel = #channel{ + conninfo = #{expiry_interval := 0, clientid := ClientId}, + clientinfo = ClientInfo, + will_msg = WillMsg + } +) -> + %% MQTT 5: 3.1.2.11.2 Session Expiry Interval + %% If the Session Expiry Interval is absent the value 0 is used. + %% If it is set to 0, or is absent, the Session ends when the Network Connection is closed. + %% Expire_interval == 0, means session is over at the time of calling with shutdown. + ?tp(debug, willmsg_takeover, #{clientid => ClientId}), + _ = publish_will_msg(ClientInfo, WillMsg), + remove_willmsg(Channel); +maybe_publish_will_msg( + Reason, + Channel = #channel{ + clientinfo = ClientInfo, + will_msg = WillMsg, + conninfo = #{clientid := ClientId} + } +) -> + %% Handles other Unknown Reasons. + case will_delay_interval(WillMsg) of + 0 -> + ?tp(debug, willmsg_other_publish, #{clientid => ClientId, reason => Reason}), + _ = publish_will_msg(ClientInfo, WillMsg), + remove_willmsg(Channel); + I when I > 0 -> + ?tp(debug, willmsg_other_delay, #{clientid => ClientId, reason => Reason}), ensure_timer(will_message, timer:seconds(I), Channel) end. @@ -2434,6 +2512,18 @@ get_mqtt_conf(Zone, Key) -> get_mqtt_conf(Zone, Key, Default) -> emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). +-spec remove_willmsg(Old :: channel()) -> New :: channel(). +remove_willmsg(Channel = #channel{timers = Timers}) -> + case maps:get(will_message, Timers, undefined) of + undefined -> + Channel#channel{will_msg = undefined}; + DelayedWillTimer -> + ok = erlang:cancel_timer(DelayedWillTimer, [{async, true}, {info, false}]), + Channel#channel{ + will_msg = undefined, + timers = maps:remove(will_message, Timers) + } + end. %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index bce5d3761..25b32bd50 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -50,17 +50,22 @@ end_per_suite(Config) -> groups() -> [ - {mqttv3, [], - emqx_common_test_helpers:all(?MODULE) -- - [ - t_session_expire_with_delayed_willmsg, - t_no_takeover_with_delayed_willmsg, - t_takeover_before_session_expire, - t_takeover_before_willmsg_expire - ]}, + {mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. +tc_v5_only() -> + [ + t_session_expire_with_delayed_willmsg, + t_no_takeover_with_delayed_willmsg, + t_takeover_before_session_expire, + t_takeover_before_willmsg_expire, + t_takeover_before_session_expire_willdelay0, + t_takeover_session_then_normal_disconnect, + t_takeover_session_then_abnormal_disconnect, + t_takeover_session_then_abnormal_disconnect_2 + ]. + init_per_group(mqttv3, Config) -> lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); init_per_group(mqttv5, Config) -> @@ -178,7 +183,6 @@ t_takeover_willmsg_clean_session(Config) -> Middle = ?CNT div 2, Client1Msgs = messages(ClientId, 0, Middle), Client2Msgs = messages(ClientId, Middle, ?CNT div 2), - AllMsgs = Client1Msgs ++ Client2Msgs, WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -220,9 +224,7 @@ t_takeover_willmsg_clean_session(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), - {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), - assert_messages_missed(AllMsgs, ReceivedNoWill), - assert_messages_order(AllMsgs, ReceivedNoWill), + {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), %% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">> ?assert(IsWill1), ?assertNot(IsWill2), @@ -238,7 +240,6 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> Middle = ?CNT div 2, Client1Msgs = messages(ClientId, 0, Middle), Client2Msgs = messages(ClientId, Middle, ?CNT div 2), - AllMsgs = Client1Msgs ++ Client2Msgs, WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -282,9 +283,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), - {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), - assert_messages_missed(AllMsgs, ReceivedNoWill), - assert_messages_order(AllMsgs, ReceivedNoWill), + {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), %% THEN: payload <<"willpayload_delay10">> should be published without delay. ?assert(IsWill1), ?assertNot(IsWill2), @@ -332,17 +331,19 @@ t_no_takeover_with_delayed_willmsg(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(Client1Msgs, Received), + {IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>), + ?assertNot(IsWill0), + ?assertNotEqual([], ReceivedNoWill0), #{client := [CPidSub, CPid1]} = FCtx, %% WHEN: client disconnects abnormally AND no reconnect after 3s. exit(CPid1, kill), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], - - {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay3">>), - ?assertNot(IsWill), - ?assertEqual([], ReceivedNoWill), - %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay. + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay3">>), + ?assertNot(IsWill1), + ?assertEqual([], ReceivedNoWill1), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay (3 secs). Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>), ?assertEqual([], ReceivedNoWill11), @@ -388,6 +389,9 @@ t_session_expire_with_delayed_willmsg(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), assert_messages_missed(Client1Msgs, Received), #{client := [CPidSub, CPid1]} = FCtx, %% WHEN: client disconnects abnormally AND no reconnect after 3s. @@ -395,25 +399,86 @@ t_session_expire_with_delayed_willmsg(Config) -> assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], - {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), - ?assertNot(IsWill), - ?assertEqual([], ReceivedNoWill), + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill1), + ?assertEqual([], ReceivedNoWill1), %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], - {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), - ?assertEqual([], ReceivedNoWill11), - ?assert(IsWill11), + {IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill2), + ?assert(IsWill12), emqtt:stop(CPidSub), ?assert(not is_process_alive(CPid1)), ok. +%% @TODO 'Server-Keep-Alive' +%% t_no_takeover_keepalive_fired(Config) -> +%% ok. + +t_takeover_before_session_expire_willdelay0(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 0}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(1000), + CTX + end, + [] + } + ] ++ + %% WHEN: client session is taken over within 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + ?assert(IsWill), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + t_takeover_before_session_expire(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), WillTopic = <>/binary>>, Client1Msgs = messages(ClientId, 0, 10), - emqx_logger:set_log_level(debug), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -443,7 +508,7 @@ t_takeover_before_session_expire(Config) -> [] } ] ++ - %% WHEN: client session is taken over with in 3s. + %% WHEN: client session is taken over within 3s. [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], FCtx = lists:foldl( @@ -460,29 +525,205 @@ t_takeover_before_session_expire(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), - assert_messages_missed(Client1Msgs, Received), - - Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], - {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), ?assertNot(IsWill), - ?assertEqual([], ReceivedNoWill), - %% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published. - Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], - {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), - ?assertEqual([], ReceivedNoWill11), - ?assertNot(IsWill11), + ?assertNotEqual([], ReceivedNoWill), emqtt:stop(CPidSub), emqtt:stop(CPid2), ?assert(not is_process_alive(CPid1)), ok. +t_takeover_session_then_normal_disconnect(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect normally. + emqtt:disconnect(CPid2, ?RC_SUCCESS), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is not published. + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + +t_takeover_session_then_abnormal_disconnect(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s, session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect abnormally + emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is published before session expiry + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + %% AND THEN: willmsg is published after session expiry + ?assert(IsWill1), + ?assertEqual([], ReceivedNoWill1), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + +t_takeover_session_then_abnormal_disconnect_2(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 0}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 0s, session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect abnormally + emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is published after session expiry + ?assert(IsWill), + ?assertNotEqual([], ReceivedNoWill), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + t_takeover_before_willmsg_expire(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), WillTopic = <>/binary>>, Client1Msgs = messages(ClientId, 0, 10), - emqx_logger:set_log_level(debug), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -590,8 +831,8 @@ t_kick_session(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), %% THEN: payload <<"willpayload_kick">> should be published - {IsWill1, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), - ?assert(IsWill1), + {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), + ?assert(IsWill), emqtt:stop(CPidSub), ?assert(not is_process_alive(CPid1)), ok.