Merge pull request #12976 from HJianBo/fix-duplicated-disconnect
Fix duplicated disconnect event
This commit is contained in:
commit
4403b4f5ce
|
@ -2636,10 +2636,15 @@ disconnect_and_shutdown(
|
||||||
->
|
->
|
||||||
NChannel = ensure_disconnected(Reason, Channel),
|
NChannel = ensure_disconnected(Reason, Channel),
|
||||||
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
|
shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
|
||||||
%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
|
%% mqtt v3/v4 connected sessions
|
||||||
disconnect_and_shutdown(Reason, Reply, Channel) ->
|
disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when
|
||||||
|
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||||
|
->
|
||||||
NChannel = ensure_disconnected(Reason, Channel),
|
NChannel = ensure_disconnected(Reason, Channel),
|
||||||
shutdown(Reason, Reply, NChannel).
|
shutdown(Reason, Reply, NChannel);
|
||||||
|
%% other conn_state sessions
|
||||||
|
disconnect_and_shutdown(Reason, Reply, Channel) ->
|
||||||
|
shutdown(Reason, Reply, Channel).
|
||||||
|
|
||||||
-compile({inline, [sp/1, flag/1]}).
|
-compile({inline, [sp/1, flag/1]}).
|
||||||
sp(true) -> 1;
|
sp(true) -> 1;
|
||||||
|
|
|
@ -118,7 +118,8 @@ groups() ->
|
||||||
t_event_client_disconnected_normal,
|
t_event_client_disconnected_normal,
|
||||||
t_event_client_disconnected_kicked,
|
t_event_client_disconnected_kicked,
|
||||||
t_event_client_disconnected_discarded,
|
t_event_client_disconnected_discarded,
|
||||||
t_event_client_disconnected_takenover
|
t_event_client_disconnected_takenover,
|
||||||
|
t_event_client_disconnected_takenover_2
|
||||||
]},
|
]},
|
||||||
{telemetry, [], [
|
{telemetry, [], [
|
||||||
t_get_basic_usage_info_0,
|
t_get_basic_usage_info_0,
|
||||||
|
@ -983,6 +984,66 @@ t_event_client_disconnected_takenover(_Config) ->
|
||||||
|
|
||||||
delete_rule(TopicRule).
|
delete_rule(TopicRule).
|
||||||
|
|
||||||
|
t_event_client_disconnected_takenover_2(_Config) ->
|
||||||
|
SQL =
|
||||||
|
"select * "
|
||||||
|
"from \"$events/client_disconnected\" ",
|
||||||
|
RepubT = <<"repub/to/disconnected/takenover">>,
|
||||||
|
|
||||||
|
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => SQL,
|
||||||
|
id => ?TMP_RULEID,
|
||||||
|
actions => [republish_action(RepubT, <<>>)]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, ClientRecv} = emqtt:start_link([
|
||||||
|
{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(ClientRecv),
|
||||||
|
{ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0),
|
||||||
|
ct:sleep(200),
|
||||||
|
|
||||||
|
{ok, Client1} = emqtt:start_link([
|
||||||
|
{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(Client1),
|
||||||
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
||||||
|
%% receive the normal disconnected event
|
||||||
|
receive
|
||||||
|
{publish, #{topic := T, payload := Payload}} ->
|
||||||
|
?assertEqual(RepubT, T),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"reason">> := <<"normal">>}, emqx_utils_json:decode(Payload, [return_maps])
|
||||||
|
)
|
||||||
|
after 1000 ->
|
||||||
|
ct:fail(wait_for_repub_disconnected_discarded)
|
||||||
|
end,
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([
|
||||||
|
{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:connect(Client2),
|
||||||
|
|
||||||
|
%% should not receive the takenoverdisconnected event
|
||||||
|
receive
|
||||||
|
{publish, #{topic := T1, payload := Payload1}} ->
|
||||||
|
?assertEqual(RepubT, T1),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"reason">> := <<"takenover">>}, emqx_utils_json:decode(Payload1, [return_maps])
|
||||||
|
),
|
||||||
|
ct:fail(wait_for_repub_disconnected_discarded)
|
||||||
|
after 1000 ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
emqtt:stop(ClientRecv),
|
||||||
|
emqtt:stop(Client2),
|
||||||
|
|
||||||
|
delete_rule(TopicRule).
|
||||||
|
|
||||||
client_connack_failed() ->
|
client_connack_failed() ->
|
||||||
{ok, Client} = emqtt:start_link(
|
{ok, Client} = emqtt:start_link(
|
||||||
[
|
[
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix the `client.disconnected` event being triggered when taking over a session that the socket has been disconnected before.
|
Loading…
Reference in New Issue