From 87a29beb5fdcdb43d861831d221b4a99037d0457 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 11:55:47 +0800 Subject: [PATCH 1/4] fix(channel): send DISCONNECT packet if connection has been kicked fix #7241 --- apps/emqx/src/emqx_channel.erl | 9 ++++++++- apps/emqx/test/emqx_channel_SUITE.erl | 15 ++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 860f99300..ec573a013 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1143,7 +1143,14 @@ return_sub_unsub_ack(Packet, Channel) -> | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. handle_call(kick, Channel) -> Channel1 = ensure_disconnected(kicked, Channel), - disconnect_and_shutdown(kicked, ok, Channel1); + case Channel1 of + ?IS_MQTT_V5 -> + shutdown(kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); + _ -> + shutdown(kicked, ok, Channel1) + end; + handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); %% Session Takeover diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 40d2618c0..0731e4a3a 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -907,7 +907,12 @@ t_handle_out_unexpected(_) -> %%-------------------------------------------------------------------- t_handle_call_kick(_) -> - {shutdown, kicked, ok, _Chan} = emqx_channel:handle_call(kick, channel()). + Channelv5 = channel(), + Channelv4 = v4(Channelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, Channelv5). t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), @@ -1243,3 +1248,11 @@ quota() -> emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()). limiter_cfg() -> #{message_routing => default}. + +v4(Channel) -> + ConnInfo = emqx_channel:info(conninfo, Channel), + emqx_channel:set_field( + conninfo, + maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo), + Channel + ). From df74c180b719c192475ca4c8f19fea04c5bf3556 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 18:52:45 +0800 Subject: [PATCH 2/4] fix(channel): send will_msg if client has been kicked --- apps/emqx/src/emqx_channel.erl | 26 +++++++++++++++++--------- apps/emqx/test/emqx_channel_SUITE.erl | 22 +++++++++++++++++++++- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index ec573a013..8198a11c8 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1141,10 +1141,18 @@ return_sub_unsub_ack(Packet, Channel) -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. -handle_call(kick, Channel) -> - Channel1 = ensure_disconnected(kicked, Channel), - case Channel1 of - ?IS_MQTT_V5 -> +handle_call(kick, Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + }) -> + (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + Channel1 = case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, + case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of + true -> shutdown(kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); _ -> @@ -1227,7 +1235,7 @@ handle_info( -> emqx_config:get_zone_conf(Zone, [flapping_detect, enable]) andalso emqx_flapping:detect(ClientInfo), - Channel1 = ensure_disconnected(Reason, mabye_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; Shutdown -> Shutdown @@ -2051,9 +2059,9 @@ ensure_disconnected( %%-------------------------------------------------------------------- %% Maybe Publish will msg -mabye_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> Channel; -mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> +maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> case will_delay_interval(WillMsg) of 0 -> ok = publish_will_msg(WillMsg), @@ -2063,10 +2071,10 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). + maps:get('Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), 0). publish_will_msg(Msg) -> - %% TODO check if we should discard result here _ = emqx_broker:publish(Msg), ok. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 0731e4a3a..51b97fde0 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -912,7 +912,27 @@ t_handle_call_kick(_) -> {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, Channelv5). + _} = emqx_channel:handle_call(kick, Channelv5), + + DisconnectedChannelv5 = channel(#{conn_state => disconnected}), + DisconnectedChannelv4 = v4(DisconnectedChannelv5), + + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv5), + {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, DisconnectedChannelv4). + +t_handle_kicked_publish_will_msg(_) -> + Self = self(), + ok = meck:expect(emqx_broker, publish, fun(M) -> Self ! {pub, M} end), + + Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), + + {shutdown, kicked, ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), + receive + {pub, Msg} -> ok + after 200 -> ?assert(true) + end. t_handle_call_discard(_) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), From a889b0b6a967670403b7bc87641e2fb9b1f1ad9f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 18 Mar 2022 10:06:01 +0800 Subject: [PATCH 3/4] test: update test/emqx_channel_SUITE.erl Co-authored-by: Thales Macedo Garitezi --- apps/emqx/test/emqx_channel_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 51b97fde0..cffd89793 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -931,7 +931,7 @@ t_handle_kicked_publish_will_msg(_) -> _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), receive {pub, Msg} -> ok - after 200 -> ?assert(true) + after 200 -> exit(will_message_not_published) end. t_handle_call_discard(_) -> From 97c05d3a7252ef98a65e830df9bb5d17e0c55aa3 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 22 Mar 2022 15:27:10 +0800 Subject: [PATCH 4/4] chore(emqx): fmt codes --- apps/emqx/src/emqx_channel.erl | 41 ++++++++++++++++----------- apps/emqx/test/emqx_channel_SUITE.erl | 20 ++++++------- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8198a11c8..58d0c7e56 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1141,24 +1141,31 @@ return_sub_unsub_ack(Packet, Channel) -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. -handle_call(kick, Channel = #channel{ - conn_state = ConnState, - will_msg = WillMsg, - conninfo = #{proto_ver := ProtoVer} - }) -> +handle_call( + kick, + Channel = #channel{ + conn_state = ConnState, + will_msg = WillMsg, + conninfo = #{proto_ver := ProtoVer} + } +) -> (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), - Channel1 = case ConnState of - connected -> ensure_disconnected(kicked, Channel); - _ -> Channel - end, + Channel1 = + case ConnState of + connected -> ensure_disconnected(kicked, Channel); + _ -> Channel + end, case ProtoVer == ?MQTT_PROTO_V5 andalso ConnState == connected of true -> - shutdown(kicked, ok, - ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), Channel1); + shutdown( + kicked, + ok, + ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), + Channel1 + ); _ -> shutdown(kicked, ok, Channel1) end; - handle_call(discard, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); %% Session Takeover @@ -2071,8 +2078,11 @@ maybe_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - maps:get('Will-Delay-Interval', - emqx_message:get_header(properties, WillMsg, #{}), 0). + maps:get( + 'Will-Delay-Interval', + emqx_message:get_header(properties, WillMsg, #{}), + 0 + ). publish_will_msg(Msg) -> _ = emqx_broker:publish(Msg), @@ -2085,8 +2095,7 @@ disconnect_reason(?RC_SUCCESS) -> normal; disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode). reason_code(takenover) -> ?RC_SESSION_TAKEN_OVER; -reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER; -reason_code(_) -> ?RC_NORMAL_DISCONNECTION. +reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER. %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index cffd89793..8b1940a30 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -910,9 +910,9 @@ t_handle_call_kick(_) -> Channelv5 = channel(), Channelv4 = v4(Channelv5), {shutdown, kicked, ok, _} = emqx_channel:handle_call(kick, Channelv4), - {shutdown, kicked, ok, - ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, Channelv5), + {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call( + kick, Channelv5 + ), DisconnectedChannelv5 = channel(#{conn_state => disconnected}), DisconnectedChannelv4 = v4(DisconnectedChannelv5), @@ -926,9 +926,9 @@ t_handle_kicked_publish_will_msg(_) -> Msg = emqx_message:make(test, <<"will_topic">>, <<"will_payload">>), - {shutdown, kicked, ok, - ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), - _} = emqx_channel:handle_call(kick, channel(#{will_msg => Msg})), + {shutdown, kicked, ok, ?DISCONNECT_PACKET(?RC_ADMINISTRATIVE_ACTION), _} = emqx_channel:handle_call( + kick, channel(#{will_msg => Msg}) + ), receive {pub, Msg} -> ok after 200 -> exit(will_message_not_published) @@ -1272,7 +1272,7 @@ limiter_cfg() -> #{message_routing => default}. v4(Channel) -> ConnInfo = emqx_channel:info(conninfo, Channel), emqx_channel:set_field( - conninfo, - maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo), - Channel - ). + conninfo, + maps:put(proto_ver, ?MQTT_PROTO_V4, ConnInfo), + Channel + ).