test(kafka): simplify consumer testsuite matrix setup

This commit is contained in:
Andrew Mayorov 2023-11-07 23:18:38 +07:00
parent 18cd98def6
commit aa458b65d6
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 49 additions and 104 deletions

View File

@ -30,29 +30,41 @@ all() ->
].
groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE),
SASLAuths = [
sasl_auth_plain,
sasl_auth_scram256,
sasl_auth_scram512,
sasl_auth_kerberos
SASLGroups = [
{sasl_auth_plain, testcases(sasl)},
{sasl_auth_scram256, testcases(sasl)},
{sasl_auth_scram512, testcases(sasl)},
{sasl_auth_kerberos, testcases(sasl_auth_kerberos)}
],
SASLAuthGroups = [{group, Type} || Type <- SASLAuths],
OnlyOnceTCs = only_once_tests(),
MatrixTCs = AllTCs -- OnlyOnceTCs,
SASLTests = [{Group, MatrixTCs} || Group <- SASLAuths],
SASLAuthGroups = [{group, Group} || {Group, _} <- SASLGroups],
[
{plain, MatrixTCs ++ OnlyOnceTCs},
{ssl, MatrixTCs},
{plain, testcases(plain)},
{ssl, testcases(common)},
{sasl_plain, SASLAuthGroups},
{sasl_ssl, SASLAuthGroups}
] ++ SASLTests.
| SASLGroups
].
sasl_only_tests() ->
[t_failed_creation_then_fixed].
%% tests that do not need to be run on all groups
only_once_tests() ->
testcases(all) ->
emqx_common_test_helpers:all(?MODULE);
testcases(plain) ->
%% NOTE: relevant only for a subset of SASL testcases
Exclude = [t_failed_creation_then_fixed],
testcases(all) -- Exclude;
testcases(common) ->
testcases(plain) -- testcases(once);
testcases(sasl) ->
testcases(all) -- testcases(once);
testcases(sasl_auth_kerberos) ->
%% NOTE: need a proxy to run these tests
Exclude = [
t_failed_creation_then_fixed,
t_on_get_status,
t_receive_after_recovery
],
testcases(sasl) -- Exclude;
testcases(once) ->
%% tests that do not need to be run on all groups
[
t_begin_offset_earliest,
t_bridge_rule_action_source,
@ -220,7 +232,7 @@ init_per_group(sasl_auth_kerberos, Config0) ->
(KV) ->
KV
end,
[{has_proxy, false}, {sasl_auth_mechanism, kerberos} | Config0]
[{sasl_auth_mechanism, kerberos} | Config0]
),
Config;
init_per_group(_Group, Config) ->
@ -264,43 +276,6 @@ end_per_group(Group, Config) when
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(TestCase, Config) when
TestCase =:= t_failed_creation_then_fixed
->
KafkaType = ?config(kafka_type, Config),
AuthMechanism = ?config(sasl_auth_mechanism, Config),
IsSASL = lists:member(KafkaType, [sasl_plain, sasl_ssl]),
case {IsSASL, AuthMechanism} of
{true, kerberos} ->
[{skip_does_not_apply, true}];
{true, _} ->
common_init_per_testcase(TestCase, Config);
{false, _} ->
[{skip_does_not_apply, true}]
end;
init_per_testcase(TestCase, Config) when
TestCase =:= t_failed_creation_then_fixed
->
%% test with one partiton only for this case because
%% the wait probe may not be always sent to the same partition
HasProxy = proplists:get_value(has_proxy, Config, true),
case HasProxy of
false ->
[{skip_does_not_apply, true}];
true ->
common_init_per_testcase(TestCase, [{num_partitions, 1} | Config])
end;
init_per_testcase(TestCase, Config) when
TestCase =:= t_on_get_status;
TestCase =:= t_receive_after_recovery
->
HasProxy = proplists:get_value(has_proxy, Config, true),
case HasProxy of
false ->
[{skip_does_not_apply, true}];
true ->
common_init_per_testcase(TestCase, Config)
end;
init_per_testcase(t_cluster_group = TestCase, Config0) ->
Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]),
common_init_per_testcase(TestCase, Config);
@ -393,30 +368,24 @@ common_init_per_testcase(TestCase, Config0) ->
].
end_per_testcase(_Testcase, Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
ProducersConfigs = ?config(kafka_producers, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_bridges(),
#{clientid := KafkaProducerClientId, producers := ProducersMapping} =
ProducersConfigs,
lists:foreach(
fun(Producers) ->
ok = wolff:stop_and_delete_supervised_producers(Producers)
end,
maps:values(ProducersMapping)
),
ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId),
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(),
ok
end.
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
ProducersConfigs = ?config(kafka_producers, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
delete_all_bridges(),
#{clientid := KafkaProducerClientId, producers := ProducersMapping} =
ProducersConfigs,
lists:foreach(
fun(Producers) ->
ok = wolff:stop_and_delete_supervised_producers(Producers)
end,
maps:values(ProducersMapping)
),
ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId),
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop().
%%------------------------------------------------------------------------------
%% Helper fns
@ -1391,14 +1360,6 @@ t_multiple_topic_mappings(Config) ->
ok.
t_on_get_status(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
do_t_on_get_status(Config)
end.
do_t_on_get_status(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
@ -1421,14 +1382,6 @@ do_t_on_get_status(Config) ->
%% ensure that we can create and use the bridge successfully after
%% creating it with bad config.
t_failed_creation_then_fixed(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
?check_trace(do_t_failed_creation_then_fixed(Config), [])
end.
do_t_failed_creation_then_fixed(Config) ->
ct:timetrap({seconds, 180}),
MQTTTopic = ?config(mqtt_topic, Config),
MQTTQoS = ?config(mqtt_qos, Config),
@ -1516,14 +1469,6 @@ do_t_failed_creation_then_fixed(Config) ->
%% recovering from a network partition will make the subscribers
%% consume the messages produced during the down time.
t_receive_after_recovery(Config) ->
case proplists:get_bool(skip_does_not_apply, Config) of
true ->
ok;
false ->
do_t_receive_after_recovery(Config)
end.
do_t_receive_after_recovery(Config) ->
ct:timetrap(120_000),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),