diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0c605b158..cf9f67f39 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2241,8 +2241,7 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %% When willmsg presents the decision whether or when to publish the Will Message are effected by %% the followings: %% - connecion state -%% - If it is MQTT normal disconnection (RC: 0) -%% - If it is MQTT normal disconnection (RC: 4) +%% - If it is MQTT normal disconnection (RC: 0) or abnormal (RC != 0) from the *client* %% - will delay interval (MQTT 5.0 only) %% - session expire Session Expiry (MQTT 5.0 only) %% - EMQX operations on the client @@ -2251,27 +2250,23 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %% @NOTE: %% The caller does not need to take care of the case when process terminates while will_msg is published %% as it is designed by the spec. +%% @NOTE: +%% This function must be safe to call multiple times during the lifetime of the connection process; the willmsg +%% must be deleted from the state once it is published or cleared. -spec maybe_publish_will_msg(Reason, channel()) -> channel() when Reason :: %% Connection is terminating because session is taken over by another process. takenover - %% Connection is terminating because of EMQX mgmt operation, also delete the session. + %% Connection is terminating because of EMQX mgmt operation, the session state is deleted with non-zero RC code | kicked - %% Connection is terminating because session is taken over by another process. + %% Connection is terminating because the client started a new session with clean start. | discarded %% Connection is terminating because session is expired | expired %% Connection is terminating because of socket close/error | sock_closed - %% Connection is terminating with Reasons - | {shutdown, atom()} - %% Connection is terminating, delay willmsg publish is impossible. 
- | ?chan_terminating - %% Connection is terminating because of normal MQTT disconnection, implies delete the session. - | normal. -maybe_publish_will_msg(normal, Channel) -> - %% [MQTT-3.1.2-8] - Channel; + %% Session is terminating, delay willmsg publish is impossible. + | ?chan_terminating. maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> %% No will message to publish Channel; @@ -2286,7 +2281,7 @@ maybe_publish_will_msg( ConnState =:= connecting orelse ConnState =:= reauthenticating -> - %% Wrong state to publish + %% Wrong state to publish, they are intermediate state ?tp(debug, willmsg_wrong_state, #{clientid => ClientId}), Channel; maybe_publish_will_msg( @@ -2296,7 +2291,7 @@ maybe_publish_will_msg( } ) -> %% Unconditionally publish will message for MQTT 3.1.1 - ?tp(debug, willmsg_v3, #{clientid => ClientId}), + ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), _ = publish_will_msg(Channel#channel.clientinfo, WillMsg), Channel#channel{will_msg = undefined}; maybe_publish_will_msg( @@ -2316,12 +2311,12 @@ maybe_publish_will_msg( -> %% For the cases that session MUST be gone impiles that the will message MUST be published %% a. expired (session expired) - %% b. discarded (Session ends because another process starts new session with the same clientid) - %% c. kicked. (kicked by operation) + %% b. discarded (Session ends because of clean start) + %% c. kicked. (kicked by operation, abnormal conn close) %% d. 
internal_error (maybe not recoverable) %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled - ?tp(debug, willmsg_session_ends, #{clientid => ClientId}), + ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), _ = publish_will_msg(ClientInfo, WillMsg), remove_willmsg(Channel); maybe_publish_will_msg( @@ -2333,24 +2328,30 @@ maybe_publish_will_msg( } ) -> %% TAKEOVER [MQTT-3.1.4-3] - ?tp(debug, willmsg_takeover, #{clientid => ClientId}), + %% MQTT 5, Non-normative comment: + %% """" + %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, + %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start + %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at + %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent + %% because the Session ends. + %% """" + %% NOTE, above clean start=1 is `discard' scenarios not `takeover' scenario. case will_delay_interval(WillMsg) of 0 -> - %% MQTT 5, Non-normative comment: - %% """" - %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, - %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start - %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at - %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent - %% because the Session ends. - %% """" + ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), _ = publish_will_msg(ClientInfo, WillMsg); I when I > 0 -> - %% @NOTE: We do not publish willmsg when EI > 0 but that does not mean we shall delay the willmsg publish - %% because the session is already takenover by another process. 
If we delay the willmsg publish, the willmsg - %% will be published as there is no chance to get it cancelled. This is not stated clearly in the MQTT spec. + %% @NOTE Non-normative comment in MQTT 5.0 spec + %% """ + %% One use of this is to avoid publishing Will Messages if there is a temporary network + %% disconnection and the Client succeeds in reconnecting and continuing its Session + %% before the Will Message is published. + %% """ + ?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}), skip end, + %% Maybe we should cancel first then send remove_willmsg(Channel); maybe_publish_will_msg( {shutdown, _}, @@ -2363,9 +2364,10 @@ maybe_publish_will_msg( %% MQTT 5: 3.1.2.11.2 Session Expiry Interval %% If the Session Expiry Interval is absent the value 0 is used. %% If it is set to 0, or is absent, the Session ends when the Network Connection is closed. - %% Expire_interval == 0, means session is over at the time of calling with shutdown. - ?tp(debug, willmsg_takeover, #{clientid => ClientId}), + %% Expire_interval == 0 means the session ends at the time of calling with shutdown. + ?tp(debug, maybe_publish_will_msg_shutdown, #{clientid => ClientId}), _ = publish_will_msg(ClientInfo, WillMsg), + %% Maybe we should cancel first then send remove_willmsg(Channel); maybe_publish_will_msg( Reason, @@ -2375,14 +2377,17 @@ maybe_publish_will_msg( conninfo = #{clientid := ClientId} } ) -> - %% Handles other Unknown Reasons. 
+ %% Default to handle other reasons case will_delay_interval(WillMsg) of 0 -> - ?tp(debug, willmsg_other_publish, #{clientid => ClientId, reason => Reason}), + ?tp(debug, maybe_publish_will_msg_other_publish, #{ + clientid => ClientId, reason => Reason + }), _ = publish_will_msg(ClientInfo, WillMsg), + %% Maybe we should cancel first then send remove_willmsg(Channel); I when I > 0 -> - ?tp(debug, willmsg_other_delay, #{clientid => ClientId, reason => Reason}), + ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), ensure_timer(will_message, timer:seconds(I), Channel) end. diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 25b32bd50..4575634a9 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -41,6 +41,7 @@ init_per_suite(Config) -> [emqx], #{work_dir => emqx_cth_suite:work_dir(Config)} ), + emqx_logger:set_log_level(debug), [{apps, Apps} | Config]. end_per_suite(Config) -> @@ -284,7 +285,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), - %% THEN: payload <<"willpayload_delay10">> should be published without delay. + %% THEN: payload <<"willpayload_delay10">> should be published without delay ?assert(IsWill1), ?assertNot(IsWill2), emqtt:stop(CPid2), @@ -432,7 +433,7 @@ t_takeover_before_session_expire_willdelay0(Config) -> ], Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 10s session expiry 3s. + %% and delay-interval 0s session expiry 3s. 
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ [ {fun start_client/5, [ @@ -467,6 +468,7 @@ t_takeover_before_session_expire_willdelay0(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is published ?assert(IsWill), emqtt:stop(CPidSub), emqtt:stop(CPid2), @@ -526,6 +528,7 @@ t_takeover_before_session_expire(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: No Willmsg is published ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), emqtt:stop(CPidSub), @@ -610,7 +613,7 @@ t_takeover_session_then_abnormal_disconnect(Config) -> ], Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 10s, session expiry 3s. + %% and will-delay-interval 10s > session expiry 3s. 
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ [ {fun start_client/5, [ @@ -645,7 +648,7 @@ t_takeover_session_then_abnormal_disconnect(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), - %% THEN: willmsg is published before session expiry + %% THEN: willmsg is not published before session expiry ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], @@ -668,7 +671,16 @@ t_takeover_session_then_abnormal_disconnect_2(Config) -> {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, {will_topic, WillTopic}, - {will_payload, <<"willpayload_delay10">>}, + {will_payload, <<"willpayload_delay1">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 1}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + WillOpts2 = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay2">>}, {will_qos, 1}, {will_props, #{'Will-Delay-Interval' => 0}}, {properties, #{'Session-Expiry-Interval' => 3}} @@ -691,9 +703,9 @@ t_takeover_session_then_abnormal_disconnect_2(Config) -> [] } ] ++ - %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 0s, session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + %% GIVEN: client *reconnect* with willmsg payload <<"willpayload_delay2">> + %% and will-delay-interval 0s, session expiry 3s. 
+ [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts2]}], FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -707,12 +719,15 @@ t_takeover_session_then_abnormal_disconnect_2(Config) -> assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), %% WHEN: client disconnect abnormally emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), - {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), - %% THEN: willmsg is published after session expiry - ?assert(IsWill), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay1">>), + %% THEN: willmsg1 of old conn is not published because will-delay-interval > 0 + ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), + %% THEN: willmsg2 of the new conn is published because its will-delay-interval is 0 + {IsWill2, _} = filter_payload(Received, <<"willpayload_delay2">>), + ?assert(IsWill2), emqtt:stop(CPidSub), ?assert(not is_process_alive(CPid1)), ?assert(not is_process_alive(CPid2)),