From 8d3e953eefc303d9d4d515ef6a88e64c2a2e02c8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 11:55:47 +0800 Subject: [PATCH 1/4] fix(channel): send DISCONNECT packet if connection has been kicked fix #7241 --- src/emqx_channel.erl | 8 +++++++- test/emqx_channel_SUITE.erl | 15 ++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 32f0062ea..81be012c5 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -926,7 +926,13 @@ return_sub_unsub_ack(Packet, Channel) -> | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). handle_call(kick, Channel) -> Channel1 = ensure_disconnected(kicked, Channel), - disconnect_and_shutdown(kicked, ok, Channel1); + case Channel1 of + ?IS_MQTT_V5 -> + shutdown(kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); + _ -> + shutdown(kicked, ok, Channel1) + end; handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index fd7da9f20..fb7453e05 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -577,7 +577,12 @@ t_handle_out_unexpected(_) -> %%-------------------------------------------------------------------- t_handle_call_kick(_) -> - {shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()). + Channelv5 = channel(), + Channelv4 = v4(Channelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, Channelv5). t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), @@ -858,3 +863,11 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, {overall_messages_routing, {10, 1}}]). + +v4(Channel) -> + ConnInfo = emqx_channel:info(conninfo, Channel), + emqx_channel:set_field( + conninfo, + maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo), + Channel + ). From 75239c13887256760e2470248e36851340d19696 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 12:52:09 +0800 Subject: [PATCH 2/4] chore: update changes-4.3.md --- CHANGES-4.3.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index a2d7f4edc..a7e161915 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -49,6 +49,7 @@ File format: * Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package) * Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$` * Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`. +* Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309] ## v4.3.12 ### Important changes From 604c384660490b292e8b6a293cede4e928bbafd0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 18:52:45 +0800 Subject: [PATCH 3/4] fix(channel): send will_msg if client has been kicked --- src/emqx_channel.erl | 26 +++++++++++++++++--------- test/emqx_channel_SUITE.erl | 22 +++++++++++++++++++++- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 81be012c5..f8c269135 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -924,10 +924,18 @@ return_sub_unsub_ack(Packet, Channel) -> -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). -handle_call(kick, Channel) -> - Channel1 = ensure_disconnected(kicked, Channel), - case Channel1 of - ?IS_MQTT_V5 -> +handle_call(kick, Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + }) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + Channel1 = case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, + case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of + true -> shutdown(kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); _ -> @@ -990,7 +998,7 @@ handle_info({sock_closed, Reason}, Channel = clientinfo = ClientInfo = #{zone := Zone}}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -1651,9 +1659,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- %% Maybe Publish will msg -mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> ok = publish_will_msg(WillMsg), @@ -1663,10 +1671,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). + maps:get('Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), 0). publish_will_msg(Msg) -> - %% TODO check if we should discard result here _ = emqx_broker:publish(Msg), ok. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index fb7453e05..ec16a9589 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -582,7 +582,27 @@ t_handle_call_kick(_) -> {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, Channelv5). + _} = emqx_channel:handle_call(kick, Channelv5), + + DisconnectedChannelv5 = channel(#{conn_state => disconnected}), + DisconnectedChannelv4 = v4(DisconnectedChannelv5), + + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4). + +t_handle_kicked_publish_will_msg(_) -> + Self = self(), + ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end), + + Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), + + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), + receive + {pub, Msg} -> ok + after 200 -> ?assert(true) + end. t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), From 706c7725f9b36e205f421d230856200efe930667 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 18 Mar 2022 10:06:01 +0800 Subject: [PATCH 4/4] test: update test/emqx_channel_SUITE.erl Co-authored-by: Thales Macedo Garitezi --- test/emqx_channel_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index ec16a9589..079b1a87d 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -601,7 +601,7 @@ t_handle_kicked_publish_will_msg(_) -> _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), receive {pub, Msg} -> ok - after 200 -> ?assert(true) + after 200 -> exit(will_message_not_published) end. t_handle_call_discard(_) ->