diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index a2d7f4edc..a7e161915 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -49,6 +49,7 @@ File format: * Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package) * Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$` * Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`. +* Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309] ## v4.3.12 ### Important changes diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 32f0062ea..f8c269135 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -924,9 +924,23 @@ return_sub_unsub_ack(Packet, Channel) -> -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). -handle_call(kick, Channel) -> - Channel1 = ensure_disconnected(kicked, Channel), - disconnect_and_shutdown(kicked, ok, Channel1); +handle_call(kick, Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + }) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + Channel1 = case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, + case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of + true -> + shutdown(kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); + _ -> + shutdown(kicked, ok, Channel1) + end; handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); @@ -984,7 +998,7 @@ handle_info({sock_closed, Reason}, Channel = clientinfo = ClientInfo = #{zone := Zone}}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -1645,9 +1659,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- %% Maybe Publish will msg -mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> ok = publish_will_msg(WillMsg), @@ -1657,10 +1671,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). + maps:get('Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), 0). publish_will_msg(Msg) -> - %% TODO check if we should discard result here _ = emqx_broker:publish(Msg), ok. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index fd7da9f20..079b1a87d 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -577,7 +577,32 @@ t_handle_out_unexpected(_) -> %%-------------------------------------------------------------------- t_handle_call_kick(_) -> - {shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()). + Channelv5 = channel(), + Channelv4 = v4(Channelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, Channelv5), + + DisconnectedChannelv5 = channel(#{conn_state => disconnected}), + DisconnectedChannelv4 = v4(DisconnectedChannelv5), + + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4). + +t_handle_kicked_publish_will_msg(_) -> + Self = self(), + ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end), + + Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), + + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), + receive + {pub, Msg} -> ok + after 200 -> exit(will_message_not_published) + end. t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), @@ -858,3 +883,11 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, {overall_messages_routing, {10, 1}}]). + +v4(Channel) -> + ConnInfo = emqx_channel:info(conninfo, Channel), + emqx_channel:set_field( + conninfo, + maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo), + Channel + ).