Merge pull request #11843 from thalesmg/test-kafka-producer-nits-r53-20231030

test(kafka_producer): minor adjustments to test suite
This commit is contained in:
Zaiming (Stone) Shi 2023-10-30 18:27:10 +01:00 committed by GitHub
commit 124d79a1ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 20 deletions

View File

@ -1388,13 +1388,13 @@ select_free_port(GenModule, Fun) when
%% groups() -> %% groups() ->
%% emqx_common_test_helpers:groups(?MODULE, [case1, case2]). %% emqx_common_test_helpers:groups(?MODULE, [case1, case2]).
%% %%
%% case1(matrxi) -> %% case1(matrix) ->
%% {g1, [[tcp, no_auth], %% {g1, [[tcp, no_auth],
%% [ssl, no_auth], %% [ssl, no_auth],
%% [ssl, basic_auth] %% [ssl, basic_auth]
%% ]}; %% ]};
%% %%
%% case2(matrxi) -> %% case2(matrix) ->
%% {g1, ...} %% {g1, ...}
%% ... %% ...
%% %%

View File

@ -43,8 +43,6 @@
-define(BRIDGE_TYPE_V2, "kafka_producer"). -define(BRIDGE_TYPE_V2, "kafka_producer").
-define(BRIDGE_TYPE_BIN, <<"kafka">>). -define(BRIDGE_TYPE_BIN, <<"kafka">>).
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -100,11 +98,17 @@ init_per_suite(Config0) ->
_ -> _ ->
Config0 Config0
end, end,
%% Ensure enterprise bridge module is loaded Apps = emqx_cth_suite:start(
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), [
_ = emqx_bridge_enterprise:module_info(), emqx,
ok = emqx_connector_test_helpers:start_apps(?APPS), emqx_conf,
{ok, _} = application:ensure_all_started(emqx_connector), emqx_connector,
emqx_bridge_kafka,
emqx_bridge,
emqx_rule_engine
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
emqx_mgmt_api_test_util:init_suite(), emqx_mgmt_api_test_util:init_suite(),
wait_until_kafka_is_up(), wait_until_kafka_is_up(),
%% Wait until bridges API is up %% Wait until bridges API is up
@ -118,13 +122,12 @@ init_per_suite(Config0) ->
WaitUntilRestApiUp() WaitUntilRestApiUp()
end end
end)(), end)(),
Config. [{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_mgmt_api_test_util:end_suite(), emqx_mgmt_api_test_util:end_suite(),
ok = emqx_common_test_helpers:stop_apps([emqx_conf]), ok = emqx_cth_suite:stop(Apps),
ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
_ = application:stop(emqx_connector),
ok. ok.
init_per_testcase(TestCase, Config) -> init_per_testcase(TestCase, Config) ->
@ -250,7 +253,7 @@ t_rest_api(Config) ->
%% So that we can check if new atoms are created when they are not supposed to be created %% So that we can check if new atoms are created when they are not supposed to be created
pre_create_atoms() -> pre_create_atoms() ->
[ [
'kafka_producer__probe_', kafka_producer__probe_,
probedryrun, probedryrun,
kafka__probe_ kafka__probe_
]. ].
@ -608,12 +611,12 @@ t_send_message_with_headers(Config) ->
} }
} }
}, },
"query_mode" => Mode,
"ssl" => #{} "ssl" => #{}
}), }),
{ok, _} = emqx_bridge:create( {ok, _} = emqx_bridge:create(
list_to_atom(Type), list_to_atom(Name), Conf list_to_atom(Type), list_to_atom(Name), Conf
), ),
% ConfigAtom = ConfigAtom1#{bridge_name => Name},
ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)), ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)),
BridgeV2Id = emqx_bridge_v2:id(bin(?BRIDGE_TYPE_V2), bin(Name)), BridgeV2Id = emqx_bridge_v2:id(bin(?BRIDGE_TYPE_V2), bin(Name)),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId), {ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId),
@ -813,13 +816,13 @@ t_wrong_headers(_Config) ->
t_wrong_headers_from_message(matrix) -> t_wrong_headers_from_message(matrix) ->
{query_mode, [[sync], [async]]}; {query_mode, [[sync], [async]]};
t_wrong_headers_from_message(Config) -> t_wrong_headers_from_message(Config) ->
[Mode] = group_path(Config),
ct:comment(Mode),
HostsString = kafka_hosts_string(), HostsString = kafka_hosts_string(),
AuthSettings = "none", AuthSettings = "none",
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Type = ?BRIDGE_TYPE, Type = ?BRIDGE_TYPE,
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
% ResourceId = emqx_bridge_resource:resource_id(Type, Name),
% BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
KafkaTopic = test_topic_one_partition(), KafkaTopic = test_topic_one_partition(),
Conf = config_with_headers(#{ Conf = config_with_headers(#{
"authentication" => AuthSettings, "authentication" => AuthSettings,
@ -834,13 +837,12 @@ t_wrong_headers_from_message(Config) ->
} }
} }
}, },
"query_mode" => Mode,
"ssl" => #{} "ssl" => #{}
}), }),
{ok, _} = emqx_bridge:create( {ok, _} = emqx_bridge:create(
list_to_atom(Type), list_to_atom(Name), Conf list_to_atom(Type), list_to_atom(Name), Conf
), ),
% ConfigAtom = ConfigAtom1#{bridge_name => Name},
% {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)), ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId), {ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId),
Time1 = erlang:unique_integer(), Time1 = erlang:unique_integer(),