fix(channel): send will_msg if client has been kicked

This commit is contained in:
JianBo He 2022-03-15 18:52:45 +08:00
parent 75239c1388
commit 604c384660
2 changed files with 38 additions and 10 deletions

View File

@ -924,10 +924,18 @@ return_sub_unsub_ack(Packet, Channel) ->
-> {reply, Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}).
handle_call(kick, Channel) ->
Channel1 = ensure_disconnected(kicked, Channel),
case Channel1 of
?IS_MQTT_V5 ->
handle_call(kick, Channel = #channel{
conn_state = ConnState,
will_msg = WillMsg,
conninfo = #{proto_ver := ProtoVer}
}) ->
(WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
Channel1 = case ConnState of
connected -> ensure_disconnected(kicked, Channel);
_ -> Channel
end,
case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of
true ->
shutdown(kicked, ok,
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1);
_ ->
@ -990,7 +998,7 @@ handle_info({sock_closed, Reason}, Channel =
clientinfo = ClientInfo = #{zone := Zone}}) ->
emqx_zone:enable_flapping_detect(Zone)
andalso emqx_flapping:detect(ClientInfo),
Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)),
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
case maybe_shutdown(Reason, Channel1) of
{ok, Channel2} -> {ok, {event, disconnected}, Channel2};
Shutdown -> Shutdown
@ -1651,9 +1659,9 @@ ensure_disconnected(Reason, Channel = #channel{conninfo = ConnInfo,
%%--------------------------------------------------------------------
%% Maybe Publish will msg
mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
Channel;
mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
case will_delay_interval(WillMsg) of
0 ->
ok = publish_will_msg(WillMsg),
@ -1663,10 +1671,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) ->
end.
will_delay_interval(WillMsg) ->
maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0).
maps:get('Will-Delay-Interval',
emqx_message:get_header(properties, WillMsg, #{}), 0).
publish_will_msg(Msg) ->
%% TODO check if we should discard result here
_ = emqx_broker:publish(Msg),
ok.

View File

@ -582,7 +582,27 @@ t_handle_call_kick(_) ->
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4),
{shutdown, kicked, ok,
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
_} = emqx_channel:handle_call(kick, Channelv5).
_} = emqx_channel:handle_call(kick, Channelv5),
DisconnectedChannelv5 = channel(#{conn_state => disconnected}),
DisconnectedChannelv4 = v4(DisconnectedChannelv5),
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5),
{shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4).
t_handle_kicked_publish_will_msg(_) ->
Self = self(),
ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end),
Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>),
{shutdown, kicked, ok,
?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION),
_} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})),
receive
{pub, Msg} -> ok
after 200 -> ?assert(true)
end.
t_handle_call_discard(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),