fix: fixup after rebasing on release-23

This commit is contained in:
Kjell Winblad 2023-10-17 18:15:27 +02:00 committed by Zaiming (Stone) Shi
parent d8a9778d7c
commit 2cd1c88f7f
6 changed files with 49 additions and 35 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge, [ {application, emqx_bridge, [
{description, "EMQX bridges"}, {description, "EMQX bridges"},
{vsn, "0.1.28"}, {vsn, "0.1.29"},
{registered, [emqx_bridge_sup]}, {registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}}, {mod, {emqx_bridge_app, []}},
{applications, [ {applications, [

View File

@ -1058,10 +1058,10 @@ maybe_unwrap({error, not_implemented}) ->
maybe_unwrap(RpcMulticallResult) -> maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult). emqx_rpc:unwrap_erpc(RpcMulticallResult).
supported_versions(start_bridge_to_node) -> [2, 3, 4]; supported_versions(start_bridge_to_node) -> [2, 3, 4, 5];
supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4]; supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4, 5];
supported_versions(get_metrics_from_all_nodes) -> [4]; supported_versions(get_metrics_from_all_nodes) -> [4, 5];
supported_versions(_Call) -> [1, 2, 3, 4]. supported_versions(_Call) -> [1, 2, 3, 4, 5].
redact(Term) -> redact(Term) ->
emqx_utils:redact(Term). emqx_utils:redact(Term).

View File

@ -245,7 +245,15 @@ install_bridge_v2(
ConnectorId = emqx_connector_resource:resource_id( ConnectorId = emqx_connector_resource:resource_id(
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
), ),
emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config), ConfigWithTypeAndName = Config#{
bridge_type => bin(BridgeV2Type),
bridge_name => bin(BridgeName)
},
emqx_resource_manager:add_channel(
ConnectorId,
BridgeV2Id,
ConfigWithTypeAndName
),
ok. ok.
uninstall_bridge_v2( uninstall_bridge_v2(
@ -323,7 +331,15 @@ start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) -
ConnectorId = emqx_connector_resource:resource_id( ConnectorId = emqx_connector_resource:resource_id(
?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName
), ),
emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config). ConfigWithTypeAndName = Config#{
bridge_type => bin(BridgeV2Type),
bridge_name => bin(BridgeName)
},
emqx_resource_manager:add_channel(
ConnectorId,
BridgeV2Id,
ConfigWithTypeAndName
).
reset_metrics(Type, Name) -> reset_metrics(Type, Name) ->
reset_metrics_helper(Type, Name, lookup_raw_conf(Type, Name)). reset_metrics_helper(Type, Name, lookup_raw_conf(Type, Name)).
@ -389,8 +405,14 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
fun(ConnectorId) -> fun(ConnectorId) ->
{_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
ChannelTestId = id(BridgeType, BridgeName, ConnectorName), ChannelTestId = id(BridgeType, BridgeName, ConnectorName),
BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf),
case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, BridgeV2Conf) of ConfWithTypeAndName = Conf#{
bridge_type => bin(BridgeType),
bridge_name => bin(BridgeName)
},
case
emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, ConfWithTypeAndName)
of
{error, Reason} -> {error, Reason} ->
{error, Reason}; {error, Reason};
ok -> ok ->

View File

@ -153,7 +153,9 @@ create_producers_for_bridge_v2(
end, end,
ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic), ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic),
ok = check_if_healthy_leaders(ClientId, KafkaTopic), ok = check_if_healthy_leaders(ClientId, KafkaTopic),
WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id), WolffProducerConfig = producers_config(
BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id
),
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
{ok, Producers} -> {ok, Producers} ->
ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers),

View File

@ -196,11 +196,11 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
true -> kafka_hosts_string_ssl(); true -> kafka_hosts_string_ssl();
false -> kafka_hosts_string() false -> kafka_hosts_string()
end, end,
SASLHostsString = % SASLHostsString =
case UseSSL of % case UseSSL of
true -> kafka_hosts_string_ssl_sasl(); % true -> kafka_hosts_string_ssl_sasl();
false -> kafka_hosts_string_sasl() % false -> kafka_hosts_string_sasl()
end, % end,
BinifyMap = fun(Map) -> BinifyMap = fun(Map) ->
maps:from_list([ maps:from_list([
{erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)} {erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)}
@ -263,7 +263,8 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
pre_create_atoms() -> pre_create_atoms() ->
[ [
'kafka_producer__probe_', 'kafka_producer__probe_',
probedryrun probedryrun,
kafka__probe_
]. ].
kafka_bridge_rest_api_helper(Config) -> kafka_bridge_rest_api_helper(Config) ->
@ -342,7 +343,7 @@ kafka_bridge_rest_api_helper(Config) ->
{ok, 204, _} = http_post(BridgesProbeParts, CreateBody), {ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
AtomsAfter = erlang:system_info(atom_count), AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter), ?assertEqual(AtomsBefore, AtomsAfter),
{ok, 204, X} = http_post(BridgesProbeParts, CreateBody), {ok, 204, _X} = http_post(BridgesProbeParts, CreateBody),
%% Create a rule that uses the bridge %% Create a rule that uses the bridge
{ok, 201, Rule} = http_post( {ok, 201, Rule} = http_post(
["rules"], ["rules"],
@ -871,7 +872,7 @@ t_wrong_headers_from_message(Config) ->
timestamp => Time2 timestamp => Time2
}, },
?assertError( ?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}}, {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"foo">> := <<"bar">>}}}}},
send(Config, ResourceId, Msg2, State, BridgeV2Id) send(Config, ResourceId, Msg2, State, BridgeV2Id)
), ),
Time3 = erlang:unique_integer(), Time3 = erlang:unique_integer(),
@ -882,23 +883,9 @@ t_wrong_headers_from_message(Config) ->
timestamp => Time3 timestamp => Time3
}, },
?assertError( ?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}}, {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"key">> := <<"foo">>}}}}},
send(Config, ResourceId, Msg3, State, BridgeV2Id) send(Config, ResourceId, Msg3, State, BridgeV2Id)
), ),
Time4 = erlang:unique_integer(),
Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>,
Msg4 = #{
clientid => integer_to_binary(Time4),
payload => Payload4,
timestamp => Time4
},
?assertError(
{badmatch,
{error,
{unrecoverable_error,
{bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}},
send(Config, ResourceId, Msg4, State, BridgeV2Id)
),
%% TODO: refactor those into init/end per testcase %% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State), ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)), ?assertEqual([], supervisor:which_children(wolff_client_sup)),
@ -933,7 +920,10 @@ do_send(Ref, Config, ResourceId, Msg, State, BridgeV2Id) when is_list(Config) ->
F(ok); F(ok);
on_query_async -> on_query_async ->
{ok, _} = ?PRODUCER:on_query_async(ResourceId, {BridgeV2Id, Msg}, {F, []}, State), {ok, _} = ?PRODUCER:on_query_async(ResourceId, {BridgeV2Id, Msg}, {F, []}, State),
ok ok;
undefined ->
ok = ?PRODUCER:on_query(ResourceId, {BridgeV2Id, Msg}, State),
F(ok)
end. end.
publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) -> publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) ->

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_connector, [ {application, emqx_connector, [
{description, "EMQX Data Integration Connectors"}, {description, "EMQX Data Integration Connectors"},
{vsn, "0.1.32"}, {vsn, "0.1.33"},
{registered, []}, {registered, []},
{mod, {emqx_connector_app, []}}, {mod, {emqx_connector_app, []}},
{applications, [ {applications, [