test: unlink the process would be exited by tcp close
This commit is contained in:
parent
86dd1a3e05
commit
cb3bd24528
|
@ -565,7 +565,6 @@ t_sqlparse_event_client_disconnected_normal(_Config) ->
|
||||||
emqx_rule_registry:remove_rule(TopicRule).
|
emqx_rule_registry:remove_rule(TopicRule).
|
||||||
|
|
||||||
t_sqlparse_event_client_disconnected_kicked(_Config) ->
|
t_sqlparse_event_client_disconnected_kicked(_Config) ->
|
||||||
process_flag(trap_exit, true),
|
|
||||||
ok = emqx_rule_engine:load_providers(),
|
ok = emqx_rule_engine:load_providers(),
|
||||||
Sql = "select * "
|
Sql = "select * "
|
||||||
"from \"$events/client_disconnected\" ",
|
"from \"$events/client_disconnected\" ",
|
||||||
|
@ -573,13 +572,15 @@ t_sqlparse_event_client_disconnected_kicked(_Config) ->
|
||||||
|
|
||||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||||
|
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||||
ct:sleep(200),
|
ct:sleep(200),
|
||||||
|
|
||||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
emqx_cm:kick_session(<<"emqx">>),
|
emqx_cm:kick_session(<<"emqx">>),
|
||||||
|
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||||
|
|
||||||
receive {publish, #{topic := T, payload := Payload}} ->
|
receive {publish, #{topic := T, payload := Payload}} ->
|
||||||
?assertEqual(RepubT, T),
|
?assertEqual(RepubT, T),
|
||||||
|
@ -587,12 +588,10 @@ t_sqlparse_event_client_disconnected_kicked(_Config) ->
|
||||||
after 1000 ->
|
after 1000 ->
|
||||||
ct:fail(wait_for_repub_disconnected_kicked)
|
ct:fail(wait_for_repub_disconnected_kicked)
|
||||||
end,
|
end,
|
||||||
emqtt:stop(Client),
|
emqtt:stop(ClientRecvRepub),
|
||||||
|
emqx_rule_registry:remove_rule(TopicRule).
|
||||||
process_flag(trap_exit, false).
|
|
||||||
|
|
||||||
t_sqlparse_event_client_disconnected_discarded(_Config) ->
|
t_sqlparse_event_client_disconnected_discarded(_Config) ->
|
||||||
process_flag(trap_exit, true),
|
|
||||||
ok = emqx_rule_engine:load_providers(),
|
ok = emqx_rule_engine:load_providers(),
|
||||||
Sql = "select * "
|
Sql = "select * "
|
||||||
"from \"$events/client_disconnected\" ",
|
"from \"$events/client_disconnected\" ",
|
||||||
|
@ -600,13 +599,14 @@ t_sqlparse_event_client_disconnected_discarded(_Config) ->
|
||||||
|
|
||||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||||
|
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||||
{ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||||
ct:sleep(200),
|
ct:sleep(200),
|
||||||
|
|
||||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
|
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]),
|
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]),
|
||||||
{ok, _} = emqtt:connect(Client2),
|
{ok, _} = emqtt:connect(Client2),
|
||||||
|
@ -617,14 +617,11 @@ t_sqlparse_event_client_disconnected_discarded(_Config) ->
|
||||||
after 1000 ->
|
after 1000 ->
|
||||||
ct:fail(wait_for_repub_disconnected_discarded)
|
ct:fail(wait_for_repub_disconnected_discarded)
|
||||||
end,
|
end,
|
||||||
emqtt:stop(Client),
|
|
||||||
emqtt:stop(Client2),
|
|
||||||
|
|
||||||
emqx_rule_registry:remove_rule(TopicRule),
|
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
||||||
process_flag(trap_exit, false).
|
emqx_rule_registry:remove_rule(TopicRule).
|
||||||
|
|
||||||
t_sqlparse_event_client_disconnected_takeovered(_Config) ->
|
t_sqlparse_event_client_disconnected_takeovered(_Config) ->
|
||||||
process_flag(trap_exit, true),
|
|
||||||
ok = emqx_rule_engine:load_providers(),
|
ok = emqx_rule_engine:load_providers(),
|
||||||
Sql = "select * "
|
Sql = "select * "
|
||||||
"from \"$events/client_disconnected\" ",
|
"from \"$events/client_disconnected\" ",
|
||||||
|
@ -632,13 +629,14 @@ t_sqlparse_event_client_disconnected_takeovered(_Config) ->
|
||||||
|
|
||||||
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
||||||
|
|
||||||
{ok, ClientRecv} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
||||||
{ok, _} = emqtt:connect(ClientRecv),
|
{ok, _} = emqtt:connect(ClientRecvRepub),
|
||||||
{ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0),
|
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
||||||
ct:sleep(200),
|
ct:sleep(200),
|
||||||
|
|
||||||
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
|
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]),
|
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]),
|
||||||
{ok, _} = emqtt:connect(Client2),
|
{ok, _} = emqtt:connect(Client2),
|
||||||
|
@ -650,10 +648,8 @@ t_sqlparse_event_client_disconnected_takeovered(_Config) ->
|
||||||
ct:fail(wait_for_repub_disconnected_discarded)
|
ct:fail(wait_for_repub_disconnected_discarded)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
emqtt:stop(ClientRecv), emqtt:stop(Client2),
|
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
||||||
|
emqx_rule_registry:remove_rule(TopicRule).
|
||||||
emqx_rule_registry:remove_rule(TopicRule),
|
|
||||||
process_flag(trap_exit, false).
|
|
||||||
|
|
||||||
%% FROM $events/session_subscribed
|
%% FROM $events/session_subscribed
|
||||||
t_sqlparse_event_session_subscribed(_Config) ->
|
t_sqlparse_event_session_subscribed(_Config) ->
|
||||||
|
|
Loading…
Reference in New Issue