diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 943f30629..4fd08c154 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -30,29 +30,41 @@ all() -> ]. groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - SASLAuths = [ - sasl_auth_plain, - sasl_auth_scram256, - sasl_auth_scram512, - sasl_auth_kerberos + SASLGroups = [ + {sasl_auth_plain, testcases(sasl)}, + {sasl_auth_scram256, testcases(sasl)}, + {sasl_auth_scram512, testcases(sasl)}, + {sasl_auth_kerberos, testcases(sasl_auth_kerberos)} ], - SASLAuthGroups = [{group, Type} || Type <- SASLAuths], - OnlyOnceTCs = only_once_tests(), - MatrixTCs = AllTCs -- OnlyOnceTCs, - SASLTests = [{Group, MatrixTCs} || Group <- SASLAuths], + SASLAuthGroups = [{group, Group} || {Group, _} <- SASLGroups], [ - {plain, MatrixTCs ++ OnlyOnceTCs}, - {ssl, MatrixTCs}, + {plain, testcases(plain)}, + {ssl, testcases(common)}, {sasl_plain, SASLAuthGroups}, {sasl_ssl, SASLAuthGroups} - ] ++ SASLTests. + | SASLGroups + ]. -sasl_only_tests() -> - [t_failed_creation_then_fixed]. - -%% tests that do not need to be run on all groups -only_once_tests() -> +testcases(all) -> + emqx_common_test_helpers:all(?MODULE); +testcases(plain) -> + %% NOTE: relevant only for a subset of SASL testcases + Exclude = [t_failed_creation_then_fixed], + testcases(all) -- Exclude; +testcases(common) -> + testcases(plain) -- testcases(once); +testcases(sasl) -> + testcases(all) -- testcases(once); +testcases(sasl_auth_kerberos) -> + %% NOTE: need a proxy to run these tests + Exclude = [ + t_failed_creation_then_fixed, + t_on_get_status, + t_receive_after_recovery + ], + testcases(sasl) -- Exclude; +testcases(once) -> + %% tests that do not need to be run on all groups [ t_begin_offset_earliest, t_bridge_rule_action_source, @@ -220,7 +232,7 @@ init_per_group(sasl_auth_kerberos, Config0) -> (KV) -> KV end, - [{has_proxy, false}, {sasl_auth_mechanism, kerberos} | Config0] + [{sasl_auth_mechanism, kerberos} | Config0] ), Config; init_per_group(_Group, Config) -> @@ -264,43 +276,6 @@ end_per_group(Group, Config) when end_per_group(_Group, _Config) -> ok. -init_per_testcase(TestCase, Config) when - TestCase =:= t_failed_creation_then_fixed --> - KafkaType = ?config(kafka_type, Config), - AuthMechanism = ?config(sasl_auth_mechanism, Config), - IsSASL = lists:member(KafkaType, [sasl_plain, sasl_ssl]), - case {IsSASL, AuthMechanism} of - {true, kerberos} -> - [{skip_does_not_apply, true}]; - {true, _} -> - common_init_per_testcase(TestCase, Config); - {false, _} -> - [{skip_does_not_apply, true}] - end; -init_per_testcase(TestCase, Config) when - TestCase =:= t_failed_creation_then_fixed --> - %% test with one partiton only for this case because - %% the wait probe may not be always sent to the same partition - HasProxy = proplists:get_value(has_proxy, Config, true), - case HasProxy of - false -> - [{skip_does_not_apply, true}]; - true -> - common_init_per_testcase(TestCase, [{num_partitions, 1} | Config]) - end; -init_per_testcase(TestCase, Config) when - TestCase =:= t_on_get_status; - TestCase =:= t_receive_after_recovery --> - HasProxy = proplists:get_value(has_proxy, Config, true), - case HasProxy of - false -> - [{skip_does_not_apply, true}]; - true -> - common_init_per_testcase(TestCase, Config) - end; init_per_testcase(t_cluster_group = TestCase, Config0) -> Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]), common_init_per_testcase(TestCase, Config); @@ -393,30 +368,24 @@ common_init_per_testcase(TestCase, Config0) -> ]. end_per_testcase(_Testcase, Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - ProducersConfigs = ?config(kafka_producers, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - delete_all_bridges(), - #{clientid := KafkaProducerClientId, producers := ProducersMapping} = - ProducersConfigs, - lists:foreach( - fun(Producers) -> - ok = wolff:stop_and_delete_supervised_producers(Producers) - end, - maps:values(ProducersMapping) - ), - ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId), - %% in CI, apparently this needs more time since the - %% machines struggle with all the containers running... - emqx_common_test_helpers:call_janitor(60_000), - ok = snabbkaffe:stop(), - ok - end. + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ProducersConfigs = ?config(kafka_producers, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + delete_all_bridges(), + #{clientid := KafkaProducerClientId, producers := ProducersMapping} = + ProducersConfigs, + lists:foreach( + fun(Producers) -> + ok = wolff:stop_and_delete_supervised_producers(Producers) + end, + maps:values(ProducersMapping) + ), + ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId), + %% in CI, apparently this needs more time since the + %% machines struggle with all the containers running... + emqx_common_test_helpers:call_janitor(60_000), + ok = snabbkaffe:stop(). %%------------------------------------------------------------------------------ %% Helper fns @@ -1391,14 +1360,6 @@ t_multiple_topic_mappings(Config) -> ok. t_on_get_status(Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - do_t_on_get_status(Config) - end. - -do_t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), @@ -1421,14 +1382,6 @@ do_t_on_get_status(Config) -> %% ensure that we can create and use the bridge successfully after %% creating it with bad config. t_failed_creation_then_fixed(Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - ?check_trace(do_t_failed_creation_then_fixed(Config), []) - end. - -do_t_failed_creation_then_fixed(Config) -> ct:timetrap({seconds, 180}), MQTTTopic = ?config(mqtt_topic, Config), MQTTQoS = ?config(mqtt_qos, Config), @@ -1516,14 +1469,6 @@ do_t_failed_creation_then_fixed(Config) -> %% recovering from a network partition will make the subscribers %% consume the messages produced during the down time. t_receive_after_recovery(Config) -> - case proplists:get_bool(skip_does_not_apply, Config) of - true -> - ok; - false -> - do_t_receive_after_recovery(Config) - end. - -do_t_receive_after_recovery(Config) -> ct:timetrap(120_000), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config),