refactor: update notes for willmsg

This commit is contained in:
William Yang 2024-02-26 16:34:10 +01:00
parent 975c7429e5
commit d5247cb567
2 changed files with 67 additions and 47 deletions

View File

@ -2241,8 +2241,7 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) ->
%% When willmsg presents the decision whether or when to publish the Will Message are effected by %% When willmsg presents the decision whether or when to publish the Will Message are effected by
%% the followings: %% the followings:
%% - connecion state %% - connecion state
%% - If it is MQTT normal disconnection (RC: 0) %% - If it is MQTT normal disconnection (RC: 0) or abnormal (RC != 0) from the *client*
%% - If it is MQTT normal disconnection (RC: 4)
%% - will delay interval (MQTT 5.0 only) %% - will delay interval (MQTT 5.0 only)
%% - session expire Session Expiry (MQTT 5.0 only) %% - session expire Session Expiry (MQTT 5.0 only)
%% - EMQX operations on the client %% - EMQX operations on the client
@ -2251,27 +2250,23 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) ->
%% @NOTE: %% @NOTE:
%% The caller does not need to take care of the case when process terminates while will_msg is published %% The caller does not need to take care of the case when process terminates while will_msg is published
%% as it is designed by the spec. %% as it is designed by the spec.
%% @NOTE:
%% this function should be safe to be called multiple times in the life time of the connecion process, the willmsg
%% must be delete from the state if it is published or cleared.
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when -spec maybe_publish_will_msg(Reason, channel()) -> channel() when
Reason :: Reason ::
%% Connection is terminating because session is taken over by another process. %% Connection is terminating because session is taken over by another process.
takenover takenover
%% Connection is terminating because of EMQX mgmt operation, also delete the session. %% Connection is terminating because of EMQX mgmt operation, the session state is deleted with none-zero RC code
| kicked | kicked
%% Connection is terminating because session is taken over by another process. %% Connection is terminating because of client clean start new session.
| discarded | discarded
%% Connection is terminating because session is expired %% Connection is terminating because session is expired
| expired | expired
%% Connection is terminating because of socket close/error %% Connection is terminating because of socket close/error
| sock_closed | sock_closed
%% Connection is terminating with Reasons %% Session is terminating, delay willmsg publish is impossible.
| {shutdown, atom()} | ?chan_terminating.
%% Connection is terminating, delay willmsg publish is impossible.
| ?chan_terminating
%% Connection is terminating because of normal MQTT disconnection, implies delete the session.
| normal.
maybe_publish_will_msg(normal, Channel) ->
%% [MQTT-3.1.2-8]
Channel;
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
%% No will message to publish %% No will message to publish
Channel; Channel;
@ -2286,7 +2281,7 @@ maybe_publish_will_msg(
ConnState =:= connecting orelse ConnState =:= connecting orelse
ConnState =:= reauthenticating ConnState =:= reauthenticating
-> ->
%% Wrong state to publish %% Wrong state to publish, they are intermediate state
?tp(debug, willmsg_wrong_state, #{clientid => ClientId}), ?tp(debug, willmsg_wrong_state, #{clientid => ClientId}),
Channel; Channel;
maybe_publish_will_msg( maybe_publish_will_msg(
@ -2296,7 +2291,7 @@ maybe_publish_will_msg(
} }
) -> ) ->
%% Unconditionally publish will message for MQTT 3.1.1 %% Unconditionally publish will message for MQTT 3.1.1
?tp(debug, willmsg_v3, #{clientid => ClientId}), ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}),
_ = publish_will_msg(Channel#channel.clientinfo, WillMsg), _ = publish_will_msg(Channel#channel.clientinfo, WillMsg),
Channel#channel{will_msg = undefined}; Channel#channel{will_msg = undefined};
maybe_publish_will_msg( maybe_publish_will_msg(
@ -2316,12 +2311,12 @@ maybe_publish_will_msg(
-> ->
%% For the cases that session MUST be gone impiles that the will message MUST be published %% For the cases that session MUST be gone impiles that the will message MUST be published
%% a. expired (session expired) %% a. expired (session expired)
%% b. discarded (Session ends because another process starts new session with the same clientid) %% b. discarded (Session ends because of clean start)
%% c. kicked. (kicked by operation) %% c. kicked. (kicked by operation, abnormal conn close)
%% d. internal_error (maybe not recoverable) %% d. internal_error (maybe not recoverable)
%% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired
%% OR fired but not yet handled %% OR fired but not yet handled
?tp(debug, willmsg_session_ends, #{clientid => ClientId}), ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}),
_ = publish_will_msg(ClientInfo, WillMsg), _ = publish_will_msg(ClientInfo, WillMsg),
remove_willmsg(Channel); remove_willmsg(Channel);
maybe_publish_will_msg( maybe_publish_will_msg(
@ -2333,9 +2328,6 @@ maybe_publish_will_msg(
} }
) -> ) ->
%% TAKEOVER [MQTT-3.1.4-3] %% TAKEOVER [MQTT-3.1.4-3]
?tp(debug, willmsg_takeover, #{clientid => ClientId}),
case will_delay_interval(WillMsg) of
0 ->
%% MQTT 5, Non-normative comment: %% MQTT 5, Non-normative comment:
%% """" %% """"
%% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server,
@ -2344,13 +2336,22 @@ maybe_publish_will_msg(
%% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent
%% because the Session ends. %% because the Session ends.
%% """" %% """"
%% NOTE, above clean start=1 is `discard' scenarios not `takeover' scenario.
case will_delay_interval(WillMsg) of
0 ->
?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}),
_ = publish_will_msg(ClientInfo, WillMsg); _ = publish_will_msg(ClientInfo, WillMsg);
I when I > 0 -> I when I > 0 ->
%% @NOTE: We do not publish willmsg when EI > 0 but that does not mean we shall delay the willmsg publish %% @NOTE Non-normative comment in MQTT 5.0 spec
%% because the session is already takenover by another process. If we delay the willmsg publish, the willmsg %% """
%% will be published as there is no chance to get it cancelled. This is not stated clearly in the MQTT spec. %% One use of this is to avoid publishing Will Messages if there is a temporary network
%% disconnection and the Client succeeds in reconnecting and continuing its Session
%% before the Will Message is published.
%% """
?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}),
skip skip
end, end,
%% Maybe we should cancel first then send
remove_willmsg(Channel); remove_willmsg(Channel);
maybe_publish_will_msg( maybe_publish_will_msg(
{shutdown, _}, {shutdown, _},
@ -2363,9 +2364,10 @@ maybe_publish_will_msg(
%% MQTT 5: 3.1.2.11.2 Session Expiry Interval %% MQTT 5: 3.1.2.11.2 Session Expiry Interval
%% If the Session Expiry Interval is absent the value 0 is used. %% If the Session Expiry Interval is absent the value 0 is used.
%% If it is set to 0, or is absent, the Session ends when the Network Connection is closed. %% If it is set to 0, or is absent, the Session ends when the Network Connection is closed.
%% Expire_interval == 0, means session is over at the time of calling with shutdown. %% Expire_interval == 0, means session is end at the time of calling with shutdown.
?tp(debug, willmsg_takeover, #{clientid => ClientId}), ?tp(debug, maybe_publish_will_msg_shutdown, #{clientid => ClientId}),
_ = publish_will_msg(ClientInfo, WillMsg), _ = publish_will_msg(ClientInfo, WillMsg),
%% Maybe we should cancel first then send
remove_willmsg(Channel); remove_willmsg(Channel);
maybe_publish_will_msg( maybe_publish_will_msg(
Reason, Reason,
@ -2375,14 +2377,17 @@ maybe_publish_will_msg(
conninfo = #{clientid := ClientId} conninfo = #{clientid := ClientId}
} }
) -> ) ->
%% Handles other Unknown Reasons. %% Default to handle other reasons
case will_delay_interval(WillMsg) of case will_delay_interval(WillMsg) of
0 -> 0 ->
?tp(debug, willmsg_other_publish, #{clientid => ClientId, reason => Reason}), ?tp(debug, maybe_publish_will_msg_other_publish, #{
clientid => ClientId, reason => Reason
}),
_ = publish_will_msg(ClientInfo, WillMsg), _ = publish_will_msg(ClientInfo, WillMsg),
%% Maybe we should cancel first then send
remove_willmsg(Channel); remove_willmsg(Channel);
I when I > 0 -> I when I > 0 ->
?tp(debug, willmsg_other_delay, #{clientid => ClientId, reason => Reason}), ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}),
ensure_timer(will_message, timer:seconds(I), Channel) ensure_timer(will_message, timer:seconds(I), Channel)
end. end.

View File

@ -41,6 +41,7 @@ init_per_suite(Config) ->
[emqx], [emqx],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
emqx_logger:set_log_level(debug),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
@ -284,7 +285,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) ->
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>),
{IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>),
%% THEN: payload <<"willpayload_delay10">> should be published without delay. %% THEN: payload <<"willpayload_delay10">> should be published without delay
?assert(IsWill1), ?assert(IsWill1),
?assertNot(IsWill2), ?assertNot(IsWill2),
emqtt:stop(CPid2), emqtt:stop(CPid2),
@ -432,7 +433,7 @@ t_takeover_before_session_expire_willdelay0(Config) ->
], ],
Commands = Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s session expiry 3s. %% and delay-interval 0s session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[ [
{fun start_client/5, [ {fun start_client/5, [
@ -467,6 +468,7 @@ t_takeover_before_session_expire_willdelay0(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is published
?assert(IsWill), ?assert(IsWill),
emqtt:stop(CPidSub), emqtt:stop(CPidSub),
emqtt:stop(CPid2), emqtt:stop(CPid2),
@ -526,6 +528,7 @@ t_takeover_before_session_expire(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: No Willmsg is published
?assertNot(IsWill), ?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill), ?assertNotEqual([], ReceivedNoWill),
emqtt:stop(CPidSub), emqtt:stop(CPidSub),
@ -610,7 +613,7 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
], ],
Commands = Commands =
%% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">>
%% and delay-interval 10s, session expiry 3s. %% and will-delay-interval 10s > session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++
[ [
{fun start_client/5, [ {fun start_client/5, [
@ -645,7 +648,7 @@ t_takeover_session_then_abnormal_disconnect(Config) ->
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>),
%% THEN: willmsg is published before session expiry %% THEN: willmsg is not published before session expiry
?assertNot(IsWill), ?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill), ?assertNotEqual([], ReceivedNoWill),
Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)],
@ -668,7 +671,16 @@ t_takeover_session_then_abnormal_disconnect_2(Config) ->
{proto_ver, ?config(mqtt_vsn, Config)}, {proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false}, {clean_start, false},
{will_topic, WillTopic}, {will_topic, WillTopic},
{will_payload, <<"willpayload_delay10">>}, {will_payload, <<"willpayload_delay1">>},
{will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 1}},
{properties, #{'Session-Expiry-Interval' => 3}}
],
WillOpts2 = [
{proto_ver, ?config(mqtt_vsn, Config)},
{clean_start, false},
{will_topic, WillTopic},
{will_payload, <<"willpayload_delay2">>},
{will_qos, 1}, {will_qos, 1},
{will_props, #{'Will-Delay-Interval' => 0}}, {will_props, #{'Will-Delay-Interval' => 0}},
{properties, #{'Session-Expiry-Interval' => 3}} {properties, #{'Session-Expiry-Interval' => 3}}
@ -691,9 +703,9 @@ t_takeover_session_then_abnormal_disconnect_2(Config) ->
[] []
} }
] ++ ] ++
%% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> %% GIVEN: client *reconnect* with willmsg payload <<"willpayload_delay2">>
%% and delay-interval 0s, session expiry 3s. %% and will-delay-interval 0s, session expiry 3s.
[{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts2]}],
FCtx = lists:foldl( FCtx = lists:foldl(
fun({Fun, Args}, Ctx) -> fun({Fun, Args}, Ctx) ->
@ -707,12 +719,15 @@ t_takeover_session_then_abnormal_disconnect_2(Config) ->
assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover),
%% WHEN: client disconnect abnormally %% WHEN: client disconnect abnormally
emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE),
Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)],
ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]),
{IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay1">>),
%% THEN: willmsg is published after session expiry %% THEN: willmsg1 of old conn is not published because will-delay-interval > 0
?assert(IsWill), ?assertNot(IsWill),
?assertNotEqual([], ReceivedNoWill), ?assertNotEqual([], ReceivedNoWill),
%% THEN: willmsg1 is published because will-delay-interval is 0
{IsWill2, _} = filter_payload(Received, <<"willpayload_delay2">>),
?assert(IsWill2),
emqtt:stop(CPidSub), emqtt:stop(CPidSub),
?assert(not is_process_alive(CPid1)), ?assert(not is_process_alive(CPid1)),
?assert(not is_process_alive(CPid2)), ?assert(not is_process_alive(CPid2)),