From 07eec31a8ab70b2dca673efa2f93346da3697092 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 2 Nov 2023 13:25:27 +0100 Subject: [PATCH 01/14] fix: willmsg not published in takeover --- apps/emqx/src/emqx_channel.erl | 22 +-- apps/emqx/src/emqx_connection.erl | 9 +- apps/emqx/src/emqx_quic_stream.erl | 4 + apps/emqx/test/emqx_connection_SUITE.erl | 1 + apps/emqx/test/emqx_shared_sub_SUITE.erl | 2 + apps/emqx/test/emqx_takeover_SUITE.erl | 216 +++++++++++++++++++---- changes/ce/fix-11868.en.md | 2 + 7 files changed, 209 insertions(+), 47 deletions(-) create mode 100644 changes/ce/fix-11868.en.md diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 192335a25..7b23121c5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1134,16 +1134,14 @@ handle_call( kick, Channel = #channel{ conn_state = ConnState, - will_msg = WillMsg, - clientinfo = ClientInfo, conninfo = #{proto_ver := ProtoVer} } ) -> - (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), + Channel0 = maybe_publish_will_msg(Channel), Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel); - _ -> Channel + connected -> ensure_disconnected(kicked, Channel0); + _ -> Channel0 end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> @@ -1426,17 +1424,9 @@ terminate(normal, Channel) -> run_terminate_hook(normal, Channel); terminate({shutdown, kicked}, Channel) -> run_terminate_hook(kicked, Channel); -terminate({shutdown, Reason}, Channel) when - Reason =:= discarded; - Reason =:= takenover --> - run_terminate_hook(Reason, Channel); -terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> - %% since will_msg is set to undefined as soon as it is published, - %% if will_msg still exists when the session is terminated, it - %% must be published immediately. - WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg), - run_terminate_hook(Reason, Channel). +terminate(Reason, Channel) -> + Channel1 = maybe_publish_will_msg(Channel), + run_terminate_hook(Reason, Channel1). run_terminate_hook(_Reason, #channel{session = undefined}) -> ok; diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 96e4f54c7..c4671bc32 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -728,7 +728,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> {shutdown, Reason, Reply, OutPacket, NChannel} -> NState = State#state{channel = NChannel}, ok = handle_outgoing(OutPacket, NState), - shutdown(Reason, Reply, NState) + NState2 = graceful_shutdown_transport(Reason, NState), + shutdown(Reason, Reply, NState2) end. %%-------------------------------------------------------------------- @@ -1234,6 +1235,12 @@ set_tcp_keepalive({Type, Id}) -> async_set_keepalive(Idle, Interval, Probes) end. +-spec graceful_shutdown_transport(atom(), state()) -> state(). +graceful_shutdown_transport(_Reason, S = #state{transport = Transport, socket = Socket}) -> + %% @TODO Reason is reserved for future use, quic transport + Transport:shutdown(Socket, read_write), + S#state{sockstate = closed}. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 42f153b28..09244d67f 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -32,6 +32,7 @@ wait/1, getstat/2, fast_close/1, + shutdown/2, ensure_ok_or_exit/2, async_send/3, setopts/2, @@ -147,6 +148,9 @@ fast_close({quic, _Conn, Stream, _Info}) -> % quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), ok. +shutdown({quic, _Conn, Stream, _Info}, read_write) -> + quicer:async_shutdown_stream(Stream). + -spec ensure_ok_or_exit(atom(), list(term())) -> term(). ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> case erlang:apply(?MODULE, Fun, Args) of diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 98d4e3102..74038a8fc 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -32,6 +32,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> %% Meck Transport ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]), + ok = meck:expect(emqx_transport, shutdown, fun(_, _) -> ok end), %% Meck Channel ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index b732735bd..3f50db6e3 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -908,6 +908,8 @@ t_session_takeover(Config) when is_list(Config) -> ?assertMatch([_], emqx:publish(Message3)), ?assertMatch([_], emqx:publish(Message4)), {true, _} = last_message(<<"hello2">>, [ConnPid2]), + %% We may or may not recv dup hello2 due to QoS1 redelivery + _ = last_message(<<"hello2">>, [ConnPid2]), {true, _} = last_message(<<"hello3">>, [ConnPid2]), {true, _} = last_message(<<"hello4">>, [ConnPid2]), ?assertEqual([], collect_msgs(timer:seconds(2))), diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 03a48e174..2eda29e5d 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -24,14 +24,17 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(TOPIC, <<"t">>). -define(CNT, 100). -define(SLEEP, 10). %%-------------------------------------------------------------------- %% Initial funcs -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, mqttv3}, + {group, mqttv5} + ]. init_per_suite(Config) -> Apps = emqx_cth_suite:start( @@ -44,27 +47,39 @@ end_per_suite(Config) -> Apps = ?config(apps, Config), ok = emqx_cth_suite:stop(Apps), ok. + +groups() -> + [ + {mqttv3, [], emqx_common_test_helpers:all(?MODULE)}, + {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} + ]. + +init_per_group(mqttv3, Config) -> + lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); +init_per_group(mqttv5, Config) -> + lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}). + +end_per_group(_Group, _Config) -> + ok. + %%-------------------------------------------------------------------- %% Testcases -t_takeover(_) -> +t_takeover(Config) -> process_flag(trap_exit, true), - ClientId = <<"clientid">>, + ClientId = atom_to_binary(?FUNCTION_NAME), + ClientOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false} + ], Middle = ?CNT div 2, - Client1Msgs = messages(0, Middle), - Client2Msgs = messages(Middle, ?CNT div 2), + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), AllMsgs = Client1Msgs ++ Client2Msgs, - - meck:new(emqx_cm, [non_strict, passthrough]), - meck:expect(emqx_cm, takeover_session_end, fun(Arg) -> - ok = timer:sleep(?SLEEP * 2), - meck:passthrough([Arg]) - end), - Commands = - [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ [{fun stop_client/1, []}], @@ -78,30 +93,144 @@ t_takeover(_) -> ), #{client := [CPid2, CPid1]} = FCtx, - ?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}), - ?assertReceive({'EXIT', CPid2, normal}), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + ?assertReceive({'EXIT', CPid2, normal}), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("middle: ~p", [Middle]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(AllMsgs, Received), assert_messages_order(AllMsgs, Received), - - meck:unload(emqx_cm), ok. -t_takover_in_cluster(_) -> - todo. +t_takeover_willmsg(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload">>}, + {will_qos, 0} + ], + Commands = + %% GIVEN client connect with will message + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN client reconnect with clean_start = false + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), + assert_messages_missed(AllMsgs, ReceivedNoWill), + assert_messages_order(AllMsgs, ReceivedNoWill), + %% THEN will message should be received + ?assert(IsWill), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assertReceive({'EXIT', CPid2, normal}), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_willmsg_clean_session(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_1">>}, + {will_qos, 1} + ], + WillOptsClean = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, true}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_2">>}, + {will_qos, 1} + ], + + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_1">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ + [ + { + fun(CTX) -> + timer:sleep(1000), + CTX + end, + [] + } + ], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), + {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), + assert_messages_missed(AllMsgs, ReceivedNoWill), + assert_messages_order(AllMsgs, ReceivedNoWill), + %% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">> + ?assert(IsWill1), + ?assertNot(IsWill2), + emqtt:stop(CPid2), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +%% t_takover_in_cluster(_) -> +%% todo. %%-------------------------------------------------------------------- %% Commands - -start_client(Ctx, ClientId, Topic, Qos) -> - {ok, CPid} = emqtt:start_link([ - {clientid, ClientId}, - {proto_ver, v5}, - {clean_start, false} - ]), +start_client(Ctx, ClientId, Topic, Qos, Opts) -> + {ok, CPid} = emqtt:start_link([{clientid, ClientId} | Opts]), _ = erlang:spawn_link(fun() -> {ok, _} = emqtt:connect(CPid), ct:pal("CLIENT: connected ~p", [CPid]), @@ -157,8 +286,8 @@ assert_messages_order([Msg | Expected], Received) -> assert_messages_order(Expected, Rest) end. -messages(Offset, Cnt) -> - [emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. +messages(Topic, Offset, Cnt) -> + [emqx_message:make(ct, ?QOS_1, Topic, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. payload(I) -> % NOTE @@ -170,3 +299,30 @@ payload(I) -> emqx_utils_calendar:now_to_rfc3339(millisecond) ]) ). + +%% @doc Filter out the message with matching target payload from the list of messages. +%% return '{IsTargetFound, ListOfOtherMessages}' +%% @end +-spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) -> + {IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}. +filter_payload(List, Payload) when is_binary(Payload) -> + IsWill = lists:any( + fun + (#{payload := P}) -> Payload == P; + (_) -> false + end, + List + ), + Filtered = [ + Msg + || #{payload := P} = Msg <- List, + P =/= Payload + ], + {IsWill, Filtered}. + +%% @doc assert emqtt *client* process exits as expected. +assert_client_exit(Pid, v5) -> + %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3] + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}); +assert_client_exit(Pid, v3) -> + ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}). diff --git a/changes/ce/fix-11868.en.md b/changes/ce/fix-11868.en.md new file mode 100644 index 000000000..da0a09dff --- /dev/null +++ b/changes/ce/fix-11868.en.md @@ -0,0 +1,2 @@ +Fix a bug that willmsg is not published after session takeover. + From 9da4896f579425ad705def08920a05294354086b Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 3 Nov 2023 16:06:42 +0100 Subject: [PATCH 02/14] fix(mqtt): ensure publish willmsg when session expires --- apps/emqx/src/emqx_channel.erl | 52 ++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 7b23121c5..1eb3a2ea8 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1137,7 +1137,7 @@ handle_call( conninfo = #{proto_ver := ProtoVer} } ) -> - Channel0 = maybe_publish_will_msg(Channel), + Channel0 = maybe_publish_will_msg(kick, Channel), Channel1 = case ConnState of connected -> ensure_disconnected(kicked, Channel0); @@ -1230,8 +1230,9 @@ handle_info( ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> + {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), - Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), Channel2 = Channel1#channel{session = Session1}, case maybe_shutdown(Reason, Intent, Channel2) of {ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3}; @@ -1425,7 +1426,7 @@ terminate(normal, Channel) -> terminate({shutdown, kicked}, Channel) -> run_terminate_hook(kicked, Channel); terminate(Reason, Channel) -> - Channel1 = maybe_publish_will_msg(Channel), + Channel1 = maybe_publish_will_msg(Reason, Channel), run_terminate_hook(Reason, Channel1). run_terminate_hook(_Reason, #channel{session = undefined}) -> @@ -2217,16 +2218,43 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg - -maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +-spec maybe_publish_will_msg(Reason, channel()) -> channel() when + Reason :: kick | sock_closed | {shutdown, atom()}. +maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> Channel; -maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> - case will_delay_interval(WillMsg) of - 0 -> - ok = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined}; - I -> - ensure_timer(will_message, timer:seconds(I), Channel) +maybe_publish_will_msg( + _Reason, + Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg} +) -> + %% Unconditionally publish will message for MQTT 3.1.1 + ok = publish_will_msg(Channel#channel.clientinfo, WillMsg), + Channel#channel{will_msg = undefined}; +maybe_publish_will_msg( + Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} +) when + Reason =:= {shutdown, expired} +-> + %% Must publish now without delay and cancel the will message timer. + DelayedWillTimer = maps:get(will_message, Timers, undefined), + DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]), + _ = publish_will_msg(ClientInfo, WillMsg), + Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)}; +maybe_publish_will_msg( + _OtherReasons, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} +) -> + case maps:get(will_message, Timers, undefined) of + undefined -> + %% May defer the will message publishing + case will_delay_interval(WillMsg) of + 0 -> + ok = publish_will_msg(ClientInfo, WillMsg), + Channel#channel{will_msg = undefined}; + I -> + ensure_timer(will_message, timer:seconds(I), Channel) + end; + %% Will message is already scheduled + _ -> + Channel end. will_delay_interval(WillMsg) -> From 6243cf0b0c3cd5cdf67363182c200b43e58d2e54 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 10 Nov 2023 08:55:59 +0100 Subject: [PATCH 03/14] fix(kick): defer willmsg publish when conn terminates because kick means shutdown connection AND delete session --- apps/emqx/src/emqx_channel.erl | 10 ++- apps/emqx/test/emqx_takeover_SUITE.erl | 92 +++++++++++++++++++------- 2 files changed, 72 insertions(+), 30 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 1eb3a2ea8..026ab3b43 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1137,11 +1137,10 @@ handle_call( conninfo = #{proto_ver := ProtoVer} } ) -> - Channel0 = maybe_publish_will_msg(kick, Channel), Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel0); - _ -> Channel0 + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> @@ -1423,8 +1422,6 @@ terminate(_, #channel{conn_state = idle} = _Channel) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); -terminate({shutdown, kicked}, Channel) -> - run_terminate_hook(kicked, Channel); terminate(Reason, Channel) -> Channel1 = maybe_publish_will_msg(Reason, Channel), run_terminate_hook(Reason, Channel1). @@ -2232,7 +2229,8 @@ maybe_publish_will_msg( maybe_publish_will_msg( Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} ) when - Reason =:= {shutdown, expired} + Reason =:= {shutdown, expired} orelse + Reason =:= {shutdown, kicked} -> %% Must publish now without delay and cancel the will message timer. DelayedWillTimer = maps:get(will_message, Timers, undefined), diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 2eda29e5d..47e054907 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -94,7 +94,7 @@ t_takeover(Config) -> #{client := [CPid2, CPid1]} = FCtx, - assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), ?assertReceive({'EXIT', CPid2, normal}), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("middle: ~p", [Middle]), @@ -141,7 +141,7 @@ t_takeover_willmsg(Config) -> ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, - assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), @@ -189,16 +189,7 @@ t_takeover_willmsg_clean_session(Config) -> [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ - [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ - [ - { - fun(CTX) -> - timer:sleep(1000), - CTX - end, - [] - } - ], + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -209,7 +200,7 @@ t_takeover_willmsg_clean_session(Config) -> Commands ), #{client := [CPid2, CPidSub, CPid1]} = FCtx, - assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), @@ -224,6 +215,58 @@ t_takeover_willmsg_clean_session(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_kick_session(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + % emqx_logger:set_log_level(debug), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_kick">>}, + {will_qos, 1} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_kick">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [ + %% kick may fail (not found) without this delay + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: client is kicked with kick_session + [{fun kick_client/2, [ClientId]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), kicked), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + %% THEN: payload <<"willpayload_kick">> should be published + {IsWill1, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), + ?assert(IsWill1), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + %% t_takover_in_cluster(_) -> %% todo. @@ -238,6 +281,10 @@ start_client(Ctx, ClientId, Topic, Qos, Opts) -> end), Ctx#{client => [CPid | maps:get(client, Ctx, [])]}. +kick_client(Ctx, ClientId) -> + ok = emqx_cm:kick_session(ClientId), + Ctx. + publish_msg(Ctx, Msg) -> ok = timer:sleep(rand:uniform(?SLEEP)), case emqx:publish(Msg) of @@ -306,23 +353,20 @@ payload(I) -> -spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) -> {IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}. filter_payload(List, Payload) when is_binary(Payload) -> - IsWill = lists:any( - fun - (#{payload := P}) -> Payload == P; - (_) -> false - end, - List - ), Filtered = [ Msg || #{payload := P} = Msg <- List, P =/= Payload ], - {IsWill, Filtered}. + {length(List) =/= length(Filtered), Filtered}. %% @doc assert emqtt *client* process exits as expected. -assert_client_exit(Pid, v5) -> +assert_client_exit(Pid, v5, takenover) -> %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3] ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}); -assert_client_exit(Pid, v3) -> - ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}). +assert_client_exit(Pid, v3, takenover) -> + ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}); +assert_client_exit(Pid, v3, kicked) -> + ?assertReceive({'EXIT', Pid, _}); +assert_client_exit(Pid, v5, kicked) -> + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}). From b76c701b1c0711adfe7a54e14aeab078598a3010 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 10 Nov 2023 11:45:17 +0100 Subject: [PATCH 04/14] test(takeover): add back the delay when takeover --- apps/emqx/test/emqx_takeover_SUITE.erl | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 47e054907..d10d52661 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -76,6 +76,14 @@ t_takeover(Config) -> Client1Msgs = messages(ClientId, 0, Middle), Client2Msgs = messages(ClientId, Middle, ?CNT div 2), AllMsgs = Client1Msgs ++ Client2Msgs, + meck:new(emqx_cm, [non_strict, passthrough]), + meck:expect(emqx_cm, takeover_session_end, fun(Arg) -> + %% trigger more complex takeover conditions during 2-phase takeover protocol: + %% when messages are accumulated in 2 processes simultaneously, + %% and need to be properly ordered / deduplicated after the protocol commences. + ok = timer:sleep(?SLEEP * 2), + meck:passthrough([Arg]) + end), Commands = [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ @@ -101,6 +109,7 @@ t_takeover(Config) -> ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(AllMsgs, Received), assert_messages_order(AllMsgs, Received), + meck:unload(emqx_cm), ok. t_takeover_willmsg(Config) -> From dd62280e0498e8a8fca99d36a737d059f71c2d20 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 10 Nov 2023 17:22:07 +0100 Subject: [PATCH 05/14] fix: handle delayed willmsg, part 1 --- apps/emqx/src/emqx_channel.erl | 92 ++++++++++++++++++------ apps/emqx/test/emqx_shared_sub_SUITE.erl | 3 + apps/emqx/test/emqx_takeover_SUITE.erl | 62 ++++++++++++++++ 3 files changed, 136 insertions(+), 21 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 026ab3b43..46a1a7f51 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -117,7 +117,17 @@ atom() => term() }. --type conn_state() :: idle | connecting | connected | reauthenticating | disconnected. +%% init +-type conn_state() :: + idle + %% mqtt connect recved but not acked + | connecting + %% mqtt connect acked + | connected + %% mqtt connected but reauthenticating + | reauthenticating + %% keepalive timeout or connection terminated + | disconnected. -type reply() :: {outgoing, emqx_types:packet()} @@ -1137,10 +1147,11 @@ handle_call( conninfo = #{proto_ver := ProtoVer} } ) -> + Channel0 = maybe_publish_will_msg(kicked, Channel), Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel); - _ -> Channel + connected -> ensure_disconnected(kicked, Channel0); + _ -> Channel0 end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> @@ -1422,6 +1433,14 @@ terminate(_, #channel{conn_state = idle} = _Channel) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); +terminate({shutdown, Reason}, Channel) when + Reason =:= expired orelse + Reason =:= takenover orelse + Reason =:= kicked orelse + Reason =:= discarded +-> + Channel1 = maybe_publish_will_msg(Reason, Channel), + run_terminate_hook(Reason, Channel1); terminate(Reason, Channel) -> Channel1 = maybe_publish_will_msg(Reason, Channel), run_terminate_hook(Reason, Channel1). @@ -2216,9 +2235,19 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg -spec maybe_publish_will_msg(Reason, channel()) -> channel() when - Reason :: kick | sock_closed | {shutdown, atom()}. + Reason :: takenover | kicked | discarded | expired | sock_closed | {shutdown, atom()}. maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> Channel; +maybe_publish_will_msg({shutdown, not_authorized}, Channel) -> + Channel; +maybe_publish_will_msg(not_authorized, Channel) -> + Channel; +maybe_publish_will_msg(_Reason, Channel = #channel{conn_state = ConnState}) when + ConnState =:= idle orelse + ConnState =:= connecting orelse + ConnState =:= reauthenticating +-> + Channel; maybe_publish_will_msg( _Reason, Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg} @@ -2229,30 +2258,51 @@ maybe_publish_will_msg( maybe_publish_will_msg( Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} ) when - Reason =:= {shutdown, expired} orelse - Reason =:= {shutdown, kicked} + Reason =:= expired orelse + Reason =:= discarded orelse + %% Unsure... + Reason =:= {shutdown, internal_error} orelse + Reason =:= kicked -> - %% Must publish now without delay and cancel the will message timer. + %% For the cases that session MUST be gone. + %% a. expired (session expired) + %% c. discarded (Session ends because another process starts new session with the same clientid) + %% b. kicked. (kicked by operation) + %% d. internal_error (maybe not recoverable) + %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired + %% OR fired but not yet handled DelayedWillTimer = maps:get(will_message, Timers, undefined), DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]), _ = publish_will_msg(ClientInfo, WillMsg), Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)}; maybe_publish_will_msg( - _OtherReasons, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} + Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} ) -> - case maps:get(will_message, Timers, undefined) of - undefined -> - %% May defer the will message publishing - case will_delay_interval(WillMsg) of - 0 -> - ok = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined}; - I -> - ensure_timer(will_message, timer:seconds(I), Channel) - end; - %% Will message is already scheduled - _ -> - Channel + %% For the cases that session MAY/MAY NOT be gone, we don't care about session expired or not. + %% willmsg publish could be defered. + IsSessionExpirationInProgress = maps:is_key(expire_session, Timers), + IsWillmsgScheduled = maps:is_key(will_message, Timers), + case will_delay_interval(WillMsg) of + 0 -> + %% [MQTT-3.1.2-8], 0 means will delay Will Delay Interval has elapsed + false = IsWillmsgScheduled, + ok = publish_will_msg(ClientInfo, WillMsg), + Channel#channel{will_msg = undefined}; + I when IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> + %% We delay the will message publishing + %% Willmsg will be published whatever which timer fired first + ensure_timer(will_message, timer:seconds(I), Channel); + _ when IsSessionExpirationInProgress andalso IsWillmsgScheduled -> + %% Willmsg will be published whatever which timer fired first [MQTT-3.1.3-9]. + Channel; + _I when Reason =:= takenover -> + %% don't see the point to delay the willmsg + Channel; + _I when not IsSessionExpirationInProgress andalso IsWillmsgScheduled -> + Channel; + I when not IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> + %% @FIXME: process may terminate before the timer fired + ensure_timer(will_message, timer:seconds(I), Channel) end. will_delay_interval(WillMsg) -> diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 3f50db6e3..dc755e78c 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -923,6 +923,7 @@ t_session_kicked({init, Config}) when is_list(Config) -> t_session_kicked({'end', Config}) when is_list(Config) -> emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0); t_session_kicked(Config) when is_list(Config) -> + emqx_logger:set_log_level(debug), Topic = <<"foo/bar/1">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, @@ -953,6 +954,8 @@ t_session_kicked(Config) when is_list(Config) -> %% on if it's picked as the first one for round_robin MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1), MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2), + + ct:pal("MsgRec1: ~p MsgRec2 ~p ~n", [MsgRec1, MsgRec2]), case MsgRec2 of <<"hello3">> -> ?assertEqual(<<"hello1">>, MsgRec1); diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index d10d52661..d7b143e43 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -224,6 +224,68 @@ t_takeover_willmsg_clean_session(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_takeover_clean_session_with_delayed_willmsg(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + %% mqttv5 only + {properties, #{'Will-Delay-Interval' => 10000}} + ], + WillOptsClean = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, true}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_2">>}, + {will_qos, 1} + ], + + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> and delay-interval 10s + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), + {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), + assert_messages_missed(AllMsgs, ReceivedNoWill), + assert_messages_order(AllMsgs, ReceivedNoWill), + %% THEN: payload <<"willpayload_delay10">> should be published without delay. + ?assert(IsWill1), + ?assertNot(IsWill2), + emqtt:stop(CPid2), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + t_kick_session(Config) -> process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), From 53974023965bc684bdc62dee25623282af1ff681 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 13 Nov 2023 16:25:29 +0100 Subject: [PATCH 06/14] test(willmsg): test will delay and session expires will delay > session expire will delay < session expire timer triggered events are handled in seq, exclude the case of (will delay == session expire) --- apps/emqx/test/emqx_takeover_SUITE.erl | 128 ++++++++++++++++++++++++- 1 file changed, 124 insertions(+), 4 deletions(-) diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index d7b143e43..b8544d02d 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -50,7 +50,12 @@ end_per_suite(Config) -> groups() -> [ - {mqttv3, [], emqx_common_test_helpers:all(?MODULE)}, + {mqttv3, [], + emqx_common_test_helpers:all(?MODULE) -- + [ + t_session_expire_with_delayed_willmsg, + t_no_takeover_with_delayed_willmsg + ]}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. @@ -239,7 +244,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> {will_payload, <<"willpayload_delay10">>}, {will_qos, 1}, %% mqttv5 only - {properties, #{'Will-Delay-Interval' => 10000}} + {will_props, #{'Will-Delay-Interval' => 10}} ], WillOptsClean = [ {proto_ver, ?config(mqtt_vsn, Config)}, @@ -286,11 +291,124 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_no_takeover_with_delayed_willmsg(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay3">>}, + {will_qos, 1}, + %secs + {will_props, #{'Will-Delay-Interval' => 3}}, + % secs + {properties, #{'Session-Expiry-Interval' => 10}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay3">> and delay-interval 3s + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + #{client := [CPidSub, CPid1]} = FCtx, + %% WHEN: client disconnects abnormally AND no reconnect after 3s. + exit(CPid1, kill), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay3">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>), + ?assertEqual([], ReceivedNoWill11), + ?assert(IsWill11), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +t_session_expire_with_delayed_willmsg(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + #{client := [CPidSub, CPid1]} = FCtx, + %% WHEN: client disconnects abnormally AND no reconnect after 3s. + exit(CPid1, kill), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill11), + ?assert(IsWill11), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + t_kick_session(Config) -> process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), WillTopic = <>/binary>>, - % emqx_logger:set_log_level(debug), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -440,4 +558,6 @@ assert_client_exit(Pid, v3, takenover) -> assert_client_exit(Pid, v3, kicked) -> ?assertReceive({'EXIT', Pid, _}); assert_client_exit(Pid, v5, kicked) -> - ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}). + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}); +assert_client_exit(Pid, _, killed) -> + ?assertReceive({'EXIT', Pid, killed}). From 6311b582ec531490b79e88703fe99b44b5a9074e Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 13 Nov 2023 17:18:18 +0100 Subject: [PATCH 07/14] test(willmsg): session taken over before willmsg delay /session expire --- apps/emqx/test/emqx_takeover_SUITE.erl | 142 ++++++++++++++++++++++++- 1 file changed, 141 insertions(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index b8544d02d..bce5d3761 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -54,7 +54,9 @@ groups() -> emqx_common_test_helpers:all(?MODULE) -- [ t_session_expire_with_delayed_willmsg, - t_no_takeover_with_delayed_willmsg + t_no_takeover_with_delayed_willmsg, + t_takeover_before_session_expire, + t_takeover_before_willmsg_expire ]}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. @@ -405,6 +407,144 @@ t_session_expire_with_delayed_willmsg(Config) -> ?assert(not is_process_alive(CPid1)), ok. +t_takeover_before_session_expire(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + emqx_logger:set_log_level(debug), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: client session is taken over with in 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + ct:pal("FCtx: ~p", [FCtx]), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill11), + ?assertNot(IsWill11), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_before_willmsg_expire(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + emqx_logger:set_log_level(debug), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 3}}, + {properties, #{'Session-Expiry-Interval' => 10}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and will-delay-interval 3s < session expiry 10s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% WHEN: another client takeover the session with in 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + ct:pal("FCtx: ~p", [FCtx]), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + assert_messages_missed(Client1Msgs, Received), + + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertEqual([], ReceivedNoWill), + %% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published after 3s. + Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], + {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill11), + ?assertNot(IsWill11), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + t_kick_session(Config) -> process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), From e5a3574d8999dbb892bae33096efdb8de753370e Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 15 Nov 2023 13:55:26 +0100 Subject: [PATCH 08/14] fix: maybe send willmsg --- apps/emqx/src/emqx_channel.erl | 186 ++++++++++---- apps/emqx/test/emqx_takeover_SUITE.erl | 325 +++++++++++++++++++++---- 2 files changed, 421 insertions(+), 90 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 46a1a7f51..296cce949 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -145,6 +145,7 @@ ). -define(LIMITER_ROUTING, message_routing). +-define(chan_terminating, chan_terminating). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). @@ -873,6 +874,7 @@ do_unsubscribe( %% MQTT-v5.0: 3.14.4 DISCONNECT Actions maybe_clean_will_msg(?RC_SUCCESS, Channel) -> + %% [MQTT-3.14.4-3] Channel#channel{will_msg = undefined}; maybe_clean_will_msg(_ReasonCode, Channel) -> Channel. @@ -1165,7 +1167,8 @@ handle_call( shutdown(kicked, ok, Channel1) end; handle_call(discard, Channel) -> - disconnect_and_shutdown(discarded, ok, Channel); + Channel0 = maybe_publish_will_msg(discarded, Channel), + disconnect_and_shutdown(discarded, ok, Channel0); %% Session Takeover handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> reply(Session, Channel#channel{takeover = true}); @@ -1188,7 +1191,8 @@ handle_call( emqx_channel_takeover_end, #{clientid => ClientId} ), - disconnect_and_shutdown(takenover, AllPendings, Channel); + Channel0 = maybe_publish_will_msg(takenover, Channel), + disconnect_and_shutdown(takenover, AllPendings, Channel0); handle_call(list_authz_cache, Channel) -> {reply, emqx_authz_cache:list_authz_cache(), Channel}; handle_call( @@ -1240,7 +1244,6 @@ handle_info( ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> - {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), Channel2 = Channel1#channel{session = Session1}, @@ -1354,8 +1357,9 @@ handle_timeout( handle_out(publish, Replies, Channel#channel{session = NSession}) end; handle_timeout(_TRef, expire_session, Channel = #channel{session = Session}) -> + Channel0 = maybe_publish_will_msg(expired, Channel), ok = emqx_session:destroy(Session), - shutdown(expired, Channel); + shutdown(expired, Channel0); handle_timeout( _TRef, will_message = TimerName, @@ -1439,10 +1443,9 @@ terminate({shutdown, Reason}, Channel) when Reason =:= kicked orelse Reason =:= discarded -> - Channel1 = maybe_publish_will_msg(Reason, Channel), - run_terminate_hook(Reason, Channel1); + run_terminate_hook(Reason, Channel); terminate(Reason, Channel) -> - Channel1 = maybe_publish_will_msg(Reason, Channel), + Channel1 = maybe_publish_will_msg(?chan_terminating, Channel), run_terminate_hook(Reason, Channel1). run_terminate_hook(_Reason, #channel{session = undefined}) -> @@ -2234,74 +2237,149 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg +%% @doc May publish will message [MQTT-3.1.2-8] +%% When willmsg presents the decision whether or when to publish the Will Message are effected by +%% the followings: +%% - connecion state +%% - If it is MQTT normal disconnection (RC: 0) +%% - If it is MQTT normal disconnection (RC: 4) +%% - will delay interval (MQTT 5.0 only) +%% - session expire Session Expiry (MQTT 5.0 only) +%% - EMQX operations on the client +%% @NOTE: +%% Connection close with session expiry interval = 0 means session close. -spec maybe_publish_will_msg(Reason, channel()) -> channel() when - Reason :: takenover | kicked | discarded | expired | sock_closed | {shutdown, atom()}. + Reason :: + %% Connection will terminate because session is taken over by another process. + takenover + %% Connection will terminate because of EMQX mgmt operation, also delete the session. + | kicked + %% Connection will terminate because session is taken over by another process. + | discarded + %% Connection will terminate because session is expired + | expired + %% Connection will terminate because of socket close/error + | sock_closed + %% Connection will terminate with Reasons + | {shutdown, atom()} + %% Connection will terminate soon, delay willmsg publish is impossible. + | ?chan_terminating + %% Connection will terminate because of normal MQTT disconnection, implies delete the session. + | normal. +maybe_publish_will_msg(normal, Channel) -> + %% [MQTT-3.1.2-8] + Channel; maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> + %% No will message to publish Channel; -maybe_publish_will_msg({shutdown, not_authorized}, Channel) -> - Channel; -maybe_publish_will_msg(not_authorized, Channel) -> - Channel; -maybe_publish_will_msg(_Reason, Channel = #channel{conn_state = ConnState}) when +maybe_publish_will_msg( + _Reason, + Channel = #channel{ + conn_state = ConnState, + conninfo = #{clientid := ClientId} + } +) when ConnState =:= idle orelse ConnState =:= connecting orelse ConnState =:= reauthenticating -> + %% Wrong state to publish + ?tp(debug, willmsg_wrong_state, #{clientid => ClientId}), Channel; maybe_publish_will_msg( _Reason, - Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg} + Channel = #channel{ + conninfo = #{proto_ver := ?MQTT_PROTO_V3, clientid := ClientId}, will_msg = WillMsg + } ) -> %% Unconditionally publish will message for MQTT 3.1.1 - ok = publish_will_msg(Channel#channel.clientinfo, WillMsg), + ?tp(debug, willmsg_v3, #{clientid => ClientId}), + _ = publish_will_msg(Channel#channel.clientinfo, WillMsg), Channel#channel{will_msg = undefined}; maybe_publish_will_msg( - Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} + Reason, + Channel = #channel{ + clientinfo = ClientInfo, + conninfo = #{clientid := ClientId}, + will_msg = WillMsg + } ) when Reason =:= expired orelse Reason =:= discarded orelse - %% Unsure... - Reason =:= {shutdown, internal_error} orelse - Reason =:= kicked + Reason =:= kicked orelse + Reason =:= ?chan_terminating orelse + %% Depends on the session backend, we may lost the session + Reason =:= {shutdown, internal_error} -> - %% For the cases that session MUST be gone. + %% For the cases that session MUST be gone impiles that the will message MUST be published %% a. expired (session expired) - %% c. discarded (Session ends because another process starts new session with the same clientid) - %% b. kicked. (kicked by operation) + %% b. discarded (Session ends because another process starts new session with the same clientid) + %% c. kicked. (kicked by operation) %% d. internal_error (maybe not recoverable) %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled - DelayedWillTimer = maps:get(will_message, Timers, undefined), - DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]), + ?tp(debug, willmsg_session_ends, #{clientid => ClientId}), _ = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)}; + remove_willmsg(Channel); maybe_publish_will_msg( - Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} + takenover, + Channel = #channel{ + clientinfo = ClientInfo, + will_msg = WillMsg, + conninfo = #{clientid := ClientId} + } ) -> - %% For the cases that session MAY/MAY NOT be gone, we don't care about session expired or not. - %% willmsg publish could be defered. - IsSessionExpirationInProgress = maps:is_key(expire_session, Timers), - IsWillmsgScheduled = maps:is_key(will_message, Timers), + %% TAKEOVER [MQTT-3.1.4-3] + ?tp(debug, willmsg_takeover, #{clientid => ClientId}), case will_delay_interval(WillMsg) of 0 -> - %% [MQTT-3.1.2-8], 0 means will delay Will Delay Interval has elapsed - false = IsWillmsgScheduled, - ok = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined}; - I when IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> - %% We delay the will message publishing - %% Willmsg will be published whatever which timer fired first - ensure_timer(will_message, timer:seconds(I), Channel); - _ when IsSessionExpirationInProgress andalso IsWillmsgScheduled -> - %% Willmsg will be published whatever which timer fired first [MQTT-3.1.3-9]. - Channel; - _I when Reason =:= takenover -> - %% don't see the point to delay the willmsg - Channel; - _I when not IsSessionExpirationInProgress andalso IsWillmsgScheduled -> - Channel; - I when not IsSessionExpirationInProgress andalso not IsWillmsgScheduled -> - %% @FIXME: process may terminate before the timer fired + %% MQTT 5, Non-normative comment: + %% """" + %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, + %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start + %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at + %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent + %% because the Session ends. + %% """" + _ = publish_will_msg(ClientInfo, WillMsg); + I when I > 0 -> + %% @NOTE: We do not publish willmsg when EI > 0 but that does not mean we shall delay the willmsg publish + %% because the session is already takenover by another process. If we delay the willmsg publish, the willmsg + %% will be published as there is no chance to get it cancelled. This is not stated clearly in the MQTT spec. + skip + end, + remove_willmsg(Channel); +maybe_publish_will_msg( + {shutdown, _}, + Channel = #channel{ + conninfo = #{expiry_interval := 0, clientid := ClientId}, + clientinfo = ClientInfo, + will_msg = WillMsg + } +) -> + %% MQTT 5: 3.1.2.11.2 Session Expiry Interval + %% If the Session Expiry Interval is absent the value 0 is used. + %% If it is set to 0, or is absent, the Session ends when the Network Connection is closed. + %% Expire_interval == 0, means session is over at the time of calling with shutdown. + ?tp(debug, willmsg_takeover, #{clientid => ClientId}), + _ = publish_will_msg(ClientInfo, WillMsg), + remove_willmsg(Channel); +maybe_publish_will_msg( + Reason, + Channel = #channel{ + clientinfo = ClientInfo, + will_msg = WillMsg, + conninfo = #{clientid := ClientId} + } +) -> + %% Handles other Unknown Reasons. + case will_delay_interval(WillMsg) of + 0 -> + ?tp(debug, willmsg_other_publish, #{clientid => ClientId, reason => Reason}), + _ = publish_will_msg(ClientInfo, WillMsg), + remove_willmsg(Channel); + I when I > 0 -> + ?tp(debug, willmsg_other_delay, #{clientid => ClientId, reason => Reason}), ensure_timer(will_message, timer:seconds(I), Channel) end. @@ -2434,6 +2512,18 @@ get_mqtt_conf(Zone, Key) -> get_mqtt_conf(Zone, Key, Default) -> emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). +-spec remove_willmsg(Old :: channel()) -> New :: channel(). +remove_willmsg(Channel = #channel{timers = Timers}) -> + case maps:get(will_message, Timers, undefined) of + undefined -> + Channel#channel{will_msg = undefined}; + DelayedWillTimer -> + ok = erlang:cancel_timer(DelayedWillTimer, [{async, true}, {info, false}]), + Channel#channel{ + will_msg = undefined, + timers = maps:remove(will_message, Timers) + } + end. %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index bce5d3761..25b32bd50 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -50,17 +50,22 @@ end_per_suite(Config) -> groups() -> [ - {mqttv3, [], - emqx_common_test_helpers:all(?MODULE) -- - [ - t_session_expire_with_delayed_willmsg, - t_no_takeover_with_delayed_willmsg, - t_takeover_before_session_expire, - t_takeover_before_willmsg_expire - ]}, + {mqttv3, [], emqx_common_test_helpers:all(?MODULE) -- tc_v5_only()}, {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} ]. +tc_v5_only() -> + [ + t_session_expire_with_delayed_willmsg, + t_no_takeover_with_delayed_willmsg, + t_takeover_before_session_expire, + t_takeover_before_willmsg_expire, + t_takeover_before_session_expire_willdelay0, + t_takeover_session_then_normal_disconnect, + t_takeover_session_then_abnormal_disconnect, + t_takeover_session_then_abnormal_disconnect_2 + ]. + init_per_group(mqttv3, Config) -> lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); init_per_group(mqttv5, Config) -> @@ -178,7 +183,6 @@ t_takeover_willmsg_clean_session(Config) -> Middle = ?CNT div 2, Client1Msgs = messages(ClientId, 0, Middle), Client2Msgs = messages(ClientId, Middle, ?CNT div 2), - AllMsgs = Client1Msgs ++ Client2Msgs, WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -220,9 +224,7 @@ t_takeover_willmsg_clean_session(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), - {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), - assert_messages_missed(AllMsgs, ReceivedNoWill), - assert_messages_order(AllMsgs, ReceivedNoWill), + {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), %% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">> ?assert(IsWill1), ?assertNot(IsWill2), @@ -238,7 +240,6 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> Middle = ?CNT div 2, Client1Msgs = messages(ClientId, 0, Middle), Client2Msgs = messages(ClientId, Middle, ?CNT div 2), - AllMsgs = Client1Msgs ++ Client2Msgs, WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -282,9 +283,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), - {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), - assert_messages_missed(AllMsgs, ReceivedNoWill), - assert_messages_order(AllMsgs, ReceivedNoWill), + {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), %% THEN: payload <<"willpayload_delay10">> should be published without delay. ?assert(IsWill1), ?assertNot(IsWill2), @@ -332,17 +331,19 @@ t_no_takeover_with_delayed_willmsg(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(Client1Msgs, Received), + {IsWill0, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay3">>), + ?assertNot(IsWill0), + ?assertNotEqual([], ReceivedNoWill0), #{client := [CPidSub, CPid1]} = FCtx, %% WHEN: client disconnects abnormally AND no reconnect after 3s. exit(CPid1, kill), assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], - - {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay3">>), - ?assertNot(IsWill), - ?assertEqual([], ReceivedNoWill), - %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay. + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay3">>), + ?assertNot(IsWill1), + ?assertEqual([], ReceivedNoWill1), + %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after WILL delay (3 secs). Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay3">>), ?assertEqual([], ReceivedNoWill11), @@ -388,6 +389,9 @@ t_session_expire_with_delayed_willmsg(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), assert_messages_missed(Client1Msgs, Received), #{client := [CPidSub, CPid1]} = FCtx, %% WHEN: client disconnects abnormally AND no reconnect after 3s. @@ -395,25 +399,86 @@ t_session_expire_with_delayed_willmsg(Config) -> assert_client_exit(CPid1, ?config(mqtt_vsn, Config), killed), Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], - {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), - ?assertNot(IsWill), - ?assertEqual([], ReceivedNoWill), + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + ?assertNot(IsWill1), + ?assertEqual([], ReceivedNoWill1), %% THEN: for MQTT v5, payload <<"willpayload_delay3">> should be published after session expiry. Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], - {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), - ?assertEqual([], ReceivedNoWill11), - ?assert(IsWill11), + {IsWill12, ReceivedNoWill2} = filter_payload(Received2, <<"willpayload_delay10">>), + ?assertEqual([], ReceivedNoWill2), + ?assert(IsWill12), emqtt:stop(CPidSub), ?assert(not is_process_alive(CPid1)), ok. +%% @TODO 'Server-Keep-Alive' +%% t_no_takeover_keepalive_fired(Config) -> +%% ok. + +t_takeover_before_session_expire_willdelay0(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 0}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(1000), + CTX + end, + [] + } + ] ++ + %% WHEN: client session is taken over within 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + ?assert(IsWill), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assert(not is_process_alive(CPid1)), + ok. + t_takeover_before_session_expire(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), WillTopic = <>/binary>>, Client1Msgs = messages(ClientId, 0, 10), - emqx_logger:set_log_level(debug), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -443,7 +508,7 @@ t_takeover_before_session_expire(Config) -> [] } ] ++ - %% WHEN: client session is taken over with in 3s. + %% WHEN: client session is taken over within 3s. [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], FCtx = lists:foldl( @@ -460,29 +525,205 @@ t_takeover_before_session_expire(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), - assert_messages_missed(Client1Msgs, Received), - - Received1 = [Msg || {publish, Msg} <- ?drainMailbox(1000)], - {IsWill, ReceivedNoWill} = filter_payload(Received1, <<"willpayload_delay10">>), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), ?assertNot(IsWill), - ?assertEqual([], ReceivedNoWill), - %% THEN: for MQTT v5, payload <<"willpayload_delay10">> should NOT be published. - Received2 = [Msg || {publish, Msg} <- ?drainMailbox(5000)], - {IsWill11, ReceivedNoWill11} = filter_payload(Received2, <<"willpayload_delay10">>), - ?assertEqual([], ReceivedNoWill11), - ?assertNot(IsWill11), + ?assertNotEqual([], ReceivedNoWill), emqtt:stop(CPidSub), emqtt:stop(CPid2), ?assert(not is_process_alive(CPid1)), ok. +t_takeover_session_then_normal_disconnect(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s > session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect normally. + emqtt:disconnect(CPid2, ?RC_SUCCESS), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is not published. + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + +t_takeover_session_then_abnormal_disconnect(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 10}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 10s, session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect abnormally + emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is published before session expiry + ?assertNot(IsWill), + ?assertNotEqual([], ReceivedNoWill), + Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], + {IsWill1, ReceivedNoWill1} = filter_payload(Received1, <<"willpayload_delay10">>), + %% AND THEN: willmsg is published after session expiry + ?assert(IsWill1), + ?assertEqual([], ReceivedNoWill1), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + +t_takeover_session_then_abnormal_disconnect_2(Config) -> + ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Client1Msgs = messages(ClientId, 0, 10), + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay10">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 0}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + Commands = + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + [ + %% avoid two clients race for session takeover + { + fun(CTX) -> + timer:sleep(100), + CTX + end, + [] + } + ] ++ + %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> + %% and delay-interval 0s, session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), + %% WHEN: client disconnect abnormally + emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), + Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is published after session expiry + ?assert(IsWill), + ?assertNotEqual([], ReceivedNoWill), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ?assert(not is_process_alive(CPid2)), + ok. + t_takeover_before_willmsg_expire(Config) -> ?config(mqtt_vsn, Config) =:= v5 orelse ct:fail("MQTTv5 Only"), process_flag(trap_exit, true), ClientId = atom_to_binary(?FUNCTION_NAME), WillTopic = <>/binary>>, Client1Msgs = messages(ClientId, 0, 10), - emqx_logger:set_log_level(debug), WillOpts = [ {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, @@ -590,8 +831,8 @@ t_kick_session(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), %% THEN: payload <<"willpayload_kick">> should be published - {IsWill1, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), - ?assert(IsWill1), + {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_kick">>), + ?assert(IsWill), emqtt:stop(CPidSub), ?assert(not is_process_alive(CPid1)), ok. From 2ff33f98efc44531da7fe6493564391d399b95cf Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 21 Nov 2023 13:35:46 +0100 Subject: [PATCH 09/14] chore(willmsg): add come comments --- apps/emqx/src/emqx_channel.erl | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 296cce949..0c605b158 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2248,23 +2248,26 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %% - EMQX operations on the client %% @NOTE: %% Connection close with session expiry interval = 0 means session close. +%% @NOTE: +%% The caller does not need to take care of the case when process terminates while will_msg is published +%% as it is designed by the spec. -spec maybe_publish_will_msg(Reason, channel()) -> channel() when Reason :: - %% Connection will terminate because session is taken over by another process. + %% Connection is terminating because session is taken over by another process. takenover - %% Connection will terminate because of EMQX mgmt operation, also delete the session. + %% Connection is terminating because of EMQX mgmt operation, also delete the session. | kicked - %% Connection will terminate because session is taken over by another process. + %% Connection is terminating because session is taken over by another process. | discarded - %% Connection will terminate because session is expired + %% Connection is terminating because session is expired | expired - %% Connection will terminate because of socket close/error + %% Connection is terminating because of socket close/error | sock_closed - %% Connection will terminate with Reasons + %% Connection is terminating with Reasons | {shutdown, atom()} - %% Connection will terminate soon, delay willmsg publish is impossible. + %% Connection is terminating, delay willmsg publish is impossible. | ?chan_terminating - %% Connection will terminate because of normal MQTT disconnection, implies delete the session. + %% Connection is terminating because of normal MQTT disconnection, implies delete the session. | normal. maybe_publish_will_msg(normal, Channel) -> %% [MQTT-3.1.2-8] From 975c7429e5871559a2c73c05a9d89a2035d080a3 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 23 Feb 2024 15:00:05 +0100 Subject: [PATCH 10/14] chore: clean in tests --- apps/emqx/test/emqx_shared_sub_SUITE.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index dc755e78c..14617d95f 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -923,7 +923,6 @@ t_session_kicked({init, Config}) when is_list(Config) -> t_session_kicked({'end', Config}) when is_list(Config) -> emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0); t_session_kicked(Config) when is_list(Config) -> - emqx_logger:set_log_level(debug), Topic = <<"foo/bar/1">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, From d5247cb567aa5f338922c5ee469c182696f601f9 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 26 Feb 2024 16:34:10 +0100 Subject: [PATCH 11/14] refactor: update notes for willmsg --- apps/emqx/src/emqx_channel.erl | 75 ++++++++++++++------------ apps/emqx/test/emqx_takeover_SUITE.erl | 39 +++++++++----- 2 files changed, 67 insertions(+), 47 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0c605b158..cf9f67f39 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2241,8 +2241,7 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %% When willmsg presents the decision whether or when to publish the Will Message are effected by %% the followings: %% - connecion state -%% - If it is MQTT normal disconnection (RC: 0) -%% - If it is MQTT normal disconnection (RC: 4) +%% - If it is MQTT normal disconnection (RC: 0) or abnormal (RC != 0) from the *client* %% - will delay interval (MQTT 5.0 only) %% - session expire Session Expiry (MQTT 5.0 only) %% - EMQX operations on the client @@ -2251,27 +2250,23 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %% @NOTE: %% The caller does not need to take care of the case when process terminates while will_msg is published %% as it is designed by the spec. +%% @NOTE: +%% this function should be safe to be called multiple times in the life time of the connecion process, the willmsg +%% must be delete from the state if it is published or cleared. -spec maybe_publish_will_msg(Reason, channel()) -> channel() when Reason :: %% Connection is terminating because session is taken over by another process. takenover - %% Connection is terminating because of EMQX mgmt operation, also delete the session. + %% Connection is terminating because of EMQX mgmt operation, the session state is deleted with none-zero RC code | kicked - %% Connection is terminating because session is taken over by another process. + %% Connection is terminating because of client clean start new session. | discarded %% Connection is terminating because session is expired | expired %% Connection is terminating because of socket close/error | sock_closed - %% Connection is terminating with Reasons - | {shutdown, atom()} - %% Connection is terminating, delay willmsg publish is impossible. - | ?chan_terminating - %% Connection is terminating because of normal MQTT disconnection, implies delete the session. - | normal. -maybe_publish_will_msg(normal, Channel) -> - %% [MQTT-3.1.2-8] - Channel; + %% Session is terminating, delay willmsg publish is impossible. + | ?chan_terminating. maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> %% No will message to publish Channel; @@ -2286,7 +2281,7 @@ maybe_publish_will_msg( ConnState =:= connecting orelse ConnState =:= reauthenticating -> - %% Wrong state to publish + %% Wrong state to publish, they are intermediate state ?tp(debug, willmsg_wrong_state, #{clientid => ClientId}), Channel; maybe_publish_will_msg( @@ -2296,7 +2291,7 @@ maybe_publish_will_msg( } ) -> %% Unconditionally publish will message for MQTT 3.1.1 - ?tp(debug, willmsg_v3, #{clientid => ClientId}), + ?tp(debug, maybe_publish_willmsg_v3, #{clientid => ClientId}), _ = publish_will_msg(Channel#channel.clientinfo, WillMsg), Channel#channel{will_msg = undefined}; maybe_publish_will_msg( @@ -2316,12 +2311,12 @@ maybe_publish_will_msg( -> %% For the cases that session MUST be gone impiles that the will message MUST be published %% a. expired (session expired) - %% b. discarded (Session ends because another process starts new session with the same clientid) - %% c. kicked. (kicked by operation) + %% b. discarded (Session ends because of clean start) + %% c. kicked. (kicked by operation, abnormal conn close) %% d. internal_error (maybe not recoverable) %% This ensures willmsg will be published if the willmsg timer is scheduled but not fired %% OR fired but not yet handled - ?tp(debug, willmsg_session_ends, #{clientid => ClientId}), + ?tp(debug, maybe_publish_willmsg_session_ends, #{clientid => ClientId, reason => Reason}), _ = publish_will_msg(ClientInfo, WillMsg), remove_willmsg(Channel); maybe_publish_will_msg( @@ -2333,24 +2328,30 @@ maybe_publish_will_msg( } ) -> %% TAKEOVER [MQTT-3.1.4-3] - ?tp(debug, willmsg_takeover, #{clientid => ClientId}), + %% MQTT 5, Non-normative comment: + %% """" + %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, + %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start + %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at + %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent + %% because the Session ends. + %% """" + %% NOTE, above clean start=1 is `discard' scenarios not `takeover' scenario. case will_delay_interval(WillMsg) of 0 -> - %% MQTT 5, Non-normative comment: - %% """" - %% If a Network Connection uses a Client Identifier of an existing Network Connection to the Server, - %% the Will Message for the exiting connection is sent unless the new connection specifies Clean Start - %% of 0 and the Will Delay is greater than zero. If the Will Delay is 0 the Will Message is sent at - %% the close of the existing Network Connection, and if Clean Start is 1 the Will Message is sent - %% because the Session ends. - %% """" + ?tp(debug, maybe_publish_willmsg_takenover_pub, #{clientid => ClientId}), _ = publish_will_msg(ClientInfo, WillMsg); I when I > 0 -> - %% @NOTE: We do not publish willmsg when EI > 0 but that does not mean we shall delay the willmsg publish - %% because the session is already takenover by another process. If we delay the willmsg publish, the willmsg - %% will be published as there is no chance to get it cancelled. This is not stated clearly in the MQTT spec. + %% @NOTE Non-normative comment in MQTT 5.0 spec + %% """ + %% One use of this is to avoid publishing Will Messages if there is a temporary network + %% disconnection and the Client succeeds in reconnecting and continuing its Session + %% before the Will Message is published. + %% """ + ?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}), skip end, + %% Maybe we should cancel first then send remove_willmsg(Channel); maybe_publish_will_msg( {shutdown, _}, @@ -2363,9 +2364,10 @@ maybe_publish_will_msg( %% MQTT 5: 3.1.2.11.2 Session Expiry Interval %% If the Session Expiry Interval is absent the value 0 is used. %% If it is set to 0, or is absent, the Session ends when the Network Connection is closed. - %% Expire_interval == 0, means session is over at the time of calling with shutdown. - ?tp(debug, willmsg_takeover, #{clientid => ClientId}), + %% Expire_interval == 0, means session is end at the time of calling with shutdown. + ?tp(debug, maybe_publish_will_msg_shutdown, #{clientid => ClientId}), _ = publish_will_msg(ClientInfo, WillMsg), + %% Maybe we should cancel first then send remove_willmsg(Channel); maybe_publish_will_msg( Reason, @@ -2375,14 +2377,17 @@ maybe_publish_will_msg( conninfo = #{clientid := ClientId} } ) -> - %% Handles other Unknown Reasons. + %% Default to handle other reasons case will_delay_interval(WillMsg) of 0 -> - ?tp(debug, willmsg_other_publish, #{clientid => ClientId, reason => Reason}), + ?tp(debug, maybe_publish_will_msg_other_publish, #{ + clientid => ClientId, reason => Reason + }), _ = publish_will_msg(ClientInfo, WillMsg), + %% Maybe we should cancel first then send remove_willmsg(Channel); I when I > 0 -> - ?tp(debug, willmsg_other_delay, #{clientid => ClientId, reason => Reason}), + ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), ensure_timer(will_message, timer:seconds(I), Channel) end. diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 25b32bd50..4575634a9 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -41,6 +41,7 @@ init_per_suite(Config) -> [emqx], #{work_dir => emqx_cth_suite:work_dir(Config)} ), + emqx_logger:set_log_level(debug), [{apps, Apps} | Config]. end_per_suite(Config) -> @@ -284,7 +285,7 @@ t_takeover_clean_session_with_delayed_willmsg(Config) -> ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_delay10">>), {IsWill2, _ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), - %% THEN: payload <<"willpayload_delay10">> should be published without delay. + %% THEN: payload <<"willpayload_delay10">> should be published without delay ?assert(IsWill1), ?assertNot(IsWill2), emqtt:stop(CPid2), @@ -432,7 +433,7 @@ t_takeover_before_session_expire_willdelay0(Config) -> ], Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 10s session expiry 3s. + %% and delay-interval 0s session expiry 3s. [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ [ {fun start_client/5, [ @@ -467,6 +468,7 @@ t_takeover_before_session_expire_willdelay0(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, _ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: willmsg is published ?assert(IsWill), emqtt:stop(CPidSub), emqtt:stop(CPid2), @@ -526,6 +528,7 @@ t_takeover_before_session_expire(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), + %% THEN: No Willmsg is published ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), emqtt:stop(CPidSub), @@ -610,7 +613,7 @@ t_takeover_session_then_abnormal_disconnect(Config) -> ], Commands = %% GIVEN: client connect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 10s, session expiry 3s. + %% and will-delay-interval 10s > session expiry 3s. [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ [ {fun start_client/5, [ @@ -645,7 +648,7 @@ t_takeover_session_then_abnormal_disconnect(Config) -> Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), - %% THEN: willmsg is published before session expiry + %% THEN: willmsg is not published before session expiry ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), Received1 = [Msg || {publish, Msg} <- ?drainMailbox(3000)], @@ -668,7 +671,16 @@ t_takeover_session_then_abnormal_disconnect_2(Config) -> {proto_ver, ?config(mqtt_vsn, Config)}, {clean_start, false}, {will_topic, WillTopic}, - {will_payload, <<"willpayload_delay10">>}, + {will_payload, <<"willpayload_delay1">>}, + {will_qos, 1}, + {will_props, #{'Will-Delay-Interval' => 1}}, + {properties, #{'Session-Expiry-Interval' => 3}} + ], + WillOpts2 = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_delay2">>}, {will_qos, 1}, {will_props, #{'Will-Delay-Interval' => 0}}, {properties, #{'Session-Expiry-Interval' => 3}} @@ -691,9 +703,9 @@ t_takeover_session_then_abnormal_disconnect_2(Config) -> [] } ] ++ - %% GIVEN: client reconnect with willmsg payload <<"willpayload_delay10">> - %% and delay-interval 0s, session expiry 3s. - [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}], + %% GIVEN: client *reconnect* with willmsg payload <<"willpayload_delay2">> + %% and will-delay-interval 0s, session expiry 3s. + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts2]}], FCtx = lists:foldl( fun({Fun, Args}, Ctx) -> @@ -707,12 +719,15 @@ t_takeover_session_then_abnormal_disconnect_2(Config) -> assert_client_exit(CPid1, ?config(mqtt_vsn, Config), takenover), %% WHEN: client disconnect abnormally emqtt:disconnect(CPid2, ?RC_DISCONNECT_WITH_WILL_MESSAGE), - Received = [Msg || {publish, Msg} <- ?drainMailbox(1000)], + Received = [Msg || {publish, Msg} <- ?drainMailbox(2000)], ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), - {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay10">>), - %% THEN: willmsg is published after session expiry - ?assert(IsWill), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload_delay1">>), + %% THEN: willmsg1 of old conn is not published because will-delay-interval > 0 + ?assertNot(IsWill), ?assertNotEqual([], ReceivedNoWill), + %% THEN: willmsg1 is published because will-delay-interval is 0 + {IsWill2, _} = filter_payload(Received, <<"willpayload_delay2">>), + ?assert(IsWill2), emqtt:stop(CPidSub), ?assert(not is_process_alive(CPid1)), ?assert(not is_process_alive(CPid2)), From 88fc67b369b910ce8bc5bebbf17ee62ef1ad402f Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 26 Feb 2024 17:59:05 +0100 Subject: [PATCH 12/14] chore(willmsg): remove unreachable code --- apps/emqx/src/emqx_channel.erl | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index cf9f67f39..d238a78ca 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2353,22 +2353,6 @@ maybe_publish_will_msg( end, %% Maybe we should cancel first then send remove_willmsg(Channel); -maybe_publish_will_msg( - {shutdown, _}, - Channel = #channel{ - conninfo = #{expiry_interval := 0, clientid := ClientId}, - clientinfo = ClientInfo, - will_msg = WillMsg - } -) -> - %% MQTT 5: 3.1.2.11.2 Session Expiry Interval - %% If the Session Expiry Interval is absent the value 0 is used. - %% If it is set to 0, or is absent, the Session ends when the Network Connection is closed. - %% Expire_interval == 0, means session is end at the time of calling with shutdown. - ?tp(debug, maybe_publish_will_msg_shutdown, #{clientid => ClientId}), - _ = publish_will_msg(ClientInfo, WillMsg), - %% Maybe we should cancel first then send - remove_willmsg(Channel); maybe_publish_will_msg( Reason, Channel = #channel{ From c8f9ffdc1f081d2410845c82fd8ea61f01ce487a Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 26 Feb 2024 21:51:34 +0100 Subject: [PATCH 13/14] chore: update some comments --- apps/emqx/src/emqx_channel.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d238a78ca..51b66f4f9 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2237,7 +2237,7 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg -%% @doc May publish will message [MQTT-3.1.2-8] +%% @doc Maybe publish will message [MQTT-3.1.2-8] %% When willmsg presents the decision whether or when to publish the Will Message are effected by %% the followings: %% - connecion state @@ -2351,7 +2351,6 @@ maybe_publish_will_msg( ?tp(debug, maybe_publish_willmsg_takenover_skip, #{clientid => ClientId}), skip end, - %% Maybe we should cancel first then send remove_willmsg(Channel); maybe_publish_will_msg( Reason, @@ -2368,7 +2367,6 @@ maybe_publish_will_msg( clientid => ClientId, reason => Reason }), _ = publish_will_msg(ClientInfo, WillMsg), - %% Maybe we should cancel first then send remove_willmsg(Channel); I when I > 0 -> ?tp(debug, maybe_publish_will_msg_other_delay, #{clientid => ClientId, reason => Reason}), @@ -2504,6 +2502,7 @@ get_mqtt_conf(Zone, Key) -> get_mqtt_conf(Zone, Key, Default) -> emqx_config:get_zone_conf(Zone, [mqtt, Key], Default). +%% @doc unset will_msg and cancel the will_message timer -spec remove_willmsg(Old :: channel()) -> New :: channel(). remove_willmsg(Channel = #channel{timers = Timers}) -> case maps:get(will_message, Timers, undefined) of From 6c7b7741d530b2dcaf8f33265c222dca9632cae2 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 26 Feb 2024 23:05:49 +0100 Subject: [PATCH 14/14] feat(quic): mqtt 5.0 graceful shutdown in takeover --- apps/emqx/src/emqx_quic_stream.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 09244d67f..f27009888 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -149,7 +149,8 @@ fast_close({quic, _Conn, Stream, _Info}) -> ok. shutdown({quic, _Conn, Stream, _Info}, read_write) -> - quicer:async_shutdown_stream(Stream). + %% A graceful shutdown means both side shutdown the read and write gracefully. + quicer:shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1, 5000). -spec ensure_ok_or_exit(atom(), list(term())) -> term(). ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->