diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index b0ca00a0e..a84ead1c2 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -118,7 +118,8 @@ groups() -> t_event_client_disconnected_normal, t_event_client_disconnected_kicked, t_event_client_disconnected_discarded, - t_event_client_disconnected_takenover + t_event_client_disconnected_takenover, + t_event_client_disconnected_takenover_2 ]}, {telemetry, [], [ t_get_basic_usage_info_0, @@ -983,6 +984,66 @@ t_event_client_disconnected_takenover(_Config) -> delete_rule(TopicRule). +t_event_client_disconnected_takenover_2(_Config) -> + SQL = + "select * " + "from \"$events/client_disconnected\" ", + RepubT = <<"repub/to/disconnected/takenover">>, + + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [republish_action(RepubT, <<>>)] + } + ), + + {ok, ClientRecv} = emqtt:start_link([ + {clientid, <<"get_repub_client">>}, {username, <<"emqx0">>} + ]), + {ok, _} = emqtt:connect(ClientRecv), + {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0), + ct:sleep(200), + + {ok, Client1} = emqtt:start_link([ + {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client1), + ok = emqtt:disconnect(Client1), + + %% receive the normal disconnected event + receive + {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(RepubT, T), + ?assertMatch( + #{<<"reason">> := <<"normal">>}, emqx_utils_json:decode(Payload, [return_maps]) + ) + after 1000 -> + ct:fail(wait_for_repub_disconnected_discarded) + end, + + {ok, Client2} = emqtt:start_link([ + {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false} + ]), + {ok, _} = emqtt:connect(Client2), + + %% should not receive the takenoverdisconnected event + receive + {publish, #{topic := T1, payload := Payload1}} -> + ?assertEqual(RepubT, T1), + ?assertMatch( + #{<<"reason">> := <<"takenover">>}, emqx_utils_json:decode(Payload1, [return_maps]) + ), + ct:fail(wait_for_repub_disconnected_discarded) + after 1000 -> + ok + end, + + emqtt:stop(ClientRecv), + emqtt:stop(Client2), + + delete_rule(TopicRule). + client_connack_failed() -> {ok, Client} = emqtt:start_link( [