From 9dc1e162fa633b14bcb2dc25c979bc67fbd5c814 Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 6 May 2021 18:38:55 +0800 Subject: [PATCH] fix(sn): fix clean_session=false willmsgs not sent --- apps/emqx_sn/src/emqx_sn_gateway.erl | 19 +++++---- apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl | 44 ++++++++++++++++++++ 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index f24860e6b..bfb2f28df 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -610,8 +610,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> stop(Reason, Reply, State#state{channel = NChannel}) end. -handle_info(Info, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_info(Info, Channel), State). +handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) -> + maybe_send_will_msg(Reason, State), + handle_return(emqx_channel:handle_info(Info, Channel), State). handle_timeout(TRef, TMsg, State = #state{channel = Channel}) -> handle_return(emqx_channel:handle_timeout(TRef, TMsg, Channel), State). @@ -782,21 +783,21 @@ stop({shutdown, Reason}, State) -> stop(Reason, State); stop(Reason, State) -> ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), - case Reason of - %% FIXME: The Will-Msg should publish when a Session terminated! - Reason when Reason =:= normal -> - ok; - _ -> - do_publish_will(State) - end, + maybe_send_will_msg(Reason, State), {stop, {shutdown, Reason}, State}. stop({shutdown, Reason}, Reply, State) -> stop(Reason, Reply, State); stop(Reason, Reply, State) -> ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), + maybe_send_will_msg(Reason, State), {stop, {shutdown, Reason}, Reply, State}. +maybe_send_will_msg(normal, _State) -> + ok; +maybe_send_will_msg(_Reason, State) -> + do_publish_will(State). + stop_log_level(Reason) when ?is_non_error_reason(Reason) -> debug; stop_log_level(_) -> diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 746d0f4a1..ad0c5f032 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -941,6 +941,41 @@ t_will_test5(_) -> gen_udp:close(Socket). +t_will_case06(_) -> + QoS = 1, + Duration = 1, + WillMsg = <<10, 11, 12, 13, 14>>, + WillTopic = <<"abc">>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = <<"test">>, + + ok = emqx_broker:subscribe(WillTopic), + + send_connect_msg_with_will1(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + + send_willmsg_msg(Socket, WillMsg), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + send_pingreq_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + % wait udp client keepalive timeout + timer:sleep(2000), + + receive + {deliver, WillTopic, #message{payload = WillMsg}} -> ok; + Msg -> ct:print("recevived --- unex: ~p", [Msg]) + after + 1000 -> ct:fail(wait_willmsg_timeout) + end, + send_disconnect_msg(Socket, undefined), + + gen_udp:close(Socket). + t_asleep_test01_timeout(_) -> QoS = 1, Duration = 1, @@ -1564,6 +1599,15 @@ send_connect_msg_with_will(Socket, Duration, ClientId) -> ?FNU:2, ProtocolId:8, Duration:16, ClientId/binary>>, ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket). +send_connect_msg_with_will1(Socket, Duration, ClientId) -> + Length = 10, + Will = 1, + CleanSession = 0, + ProtocolId = 1, + ConnectPacket = <>, + ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket). + send_willtopic_msg(Socket, Topic, QoS) -> Length = 3+byte_size(Topic), MsgType = ?SN_WILLTOPIC,