diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8198a11c8..58d0c7e56 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1141,24 +1141,31 @@ return_sub_unsub_ack(Packet, Channel) -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. -handle_call(kick, Channel = #channel{ - conn_state = ConnState, - will_msg = WillMsg, - conninfo = #{proto_ver := ProtoVer} - }) -> +handle_call( + kick, + Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + } +) -> (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), - Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel); - _ -> Channel - end, + Channel1 = + case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> - shutdown(kicked, ok, - ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); + shutdown( + kicked, + ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + Channel1 + ); _ -> shutdown(kicked, ok, Channel1) end; - handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); %% Session Takeover @@ -2071,8 +2078,11 @@ maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', - emqx_message:get_header(properties, WillMsg, #{}), 0). + maps:get( + 'Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), + 0 + ). publish_will_msg(Msg) -> _ = emqx_broker:publish(Msg), @@ -2085,8 +2095,7 @@ disconnect_reason(?RC_SUCCESS) -> normal; disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode). reason_code(takenover) -> ?RC_SESSION_TAKEN_OVER; -reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER; -reason_code(_) -> ?RC_NORMAL_DISCONNECTION. +reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER. %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index cffd89793..8b1940a30 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -910,9 +910,9 @@ t_handle_call_kick(_) -> Channelv5 = channel(), Channelv4 = v4(Channelv5), {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), - {shutdown, kicked, ok, - ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, Channelv5), + {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call( + kick, Channelv5 + ), DisconnectedChannelv5 = channel(#{conn_state => disconnected}), DisconnectedChannelv4 = v4(DisconnectedChannelv5), @@ -926,9 +926,9 @@ t_handle_kicked_publish_will_msg(_) -> Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), - {shutdown, kicked, ok, - ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), + {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call( + kick, channel(#{will_msg => Msg}) + ), receive {pub, Msg} -> ok after 200 -> exit(will_message_not_published) @@ -1272,7 +1272,7 @@ limiter_cfg() -> #{message_routing => default}. v4(Channel) -> ConnInfo = emqx_channel:info(conninfo, Channel), emqx_channel:set_field( - conninfo, - maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo), - Channel - ). + conninfo, + maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo), + Channel + ).