test(kafka_producer): minor adjustments to test suite
- Use `emqx_cth_suite`. - Use `query_mode` matrix value when setting up bridge in a couple test cases.
This commit is contained in:
parent
f463eff02d
commit
a60b96c5fd
|
@ -1388,13 +1388,13 @@ select_free_port(GenModule, Fun) when
|
||||||
%% groups() ->
|
%% groups() ->
|
||||||
%% emqx_common_test_helpers:groups(?MODULE, [case1, case2]).
|
%% emqx_common_test_helpers:groups(?MODULE, [case1, case2]).
|
||||||
%%
|
%%
|
||||||
%% case1(matrxi) ->
|
%% case1(matrix) ->
|
||||||
%% {g1, [[tcp, no_auth],
|
%% {g1, [[tcp, no_auth],
|
||||||
%% [ssl, no_auth],
|
%% [ssl, no_auth],
|
||||||
%% [ssl, basic_auth]
|
%% [ssl, basic_auth]
|
||||||
%% ]};
|
%% ]};
|
||||||
%%
|
%%
|
||||||
%% case2(matrxi) ->
|
%% case2(matrix) ->
|
||||||
%% {g1, ...}
|
%% {g1, ...}
|
||||||
%% ...
|
%% ...
|
||||||
%%
|
%%
|
||||||
|
|
|
@ -43,8 +43,6 @@
|
||||||
-define(BRIDGE_TYPE_V2, "kafka_producer").
|
-define(BRIDGE_TYPE_V2, "kafka_producer").
|
||||||
-define(BRIDGE_TYPE_BIN, <<"kafka">>).
|
-define(BRIDGE_TYPE_BIN, <<"kafka">>).
|
||||||
|
|
||||||
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_kafka]).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% CT boilerplate
|
%% CT boilerplate
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -100,11 +98,17 @@ init_per_suite(Config0) ->
|
||||||
_ ->
|
_ ->
|
||||||
Config0
|
Config0
|
||||||
end,
|
end,
|
||||||
%% Ensure enterprise bridge module is loaded
|
Apps = emqx_cth_suite:start(
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
|
[
|
||||||
_ = emqx_bridge_enterprise:module_info(),
|
emqx,
|
||||||
ok = emqx_connector_test_helpers:start_apps(?APPS),
|
emqx_conf,
|
||||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
emqx_connector,
|
||||||
|
emqx_bridge_kafka,
|
||||||
|
emqx_bridge,
|
||||||
|
emqx_rule_engine
|
||||||
|
],
|
||||||
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||||
|
),
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
emqx_mgmt_api_test_util:init_suite(),
|
||||||
wait_until_kafka_is_up(),
|
wait_until_kafka_is_up(),
|
||||||
%% Wait until bridges API is up
|
%% Wait until bridges API is up
|
||||||
|
@ -118,13 +122,12 @@ init_per_suite(Config0) ->
|
||||||
WaitUntilRestApiUp()
|
WaitUntilRestApiUp()
|
||||||
end
|
end
|
||||||
end)(),
|
end)(),
|
||||||
Config.
|
[{apps, Apps} | Config].
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(Config) ->
|
||||||
|
Apps = ?config(apps, Config),
|
||||||
emqx_mgmt_api_test_util:end_suite(),
|
emqx_mgmt_api_test_util:end_suite(),
|
||||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
ok = emqx_cth_suite:stop(Apps),
|
||||||
ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?APPS)),
|
|
||||||
_ = application:stop(emqx_connector),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) ->
|
init_per_testcase(TestCase, Config) ->
|
||||||
|
@ -250,7 +253,7 @@ t_rest_api(Config) ->
|
||||||
%% So that we can check if new atoms are created when they are not supposed to be created
|
%% So that we can check if new atoms are created when they are not supposed to be created
|
||||||
pre_create_atoms() ->
|
pre_create_atoms() ->
|
||||||
[
|
[
|
||||||
'kafka_producer__probe_',
|
kafka_producer__probe_,
|
||||||
probedryrun,
|
probedryrun,
|
||||||
kafka__probe_
|
kafka__probe_
|
||||||
].
|
].
|
||||||
|
@ -608,12 +611,12 @@ t_send_message_with_headers(Config) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"query_mode" => Mode,
|
||||||
"ssl" => #{}
|
"ssl" => #{}
|
||||||
}),
|
}),
|
||||||
{ok, _} = emqx_bridge:create(
|
{ok, _} = emqx_bridge:create(
|
||||||
list_to_atom(Type), list_to_atom(Name), Conf
|
list_to_atom(Type), list_to_atom(Name), Conf
|
||||||
),
|
),
|
||||||
% ConfigAtom = ConfigAtom1#{bridge_name => Name},
|
|
||||||
ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)),
|
ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)),
|
||||||
BridgeV2Id = emqx_bridge_v2:id(bin(?BRIDGE_TYPE_V2), bin(Name)),
|
BridgeV2Id = emqx_bridge_v2:id(bin(?BRIDGE_TYPE_V2), bin(Name)),
|
||||||
{ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId),
|
{ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId),
|
||||||
|
@ -813,13 +816,13 @@ t_wrong_headers(_Config) ->
|
||||||
t_wrong_headers_from_message(matrix) ->
|
t_wrong_headers_from_message(matrix) ->
|
||||||
{query_mode, [[sync], [async]]};
|
{query_mode, [[sync], [async]]};
|
||||||
t_wrong_headers_from_message(Config) ->
|
t_wrong_headers_from_message(Config) ->
|
||||||
|
[Mode] = group_path(Config),
|
||||||
|
ct:comment(Mode),
|
||||||
HostsString = kafka_hosts_string(),
|
HostsString = kafka_hosts_string(),
|
||||||
AuthSettings = "none",
|
AuthSettings = "none",
|
||||||
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
|
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
|
||||||
Type = ?BRIDGE_TYPE,
|
Type = ?BRIDGE_TYPE,
|
||||||
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
|
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
|
||||||
% ResourceId = emqx_bridge_resource:resource_id(Type, Name),
|
|
||||||
% BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
|
|
||||||
KafkaTopic = test_topic_one_partition(),
|
KafkaTopic = test_topic_one_partition(),
|
||||||
Conf = config_with_headers(#{
|
Conf = config_with_headers(#{
|
||||||
"authentication" => AuthSettings,
|
"authentication" => AuthSettings,
|
||||||
|
@ -834,13 +837,12 @@ t_wrong_headers_from_message(Config) ->
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"query_mode" => Mode,
|
||||||
"ssl" => #{}
|
"ssl" => #{}
|
||||||
}),
|
}),
|
||||||
{ok, _} = emqx_bridge:create(
|
{ok, _} = emqx_bridge:create(
|
||||||
list_to_atom(Type), list_to_atom(Name), Conf
|
list_to_atom(Type), list_to_atom(Name), Conf
|
||||||
),
|
),
|
||||||
% ConfigAtom = ConfigAtom1#{bridge_name => Name},
|
|
||||||
% {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
|
|
||||||
ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)),
|
ResourceId = emqx_bridge_resource:resource_id(bin(Type), bin(Name)),
|
||||||
{ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId),
|
{ok, _Group, #{state := State}} = emqx_resource:get_instance(ResourceId),
|
||||||
Time1 = erlang:unique_integer(),
|
Time1 = erlang:unique_integer(),
|
||||||
|
|
Loading…
Reference in New Issue