diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 192335a25..7b23121c5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1134,16 +1134,14 @@ handle_call( kick, Channel = #channel{ conn_state = ConnState, - will_msg = WillMsg, - clientinfo = ClientInfo, conninfo = #{proto_ver := ProtoVer} } ) -> - (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), + Channel0 = maybe_publish_will_msg(Channel), Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel); - _ -> Channel + connected -> ensure_disconnected(kicked, Channel0); + _ -> Channel0 end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> @@ -1426,17 +1424,9 @@ terminate(normal, Channel) -> run_terminate_hook(normal, Channel); terminate({shutdown, kicked}, Channel) -> run_terminate_hook(kicked, Channel); -terminate({shutdown, Reason}, Channel) when - Reason =:= discarded; - Reason =:= takenover --> - run_terminate_hook(Reason, Channel); -terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> - %% since will_msg is set to undefined as soon as it is published, - %% if will_msg still exists when the session is terminated, it - %% must be published immediately. - WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg), - run_terminate_hook(Reason, Channel). +terminate(Reason, Channel) -> + Channel1 = maybe_publish_will_msg(Channel), + run_terminate_hook(Reason, Channel1). run_terminate_hook(_Reason, #channel{session = undefined}) -> ok; diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 96e4f54c7..c4671bc32 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -728,7 +728,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> {shutdown, Reason, Reply, OutPacket, NChannel} -> NState = State#state{channel = NChannel}, ok = handle_outgoing(OutPacket, NState), - shutdown(Reason, Reply, NState) + NState2 = graceful_shutdown_transport(Reason, NState), + shutdown(Reason, Reply, NState2) end. %%-------------------------------------------------------------------- @@ -1234,6 +1235,12 @@ set_tcp_keepalive({Type, Id}) -> async_set_keepalive(Idle, Interval, Probes) end. +-spec graceful_shutdown_transport(atom(), state()) -> state(). +graceful_shutdown_transport(_Reason, S = #state{transport = Transport, socket = Socket}) -> + %% @TODO Reason is reserved for future use, quic transport + Transport:shutdown(Socket, read_write), + S#state{sockstate = closed}. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 42f153b28..09244d67f 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -32,6 +32,7 @@ wait/1, getstat/2, fast_close/1, + shutdown/2, ensure_ok_or_exit/2, async_send/3, setopts/2, @@ -147,6 +148,9 @@ fast_close({quic, _Conn, Stream, _Info}) -> % quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), ok. +shutdown({quic, _Conn, Stream, _Info}, read_write) -> + quicer:async_shutdown_stream(Stream). + -spec ensure_ok_or_exit(atom(), list(term())) -> term(). ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> case erlang:apply(?MODULE, Fun, Args) of diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 98d4e3102..74038a8fc 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -32,6 +32,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> %% Meck Transport ok = meck:new(emqx_transport, [non_strict, passthrough, no_history, no_link]), + ok = meck:expect(emqx_transport, shutdown, fun(_, _) -> ok end), %% Meck Channel ok = meck:new(emqx_channel, [passthrough, no_history, no_link]), %% Meck Cm diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index b732735bd..3f50db6e3 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -908,6 +908,8 @@ t_session_takeover(Config) when is_list(Config) -> ?assertMatch([_], emqx:publish(Message3)), ?assertMatch([_], emqx:publish(Message4)), {true, _} = last_message(<<"hello2">>, [ConnPid2]), + %% We may or may not recv dup hello2 due to QoS1 redelivery + _ = last_message(<<"hello2">>, [ConnPid2]), {true, _} = last_message(<<"hello3">>, [ConnPid2]), {true, _} = last_message(<<"hello4">>, [ConnPid2]), ?assertEqual([], collect_msgs(timer:seconds(2))), diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 03a48e174..2eda29e5d 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -24,14 +24,17 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(TOPIC, <<"t">>). -define(CNT, 100). -define(SLEEP, 10). %%-------------------------------------------------------------------- %% Initial funcs -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, mqttv3}, + {group, mqttv5} + ]. init_per_suite(Config) -> Apps = emqx_cth_suite:start( @@ -44,27 +47,39 @@ end_per_suite(Config) -> Apps = ?config(apps, Config), ok = emqx_cth_suite:stop(Apps), ok. + +groups() -> + [ + {mqttv3, [], emqx_common_test_helpers:all(?MODULE)}, + {mqttv5, [], emqx_common_test_helpers:all(?MODULE)} + ]. + +init_per_group(mqttv3, Config) -> + lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v3}); +init_per_group(mqttv5, Config) -> + lists:keystore(mqtt_vsn, 1, Config, {mqtt_vsn, v5}). + +end_per_group(_Group, _Config) -> + ok. + %%-------------------------------------------------------------------- %% Testcases -t_takeover(_) -> +t_takeover(Config) -> process_flag(trap_exit, true), - ClientId = <<"clientid">>, + ClientId = atom_to_binary(?FUNCTION_NAME), + ClientOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false} + ], Middle = ?CNT div 2, - Client1Msgs = messages(0, Middle), - Client2Msgs = messages(Middle, ?CNT div 2), + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), AllMsgs = Client1Msgs ++ Client2Msgs, - - meck:new(emqx_cm, [non_strict, passthrough]), - meck:expect(emqx_cm, takeover_session_end, fun(Arg) -> - ok = timer:sleep(?SLEEP * 2), - meck:passthrough([Arg]) - end), - Commands = - [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ - [{fun start_client/4, [ClientId, <<"t">>, ?QOS_1]}] ++ + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, ClientOpts]}] ++ [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ [{fun stop_client/1, []}], @@ -78,30 +93,144 @@ t_takeover(_) -> ), #{client := [CPid2, CPid1]} = FCtx, - ?assertReceive({'EXIT', CPid1, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}), - ?assertReceive({'EXIT', CPid2, normal}), + assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + ?assertReceive({'EXIT', CPid2, normal}), Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], ct:pal("middle: ~p", [Middle]), ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), assert_messages_missed(AllMsgs, Received), assert_messages_order(AllMsgs, Received), - - meck:unload(emqx_cm), ok. -t_takover_in_cluster(_) -> - todo. +t_takeover_willmsg(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload">>}, + {will_qos, 0} + ], + Commands = + %% GIVEN client connect with will message + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN client reconnect with clean_start = false + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill, ReceivedNoWill} = filter_payload(Received, <<"willpayload">>), + assert_messages_missed(AllMsgs, ReceivedNoWill), + assert_messages_order(AllMsgs, ReceivedNoWill), + %% THEN will message should be received + ?assert(IsWill), + emqtt:stop(CPidSub), + emqtt:stop(CPid2), + ?assertReceive({'EXIT', CPid2, normal}), + ?assert(not is_process_alive(CPid1)), + ok. + +t_takeover_willmsg_clean_session(Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + WillTopic = <>/binary>>, + Middle = ?CNT div 2, + Client1Msgs = messages(ClientId, 0, Middle), + Client2Msgs = messages(ClientId, Middle, ?CNT div 2), + AllMsgs = Client1Msgs ++ Client2Msgs, + WillOpts = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, false}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_1">>}, + {will_qos, 1} + ], + WillOptsClean = [ + {proto_ver, ?config(mqtt_vsn, Config)}, + {clean_start, true}, + {will_topic, WillTopic}, + {will_payload, <<"willpayload_2">>}, + {will_qos, 1} + ], + + Commands = + %% GIVEN: client connect with willmsg payload <<"willpayload_1">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOpts]}] ++ + [ + {fun start_client/5, [ + <>/binary>>, WillTopic, ?QOS_1, [] + ]} + ] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client1Msgs] ++ + %% WHEN: client connects with clean_start=true and willmsg payload <<"willpayload_2">> + [{fun start_client/5, [ClientId, ClientId, ?QOS_1, WillOptsClean]}] ++ + [{fun publish_msg/2, [Msg]} || Msg <- Client2Msgs] ++ + [ + { + fun(CTX) -> + timer:sleep(1000), + CTX + end, + [] + } + ], + + FCtx = lists:foldl( + fun({Fun, Args}, Ctx) -> + ct:pal("COMMAND: ~p ~p", [element(2, erlang:fun_info(Fun, name)), Args]), + apply(Fun, [Ctx | Args]) + end, + #{}, + Commands + ), + #{client := [CPid2, CPidSub, CPid1]} = FCtx, + assert_client_exit(CPid1, ?config(mqtt_vsn, Config)), + Received = [Msg || {publish, Msg} <- ?drainMailbox(?SLEEP)], + ct:pal("received: ~p", [[P || #{payload := P} <- Received]]), + {IsWill1, ReceivedNoWill0} = filter_payload(Received, <<"willpayload_1">>), + {IsWill2, ReceivedNoWill} = filter_payload(ReceivedNoWill0, <<"willpayload_2">>), + assert_messages_missed(AllMsgs, ReceivedNoWill), + assert_messages_order(AllMsgs, ReceivedNoWill), + %% THEN: payload <<"willpayload_1">> should be published instead of <<"willpayload_2">> + ?assert(IsWill1), + ?assertNot(IsWill2), + emqtt:stop(CPid2), + emqtt:stop(CPidSub), + ?assert(not is_process_alive(CPid1)), + ok. + +%% t_takover_in_cluster(_) -> +%% todo. %%-------------------------------------------------------------------- %% Commands - -start_client(Ctx, ClientId, Topic, Qos) -> - {ok, CPid} = emqtt:start_link([ - {clientid, ClientId}, - {proto_ver, v5}, - {clean_start, false} - ]), +start_client(Ctx, ClientId, Topic, Qos, Opts) -> + {ok, CPid} = emqtt:start_link([{clientid, ClientId} | Opts]), _ = erlang:spawn_link(fun() -> {ok, _} = emqtt:connect(CPid), ct:pal("CLIENT: connected ~p", [CPid]), @@ -157,8 +286,8 @@ assert_messages_order([Msg | Expected], Received) -> assert_messages_order(Expected, Rest) end. -messages(Offset, Cnt) -> - [emqx_message:make(ct, ?QOS_1, ?TOPIC, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. +messages(Topic, Offset, Cnt) -> + [emqx_message:make(ct, ?QOS_1, Topic, payload(Offset + I)) || I <- lists:seq(1, Cnt)]. payload(I) -> % NOTE @@ -170,3 +299,30 @@ payload(I) -> emqx_utils_calendar:now_to_rfc3339(millisecond) ]) ). + +%% @doc Filter out the message with matching target payload from the list of messages. +%% return '{IsTargetFound, ListOfOtherMessages}' +%% @end +-spec filter_payload(List :: [#{payload := binary()}], Payload :: binary()) -> + {IsPayloadFound :: boolean(), OtherPayloads :: [#{payload := binary()}]}. +filter_payload(List, Payload) when is_binary(Payload) -> + IsWill = lists:any( + fun + (#{payload := P}) -> Payload == P; + (_) -> false + end, + List + ), + Filtered = [ + Msg + || #{payload := P} = Msg <- List, + P =/= Payload + ], + {IsWill, Filtered}. + +%% @doc assert emqtt *client* process exits as expected. +assert_client_exit(Pid, v5) -> + %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3] + ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}); +assert_client_exit(Pid, v3) -> + ?assertReceive({'EXIT', Pid, {shutdown, tcp_closed}}). diff --git a/changes/ce/fix-11868.en.md b/changes/ce/fix-11868.en.md new file mode 100644 index 000000000..da0a09dff --- /dev/null +++ b/changes/ce/fix-11868.en.md @@ -0,0 +1,2 @@ +Fix a bug that willmsg is not published after session takeover. +