Merge pull request #3209 from emqx/develop

Send DISCONNECT packet for mqttv5 (#3183)
This commit is contained in:
turtleDeng 2020-01-17 19:49:27 +08:00 committed by GitHub
commit d476dbcc14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 9 deletions

View File

@ -700,14 +700,10 @@ return_unsuback(Packet, Channel) ->
| {shutdown, Reason :: term(), Reply :: term(), channel()}). | {shutdown, Reason :: term(), Reply :: term(), channel()}).
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
Channel1 = ensure_disconnected(kicked, Channel), Channel1 = ensure_disconnected(kicked, Channel),
shutdown(kicked, ok, Channel1); disconnect_and_shutdown(kicked, ok, Channel1);
handle_call(discard, Channel = #channel{conn_state = connected}) -> handle_call(discard, Channel) ->
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), disconnect_and_shutdown(discarded, ok, Channel);
{shutdown, discarded, ok, Packet, Channel};
handle_call(discard, Channel = #channel{conn_state = disconnected}) ->
shutdown(discarded, ok, Channel);
%% Session Takeover %% Session Takeover
handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) ->
@ -719,7 +715,7 @@ handle_call({takeover, 'end'}, Channel = #channel{session = Session,
%% TODO: Should not drain deliver here (side effect) %% TODO: Should not drain deliver here (side effect)
Delivers = emqx_misc:drain_deliver(), Delivers = emqx_misc:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings), AllPendings = lists:append(Delivers, Pendings),
shutdown(takeovered, AllPendings, Channel); disconnect_and_shutdown(takeovered, AllPendings, Channel);
handle_call(list_acl_cache, Channel) -> handle_call(list_acl_cache, Channel) ->
{reply, emqx_acl_cache:list_acl_cache(), Channel}; {reply, emqx_acl_cache:list_acl_cache(), Channel};
@ -1293,6 +1289,10 @@ publish_will_msg(Msg) -> emqx_broker:publish(Msg).
disconnect_reason(?RC_SUCCESS) -> normal; disconnect_reason(?RC_SUCCESS) -> normal;
disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode). disconnect_reason(ReasonCode) -> emqx_reason_codes:name(ReasonCode).
reason_code(takeovered) -> ?RC_SESSION_TAKEN_OVER;
reason_code(discarded) -> ?RC_SESSION_TAKEN_OVER;
reason_code(_) -> ?RC_NORMAL_DISCONNECTION.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1330,6 +1330,18 @@ shutdown(success, Reply, Channel) ->
shutdown(Reason, Reply, Channel) -> shutdown(Reason, Reply, Channel) ->
{shutdown, Reason, Reply, Channel}. {shutdown, Reason, Reply, Channel}.
shutdown(success, Reply, Packet, Channel) ->
shutdown(normal, Reply, Packet, Channel);
shutdown(Reason, Reply, Packet, Channel) ->
{shutdown, Reason, Reply, Packet, Channel}.
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = connected,
conninfo = #{proto_ver := ?MQTT_PROTO_V5}}) ->
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
disconnect_and_shutdown(Reason, Reply, Channel) ->
shutdown(Reason, Reply, Channel).
sp(true) -> 1; sp(true) -> 1;
sp(false) -> 0. sp(false) -> 0.

View File

@ -407,7 +407,7 @@ t_handle_call_takeover_begin(_) ->
t_handle_call_takeover_end(_) -> t_handle_call_takeover_end(_) ->
ok = meck:expect(emqx_session, takeover, fun(_) -> ok end), ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
{shutdown, takeovered, [], _Chan} = {shutdown, takeovered, [], _, _Chan} =
emqx_channel:handle_call({takeover, 'end'}, channel()). emqx_channel:handle_call({takeover, 'end'}, channel()).
t_handle_call_unexpected(_) -> t_handle_call_unexpected(_) ->