From b320b20c7b7328a18a77edcf8db97619069d4348 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 6 May 2024 16:17:02 +0800 Subject: [PATCH 1/3] fix(mqtt): disconnected event should not be sent twice --- apps/emqx/src/emqx_channel.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index b3b384541..eb54f6ba1 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2636,10 +2636,15 @@ disconnect_and_shutdown( -> NChannel = ensure_disconnected(Reason, Channel), shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel); -%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions -disconnect_and_shutdown(Reason, Reply, Channel) -> +%% mqtt v3/v4 connected sessions +disconnect_and_shutdown(Reason, Reply, Channel = #channel{conn_state = ConnState}) when + ConnState =:= connected orelse ConnState =:= reauthenticating +-> NChannel = ensure_disconnected(Reason, Channel), - shutdown(Reason, Reply, NChannel). + shutdown(Reason, Reply, NChannel); +%% other conn_state sessions +disconnect_and_shutdown(Reason, Reply, Channel) -> + shutdown(Reason, Reply, Channel). -compile({inline, [sp/1, flag/1]}). sp(true) -> 1; From 1642b06bf95f539cb57975895de46001f56c5afc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 7 May 2024 10:30:53 +0800 Subject: [PATCH 2/3] test: add tests --- .../test/emqx_rule_engine_SUITE.erl | 63 ++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index b0ca00a0e..a84ead1c2 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -118,7 +118,8 @@ groups() -> t_event_client_disconnected_normal, t_event_client_disconnected_kicked, t_event_client_disconnected_discarded, - t_event_client_disconnected_takenover + t_event_client_disconnected_takenover, + t_event_client_disconnected_takenover_2 ]}, {telemetry, [], [ t_get_basic_usage_info_0, @@ -983,6 +984,66 @@ t_event_client_disconnected_takenover(_Config) -> delete_rule(TopicRule). +t_event_client_disconnected_takenover_2(_Config) -> + SQL = + "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/takenover">>, + + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [republish_action(RepubT, <<>>)] + } + ), + + {ok, ClientRecv} = emqtt:start_link([ + {clientid, <<"get_repub_client">>}, {username, <<"emqx0">>} + ]), + {ok, _} = emqtt:connect(ClientRecv), + {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([ + {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:disconnect(Client1), + + %% receive the normal disconnected event + receive + {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch( + #{<<"reason">> := <<"normal">>}, emqx_utils_json:decode(Payload, [return_maps]) + ) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client2), + + %% should not receive the takenoverdisconnected event + receive + {publish, #{topic := T1, payload := Payload1}} -> + ?assertEqual(RepubT, T1), + ?assertMatch( + #{<<"reason">> := <<"takenover">>}, emqx_utils_json:decode(Payload1, [return_maps]) + ), + ct:fail(wait_for_repub_disconnected_discarded) + after 1000 -> + ok + end, + + emqtt:stop(ClientRecv), + emqtt:stop(Client2), + + delete_rule(TopicRule). + client_connack_failed() -> {ok, Client} = emqtt:start_link( [ From c947455b1549684f3c0762f21d824e6ccd3e8421 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 7 May 2024 16:27:34 +0800 Subject: [PATCH 3/3] chore: add changes --- changes/ce/fix-12976.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-12976.md diff --git a/changes/ce/fix-12976.md b/changes/ce/fix-12976.md new file mode 100644 index 000000000..82cf82b0d --- /dev/null +++ b/changes/ce/fix-12976.md @@ -0,0 +1 @@ +Fix the `client.disconnected` event being triggered when taking over a session that the socket has been disconnected before.