Merge pull request #7309 from HJianBo/send_disconnect_pkt_while_kicked
fix(channel): send DISCONNECT packet if connection has been kicked
This commit is contained in:
commit
e4b5001a57
|
@ -49,6 +49,7 @@ File format:
|
||||||
* Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package)
|
* Fix errno=13 'Permission denied' Cannot create FIFO boot error in Amazon Linux 2022 (el8 package)
|
||||||
* Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$`
|
* Fix user or appid created, name only allow `^[A-Za-z]+[A-Za-z0-9-_]*$`
|
||||||
* Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`.
|
* Fix subscribe http api crash by bad_qos `/mqtt/subscribe`,`/mqtt/subscribe_batch`.
|
||||||
|
* Send DISCONNECT packet with reason code 0x98 if connection has been kicked [#7309]
|
||||||
|
|
||||||
## v4.3.12
|
## v4.3.12
|
||||||
### Important changes
|
### Important changes
|
||||||
|
|
|
@ -924,9 +924,23 @@ return_sub_unsub_ack(Packet, Channel) ->
|
||||||
-> {reply, Reply :: term(), channel()}
|
-> {reply, Reply :: term(), channel()}
|
||||||
| {shutdown, Reason :: term(), Reply :: term(), channel()}
|
| {shutdown, Reason :: term(), Reply :: term(), channel()}
|
||||||
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
|
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
|
||||||
handle_call(kick, Channel) ->
|
handle_call(kick, Channel = #channel{
|
||||||
Channel1 = ensure_disconnected(kicked, Channel),
|
conn_state = ConnState,
|
||||||
disconnect_and_shutdown(kicked, ok, Channel1);
|
will_msg = WillMsg,
|
||||||
|
conninfo = #{proto_ver := ProtoVer}
|
||||||
|
}) ->
|
||||||
|
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
|
||||||
|
Channel1 = case ConnState of
|
||||||
|
connected -> ensure_disconnected(kicked, Channel);
|
||||||
|
_ -> Channel
|
||||||
|
end,
|
||||||
|
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
|
||||||
|
true ->
|
||||||
|
shutdown(kicked, ok,
|
||||||
|
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1);
|
||||||
|
_ ->
|
||||||
|
shutdown(kicked, ok, Channel1)
|
||||||
|
end;
|
||||||
|
|
||||||
handle_call(discard, Channel) ->
|
handle_call(discard, Channel) ->
|
||||||
disconnect_and_shutdown(discarded, ok, Channel);
|
disconnect_and_shutdown(discarded, ok, Channel);
|
||||||
|
@ -984,7 +998,7 @@ handle_info({sock_closed, Reason}, Channel =
|
||||||
clientinfo = ClientInfo = #{zone := Zone}}) ->
|
clientinfo = ClientInfo = #{zone := Zone}}) ->
|
||||||
emqx_zone:enable_flapping_detect(Zone)
|
emqx_zone:enable_flapping_detect(Zone)
|
||||||
andalso emqx_flapping:detect(ClientInfo),
|
andalso emqx_flapping:detect(ClientInfo),
|
||||||
Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
|
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
||||||
case maybe_shutdown(Reason, Channel1) of
|
case maybe_shutdown(Reason, Channel1) of
|
||||||
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
|
||||||
Shutdown -> Shutdown
|
Shutdown -> Shutdown
|
||||||
|
@ -1645,9 +1659,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Maybe Publish will msg
|
%% Maybe Publish will msg
|
||||||
|
|
||||||
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
|
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
|
||||||
Channel;
|
Channel;
|
||||||
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
||||||
case will_delay_interval(WillMsg) of
|
case will_delay_interval(WillMsg) of
|
||||||
0 ->
|
0 ->
|
||||||
ok = publish_will_msg(WillMsg),
|
ok = publish_will_msg(WillMsg),
|
||||||
|
@ -1657,10 +1671,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
will_delay_interval(WillMsg) ->
|
will_delay_interval(WillMsg) ->
|
||||||
maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
|
maps:get('Will-Delay-Interval',
|
||||||
|
emqx_message:get_header(properties, WillMsg, #{}), 0).
|
||||||
|
|
||||||
publish_will_msg(Msg) ->
|
publish_will_msg(Msg) ->
|
||||||
%% TODO check if we should discard result here
|
|
||||||
_ = emqx_broker:publish(Msg),
|
_ = emqx_broker:publish(Msg),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -577,7 +577,32 @@ t_handle_out_unexpected(_) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_handle_call_kick(_) ->
|
t_handle_call_kick(_) ->
|
||||||
{shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()).
|
Channelv5 = channel(),
|
||||||
|
Channelv4 = v4(Channelv5),
|
||||||
|
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4),
|
||||||
|
{shutdown, kicked, ok,
|
||||||
|
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
|
||||||
|
_} = emqx_channel:handle_call(kick, Channelv5),
|
||||||
|
|
||||||
|
DisconnectedChannelv5 = channel(#{conn_state => disconnected}),
|
||||||
|
DisconnectedChannelv4 = v4(DisconnectedChannelv5),
|
||||||
|
|
||||||
|
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5),
|
||||||
|
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4).
|
||||||
|
|
||||||
|
t_handle_kicked_publish_will_msg(_) ->
|
||||||
|
Self = self(),
|
||||||
|
ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end),
|
||||||
|
|
||||||
|
Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>),
|
||||||
|
|
||||||
|
{shutdown, kicked, ok,
|
||||||
|
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
|
||||||
|
_} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})),
|
||||||
|
receive
|
||||||
|
{pub, Msg} -> ok
|
||||||
|
after 200 -> exit(will_message_not_published)
|
||||||
|
end.
|
||||||
|
|
||||||
t_handle_call_discard(_) ->
|
t_handle_call_discard(_) ->
|
||||||
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
|
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
|
||||||
|
@ -858,3 +883,11 @@ session(InitFields) when is_map(InitFields) ->
|
||||||
quota() ->
|
quota() ->
|
||||||
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
|
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
|
||||||
{overall_messages_routing, {10, 1}}]).
|
{overall_messages_routing, {10, 1}}]).
|
||||||
|
|
||||||
|
v4(Channel) ->
|
||||||
|
ConnInfo = emqx_channel:info(conninfo, Channel),
|
||||||
|
emqx_channel:set_field(
|
||||||
|
conninfo,
|
||||||
|
maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo),
|
||||||
|
Channel
|
||||||
|
).
|
||||||
|
|
Loading…
Reference in New Issue