diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 494c77ec7..ea35abfba 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2098,7 +2098,7 @@ parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). %%-------------------------------------------------------------------- -%% Ensure disconnected +%% Maybe & Ensure disconnected ensure_disconnected( Reason, @@ -2205,6 +2205,7 @@ shutdown(success, Reply, Packet, Channel) -> shutdown(Reason, Reply, Packet, Channel) -> {shutdown, Reason, Reply, Packet, Channel}. +%% mqtt v5 connected sessions disconnect_and_shutdown( Reason, Reply, @@ -2214,9 +2215,12 @@ disconnect_and_shutdown( ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> - shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel); + NChannel = ensure_disconnected(Reason, Channel), + shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel); +%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions disconnect_and_shutdown(Reason, Reply, Channel) -> - shutdown(Reason, Reply, Channel). + NChannel = ensure_disconnected(Reason, Channel), + shutdown(Reason, Reply, NChannel). sp(true) -> 1; sp(false) -> 0. diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index beb7817af..df17e434a 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -33,6 +33,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + emqx_common_test_helpers:boot_modules(all), emqx_channel_SUITE:set_test_listener_confs(), ?check_trace( ?wait_async_action( diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 4e0c7dfe3..17fe1a36c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -91,7 +91,13 @@ groups() -> t_sqlparse_new_map, t_sqlparse_invalid_json ]}, - {events, [], [t_events]}, + {events, [], [ + t_events, + t_event_client_disconnected_normal, + t_event_client_disconnected_kicked, + t_event_client_disconnected_discarded, + t_event_client_disconnected_takenover + ]}, {telemetry, [], [ t_get_basic_usage_info_0, t_get_basic_usage_info_1 @@ -474,6 +480,165 @@ t_events(_Config) -> client_connack_failed(), ok. +t_event_client_disconnected_normal(_Config) -> + SQL = + "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/normal">>, + + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [republish_action(RepubT, <<>>)] + } + ), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + emqtt:disconnect(Client1), + + receive + {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_normal) + end, + emqtt:stop(Client), + + delete_rule(TopicRule). + +t_event_client_disconnected_kicked(_Config) -> + SQL = + "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/kicked">>, + + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [republish_action(RepubT, <<>>)] + } + ), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + %% the process will receive {'EXIT',{shutdown,tcp_closed}} + unlink(Client1), + + emqx_cm:kick_session(<<"emqx">>), + + receive + {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_kicked) + end, + + emqtt:stop(Client), + delete_rule(TopicRule). + +t_event_client_disconnected_discarded(_Config) -> + SQL = + "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/discarded">>, + + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [republish_action(RepubT, <<>>)] + } + ), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + %% the process will receive {'EXIT',{shutdown,tcp_closed}} + unlink(Client1), + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true} + ]), + {ok, _} = emqtt:connect(Client2), + + receive + {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch( + #{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps]) + ) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + emqtt:stop(Client), + emqtt:stop(Client2), + + delete_rule(TopicRule). + +t_event_client_disconnected_takenover(_Config) -> + SQL = + "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/takenover">>, + + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [republish_action(RepubT, <<>>)] + } + ), + + {ok, ClientRecv} = emqtt:start_link([ + {clientid, <<"get_repub_client">>}, {username, <<"emqx0">>} + ]), + {ok, _} = emqtt:connect(ClientRecv), + {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + %% the process will receive {'EXIT',{shutdown,tcp_closed}} + unlink(Client1), + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client2), + + receive + {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch( + #{<<"reason">> := <<"takenover">>}, emqx_json:decode(Payload, [return_maps]) + ) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + + emqtt:stop(ClientRecv), + emqtt:stop(Client2), + + delete_rule(TopicRule). + client_connack_failed() -> {ok, Client} = emqtt:start_link( [