From df74c180b719c192475ca4c8f19fea04c5bf3556 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 18:52:45 +0800 Subject: [PATCH] fix(channel): send will_msg if client has been kicked --- apps/emqx/src/emqx_channel.erl | 26 +++++++++++++++++--------- apps/emqx/test/emqx_channel_SUITE.erl | 22 +++++++++++++++++++++- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index ec573a013..8198a11c8 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1141,10 +1141,18 @@ return_sub_unsub_ack(Packet, Channel) -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. -handle_call(kick, Channel) -> - Channel1 = ensure_disconnected(kicked, Channel), - case Channel1 of - ?IS_MQTT_V5 -> +handle_call(kick, Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + }) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + Channel1 = case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, + case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of + true -> shutdown(kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); _ -> @@ -1227,7 +1235,7 @@ handle_info( -> emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -2051,9 +2059,9 @@ ensure_disconnected( %%-------------------------------------------------------------------- %% Maybe Publish will msg -mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> ok = publish_will_msg(WillMsg), @@ -2063,10 +2071,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). + maps:get('Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), 0). publish_will_msg(Msg) -> - %% TODO check if we should discard result here _ = emqx_broker:publish(Msg), ok. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 0731e4a3a..51b97fde0 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -912,7 +912,27 @@ t_handle_call_kick(_) -> {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, Channelv5). + _} = emqx_channel:handle_call(kick, Channelv5), + + DisconnectedChannelv5 = channel(#{conn_state => disconnected}), + DisconnectedChannelv4 = v4(DisconnectedChannelv5), + + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4). + +t_handle_kicked_publish_will_msg(_) -> + Self = self(), + ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end), + + Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), + + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), + receive + {pub, Msg} -> ok + after 200 -> ?assert(true) + end. t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),