diff --git a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl index 0a2f0d6dc..c2941c810 100644 --- a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl @@ -565,7 +565,6 @@ t_sqlparse_event_client_disconnected_normal(_Config) -> emqx_rule_registry:remove_rule(TopicRule). t_sqlparse_event_client_disconnected_kicked(_Config) -> - process_flag(trap_exit, true), ok = emqx_rule_engine:load_providers(), Sql = "select * " "from \"$events/client_disconnected\" ", @@ -573,13 +572,15 @@ t_sqlparse_event_client_disconnected_kicked(_Config) -> TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), - {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), - {ok, _} = emqtt:connect(Client), - {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client1), emqx_cm:kick_session(<<"emqx">>), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} receive {publish, #{topic := T, payload := Payload}} -> ?assertEqual(RepubT, T), @@ -587,12 +588,10 @@ t_sqlparse_event_client_disconnected_kicked(_Config) -> after 1000 -> ct:fail(wait_for_repub_disconnected_kicked) end, - emqtt:stop(Client), - - process_flag(trap_exit, false). + emqtt:stop(ClientRecvRepub), + emqx_rule_registry:remove_rule(TopicRule). t_sqlparse_event_client_disconnected_discarded(_Config) -> - process_flag(trap_exit, true), ok = emqx_rule_engine:load_providers(), Sql = "select * " "from \"$events/client_disconnected\" ", @@ -600,13 +599,14 @@ t_sqlparse_event_client_disconnected_discarded(_Config) -> TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), - {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), - {ok, _} = emqtt:connect(Client), - {ok, _, _} = emqtt:subscribe(Client, RepubT, 0), + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), ct:sleep(200), {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client1), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]), {ok, _} = emqtt:connect(Client2), @@ -617,14 +617,11 @@ t_sqlparse_event_client_disconnected_discarded(_Config) -> after 1000 -> ct:fail(wait_for_repub_disconnected_discarded) end, - emqtt:stop(Client), - emqtt:stop(Client2), - emqx_rule_registry:remove_rule(TopicRule), - process_flag(trap_exit, false). + emqtt:stop(ClientRecvRepub), emqtt:stop(Client2), + emqx_rule_registry:remove_rule(TopicRule). t_sqlparse_event_client_disconnected_takeovered(_Config) -> - process_flag(trap_exit, true), ok = emqx_rule_engine:load_providers(), Sql = "select * " "from \"$events/client_disconnected\" ", @@ -632,13 +629,14 @@ t_sqlparse_event_client_disconnected_takeovered(_Config) -> TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>), - {ok, ClientRecv} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), - {ok, _} = emqtt:connect(ClientRecv), - {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0), + {ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(ClientRecvRepub), + {ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0), ct:sleep(200), {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client1), + unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}} {ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]), {ok, _} = emqtt:connect(Client2), @@ -650,10 +648,8 @@ t_sqlparse_event_client_disconnected_takeovered(_Config) -> ct:fail(wait_for_repub_disconnected_discarded) end, - emqtt:stop(ClientRecv), emqtt:stop(Client2), - - emqx_rule_registry:remove_rule(TopicRule), - process_flag(trap_exit, false). + emqtt:stop(ClientRecvRepub), emqtt:stop(Client2), + emqx_rule_registry:remove_rule(TopicRule). %% FROM $events/session_subscribed t_sqlparse_event_session_subscribed(_Config) ->