From e17241884cb908e9a2f8649609fb8d8ac80f3fff Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 17 Jan 2020 19:48:39 +0800 Subject: [PATCH] Send DISCONNECT packet for mqttv5 (#3183) (#3208) --- src/emqx_channel.erl | 28 ++++++++++++++++++++-------- test/emqx_channel_SUITE.erl | 2 +- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 73186a1e1..cce2e1f97 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -700,14 +700,10 @@ return_unsuback(Packet, Channel) -> | {shutdown, Reason :: term(), Reply :: term(), channel()}). handle_call(kick, Channel) -> Channel1 = ensure_disconnected(kicked, Channel), - shutdown(kicked, ok, Channel1); + disconnect_and_shutdown(kicked, ok, Channel1); -handle_call(discard, Channel = #channel{conn_state = connected}) -> - Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), - {shutdown, discarded, ok, Packet, Channel}; - -handle_call(discard, Channel = #channel{conn_state = disconnected}) -> - shutdown(discarded, ok, Channel); +handle_call(discard, Channel) -> + disconnect_and_shutdown(discarded, ok, Channel); %% Session Takeover handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> @@ -719,7 +715,7 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session, %% TODO: Should not drain deliver here (side effect) Delivers = emqx_misc:drain_deliver(), AllPendings = lists:append(Delivers, Pendings), - shutdown(takeovered, AllPendings, Channel); + disconnect_and_shutdown(takeovered, AllPendings, Channel); handle_call(list_acl_cache, Channel) -> {reply, emqx_acl_cache:list_acl_cache(), Channel}; @@ -1293,6 +1289,10 @@ publish_will_msg(Msg) -> emqx_broker:publish(Msg). disconnect_reason(?RC_SUCCESS) -> normal; disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode). +reason_code(takeovered) -> ?RC_SESSION_TAKEN_OVER; +reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER; +reason_code(_) -> ?RC_NORMAL_DISCONNECTION. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -1330,6 +1330,18 @@ shutdown(success, Reply, Channel) -> shutdown(Reason, Reply, Channel) -> {shutdown, Reason, Reply, Channel}. +shutdown(success, Reply, Packet, Channel) -> + shutdown(normal, Reply, Packet, Channel); +shutdown(Reason, Reply, Packet, Channel) -> + {shutdown, Reason, Reply, Packet, Channel}. + +disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = connected, + conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) -> + shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel); + +disconnect_and_shutdown(Reason, Reply, Channel) -> + shutdown(Reason, Reply, Channel). + sp(true) -> 1; sp(false) -> 0. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 0fe0a811b..e3859a15d 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -407,7 +407,7 @@ t_handle_call_takeover_begin(_) -> t_handle_call_takeover_end(_) -> ok = meck:expect(emqx_session, takeover, fun(_) -> ok end), - {shutdown, takeovered, [], _Chan} = + {shutdown, takeovered, [], _, _Chan} = emqx_channel:handle_call({takeover, 'end'}, channel()). t_handle_call_unexpected(_) ->