diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index ecf0042ca..c2387fe99 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.28"}, + {vsn, "0.1.29"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 789cf509a..88cf236f4 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -1058,10 +1058,10 @@ maybe_unwrap({error, not_implemented}) -> maybe_unwrap(RpcMulticallResult) -> emqx_rpc:unwrap_erpc(RpcMulticallResult). -supported_versions(start_bridge_to_node) -> [2, 3, 4]; -supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4]; -supported_versions(get_metrics_from_all_nodes) -> [4]; -supported_versions(_Call) -> [1, 2, 3, 4]. +supported_versions(start_bridge_to_node) -> [2, 3, 4, 5]; +supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4, 5]; +supported_versions(get_metrics_from_all_nodes) -> [4, 5]; +supported_versions(_Call) -> [1, 2, 3, 4, 5]. redact(Term) -> emqx_utils:redact(Term). diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index f2459799b..43609dc27 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -245,7 +245,15 @@ install_bridge_v2( ConnectorId = emqx_connector_resource:resource_id( ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), - emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config), + ConfigWithTypeAndName = Config#{ + bridge_type => bin(BridgeV2Type), + bridge_name => bin(BridgeName) + }, + emqx_resource_manager:add_channel( + ConnectorId, + BridgeV2Id, + ConfigWithTypeAndName + ), ok. uninstall_bridge_v2( @@ -323,7 +331,15 @@ start_helper(BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config) - ConnectorId = emqx_connector_resource:resource_id( ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName ), - emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config). + ConfigWithTypeAndName = Config#{ + bridge_type => bin(BridgeV2Type), + bridge_name => bin(BridgeName) + }, + emqx_resource_manager:add_channel( + ConnectorId, + BridgeV2Id, + ConfigWithTypeAndName + ). reset_metrics(Type, Name) -> reset_metrics_helper(Type, Name, lookup_raw_conf(Type, Name)). @@ -389,8 +405,14 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> fun(ConnectorId) -> {_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), ChannelTestId = id(BridgeType, BridgeName, ConnectorName), - BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), - case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, BridgeV2Conf) of + Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf), + ConfWithTypeAndName = Conf#{ + bridge_type => bin(BridgeType), + bridge_name => bin(BridgeName) + }, + case + emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, ConfWithTypeAndName) + of {error, Reason} -> {error, Reason}; ok -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index f08d3a829..1b19782a5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -153,7 +153,9 @@ create_producers_for_bridge_v2( end, ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic), ok = check_if_healthy_leaders(ClientId, KafkaTopic), - WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id), + WolffProducerConfig = producers_config( + BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id + ), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index a2e9b45db..30e8f90a1 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -196,11 +196,11 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> true -> kafka_hosts_string_ssl(); false -> kafka_hosts_string() end, - SASLHostsString = - case UseSSL of - true -> kafka_hosts_string_ssl_sasl(); - false -> kafka_hosts_string_sasl() - end, + % SASLHostsString = + % case UseSSL of + % true -> kafka_hosts_string_ssl_sasl(); + % false -> kafka_hosts_string_sasl() + % end, BinifyMap = fun(Map) -> maps:from_list([ {erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)} @@ -263,7 +263,8 @@ kafka_bridge_rest_api_all_auth_methods(UseSSL) -> pre_create_atoms() -> [ 'kafka_producer__probe_', - probedryrun + probedryrun, + kafka__probe_ ]. kafka_bridge_rest_api_helper(Config) -> @@ -342,7 +343,7 @@ kafka_bridge_rest_api_helper(Config) -> {ok, 204, _} = http_post(BridgesProbeParts, CreateBody), AtomsAfter = erlang:system_info(atom_count), ?assertEqual(AtomsBefore, AtomsAfter), - {ok, 204, X} = http_post(BridgesProbeParts, CreateBody), + {ok, 204, _X} = http_post(BridgesProbeParts, CreateBody), %% Create a rule that uses the bridge {ok, 201, Rule} = http_post( ["rules"], @@ -871,7 +872,7 @@ t_wrong_headers_from_message(Config) -> timestamp => Time2 }, ?assertError( - {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}}, + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"foo">> := <<"bar">>}}}}}, send(Config, ResourceId, Msg2, State, BridgeV2Id) ), Time3 = erlang:unique_integer(), @@ -882,23 +883,9 @@ t_wrong_headers_from_message(Config) -> timestamp => Time3 }, ?assertError( - {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}}, + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"key">> := <<"foo">>}}}}}, send(Config, ResourceId, Msg3, State, BridgeV2Id) ), - Time4 = erlang:unique_integer(), - Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>, - Msg4 = #{ - clientid => integer_to_binary(Time4), - payload => Payload4, - timestamp => Time4 - }, - ?assertError( - {badmatch, - {error, - {unrecoverable_error, - {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}}, - send(Config, ResourceId, Msg4, State, BridgeV2Id) - ), %% TODO: refactor those into init/end per testcase ok = ?PRODUCER:on_stop(ResourceId, State), ?assertEqual([], supervisor:which_children(wolff_client_sup)), @@ -933,7 +920,10 @@ do_send(Ref, Config, ResourceId, Msg, State, BridgeV2Id) when is_list(Config) -> F(ok); on_query_async -> {ok, _} = ?PRODUCER:on_query_async(ResourceId, {BridgeV2Id, Msg}, {F, []}, State), - ok + ok; + undefined -> + ok = ?PRODUCER:on_query(ResourceId, {BridgeV2Id, Msg}, State), + F(ok) end. publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) -> diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index a3ca61d82..7ecabb0ff 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.32"}, + {vsn, "0.1.33"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [