From 86dd1a3e057fc14aec7496b758d15aa60c87af2d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 1 Nov 2022 17:26:31 +0800 Subject: [PATCH] test: different resons for disconnect event --- .../test/emqx_rulesql_SUITE.erl | 136 +++++++++++++++++- 1 file changed, 132 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl index f4f329e87..0a2f0d6dc 100644 --- a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl @@ -56,7 +56,10 @@ groups() -> {rulesql_select_events, [], [ t_sqlparse_event_client_connected_01 , t_sqlparse_event_client_connected_02 - , t_sqlparse_event_client_disconnected + , t_sqlparse_event_client_disconnected_normal + , t_sqlparse_event_client_disconnected_kicked + , t_sqlparse_event_client_disconnected_discarded + , t_sqlparse_event_client_disconnected_takeovered , t_sqlparse_event_session_subscribed , t_sqlparse_event_session_unsubscribed , t_sqlparse_event_message_delivered @@ -145,6 +148,12 @@ end_per_group(_Groupname, _Config) -> %% Testcase specific setup/teardown %%------------------------------------------------------------------------------ +init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) -> + application:set_env(emqx, client_disconnect_discarded, true), + Config; +init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) -> + application:set_env(emqx, client_disconnect_takeovered, true), + Config; init_per_testcase(_TestCase, Config) -> init_events_counters(), ok = emqx_rule_registry:register_resource_types( @@ -152,6 +161,12 @@ init_per_testcase(_TestCase, Config) -> %ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]), Config. +end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) -> + application:set_env(emqx, client_disconnect_takeovered, false), %% back to default + Config; +end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) -> + application:set_env(emqx, client_disconnect_discarded, false), %% back to default + Config; end_per_testcase(_TestCase, _Config) -> ok. @@ -523,9 +538,122 @@ t_sqlparse_event_client_connected_02(_Config) -> emqx_rule_registry:remove_rule(TopicRule). %% FROM $events/client_disconnected -t_sqlparse_event_client_disconnected(_Config) -> - %% TODO - ok. +t_sqlparse_event_client_disconnected_normal(_Config) -> + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/normal">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + emqtt:disconnect(Client1), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_normal) + end, + emqtt:stop(Client), + + emqx_rule_registry:remove_rule(TopicRule). + +t_sqlparse_event_client_disconnected_kicked(_Config) -> + process_flag(trap_exit, true), + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/kicked">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + emqx_cm:kick_session(<<"emqx">>), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_kicked) + end, + emqtt:stop(Client), + + process_flag(trap_exit, false). + +t_sqlparse_event_client_disconnected_discarded(_Config) -> + process_flag(trap_exit, true), + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/discarded">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + + {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]), + {ok, _} = emqtt:connect(Client2), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + emqtt:stop(Client), + emqtt:stop(Client2), + + emqx_rule_registry:remove_rule(TopicRule), + process_flag(trap_exit, false). + +t_sqlparse_event_client_disconnected_takeovered(_Config) -> + process_flag(trap_exit, true), + ok = emqx_rule_engine:load_providers(), + Sql = "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/takeovered">>, + + TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), + + {ok, ClientRecv} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecv), + {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client1), + + {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]), + {ok, _} = emqtt:connect(Client2), + + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps])) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + + emqtt:stop(ClientRecv), emqtt:stop(Client2), + + emqx_rule_registry:remove_rule(TopicRule), + process_flag(trap_exit, false). %% FROM $events/session_subscribed t_sqlparse_event_session_subscribed(_Config) ->