diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 81be012c5..f8c269135 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -924,10 +924,18 @@ return_sub_unsub_ack(Packet, Channel) -> -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). -handle_call(kick, Channel) -> - Channel1 = ensure_disconnected(kicked, Channel), - case Channel1 of - ?IS_MQTT_V5 -> +handle_call(kick, Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + }) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + Channel1 = case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, + case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of + true -> shutdown(kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); _ -> @@ -990,7 +998,7 @@ handle_info({sock_closed, Reason}, Channel = clientinfo = ClientInfo = #{zone := Zone}}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -1651,9 +1659,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo, %%-------------------------------------------------------------------- %% Maybe Publish will msg -mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> ok = publish_will_msg(WillMsg), @@ -1663,10 +1671,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). + maps:get('Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), 0). publish_will_msg(Msg) -> - %% TODO check if we should discard result here _ = emqx_broker:publish(Msg), ok. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index fb7453e05..ec16a9589 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -582,7 +582,27 @@ t_handle_call_kick(_) -> {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, Channelv5). + _} = emqx_channel:handle_call(kick, Channelv5), + + DisconnectedChannelv5 = channel(#{conn_state => disconnected}), + DisconnectedChannelv4 = v4(DisconnectedChannelv5), + + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4). + +t_handle_kicked_publish_will_msg(_) -> + Self = self(), + ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end), + + Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), + + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), + receive + {pub, Msg} -> ok + after 200 -> ?assert(true) + end. t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),